mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-22 05:09:05 +00:00
feat: add Celery background tasks for ICS sync
This commit is contained in:
@@ -103,6 +103,8 @@ class MeetingController:
|
||||
end_date: datetime,
|
||||
user_id: str,
|
||||
room: Room,
|
||||
calendar_event_id: str | None = None,
|
||||
calendar_metadata: dict[str, Any] | None = None,
|
||||
):
|
||||
"""
|
||||
Create a new meeting
|
||||
@@ -120,6 +122,8 @@ class MeetingController:
|
||||
room_mode=room.room_mode,
|
||||
recording_type=room.recording_type,
|
||||
recording_trigger=room.recording_trigger,
|
||||
calendar_event_id=calendar_event_id,
|
||||
calendar_metadata=calendar_metadata,
|
||||
)
|
||||
query = meetings.insert().values(**meeting.model_dump())
|
||||
await get_database().execute(query)
|
||||
@@ -195,6 +199,15 @@ class MeetingController:
|
||||
|
||||
return meeting
|
||||
|
||||
async def get_by_calendar_event(self, calendar_event_id: str) -> Meeting | None:
|
||||
query = meetings.select().where(
|
||||
meetings.c.calendar_event_id == calendar_event_id
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
if not result:
|
||||
return None
|
||||
return Meeting(**result)
|
||||
|
||||
async def update_meeting(self, meeting_id: str, **kwargs):
|
||||
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
|
||||
await get_database().execute(query)
|
||||
|
||||
@@ -19,6 +19,7 @@ else:
|
||||
"reflector.pipelines.main_live_pipeline",
|
||||
"reflector.worker.healthcheck",
|
||||
"reflector.worker.process",
|
||||
"reflector.worker.ics_sync",
|
||||
]
|
||||
)
|
||||
|
||||
@@ -36,6 +37,14 @@ else:
|
||||
"task": "reflector.worker.process.reprocess_failed_recordings",
|
||||
"schedule": crontab(hour=5, minute=0), # Midnight EST
|
||||
},
|
||||
"sync_all_ics_calendars": {
|
||||
"task": "reflector.worker.ics_sync.sync_all_ics_calendars",
|
||||
"schedule": 60.0, # Run every minute to check which rooms need sync
|
||||
},
|
||||
"pre_create_upcoming_meetings": {
|
||||
"task": "reflector.worker.ics_sync.pre_create_upcoming_meetings",
|
||||
"schedule": 30.0, # Run every 30 seconds to pre-create meetings
|
||||
},
|
||||
}
|
||||
|
||||
if settings.HEALTHCHECK_URL:
|
||||
|
||||
209
server/reflector/worker/ics_sync.py
Normal file
209
server/reflector/worker/ics_sync.py
Normal file
@@ -0,0 +1,209 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from reflector.db import get_database
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.rooms import rooms, rooms_controller
|
||||
from reflector.services.ics_sync import ics_sync_service
|
||||
from reflector.whereby import create_meeting, upload_logo
|
||||
|
||||
logger = structlog.wrap_logger(get_task_logger(__name__))
|
||||
|
||||
|
||||
@shared_task
|
||||
def sync_room_ics(room_id: str):
|
||||
asynctask(_sync_room_ics_async(room_id))
|
||||
|
||||
|
||||
async def _sync_room_ics_async(room_id: str):
|
||||
try:
|
||||
room = await rooms_controller.get_by_id(room_id)
|
||||
if not room:
|
||||
logger.warning("Room not found for ICS sync", room_id=room_id)
|
||||
return
|
||||
|
||||
if not room.ics_enabled or not room.ics_url:
|
||||
logger.debug("ICS not enabled for room", room_id=room_id)
|
||||
return
|
||||
|
||||
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
||||
result = await ics_sync_service.sync_room_calendar(room)
|
||||
|
||||
if result["status"] == "success":
|
||||
logger.info(
|
||||
"ICS sync completed successfully",
|
||||
room_id=room_id,
|
||||
events_found=result.get("events_found", 0),
|
||||
events_created=result.get("events_created", 0),
|
||||
events_updated=result.get("events_updated", 0),
|
||||
events_deleted=result.get("events_deleted", 0),
|
||||
)
|
||||
elif result["status"] == "unchanged":
|
||||
logger.debug("ICS content unchanged", room_id=room_id)
|
||||
elif result["status"] == "error":
|
||||
logger.error("ICS sync failed", room_id=room_id, error=result.get("error"))
|
||||
else:
|
||||
logger.debug(
|
||||
"ICS sync skipped", room_id=room_id, reason=result.get("reason")
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Unexpected error during ICS sync", room_id=room_id, error=str(e))
|
||||
|
||||
|
||||
@shared_task
|
||||
def sync_all_ics_calendars():
|
||||
asynctask(_sync_all_ics_calendars_async())
|
||||
|
||||
|
||||
async def _sync_all_ics_calendars_async():
|
||||
try:
|
||||
logger.info("Starting sync for all ICS-enabled rooms")
|
||||
|
||||
# Get ALL rooms - not filtered by is_shared
|
||||
query = rooms.select().where(
|
||||
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
||||
)
|
||||
all_rooms = await get_database().fetch_all(query)
|
||||
ics_enabled_rooms = list(all_rooms)
|
||||
|
||||
logger.info(f"Found {len(ics_enabled_rooms)} rooms with ICS enabled")
|
||||
|
||||
for room_data in ics_enabled_rooms:
|
||||
room_id = room_data["id"]
|
||||
room = await rooms_controller.get_by_id(room_id)
|
||||
|
||||
if not room:
|
||||
continue
|
||||
|
||||
if not _should_sync(room):
|
||||
logger.debug("Skipping room, not time to sync yet", room_id=room_id)
|
||||
continue
|
||||
|
||||
sync_room_ics.delay(room_id)
|
||||
|
||||
logger.info("Queued sync tasks for all eligible rooms")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in sync_all_ics_calendars", error=str(e))
|
||||
|
||||
|
||||
def _should_sync(room) -> bool:
|
||||
if not room.ics_last_sync:
|
||||
return True
|
||||
|
||||
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
|
||||
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
||||
|
||||
|
||||
@shared_task
|
||||
def pre_create_upcoming_meetings():
|
||||
asynctask(_pre_create_upcoming_meetings_async())
|
||||
|
||||
|
||||
async def _pre_create_upcoming_meetings_async():
|
||||
try:
|
||||
logger.info("Starting pre-creation of upcoming meetings")
|
||||
|
||||
from reflector.db.calendar_events import calendar_events_controller
|
||||
|
||||
# Get ALL rooms with ICS enabled
|
||||
query = rooms.select().where(
|
||||
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
||||
)
|
||||
all_rooms = await get_database().fetch_all(query)
|
||||
now = datetime.now(timezone.utc)
|
||||
pre_create_window = now + timedelta(minutes=1)
|
||||
|
||||
for room_data in all_rooms:
|
||||
room_id = room_data["id"]
|
||||
room = await rooms_controller.get_by_id(room_id)
|
||||
|
||||
if not room:
|
||||
continue
|
||||
|
||||
events = await calendar_events_controller.get_upcoming(
|
||||
room_id, minutes_ahead=2
|
||||
)
|
||||
|
||||
for event in events:
|
||||
if event.start_time <= pre_create_window:
|
||||
existing_meeting = await meetings_controller.get_by_calendar_event(
|
||||
event.id
|
||||
)
|
||||
|
||||
if not existing_meeting:
|
||||
logger.info(
|
||||
"Pre-creating meeting for calendar event",
|
||||
room_id=room_id,
|
||||
event_id=event.id,
|
||||
event_title=event.title,
|
||||
)
|
||||
|
||||
try:
|
||||
end_date = event.end_time or (
|
||||
event.start_time + timedelta(hours=1)
|
||||
)
|
||||
|
||||
whereby_meeting = await create_meeting(
|
||||
event.title or "Scheduled Meeting",
|
||||
end_date=end_date,
|
||||
room=room,
|
||||
)
|
||||
await upload_logo(
|
||||
whereby_meeting["roomName"], "./images/logo.png"
|
||||
)
|
||||
|
||||
meeting = await meetings_controller.create(
|
||||
id=whereby_meeting["meetingId"],
|
||||
room_name=whereby_meeting["roomName"],
|
||||
room_url=whereby_meeting["roomUrl"],
|
||||
host_room_url=whereby_meeting["hostRoomUrl"],
|
||||
start_date=datetime.fromisoformat(
|
||||
whereby_meeting["startDate"]
|
||||
),
|
||||
end_date=datetime.fromisoformat(
|
||||
whereby_meeting["endDate"]
|
||||
),
|
||||
user_id=room.user_id,
|
||||
room=room,
|
||||
calendar_event_id=event.id,
|
||||
calendar_metadata={
|
||||
"title": event.title,
|
||||
"description": event.description,
|
||||
"attendees": event.attendees,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Meeting pre-created successfully",
|
||||
meeting_id=meeting.id,
|
||||
event_id=event.id,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to pre-create meeting",
|
||||
room_id=room_id,
|
||||
event_id=event.id,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
logger.info("Completed pre-creation check for upcoming meetings")
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in pre_create_upcoming_meetings", error=str(e))
|
||||
|
||||
|
||||
def asynctask(coro):
|
||||
import asyncio
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
finally:
|
||||
loop.close()
|
||||
230
server/tests/test_ics_background_tasks.py
Normal file
230
server/tests/test_ics_background_tasks.py
Normal file
@@ -0,0 +1,230 @@
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
from icalendar import Calendar, Event
|
||||
|
||||
from reflector.db.calendar_events import calendar_events_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.worker.ics_sync import (
|
||||
_should_sync,
|
||||
_sync_all_ics_calendars_async,
|
||||
_sync_room_ics_async,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_room_ics_task():
|
||||
room = await rooms_controller.add(
|
||||
name="task-test-room",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/task.ics",
|
||||
ics_enabled=True,
|
||||
)
|
||||
|
||||
cal = Calendar()
|
||||
event = Event()
|
||||
event.add("uid", "task-event-1")
|
||||
event.add("summary", "Task Test Meeting")
|
||||
from reflector.settings import settings
|
||||
|
||||
event.add("location", f"{settings.BASE_URL}/room/{room.name}")
|
||||
now = datetime.now(timezone.utc)
|
||||
event.add("dtstart", now + timedelta(hours=1))
|
||||
event.add("dtend", now + timedelta(hours=2))
|
||||
cal.add_component(event)
|
||||
ics_content = cal.to_ical().decode("utf-8")
|
||||
|
||||
with patch(
|
||||
"reflector.services.ics_sync.ICSFetchService.fetch_ics", new_callable=AsyncMock
|
||||
) as mock_fetch:
|
||||
mock_fetch.return_value = ics_content
|
||||
|
||||
await _sync_room_ics_async(room.id)
|
||||
|
||||
events = await calendar_events_controller.get_by_room(room.id)
|
||||
assert len(events) == 1
|
||||
assert events[0].ics_uid == "task-event-1"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_room_ics_disabled():
|
||||
room = await rooms_controller.add(
|
||||
name="disabled-room",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_enabled=False,
|
||||
)
|
||||
|
||||
await _sync_room_ics_async(room.id)
|
||||
|
||||
events = await calendar_events_controller.get_by_room(room.id)
|
||||
assert len(events) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_all_ics_calendars():
|
||||
room1 = await rooms_controller.add(
|
||||
name="sync-all-1",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/1.ics",
|
||||
ics_enabled=True,
|
||||
)
|
||||
|
||||
room2 = await rooms_controller.add(
|
||||
name="sync-all-2",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/2.ics",
|
||||
ics_enabled=True,
|
||||
)
|
||||
|
||||
room3 = await rooms_controller.add(
|
||||
name="sync-all-3",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_enabled=False,
|
||||
)
|
||||
|
||||
with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay:
|
||||
await _sync_all_ics_calendars_async()
|
||||
|
||||
assert mock_delay.call_count == 2
|
||||
called_room_ids = [call.args[0] for call in mock_delay.call_args_list]
|
||||
assert room1.id in called_room_ids
|
||||
assert room2.id in called_room_ids
|
||||
assert room3.id not in called_room_ids
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_should_sync_logic():
|
||||
room = MagicMock()
|
||||
|
||||
room.ics_last_sync = None
|
||||
assert _should_sync(room) is True
|
||||
|
||||
room.ics_last_sync = datetime.now(timezone.utc) - timedelta(seconds=100)
|
||||
room.ics_fetch_interval = 300
|
||||
assert _should_sync(room) is False
|
||||
|
||||
room.ics_last_sync = datetime.now(timezone.utc) - timedelta(seconds=400)
|
||||
room.ics_fetch_interval = 300
|
||||
assert _should_sync(room) is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_respects_fetch_interval():
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
room1 = await rooms_controller.add(
|
||||
name="interval-test-1",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/interval.ics",
|
||||
ics_enabled=True,
|
||||
ics_fetch_interval=300,
|
||||
)
|
||||
|
||||
await rooms_controller.update(
|
||||
room1,
|
||||
{"ics_last_sync": now - timedelta(seconds=100)},
|
||||
)
|
||||
|
||||
room2 = await rooms_controller.add(
|
||||
name="interval-test-2",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/interval2.ics",
|
||||
ics_enabled=True,
|
||||
ics_fetch_interval=60,
|
||||
)
|
||||
|
||||
await rooms_controller.update(
|
||||
room2,
|
||||
{"ics_last_sync": now - timedelta(seconds=100)},
|
||||
)
|
||||
|
||||
with patch("reflector.worker.ics_sync.sync_room_ics.delay") as mock_delay:
|
||||
await _sync_all_ics_calendars_async()
|
||||
|
||||
assert mock_delay.call_count == 1
|
||||
assert mock_delay.call_args[0][0] == room2.id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_sync_handles_errors_gracefully():
|
||||
room = await rooms_controller.add(
|
||||
name="error-task-room",
|
||||
user_id="test-user",
|
||||
zulip_auto_post=False,
|
||||
zulip_stream="",
|
||||
zulip_topic="",
|
||||
is_locked=False,
|
||||
room_mode="normal",
|
||||
recording_type="cloud",
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
ics_url="https://calendar.example.com/error.ics",
|
||||
ics_enabled=True,
|
||||
)
|
||||
|
||||
with patch(
|
||||
"reflector.services.ics_sync.ICSFetchService.fetch_ics", new_callable=AsyncMock
|
||||
) as mock_fetch:
|
||||
mock_fetch.side_effect = Exception("Network error")
|
||||
|
||||
await _sync_room_ics_async(room.id)
|
||||
|
||||
events = await calendar_events_controller.get_by_room(room.id)
|
||||
assert len(events) == 0
|
||||
Reference in New Issue
Block a user