From 6f0c7c1a5e751713366886c8e764c2009e12ba72 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Fri, 29 Aug 2025 08:47:14 -0600 Subject: [PATCH] feat(cleanup): add automatic data retention for public instances (#574) * feat(cleanup): add automatic data retention for public instances - Add Celery task to clean up anonymous data after configurable retention period - Delete transcripts, meetings, and orphaned recordings older than retention days - Only runs when PUBLIC_MODE is enabled to prevent accidental data loss - Properly removes all associated files (local and S3 storage) - Add manual cleanup tool for testing and intervention - Configure retention via PUBLIC_DATA_RETENTION_DAYS setting (default: 7 days) Fixes #571 * fix: apply pre-commit formatting fixes * fix: properly delete recording files from storage during cleanup - Add storage deletion for orphaned recordings in both cleanup task and manual tool - Delete from storage before removing database records - Log warnings if storage deletion fails but continue with database cleanup * Apply suggestion from @pr-agent-monadical[bot] Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> * Apply suggestion from @pr-agent-monadical[bot] Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> * refactor: cleanup_old_data for better logging * fix: linting * test: fix meeting cleanup test to not require room controller - Simplify test by directly inserting meetings into database - Remove dependency on non-existent rooms_controller.create method - Tests now pass successfully * fix: linting * refactor: simplify cleanup tool to use worker implementation - Remove duplicate cleanup logic from manual tool - Use the same _cleanup_old_public_data function from worker - Remove dry-run feature as requested - Prevent code duplication and ensure consistency - Update documentation to reflect changes * refactor: split cleanup worker into smaller functions - Move all imports to the top of the file - Extract cleanup logic into separate functions: - cleanup_old_transcripts() - cleanup_old_meetings() - cleanup_orphaned_recordings() - log_cleanup_results() - Make code more maintainable and testable - Add days parameter support to Celery task - Update manual tool to work with refactored code * feat: add TypedDict typing for cleanup stats - Add CleanupStats TypedDict for better type safety - Update all function signatures to use proper typing - Add return type annotations to _cleanup_old_public_data - Improves code maintainability and IDE support * feat: add CASCADE DELETE to meeting_consent foreign key - Add ondelete="CASCADE" to meeting_consent.meeting_id foreign key - Generate and apply migration to update existing constraint - Remove manual consent deletion from cleanup code - Add unit test to verify CASCADE DELETE behavior * style: linting * fix: alembic migration branchpoint * fix: correct downgrade constraint name in CASCADE DELETE migration * fix: regenerate CASCADE DELETE migration with proper constraint names - Delete problematic migration and regenerate with correct names - Use explicit constraint name in both upgrade and downgrade - Ensure migration works bidirectionally - All tests passing including CASCADE DELETE test * style: linting * refactor: simplify cleanup to use transcripts as entry point - Remove orphaned_recordings cleanup (not part of this PR scope) - Remove separate old_meetings cleanup - Transcripts are now the main entry point for cleanup - 
Associated meetings and recordings are deleted with their transcript - Use single database connection for all operations - Update tests to reflect new approach * refactor: cleanup and rename functions for clarity - Rename _cleanup_old_public_data to cleanup_old_public_data (make public) - Rename celery task to cleanup_old_public_data_task for clarity - Update docstrings and improve code organization - Remove unnecessary comments and simplify deletion logic - Update tests to use new function names - All tests passing * style: linting * style: typing and review * fix: add transaction on cleanup_single_transcript * fix: naming --------- Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> --- server/docs/data_retention.md | 95 ++++++ ..._add_cascade_delete_to_meeting_consent_.py | 50 +++ server/reflector/asynctask.py | 27 ++ server/reflector/db/meetings.py | 7 +- .../reflector/pipelines/main_file_pipeline.py | 2 +- .../reflector/pipelines/main_live_pipeline.py | 25 +- server/reflector/settings.py | 4 +- server/reflector/tools/cleanup_old_data.py | 72 +++++ server/reflector/worker/app.py | 11 + server/reflector/worker/cleanup.py | 156 ++++++++++ server/tests/test_cleanup.py | 287 ++++++++++++++++++ 11 files changed, 708 insertions(+), 28 deletions(-) create mode 100644 server/docs/data_retention.md create mode 100644 server/migrations/versions/5a8907fd1d78_add_cascade_delete_to_meeting_consent_.py create mode 100644 server/reflector/asynctask.py create mode 100644 server/reflector/tools/cleanup_old_data.py create mode 100644 server/reflector/worker/cleanup.py create mode 100644 server/tests/test_cleanup.py diff --git a/server/docs/data_retention.md b/server/docs/data_retention.md new file mode 100644 index 00000000..1a21b59d --- /dev/null +++ b/server/docs/data_retention.md @@ -0,0 +1,95 @@ +# Data Retention and Cleanup + +## Overview + +For public instances of Reflector, a data retention policy is automatically enforced to delete anonymous user data after a configurable period (default: 7 days). This ensures compliance with privacy expectations and prevents unbounded storage growth. + +## Configuration + +### Environment Variables + +- `PUBLIC_MODE` (bool): Must be set to `true` to enable automatic cleanup +- `PUBLIC_DATA_RETENTION_DAYS` (int): Number of days to retain anonymous data (default: 7) + +### What Gets Deleted + +When data reaches the retention period, the following items are automatically removed: + +1. 
**Transcripts** from anonymous users (where `user_id` is NULL): + - Database records + - Local files (audio.wav, audio.mp3, audio.json waveform) + - Storage files (cloud storage if configured) + +2. **Associated meetings and recordings** linked to those transcripts; meeting consent records are removed automatically via the CASCADE DELETE foreign key + +## Automatic Cleanup + +### Celery Beat Schedule + +When `PUBLIC_MODE=true`, a Celery beat task runs daily at 3 AM to clean up old data: + +```python +# Automatically scheduled when PUBLIC_MODE=true +"cleanup_old_public_data": { + "task": "reflector.worker.cleanup.cleanup_old_public_data_task", + "schedule": crontab(hour=3, minute=0), # Daily at 3 AM +} +``` + +### Running the Worker + +Ensure both Celery worker and beat scheduler are running: + +```bash +# Start Celery worker +uv run celery -A reflector.worker.app worker --loglevel=info + +# Start Celery beat scheduler (in another terminal) +uv run celery -A reflector.worker.app beat +``` + +## Manual Cleanup + +For testing or manual intervention, use the cleanup tool: + +```bash +# Delete data older than 7 days (default) +uv run python -m reflector.tools.cleanup_old_data + +# Delete data older than 30 days +uv run python -m reflector.tools.cleanup_old_data --days 30 +``` + +Note: The manual tool uses the same implementation as the Celery worker task to ensure consistency. + +## Important Notes + +1. **User Data Deletion**: Only anonymous data (where `user_id` is NULL) is deleted. Authenticated user data is preserved. + +2. **Storage Cleanup**: The system properly cleans up both local files and cloud storage when configured. + +3. **Error Handling**: If individual deletions fail, the cleanup continues and logs errors. Failed deletions are reported in the task output. + +4. **Public Instance Only**: The automatic cleanup task only runs when `PUBLIC_MODE=true` to prevent accidental data loss in private deployments. + +## Testing + +Run the cleanup tests: + +```bash +uv run pytest tests/test_cleanup.py -v +``` + +## Monitoring + +Check Celery logs for cleanup task execution: + +```bash +# Look for cleanup task logs +grep "cleanup_old_public_data" celery.log +grep "Starting cleanup of old public data" celery.log +``` + +Task statistics are logged after each run: +- Number of transcripts deleted +- Number of meetings deleted +- Number of recordings deleted +- Any errors encountered diff --git a/server/migrations/versions/5a8907fd1d78_add_cascade_delete_to_meeting_consent_.py b/server/migrations/versions/5a8907fd1d78_add_cascade_delete_to_meeting_consent_.py new file mode 100644 index 00000000..af6a5c22 --- /dev/null +++ b/server/migrations/versions/5a8907fd1d78_add_cascade_delete_to_meeting_consent_.py @@ -0,0 +1,50 @@ +"""add cascade delete to meeting consent foreign key + +Revision ID: 5a8907fd1d78 +Revises: 0ab2d7ffaa16 +Create Date: 2025-08-26 17:26:50.945491 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "5a8907fd1d78" +down_revision: Union[str, None] = "0ab2d7ffaa16" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("meeting_consent", schema=None) as batch_op: + batch_op.drop_constraint( + batch_op.f("meeting_consent_meeting_id_fkey"), type_="foreignkey" + ) + batch_op.create_foreign_key( + batch_op.f("meeting_consent_meeting_id_fkey"), + "meeting", + ["meeting_id"], + ["id"], + ondelete="CASCADE", + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting_consent", schema=None) as batch_op: + batch_op.drop_constraint( + batch_op.f("meeting_consent_meeting_id_fkey"), type_="foreignkey" + ) + batch_op.create_foreign_key( + batch_op.f("meeting_consent_meeting_id_fkey"), + "meeting", + ["meeting_id"], + ["id"], + ) + + # ### end Alembic commands ### diff --git a/server/reflector/asynctask.py b/server/reflector/asynctask.py new file mode 100644 index 00000000..61523a6f --- /dev/null +++ b/server/reflector/asynctask.py @@ -0,0 +1,27 @@ +import asyncio +import functools + +from reflector.db import get_database + + +def asynctask(f): + @functools.wraps(f) + def wrapper(*args, **kwargs): + async def run_with_db(): + database = get_database() + await database.connect() + try: + return await f(*args, **kwargs) + finally: + await database.disconnect() + + coro = run_with_db() + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + if loop and loop.is_running(): + return loop.run_until_complete(coro) + return asyncio.run(coro) + + return wrapper diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 40bd6f8a..85178351 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -54,7 +54,12 @@ meeting_consent = sa.Table( "meeting_consent", metadata, sa.Column("id", sa.String, primary_key=True), - sa.Column("meeting_id", sa.String, sa.ForeignKey("meeting.id"), nullable=False), + sa.Column( + "meeting_id", + sa.String, + sa.ForeignKey("meeting.id", ondelete="CASCADE"), + nullable=False, + ), sa.Column("user_id", sa.String), sa.Column("consent_given", sa.Boolean, nullable=False), sa.Column("consent_timestamp", sa.DateTime(timezone=True), nullable=False), diff --git a/server/reflector/pipelines/main_file_pipeline.py b/server/reflector/pipelines/main_file_pipeline.py index f11cddca..42333aa9 100644 --- a/server/reflector/pipelines/main_file_pipeline.py +++ b/server/reflector/pipelines/main_file_pipeline.py @@ -13,6 +13,7 @@ import av import structlog from celery import shared_task +from reflector.asynctask import asynctask from reflector.db.transcripts import ( Transcript, TranscriptStatus, @@ -21,7 +22,6 @@ from reflector.db.transcripts import ( from reflector.logger import logger from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, - asynctask, broadcast_to_sockets, ) from reflector.processors import ( diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 30c8777b..64904952 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -22,7 +22,7 @@ from celery import chord, current_task, group, shared_task from pydantic import BaseModel from structlog import BoundLogger as Logger -from reflector.db import get_database +from reflector.asynctask import asynctask from reflector.db.meetings import meeting_consent_controller, meetings_controller from reflector.db.recordings import recordings_controller from reflector.db.rooms import rooms_controller @@ -70,29 
+70,6 @@ from reflector.zulip import ( ) -def asynctask(f): - @functools.wraps(f) - def wrapper(*args, **kwargs): - async def run_with_db(): - database = get_database() - await database.connect() - try: - return await f(*args, **kwargs) - finally: - await database.disconnect() - - coro = run_with_db() - try: - loop = asyncio.get_running_loop() - except RuntimeError: - loop = None - if loop and loop.is_running(): - return loop.run_until_complete(coro) - return asyncio.run(coro) - - return wrapper - - def broadcast_to_sockets(func): """ Decorator to broadcast transcript event to websockets diff --git a/server/reflector/settings.py b/server/reflector/settings.py index bbc835cd..686f67c1 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -1,3 +1,4 @@ +from pydantic.types import PositiveInt from pydantic_settings import BaseSettings, SettingsConfigDict @@ -90,9 +91,8 @@ class Settings(BaseSettings): AUTH_JWT_PUBLIC_KEY: str | None = "authentik.monadical.com_public.pem" AUTH_JWT_AUDIENCE: str | None = None - # API public mode - # if set, all anonymous record will be public PUBLIC_MODE: bool = False + PUBLIC_DATA_RETENTION_DAYS: PositiveInt = 7 # Min transcript length to generate topic + summary MIN_TRANSCRIPT_LENGTH: int = 750 diff --git a/server/reflector/tools/cleanup_old_data.py b/server/reflector/tools/cleanup_old_data.py new file mode 100644 index 00000000..9ffa4684 --- /dev/null +++ b/server/reflector/tools/cleanup_old_data.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +""" +Manual cleanup tool for old public data. +Uses the same implementation as the Celery worker task. +""" + +import argparse +import asyncio +import sys + +import structlog + +from reflector.settings import settings +from reflector.worker.cleanup import cleanup_old_public_data + +logger = structlog.get_logger(__name__) + + +async def cleanup_old_data(days: int = 7): + logger.info( + "Starting manual cleanup", + retention_days=days, + public_mode=settings.PUBLIC_MODE, + ) + + if not settings.PUBLIC_MODE: + logger.critical( + "WARNING: PUBLIC_MODE is False. " + "This tool is intended for public instances only." 
+ ) + raise Exception("Tool intended for public instances only") + + result = await cleanup_old_public_data(days=days) + + if result: + logger.info( + "Cleanup completed", + transcripts_deleted=result.get("transcripts_deleted", 0), + meetings_deleted=result.get("meetings_deleted", 0), + recordings_deleted=result.get("recordings_deleted", 0), + errors_count=len(result.get("errors", [])), + ) + if result.get("errors"): + logger.warning( + "Errors encountered during cleanup:", errors=result["errors"][:10] + ) + else: + logger.info("Cleanup skipped or completed without results") + + +def main(): + parser = argparse.ArgumentParser( + description="Clean up old transcripts and meetings" + ) + parser.add_argument( + "--days", + type=int, + default=7, + help="Number of days to keep data (default: 7)", + ) + + args = parser.parse_args() + + if args.days < 1: + logger.error("Days must be at least 1") + sys.exit(1) + + asyncio.run(cleanup_old_data(days=args.days)) + + +if __name__ == "__main__": + main() diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 7e888f41..e9468bd2 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -19,6 +19,7 @@ else: "reflector.pipelines.main_live_pipeline", "reflector.worker.healthcheck", "reflector.worker.process", + "reflector.worker.cleanup", ] ) @@ -38,6 +39,16 @@ else: }, } + if settings.PUBLIC_MODE: + app.conf.beat_schedule["cleanup_old_public_data"] = { + "task": "reflector.worker.cleanup.cleanup_old_public_data_task", + "schedule": crontab(hour=3, minute=0), + } + logger.info( + "Public mode cleanup enabled", + retention_days=settings.PUBLIC_DATA_RETENTION_DAYS, + ) + if settings.HEALTHCHECK_URL: app.conf.beat_schedule["healthcheck_ping"] = { "task": "reflector.worker.healthcheck.healthcheck_ping", diff --git a/server/reflector/worker/cleanup.py b/server/reflector/worker/cleanup.py new file mode 100644 index 00000000..e634994d --- /dev/null +++ b/server/reflector/worker/cleanup.py @@ -0,0 +1,156 @@ +""" +Main task for cleaning up old public data. + +Deletes old anonymous transcripts and their associated meetings/recordings. +Transcripts are the main entry point - any associated data is also removed. 
+""" + +import asyncio +from datetime import datetime, timedelta, timezone +from typing import TypedDict + +import structlog +from celery import shared_task +from databases import Database +from pydantic.types import PositiveInt + +from reflector.asynctask import asynctask +from reflector.db import get_database +from reflector.db.meetings import meetings +from reflector.db.recordings import recordings +from reflector.db.transcripts import transcripts, transcripts_controller +from reflector.settings import settings +from reflector.storage import get_recordings_storage + +logger = structlog.get_logger(__name__) + + +class CleanupStats(TypedDict): + """Statistics for cleanup operation.""" + + transcripts_deleted: int + meetings_deleted: int + recordings_deleted: int + errors: list[str] + + +async def delete_single_transcript( + db: Database, transcript_data: dict, stats: CleanupStats +): + transcript_id = transcript_data["id"] + meeting_id = transcript_data["meeting_id"] + recording_id = transcript_data["recording_id"] + + try: + async with db.transaction(isolation="serializable"): + if meeting_id: + await db.execute(meetings.delete().where(meetings.c.id == meeting_id)) + stats["meetings_deleted"] += 1 + logger.info("Deleted associated meeting", meeting_id=meeting_id) + + if recording_id: + recording = await db.fetch_one( + recordings.select().where(recordings.c.id == recording_id) + ) + if recording: + try: + await get_recordings_storage().delete_file( + recording["object_key"] + ) + except Exception as storage_error: + logger.warning( + "Failed to delete recording from storage", + recording_id=recording_id, + object_key=recording["object_key"], + error=str(storage_error), + ) + + await db.execute( + recordings.delete().where(recordings.c.id == recording_id) + ) + stats["recordings_deleted"] += 1 + logger.info( + "Deleted associated recording", recording_id=recording_id + ) + + await transcripts_controller.remove_by_id(transcript_id) + stats["transcripts_deleted"] += 1 + logger.info( + "Deleted transcript", + transcript_id=transcript_id, + created_at=transcript_data["created_at"].isoformat(), + ) + except Exception as e: + error_msg = f"Failed to delete transcript {transcript_id}: {str(e)}" + logger.error(error_msg, exc_info=e) + stats["errors"].append(error_msg) + + +async def cleanup_old_transcripts( + db: Database, cutoff_date: datetime, stats: CleanupStats +): + """Delete old anonymous transcripts and their associated recordings/meetings.""" + query = transcripts.select().where( + (transcripts.c.created_at < cutoff_date) & (transcripts.c.user_id.is_(None)) + ) + old_transcripts = await db.fetch_all(query) + + logger.info(f"Found {len(old_transcripts)} old transcripts to delete") + + for transcript_data in old_transcripts: + await delete_single_transcript(db, transcript_data, stats) + + +def log_cleanup_results(stats: CleanupStats): + logger.info( + "Cleanup completed", + transcripts_deleted=stats["transcripts_deleted"], + meetings_deleted=stats["meetings_deleted"], + recordings_deleted=stats["recordings_deleted"], + errors_count=len(stats["errors"]), + ) + + if stats["errors"]: + logger.warning( + "Cleanup completed with errors", + errors=stats["errors"][:10], + ) + + +async def cleanup_old_public_data( + days: PositiveInt | None = None, +) -> CleanupStats | None: + if days is None: + days = settings.PUBLIC_DATA_RETENTION_DAYS + + if not settings.PUBLIC_MODE: + logger.info("Skipping cleanup - not a public instance") + return None + + cutoff_date = datetime.now(timezone.utc) - 
timedelta(days=days) + logger.info( + "Starting cleanup of old public data", + cutoff_date=cutoff_date.isoformat(), + ) + + stats: CleanupStats = { + "transcripts_deleted": 0, + "meetings_deleted": 0, + "recordings_deleted": 0, + "errors": [], + } + + db = get_database() + await cleanup_old_transcripts(db, cutoff_date, stats) + + log_cleanup_results(stats) + return stats + + +@shared_task( + autoretry_for=(Exception,), + retry_kwargs={"max_retries": 3, "countdown": 300}, +) +@asynctask +async def cleanup_old_public_data_task(days: int | None = None): + await cleanup_old_public_data(days=days) diff --git a/server/tests/test_cleanup.py b/server/tests/test_cleanup.py new file mode 100644 index 00000000..3c5149ae --- /dev/null +++ b/server/tests/test_cleanup.py @@ -0,0 +1,287 @@ +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.db.recordings import Recording, recordings_controller +from reflector.db.transcripts import SourceKind, transcripts_controller +from reflector.worker.cleanup import cleanup_old_public_data + + +@pytest.mark.asyncio +async def test_cleanup_old_public_data_skips_when_not_public(): + """Test that cleanup is skipped when PUBLIC_MODE is False.""" + with patch("reflector.worker.cleanup.settings") as mock_settings: + mock_settings.PUBLIC_MODE = False + + result = await cleanup_old_public_data() + + # Should return early without doing anything + assert result is None + + +@pytest.mark.asyncio +async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts(): + """Test that old anonymous transcripts are deleted.""" + # Create old and new anonymous transcripts + old_date = datetime.now(timezone.utc) - timedelta(days=8) + + # Create old anonymous transcript (should be deleted) + old_transcript = await transcripts_controller.add( + name="Old Anonymous Transcript", + source_kind=SourceKind.FILE, + user_id=None, # Anonymous + ) + # Manually update created_at to be old + from reflector.db import get_database + from reflector.db.transcripts import transcripts + + await get_database().execute( + transcripts.update() + .where(transcripts.c.id == old_transcript.id) + .values(created_at=old_date) + ) + + # Create new anonymous transcript (should NOT be deleted) + new_transcript = await transcripts_controller.add( + name="New Anonymous Transcript", + source_kind=SourceKind.FILE, + user_id=None, # Anonymous + ) + + # Create old transcript with user (should NOT be deleted) + old_user_transcript = await transcripts_controller.add( + name="Old User Transcript", + source_kind=SourceKind.FILE, + user_id="user123", + ) + await get_database().execute( + transcripts.update() + .where(transcripts.c.id == old_user_transcript.id) + .values(created_at=old_date) + ) + + with patch("reflector.worker.cleanup.settings") as mock_settings: + mock_settings.PUBLIC_MODE = True + mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 + + # Mock the storage deletion + with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage: + mock_storage.return_value.delete_file = AsyncMock() + + result = await cleanup_old_public_data() + + # Check results + assert result["transcripts_deleted"] == 1 + assert result["errors"] == [] + + # Verify old anonymous transcript was deleted + assert await transcripts_controller.get_by_id(old_transcript.id) is None + + # Verify new anonymous transcript still exists + assert await 
transcripts_controller.get_by_id(new_transcript.id) is not None + + # Verify user transcript still exists + assert await transcripts_controller.get_by_id(old_user_transcript.id) is not None + + +@pytest.mark.asyncio +async def test_cleanup_deletes_associated_meeting_and_recording(): + """Test that meetings and recordings associated with old transcripts are deleted.""" + from reflector.db import get_database + from reflector.db.meetings import meetings + from reflector.db.transcripts import transcripts + + old_date = datetime.now(timezone.utc) - timedelta(days=8) + + # Create a meeting + meeting_id = "test-meeting-for-transcript" + await get_database().execute( + meetings.insert().values( + id=meeting_id, + room_name="Meeting with Transcript", + room_url="https://example.com/meeting", + host_room_url="https://example.com/meeting-host", + start_date=old_date, + end_date=old_date + timedelta(hours=1), + user_id=None, + room_id=None, + ) + ) + + # Create a recording + recording = await recordings_controller.create( + Recording( + bucket_name="test-bucket", + object_key="test-recording.mp4", + recorded_at=old_date, + ) + ) + + # Create an old transcript with both meeting and recording + old_transcript = await transcripts_controller.add( + name="Old Transcript with Meeting and Recording", + source_kind=SourceKind.ROOM, + user_id=None, + meeting_id=meeting_id, + recording_id=recording.id, + ) + + # Update created_at to be old + await get_database().execute( + transcripts.update() + .where(transcripts.c.id == old_transcript.id) + .values(created_at=old_date) + ) + + with patch("reflector.worker.cleanup.settings") as mock_settings: + mock_settings.PUBLIC_MODE = True + mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 + + # Mock storage deletion + with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage: + mock_storage.return_value.delete_file = AsyncMock() + with patch( + "reflector.worker.cleanup.get_recordings_storage" + ) as mock_rec_storage: + mock_rec_storage.return_value.delete_file = AsyncMock() + + result = await cleanup_old_public_data() + + # Check results + assert result["transcripts_deleted"] == 1 + assert result["meetings_deleted"] == 1 + assert result["recordings_deleted"] == 1 + assert result["errors"] == [] + + # Verify transcript was deleted + assert await transcripts_controller.get_by_id(old_transcript.id) is None + + # Verify meeting was deleted + query = meetings.select().where(meetings.c.id == meeting_id) + meeting_result = await get_database().fetch_one(query) + assert meeting_result is None + + # Verify recording was deleted + assert await recordings_controller.get_by_id(recording.id) is None + + +@pytest.mark.asyncio +async def test_cleanup_handles_errors_gracefully(): + """Test that cleanup continues even when individual deletions fail.""" + old_date = datetime.now(timezone.utc) - timedelta(days=8) + + # Create multiple old transcripts + transcript1 = await transcripts_controller.add( + name="Transcript 1", + source_kind=SourceKind.FILE, + user_id=None, + ) + transcript2 = await transcripts_controller.add( + name="Transcript 2", + source_kind=SourceKind.FILE, + user_id=None, + ) + + # Update created_at to be old + from reflector.db import get_database + from reflector.db.transcripts import transcripts + + for t_id in [transcript1.id, transcript2.id]: + await get_database().execute( + transcripts.update() + .where(transcripts.c.id == t_id) + .values(created_at=old_date) + ) + + with patch("reflector.worker.cleanup.settings") as mock_settings: + 
mock_settings.PUBLIC_MODE = True + mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7 + + # Mock remove_by_id to fail for the first transcript + original_remove = transcripts_controller.remove_by_id + call_count = 0 + + async def mock_remove_by_id(transcript_id, user_id=None): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise Exception("Simulated deletion error") + return await original_remove(transcript_id, user_id) + + with patch.object( + transcripts_controller, "remove_by_id", side_effect=mock_remove_by_id + ): + result = await cleanup_old_public_data() + + # Should have one successful deletion and one error + assert result["transcripts_deleted"] == 1 + assert len(result["errors"]) == 1 + assert "Failed to delete transcript" in result["errors"][0] + + +@pytest.mark.asyncio +async def test_meeting_consent_cascade_delete(): + """Test that meeting_consent records are automatically deleted when meeting is deleted.""" + from reflector.db import get_database + from reflector.db.meetings import ( + meeting_consent, + meeting_consent_controller, + meetings, + ) + + # Create a meeting + meeting_id = "test-cascade-meeting" + await get_database().execute( + meetings.insert().values( + id=meeting_id, + room_name="Test Meeting for CASCADE", + room_url="https://example.com/cascade-test", + host_room_url="https://example.com/cascade-test-host", + start_date=datetime.now(timezone.utc), + end_date=datetime.now(timezone.utc) + timedelta(hours=1), + user_id="test-user", + room_id=None, + ) + ) + + # Create consent records for this meeting + consent1_id = "consent-1" + consent2_id = "consent-2" + + await get_database().execute( + meeting_consent.insert().values( + id=consent1_id, + meeting_id=meeting_id, + user_id="user1", + consent_given=True, + consent_timestamp=datetime.now(timezone.utc), + ) + ) + + await get_database().execute( + meeting_consent.insert().values( + id=consent2_id, + meeting_id=meeting_id, + user_id="user2", + consent_given=False, + consent_timestamp=datetime.now(timezone.utc), + ) + ) + + # Verify consent records exist + consents = await meeting_consent_controller.get_by_meeting_id(meeting_id) + assert len(consents) == 2 + + # Delete the meeting + await get_database().execute(meetings.delete().where(meetings.c.id == meeting_id)) + + # Verify meeting is deleted + query = meetings.select().where(meetings.c.id == meeting_id) + result = await get_database().fetch_one(query) + assert result is None + + # Verify consent records are automatically deleted (CASCADE DELETE) + consents_after = await meeting_consent_controller.get_by_meeting_id(meeting_id) + assert len(consents_after) == 0