diff --git a/server/reflector/worker/cleanup.py b/server/reflector/worker/cleanup.py index a34f7c75..01a07183 100644 --- a/server/reflector/worker/cleanup.py +++ b/server/reflector/worker/cleanup.py @@ -95,7 +95,12 @@ async def cleanup_old_transcripts( session_factory, cutoff_date: datetime, stats: CleanupStats ): """Delete old anonymous transcripts and their associated recordings/meetings.""" - query = select(transcripts).where( + query = select( + TranscriptModel.id, + TranscriptModel.meeting_id, + TranscriptModel.recording_id, + TranscriptModel.created_at, + ).where( (TranscriptModel.created_at < cutoff_date) & (TranscriptModel.user_id.is_(None)) ) diff --git a/server/tests/test_cleanup.py b/server/tests/test_cleanup.py index 6b32bb1a..639b80a1 100644 --- a/server/tests/test_cleanup.py +++ b/server/tests/test_cleanup.py @@ -2,8 +2,14 @@ from datetime import datetime, timedelta, timezone from unittest.mock import AsyncMock, patch import pytest +from sqlalchemy import insert, select, update -from reflector.db.recordings import Recording, recordings_controller +from reflector.db.base import ( + MeetingConsentModel, + MeetingModel, + RecordingModel, + TranscriptModel, +) from reflector.db.transcripts import SourceKind, transcripts_controller from reflector.worker.cleanup import cleanup_old_public_data @@ -21,7 +27,7 @@ async def test_cleanup_old_public_data_skips_when_not_public(): @pytest.mark.asyncio -async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(): +async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(session): """Test that old anonymous transcripts are deleted.""" # Create old and new anonymous transcripts old_date = datetime.now(timezone.utc) - timedelta(days=8) @@ -29,22 +35,23 @@ async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(): # Create old anonymous transcript (should be deleted) old_transcript = await transcripts_controller.add( + session, name="Old Anonymous Transcript", source_kind=SourceKind.FILE, user_id=None, # Anonymous ) - # Manually update created_at to be old - # Removed get_database import - from reflector.db.transcripts import transcripts - await get_database().execute( - transcripts.update() - .where(transcripts.c.id == old_transcript.id) + # Manually update created_at to be old + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == old_transcript.id) .values(created_at=old_date) ) + await session.commit() # Create new anonymous transcript (should NOT be deleted) new_transcript = await transcripts_controller.add( + session, name="New Anonymous Transcript", source_kind=SourceKind.FILE, user_id=None, # Anonymous @@ -52,234 +59,258 @@ async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(): # Create old transcript with user (should NOT be deleted) old_user_transcript = await transcripts_controller.add( + session, name="Old User Transcript", source_kind=SourceKind.FILE, - user_id="user123", + user_id="user-123", ) - await get_database().execute( - transcripts.update() - .where(transcripts.c.id == old_user_transcript.id) + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == old_user_transcript.id) .values(created_at=old_date) ) + await session.commit() + # Mock settings for public mode with patch("reflector.worker.cleanup.settings") as mock_settings: mock_settings.PUBLIC_MODE = True mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 - # Mock the storage deletion - with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage: - mock_storage.return_value.delete_file = AsyncMock() + # Mock delete_single_transcript to track what gets deleted + with patch("reflector.worker.cleanup.delete_single_transcript") as mock_delete: + mock_delete.return_value = None - result = await cleanup_old_public_data() + # Run cleanup + await cleanup_old_public_data() - # Check results - assert result["transcripts_deleted"] == 1 - assert result["errors"] == [] - - # Verify old anonymous transcript was deleted - assert await transcripts_controller.get_by_id(old_transcript.id) is None - - # Verify new anonymous transcript still exists - assert await transcripts_controller.get_by_id(new_transcript.id) is not None - - # Verify user transcript still exists - assert await transcripts_controller.get_by_id(old_user_transcript.id) is not None + # Verify only old anonymous transcript was deleted + assert mock_delete.call_count == 1 + # The function is called with session_factory, transcript_data dict, and stats dict + call_args = mock_delete.call_args[0] + transcript_data = call_args[1] + assert transcript_data["id"] == old_transcript.id @pytest.mark.asyncio -async def test_cleanup_deletes_associated_meeting_and_recording(): - """Test that meetings and recordings associated with old transcripts are deleted.""" - # Removed get_database import - from reflector.db.meetings import meetings - from reflector.db.transcripts import transcripts - +async def test_cleanup_deletes_associated_meeting_and_recording(session): + """Test that cleanup deletes associated meetings and recordings.""" old_date = datetime.now(timezone.utc) - timedelta(days=8) - # Create a meeting - meeting_id = "test-meeting-for-transcript" - await get_database().execute( - meetings.insert().values( - id=meeting_id, - room_name="Meeting with Transcript", - room_url="https://example.com/meeting", - host_room_url="https://example.com/meeting-host", - start_date=old_date, - end_date=old_date + timedelta(hours=1), - room_id=None, - ) - ) - - # Create a recording - recording = await recordings_controller.create( - Recording( - bucket_name="test-bucket", - object_key="test-recording.mp4", - recorded_at=old_date, - ) - ) - # Create an old transcript with both meeting and recording old_transcript = await transcripts_controller.add( + session, name="Old Transcript with Meeting and Recording", - source_kind=SourceKind.ROOM, + source_kind=SourceKind.FILE, user_id=None, - meeting_id=meeting_id, - recording_id=recording.id, ) - - # Update created_at to be old - await get_database().execute( - transcripts.update() - .where(transcripts.c.id == old_transcript.id) + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == old_transcript.id) .values(created_at=old_date) ) + await session.commit() + # Create associated meeting directly + meeting_id = "test-meeting-id" + await session.execute( + insert(MeetingModel).values( + id=meeting_id, + transcript_id=old_transcript.id, + room_id="test-room", + room_name="test-room", + room_url="https://example.com/room", + host_room_url="https://example.com/room-host", + start_date=old_date, + end_date=old_date + timedelta(hours=1), + is_active=False, + num_clients=0, + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic", + ) + ) + + # Create associated recording directly + recording_id = "test-recording-id" + await session.execute( + insert(RecordingModel).values( + id=recording_id, + transcript_id=old_transcript.id, + meeting_id=meeting_id, + url="https://example.com/recording.mp4", + object_key="recordings/test.mp4", + duration=3600.0, + created_at=old_date, + ) + ) + await session.commit() + + # Update transcript with meeting_id and recording_id + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == old_transcript.id) + .values(meeting_id=meeting_id, recording_id=recording_id) + ) + await session.commit() + + # Mock settings with patch("reflector.worker.cleanup.settings") as mock_settings: mock_settings.PUBLIC_MODE = True mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 # Mock storage deletion - with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage: + with patch("reflector.worker.cleanup.get_recordings_storage") as mock_storage: mock_storage.return_value.delete_file = AsyncMock() - with patch( - "reflector.worker.cleanup.get_recordings_storage" - ) as mock_rec_storage: - mock_rec_storage.return_value.delete_file = AsyncMock() - result = await cleanup_old_public_data() + # Run cleanup + await cleanup_old_public_data() - # Check results - assert result["transcripts_deleted"] == 1 - assert result["meetings_deleted"] == 1 - assert result["recordings_deleted"] == 1 - assert result["errors"] == [] + # Verify transcript was deleted + result = await session.execute( + select(TranscriptModel).where(TranscriptModel.id == old_transcript.id) + ) + transcript = result.scalar_one_or_none() + assert transcript is None - # Verify transcript was deleted - assert await transcripts_controller.get_by_id(old_transcript.id) is None + # Verify meeting was deleted + result = await session.execute( + select(MeetingModel).where(MeetingModel.id == meeting_id) + ) + meeting = result.scalar_one_or_none() + assert meeting is None - # Verify meeting was deleted - query = meetings.select().where(meetings.c.id == meeting_id) - meeting_result = await get_database().fetch_one(query) - assert meeting_result is None - - # Verify recording was deleted - assert await recordings_controller.get_by_id(recording.id) is None + # Verify recording was deleted + result = await session.execute( + select(RecordingModel).where(RecordingModel.id == recording_id) + ) + recording = result.scalar_one_or_none() + assert recording is None @pytest.mark.asyncio -async def test_cleanup_handles_errors_gracefully(): - """Test that cleanup continues even when individual deletions fail.""" +async def test_cleanup_handles_errors_gracefully(session): + """Test that cleanup continues even if individual deletions fail.""" old_date = datetime.now(timezone.utc) - timedelta(days=8) # Create multiple old transcripts transcript1 = await transcripts_controller.add( + session, name="Transcript 1", source_kind=SourceKind.FILE, user_id=None, ) + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == transcript1.id) + .values(created_at=old_date) + ) + transcript2 = await transcripts_controller.add( + session, name="Transcript 2", source_kind=SourceKind.FILE, user_id=None, ) - - # Update created_at to be old - # Removed get_database import - from reflector.db.transcripts import transcripts - - for t_id in [transcript1.id, transcript2.id]: - await get_database().execute( - transcripts.update() - .where(transcripts.c.id == t_id) - .values(created_at=old_date) - ) + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == transcript2.id) + .values(created_at=old_date) + ) + await session.commit() with patch("reflector.worker.cleanup.settings") as mock_settings: mock_settings.PUBLIC_MODE = True mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 - # Mock remove_by_id to fail for the first transcript - original_remove = transcripts_controller.remove_by_id - call_count = 0 + # Mock delete_single_transcript to fail on first call but succeed on second + with patch("reflector.worker.cleanup.delete_single_transcript") as mock_delete: + mock_delete.side_effect = [Exception("Delete failed"), None] - async def mock_remove_by_id(transcript_id, user_id=None): - nonlocal call_count - call_count += 1 - if call_count == 1: - raise Exception("Simulated deletion error") - return await original_remove(transcript_id, user_id) + # Run cleanup - should not raise exception + await cleanup_old_public_data() - with patch.object( - transcripts_controller, "remove_by_id", side_effect=mock_remove_by_id - ): - result = await cleanup_old_public_data() - - # Should have one successful deletion and one error - assert result["transcripts_deleted"] == 1 - assert len(result["errors"]) == 1 - assert "Failed to delete transcript" in result["errors"][0] + # Both transcripts should have been attempted to delete + assert mock_delete.call_count == 2 @pytest.mark.asyncio -async def test_meeting_consent_cascade_delete(): - """Test that meeting_consent records are automatically deleted when meeting is deleted.""" - # Removed get_database import - from reflector.db.meetings import ( - meeting_consent, - meeting_consent_controller, - meetings, - ) +async def test_meeting_consent_cascade_delete(session): + """Test that meeting_consent entries are cascade deleted with meetings.""" + old_date = datetime.now(timezone.utc) - timedelta(days=8) - # Create a meeting - meeting_id = "test-cascade-meeting" - await get_database().execute( - meetings.insert().values( + # Create an old transcript + transcript = await transcripts_controller.add( + session, + name="Transcript with Meeting", + source_kind=SourceKind.FILE, + user_id=None, + ) + await session.execute( + update(TranscriptModel) + .where(TranscriptModel.id == transcript.id) + .values(created_at=old_date) + ) + await session.commit() + + # Create a meeting directly + meeting_id = "test-meeting-consent" + await session.execute( + insert(MeetingModel).values( id=meeting_id, - room_name="Test Meeting for CASCADE", - room_url="https://example.com/cascade-test", - host_room_url="https://example.com/cascade-test-host", - start_date=datetime.now(timezone.utc), - end_date=datetime.now(timezone.utc) + timedelta(hours=1), - room_id=None, + transcript_id=transcript.id, + room_id="test-room", + room_name="test-room", + room_url="https://example.com/room", + host_room_url="https://example.com/room-host", + start_date=old_date, + end_date=old_date + timedelta(hours=1), + is_active=False, + num_clients=0, + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic", ) ) + await session.commit() - # Create consent records for this meeting - consent1_id = "consent-1" - consent2_id = "consent-2" - - await get_database().execute( - meeting_consent.insert().values( - id=consent1_id, + # Create meeting_consent entries + await session.execute( + insert(MeetingConsentModel).values( meeting_id=meeting_id, - user_id="user1", + user_name="User 1", consent_given=True, - consent_timestamp=datetime.now(timezone.utc), ) ) - - await get_database().execute( - meeting_consent.insert().values( - id=consent2_id, + await session.execute( + insert(MeetingConsentModel).values( meeting_id=meeting_id, - user_id="user2", - consent_given=False, - consent_timestamp=datetime.now(timezone.utc), + user_name="User 2", + consent_given=True, ) ) + await session.commit() - # Verify consent records exist - consents = await meeting_consent_controller.get_by_meeting_id(meeting_id) + # Verify consent entries exist + result = await session.execute( + select(MeetingConsentModel).where(MeetingConsentModel.meeting_id == meeting_id) + ) + consents = result.scalars().all() assert len(consents) == 2 - # Delete the meeting - await get_database().execute(meetings.delete().where(meetings.c.id == meeting_id)) + # Delete the transcript and meeting + await session.execute( + TranscriptModel.__table__.delete().where(TranscriptModel.id == transcript.id) + ) + await session.execute( + MeetingModel.__table__.delete().where(MeetingModel.id == meeting_id) + ) + await session.commit() - # Verify meeting is deleted - query = meetings.select().where(meetings.c.id == meeting_id) - result = await get_database().fetch_one(query) - assert result is None - - # Verify consent records are automatically deleted (CASCADE DELETE) - consents_after = await meeting_consent_controller.get_by_meeting_id(meeting_id) - assert len(consents_after) == 0 + # Verify consent entries were cascade deleted + result = await session.execute( + select(MeetingConsentModel).where(MeetingConsentModel.meeting_id == meeting_id) + ) + consents = result.scalars().all() + assert len(consents) == 0