mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
refactor: add session parameter to ICSSyncService.sync_room_calendar
- Updated sync_room_calendar method to accept AsyncSession as first parameter - Removed internal get_session_factory() calls from the service - Updated all callers (views/rooms.py, worker/ics_sync.py) to pass session - Fixed all test files to remove mocking of get_session_factory - Consistent with @with_session decorator pattern used elsewhere
This commit is contained in:
@@ -55,8 +55,8 @@ import httpx
|
|||||||
import pytz
|
import pytz
|
||||||
import structlog
|
import structlog
|
||||||
from icalendar import Calendar, Event
|
from icalendar import Calendar, Event
|
||||||
|
from sqlalchemy.ext.asyncio import AsyncSession
|
||||||
|
|
||||||
from reflector.db import get_session_factory
|
|
||||||
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
|
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
|
||||||
from reflector.db.rooms import Room, rooms_controller
|
from reflector.db.rooms import Room, rooms_controller
|
||||||
from reflector.redis_cache import RedisAsyncLock
|
from reflector.redis_cache import RedisAsyncLock
|
||||||
@@ -295,7 +295,7 @@ class ICSSyncService:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.fetch_service = ICSFetchService()
|
self.fetch_service = ICSFetchService()
|
||||||
|
|
||||||
async def sync_room_calendar(self, room: Room) -> SyncResult:
|
async def sync_room_calendar(self, session: AsyncSession, room: Room) -> SyncResult:
|
||||||
async with RedisAsyncLock(
|
async with RedisAsyncLock(
|
||||||
f"ics_sync_room:{room.id}", skip_if_locked=True
|
f"ics_sync_room:{room.id}", skip_if_locked=True
|
||||||
) as lock:
|
) as lock:
|
||||||
@@ -306,9 +306,11 @@ class ICSSyncService:
|
|||||||
"reason": "Sync already in progress",
|
"reason": "Sync already in progress",
|
||||||
}
|
}
|
||||||
|
|
||||||
return await self._sync_room_calendar(room)
|
return await self._sync_room_calendar(session, room)
|
||||||
|
|
||||||
async def _sync_room_calendar(self, room: Room) -> SyncResult:
|
async def _sync_room_calendar(
|
||||||
|
self, session: AsyncSession, room: Room
|
||||||
|
) -> SyncResult:
|
||||||
if not room.ics_enabled or not room.ics_url:
|
if not room.ics_enabled or not room.ics_url:
|
||||||
return {"status": SyncStatus.SKIPPED, "reason": "ICS not configured"}
|
return {"status": SyncStatus.SKIPPED, "reason": "ICS not configured"}
|
||||||
|
|
||||||
@@ -341,20 +343,18 @@ class ICSSyncService:
|
|||||||
events, total_events = self.fetch_service.extract_room_events(
|
events, total_events = self.fetch_service.extract_room_events(
|
||||||
calendar, room.name, room_url
|
calendar, room.name, room_url
|
||||||
)
|
)
|
||||||
sync_result = await self._sync_events_to_database(room.id, events)
|
sync_result = await self._sync_events_to_database(session, room.id, events)
|
||||||
|
|
||||||
# Update room sync metadata
|
# Update room sync metadata
|
||||||
session_factory = get_session_factory()
|
await rooms_controller.update(
|
||||||
async with session_factory() as session:
|
session,
|
||||||
await rooms_controller.update(
|
room,
|
||||||
session,
|
{
|
||||||
room,
|
"ics_last_sync": datetime.now(timezone.utc),
|
||||||
{
|
"ics_last_etag": content_hash,
|
||||||
"ics_last_sync": datetime.now(timezone.utc),
|
},
|
||||||
"ics_last_etag": content_hash,
|
mutate=False,
|
||||||
},
|
)
|
||||||
mutate=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"status": SyncStatus.SUCCESS,
|
"status": SyncStatus.SUCCESS,
|
||||||
@@ -376,34 +376,32 @@ class ICSSyncService:
|
|||||||
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
return time_since_sync.total_seconds() >= room.ics_fetch_interval
|
||||||
|
|
||||||
async def _sync_events_to_database(
|
async def _sync_events_to_database(
|
||||||
self, room_id: str, events: list[EventData]
|
self, session: AsyncSession, room_id: str, events: list[EventData]
|
||||||
) -> SyncStats:
|
) -> SyncStats:
|
||||||
created = 0
|
created = 0
|
||||||
updated = 0
|
updated = 0
|
||||||
|
|
||||||
current_ics_uids = []
|
current_ics_uids = []
|
||||||
|
|
||||||
session_factory = get_session_factory()
|
for event_data in events:
|
||||||
async with session_factory() as session:
|
calendar_event = CalendarEvent(room_id=room_id, **event_data)
|
||||||
for event_data in events:
|
existing = await calendar_events_controller.get_by_ics_uid(
|
||||||
calendar_event = CalendarEvent(room_id=room_id, **event_data)
|
session, room_id, event_data["ics_uid"]
|
||||||
existing = await calendar_events_controller.get_by_ics_uid(
|
|
||||||
session, room_id, event_data["ics_uid"]
|
|
||||||
)
|
|
||||||
|
|
||||||
if existing:
|
|
||||||
updated += 1
|
|
||||||
else:
|
|
||||||
created += 1
|
|
||||||
|
|
||||||
await calendar_events_controller.upsert(session, calendar_event)
|
|
||||||
current_ics_uids.append(event_data["ics_uid"])
|
|
||||||
|
|
||||||
# Soft delete events that are no longer in calendar
|
|
||||||
deleted = await calendar_events_controller.soft_delete_missing(
|
|
||||||
session, room_id, current_ics_uids
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if existing:
|
||||||
|
updated += 1
|
||||||
|
else:
|
||||||
|
created += 1
|
||||||
|
|
||||||
|
await calendar_events_controller.upsert(session, calendar_event)
|
||||||
|
current_ics_uids.append(event_data["ics_uid"])
|
||||||
|
|
||||||
|
# Soft delete events that are no longer in calendar
|
||||||
|
deleted = await calendar_events_controller.soft_delete_missing(
|
||||||
|
session, room_id, current_ics_uids
|
||||||
|
)
|
||||||
|
|
||||||
return {
|
return {
|
||||||
"events_created": created,
|
"events_created": created,
|
||||||
"events_updated": updated,
|
"events_updated": updated,
|
||||||
|
|||||||
@@ -387,7 +387,7 @@ async def rooms_sync_ics(
|
|||||||
if not room.ics_enabled or not room.ics_url:
|
if not room.ics_enabled or not room.ics_url:
|
||||||
raise HTTPException(status_code=400, detail="ICS not configured for this room")
|
raise HTTPException(status_code=400, detail="ICS not configured for this room")
|
||||||
|
|
||||||
result = await ics_sync_service.sync_room_calendar(room)
|
result = await ics_sync_service.sync_room_calendar(session, room)
|
||||||
|
|
||||||
if result["status"] == "error":
|
if result["status"] == "error":
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ async def sync_room_ics(session: AsyncSession, room_id: str):
|
|||||||
return
|
return
|
||||||
|
|
||||||
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
logger.info("Starting ICS sync for room", room_id=room_id, room_name=room.name)
|
||||||
result = await ics_sync_service.sync_room_calendar(room)
|
result = await ics_sync_service.sync_room_calendar(session, room)
|
||||||
|
|
||||||
if result["status"] == SyncStatus.SUCCESS:
|
if result["status"] == SyncStatus.SUCCESS:
|
||||||
logger.info(
|
logger.info(
|
||||||
|
|||||||
@@ -54,59 +54,45 @@ async def test_attendee_parsing_bug(db_session):
|
|||||||
ics_content = ics_content.replace("20250910T174000Z", dtstamp)
|
ics_content = ics_content.replace("20250910T174000Z", dtstamp)
|
||||||
|
|
||||||
sync_service = ICSSyncService()
|
sync_service = ICSSyncService()
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from unittest.mock import AsyncMock
|
from unittest.mock import AsyncMock
|
||||||
|
|
||||||
@asynccontextmanager
|
with patch.object(
|
||||||
async def mock_session_context():
|
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
||||||
yield db_session
|
) as mock_fetch:
|
||||||
|
mock_fetch.return_value = ics_content
|
||||||
|
|
||||||
class MockSessionMaker:
|
calendar = sync_service.fetch_service.parse_ics(ics_content)
|
||||||
def __call__(self):
|
from reflector.settings import settings
|
||||||
return mock_session_context()
|
|
||||||
|
|
||||||
mock_session_factory = MockSessionMaker()
|
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
||||||
|
|
||||||
with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
|
print(f"Room URL being used for matching: {room_url}")
|
||||||
mock_get_factory.return_value = mock_session_factory
|
print(f"ICS content:\n{ics_content}")
|
||||||
|
|
||||||
with patch.object(
|
events, total_events = sync_service.fetch_service.extract_room_events(
|
||||||
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
calendar, room.name, room_url
|
||||||
) as mock_fetch:
|
)
|
||||||
mock_fetch.return_value = ics_content
|
|
||||||
|
|
||||||
calendar = sync_service.fetch_service.parse_ics(ics_content)
|
print(f"Total events in calendar: {total_events}")
|
||||||
from reflector.settings import settings
|
print(f"Events matching room: {len(events)}")
|
||||||
|
|
||||||
room_url = f"{settings.UI_BASE_URL}/{room.name}"
|
result = await sync_service.sync_room_calendar(db_session, room)
|
||||||
|
|
||||||
print(f"Room URL being used for matching: {room_url}")
|
assert result.get("status") == "success"
|
||||||
print(f"ICS content:\n{ics_content}")
|
assert result.get("events_found", 0) >= 0
|
||||||
|
|
||||||
events, total_events = sync_service.fetch_service.extract_room_events(
|
assert len(events) == 1
|
||||||
calendar, room.name, room_url
|
event = events[0]
|
||||||
)
|
|
||||||
|
|
||||||
print(f"Total events in calendar: {total_events}")
|
attendees = event["attendees"]
|
||||||
print(f"Events matching room: {len(events)}")
|
|
||||||
|
|
||||||
result = await sync_service.sync_room_calendar(room)
|
print(f"Number of attendees: {len(attendees)}")
|
||||||
|
for i, attendee in enumerate(attendees):
|
||||||
|
print(f"Attendee {i}: {attendee}")
|
||||||
|
|
||||||
assert result.get("status") == "success"
|
assert len(attendees) == 30, f"Expected 30 attendees, got {len(attendees)}"
|
||||||
assert result.get("events_found", 0) >= 0
|
|
||||||
|
|
||||||
assert len(events) == 1
|
assert attendees[0]["email"] == "alice@example.com"
|
||||||
event = events[0]
|
assert attendees[1]["email"] == "bob@example.com"
|
||||||
|
assert attendees[2]["email"] == "charlie@example.com"
|
||||||
attendees = event["attendees"]
|
assert any(att["email"] == "organizer@example.com" for att in attendees)
|
||||||
|
|
||||||
print(f"Number of attendees: {len(attendees)}")
|
|
||||||
for i, attendee in enumerate(attendees):
|
|
||||||
print(f"Attendee {i}: {attendee}")
|
|
||||||
|
|
||||||
assert len(attendees) == 30, f"Expected 30 attendees, got {len(attendees)}"
|
|
||||||
|
|
||||||
assert attendees[0]["email"] == "alice@example.com"
|
|
||||||
assert attendees[1]["email"] == "bob@example.com"
|
|
||||||
assert attendees[2]["email"] == "charlie@example.com"
|
|
||||||
assert any(att["email"] == "organizer@example.com" for att in attendees)
|
|
||||||
|
|||||||
@@ -45,32 +45,17 @@ async def test_sync_room_ics_task(db_session):
|
|||||||
cal.add_component(event)
|
cal.add_component(event)
|
||||||
ics_content = cal.to_ical().decode("utf-8")
|
ics_content = cal.to_ical().decode("utf-8")
|
||||||
|
|
||||||
from contextlib import asynccontextmanager
|
with patch(
|
||||||
|
"reflector.services.ics_sync.ICSFetchService.fetch_ics",
|
||||||
|
new_callable=AsyncMock,
|
||||||
|
) as mock_fetch:
|
||||||
|
mock_fetch.return_value = ics_content
|
||||||
|
|
||||||
@asynccontextmanager
|
await ics_sync_service.sync_room_calendar(db_session, room)
|
||||||
async def mock_session_context():
|
|
||||||
yield db_session
|
|
||||||
|
|
||||||
class MockSessionMaker:
|
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
||||||
def __call__(self):
|
assert len(events) == 1
|
||||||
return mock_session_context()
|
assert events[0].ics_uid == "task-event-1"
|
||||||
|
|
||||||
mock_session_factory = MockSessionMaker()
|
|
||||||
|
|
||||||
with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
|
|
||||||
mock_get_factory.return_value = mock_session_factory
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.ics_sync.ICSFetchService.fetch_ics",
|
|
||||||
new_callable=AsyncMock,
|
|
||||||
) as mock_fetch:
|
|
||||||
mock_fetch.return_value = ics_content
|
|
||||||
|
|
||||||
await ics_sync_service.sync_room_calendar(room)
|
|
||||||
|
|
||||||
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
|
||||||
assert len(events) == 1
|
|
||||||
assert events[0].ics_uid == "task-event-1"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -90,7 +75,7 @@ async def test_sync_room_ics_disabled(db_session):
|
|||||||
ics_enabled=False,
|
ics_enabled=False,
|
||||||
)
|
)
|
||||||
|
|
||||||
result = await ics_sync_service.sync_room_calendar(room)
|
result = await ics_sync_service.sync_room_calendar(db_session, room)
|
||||||
|
|
||||||
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
||||||
assert len(events) == 0
|
assert len(events) == 0
|
||||||
@@ -259,7 +244,7 @@ async def test_sync_handles_errors_gracefully(db_session):
|
|||||||
) as mock_fetch:
|
) as mock_fetch:
|
||||||
mock_fetch.side_effect = Exception("Network error")
|
mock_fetch.side_effect = Exception("Network error")
|
||||||
|
|
||||||
result = await ics_sync_service.sync_room_calendar(room)
|
result = await ics_sync_service.sync_room_calendar(db_session, room)
|
||||||
assert result["status"] == "error"
|
assert result["status"] == "error"
|
||||||
|
|
||||||
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
||||||
|
|||||||
@@ -168,74 +168,59 @@ async def test_ics_sync_service_sync_room_calendar(db_session):
|
|||||||
cal.add_component(event)
|
cal.add_component(event)
|
||||||
ics_content = cal.to_ical().decode("utf-8")
|
ics_content = cal.to_ical().decode("utf-8")
|
||||||
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def mock_session_context():
|
|
||||||
yield db_session
|
|
||||||
|
|
||||||
class MockSessionMaker:
|
|
||||||
def __call__(self):
|
|
||||||
return mock_session_context()
|
|
||||||
|
|
||||||
mock_session_factory = MockSessionMaker()
|
|
||||||
|
|
||||||
# Create sync service and mock fetch
|
# Create sync service and mock fetch
|
||||||
sync_service = ICSSyncService()
|
sync_service = ICSSyncService()
|
||||||
|
|
||||||
with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
|
with patch.object(
|
||||||
mock_get_factory.return_value = mock_session_factory
|
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
||||||
|
) as mock_fetch:
|
||||||
|
mock_fetch.return_value = ics_content
|
||||||
|
|
||||||
with patch.object(
|
# First sync
|
||||||
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
result = await sync_service.sync_room_calendar(db_session, room)
|
||||||
) as mock_fetch:
|
|
||||||
mock_fetch.return_value = ics_content
|
|
||||||
|
|
||||||
# First sync
|
assert result["status"] == "success"
|
||||||
result = await sync_service.sync_room_calendar(room)
|
assert result["events_found"] == 1
|
||||||
|
assert result["events_created"] == 1
|
||||||
|
assert result["events_updated"] == 0
|
||||||
|
assert result["events_deleted"] == 0
|
||||||
|
|
||||||
assert result["status"] == "success"
|
# Verify event was created
|
||||||
assert result["events_found"] == 1
|
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
||||||
assert result["events_created"] == 1
|
assert len(events) == 1
|
||||||
assert result["events_updated"] == 0
|
assert events[0].ics_uid == "sync-event-1"
|
||||||
assert result["events_deleted"] == 0
|
assert events[0].title == "Sync Test Meeting"
|
||||||
|
|
||||||
# Verify event was created
|
# Second sync with same content (should be unchanged)
|
||||||
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
# Refresh room to get updated etag and force sync by setting old sync time
|
||||||
assert len(events) == 1
|
room = await rooms_controller.get_by_id(db_session, room.id)
|
||||||
assert events[0].ics_uid == "sync-event-1"
|
await rooms_controller.update(
|
||||||
assert events[0].title == "Sync Test Meeting"
|
db_session,
|
||||||
|
room,
|
||||||
|
{"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)},
|
||||||
|
)
|
||||||
|
result = await sync_service.sync_room_calendar(db_session, room)
|
||||||
|
assert result["status"] == "unchanged"
|
||||||
|
|
||||||
# Second sync with same content (should be unchanged)
|
# Third sync with updated event
|
||||||
# Refresh room to get updated etag and force sync by setting old sync time
|
event["summary"] = "Updated Meeting Title"
|
||||||
room = await rooms_controller.get_by_id(db_session, room.id)
|
cal = Calendar()
|
||||||
await rooms_controller.update(
|
cal.add_component(event)
|
||||||
db_session,
|
ics_content = cal.to_ical().decode("utf-8")
|
||||||
room,
|
mock_fetch.return_value = ics_content
|
||||||
{"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)},
|
|
||||||
)
|
|
||||||
result = await sync_service.sync_room_calendar(room)
|
|
||||||
assert result["status"] == "unchanged"
|
|
||||||
|
|
||||||
# Third sync with updated event
|
# Force sync by clearing etag
|
||||||
event["summary"] = "Updated Meeting Title"
|
await rooms_controller.update(db_session, room, {"ics_last_etag": None})
|
||||||
cal = Calendar()
|
|
||||||
cal.add_component(event)
|
|
||||||
ics_content = cal.to_ical().decode("utf-8")
|
|
||||||
mock_fetch.return_value = ics_content
|
|
||||||
|
|
||||||
# Force sync by clearing etag
|
result = await sync_service.sync_room_calendar(db_session, room)
|
||||||
await rooms_controller.update(db_session, room, {"ics_last_etag": None})
|
assert result["status"] == "success"
|
||||||
|
assert result["events_created"] == 0
|
||||||
|
assert result["events_updated"] == 1
|
||||||
|
|
||||||
result = await sync_service.sync_room_calendar(room)
|
# Verify event was updated
|
||||||
assert result["status"] == "success"
|
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
||||||
assert result["events_created"] == 0
|
assert len(events) == 1
|
||||||
assert result["events_updated"] == 1
|
assert events[0].title == "Updated Meeting Title"
|
||||||
|
|
||||||
# Verify event was updated
|
|
||||||
events = await calendar_events_controller.get_by_room(db_session, room.id)
|
|
||||||
assert len(events) == 1
|
|
||||||
assert events[0].title == "Updated Meeting Title"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@@ -266,7 +251,7 @@ async def test_ics_sync_service_skip_disabled():
|
|||||||
room.ics_enabled = False
|
room.ics_enabled = False
|
||||||
room.ics_url = "https://calendar.example.com/test.ics"
|
room.ics_url = "https://calendar.example.com/test.ics"
|
||||||
|
|
||||||
result = await service.sync_room_calendar(room)
|
result = await service.sync_room_calendar(MagicMock(), room)
|
||||||
assert result["status"] == "skipped"
|
assert result["status"] == "skipped"
|
||||||
assert result["reason"] == "ICS not configured"
|
assert result["reason"] == "ICS not configured"
|
||||||
|
|
||||||
@@ -274,7 +259,7 @@ async def test_ics_sync_service_skip_disabled():
|
|||||||
room.ics_enabled = True
|
room.ics_enabled = True
|
||||||
room.ics_url = None
|
room.ics_url = None
|
||||||
|
|
||||||
result = await service.sync_room_calendar(room)
|
result = await service.sync_room_calendar(MagicMock(), room)
|
||||||
assert result["status"] == "skipped"
|
assert result["status"] == "skipped"
|
||||||
assert result["reason"] == "ICS not configured"
|
assert result["reason"] == "ICS not configured"
|
||||||
|
|
||||||
@@ -299,28 +284,13 @@ async def test_ics_sync_service_error_handling(db_session):
|
|||||||
)
|
)
|
||||||
await db_session.flush()
|
await db_session.flush()
|
||||||
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def mock_session_context():
|
|
||||||
yield db_session
|
|
||||||
|
|
||||||
class MockSessionMaker:
|
|
||||||
def __call__(self):
|
|
||||||
return mock_session_context()
|
|
||||||
|
|
||||||
mock_session_factory = MockSessionMaker()
|
|
||||||
|
|
||||||
sync_service = ICSSyncService()
|
sync_service = ICSSyncService()
|
||||||
|
|
||||||
with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
|
with patch.object(
|
||||||
mock_get_factory.return_value = mock_session_factory
|
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
||||||
|
) as mock_fetch:
|
||||||
|
mock_fetch.side_effect = Exception("Network error")
|
||||||
|
|
||||||
with patch.object(
|
result = await sync_service.sync_room_calendar(db_session, room)
|
||||||
sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
|
assert result["status"] == "error"
|
||||||
) as mock_fetch:
|
assert "Network error" in result["error"]
|
||||||
mock_fetch.side_effect = Exception("Network error")
|
|
||||||
|
|
||||||
result = await sync_service.sync_room_calendar(room)
|
|
||||||
assert result["status"] == "error"
|
|
||||||
assert "Network error" in result["error"]
|
|
||||||
|
|||||||
Reference in New Issue
Block a user