mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
feat: add SyncStatus enum and refactor ICS sync to use rooms controller
- Add SyncStatus enum to replace string literals in ICS sync status - Replace direct SQL queries in worker with rooms_controller.get_ics_enabled() - Improve type safety and maintainability of ICS sync code - Enum values: SUCCESS, UNCHANGED, ERROR, SKIPPED maintain backward compatibility
This commit is contained in:
@@ -217,6 +217,16 @@ class RoomController:
|
|||||||
|
|
||||||
return room
|
return room
|
||||||
|
|
||||||
|
async def get_ics_enabled(self) -> list[Room]:
|
||||||
|
"""
|
||||||
|
Get all rooms with ICS enabled
|
||||||
|
"""
|
||||||
|
query = rooms.select().where(
|
||||||
|
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
||||||
|
)
|
||||||
|
results = await get_database().fetch_all(query)
|
||||||
|
return [Room(**result) for result in results]
|
||||||
|
|
||||||
async def remove_by_id(
|
async def remove_by_id(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import hashlib
|
import hashlib
|
||||||
from datetime import date, datetime, timedelta, timezone
|
from datetime import date, datetime, timedelta, timezone
|
||||||
|
from enum import Enum
|
||||||
from typing import TypedDict
|
from typing import TypedDict
|
||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
@@ -12,6 +13,13 @@ from reflector.db.rooms import Room, rooms_controller
|
|||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
|
|
||||||
|
|
||||||
|
class SyncStatus(str, Enum):
|
||||||
|
SUCCESS = "success"
|
||||||
|
UNCHANGED = "unchanged"
|
||||||
|
ERROR = "error"
|
||||||
|
SKIPPED = "skipped"
|
||||||
|
|
||||||
|
|
||||||
class AttendeeData(TypedDict, total=False):
|
class AttendeeData(TypedDict, total=False):
|
||||||
email: str | None
|
email: str | None
|
||||||
name: str | None
|
name: str | None
|
||||||
@@ -37,7 +45,7 @@ class SyncStats(TypedDict):
|
|||||||
|
|
||||||
|
|
||||||
class SyncResultBase(TypedDict):
|
class SyncResultBase(TypedDict):
|
||||||
status: str # "success", "unchanged", "error", "skipped"
|
status: SyncStatus
|
||||||
|
|
||||||
|
|
||||||
class SyncResult(SyncResultBase, total=False):
|
class SyncResult(SyncResultBase, total=False):
|
||||||
@@ -241,12 +249,12 @@ class ICSSyncService:
|
|||||||
|
|
||||||
async def sync_room_calendar(self, room: Room) -> SyncResult:
|
async def sync_room_calendar(self, room: Room) -> SyncResult:
|
||||||
if not room.ics_enabled or not room.ics_url:
|
if not room.ics_enabled or not room.ics_url:
|
||||||
return {"status": "skipped", "reason": "ICS not configured"}
|
return {"status": SyncStatus.SKIPPED, "reason": "ICS not configured"}
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Check if it's time to sync
|
# Check if it's time to sync
|
||||||
if not self._should_sync(room):
|
if not self._should_sync(room):
|
||||||
return {"status": "skipped", "reason": "Not time to sync yet"}
|
return {"status": SyncStatus.SKIPPED, "reason": "Not time to sync yet"}
|
||||||
|
|
||||||
# Fetch ICS file
|
# Fetch ICS file
|
||||||
ics_content = await self.fetch_service.fetch_ics(room.ics_url)
|
ics_content = await self.fetch_service.fetch_ics(room.ics_url)
|
||||||
@@ -262,7 +270,7 @@ class ICSSyncService:
|
|||||||
calendar, room.name, room_url
|
calendar, room.name, room_url
|
||||||
)
|
)
|
||||||
return {
|
return {
|
||||||
"status": "unchanged",
|
"status": SyncStatus.UNCHANGED,
|
||||||
"hash": content_hash,
|
"hash": content_hash,
|
||||||
"events_found": len(events),
|
"events_found": len(events),
|
||||||
"total_events": total_events,
|
"total_events": total_events,
|
||||||
@@ -296,7 +304,7 @@ class ICSSyncService:
|
|||||||
)
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": "success",
|
"status": SyncStatus.SUCCESS,
|
||||||
"hash": content_hash,
|
"hash": content_hash,
|
||||||
"events_found": len(events),
|
"events_found": len(events),
|
||||||
"total_events": total_events,
|
"total_events": total_events,
|
||||||
@@ -305,7 +313,7 @@ class ICSSyncService:
|
|||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Failed to sync ICS for room {room.id}: {e}")
|
logger.error(f"Failed to sync ICS for room {room.id}: {e}")
|
||||||
return {"status": "error", "error": str(e)}
|
return {"status": SyncStatus.ERROR, "error": str(e)}
|
||||||
|
|
||||||
def _should_sync(self, room: Room) -> bool:
|
def _should_sync(self, room: Room) -> bool:
|
||||||
if not room.ics_last_sync:
|
if not room.ics_last_sync:
|
||||||
|
|||||||
@@ -5,11 +5,10 @@ from celery import shared_task
|
|||||||
from celery.utils.log import get_task_logger
|
from celery.utils.log import get_task_logger
|
||||||
|
|
||||||
from reflector.asynctask import asynctask
|
from reflector.asynctask import asynctask
|
||||||
from reflector.db import get_database
|
|
||||||
from reflector.db.calendar_events import calendar_events_controller
|
from reflector.db.calendar_events import calendar_events_controller
|
||||||
from reflector.db.meetings import meetings_controller
|
from reflector.db.meetings import meetings_controller
|
||||||
from reflector.db.rooms import rooms, rooms_controller
|
from reflector.db.rooms import rooms_controller
|
||||||
from reflector.services.ics_sync import ics_sync_service
|
from reflector.services.ics_sync import SyncStatus, ics_sync_service
|
||||||
from reflector.whereby import create_meeting, upload_logo
|
from reflector.whereby import create_meeting, upload_logo
|
||||||
|
|
||||||
logger = structlog.wrap_logger(get_task_logger(__name__))
|
logger = structlog.wrap_logger(get_task_logger(__name__))
|
||||||
@@ -31,7 +30,7 @@ async def sync_room_ics(room_id: str):
|
|||||||
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
||||||
result = await ics_sync_service.sync_room_calendar(room)
|
result = await ics_sync_service.sync_room_calendar(room)
|
||||||
|
|
||||||
if result["status"] == "success":
|
if result["status"] == SyncStatus.SUCCESS:
|
||||||
logger.info(
|
logger.info(
|
||||||
"ICS sync completed successfully",
|
"ICS sync completed successfully",
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
@@ -40,9 +39,9 @@ async def sync_room_ics(room_id: str):
|
|||||||
events_updated=result.get("events_updated", 0),
|
events_updated=result.get("events_updated", 0),
|
||||||
events_deleted=result.get("events_deleted", 0),
|
events_deleted=result.get("events_deleted", 0),
|
||||||
)
|
)
|
||||||
elif result["status"] == "unchanged":
|
elif result["status"] == SyncStatus.UNCHANGED:
|
||||||
logger.debug("ICS content unchanged", room_id=room_id)
|
logger.debug("ICS content unchanged", room_id=room_id)
|
||||||
elif result["status"] == "error":
|
elif result["status"] == SyncStatus.ERROR:
|
||||||
logger.error("ICS sync failed", room_id=room_id, error=result.get("error"))
|
logger.error("ICS sync failed", room_id=room_id, error=result.get("error"))
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@@ -59,27 +58,15 @@ async def sync_all_ics_calendars():
|
|||||||
try:
|
try:
|
||||||
logger.info("Starting sync for all ICS-enabled rooms")
|
logger.info("Starting sync for all ICS-enabled rooms")
|
||||||
|
|
||||||
# Get ALL rooms - not filtered by is_shared
|
ics_enabled_rooms = await rooms_controller.get_ics_enabled()
|
||||||
query = rooms.select().where(
|
|
||||||
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
|
||||||
)
|
|
||||||
all_rooms = await get_database().fetch_all(query)
|
|
||||||
ics_enabled_rooms = list(all_rooms)
|
|
||||||
|
|
||||||
logger.info(f"Found {len(ics_enabled_rooms)} rooms with ICS enabled")
|
logger.info(f"Found {len(ics_enabled_rooms)} rooms with ICS enabled")
|
||||||
|
|
||||||
for room_data in ics_enabled_rooms:
|
for room in ics_enabled_rooms:
|
||||||
room_id = room_data["id"]
|
|
||||||
room = await rooms_controller.get_by_id(room_id)
|
|
||||||
|
|
||||||
if not room:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not _should_sync(room):
|
if not _should_sync(room):
|
||||||
logger.debug("Skipping room, not time to sync yet", room_id=room_id)
|
logger.debug("Skipping room, not time to sync yet", room_id=room.id)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
sync_room_ics.delay(room_id)
|
sync_room_ics.delay(room.id)
|
||||||
|
|
||||||
logger.info("Queued sync tasks for all eligible rooms")
|
logger.info("Queued sync tasks for all eligible rooms")
|
||||||
|
|
||||||
@@ -158,29 +145,19 @@ async def create_upcoming_meetings():
|
|||||||
try:
|
try:
|
||||||
logger.info("Starting creation of upcoming meetings")
|
logger.info("Starting creation of upcoming meetings")
|
||||||
|
|
||||||
# Get ALL rooms with ICS enabled
|
ics_enabled_rooms = await rooms_controller.get_ics_enabled()
|
||||||
query = rooms.select().where(
|
|
||||||
rooms.c.ics_enabled == True, rooms.c.ics_url != None
|
|
||||||
)
|
|
||||||
all_rooms = await get_database().fetch_all(query)
|
|
||||||
now = datetime.now(timezone.utc)
|
now = datetime.now(timezone.utc)
|
||||||
create_window = now - timedelta(minutes=6)
|
create_window = now - timedelta(minutes=6)
|
||||||
|
|
||||||
for room_data in all_rooms:
|
for room in ics_enabled_rooms:
|
||||||
room_id = room_data["id"]
|
|
||||||
room = await rooms_controller.get_by_id(room_id)
|
|
||||||
|
|
||||||
if not room:
|
|
||||||
continue
|
|
||||||
|
|
||||||
events = await calendar_events_controller.get_upcoming(
|
events = await calendar_events_controller.get_upcoming(
|
||||||
room_id,
|
room.id,
|
||||||
minutes_ahead=7,
|
minutes_ahead=7,
|
||||||
)
|
)
|
||||||
|
|
||||||
for event in events:
|
for event in events:
|
||||||
await create_upcoming_meetings_for_event(
|
await create_upcoming_meetings_for_event(
|
||||||
event, create_window, room_id, room
|
event, create_window, room.id, room
|
||||||
)
|
)
|
||||||
logger.info("Completed pre-creation check for upcoming meetings")
|
logger.info("Completed pre-creation check for upcoming meetings")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user