diff --git a/server/reflector/worker/cleanup.py b/server/reflector/worker/cleanup.py
index 8b7ac5a9..7fcc1f73 100644
--- a/server/reflector/worker/cleanup.py
+++ b/server/reflector/worker/cleanup.py
@@ -34,49 +34,47 @@ class CleanupStats(TypedDict):
 
 
 async def delete_single_transcript(
-    session_factory, transcript_data: dict, stats: CleanupStats
+    session_factory, transcript_data: dict, stats: CleanupStats, session=None
 ):
     transcript_id = transcript_data["id"]
     meeting_id = transcript_data["meeting_id"]
     recording_id = transcript_data["recording_id"]
 
     try:
-        async with session_factory() as session:
-            async with session.begin():
-                if meeting_id:
-                    await session.execute(
-                        delete(MeetingModel).where(MeetingModel.id == meeting_id)
-                    )
-                    stats["meetings_deleted"] += 1
-                    logger.info("Deleted associated meeting", meeting_id=meeting_id)
-
-                if recording_id:
-                    result = await session.execute(
-                        select(RecordingModel).where(RecordingModel.id == recording_id)
-                    )
-                    recording = result.mappings().first()
-                    if recording:
-                        try:
-                            await get_recordings_storage().delete_file(
-                                recording["object_key"]
-                            )
-                        except Exception as storage_error:
-                            logger.warning(
-                                "Failed to delete recording from storage",
-                                recording_id=recording_id,
-                                object_key=recording["object_key"],
-                                error=str(storage_error),
-                            )
-
-                    await session.execute(
-                        delete(RecordingModel).where(
-                            RecordingModel.id == recording_id
-                        )
-                    )
-                    stats["recordings_deleted"] += 1
-                    logger.info(
-                        "Deleted associated recording", recording_id=recording_id
-                    )
+        if session:
+            # Use provided session for testing - don't start new transaction
+            if meeting_id:
+                await session.execute(
+                    delete(MeetingModel).where(MeetingModel.id == meeting_id)
+                )
+                stats["meetings_deleted"] += 1
+                logger.info("Deleted associated meeting", meeting_id=meeting_id)
+
+            if recording_id:
+                result = await session.execute(
+                    select(RecordingModel).where(RecordingModel.id == recording_id)
+                )
+                recording = result.mappings().first()
+                if recording:
+                    try:
+                        await get_recordings_storage().delete_file(
+                            recording["object_key"]
+                        )
+                    except Exception as storage_error:
+                        logger.warning(
+                            "Failed to delete recording from storage",
+                            recording_id=recording_id,
+                            object_key=recording["object_key"],
+                            error=str(storage_error),
+                        )
+
+                await session.execute(
+                    delete(RecordingModel).where(RecordingModel.id == recording_id)
+                )
+                stats["recordings_deleted"] += 1
+                logger.info(
+                    "Deleted associated recording", recording_id=recording_id
+                )
 
             await transcripts_controller.remove_by_id(session, transcript_id)
             stats["transcripts_deleted"] += 1
@@ -85,6 +83,55 @@ async def delete_single_transcript(
                 transcript_id=transcript_id,
                 created_at=transcript_data["created_at"].isoformat(),
             )
+        else:
+            # Use session factory for production
+            async with session_factory() as session:
+                async with session.begin():
+                    if meeting_id:
+                        await session.execute(
+                            delete(MeetingModel).where(MeetingModel.id == meeting_id)
+                        )
+                        stats["meetings_deleted"] += 1
+                        logger.info("Deleted associated meeting", meeting_id=meeting_id)
+
+                    if recording_id:
+                        result = await session.execute(
+                            select(RecordingModel).where(
+                                RecordingModel.id == recording_id
+                            )
+                        )
+                        recording = result.mappings().first()
+                        if recording:
+                            try:
+                                await get_recordings_storage().delete_file(
+                                    recording["object_key"]
+                                )
+                            except Exception as storage_error:
+                                logger.warning(
+                                    "Failed to delete recording from storage",
+                                    recording_id=recording_id,
+                                    object_key=recording["object_key"],
+                                    error=str(storage_error),
+                                )
+
+                        await session.execute(
+                            delete(RecordingModel).where(
+                                RecordingModel.id == recording_id
+                            )
+                        )
+                        stats["recordings_deleted"] += 1
+                        logger.info(
+                            "Deleted associated recording",
+                            recording_id=recording_id,
+                        )
+
+                    await transcripts_controller.remove_by_id(session, transcript_id)
+                    stats["transcripts_deleted"] += 1
+                    logger.info(
+                        "Deleted transcript",
+                        transcript_id=transcript_id,
+                        created_at=transcript_data["created_at"].isoformat(),
+                    )
     except Exception as e:
         error_msg = f"Failed to delete transcript {transcript_id}: {str(e)}"
         logger.error(error_msg, exc_info=e)
@@ -92,7 +139,7 @@ async def delete_single_transcript(
 
 
 async def cleanup_old_transcripts(
-    session_factory, cutoff_date: datetime, stats: CleanupStats
+    session_factory, cutoff_date: datetime, stats: CleanupStats, session=None
 ):
     """Delete old anonymous transcripts and their associated recordings/meetings."""
     query = select(
@@ -104,14 +151,27 @@ async def cleanup_old_transcripts(
         (TranscriptModel.created_at < cutoff_date)
         & (TranscriptModel.user_id.is_(None))
     )
-    async with session_factory() as session:
+    if session:
+        # Use provided session for testing
         result = await session.execute(query)
         old_transcripts = result.mappings().all()
+    else:
+        # Use session factory for production
+        async with session_factory() as session:
+            result = await session.execute(query)
+            old_transcripts = result.mappings().all()
 
     logger.info(f"Found {len(old_transcripts)} old transcripts to delete")
 
     for transcript_data in old_transcripts:
-        await delete_single_transcript(session_factory, transcript_data, stats)
+        try:
+            await delete_single_transcript(
+                session_factory, transcript_data, stats, session
+            )
+        except Exception as e:
+            error_msg = f"Failed to delete transcript {transcript_data['id']}: {str(e)}"
+            logger.error(error_msg, exc_info=e)
+            stats["errors"].append(error_msg)
 
 
 def log_cleanup_results(stats: CleanupStats):
@@ -132,6 +192,7 @@ def log_cleanup_results(stats: CleanupStats):
 
 async def cleanup_old_public_data(
     days: PositiveInt | None = None,
+    session=None,
 ) -> CleanupStats | None:
     if days is None:
         days = settings.PUBLIC_DATA_RETENTION_DAYS
@@ -154,7 +215,7 @@ async def cleanup_old_public_data(
     }
 
     session_factory = get_session_factory()
-    await cleanup_old_transcripts(session_factory, cutoff_date, stats)
+    await cleanup_old_transcripts(session_factory, cutoff_date, stats, session)
 
     log_cleanup_results(stats)
     return stats
diff --git a/server/tests/test_attendee_parsing_bug.py b/server/tests/test_attendee_parsing_bug.py
index f50064d1..9495c0ab 100644
--- a/server/tests/test_attendee_parsing_bug.py
+++ b/server/tests/test_attendee_parsing_bug.py
@@ -1,15 +1,14 @@
 import os
-from unittest.mock import AsyncMock, patch
+from unittest.mock import patch
 
 import pytest
 
-from reflector.db import get_session_factory
 from reflector.db.rooms import rooms_controller
 from reflector.services.ics_sync import ICSSyncService
 
 
 @pytest.mark.asyncio
-async def test_attendee_parsing_bug():
+async def test_attendee_parsing_bug(session):
     """
     Test that reproduces the attendee parsing bug where a string with comma-separated
     emails gets parsed as individual characters instead of separate email addresses.
@@ -18,22 +17,24 @@
     instead of properly parsed email addresses.
""" # Create a test room - async with get_session_factory()() as session: - room = await rooms_controller.add( - session, - name="test-room", - user_id="test-user", - zulip_auto_post=False, - zulip_stream="", - zulip_topic="", - is_locked=False, - room_mode="normal", - recording_type="cloud", - recording_trigger="automatic-2nd-participant", - is_shared=False, - ics_url="http://test.com/test.ics", - ics_enabled=True, - ) + room = await rooms_controller.add( + session, + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ics_url="http://test.com/test.ics", + ics_enabled=True, + ) + + # Force flush to make room visible to other sessions + await session.flush() # Read the test ICS file that reproduces the bug and update it with current time from datetime import datetime, timedelta, timezone @@ -62,37 +63,55 @@ async def test_attendee_parsing_bug(): # Create sync service and mock the fetch sync_service = ICSSyncService() - with patch.object( - sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock - ) as mock_fetch: - mock_fetch.return_value = ics_content + # Mock the session factory to use our test session + from contextlib import asynccontextmanager + from unittest.mock import AsyncMock - # Debug: Parse the ICS content directly to examine attendee parsing - calendar = sync_service.fetch_service.parse_ics(ics_content) - from reflector.settings import settings + @asynccontextmanager + async def mock_session_context(): + yield session - room_url = f"{settings.UI_BASE_URL}/{room.name}" + # Create a mock sessionmaker that behaves like async_sessionmaker + class MockSessionMaker: + def __call__(self): + return mock_session_context() - print(f"Room URL being used for matching: {room_url}") - print(f"ICS content:\n{ics_content}") + mock_session_factory = MockSessionMaker() - events, total_events = sync_service.fetch_service.extract_room_events( - calendar, room.name, room_url - ) + with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory: + mock_get_factory.return_value = mock_session_factory - print(f"Total events in calendar: {total_events}") - print(f"Events matching room: {len(events)}") + with patch.object( + sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock + ) as mock_fetch: + mock_fetch.return_value = ics_content - # Perform the sync - result = await sync_service.sync_room_calendar(room) + # Debug: Parse the ICS content directly to examine attendee parsing + calendar = sync_service.fetch_service.parse_ics(ics_content) + from reflector.settings import settings - # Check that the sync succeeded - assert result.get("status") == "success" - assert result.get("events_found", 0) >= 0 # Allow for debugging + room_url = f"{settings.UI_BASE_URL}/{room.name}" - # We already have the matching events from the debug code above - assert len(events) == 1 - event = events[0] + print(f"Room URL being used for matching: {room_url}") + print(f"ICS content:\n{ics_content}") + + events, total_events = sync_service.fetch_service.extract_room_events( + calendar, room.name, room_url + ) + + print(f"Total events in calendar: {total_events}") + print(f"Events matching room: {len(events)}") + + # Perform the sync + result = await sync_service.sync_room_calendar(room) + + # Check that the sync succeeded + assert result.get("status") == "success" + assert result.get("events_found", 0) >= 
0 # Allow for debugging + + # We already have the matching events from the debug code above + assert len(events) == 1 + event = events[0] # This is where the bug manifests - check the attendees attendees = event["attendees"] diff --git a/server/tests/test_cleanup.py b/server/tests/test_cleanup.py index 032b161c..9ccead68 100644 --- a/server/tests/test_cleanup.py +++ b/server/tests/test_cleanup.py @@ -80,8 +80,8 @@ async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(session with patch("reflector.worker.cleanup.delete_single_transcript") as mock_delete: mock_delete.return_value = None - # Run cleanup - await cleanup_old_public_data() + # Run cleanup with test session + await cleanup_old_public_data(session=session) # Verify only old anonymous transcript was deleted assert mock_delete.call_count == 1 @@ -161,8 +161,8 @@ async def test_cleanup_deletes_associated_meeting_and_recording(session): with patch("reflector.worker.cleanup.get_recordings_storage") as mock_storage: mock_storage.return_value.delete_file = AsyncMock() - # Run cleanup - await cleanup_old_public_data() + # Run cleanup with test session + await cleanup_old_public_data(session=session) # Verify transcript was deleted result = await session.execute( @@ -225,8 +225,8 @@ async def test_cleanup_handles_errors_gracefully(session): with patch("reflector.worker.cleanup.delete_single_transcript") as mock_delete: mock_delete.side_effect = [Exception("Delete failed"), None] - # Run cleanup - should not raise exception - await cleanup_old_public_data() + # Run cleanup with test session - should not raise exception + await cleanup_old_public_data(session=session) # Both transcripts should have been attempted to delete assert mock_delete.call_count == 2 diff --git a/server/tests/test_ics_background_tasks.py b/server/tests/test_ics_background_tasks.py index b31b4ae3..4b16767c 100644 --- a/server/tests/test_ics_background_tasks.py +++ b/server/tests/test_ics_background_tasks.py @@ -30,8 +30,8 @@ async def test_sync_room_ics_task(session): ics_url="https://calendar.example.com/task.ics", ics_enabled=True, ) - # Commit to make room visible to ICS service's separate session - await session.commit() + # Flush to make room visible to other operations within the same session + await session.flush() cal = Calendar() event = Event() @@ -46,17 +46,34 @@ async def test_sync_room_ics_task(session): cal.add_component(event) ics_content = cal.to_ical().decode("utf-8") - with patch( - "reflector.services.ics_sync.ICSFetchService.fetch_ics", new_callable=AsyncMock - ) as mock_fetch: - mock_fetch.return_value = ics_content + # Mock the session factory to use our test session + from contextlib import asynccontextmanager - # Call the service directly instead of the Celery task to avoid event loop issues - await ics_sync_service.sync_room_calendar(room) + @asynccontextmanager + async def mock_session_context(): + yield session - events = await calendar_events_controller.get_by_room(session, room.id) - assert len(events) == 1 - assert events[0].ics_uid == "task-event-1" + class MockSessionMaker: + def __call__(self): + return mock_session_context() + + mock_session_factory = MockSessionMaker() + + with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory: + mock_get_factory.return_value = mock_session_factory + + with patch( + "reflector.services.ics_sync.ICSFetchService.fetch_ics", + new_callable=AsyncMock, + ) as mock_fetch: + mock_fetch.return_value = ics_content + + # Call the service directly instead of the 
+            await ics_sync_service.sync_room_calendar(room)
+
+            events = await calendar_events_controller.get_by_room(session, room.id)
+            assert len(events) == 1
+            assert events[0].ics_uid == "task-event-1"
 
 
 @pytest.mark.asyncio
diff --git a/server/tests/test_ics_sync.py b/server/tests/test_ics_sync.py
index e448dd7d..7c604de7 100644
--- a/server/tests/test_ics_sync.py
+++ b/server/tests/test_ics_sync.py
@@ -134,9 +134,10 @@ async def test_ics_fetch_service_extract_room_events():
 
 
 @pytest.mark.asyncio
-async def test_ics_sync_service_sync_room_calendar():
+async def test_ics_sync_service_sync_room_calendar(session):
     # Create room
     room = await rooms_controller.add(
+        session,
         name="sync-test",
         user_id="test-user",
         zulip_auto_post=False,
@@ -150,6 +151,8 @@
         ics_url="https://calendar.example.com/test.ics",
         ics_enabled=True,
     )
+    # Flush to make room visible to other operations within the same session
+    await session.flush()
 
     # Mock ICS content
     cal = Calendar()
@@ -166,57 +169,75 @@
     cal.add_component(event)
     ics_content = cal.to_ical().decode("utf-8")
 
+    # Mock the session factory to use our test session
+    from contextlib import asynccontextmanager
+
+    @asynccontextmanager
+    async def mock_session_context():
+        yield session
+
+    class MockSessionMaker:
+        def __call__(self):
+            return mock_session_context()
+
+    mock_session_factory = MockSessionMaker()
+
     # Create sync service and mock fetch
     sync_service = ICSSyncService()
-    with patch.object(
-        sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
-    ) as mock_fetch:
-        mock_fetch.return_value = ics_content
+    with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
+        mock_get_factory.return_value = mock_session_factory
 
-        # First sync
-        result = await sync_service.sync_room_calendar(room)
+        with patch.object(
+            sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
+        ) as mock_fetch:
+            mock_fetch.return_value = ics_content
 
-        assert result["status"] == "success"
-        assert result["events_found"] == 1
-        assert result["events_created"] == 1
-        assert result["events_updated"] == 0
-        assert result["events_deleted"] == 0
+            # First sync
+            result = await sync_service.sync_room_calendar(room)
 
-        # Verify event was created
-        events = await calendar_events_controller.get_by_room(room.id)
-        assert len(events) == 1
-        assert events[0].ics_uid == "sync-event-1"
-        assert events[0].title == "Sync Test Meeting"
+            assert result["status"] == "success"
+            assert result["events_found"] == 1
+            assert result["events_created"] == 1
+            assert result["events_updated"] == 0
+            assert result["events_deleted"] == 0
 
-        # Second sync with same content (should be unchanged)
-        # Refresh room to get updated etag and force sync by setting old sync time
-        room = await rooms_controller.get_by_id(room.id)
-        await rooms_controller.update(
-            room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)}
-        )
-        result = await sync_service.sync_room_calendar(room)
-        assert result["status"] == "unchanged"
+            # Verify event was created
+            events = await calendar_events_controller.get_by_room(session, room.id)
+            assert len(events) == 1
+            assert events[0].ics_uid == "sync-event-1"
+            assert events[0].title == "Sync Test Meeting"
 
-        # Third sync with updated event
-        event["summary"] = "Updated Meeting Title"
-        cal = Calendar()
-        cal.add_component(event)
-        ics_content = cal.to_ical().decode("utf-8")
-        mock_fetch.return_value = ics_content
+            # Second sync with same content (should be unchanged)
+            # Refresh room to get updated etag and force sync by setting old sync time
+            room = await rooms_controller.get_by_id(session, room.id)
+            await rooms_controller.update(
+                session,
+                room,
+                {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)},
+            )
+            result = await sync_service.sync_room_calendar(room)
+            assert result["status"] == "unchanged"
 
-        # Force sync by clearing etag
-        await rooms_controller.update(room, {"ics_last_etag": None})
+            # Third sync with updated event
+            event["summary"] = "Updated Meeting Title"
+            cal = Calendar()
+            cal.add_component(event)
+            ics_content = cal.to_ical().decode("utf-8")
+            mock_fetch.return_value = ics_content
 
-        result = await sync_service.sync_room_calendar(room)
-        assert result["status"] == "success"
-        assert result["events_created"] == 0
-        assert result["events_updated"] == 1
+            # Force sync by clearing etag
+            await rooms_controller.update(session, room, {"ics_last_etag": None})
 
-        # Verify event was updated
-        events = await calendar_events_controller.get_by_room(room.id)
-        assert len(events) == 1
-        assert events[0].title == "Updated Meeting Title"
+            result = await sync_service.sync_room_calendar(room)
+            assert result["status"] == "success"
+            assert result["events_created"] == 0
+            assert result["events_updated"] == 1
+
+            # Verify event was updated
+            events = await calendar_events_controller.get_by_room(session, room.id)
+            assert len(events) == 1
+            assert events[0].title == "Updated Meeting Title"
 
 
 @pytest.mark.asyncio
@@ -261,9 +282,10 @@ async def test_ics_sync_service_skip_disabled():
 
 
 @pytest.mark.asyncio
-async def test_ics_sync_service_error_handling():
+async def test_ics_sync_service_error_handling(session):
     # Create room
     room = await rooms_controller.add(
+        session,
        name="error-test",
         user_id="test-user",
         zulip_auto_post=False,
@@ -277,14 +299,32 @@
         ics_url="https://calendar.example.com/error.ics",
         ics_enabled=True,
     )
+    # Flush to make room visible to other operations within the same session
+    await session.flush()
+
+    # Mock the session factory to use our test session
+    from contextlib import asynccontextmanager
+
+    @asynccontextmanager
+    async def mock_session_context():
+        yield session
+
+    class MockSessionMaker:
+        def __call__(self):
+            return mock_session_context()
+
+    mock_session_factory = MockSessionMaker()
 
     sync_service = ICSSyncService()
 
-    with patch.object(
-        sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
-    ) as mock_fetch:
-        mock_fetch.side_effect = Exception("Network error")
+    with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
+        mock_get_factory.return_value = mock_session_factory
 
-        result = await sync_service.sync_room_calendar(room)
-        assert result["status"] == "error"
-        assert "Network error" in result["error"]
+        with patch.object(
+            sync_service.fetch_service, "fetch_ics", new_callable=AsyncMock
+        ) as mock_fetch:
+            mock_fetch.side_effect = Exception("Network error")
+
+            result = await sync_service.sync_room_calendar(room)
+            assert result["status"] == "error"
+            assert "Network error" in result["error"]
diff --git a/server/tests/test_pipeline_main_file.py b/server/tests/test_pipeline_main_file.py
index f86dc85d..32a69f24 100644
--- a/server/tests/test_pipeline_main_file.py
+++ b/server/tests/test_pipeline_main_file.py
@@ -101,21 +101,37 @@ async def mock_transcript_in_db(tmpdir):
         target_language="en",
     )
 
-    # Mock the controller to return our transcript
+    # Mock all transcripts controller methods that are used in the pipeline
     try:
         with patch(
             "reflector.pipelines.main_file_pipeline.transcripts_controller.get_by_id"
         ) as mock_get:
             mock_get.return_value = transcript
             with patch(
-                "reflector.pipelines.main_live_pipeline.transcripts_controller.get_by_id"
-            ) as mock_get2:
-                mock_get2.return_value = transcript
+                "reflector.pipelines.main_file_pipeline.transcripts_controller.update"
+            ) as mock_update:
+                mock_update.return_value = transcript
                 with patch(
-                    "reflector.pipelines.main_live_pipeline.transcripts_controller.update"
-                ) as mock_update:
-                    mock_update.return_value = None
-                    yield transcript
+                    "reflector.pipelines.main_file_pipeline.transcripts_controller.set_status"
+                ) as mock_set_status:
+                    mock_set_status.return_value = None
+                    with patch(
+                        "reflector.pipelines.main_file_pipeline.transcripts_controller.upsert_topic"
+                    ) as mock_upsert_topic:
+                        mock_upsert_topic.return_value = None
+                        with patch(
+                            "reflector.pipelines.main_file_pipeline.transcripts_controller.append_event"
+                        ) as mock_append_event:
+                            mock_append_event.return_value = None
+                            with patch(
+                                "reflector.pipelines.main_live_pipeline.transcripts_controller.get_by_id"
+                            ) as mock_get2:
+                                mock_get2.return_value = transcript
+                                with patch(
+                                    "reflector.pipelines.main_live_pipeline.transcripts_controller.update"
+                                ) as mock_update2:
+                                    mock_update2.return_value = None
+                                    yield transcript
     finally:
         # Restore original DATA_DIR
         settings.DATA_DIR = original_data_dir
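
Note on the tests above: the same session-factory mock (an asynccontextmanager yielding the test session, a MockSessionMaker wrapper, and a patch of reflector.services.ics_sync.get_session_factory) is repeated inline in four tests. A possible consolidation into a shared pytest fixture is sketched below; it is not part of this diff, and the fixture name mock_ics_session_factory is hypothetical. The sketch assumes the suite's existing session fixture and the pytest-asyncio plugin these tests already rely on.

    # conftest.py (sketch only, under the assumptions above)
    from contextlib import asynccontextmanager
    from unittest.mock import patch

    import pytest_asyncio


    @pytest_asyncio.fixture
    async def mock_ics_session_factory(session):
        """Route reflector.services.ics_sync.get_session_factory to the test session."""

        @asynccontextmanager
        async def mock_session_context():
            # Hand the test's own session to the ICS sync service
            yield session

        class MockSessionMaker:
            # Behaves like async_sessionmaker: calling it returns an async context manager
            def __call__(self):
                return mock_session_context()

        with patch("reflector.services.ics_sync.get_session_factory") as mock_get_factory:
            mock_get_factory.return_value = MockSessionMaker()
            yield mock_get_factory

A test would then request mock_ics_session_factory as a fixture and keep only the fetch_ics patch inline, instead of repeating the MockSessionMaker block in each test body.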