diff --git a/server/migrations/versions/c1d2e3f4a5b6_add_store_video_to_room_and_meeting.py b/server/migrations/versions/c1d2e3f4a5b6_add_store_video_to_room_and_meeting.py new file mode 100644 index 00000000..ca7dffc8 --- /dev/null +++ b/server/migrations/versions/c1d2e3f4a5b6_add_store_video_to_room_and_meeting.py @@ -0,0 +1,43 @@ +"""add store_video to room and meeting + +Revision ID: c1d2e3f4a5b6 +Revises: b4c7e8f9a012 +Create Date: 2026-04-08 00:00:00.000000 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +revision: str = "c1d2e3f4a5b6" +down_revision: Union[str, None] = "b4c7e8f9a012" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "room", + sa.Column( + "store_video", + sa.Boolean(), + nullable=False, + server_default=sa.false(), + ), + ) + op.add_column( + "meeting", + sa.Column( + "store_video", + sa.Boolean(), + nullable=False, + server_default=sa.false(), + ), + ) + + +def downgrade() -> None: + op.drop_column("meeting", "store_video") + op.drop_column("room", "store_video") diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 212877f0..3dacc00c 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -69,6 +69,7 @@ meetings = sa.Table( sa.Column("daily_composed_video_duration", sa.Integer, nullable=True), # Email recipients for transcript notification sa.Column("email_recipients", JSONB, nullable=True), + sa.Column("store_video", sa.Boolean, nullable=False, server_default=sa.false()), sa.Index("idx_meeting_room_id", "room_id"), sa.Index("idx_meeting_calendar_event", "calendar_event_id"), ) @@ -122,6 +123,7 @@ class Meeting(BaseModel): # Email recipients for transcript notification # Each entry is {"email": str, "include_link": bool} or a legacy plain str email_recipients: list[dict | str] | None = None + store_video: bool = False class MeetingController: @@ -152,6 +154,7 @@ class MeetingController: calendar_event_id=calendar_event_id, calendar_metadata=calendar_metadata, platform=room.platform, + store_video=room.store_video, ) query = meetings.insert().values(**meeting.model_dump()) await get_database().execute(query) diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py index 80922af9..f6ca7974 100644 --- a/server/reflector/db/rooms.py +++ b/server/reflector/db/rooms.py @@ -64,6 +64,9 @@ rooms = sqlalchemy.Table( server_default=sqlalchemy.sql.false(), ), sqlalchemy.Column("email_transcript_to", sqlalchemy.String, nullable=True), + sqlalchemy.Column( + "store_video", sqlalchemy.Boolean, nullable=False, server_default=false() + ), sqlalchemy.Index("idx_room_is_shared", "is_shared"), sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"), ) @@ -94,6 +97,7 @@ class Room(BaseModel): platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) skip_consent: bool = False email_transcript_to: str | None = None + store_video: bool = False class RoomController: @@ -150,6 +154,7 @@ class RoomController: platform: Platform = settings.DEFAULT_VIDEO_PLATFORM, skip_consent: bool = False, email_transcript_to: str | None = None, + store_video: bool = False, ): """ Add a new room @@ -176,6 +181,7 @@ class RoomController: "platform": platform, "skip_consent": skip_consent, "email_transcript_to": email_transcript_to, + "store_video": store_video, } room = Room(**room_data) diff --git a/server/reflector/hatchet/run_workers_llm.py b/server/reflector/hatchet/run_workers_llm.py index fdf61127..1be0b2bb 100644 --- a/server/reflector/hatchet/run_workers_llm.py +++ b/server/reflector/hatchet/run_workers_llm.py @@ -10,6 +10,7 @@ from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.workflows.daily_multitrack_pipeline import ( daily_multitrack_pipeline, ) +from reflector.hatchet.workflows.failed_runs_monitor import failed_runs_monitor from reflector.hatchet.workflows.file_pipeline import file_pipeline from reflector.hatchet.workflows.live_post_pipeline import live_post_pipeline from reflector.hatchet.workflows.subject_processing import subject_workflow @@ -54,10 +55,6 @@ def main(): ] ) if _zulip_dag_enabled: - from reflector.hatchet.workflows.failed_runs_monitor import ( # noqa: PLC0415 - failed_runs_monitor, - ) - workflows.append(failed_runs_monitor) logger.info( "FailedRunsMonitor cron enabled", diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 84b5b203..43faefcc 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -219,6 +219,32 @@ async def _handle_recording_ready(event: RecordingReadyEvent): track_keys = [t.s3Key for t in tracks if t.type == "audio"] + # Delete video tracks when store_video is disabled (same pattern as LiveKit). + # Only delete if we have a meeting AND store_video is explicitly false. + # If no meeting found, leave files alone (can't confirm user intent). + video_track_keys = [t.s3Key for t in tracks if t.type == "video"] + if video_track_keys: + meeting = await meetings_controller.get_by_room_name(room_name) + if meeting is not None and not meeting.store_video: + from reflector.storage import get_source_storage + + storage = get_source_storage("daily") + for video_key in video_track_keys: + try: + await storage.delete_file(video_key) + logger.info( + "Deleted video track from raw-tracks recording", + s3_key=video_key, + room_name=room_name, + ) + except Exception as e: + # Non-critical — pipeline filters these out anyway + logger.warning( + "Failed to delete video track from raw-tracks recording", + s3_key=video_key, + error=str(e), + ) + logger.info( "Raw-tracks recording queuing processing", recording_id=recording_id, diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 3ba55f63..90969aa7 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -45,6 +45,7 @@ class Room(BaseModel): platform: Platform skip_consent: bool = False email_transcript_to: str | None = None + store_video: bool = False class RoomDetails(Room): @@ -75,6 +76,7 @@ class Meeting(BaseModel): platform: Platform daily_composed_video_s3_key: str | None = None daily_composed_video_duration: int | None = None + store_video: bool = False class CreateRoom(BaseModel): @@ -95,6 +97,7 @@ class CreateRoom(BaseModel): platform: Platform skip_consent: bool = False email_transcript_to: str | None = None + store_video: bool = False class UpdateRoom(BaseModel): @@ -115,6 +118,7 @@ class UpdateRoom(BaseModel): platform: Optional[Platform] = None skip_consent: Optional[bool] = None email_transcript_to: Optional[str] = None + store_video: Optional[bool] = None class CreateRoomMeeting(BaseModel): @@ -257,6 +261,7 @@ async def rooms_create( platform=room.platform, skip_consent=room.skip_consent, email_transcript_to=room.email_transcript_to, + store_video=room.store_video, ) @@ -325,6 +330,7 @@ async def rooms_create_meeting( and meeting.recording_type == room.recording_type and meeting.recording_trigger == room.recording_trigger and meeting.platform == room.platform + and meeting.store_video == room.store_video ) if not settings_match: logger.info( diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 5e2f730f..5387079b 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -30,6 +30,8 @@ def build_beat_schedule( whereby_api_key=None, aws_process_recording_queue_url=None, daily_api_key=None, + livekit_api_key=None, + livekit_url=None, public_mode=False, public_data_retention_days=None, healthcheck_url=None, @@ -83,7 +85,7 @@ def build_beat_schedule( else: logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)") - _livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL) + _livekit_enabled = bool(livekit_api_key and livekit_url) if _livekit_enabled: beat_schedule["process_livekit_ended_meetings"] = { "task": "reflector.worker.process.process_livekit_ended_meetings", @@ -175,6 +177,8 @@ else: whereby_api_key=settings.WHEREBY_API_KEY, aws_process_recording_queue_url=settings.AWS_PROCESS_RECORDING_QUEUE_URL, daily_api_key=settings.DAILY_API_KEY, + livekit_api_key=settings.LIVEKIT_API_KEY, + livekit_url=settings.LIVEKIT_URL, public_mode=settings.PUBLIC_MODE, public_data_retention_days=settings.PUBLIC_DATA_RETENTION_DAYS, healthcheck_url=settings.HEALTHCHECK_URL, diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 432a701c..130b32d8 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -562,6 +562,15 @@ async def store_cloud_recording( ) return False + if not meeting.store_video: + logger.info( + f"Cloud recording ({source}): skipped, store_video=false", + recording_id=recording_id, + room_name=room_name, + meeting_id=meeting.id, + ) + return False + success = await meetings_controller.set_cloud_recording_if_missing( meeting_id=meeting.id, s3_key=s3_key, diff --git a/server/tests/test_beat_schedule.py b/server/tests/test_beat_schedule.py index 382af9a2..4cb0230f 100644 --- a/server/tests/test_beat_schedule.py +++ b/server/tests/test_beat_schedule.py @@ -32,6 +32,10 @@ DAILY_TASKS = { "trigger_daily_reconciliation", "reprocess_failed_daily_recordings", } +LIVEKIT_TASKS = { + "process_livekit_ended_meetings", + "reprocess_failed_livekit_recordings", +} PLATFORM_TASKS = { "process_meetings", "sync_all_ics_calendars", @@ -47,6 +51,7 @@ class TestNoPlatformConfigured: task_names = set(schedule.keys()) assert not task_names & WHEREBY_TASKS assert not task_names & DAILY_TASKS + assert not task_names & LIVEKIT_TASKS assert not task_names & PLATFORM_TASKS def test_only_healthcheck_disabled_warning(self): @@ -72,6 +77,7 @@ class TestWherebyOnly: assert WHEREBY_TASKS <= task_names assert PLATFORM_TASKS <= task_names assert not task_names & DAILY_TASKS + assert not task_names & LIVEKIT_TASKS def test_whereby_sqs_url(self): schedule = build_beat_schedule( @@ -81,6 +87,7 @@ class TestWherebyOnly: assert WHEREBY_TASKS <= task_names assert PLATFORM_TASKS <= task_names assert not task_names & DAILY_TASKS + assert not task_names & LIVEKIT_TASKS def test_whereby_task_count(self): schedule = build_beat_schedule(whereby_api_key="test-key") @@ -97,6 +104,7 @@ class TestDailyOnly: assert DAILY_TASKS <= task_names assert PLATFORM_TASKS <= task_names assert not task_names & WHEREBY_TASKS + assert not task_names & LIVEKIT_TASKS def test_daily_task_count(self): schedule = build_beat_schedule(daily_api_key="test-daily-key") @@ -104,6 +112,33 @@ class TestDailyOnly: assert len(schedule) == 6 +class TestLiveKitOnly: + """When only LiveKit is configured.""" + + def test_livekit_keys(self): + schedule = build_beat_schedule( + livekit_api_key="test-lk-key", livekit_url="ws://livekit:7880" + ) + task_names = set(schedule.keys()) + assert LIVEKIT_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + assert not task_names & WHEREBY_TASKS + assert not task_names & DAILY_TASKS + + def test_livekit_task_count(self): + schedule = build_beat_schedule( + livekit_api_key="test-lk-key", livekit_url="ws://livekit:7880" + ) + # LiveKit (2) + Platform (3) = 5 + assert len(schedule) == 5 + + def test_livekit_needs_both_key_and_url(self): + schedule_key_only = build_beat_schedule(livekit_api_key="test-lk-key") + schedule_url_only = build_beat_schedule(livekit_url="ws://livekit:7880") + assert not set(schedule_key_only.keys()) & LIVEKIT_TASKS + assert not set(schedule_url_only.keys()) & LIVEKIT_TASKS + + class TestBothPlatforms: """When both Whereby and Daily.co are configured.""" diff --git a/www/app/(app)/rooms/page.tsx b/www/app/(app)/rooms/page.tsx index f03315e8..fe181b45 100644 --- a/www/app/(app)/rooms/page.tsx +++ b/www/app/(app)/rooms/page.tsx @@ -95,6 +95,7 @@ const roomInitialState = { platform: "whereby", skipConsent: false, emailTranscriptTo: "", + storeVideo: false, }; export default function RoomsList() { @@ -185,6 +186,7 @@ export default function RoomsList() { platform: detailedEditedRoom.platform, skipConsent: detailedEditedRoom.skip_consent || false, emailTranscriptTo: detailedEditedRoom.email_transcript_to || "", + storeVideo: detailedEditedRoom.store_video || false, } : null, [detailedEditedRoom], @@ -335,6 +337,7 @@ export default function RoomsList() { platform, skip_consent: room.skipConsent, email_transcript_to: room.emailTranscriptTo || null, + store_video: room.storeVideo, }; if (isEditing) { @@ -400,6 +403,7 @@ export default function RoomsList() { platform: roomData.platform, skipConsent: roomData.skip_consent || false, emailTranscriptTo: roomData.email_transcript_to || "", + storeVideo: roomData.store_video || false, }); setEditRoomId(roomId); setIsEditing(true); @@ -842,6 +846,38 @@ export default function RoomsList() { )} + {room.platform === "daily" && + room.recordingType === "cloud" && ( + + { + const syntheticEvent = { + target: { + name: "storeVideo", + type: "checkbox", + checked: e.checked, + }, + }; + handleRoomChange(syntheticEvent); + }} + > + + + + + + Store video recording + + + + When enabled, a composed video recording will be + saved alongside audio. Disabling saves significant + storage. + + + )} diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index 94add0e4..5498f0ac 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -267,12 +267,13 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const handleFrameJoinMeeting = useCallback(() => { if (meeting.recording_type === "cloud") { - console.log("Starting dual recording via REST API", { + console.log("Starting recording via REST API", { cloudInstanceId, rawTracksInstanceId, + storeVideo: meeting.store_video, }); - // Start both cloud and raw-tracks via backend REST API (with retry on 404) + // Start recordings via backend REST API (with retry on 404) // Daily.co needs time to register call as "hosting" for REST API const startRecordingWithRetry = ( type: DailyRecordingType, @@ -320,12 +321,17 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { }, RECORDING_START_DELAY_MS); }; - // Start both recordings - startRecordingWithRetry("cloud", cloudInstanceId); + // Always start raw-tracks (needed for transcription pipeline) startRecordingWithRetry("raw-tracks", rawTracksInstanceId); + + // Only start cloud (composed video) if store_video is enabled + if (meeting.store_video) { + startRecordingWithRetry("cloud", cloudInstanceId); + } } }, [ meeting.recording_type, + meeting.store_video, meeting.id, startRecordingMutation, cloudInstanceId, diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index dd41259f..83529f54 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -1134,6 +1134,11 @@ export interface components { skip_consent: boolean; /** Email Transcript To */ email_transcript_to?: string | null; + /** + * Store Video + * @default false + */ + store_video: boolean; }; /** CreateRoomMeeting */ CreateRoomMeeting: { @@ -1852,6 +1857,11 @@ export interface components { daily_composed_video_s3_key?: string | null; /** Daily Composed Video Duration */ daily_composed_video_duration?: number | null; + /** + * Store Video + * @default false + */ + store_video: boolean; }; /** MeetingConsentRequest */ MeetingConsentRequest: { @@ -1955,6 +1965,11 @@ export interface components { skip_consent: boolean; /** Email Transcript To */ email_transcript_to?: string | null; + /** + * Store Video + * @default false + */ + store_video: boolean; }; /** RoomDetails */ RoomDetails: { @@ -2013,6 +2028,11 @@ export interface components { skip_consent: boolean; /** Email Transcript To */ email_transcript_to?: string | null; + /** + * Store Video + * @default false + */ + store_video: boolean; /** Webhook Url */ webhook_url: string | null; /** Webhook Secret */ @@ -2389,6 +2409,8 @@ export interface components { skip_consent?: boolean | null; /** Email Transcript To */ email_transcript_to?: string | null; + /** Store Video */ + store_video?: boolean | null; }; /** UpdateTranscript */ UpdateTranscript: {