From 23d2bc283d4d02187b250d2055103e0374ee93d6 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Wed, 21 Jan 2026 15:10:19 -0500 Subject: [PATCH 1/8] fix: ics non-sync bugfix (#823) * ics non-sync bugfix * fix tests --------- Co-authored-by: Igor Loskutov --- server/reflector/services/ics_sync.py | 60 +++++++++++++++++++-------- server/tests/test_ics_sync.py | 47 ++++++++++++++++++++- 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/server/reflector/services/ics_sync.py b/server/reflector/services/ics_sync.py index 2a4855cb..56f83495 100644 --- a/server/reflector/services/ics_sync.py +++ b/server/reflector/services/ics_sync.py @@ -319,21 +319,6 @@ class ICSSyncService: calendar = self.fetch_service.parse_ics(ics_content) content_hash = hashlib.md5(ics_content.encode()).hexdigest() - if room.ics_last_etag == content_hash: - logger.info("No changes in ICS for room", room_id=room.id) - room_url = f"{settings.UI_BASE_URL}/{room.name}" - events, total_events = self.fetch_service.extract_room_events( - calendar, room.name, room_url - ) - return { - "status": SyncStatus.UNCHANGED, - "hash": content_hash, - "events_found": len(events), - "total_events": total_events, - "events_created": 0, - "events_updated": 0, - "events_deleted": 0, - } # Extract matching events room_url = f"{settings.UI_BASE_URL}/{room.name}" @@ -371,6 +356,44 @@ class ICSSyncService: time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync return time_since_sync.total_seconds() >= room.ics_fetch_interval + def _event_data_changed(self, existing: CalendarEvent, new_data: EventData) -> bool: + """Check if event data has changed by comparing relevant fields. + + IMPORTANT: When adding fields to CalendarEvent/EventData, update this method + and the _COMPARED_FIELDS set below for runtime validation. 
+ """ + # Fields that come from ICS and should trigger updates when changed + _COMPARED_FIELDS = { + "title", + "description", + "start_time", + "end_time", + "location", + "attendees", + "ics_raw_data", + } + + # Runtime exhaustiveness check: ensure we're comparing all EventData fields + event_data_fields = set(EventData.__annotations__.keys()) - {"ics_uid"} + if event_data_fields != _COMPARED_FIELDS: + missing = event_data_fields - _COMPARED_FIELDS + extra = _COMPARED_FIELDS - event_data_fields + raise RuntimeError( + f"_event_data_changed() field mismatch: " + f"missing={missing}, extra={extra}. " + f"Update the comparison logic when adding/removing fields." + ) + + return ( + existing.title != new_data["title"] + or existing.description != new_data["description"] + or existing.start_time != new_data["start_time"] + or existing.end_time != new_data["end_time"] + or existing.location != new_data["location"] + or existing.attendees != new_data["attendees"] + or existing.ics_raw_data != new_data["ics_raw_data"] + ) + async def _sync_events_to_database( self, room_id: str, events: list[EventData] ) -> SyncStats: @@ -386,11 +409,14 @@ class ICSSyncService: ) if existing: - updated += 1 + # Only count as updated if data actually changed + if self._event_data_changed(existing, event_data): + updated += 1 + await calendar_events_controller.upsert(calendar_event) else: created += 1 + await calendar_events_controller.upsert(calendar_event) - await calendar_events_controller.upsert(calendar_event) current_ics_uids.append(event_data["ics_uid"]) # Soft delete events that are no longer in calendar diff --git a/server/tests/test_ics_sync.py b/server/tests/test_ics_sync.py index e448dd7d..cac10848 100644 --- a/server/tests/test_ics_sync.py +++ b/server/tests/test_ics_sync.py @@ -189,14 +189,17 @@ async def test_ics_sync_service_sync_room_calendar(): assert events[0].ics_uid == "sync-event-1" assert events[0].title == "Sync Test Meeting" - # Second sync with same content (should 
be unchanged) + # Second sync with same content (calendar unchanged, but sync always runs) # Refresh room to get updated etag and force sync by setting old sync time room = await rooms_controller.get_by_id(room.id) await rooms_controller.update( room, {"ics_last_sync": datetime.now(timezone.utc) - timedelta(minutes=10)} ) result = await sync_service.sync_room_calendar(room) - assert result["status"] == "unchanged" + assert result["status"] == "success" + assert result["events_created"] == 0 + assert result["events_updated"] == 0 + assert result["events_deleted"] == 0 # Third sync with updated event event["summary"] = "Updated Meeting Title" @@ -288,3 +291,43 @@ async def test_ics_sync_service_error_handling(): result = await sync_service.sync_room_calendar(room) assert result["status"] == "error" assert "Network error" in result["error"] + + +@pytest.mark.asyncio +async def test_event_data_changed_exhaustiveness(): + """Test that _event_data_changed compares all EventData fields (except ics_uid). + + This test ensures programmers don't forget to update the comparison logic + when adding new fields to EventData/CalendarEvent. 
+ """ + from reflector.services.ics_sync import EventData + + sync_service = ICSSyncService() + + from reflector.db.calendar_events import CalendarEvent + + now = datetime.now(timezone.utc) + event_data: EventData = { + "ics_uid": "test-123", + "title": "Test", + "description": "Desc", + "location": "Loc", + "start_time": now, + "end_time": now + timedelta(hours=1), + "attendees": [], + "ics_raw_data": "raw", + } + + existing = CalendarEvent( + room_id="room1", + **event_data, + ) + + # Will raise RuntimeError if fields are missing from comparison + result = sync_service._event_data_changed(existing, event_data) + assert result is False + + modified_data = event_data.copy() + modified_data["title"] = "Changed Title" + result = sync_service._event_data_changed(existing, modified_data) + assert result is True From 4dc49e5b2581f43066f072d6105b5f65124ebb00 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Wed, 21 Jan 2026 14:30:42 -0600 Subject: [PATCH 2/8] chore(main): release 0.28.1 (#827) --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c6de4f8f..d3796e92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21) + + +### Bug Fixes + +* ics non-sync bugfix ([#823](https://github.com/Monadical-SAS/reflector/issues/823)) ([23d2bc2](https://github.com/Monadical-SAS/reflector/commit/23d2bc283d4d02187b250d2055103e0374ee93d6)) + ## [0.28.0](https://github.com/Monadical-SAS/reflector/compare/v0.27.0...v0.28.0) (2026-01-20) From c723752b7e15aa48a41ad22856f147a5517d3f46 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Wed, 21 Jan 2026 17:05:03 -0500 Subject: [PATCH 3/8] feat: set hatchet as default for multitracks (#822) * set hatchet as default for multitracks * fix: pipeline routing tests for hatchet-default branch - Create room with use_celery=True to force Celery backend in tests - Link transcript 
to room to enable multitrack pipeline routing - Fixes test failures caused by missing HATCHET_CLIENT_TOKEN in test env * Update server/reflector/services/transcript_process.py Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> --------- Co-authored-by: Igor Loskutov Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> --- ...269_replace_use_hatchet_with_use_celery.py | 44 ++++ server/reflector/db/rooms.py | 4 +- server/reflector/hatchet/run_workers_cpu.py | 5 - server/reflector/hatchet/run_workers_llm.py | 5 - .../reflector/services/transcript_process.py | 21 +- server/reflector/settings.py | 11 +- server/reflector/worker/process.py | 12 +- server/tests/test_hatchet_dispatch.py | 189 +++++++----------- server/tests/test_transcripts_process.py | 18 +- 9 files changed, 148 insertions(+), 161 deletions(-) create mode 100644 server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py diff --git a/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py b/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py new file mode 100644 index 00000000..778fa001 --- /dev/null +++ b/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py @@ -0,0 +1,44 @@ +"""replace_use_hatchet_with_use_celery + +Revision ID: 80beb1ea3269 +Revises: bd3a729bb379 +Create Date: 2026-01-20 16:26:25.555869 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "80beb1ea3269" +down_revision: Union[str, None] = "bd3a729bb379" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.batch_alter_table("room", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "use_celery", + sa.Boolean(), + server_default=sa.text("false"), + nullable=False, + ) + ) + batch_op.drop_column("use_hatchet") + + +def downgrade() -> None: + with op.batch_alter_table("room", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "use_hatchet", + sa.Boolean(), + server_default=sa.text("false"), + nullable=False, + ) + ) + batch_op.drop_column("use_celery") diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py index 478c096e..8228144c 100644 --- a/server/reflector/db/rooms.py +++ b/server/reflector/db/rooms.py @@ -58,7 +58,7 @@ rooms = sqlalchemy.Table( nullable=False, ), sqlalchemy.Column( - "use_hatchet", + "use_celery", sqlalchemy.Boolean, nullable=False, server_default=false(), @@ -97,7 +97,7 @@ class Room(BaseModel): ics_last_sync: datetime | None = None ics_last_etag: str | None = None platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) - use_hatchet: bool = False + use_celery: bool = False skip_consent: bool = False diff --git a/server/reflector/hatchet/run_workers_cpu.py b/server/reflector/hatchet/run_workers_cpu.py index 3fa1106d..ad946578 100644 --- a/server/reflector/hatchet/run_workers_cpu.py +++ b/server/reflector/hatchet/run_workers_cpu.py @@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import ( daily_multitrack_pipeline, ) from reflector.logger import logger -from reflector.settings import settings def main(): - if not settings.HATCHET_ENABLED: - logger.error("HATCHET_ENABLED is False, not starting CPU workers") - return - hatchet = HatchetClientManager.get_client() logger.info( diff --git 
a/server/reflector/hatchet/run_workers_llm.py b/server/reflector/hatchet/run_workers_llm.py index 00c3a115..3ab0529a 100644 --- a/server/reflector/hatchet/run_workers_llm.py +++ b/server/reflector/hatchet/run_workers_llm.py @@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow from reflector.hatchet.workflows.track_processing import track_workflow from reflector.logger import logger -from reflector.settings import settings SLOTS = 10 WORKER_NAME = "llm-worker-pool" @@ -19,10 +18,6 @@ POOL = "llm-io" def main(): - if not settings.HATCHET_ENABLED: - logger.error("HATCHET_ENABLED is False, not starting LLM workers") - return - hatchet = HatchetClientManager.get_client() logger.info( diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 0e0ebc7c..0341304f 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_multitrack_pipeline import ( task_pipeline_multitrack_process, ) -from reflector.settings import settings from reflector.utils.string import NonEmptyString @@ -102,8 +101,8 @@ async def validate_transcript_for_processing( if transcript.locked: return ValidationLocked(detail="Recording is locked") - # hatchet is idempotent anyways + if it wasn't dispatched successfully - if transcript.status == "idle" and not settings.HATCHET_ENABLED: + # Check if recording is ready for processing + if transcript.status == "idle" and not transcript.workflow_run_id: return ValidationNotReady(detail="Recording is not ready for processing") # Check Celery tasks @@ -116,7 +115,8 @@ async def validate_transcript_for_processing( ): return ValidationAlreadyScheduled(detail="already running") - if settings.HATCHET_ENABLED 
and transcript.workflow_run_id: + # Check Hatchet workflow status if workflow_run_id exists + if transcript.workflow_run_id: try: status = await HatchetClientManager.get_workflow_run_status( transcript.workflow_run_id @@ -181,19 +181,16 @@ async def dispatch_transcript_processing( Returns AsyncResult for Celery tasks, None for Hatchet workflows. """ if isinstance(config, MultitrackProcessingConfig): - # Check if room has use_hatchet=True (overrides env vars) - room_forces_hatchet = False + use_celery = False if config.room_id: room = await rooms_controller.get_by_id(config.room_id) - room_forces_hatchet = room.use_hatchet if room else False + use_celery = room.use_celery if room else False - # Start durable workflow if enabled (Hatchet) - # and if room has use_hatchet=True - use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet + use_hatchet = not use_celery - if room_forces_hatchet: + if use_celery: logger.info( - "Room forces Hatchet workflow", + "Room uses legacy Celery processing", room_id=config.room_id, transcript_id=config.transcript_id, ) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 0057479d..6fc808d2 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -158,19 +158,10 @@ class Settings(BaseSettings): ZULIP_API_KEY: str | None = None ZULIP_BOT_EMAIL: str | None = None - # Durable workflow orchestration - # Provider: "hatchet" (or "none" to disable) - DURABLE_WORKFLOW_PROVIDER: str = "none" - - # Hatchet workflow orchestration + # Hatchet workflow orchestration (always enabled for multitrack processing) HATCHET_CLIENT_TOKEN: str | None = None HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls HATCHET_DEBUG: bool = False - @property - def HATCHET_ENABLED(self) -> bool: - """True if Hatchet is the active provider.""" - return self.DURABLE_WORKFLOW_PROVIDER == "hatchet" - settings = Settings() diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 
466cdef0..a1fa3a3c 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -287,11 +287,12 @@ async def _process_multitrack_recording_inner( room_id=room.id, ) - use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet + use_celery = room and room.use_celery + use_hatchet = not use_celery - if room and room.use_hatchet and not settings.HATCHET_ENABLED: + if use_celery: logger.info( - "Room forces Hatchet workflow", + "Room uses legacy Celery processing", room_id=room.id, transcript_id=transcript.id, ) @@ -810,7 +811,6 @@ async def reprocess_failed_daily_recordings(): ) continue - # Fetch room to check use_hatchet flag room = None if meeting.room_id: room = await rooms_controller.get_by_id(meeting.room_id) @@ -834,10 +834,10 @@ async def reprocess_failed_daily_recordings(): ) continue - use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet + use_celery = room and room.use_celery + use_hatchet = not use_celery if use_hatchet: - # Hatchet requires a transcript for workflow_run_id tracking if not transcript: logger.warning( "No transcript for Hatchet reprocessing, skipping", diff --git a/server/tests/test_hatchet_dispatch.py b/server/tests/test_hatchet_dispatch.py index 57a699ee..157f2b5c 100644 --- a/server/tests/test_hatchet_dispatch.py +++ b/server/tests/test_hatchet_dispatch.py @@ -2,10 +2,9 @@ Tests for Hatchet workflow dispatch and routing logic. These tests verify: -1. Routing to Hatchet when HATCHET_ENABLED=True -2. Replay logic for failed workflows -3. Force flag to cancel and restart -4. Validation prevents concurrent workflows +1. Hatchet workflow validation and replay logic +2. Force flag to cancel and restart workflows +3. 
Validation prevents concurrent workflows """ from unittest.mock import AsyncMock, patch @@ -34,25 +33,22 @@ async def test_hatchet_validation_blocks_running_workflow(): workflow_run_id="running-workflow-123", ) - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True + with patch( + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + mock_hatchet.get_workflow_run_status = AsyncMock( + return_value=V1TaskStatus.RUNNING + ) with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - mock_hatchet.get_workflow_run_status = AsyncMock( - return_value=V1TaskStatus.RUNNING - ) + "reflector.services.transcript_process.task_is_scheduled_or_active" + ) as mock_celery_check: + mock_celery_check.return_value = False - with patch( - "reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False + result = await validate_transcript_for_processing(mock_transcript) - result = await validate_transcript_for_processing(mock_transcript) - - assert isinstance(result, ValidationAlreadyScheduled) - assert "running" in result.detail.lower() + assert isinstance(result, ValidationAlreadyScheduled) + assert "running" in result.detail.lower() @pytest.mark.usefixtures("setup_database") @@ -72,24 +68,21 @@ async def test_hatchet_validation_blocks_queued_workflow(): workflow_run_id="queued-workflow-123", ) - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True + with patch( + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + mock_hatchet.get_workflow_run_status = AsyncMock( + return_value=V1TaskStatus.QUEUED + ) with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - mock_hatchet.get_workflow_run_status = AsyncMock( - return_value=V1TaskStatus.QUEUED - ) + 
"reflector.services.transcript_process.task_is_scheduled_or_active" + ) as mock_celery_check: + mock_celery_check.return_value = False - with patch( - "reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False + result = await validate_transcript_for_processing(mock_transcript) - result = await validate_transcript_for_processing(mock_transcript) - - assert isinstance(result, ValidationAlreadyScheduled) + assert isinstance(result, ValidationAlreadyScheduled) @pytest.mark.usefixtures("setup_database") @@ -110,25 +103,22 @@ async def test_hatchet_validation_allows_failed_workflow(): recording_id="test-recording-id", ) - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True + with patch( + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + mock_hatchet.get_workflow_run_status = AsyncMock( + return_value=V1TaskStatus.FAILED + ) with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - mock_hatchet.get_workflow_run_status = AsyncMock( - return_value=V1TaskStatus.FAILED - ) + "reflector.services.transcript_process.task_is_scheduled_or_active" + ) as mock_celery_check: + mock_celery_check.return_value = False - with patch( - "reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False + result = await validate_transcript_for_processing(mock_transcript) - result = await validate_transcript_for_processing(mock_transcript) - - assert isinstance(result, ValidationOk) - assert result.transcript_id == "test-transcript-id" + assert isinstance(result, ValidationOk) + assert result.transcript_id == "test-transcript-id" @pytest.mark.usefixtures("setup_database") @@ -149,24 +139,21 @@ async def test_hatchet_validation_allows_completed_workflow(): recording_id="test-recording-id", ) - with 
patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True + with patch( + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + mock_hatchet.get_workflow_run_status = AsyncMock( + return_value=V1TaskStatus.COMPLETED + ) with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - mock_hatchet.get_workflow_run_status = AsyncMock( - return_value=V1TaskStatus.COMPLETED - ) + "reflector.services.transcript_process.task_is_scheduled_or_active" + ) as mock_celery_check: + mock_celery_check.return_value = False - with patch( - "reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False + result = await validate_transcript_for_processing(mock_transcript) - result = await validate_transcript_for_processing(mock_transcript) - - assert isinstance(result, ValidationOk) + assert isinstance(result, ValidationOk) @pytest.mark.usefixtures("setup_database") @@ -187,26 +174,23 @@ async def test_hatchet_validation_allows_when_status_check_fails(): recording_id="test-recording-id", ) - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True + with patch( + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + # Status check fails (workflow might be deleted) + mock_hatchet.get_workflow_run_status = AsyncMock( + side_effect=ApiException("Workflow not found") + ) with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - # Status check fails (workflow might be deleted) - mock_hatchet.get_workflow_run_status = AsyncMock( - side_effect=ApiException("Workflow not found") - ) + "reflector.services.transcript_process.task_is_scheduled_or_active" + ) as mock_celery_check: + mock_celery_check.return_value = False - with patch( - 
"reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False + result = await validate_transcript_for_processing(mock_transcript) - result = await validate_transcript_for_processing(mock_transcript) - - # Should allow processing when we can't get status - assert isinstance(result, ValidationOk) + # Should allow processing when we can't get status + assert isinstance(result, ValidationOk) @pytest.mark.usefixtures("setup_database") @@ -227,47 +211,11 @@ async def test_hatchet_validation_skipped_when_no_workflow_id(): recording_id="test-recording-id", ) - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = True - - with patch( - "reflector.services.transcript_process.HatchetClientManager" - ) as mock_hatchet: - # Should not be called - mock_hatchet.get_workflow_run_status = AsyncMock() - - with patch( - "reflector.services.transcript_process.task_is_scheduled_or_active" - ) as mock_celery_check: - mock_celery_check.return_value = False - - result = await validate_transcript_for_processing(mock_transcript) - - # Should not check Hatchet status - mock_hatchet.get_workflow_run_status.assert_not_called() - assert isinstance(result, ValidationOk) - - -@pytest.mark.usefixtures("setup_database") -@pytest.mark.asyncio -async def test_hatchet_validation_skipped_when_disabled(): - """Test that Hatchet validation is skipped when HATCHET_ENABLED is False.""" - from reflector.services.transcript_process import ( - ValidationOk, - validate_transcript_for_processing, - ) - - mock_transcript = Transcript( - id="test-transcript-id", - name="Test", - status="uploaded", - source_kind="room", - workflow_run_id="some-workflow-123", - recording_id="test-recording-id", - ) - - with patch("reflector.services.transcript_process.settings") as mock_settings: - mock_settings.HATCHET_ENABLED = False # Hatchet disabled + with patch( + 
"reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet: + # Should not be called + mock_hatchet.get_workflow_run_status = AsyncMock() with patch( "reflector.services.transcript_process.task_is_scheduled_or_active" @@ -276,7 +224,8 @@ async def test_hatchet_validation_skipped_when_disabled(): result = await validate_transcript_for_processing(mock_transcript) - # Should not check Hatchet at all + # Should not check Hatchet status + mock_hatchet.get_workflow_run_status.assert_not_called() assert isinstance(result, ValidationOk) diff --git a/server/tests/test_transcripts_process.py b/server/tests/test_transcripts_process.py index e3d749df..3e2aed2b 100644 --- a/server/tests/test_transcripts_process.py +++ b/server/tests/test_transcripts_process.py @@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client): from datetime import datetime, timezone from reflector.db.recordings import Recording, recordings_controller + from reflector.db.rooms import rooms_controller from reflector.db.transcripts import transcripts_controller - # Create transcript with Daily.co multitrack recording + room = await rooms_controller.add( + name="test-room", + user_id="test-user", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic-2nd-participant", + is_shared=False, + ) + # Force Celery backend for test + await rooms_controller.update(room, {"use_celery": True}) + transcript = await transcripts_controller.add( "", source_kind="room", @@ -172,6 +187,7 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client): target_language="en", user_id="test-user", share_mode="public", + room_id=room.id, ) track_keys = [ From 8fc8d8bf4a0f89e5c1ba9855df741ebedd60ef72 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Thu, 22 Jan 2026 11:52:39 -0600 Subject: [PATCH 4/8] chore(main): release 0.29.0 (#828) --- CHANGELOG.md | 7 +++++++ 1 
file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3796e92..544aaff9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21) + + +### Features + +* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46)) + ## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21) From 6e786b7631f81b5a7e558991d557d837de776f6e Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Thu, 22 Jan 2026 19:03:33 -0500 Subject: [PATCH 5/8] hatchet processing resilence several fixes (#831) Co-authored-by: Igor Loskutov --- .../workflows/daily_multitrack_pipeline.py | 42 ++++++++++++----- .../reflector/services/transcript_process.py | 45 ++++++++++++------- 2 files changed, 61 insertions(+), 26 deletions(-) diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 0726cfd6..2d1ab194 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -1149,8 +1149,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: transcript, "TRANSCRIPT", TranscriptText( - text=merged_transcript.text, - translation=merged_transcript.translation, + text="", + translation=None, ), logger=logger, ) @@ -1347,14 +1347,34 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult: f"participants={len(payload.transcript.participants)})" ) - response = await send_webhook_request( - url=room.webhook_url, - payload=payload, - event_type="transcript.completed", - webhook_secret=room.webhook_secret, - timeout=30.0, - ) + try: + response = await send_webhook_request( + 
url=room.webhook_url, + payload=payload, + event_type="transcript.completed", + webhook_secret=room.webhook_secret, + timeout=30.0, + ) - ctx.log(f"send_webhook complete: status_code={response.status_code}") + ctx.log(f"send_webhook complete: status_code={response.status_code}") + return WebhookResult(webhook_sent=True, response_code=response.status_code) - return WebhookResult(webhook_sent=True, response_code=response.status_code) + except httpx.HTTPStatusError as e: + ctx.log( + f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway" + ) + return WebhookResult( + webhook_sent=False, response_code=e.response.status_code + ) + + except httpx.ConnectError as e: + ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}") + return WebhookResult(webhook_sent=False) + + except httpx.TimeoutException as e: + ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}") + return WebhookResult(webhook_sent=False) + + except Exception as e: + ctx.log(f"send_webhook unexpected error, continuing anyway: {e}") + return WebhookResult(webhook_sent=False) diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 0341304f..6a7fdbea 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never import celery from celery.result import AsyncResult -from hatchet_sdk.clients.rest.exceptions import ApiException +from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException from hatchet_sdk.clients.rest.models import V1TaskStatus from reflector.db.recordings import recordings_controller @@ -212,24 +212,39 @@ async def dispatch_transcript_processing( ) return None else: - # Workflow exists but can't replay (CANCELLED, COMPLETED, etc.) 
+ # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted) # Log and proceed to start new workflow - status = await HatchetClientManager.get_workflow_run_status( - transcript.workflow_run_id - ) - logger.info( - "Old workflow not replayable, starting new", - old_workflow_id=transcript.workflow_run_id, - old_status=status.value, - ) + try: + status = await HatchetClientManager.get_workflow_run_status( + transcript.workflow_run_id + ) + logger.info( + "Old workflow not replayable, starting new", + old_workflow_id=transcript.workflow_run_id, + old_status=status.value, + ) + except NotFoundException: + # Workflow deleted from Hatchet but ID still in DB + logger.info( + "Old workflow not found in Hatchet, starting new", + old_workflow_id=transcript.workflow_run_id, + ) # Force: cancel old workflow if exists if force and transcript and transcript.workflow_run_id: - await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) - logger.info( - "Cancelled old workflow (--force)", - workflow_id=transcript.workflow_run_id, - ) + try: + await HatchetClientManager.cancel_workflow( + transcript.workflow_run_id + ) + logger.info( + "Cancelled old workflow (--force)", + workflow_id=transcript.workflow_run_id, + ) + except NotFoundException: + logger.info( + "Old workflow already deleted (--force)", + workflow_id=transcript.workflow_run_id, + ) await transcripts_controller.update( transcript, {"workflow_run_id": None} ) From 6c175a11d8a3745095bfad06a4ad3ccdfd278433 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Fri, 23 Jan 2026 12:33:06 -0500 Subject: [PATCH 6/8] feat: brady bunch (#816) * brady bunch PRD/tasks * clean dead daily.co code * brady bunch prototype (no-mistakes) * brady bunch prototype (no-mistakes) review * self-review * daily poll time match (no-mistakes) * daily poll self-review (no-mistakes) * daily poll self-review (no-mistakes) * daily co doc * cleanup * cleanup * self-review (no-mistakes) * self-review (no-mistakes) * self-review * 
self-review * ui typefix * dupe calls error handling proper * daily reflector data model doc * logging style fix * migration merge --------- Co-authored-by: Igor Loskutov --- .gitleaksignore | 1 + server/docs/DAILY_REFLECTOR_DATA_MODEL.md | 496 ++++++++++++++++++ ...b1e6a6fc465_add_cloud_recording_support.py | 40 ++ ..._merge_cloud_recording_and_celery_heads.py | 23 + server/reflector/dailyco_api/__init__.py | 3 +- server/reflector/dailyco_api/client.py | 37 +- server/reflector/dailyco_api/instance_id.py | 37 ++ server/reflector/dailyco_api/requests.py | 7 - server/reflector/dailyco_api/responses.py | 7 + server/reflector/db/meetings.py | 132 ++++- server/reflector/db/recordings.py | 14 + server/reflector/video_platforms/daily.py | 36 +- server/reflector/views/daily.py | 71 ++- server/reflector/views/meetings.py | 78 ++- server/reflector/views/rooms.py | 3 +- server/reflector/worker/app.py | 7 +- server/reflector/worker/process.py | 321 ++++++++++-- server/tests/test_dailyco_instance_id.py | 147 ++++++ .../tests/test_time_based_meeting_matching.py | 374 +++++++++++++ .../[transcriptId]/finalSummary.tsx | 5 +- www/app/(app)/transcripts/createTranscript.ts | 5 +- www/app/(app)/transcripts/shareAndPrivacy.tsx | 5 +- www/app/(app)/transcripts/shareZulip.tsx | 5 +- www/app/(app)/transcripts/transcriptTitle.tsx | 5 +- www/app/[roomName]/components/DailyRoom.tsx | 113 +++- www/app/lib/apiHooks.ts | 14 + www/app/lib/transcript.ts | 3 +- www/app/lib/types.ts | 2 + www/app/reflector-api.d.ts | 79 +++ www/package.json | 1 + www/pnpm-lock.yaml | 25 + 31 files changed, 1973 insertions(+), 123 deletions(-) create mode 100644 server/docs/DAILY_REFLECTOR_DATA_MODEL.md create mode 100644 server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py create mode 100644 server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py create mode 100644 server/reflector/dailyco_api/instance_id.py create mode 100644 server/tests/test_dailyco_instance_id.py 
create mode 100644 server/tests/test_time_based_meeting_matching.py diff --git a/.gitleaksignore b/.gitleaksignore index 8eb80bd5..141c82d5 100644 --- a/.gitleaksignore +++ b/.gitleaksignore @@ -3,3 +3,4 @@ docs/docs/installation/auth-setup.md:curl-auth-header:250 docs/docs/installation/daily-setup.md:curl-auth-header:277 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83 +server/reflector/worker/process.py:generic-api-key:465 diff --git a/server/docs/DAILY_REFLECTOR_DATA_MODEL.md b/server/docs/DAILY_REFLECTOR_DATA_MODEL.md new file mode 100644 index 00000000..c25a3fd6 --- /dev/null +++ b/server/docs/DAILY_REFLECTOR_DATA_MODEL.md @@ -0,0 +1,496 @@ +# Daily.co and Reflector Data Model + +This document explains the data model relationships between Daily.co's API concepts and Reflector's database schema, clarifying common sources of confusion. + +--- + +## Table of Contents + +1. [Core Entities Overview](#core-entities-overview) +2. [Daily.co vs Reflector Terminology](#dailyco-vs-reflector-terminology) +3. [Entity Relationships](#entity-relationships) +4. [Recording Multiplicity](#recording-multiplicity) +5. [Session Identifiers Explained](#session-identifiers-explained) +6. [Time-Based Matching](#time-based-matching) +7. [Multitrack Recording Details](#multitrack-recording-details) +8. 
[Verified Example](#verified-example) + +--- + +## Core Entities Overview + +### Reflector's Four Primary Entities + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Room (Reflector) │ +│ - Persistent meeting template │ +│ - User-created configuration │ +│ - Example: "team-standup" │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:N + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Meeting (Reflector) │ +│ - Single session instance │ +│ - Creates NEW Daily.co room with timestamp │ +│ - Example: "team-standup-20260115120000" │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:N + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Recording (Reflector + Daily.co) │ +│ - One segment of audio/video │ +│ - New recording created on stop/restart │ +│ - track_keys: JSON array of S3 file paths │ +└────────────────────┬────────────────────────────────────────────┘ + │ 1:1 + ▼ +┌─────────────────────────────────────────────────────────────────┐ +│ Transcript (Reflector) │ +│ - Processed audio with transcription │ +│ - Diarization, summaries, topics │ +│ - One transcript per recording │ +└─────────────────────────────────────────────────────────────────┘ +``` + +--- + +## Daily.co vs Reflector Terminology + +### Room + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Virtual meeting space on Daily.co platform | User-created meeting template/configuration | +| **Lifetime** | Configurable expiration | Persistent until user deletes | +| **Creation** | API call for each meeting | Pre-created by user once | +| **Reuse** | Can host multiple sessions | Generates new Daily.co room per meeting | +| **Name Format** | `room-name` (reusable) | `room-name` (base identifier) | +| **Timestamping** | Not required | Meeting adds timestamp: `{name}-YYYYMMDDHHMMSS` | + +**Example:** +``` +Reflector Room: "daily-private-igor" 
(persistent config) + ↓ starts meeting +Daily.co Room: "daily-private-igor-20260110042117" +``` + +### Meeting + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Session that starts when first participant joins | Explicit database record of a session | +| **Identifier** | `mtgSessionId` (generated by Daily.co) | `meeting.id` (UUID, generated by Reflector) | +| **Creation** | Implicit (first participant join) | Explicit API call before participants join | +| **Purpose** | Tracks active session state | Links recordings, transcripts, participants | +| **Scope** | Per room instance | Per Reflector room + timestamp | + +**Critical Limitation:** Daily.co's recordings API often does NOT return `mtgSessionId`, requiring time-based matching (see [Time-Based Matching](#time-based-matching)). + +### Recording + +| Aspect | Daily.co | Reflector | +|--------|----------|-----------| +| **Definition** | Audio/video files on S3 | Metadata + processing status | +| **Types** | `cloud` (composed video), `raw-tracks` (multitrack) | Stores references + `track_keys` array | +| **Multiplicity** | One recording object per start/stop cycle | One DB row per Daily.co recording object | +| **Identifier** | Daily.co `recording_id` | Same `recording_id` (stored in DB) | +| **Multitrack** | Array of `.webm` files (one per participant) | `track_keys` JSON array with S3 paths | +| **Linkage** | Via `room_name` + `start_ts` | FK `meeting_id` (set via time-based match) | + +**Critical Behavior:** Recording **stops/restarts** create **separate recording objects** with unique IDs. 
+ +--- + +## Entity Relationships + +### Database Schema Relationships + +```sql +-- Simplified schema showing key relationships + +TABLE room ( + id VARCHAR PRIMARY KEY, + name VARCHAR UNIQUE, + platform VARCHAR -- 'whereby' | 'daily' +) + +TABLE meeting ( + id VARCHAR PRIMARY KEY, + room_id VARCHAR REFERENCES room(id) ON DELETE CASCADE, -- nullable + room_name VARCHAR, -- Daily.co room name (timestamped) + start_date TIMESTAMP, + platform VARCHAR +) + +TABLE recording ( + id VARCHAR PRIMARY KEY, -- Daily.co recording_id + meeting_id VARCHAR, -- FK to meeting (set via time-based match) + bucket_name VARCHAR, + object_key VARCHAR, -- S3 prefix + track_keys JSON, -- Array of S3 keys for multitrack + recorded_at TIMESTAMP +) + +TABLE transcript ( + id VARCHAR PRIMARY KEY, + recording_id VARCHAR, -- nullable FK + meeting_id VARCHAR, -- nullable FK + room_id VARCHAR, -- nullable FK + participants JSON, -- [{id, speaker, name, user_id}, ...] + title VARCHAR, + long_summary VARCHAR, + webvtt TEXT +) +``` + +**Relationship Cardinalities:** +``` +1 Room → N Meetings +1 Meeting → N Recordings (common: 1-21 recordings per meeting) +1 Recording → 1 Transcript +1 Meeting → N Transcripts (via recordings) +``` + +--- + +## Recording Multiplicity + +### Why Multiple Recordings Per Meeting? + +Daily.co creates a **new recording object** (new ID, new files) whenever recording stops and restarts. This happens due to: + +1. **Manual stop/start** - User clicks stop, then start recording again +2. **Network reconnection** - Participant drops, reconnects → triggers restart +3. **Participant rejoin** - Last participant leaves, new one joins → new session + +--- + +## Session Identifiers Explained + +### The Hidden Entity: Daily.co Meeting Session + +Daily.co has an **implicit ephemeral entity** that sits between Room and Recording: + +``` +Daily.co Room: "daily-private-igor-20260110042117" + │ + ├─ Daily.co Meeting Session #1 (mtgSessionId: c04334de...) 
+ │ └─ Recording #3 (f4a50f94) - 4s, 1 track + │ + └─ Daily.co Meeting Session #2 (mtgSessionId: 4cdae3c0...) + ├─ Recording #2 (b0fa94da) - 80s, 2 tracks ← recording stopped + └─ Recording #1 (05edf519) - 62s, 1 track ← then restarted +``` + +**Daily.co Meeting Session:** +- **Lifecycle:** Starts when first participant joins, ends when last participant leaves +- **Identifier:** `mtgSessionId` (generated by Daily.co) +- **Persistence:** Ephemeral - new ID if everyone leaves and someone rejoins +- **Relationship:** 1 Session → N Recordings (if recording stops/restarts during session) + +**Key Insight:** Multiple recordings can share the same `mtgSessionId` if recording was stopped and restarted while participants remained connected. + +### mtgSessionId (Meeting Session Identifier) + +`mtgSessionId` identifies a **Daily.co meeting session** (not individual participants, not a room). + +### session_id (Per-Participant) + +**Different concept:** Per-participant connection identifier from webhooks. + +**Reflector Tracking:** `daily_participant_session` table +```sql +TABLE daily_participant_session ( + id VARCHAR PRIMARY KEY, -- {meeting_id}:{user_id}:{joined_at_ms} + meeting_id VARCHAR, + session_id VARCHAR, -- From webhook (per-participant) + user_id VARCHAR, + user_name VARCHAR, + joined_at TIMESTAMP, + left_at TIMESTAMP +) +``` +--- + +## Time-Based Matching + +### Problem Statement + +Daily.co's recordings API does not reliably return `mtgSessionId`, making it impossible to directly link recordings to meetings via Daily.co's identifiers. + +**Example API response:** +```json +{ + "id": "recording-uuid", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018896, + "mtgSessionId": null ← Missing! 
+} +``` + +### Solution: Time-Based Matching + +**Implementation:** `reflector/db/meetings.py:get_by_room_name_and_time()` + + +--- + +## Multitrack Recording Details + +### track_keys JSON Array + +**Schema:** `recording.track_keys` (JSON, nullable) +```sql +-- Example recording with 2 audio tracks +{ + "id": "b0fa94da-73b5-4f95-9239-5216a682a505", + "track_keys": [ + "igormonadical/daily-private-igor-20260110042117/1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565", + "igormonadical/daily-private-igor-20260110042117/1768018896877-9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-1768018899286" + ] +} +``` + +**Semantics:** +- `track_keys = null` → Not multitrack (cloud recording) +- `track_keys = []` → Multitrack recording with no audio captured (silence/muted) +- `track_keys = [...]` → Multitrack with N audio tracks + +**Property:** `recording.is_multitrack` (Python) +```python +@property +def is_multitrack(self) -> bool: + return self.track_keys is not None and len(self.track_keys) > 0 +``` + +### Track Filename Format + +Daily.co multitrack filenames encode timing and participant information: + +**Format:** `{recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}` + +**Example:** `1768018896877-890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-1768018914565` + +**Parsed Components:** +```python +# reflector/utils/daily.py:25-60 +class DailyRecordingFilename(NamedTuple): + recording_start_ts: int # 1768018896877 (milliseconds) + participant_id: str # 890c0eae-e186-4534-a7bd-7c794b7d6d7f + track_start_ts: int # 1768018914565 (milliseconds) +``` + +**Note:** Browser downloads from S3 add `.webm` extension due to MIME headers, but S3 object keys have no extension. + +### Video Track Filtering + +Daily.co API returns both audio and video tracks, but Reflector only processes audio. 
+ +**Filtering Logic:** `reflector/worker/process.py:660` +```python +track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] +``` + +**Example API Response:** +```json +{ + "tracks": [ + {"type": "audio", "s3Key": "...cam-audio-1768018914565"}, + {"type": "audio", "s3Key": "...cam-audio-1768018899286"}, + {"type": "video", "s3Key": "...cam-video-1768018897095"} ← Filtered out + ] +} +``` + +**Result:** Only 2 audio tracks stored in `recording.track_keys`, video track discarded. + +**Rationale:** Reflector is an audio transcription system; video not needed for processing. + +### Track-to-Participant Mapping + +**Flow:** +1. Daily.co webhook/polling provides `track_keys` array +2. Each track filename contains `participant_id` +3. Reflector queries Daily.co API: `GET /meetings/{mtgSessionId}/participants` +4. Maps `participant_id` → `user_name` +5. Stores in `transcript.participants` JSON: +```json +[ + { + "id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", + "speaker": 0, + "name": "test2", + "user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22" + }, + { + "id": "9660e8e9-4297-4f17-951d-0b2bf2401803", + "speaker": 1, + "name": "test", + "user_id": "907f2cc1-eaab-435f-8ee2-09185f416b22" + } +] +``` + +**Diarization:** Multitrack recordings don't need speaker diarization AI — speaker identity comes from separate audio tracks. + +--- + +## Verified Example + +### Meeting: daily-private-igor-20260110042117 + +**Context:** User conducted test recording with start/stop cycles, producing 3 recordings.
+ +#### Database State + +```sql +-- Meeting +id: 034804b8-cee2-4fb4-94d7-122f6f068a61 +room_name: daily-private-igor-20260110042117 +start_date: 2026-01-10 04:21:17+00 +``` + +#### Daily.co API Response + +```json +[ + { + "id": "f4a50f94-053c-4f9d-bda6-78ad051fbc36", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018885, + "duration": 4, + "status": "finished", + "mtgSessionId": "c04334de-42a0-4c2a-96be-a49b068dca85", + "tracks": [ + {"type": "audio", "s3Key": "...62e8f3ae...cam-audio-1768018885417"} + ] + }, + { + "id": "b0fa94da-73b5-4f95-9239-5216a682a505", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018896, + "duration": 80, + "status": "finished", + "mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345", + "tracks": [ + {"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914565"}, + {"type": "audio", "s3Key": "...9660e8e9...cam-audio-1768018899286"}, + {"type": "video", "s3Key": "...9660e8e9...cam-video-1768018897095"} + ] + }, + { + "id": "05edf519-9048-4b49-9a75-73e9826fd950", + "room_name": "daily-private-igor-20260110042117", + "start_ts": 1768018914, + "duration": 62, + "status": "finished", + "mtgSessionId": "4cdae3c0-86cb-4578-8a6d-3a228bb48345", + "tracks": [ + {"type": "audio", "s3Key": "...890c0eae...cam-audio-1768018914948"} + ] + } +] +``` + +**Key Observations:** +- 3 recording objects returned by Daily.co +- 2 different `mtgSessionId` values (2 different meeting instances) +- Recording #2 has 3 tracks (2 audio + 1 video) +- Timestamps: 1768018885 → 1768018896 (+11s) → 1768018914 (+18s) + +#### Reflector Database + +**Recordings:** +``` +┌──────────────────────────────────────┬──────────────┬────────────┬──────────────────────────────────────┐ +│ id │ track_count │ duration │ mtgSessionId │ +├──────────────────────────────────────┼──────────────┼────────────┼──────────────────────────────────────┤ +│ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 │ 4s │ c04334de-42a0-4c2a-96be-a49b068dca85 │ +│ 
b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (video=0) │ 80s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │ +│ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 │ 62s │ 4cdae3c0-86cb-4578-8a6d-3a228bb48345 │ +└──────────────────────────────────────┴──────────────┴────────────┴──────────────────────────────────────┘ +``` +**Note:** Recording #2 has 2 audio tracks (video filtered out), not 3. + +**Transcripts:** +``` +┌──────────────────────────────────────┬──────────────────────────────────────┬──────────────┬──────────────────────────────────────────────┐ +│ id │ recording_id │ participants │ title │ +├──────────────────────────────────────┼──────────────────────────────────────┼──────────────┼──────────────────────────────────────────────┤ +│ 17149b1f-546c-4837-80a0-f8140bd16592 │ f4a50f94-053c-4f9d-bda6-78ad051fbc36 │ 1 (test) │ (empty - no speech) │ +│ 49801332-3222-4c11-bdb2-375479fc87f2 │ b0fa94da-73b5-4f95-9239-5216a682a505 │ 2 (test, │ "Examination and Validation Procedures │ +│ │ │ test2) │ Review" │ +│ e5271e12-20fb-42d2-b5a8-21438abadef9 │ 05edf519-9048-4b49-9a75-73e9826fd950 │ 1 (test2) │ "Technical Sound Check Procedure Review" │ +└──────────────────────────────────────┴──────────────────────────────────────┴──────────────┴──────────────────────────────────────────────┘ +``` + +**Transcript Content:** + +*Transcript #1* (17149b1f): Empty WebVTT (no audio captured) + +*Transcript #2* (49801332): +```webvtt +WEBVTT + +00:00:03.109 --> 00:00:05.589 +Test, test, test. Test, test, test, test, test. + +00:00:19.829 --> 00:00:22.710 +Test test test test test test test test test test test. +``` +**AI-Generated Summary:** +> "The meeting focused on the critical importance of rigorous testing for ensuring reliability and quality, with test and test2 emphasizing the need for a structured testing framework and meticulous documentation..." 
+ +*Transcript #3* (e5271e12): +```webvtt +WEBVTT + +00:00:02.029 --> 00:00:04.910 +Test, test, test, test, test, test, test, test, test, test, test. +``` + +#### Validation: track_keys → participants + +**Recording #2 (b0fa94da) tracks:** +```json +[ + ".../890c0eae-e186-4534-a7bd-7c794b7d6d7f-cam-audio-...", + ".../9660e8e9-4297-4f17-951d-0b2bf2401803-cam-audio-..." +] +``` + +**Transcript #2 (49801332) participants:** +```json +[ + {"id": "890c0eae-e186-4534-a7bd-7c794b7d6d7f", "speaker": 0, "name": "test2"}, + {"id": "9660e8e9-4297-4f17-951d-0b2bf2401803", "speaker": 1, "name": "test"} +] +``` + +### Data Flow + +``` +Daily.co API: 3 recordings + ↓ +Polling: _poll_raw_tracks_recordings() + ↓ +Worker: process_multitrack_recording.delay() × 3 + ↓ +DB: 3 recording rows created + ↓ +Pipeline: Audio processing + transcription × 3 + ↓ +DB: 3 transcript rows created (1:1 with recordings) + ↓ +UI: User sees 3 separate transcripts +``` + +**Result:** ✅ 1:1 Recording → Transcript relationship maintained. + + +--- +**Document Version:** 1.0 +**Last Verified:** 2026-01-15 +**Data Source:** Production database + Daily.co API inspection diff --git a/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py b/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py new file mode 100644 index 00000000..6df05b8a --- /dev/null +++ b/server/migrations/versions/1b1e6a6fc465_add_cloud_recording_support.py @@ -0,0 +1,40 @@ +"""add cloud recording support + +Revision ID: 1b1e6a6fc465 +Revises: bd3a729bb379 +Create Date: 2026-01-09 17:17:33.535620 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "1b1e6a6fc465" +down_revision: Union[str, None] = "bd3a729bb379" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.add_column( + sa.Column("daily_composed_video_s3_key", sa.String(), nullable=True) + ) + batch_op.add_column( + sa.Column("daily_composed_video_duration", sa.Integer(), nullable=True) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.drop_column("daily_composed_video_duration") + batch_op.drop_column("daily_composed_video_s3_key") + + # ### end Alembic commands ### diff --git a/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py b/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py new file mode 100644 index 00000000..bdf8691d --- /dev/null +++ b/server/migrations/versions/e69f08ead8ea_merge_cloud_recording_and_celery_heads.py @@ -0,0 +1,23 @@ +"""merge cloud recording and celery heads + +Revision ID: e69f08ead8ea +Revises: 1b1e6a6fc465, 80beb1ea3269 +Create Date: 2026-01-21 21:39:10.326841 + +""" + +from typing import Sequence, Union + +# revision identifiers, used by Alembic. 
+revision: str = "e69f08ead8ea" +down_revision: Union[str, None] = ("1b1e6a6fc465", "80beb1ea3269") +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + pass + + +def downgrade() -> None: + pass diff --git a/server/reflector/dailyco_api/__init__.py b/server/reflector/dailyco_api/__init__.py index 65be426e..69e94a08 100644 --- a/server/reflector/dailyco_api/__init__.py +++ b/server/reflector/dailyco_api/__init__.py @@ -3,7 +3,7 @@ Daily.co API Module """ # Client -from .client import DailyApiClient, DailyApiError +from .client import DailyApiClient, DailyApiError, RecordingType # Request models from .requests import ( @@ -64,6 +64,7 @@ __all__ = [ # Client "DailyApiClient", "DailyApiError", + "RecordingType", # Requests "CreateRoomRequest", "RoomProperties", diff --git a/server/reflector/dailyco_api/client.py b/server/reflector/dailyco_api/client.py index 43b76d88..8634039f 100644 --- a/server/reflector/dailyco_api/client.py +++ b/server/reflector/dailyco_api/client.py @@ -7,7 +7,8 @@ Reference: https://docs.daily.co/reference/rest-api """ from http import HTTPStatus -from typing import Any +from typing import Any, Literal +from uuid import UUID import httpx import structlog @@ -32,6 +33,8 @@ from .responses import ( logger = structlog.get_logger(__name__) +RecordingType = Literal["cloud", "raw-tracks"] + class DailyApiError(Exception): """Daily.co API error with full request/response context.""" @@ -395,6 +398,38 @@ class DailyApiClient: return [RecordingResponse(**r) for r in data["data"]] + async def start_recording( + self, + room_name: NonEmptyString, + recording_type: RecordingType, + instance_id: UUID, + ) -> dict[str, Any]: + """Start recording via REST API. 
+ + Reference: https://docs.daily.co/reference/rest-api/rooms/recordings/start + + Args: + room_name: Daily.co room name + recording_type: Recording type + instance_id: UUID for this recording session + + Returns: + Recording start confirmation from Daily.co API + + Raises: + DailyApiError: If API request fails + """ + client = await self._get_client() + response = await client.post( + f"{self.base_url}/rooms/{room_name}/recordings/start", + headers=self.headers, + json={ + "type": recording_type, + "instanceId": str(instance_id), + }, + ) + return await self._handle_response(response, "start_recording") + # ============================================================================ # MEETING TOKENS # ============================================================================ diff --git a/server/reflector/dailyco_api/instance_id.py b/server/reflector/dailyco_api/instance_id.py new file mode 100644 index 00000000..7743229f --- /dev/null +++ b/server/reflector/dailyco_api/instance_id.py @@ -0,0 +1,37 @@ +""" +Daily.co recording instanceId generation utilities. + +Deterministic instance ID generation for cloud and raw-tracks recordings. +MUST match frontend logic +""" + +from uuid import UUID, uuid5 + +from reflector.utils.string import NonEmptyString + +# Namespace UUID for UUIDv5 generation of raw-tracks instanceIds +# DO NOT CHANGE: Breaks instanceId determinism across deployments and frontend/backend matching +RAW_TRACKS_NAMESPACE = UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890") + + +def generate_cloud_instance_id(meeting_id: NonEmptyString) -> UUID: + """ + Generate instanceId for cloud recording. + + Cloud recordings use meeting ID directly as instanceId. + This ensures each meeting has one unique cloud recording. + """ + return UUID(meeting_id) + + +def generate_raw_tracks_instance_id(meeting_id: NonEmptyString) -> UUID: + """ + Generate instanceId for raw-tracks recording. 
+ + Raw-tracks recordings use UUIDv5(meeting_id, namespace) to ensure + different instanceId from cloud while remaining deterministic. + + Daily.co requires cloud and raw-tracks to have different instanceIds + for concurrent recording. + """ + return uuid5(RAW_TRACKS_NAMESPACE, meeting_id) diff --git a/server/reflector/dailyco_api/requests.py b/server/reflector/dailyco_api/requests.py index 0adf892b..885579e0 100644 --- a/server/reflector/dailyco_api/requests.py +++ b/server/reflector/dailyco_api/requests.py @@ -88,13 +88,6 @@ class MeetingTokenProperties(BaseModel): is_owner: bool = Field( default=False, description="Grant owner privileges to token holder" ) - start_cloud_recording: bool = Field( - default=False, description="Automatically start cloud recording on join" - ) - start_cloud_recording_opts: dict | None = Field( - default=None, - description="Options for startRecording when start_cloud_recording is true (e.g., maxDuration)", - ) enable_recording_ui: bool = Field( default=True, description="Show recording controls in UI" ) diff --git a/server/reflector/dailyco_api/responses.py b/server/reflector/dailyco_api/responses.py index 6ac95188..21b8fcf0 100644 --- a/server/reflector/dailyco_api/responses.py +++ b/server/reflector/dailyco_api/responses.py @@ -116,6 +116,7 @@ class RecordingS3Info(BaseModel): bucket_name: NonEmptyString bucket_region: NonEmptyString + key: NonEmptyString | None = None endpoint: NonEmptyString | None = None @@ -132,6 +133,9 @@ class RecordingResponse(BaseModel): id: NonEmptyString = Field(description="Recording identifier") room_name: NonEmptyString = Field(description="Room where recording occurred") start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)") + type: Literal["cloud", "raw-tracks"] | None = Field( + None, description="Recording type (may be missing from API)" + ) status: RecordingStatus = Field( description="Recording status ('in-progress' or 'finished')" ) @@ -145,6 +149,9 @@ class 
RecordingResponse(BaseModel): None, description="Token for sharing recording" ) s3: RecordingS3Info | None = Field(None, description="S3 bucket information") + s3key: NonEmptyString | None = Field( + None, description="S3 key for cloud recordings (top-level field)" + ) tracks: list[DailyTrack] = Field( default_factory=list, description="Track list for raw-tracks recordings (always array, never null)", diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 8a80e756..02f407b2 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timedelta from typing import Any, Literal import sqlalchemy as sa @@ -9,7 +9,7 @@ from reflector.db import get_database, metadata from reflector.db.rooms import Room from reflector.schemas.platform import WHEREBY_PLATFORM, Platform from reflector.utils import generate_uuid4 -from reflector.utils.string import assert_equal +from reflector.utils.string import NonEmptyString, assert_equal meetings = sa.Table( "meeting", @@ -63,6 +63,9 @@ meetings = sa.Table( nullable=False, server_default=assert_equal(WHEREBY_PLATFORM, "whereby"), ), + # Daily.co composed video (Brady Bunch grid layout) - Daily.co only, not Whereby + sa.Column("daily_composed_video_s3_key", sa.String, nullable=True), + sa.Column("daily_composed_video_duration", sa.Integer, nullable=True), sa.Index("idx_meeting_room_id", "room_id"), sa.Index("idx_meeting_calendar_event", "calendar_event_id"), ) @@ -110,6 +113,9 @@ class Meeting(BaseModel): calendar_event_id: str | None = None calendar_metadata: dict[str, Any] | None = None platform: Platform = WHEREBY_PLATFORM + # Daily.co composed video (Brady Bunch grid) - Daily.co only + daily_composed_video_s3_key: str | None = None + daily_composed_video_duration: int | None = None class MeetingController: @@ -171,6 +177,90 @@ class MeetingController: return None return Meeting(**result) + async def 
get_by_room_name_all(self, room_name: str) -> list[Meeting]: + """Get all meetings for a room name (not just most recent).""" + query = meetings.select().where(meetings.c.room_name == room_name) + results = await get_database().fetch_all(query) + return [Meeting(**r) for r in results] + + async def get_by_room_name_and_time( + self, + room_name: NonEmptyString, + recording_start: datetime, + time_window_hours: int = 168, + ) -> Meeting | None: + """ + Get meeting by room name closest to recording timestamp. + + HACK ALERT: Daily.co doesn't return instanceId in recordings API response, + and mtgSessionId is separate from our instanceId. Time-based matching is + the least-bad workaround. + + This handles edge case of duplicate room_name values in DB (race conditions, + double-clicks, etc.) by matching based on temporal proximity. + + Algorithm: + 1. Find meetings within time_window_hours of recording_start + 2. Return meeting with start_date closest to recording_start + 3. If tie, return first by meeting.id (deterministic) + + Args: + room_name: Daily.co room name from recording + recording_start: Timezone-aware datetime from recording.start_ts + time_window_hours: Search window (default 168 = 1 week) + + Returns: + Meeting closest to recording timestamp, or None if no matches + + Failure modes: + - Multiple meetings in same room within ~5 minutes: picks closest + - All meetings outside time window: returns None + - Clock skew between Daily.co and DB: 1-week window tolerates this + + Why 1 week window: + - Handles webhook failures (recording discovered days later) + - Tolerates clock skew + - Rejects unrelated meetings from weeks ago + + """ + # Validate timezone-aware datetime + if recording_start.tzinfo is None: + raise ValueError( + f"recording_start must be timezone-aware, got naive datetime: {recording_start}" + ) + + window_start = recording_start - timedelta(hours=time_window_hours) + window_end = recording_start + timedelta(hours=time_window_hours) + + query 
= ( + meetings.select() + .where( + sa.and_( + meetings.c.room_name == room_name, + meetings.c.start_date >= window_start, + meetings.c.start_date <= window_end, + ) + ) + .order_by(meetings.c.start_date) + ) + + results = await get_database().fetch_all(query) + if not results: + return None + + candidates = [Meeting(**r) for r in results] + + # Find meeting with start_date closest to recording_start + closest = min( + candidates, + key=lambda m: ( + abs((m.start_date - recording_start).total_seconds()), + m.id, # Tie-breaker: deterministic by UUID + ), + ) + + return closest + async def get_active(self, room: Room, current_time: datetime) -> Meeting | None: """ Get latest active meeting for a room. @@ -260,6 +350,44 @@ class MeetingController: query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) await get_database().execute(query) + async def set_cloud_recording_if_missing( + self, + meeting_id: NonEmptyString, + s3_key: NonEmptyString, + duration: int, + ) -> bool: + """ + Set cloud recording only if not already set. + + Returns True if updated, False if already set. + Prevents webhook/polling race condition via atomic WHERE clause. 
+ """ + # Check current value before update to detect actual change + meeting_before = await self.get_by_id(meeting_id) + if not meeting_before: + return False + + was_null = meeting_before.daily_composed_video_s3_key is None + + query = ( + meetings.update() + .where( + sa.and_( + meetings.c.id == meeting_id, + meetings.c.daily_composed_video_s3_key.is_(None), + ) + ) + .values( + daily_composed_video_s3_key=s3_key, + daily_composed_video_duration=duration, + ) + ) + await get_database().execute(query) + + # Return True only if value was NULL before (actual update occurred) + # If was_null=False, the WHERE clause prevented the update + return was_null + async def increment_num_clients(self, meeting_id: str) -> None: """Atomically increment participant count.""" query = ( diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index 82609b38..bf799561 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -7,6 +7,7 @@ from sqlalchemy import or_ from reflector.db import get_database, metadata from reflector.utils import generate_uuid4 +from reflector.utils.string import NonEmptyString recordings = sa.Table( "recording", @@ -71,6 +72,19 @@ class RecordingController: query = recordings.delete().where(recordings.c.id == id) await get_database().execute(query) + async def set_meeting_id( + self, + recording_id: NonEmptyString, + meeting_id: NonEmptyString, + ) -> None: + """Link recording to meeting.""" + query = ( + recordings.update() + .where(recordings.c.id == recording_id) + .values(meeting_id=meeting_id) + ) + await get_database().execute(query) + # no check for existence async def get_by_ids(self, recording_ids: list[str]) -> list[Recording]: if not recording_ids: diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py index f71e959b..cef78b4c 100644 --- a/server/reflector/video_platforms/daily.py +++ b/server/reflector/video_platforms/daily.py @@ -1,4 +1,5 @@ 
from datetime import datetime +from uuid import UUID from reflector.dailyco_api import ( CreateMeetingTokenRequest, @@ -12,9 +13,11 @@ from reflector.dailyco_api import ( RoomProperties, verify_webhook_signature, ) +from reflector.dailyco_api import RecordingType as DailyRecordingType from reflector.db.daily_participant_sessions import ( daily_participant_sessions_controller, ) +from reflector.db.meetings import meetings_controller from reflector.db.rooms import Room from reflector.logger import logger from reflector.storage import get_dailyco_storage @@ -58,10 +61,9 @@ class DailyClient(VideoPlatformClient): enable_recording = None if room.recording_type == self.RECORDING_LOCAL: enable_recording = "local" - elif ( - room.recording_type == self.RECORDING_CLOUD - ): # daily "cloud" is not our "cloud" - enable_recording = "raw-tracks" + elif room.recording_type == self.RECORDING_CLOUD: + # Don't set enable_recording - recordings started via REST API (not auto-start) + enable_recording = None properties = RoomProperties( enable_recording=enable_recording, @@ -106,8 +108,6 @@ class DailyClient(VideoPlatformClient): Daily.co doesn't provide historical session API, so we query our database where participant.joined/left webhooks are stored. 
""" - from reflector.db.meetings import meetings_controller # noqa: PLC0415 - meeting = await meetings_controller.get_by_room_name(room_name) if not meeting: return [] @@ -179,21 +179,14 @@ class DailyClient(VideoPlatformClient): async def create_meeting_token( self, room_name: DailyRoomName, - start_cloud_recording: bool, enable_recording_ui: bool, user_id: NonEmptyString | None = None, is_owner: bool = False, max_recording_duration_seconds: int | None = None, ) -> NonEmptyString: - start_cloud_recording_opts = None - if start_cloud_recording and max_recording_duration_seconds: - start_cloud_recording_opts = {"maxDuration": max_recording_duration_seconds} - properties = MeetingTokenProperties( room_name=room_name, user_id=user_id, - start_cloud_recording=start_cloud_recording, - start_cloud_recording_opts=start_cloud_recording_opts, enable_recording_ui=enable_recording_ui, is_owner=is_owner, ) @@ -201,6 +194,23 @@ class DailyClient(VideoPlatformClient): result = await self._api_client.create_meeting_token(request) return result.token + async def start_recording( + self, + room_name: DailyRoomName, + recording_type: DailyRecordingType, + instance_id: UUID, + ) -> dict: + """Start recording via Daily.co REST API. 
+ + Args: + instance_id: UUID for this recording session - one UUID per "room" in Daily (which is "meeting" in Reflector) + """ + return await self._api_client.start_recording( + room_name=room_name, + recording_type=recording_type, + instance_id=instance_id, + ) + async def close(self): """Clean up API client resources.""" await self._api_client.close() diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index cbdac409..384290da 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -19,6 +19,7 @@ from reflector.video_platforms.factory import create_platform_client from reflector.worker.process import ( poll_daily_room_presence_task, process_multitrack_recording, + store_cloud_recording, ) router = APIRouter() @@ -174,46 +175,64 @@ async def _handle_recording_started(event: RecordingStartedEvent): async def _handle_recording_ready(event: RecordingReadyEvent): room_name = event.payload.room_name recording_id = event.payload.recording_id - tracks = event.payload.tracks - - if not tracks: - logger.warning( - "recording.ready-to-download: missing tracks", - room_name=room_name, - recording_id=recording_id, - payload=event.payload, - ) - return + recording_type = event.payload.type logger.info( "Recording ready for download", room_name=room_name, recording_id=recording_id, - num_tracks=len(tracks), + recording_type=recording_type, platform="daily", ) bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME if not bucket_name: - logger.error( - "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; cannot process Daily recording" - ) + logger.error("DAILYCO_STORAGE_AWS_BUCKET_NAME not configured") return - track_keys = [t.s3Key for t in tracks if t.type == "audio"] + if recording_type == "cloud": + await store_cloud_recording( + recording_id=recording_id, + room_name=room_name, + s3_key=event.payload.s3_key, + duration=event.payload.duration, + start_ts=event.payload.start_ts, + source="webhook", + ) - logger.info( - 
"Recording webhook queuing processing", - recording_id=recording_id, - room_name=room_name, - ) + elif recording_type == "raw-tracks": + tracks = event.payload.tracks + if not tracks: + logger.warning( + "raw-tracks recording: missing tracks array", + room_name=room_name, + recording_id=recording_id, + ) + return - process_multitrack_recording.delay( - bucket_name=bucket_name, - daily_room_name=room_name, - recording_id=recording_id, - track_keys=track_keys, - ) + track_keys = [t.s3Key for t in tracks if t.type == "audio"] + + logger.info( + "Raw-tracks recording queuing processing", + recording_id=recording_id, + room_name=room_name, + num_tracks=len(track_keys), + ) + + process_multitrack_recording.delay( + bucket_name=bucket_name, + daily_room_name=room_name, + recording_id=recording_id, + track_keys=track_keys, + recording_start_ts=event.payload.start_ts, + ) + + else: + logger.warning( + "Unknown recording type", + recording_type=recording_type, + recording_id=recording_id, + ) async def _handle_recording_error(event: RecordingErrorEvent): diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py index 25987e47..44adf500 100644 --- a/server/reflector/views/meetings.py +++ b/server/reflector/views/meetings.py @@ -1,16 +1,23 @@ +import json from datetime import datetime, timezone -from typing import Annotated, Optional +from typing import Annotated, Any, Optional +from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Request from pydantic import BaseModel import reflector.auth as auth +from reflector.dailyco_api import RecordingType +from reflector.dailyco_api.client import DailyApiError from reflector.db.meetings import ( MeetingConsent, meeting_consent_controller, meetings_controller, ) from reflector.db.rooms import rooms_controller +from reflector.logger import logger +from reflector.utils.string import NonEmptyString +from reflector.video_platforms.factory import create_platform_client router = APIRouter() 
@@ -73,3 +80,72 @@ async def meeting_deactivate( await meetings_controller.update_meeting(meeting_id, is_active=False) return {"status": "success", "meeting_id": meeting_id} + + +class StartRecordingRequest(BaseModel): + type: RecordingType + instanceId: UUID + + +@router.post("/meetings/{meeting_id}/recordings/start") +async def start_recording( + meeting_id: NonEmptyString, body: StartRecordingRequest +) -> dict[str, Any]: + """Start cloud or raw-tracks recording via Daily.co REST API. + + Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time. + Uses different instanceIds for cloud vs raw-tracks (same won't work) + + Note: No authentication required - anonymous users supported. TODO this is a DOS vector + """ + meeting = await meetings_controller.get_by_id(meeting_id) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + log = logger.bind( + meeting_id=meeting_id, + room_name=meeting.room_name, + recording_type=body.type, + instance_id=body.instanceId, + ) + + try: + client = create_platform_client("daily") + result = await client.start_recording( + room_name=meeting.room_name, + recording_type=body.type, + instance_id=body.instanceId, + ) + + log.info(f"Started {body.type} recording via REST API") + + return {"status": "ok", "result": result} + + except DailyApiError as e: + # Parse Daily.co error response to detect "has an active stream" + try: + error_body = json.loads(e.response_body) + error_info = error_body.get("info", "") + + # "has an active stream" means recording already started by another participant + # This is SUCCESS from business logic perspective - return 200 + if "has an active stream" in error_info: + log.info( + f"{body.type} recording already active (started by another participant)" + ) + return {"status": "already_active", "instanceId": str(body.instanceId)} + except (json.JSONDecodeError, KeyError): + pass # Fall through to error 
handling + + # All other Daily.co API errors + log.error(f"Failed to start {body.type} recording", error=str(e)) + raise HTTPException( + status_code=500, detail=f"Failed to start recording: {str(e)}" + ) + + except Exception as e: + # Non-Daily.co errors + log.error(f"Failed to start {body.type} recording", error=str(e)) + raise HTTPException( + status_code=500, detail=f"Failed to start recording: {str(e)}" + ) diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 278235b4..11e668c0 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -73,6 +73,8 @@ class Meeting(BaseModel): calendar_event_id: str | None = None calendar_metadata: dict[str, Any] | None = None platform: Platform + daily_composed_video_s3_key: str | None = None + daily_composed_video_duration: int | None = None class CreateRoom(BaseModel): @@ -586,7 +588,6 @@ async def rooms_join_meeting( ) token = await client.create_meeting_token( meeting.room_name, - start_cloud_recording=meeting.recording_type == "cloud", enable_recording_ui=enable_recording_ui, user_id=user_id, is_owner=user_id == room.user_id, diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index b1256c94..a353cf55 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -6,6 +6,11 @@ from celery.schedules import crontab from reflector.settings import settings logger = structlog.get_logger(__name__) + +# Polling intervals (seconds) +# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery) +POLL_DAILY_RECORDINGS_INTERVAL_SEC = 180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0 + if celery.current_app.main != "default": logger.info(f"Celery already configured ({celery.current_app})") app = celery.current_app @@ -44,7 +49,7 @@ else: }, "poll_daily_recordings": { "task": "reflector.worker.process.poll_daily_recordings", - "schedule": 180.0, # Every 3 minutes (configurable lookback window) 
+ "schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC, }, "trigger_daily_reconciliation": { "task": "reflector.worker.process.trigger_daily_reconciliation", diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index a1fa3a3c..8d88de43 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -2,7 +2,7 @@ import json import os import re from datetime import datetime, timezone -from typing import List +from typing import List, Literal from urllib.parse import unquote import av @@ -42,6 +42,7 @@ from reflector.utils.daily import ( filter_cam_audio_tracks, recording_lock_key, ) +from reflector.utils.string import NonEmptyString from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.whereby_utils import ( parse_whereby_recording_filename, @@ -175,13 +176,18 @@ async def process_multitrack_recording( daily_room_name: DailyRoomName, recording_id: str, track_keys: list[str], + recording_start_ts: int, ): + """ + Process raw-tracks (multitrack) recording from Daily.co. + """ logger.info( "Processing multitrack recording", bucket=bucket_name, room_name=daily_room_name, recording_id=recording_id, provided_keys=len(track_keys), + recording_start_ts=recording_start_ts, ) if not track_keys: @@ -212,7 +218,7 @@ async def process_multitrack_recording( ) await _process_multitrack_recording_inner( - bucket_name, daily_room_name, recording_id, track_keys + bucket_name, daily_room_name, recording_id, track_keys, recording_start_ts ) @@ -221,8 +227,18 @@ async def _process_multitrack_recording_inner( daily_room_name: DailyRoomName, recording_id: str, track_keys: list[str], + recording_start_ts: int, ): - """Inner function containing the actual processing logic.""" + """ + Process multitrack recording (first time or reprocessing). 
+ + For first processing (webhook/polling): + - Uses recording_start_ts for time-based meeting matching (no instanceId available) + + For reprocessing: + - Uses recording.meeting_id directly (already linked during first processing) + - recording_start_ts is ignored + """ tz = timezone.utc recorded_at = datetime.now(tz) @@ -240,7 +256,53 @@ async def _process_multitrack_recording_inner( exc_info=True, ) - meeting = await meetings_controller.get_by_room_name(daily_room_name) + # Check if recording already exists (reprocessing path) + recording = await recordings_controller.get_by_id(recording_id) + + if recording and recording.meeting_id: + # Reprocessing: recording exists with meeting already linked + meeting = await meetings_controller.get_by_id(recording.meeting_id) + if not meeting: + logger.error( + "Reprocessing: meeting not found for recording - skipping", + meeting_id=recording.meeting_id, + recording_id=recording_id, + ) + return + + logger.info( + "Reprocessing: using existing recording.meeting_id", + recording_id=recording_id, + meeting_id=meeting.id, + room_name=daily_room_name, + ) + else: + # First processing: recording doesn't exist, need time-based matching + # (Daily.co doesn't return instanceId in API, must match by timestamp) + recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc) + meeting = await meetings_controller.get_by_room_name_and_time( + room_name=daily_room_name, + recording_start=recording_start, + time_window_hours=168, # 1 week + ) + if not meeting: + logger.error( + "Raw-tracks: no meeting found within 1-week window (time-based match) - skipping", + recording_id=recording_id, + room_name=daily_room_name, + recording_start_ts=recording_start_ts, + recording_start=recording_start.isoformat(), + ) + return # Skip processing, will retry on next poll + logger.info( + "First processing: found meeting via time-based matching", + meeting_id=meeting.id, + room_name=daily_room_name, + recording_id=recording_id, + 
time_delta_seconds=abs( + (meeting.start_date - recording_start).total_seconds() + ), + ) room_name_base = extract_base_room_name(daily_room_name) @@ -248,18 +310,8 @@ async def _process_multitrack_recording_inner( if not room: raise Exception(f"Room not found: {room_name_base}") - if not meeting: - raise Exception(f"Meeting not found: {room_name_base}") - - logger.info( - "Found existing Meeting for recording", - meeting_id=meeting.id, - room_name=daily_room_name, - recording_id=recording_id, - ) - - recording = await recordings_controller.get_by_id(recording_id) if not recording: + # Create recording (only happens during first processing) object_key_dir = os.path.dirname(track_keys[0]) if track_keys else "" recording = await recordings_controller.create( Recording( @@ -271,7 +323,19 @@ async def _process_multitrack_recording_inner( track_keys=track_keys, ) ) - # else: Recording already exists; metadata set at creation time + elif not recording.meeting_id: + # Recording exists but meeting_id is null (failed first processing) + # Update with meeting from time-based matching + await recordings_controller.set_meeting_id( + recording_id=recording.id, + meeting_id=meeting.id, + ) + recording.meeting_id = meeting.id + logger.info( + "Updated existing recording with meeting_id", + recording_id=recording.id, + meeting_id=meeting.id, + ) transcript = await transcripts_controller.get_by_recording_id(recording.id) if not transcript: @@ -338,9 +402,11 @@ async def poll_daily_recordings(): """Poll Daily.co API for recordings and process missing ones. Fetches latest recordings from Daily.co API (default limit 100), compares with DB, - and queues processing for recordings not already in DB. + and stores/queues missing recordings: + - Cloud recordings: Store S3 key in meeting table + - Raw-tracks recordings: Queue multitrack processing - For each missing recording, uses audio tracks from API response. 
+ Acts as fallback when webhooks active, primary discovery when webhooks unavailable. Worker-level locking provides idempotency (see process_multitrack_recording). """ @@ -381,51 +447,222 @@ async def poll_daily_recordings(): ) return - recording_ids = [rec.id for rec in finished_recordings] + # Separate cloud and raw-tracks recordings + cloud_recordings = [] + raw_tracks_recordings = [] + for rec in finished_recordings: + if rec.type: + # Daily.co API returns null type - make sure this assumption stays + # If this logs, Daily.co API changed - we can remove inference logic. + recording_type = rec.type + logger.warning( + "Recording has explicit type field from Daily.co API (unexpected, API may have changed)", + recording_id=rec.id, + room_name=rec.room_name, + recording_type=recording_type, + has_s3key=bool(rec.s3key), + tracks_count=len(rec.tracks), + ) + else: + # DAILY.CO API LIMITATION: + # GET /recordings response does NOT include type field. + # Daily.co docs mention type field exists, but API never returns it. + # Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None. + # + # This is not a recent API change - Daily.co has never returned type. + # Must infer from structural properties. 
+ # + # Inference heuristic (reliable for finished recordings): + # - Has tracks array → raw-tracks + # - Has s3key but no tracks → cloud + # - Neither → failed/incomplete recording + if len(rec.tracks) > 0: + recording_type = "raw-tracks" + elif rec.s3key and len(rec.tracks) == 0: + recording_type = "cloud" + else: + logger.warning( + "Recording has no type, no s3key, and no tracks - likely failed recording", + recording_id=rec.id, + room_name=rec.room_name, + status=rec.status, + duration=rec.duration, + mtg_session_id=rec.mtgSessionId, + ) + continue + + if recording_type == "cloud": + cloud_recordings.append(rec) + else: + raw_tracks_recordings.append(rec) + + logger.debug( + "Poll results", + total=len(finished_recordings), + cloud=len(cloud_recordings), + raw_tracks=len(raw_tracks_recordings), + ) + + # Process cloud recordings + await _poll_cloud_recordings(cloud_recordings) + + # Process raw-tracks recordings + await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name) + + +async def store_cloud_recording( + recording_id: NonEmptyString, + room_name: NonEmptyString, + s3_key: NonEmptyString, + duration: int, + start_ts: int, + source: Literal["webhook", "polling"], +) -> bool: + """ + Store cloud recording reference in meeting table. + + Common function for both webhook and polling code paths. + Uses time-based matching to handle duplicate room_name values. 
+ + Args: + recording_id: Daily.co recording ID + room_name: Daily.co room name + s3_key: S3 key where recording is stored + duration: Recording duration in seconds + start_ts: Unix timestamp when recording started + source: "webhook" or "polling" (for logging) + + Returns: + True if stored, False if skipped/failed + """ + recording_start = datetime.fromtimestamp(start_ts, tz=timezone.utc) + + meeting = await meetings_controller.get_by_room_name_and_time( + room_name=room_name, + recording_start=recording_start, + time_window_hours=168, # 1 week + ) + + if not meeting: + logger.warning( + f"Cloud recording ({source}): no meeting found within 1-week window", + recording_id=recording_id, + room_name=room_name, + recording_start_ts=start_ts, + recording_start=recording_start.isoformat(), + ) + return False + + success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key=s3_key, + duration=duration, + ) + + if not success: + logger.debug( + f"Cloud recording ({source}): already set (race lost)", + recording_id=recording_id, + room_name=room_name, + meeting_id=meeting.id, + ) + return False + + logger.info( + f"Cloud recording stored via {source} (time-based match)", + meeting_id=meeting.id, + recording_id=recording_id, + s3_key=s3_key, + duration=duration, + time_delta_seconds=abs((meeting.start_date - recording_start).total_seconds()), + ) + return True + + +async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]): + """ + Store cloud recordings missing from meeting table via polling. + + Uses time-based matching via store_cloud_recording(). 
+ """ + if not cloud_recordings: + return + + stored_count = 0 + for recording in cloud_recordings: + # Extract S3 key from recording (cloud recordings use s3key field) + s3_key = recording.s3key or (recording.s3.key if recording.s3 else None) + if not s3_key: + logger.warning( + "Cloud recording: missing S3 key", + recording_id=recording.id, + room_name=recording.room_name, + ) + continue + + stored = await store_cloud_recording( + recording_id=recording.id, + room_name=recording.room_name, + s3_key=s3_key, + duration=recording.duration, + start_ts=recording.start_ts, + source="polling", + ) + if stored: + stored_count += 1 + + logger.info( + "Cloud recording polling complete", + total=len(cloud_recordings), + stored=stored_count, + ) + + +async def _poll_raw_tracks_recordings( + raw_tracks_recordings: List[FinishedRecordingResponse], + bucket_name: str, +): + """Queue raw-tracks recordings missing from DB (existing logic).""" + if not raw_tracks_recordings: + return + + recording_ids = [rec.id for rec in raw_tracks_recordings] existing_recordings = await recordings_controller.get_by_ids(recording_ids) existing_ids = {rec.id for rec in existing_recordings} missing_recordings = [ - rec for rec in finished_recordings if rec.id not in existing_ids + rec for rec in raw_tracks_recordings if rec.id not in existing_ids ] if not missing_recordings: logger.debug( - "All recordings already in DB", - api_count=len(finished_recordings), + "All raw-tracks recordings already in DB", + api_count=len(raw_tracks_recordings), existing_count=len(existing_recordings), ) return logger.info( - "Found recordings missing from DB", + "Found raw-tracks recordings missing from DB", missing_count=len(missing_recordings), - total_api_count=len(finished_recordings), + total_api_count=len(raw_tracks_recordings), existing_count=len(existing_recordings), ) for recording in missing_recordings: if not recording.tracks: - if recording.status == "finished": - logger.warning( - "Finished recording has 
no tracks (no audio captured)", - recording_id=recording.id, - room_name=recording.room_name, - ) - else: - logger.debug( - "No tracks in recording yet", - recording_id=recording.id, - room_name=recording.room_name, - status=recording.status, - ) + logger.warning( + "Finished raw-tracks recording has no tracks (no audio captured)", + recording_id=recording.id, + room_name=recording.room_name, + ) continue track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] if not track_keys: logger.warning( - "No audio tracks found in recording (only video tracks)", + "No audio tracks found in raw-tracks recording", recording_id=recording.id, room_name=recording.room_name, total_tracks=len(recording.tracks), @@ -433,7 +670,7 @@ async def poll_daily_recordings(): continue logger.info( - "Queueing missing recording for processing", + "Queueing missing raw-tracks recording for processing", recording_id=recording.id, room_name=recording.room_name, track_count=len(track_keys), @@ -444,6 +681,7 @@ async def poll_daily_recordings(): daily_room_name=recording.room_name, recording_id=recording.id, track_keys=track_keys, + recording_start_ts=recording.start_ts, ) @@ -883,11 +1121,16 @@ async def reprocess_failed_daily_recordings(): transcript_status=transcript.status if transcript else None, ) + # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner) + # Reprocessing uses recording.meeting_id directly instead of time-based matching + recording_start_ts = int(recording.recorded_at.timestamp()) + process_multitrack_recording.delay( bucket_name=bucket_name, daily_room_name=meeting.room_name, recording_id=recording.id, track_keys=recording.track_keys, + recording_start_ts=recording_start_ts, ) reprocessed_count += 1 diff --git a/server/tests/test_dailyco_instance_id.py b/server/tests/test_dailyco_instance_id.py new file mode 100644 index 00000000..d410205b --- /dev/null +++ b/server/tests/test_dailyco_instance_id.py @@ -0,0 
+1,147 @@ +""" +Tests for Daily.co instanceId generation. + +Verifies deterministic behavior and frontend/backend consistency. +""" + +import pytest + +from reflector.dailyco_api.instance_id import ( + RAW_TRACKS_NAMESPACE, + generate_cloud_instance_id, + generate_raw_tracks_instance_id, +) + + +class TestInstanceIdDeterminism: + """Test deterministic generation of instanceIds.""" + + def test_cloud_instance_id_is_meeting_id(self): + """Cloud instanceId is meeting ID directly (implicitly tests determinism).""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + result1 = generate_cloud_instance_id(meeting_id) + result2 = generate_cloud_instance_id(meeting_id) + assert str(result1) == meeting_id + assert result1 == result2 + + def test_raw_tracks_instance_id_deterministic(self): + """Raw-tracks instanceId generation is deterministic.""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + result1 = generate_raw_tracks_instance_id(meeting_id) + result2 = generate_raw_tracks_instance_id(meeting_id) + assert result1 == result2 + + def test_raw_tracks_different_from_cloud(self): + """Raw-tracks instanceId differs from cloud instanceId.""" + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + cloud_id = generate_cloud_instance_id(meeting_id) + raw_tracks_id = generate_raw_tracks_instance_id(meeting_id) + assert cloud_id != raw_tracks_id + + def test_different_meetings_different_instance_ids(self): + """Different meetings generate different instanceIds.""" + meeting_id1 = "550e8400-e29b-41d4-a716-446655440000" + meeting_id2 = "6ba7b810-9dad-11d1-80b4-00c04fd430c8" + + cloud1 = generate_cloud_instance_id(meeting_id1) + cloud2 = generate_cloud_instance_id(meeting_id2) + assert cloud1 != cloud2 + + raw1 = generate_raw_tracks_instance_id(meeting_id1) + raw2 = generate_raw_tracks_instance_id(meeting_id2) + assert raw1 != raw2 + + +class TestFrontendBackendConsistency: + """Test that backend matches frontend logic.""" + + def test_namespace_matches_frontend(self): + 
"""Namespace UUID matches frontend RAW_TRACKS_NAMESPACE constant.""" + # From www/app/[roomName]/components/DailyRoom.tsx + frontend_namespace = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + assert str(RAW_TRACKS_NAMESPACE) == frontend_namespace + + def test_raw_tracks_generation_matches_frontend_logic(self): + """Backend UUIDv5 generation matches frontend uuidv5() call.""" + # Example meeting ID + meeting_id = "550e8400-e29b-41d4-a716-446655440000" + + # Backend result + backend_result = generate_raw_tracks_instance_id(meeting_id) + + # Expected result from frontend: uuidv5(meeting.id, RAW_TRACKS_NAMESPACE) + # Python uuid5 uses (namespace, name) argument order + # JavaScript uuid.v5(name, namespace) - same args, different order + # Frontend: uuidv5(meeting.id, "a1b2c3d4-e5f6-7890-abcd-ef1234567890") + # Backend: uuid5(UUID("a1b2c3d4-e5f6-7890-abcd-ef1234567890"), meeting.id) + + # Verify it's a valid UUID (will raise if not) + assert len(str(backend_result)) == 36 + assert backend_result.version == 5 + + +class TestEdgeCases: + """Test edge cases and error conditions.""" + + def test_invalid_uuid_format_raises(self): + """Invalid UUID format raises ValueError.""" + with pytest.raises(ValueError): + generate_cloud_instance_id("not-a-uuid") + + def test_lowercase_uuid_normalized_for_cloud(self): + """Cloud instanceId: lowercase/uppercase UUIDs produce same result.""" + meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000" + meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000" + + cloud_lower = generate_cloud_instance_id(meeting_id_lower) + cloud_upper = generate_cloud_instance_id(meeting_id_upper) + assert cloud_lower == cloud_upper + + def test_uuid5_is_case_sensitive_warning(self): + """ + Documents uuid5 case sensitivity - different case UUIDs produce different hashes. + + Not a problem: meeting.id always lowercase from DB and API. + Frontend generates raw-tracks instanceId from lowercase meeting.id. + Backend receives lowercase meeting_id when matching. 
+ + This test documents the behavior, not a requirement. + """ + meeting_id_lower = "550e8400-e29b-41d4-a716-446655440000" + meeting_id_upper = "550E8400-E29B-41D4-A716-446655440000" + + raw_lower = generate_raw_tracks_instance_id(meeting_id_lower) + raw_upper = generate_raw_tracks_instance_id(meeting_id_upper) + assert raw_lower != raw_upper + + +class TestMtgSessionIdVsInstanceId: + """ + Documents that Daily.co's mtgSessionId differs from our instanceId. + + Why this matters: We investigated using mtgSessionId for matching but discovered + it's Daily.co-generated and unrelated to instanceId we send. This test documents + that finding so we don't investigate it again. + + Production data from 2026-01-13: + - Meeting ID: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 + - Cloud instanceId: 4ad503b6-8189-4910-a8f7-68cdd1b7f990 (same as meeting ID) + - Raw-tracks instanceId: 784b3af3-c7dd-57f0-ac54-2ee91c6927cb (UUIDv5 derived) + - Recording mtgSessionId: f25a2e09-740f-4932-9c0d-b1bebaa669c6 (different!) + + Conclusion: Cannot use mtgSessionId for recording-to-meeting matching. 
+ """ + + def test_mtg_session_id_differs_from_our_instance_ids(self): + """mtgSessionId (Daily.co) != instanceId (ours) for both cloud and raw-tracks.""" + meeting_id = "4ad503b6-8189-4910-a8f7-68cdd1b7f990" + expected_raw_tracks_id = "784b3af3-c7dd-57f0-ac54-2ee91c6927cb" + mtg_session_id = "f25a2e09-740f-4932-9c0d-b1bebaa669c6" + + cloud_instance_id = generate_cloud_instance_id(meeting_id) + raw_tracks_instance_id = generate_raw_tracks_instance_id(meeting_id) + + assert str(cloud_instance_id) == meeting_id + assert str(raw_tracks_instance_id) == expected_raw_tracks_id + assert str(cloud_instance_id) != mtg_session_id + assert str(raw_tracks_instance_id) != mtg_session_id diff --git a/server/tests/test_time_based_meeting_matching.py b/server/tests/test_time_based_meeting_matching.py new file mode 100644 index 00000000..3506c183 --- /dev/null +++ b/server/tests/test_time_based_meeting_matching.py @@ -0,0 +1,374 @@ +""" +Integration tests for time-based meeting-to-recording matching. + +Tests the critical path for matching Daily.co recordings to meetings when +API doesn't return instanceId. 
+""" + +from datetime import datetime, timedelta, timezone + +import pytest + +from reflector.db.meetings import meetings_controller +from reflector.db.rooms import rooms_controller + + +@pytest.fixture +async def test_room(): + """Create a test room for meetings.""" + room = await rooms_controller.add( + name="test-room-time", + user_id="test-user-id", + zulip_auto_post=False, + zulip_stream="", + zulip_topic="", + is_locked=False, + room_mode="normal", + recording_type="cloud", + recording_trigger="automatic", + is_shared=False, + platform="daily", + ) + return room + + +@pytest.fixture +def base_time(): + """Fixed timestamp for deterministic tests.""" + return datetime(2026, 1, 14, 9, 0, 0, tzinfo=timezone.utc) + + +class TestTimeBasedMatching: + """Test get_by_room_name_and_time() matching logic.""" + + async def test_exact_time_match(self, test_room, base_time): + """Recording timestamp exactly matches meeting start_date.""" + meeting = await meetings_controller.create( + id="meeting-exact", + room_name="daily-test-20260114090000", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-20260114090000", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting.id + + async def test_recording_slightly_after_meeting_start(self, test_room, base_time): + """Recording started 1 minute after meeting (participants joined late).""" + meeting = await meetings_controller.create( + id="meeting-late", + room_name="daily-test-20260114090100", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + recording_start = base_time + timedelta(minutes=1) + + result = 
await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-20260114090100", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting.id + + async def test_duplicate_room_names_picks_closest(self, test_room, base_time): + """ + Two meetings with same room_name (duplicate/race condition). + Should pick closest by timestamp. + """ + meeting1 = await meetings_controller.create( + id="meeting-1-first", + room_name="daily-duplicate-room", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + meeting2 = await meetings_controller.create( + id="meeting-2-second", + room_name="daily-duplicate-room", # Same room_name! + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time + timedelta(seconds=0.99), # 0.99s later + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Recording started 0.5s after meeting1 + # Distance: meeting1 = 0.5s, meeting2 = 0.49s → meeting2 is closer + recording_start = base_time + timedelta(seconds=0.5) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-duplicate-room", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == meeting2.id # meeting2 is closer (0.49s vs 0.5s) + + async def test_outside_time_window_returns_none(self, test_room, base_time): + """Recording outside 1-week window returns None.""" + await meetings_controller.create( + id="meeting-old", + room_name="daily-test-old", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Recording 8 days later (outside 7-day window) + recording_start = base_time + 
timedelta(days=8) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-old", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is None + + async def test_tie_breaker_deterministic(self, test_room, base_time): + """When time delta identical, tie-breaker by meeting.id is deterministic.""" + meeting_z = await meetings_controller.create( + id="zzz-last-uuid", + room_name="daily-test-tie", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + meeting_a = await meetings_controller.create( + id="aaa-first-uuid", + room_name="daily-test-tie", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, # Exact same start_date + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-tie", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + # Tie-breaker: lexicographically first UUID + assert result.id == "aaa-first-uuid" + + async def test_timezone_naive_datetime_raises(self, test_room, base_time): + """Timezone-naive datetime raises ValueError.""" + await meetings_controller.create( + id="meeting-tz", + room_name="daily-test-tz", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + # Naive datetime (no timezone) + naive_dt = datetime(2026, 1, 14, 9, 0, 0) + + with pytest.raises(ValueError, match="timezone-aware"): + await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-tz", + recording_start=naive_dt, + time_window_hours=168, + ) + + async def test_one_week_boundary_after_included(self, test_room, base_time): + 
"""Meeting 1-week AFTER recording is included (window_end boundary).""" + meeting_time = base_time + timedelta(hours=168) + + await meetings_controller.create( + id="meeting-boundary-after", + room_name="daily-test-boundary-after", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=meeting_time, + end_date=meeting_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-boundary-after", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-boundary-after" + + async def test_one_week_boundary_before_included(self, test_room, base_time): + """Meeting 1-week BEFORE recording is included (window_start boundary).""" + meeting_time = base_time - timedelta(hours=168) + + await meetings_controller.create( + id="meeting-boundary-before", + room_name="daily-test-boundary-before", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=meeting_time, + end_date=meeting_time + timedelta(hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-boundary-before", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-boundary-before" + + async def test_recording_before_meeting_start(self, test_room, base_time): + """Recording started before meeting (clock skew or early join).""" + await meetings_controller.create( + id="meeting-early", + room_name="daily-test-early", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + recording_start = base_time - timedelta(minutes=2) + + result = await meetings_controller.get_by_room_name_and_time( + 
room_name="daily-test-early", + recording_start=recording_start, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-early" + + async def test_mixed_inside_outside_window(self, test_room, base_time): + """Multiple meetings, only one inside window - returns the inside one.""" + await meetings_controller.create( + id="meeting-old", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time - timedelta(days=10), + end_date=base_time - timedelta(days=10, hours=-1), + room=test_room, + ) + + await meetings_controller.create( + id="meeting-inside", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time - timedelta(days=2), + end_date=base_time - timedelta(days=2, hours=-1), + room=test_room, + ) + + await meetings_controller.create( + id="meeting-future", + room_name="daily-test-mixed", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time + timedelta(days=10), + end_date=base_time + timedelta(days=10, hours=1), + room=test_room, + ) + + result = await meetings_controller.get_by_room_name_and_time( + room_name="daily-test-mixed", + recording_start=base_time, + time_window_hours=168, + ) + + assert result is not None + assert result.id == "meeting-inside" + + +class TestAtomicCloudRecordingUpdate: + """Test atomic update prevents race conditions.""" + + async def test_first_update_succeeds(self, test_room, base_time): + """First call to set_cloud_recording_if_missing succeeds.""" + meeting = await meetings_controller.create( + id="meeting-atomic-1", + room_name="daily-test-atomic", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + 
success = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="first-s3-key", + duration=100, + ) + + assert success is True + + updated = await meetings_controller.get_by_id(meeting.id) + assert updated.daily_composed_video_s3_key == "first-s3-key" + assert updated.daily_composed_video_duration == 100 + + async def test_second_update_fails_atomically(self, test_room, base_time): + """Second call to update same meeting doesn't overwrite (atomic check).""" + meeting = await meetings_controller.create( + id="meeting-atomic-2", + room_name="daily-test-atomic2", + room_url="https://example.daily.co/test", + host_room_url="https://example.daily.co/test?t=host", + start_date=base_time, + end_date=base_time + timedelta(hours=1), + room=test_room, + ) + + success1 = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="first-s3-key", + duration=100, + ) + + assert success1 is True + + after_first = await meetings_controller.get_by_id(meeting.id) + assert after_first.daily_composed_video_s3_key == "first-s3-key" + + success2 = await meetings_controller.set_cloud_recording_if_missing( + meeting_id=meeting.id, + s3_key="bucket/path/should-not-overwrite", + duration=200, + ) + + assert success2 is False + + final = await meetings_controller.get_by_id(meeting.id) + assert final.daily_composed_video_s3_key == "first-s3-key" + assert final.daily_composed_video_duration == 100 diff --git a/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx b/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx index d7ba37dc..500c4cc5 100644 --- a/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/finalSummary.tsx @@ -3,7 +3,8 @@ import React from "react"; import Markdown from "react-markdown"; import "../../../styles/markdown.css"; import type { components } from "../../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; 
+type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { useTranscriptUpdate } from "../../../lib/apiHooks"; import { @@ -18,7 +19,7 @@ import { LuPen } from "react-icons/lu"; import { useError } from "../../../(errors)/errorContext"; type FinalSummaryProps = { - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; onUpdate: (newSummary: string) => void; finalSummaryRef: React.Dispatch>; diff --git a/www/app/(app)/transcripts/createTranscript.ts b/www/app/(app)/transcripts/createTranscript.ts index 8a235161..0991130f 100644 --- a/www/app/(app)/transcripts/createTranscript.ts +++ b/www/app/(app)/transcripts/createTranscript.ts @@ -2,10 +2,11 @@ import type { components } from "../../reflector-api"; import { useTranscriptCreate } from "../../lib/apiHooks"; type CreateTranscript = components["schemas"]["CreateTranscript"]; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type UseCreateTranscript = { - transcript: GetTranscript | null; + transcript: GetTranscriptWithParticipants | null; loading: boolean; error: Error | null; create: (transcriptCreationDetails: CreateTranscript) => Promise; diff --git a/www/app/(app)/transcripts/shareAndPrivacy.tsx b/www/app/(app)/transcripts/shareAndPrivacy.tsx index 04cda920..207d900f 100644 --- a/www/app/(app)/transcripts/shareAndPrivacy.tsx +++ b/www/app/(app)/transcripts/shareAndPrivacy.tsx @@ -2,7 +2,8 @@ import { useEffect, useState } from "react"; import { ShareMode, toShareMode } from "../../lib/shareMode"; import type { components } from "../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = 
components["schemas"]["GetTranscriptTopic"]; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; import { @@ -27,7 +28,7 @@ import { featureEnabled } from "../../lib/features"; type ShareAndPrivacyProps = { finalSummaryElement: HTMLDivElement | null; - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; }; diff --git a/www/app/(app)/transcripts/shareZulip.tsx b/www/app/(app)/transcripts/shareZulip.tsx index c3efe3ab..96242de2 100644 --- a/www/app/(app)/transcripts/shareZulip.tsx +++ b/www/app/(app)/transcripts/shareZulip.tsx @@ -1,7 +1,8 @@ import { useState, useEffect, useMemo } from "react"; import type { components } from "../../reflector-api"; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { BoxProps, @@ -26,7 +27,7 @@ import { import { featureEnabled } from "../../lib/features"; type ShareZulipProps = { - transcript: GetTranscript; + transcript: GetTranscriptWithParticipants; topics: GetTranscriptTopic[]; disabled: boolean; }; diff --git a/www/app/(app)/transcripts/transcriptTitle.tsx b/www/app/(app)/transcripts/transcriptTitle.tsx index 49a22c71..ea738673 100644 --- a/www/app/(app)/transcripts/transcriptTitle.tsx +++ b/www/app/(app)/transcripts/transcriptTitle.tsx @@ -2,7 +2,8 @@ import { useState } from "react"; import type { components } from "../../reflector-api"; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; -type GetTranscript = components["schemas"]["GetTranscript"]; +type GetTranscriptWithParticipants = + components["schemas"]["GetTranscriptWithParticipants"]; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; import { useTranscriptUpdate, @@ -20,7 +21,7 @@ type TranscriptTitle = { onUpdate: (newTitle: string) => void; // share props - transcript: GetTranscript | 
null; + transcript: GetTranscriptWithParticipants | null; topics: GetTranscriptTopic[] | null; finalSummaryElement: HTMLDivElement | null; }; diff --git a/www/app/[roomName]/components/DailyRoom.tsx b/www/app/[roomName]/components/DailyRoom.tsx index 44fa6315..d1c00254 100644 --- a/www/app/[roomName]/components/DailyRoom.tsx +++ b/www/app/[roomName]/components/DailyRoom.tsx @@ -22,14 +22,29 @@ import DailyIframe, { import type { components } from "../../reflector-api"; import { useAuth } from "../../lib/AuthProvider"; import { useConsentDialog } from "../../lib/consent"; -import { useRoomJoinMeeting } from "../../lib/apiHooks"; +import { + useRoomJoinMeeting, + useMeetingStartRecording, +} from "../../lib/apiHooks"; import { omit } from "remeda"; -import { assertExists } from "../../lib/utils"; -import { assertMeetingId } from "../../lib/types"; +import { + assertExists, + NonEmptyString, + parseNonEmptyString, +} from "../../lib/utils"; +import { assertMeetingId, DailyRecordingType } from "../../lib/types"; +import { useUuidV5 } from "react-uuid-hook"; const CONSENT_BUTTON_ID = "recording-consent"; const RECORDING_INDICATOR_ID = "recording-indicator"; +// Namespace UUID for UUIDv5 generation of raw-tracks instanceIds +// DO NOT CHANGE: Breaks instanceId determinism across deployments +const RAW_TRACKS_NAMESPACE = "a1b2c3d4-e5f6-7890-abcd-ef1234567890"; + +const RECORDING_START_DELAY_MS = 2000; +const RECORDING_START_MAX_RETRIES = 5; + type Meeting = components["schemas"]["Meeting"]; type Room = components["schemas"]["RoomDetails"]; @@ -73,9 +88,7 @@ const useFrame = ( cbs: { onLeftMeeting: () => void; onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void; - onJoinMeeting: ( - startRecording: (args: { type: "raw-tracks" }) => void, - ) => void; + onJoinMeeting: () => void; }, ) => { const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE); @@ -126,7 +139,7 @@ const useFrame = ( console.error("frame is null in joined-meeting callback"); 
return; } - cbs.onJoinMeeting(frame.startRecording.bind(frame)); + cbs.onJoinMeeting(); }; frame.on("joined-meeting", joinCb); return () => { @@ -173,8 +186,15 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { const authLastUserId = auth.lastUserId; const [container, setContainer] = useState(null); const joinMutation = useRoomJoinMeeting(); + const startRecordingMutation = useMeetingStartRecording(); const [joinedMeeting, setJoinedMeeting] = useState(null); + // Generate deterministic instanceIds so all participants use SAME IDs + const cloudInstanceId = parseNonEmptyString(meeting.id); + const rawTracksInstanceId = parseNonEmptyString( + useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0], + ); + const roomName = params?.roomName as string; const { @@ -228,19 +248,72 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) { ], ); - const handleFrameJoinMeeting = useCallback( - (startRecording: (args: { type: "raw-tracks" }) => void) => { - try { - if (meeting.recording_type === "cloud") { - console.log("Starting cloud recording"); - startRecording({ type: "raw-tracks" }); - } - } catch (error) { - console.error("Failed to start recording:", error); - } - }, - [meeting.recording_type], - ); + const handleFrameJoinMeeting = useCallback(() => { + if (meeting.recording_type === "cloud") { + console.log("Starting dual recording via REST API", { + cloudInstanceId, + rawTracksInstanceId, + }); + + // Start both cloud and raw-tracks via backend REST API (with retry on 404) + // Daily.co needs time to register call as "hosting" for REST API + const startRecordingWithRetry = ( + type: DailyRecordingType, + instanceId: NonEmptyString, + attempt: number = 1, + ) => { + setTimeout(() => { + startRecordingMutation.mutate( + { + params: { + path: { + meeting_id: meeting.id, + }, + }, + body: { + type, + instanceId, + }, + }, + { + onError: (error: any) => { + const errorText = error?.detail || error?.message || ""; + const is404NotHosting = 
errorText.includes( + "does not seem to be hosting a call", + ); + const isActiveStream = errorText.includes( + "has an active stream", + ); + + if (is404NotHosting && attempt < RECORDING_START_MAX_RETRIES) { + console.log( + `${type}: Call not hosting yet, retry ${attempt + 1}/${RECORDING_START_MAX_RETRIES} in ${RECORDING_START_DELAY_MS}ms...`, + ); + startRecordingWithRetry(type, instanceId, attempt + 1); + } else if (isActiveStream) { + console.log( + `${type}: Recording already active (started by another participant)`, + ); + } else { + console.error(`Failed to start ${type} recording:`, error); + } + }, + }, + ); + }, RECORDING_START_DELAY_MS); + }; + + // Start both recordings + startRecordingWithRetry("cloud", cloudInstanceId); + startRecordingWithRetry("raw-tracks", rawTracksInstanceId); + } + }, [ + meeting.recording_type, + meeting.id, + startRecordingMutation, + cloudInstanceId, + rawTracksInstanceId, + ]); const recordingIconUrl = useMemo( () => new URL("/recording-icon.svg", window.location.origin), diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index a59c31eb..a00eb552 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -567,6 +567,20 @@ export function useTranscriptSpeakerMerge() { ); } +export function useMeetingStartRecording() { + const { setError } = useError(); + + return $api.useMutation( + "post", + "/v1/meetings/{meeting_id}/recordings/start", + { + onError: (error) => { + setError(error as Error, "Failed to start recording"); + }, + }, + ); +} + export function useMeetingAudioConsent() { const { setError } = useError(); diff --git a/www/app/lib/transcript.ts b/www/app/lib/transcript.ts index d1fd8b3d..f23a7c38 100644 --- a/www/app/lib/transcript.ts +++ b/www/app/lib/transcript.ts @@ -1,5 +1,6 @@ import { components } from "../reflector-api"; -type ApiTranscriptStatus = components["schemas"]["GetTranscript"]["status"]; +type ApiTranscriptStatus = + 
components["schemas"]["GetTranscriptWithParticipants"]["status"]; export type TranscriptStatus = ApiTranscriptStatus; diff --git a/www/app/lib/types.ts b/www/app/lib/types.ts index c5ab8ce7..54e2bae1 100644 --- a/www/app/lib/types.ts +++ b/www/app/lib/types.ts @@ -89,3 +89,5 @@ export const assertMeetingId = (s: string): MeetingId => { // just cast for now return nes as MeetingId; }; + +export type DailyRecordingType = "cloud" | "raw-tracks"; diff --git a/www/app/reflector-api.d.ts b/www/app/reflector-api.d.ts index 3704a9a0..12a7085c 100644 --- a/www/app/reflector-api.d.ts +++ b/www/app/reflector-api.d.ts @@ -75,6 +75,31 @@ export interface paths { patch: operations["v1_meeting_deactivate"]; trace?: never; }; + "/v1/meetings/{meeting_id}/recordings/start": { + parameters: { + query?: never; + header?: never; + path?: never; + cookie?: never; + }; + get?: never; + put?: never; + /** + * Start Recording + * @description Start cloud or raw-tracks recording via Daily.co REST API. + * + * Both cloud and raw-tracks are started via REST API to bypass enable_recording limitation of allowing only 1 recording at a time. + * Uses different instanceIds for cloud vs raw-tracks (same won't work) + * + * Note: No authentication required - anonymous users supported. 
TODO this is a DOS vector + */ + post: operations["v1_start_recording"]; + delete?: never; + options?: never; + head?: never; + patch?: never; + trace?: never; + }; "/v1/rooms": { parameters: { query?: never; @@ -1544,6 +1569,10 @@ export interface components { * @enum {string} */ platform: "whereby" | "daily"; + /** Daily Composed Video S3 Key */ + daily_composed_video_s3_key?: string | null; + /** Daily Composed Video Duration */ + daily_composed_video_duration?: number | null; }; /** MeetingConsentRequest */ MeetingConsentRequest: { @@ -1818,6 +1847,19 @@ export interface components { /** Words */ words: components["schemas"]["Word"][]; }; + /** StartRecordingRequest */ + StartRecordingRequest: { + /** + * Type + * @enum {string} + */ + type: "cloud" | "raw-tracks"; + /** + * Instanceid + * Format: uuid + */ + instanceId: string; + }; /** Stream */ Stream: { /** Stream Id */ @@ -2126,6 +2168,43 @@ export interface operations { }; }; }; + v1_start_recording: { + parameters: { + query?: never; + header?: never; + path: { + meeting_id: string; + }; + cookie?: never; + }; + requestBody: { + content: { + "application/json": components["schemas"]["StartRecordingRequest"]; + }; + }; + responses: { + /** @description Successful Response */ + 200: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": { + [key: string]: unknown; + }; + }; + }; + /** @description Validation Error */ + 422: { + headers: { + [name: string]: unknown; + }; + content: { + "application/json": components["schemas"]["HTTPValidationError"]; + }; + }; + }; + }; v1_rooms_list: { parameters: { query?: { diff --git a/www/package.json b/www/package.json index 13895a3a..ceefbf55 100644 --- a/www/package.json +++ b/www/package.json @@ -46,6 +46,7 @@ "react-markdown": "^9.0.0", "react-qr-code": "^2.0.12", "react-select-search": "^4.1.7", + "react-uuid-hook": "^0.0.6", "redlock": "5.0.0-beta.2", "remeda": "^2.31.1", "sass": "^1.63.6", diff --git a/www/pnpm-lock.yaml 
b/www/pnpm-lock.yaml index 4cc219ea..cd65de55 100644 --- a/www/pnpm-lock.yaml +++ b/www/pnpm-lock.yaml @@ -106,6 +106,9 @@ importers: react-select-search: specifier: ^4.1.7 version: 4.1.8(prop-types@15.8.1)(react-dom@18.3.1(react@18.3.1))(react@18.3.1) + react-uuid-hook: + specifier: ^0.0.6 + version: 0.0.6(react@18.3.1) redlock: specifier: 5.0.0-beta.2 version: 5.0.0-beta.2 @@ -7628,6 +7631,14 @@ packages: "@types/react": optional: true + react-uuid-hook@0.0.6: + resolution: + { + integrity: sha512-u9+EvFbqpWfLE/ReYFry0vYu1BAg1fY9ekr0XLSDNnfWyrnVFytpurwz5qYsIB0psevuvrpZHIcvu7AjUwqinA==, + } + peerDependencies: + react: ">=16.8.0" + react@18.3.1: resolution: { @@ -8771,6 +8782,13 @@ packages: integrity: sha512-Fykw5U4eZESbq739BeLvEBFRuJODfrlmjx5eJux7W817LjRaq4b7/i4t2zxQmhcX+fAj4nMfRdTzO4tmwLKn0w==, } + uuid@13.0.0: + resolution: + { + integrity: sha512-XQegIaBTVUjSHliKqcnFqYypAd4S+WCYt5NIeRs6w/UAry7z8Y9j5ZwRRL4kzq9U3sD6v+85er9FvkEaBpji2w==, + } + hasBin: true + uuid@8.3.2: resolution: { @@ -14570,6 +14588,11 @@ snapshots: optionalDependencies: "@types/react": 18.2.20 + react-uuid-hook@0.0.6(react@18.3.1): + dependencies: + react: 18.3.1 + uuid: 13.0.0 + react@18.3.1: dependencies: loose-envify: 1.4.0 @@ -15401,6 +15424,8 @@ snapshots: uuid-validate@0.0.3: {} + uuid@13.0.0: {} + uuid@8.3.2: {} uuid@9.0.1: {} From 5d2646147752ec1084b5233896aa96ee93e7f7dd Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Fri, 23 Jan 2026 13:58:33 -0500 Subject: [PATCH 7/8] chore(main): release 0.30.0 (#832) --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 544aaff9..fefb45c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23) + + +### Features + +* brady bunch ([#816](https://github.com/Monadical-SAS/reflector/issues/816)) 
([6c175a1](https://github.com/Monadical-SAS/reflector/commit/6c175a11d8a3745095bfad06a4ad3ccdfd278433)) + ## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21) From fc3ef6c8933231c731fad84e7477a476a6220a5e Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Fri, 23 Jan 2026 15:51:18 -0500 Subject: [PATCH 8/8] feat: mixdown optional (#834) * optional mixdown * optional mixdown --------- Co-authored-by: Igor Loskutov --- .../workflows/daily_multitrack_pipeline.py | 2 +- www/app/(app)/rooms/page.tsx | 4 +- .../[transcriptId]/correct/page.tsx | 4 +- .../(app)/transcripts/[transcriptId]/page.tsx | 5 +- .../[transcriptId]/processing/page.tsx | 3 +- .../[transcriptId]/record/page.tsx | 8 ++- .../[transcriptId]/upload/page.tsx | 8 ++- www/app/(app)/transcripts/transcriptTitle.tsx | 3 +- www/app/(app)/transcripts/useMp3.ts | 3 +- www/app/(app)/transcripts/useParticipants.ts | 3 +- .../(app)/transcripts/useTopicWithWords.ts | 3 +- www/app/(app)/transcripts/useTopics.ts | 7 ++- www/app/(app)/transcripts/useWaveform.ts | 3 +- www/app/(app)/transcripts/useWebSockets.ts | 43 ++++++--------- www/app/[roomName]/MeetingSelection.tsx | 10 +++- www/app/lib/apiHooks.ts | 55 ++++++++++++++++--- 16 files changed, 108 insertions(+), 56 deletions(-) diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 2d1ab194..3fd64c2c 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -1095,7 +1095,7 @@ async def identify_action_items( @daily_multitrack_pipeline.task( - parents=[generate_waveform, generate_title, generate_recap, identify_action_items], + parents=[generate_title, generate_recap, identify_action_items], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3, ) diff --git a/www/app/(app)/rooms/page.tsx b/www/app/(app)/rooms/page.tsx index 
f542e8e8..e5349bab 100644 --- a/www/app/(app)/rooms/page.tsx +++ b/www/app/(app)/rooms/page.tsx @@ -302,10 +302,10 @@ export default function RoomsList() { return; } - const platform: "whereby" | "daily" | null = + const platform: "whereby" | "daily" = room.platform === "whereby" || room.platform === "daily" ? room.platform - : null; + : "daily"; const roomData = { name: room.name, diff --git a/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx b/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx index c4d5a9fc..10ea2f82 100644 --- a/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/correct/page.tsx @@ -16,6 +16,7 @@ import { import { useError } from "../../../../(errors)/errorContext"; import { useRouter } from "next/navigation"; import { Box, Grid } from "@chakra-ui/react"; +import { parseNonEmptyString } from "../../../../lib/utils"; export type TranscriptCorrect = { params: Promise<{ @@ -25,8 +26,7 @@ export type TranscriptCorrect = { export default function TranscriptCorrect(props: TranscriptCorrect) { const params = use(props.params); - - const { transcriptId } = params; + const transcriptId = parseNonEmptyString(params.transcriptId); const updateTranscriptMutation = useTranscriptUpdate(); const transcript = useTranscriptGet(transcriptId); diff --git a/www/app/(app)/transcripts/[transcriptId]/page.tsx b/www/app/(app)/transcripts/[transcriptId]/page.tsx index ead2d259..523f8072 100644 --- a/www/app/(app)/transcripts/[transcriptId]/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/page.tsx @@ -9,7 +9,9 @@ import React, { useEffect, useState, use } from "react"; import FinalSummary from "./finalSummary"; import TranscriptTitle from "../transcriptTitle"; import Player from "../player"; +import { useWebSockets } from "../useWebSockets"; import { useRouter } from "next/navigation"; +import { parseNonEmptyString } from "../../../lib/utils"; import { Box, Flex, @@ -30,7 +32,7 @@ type 
TranscriptDetails = { export default function TranscriptDetails(details: TranscriptDetails) { const params = use(details.params); - const transcriptId = params.transcriptId; + const transcriptId = parseNonEmptyString(params.transcriptId); const router = useRouter(); const statusToRedirect = [ "idle", @@ -49,6 +51,7 @@ export default function TranscriptDetails(details: TranscriptDetails) { transcriptId, waiting || mp3.audioDeleted === true, ); + useWebSockets(transcriptId); const useActiveTopic = useState(null); const [finalSummaryElement, setFinalSummaryElement] = useState(null); diff --git a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx index 4422e077..0b7affaf 100644 --- a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx @@ -10,6 +10,7 @@ import { } from "@chakra-ui/react"; import { useRouter } from "next/navigation"; import { useTranscriptGet } from "../../../../lib/apiHooks"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptProcessing = { params: Promise<{ @@ -19,7 +20,7 @@ type TranscriptProcessing = { export default function TranscriptProcessing(details: TranscriptProcessing) { const params = use(details.params); - const transcriptId = params.transcriptId; + const transcriptId = parseNonEmptyString(params.transcriptId); const router = useRouter(); const transcript = useTranscriptGet(transcriptId); diff --git a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx index d93b34b6..cc6fbbc0 100644 --- a/www/app/(app)/transcripts/[transcriptId]/record/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/record/page.tsx @@ -12,6 +12,7 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react"; import LiveTrancription from "../../liveTranscription"; import { useTranscriptGet } from 
"../../../../lib/apiHooks"; import { TranscriptStatus } from "../../../../lib/transcript"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptDetails = { params: Promise<{ @@ -21,13 +22,14 @@ type TranscriptDetails = { const TranscriptRecord = (details: TranscriptDetails) => { const params = use(details.params); - const transcript = useTranscriptGet(params.transcriptId); + const transcriptId = parseNonEmptyString(params.transcriptId); + const transcript = useTranscriptGet(transcriptId); const [transcriptStarted, setTranscriptStarted] = useState(false); const useActiveTopic = useState(null); - const webSockets = useWebSockets(params.transcriptId); + const webSockets = useWebSockets(transcriptId); - const mp3 = useMp3(params.transcriptId, true); + const mp3 = useMp3(transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx index 9fc6a687..76722d6f 100644 --- a/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/upload/page.tsx @@ -7,6 +7,7 @@ import useMp3 from "../../useMp3"; import { Center, VStack, Text, Heading } from "@chakra-ui/react"; import FileUploadButton from "../../fileUploadButton"; import { useTranscriptGet } from "../../../../lib/apiHooks"; +import { parseNonEmptyString } from "../../../../lib/utils"; type TranscriptUpload = { params: Promise<{ @@ -16,12 +17,13 @@ type TranscriptUpload = { const TranscriptUpload = (details: TranscriptUpload) => { const params = use(details.params); - const transcript = useTranscriptGet(params.transcriptId); + const transcriptId = parseNonEmptyString(params.transcriptId); + const transcript = useTranscriptGet(transcriptId); const [transcriptStarted, setTranscriptStarted] = useState(false); - const webSockets = useWebSockets(params.transcriptId); + const webSockets = useWebSockets(transcriptId); - const mp3 = 
useMp3(params.transcriptId, true); + const mp3 = useMp3(transcriptId, true); const router = useRouter(); diff --git a/www/app/(app)/transcripts/transcriptTitle.tsx b/www/app/(app)/transcripts/transcriptTitle.tsx index ea738673..9eb6f375 100644 --- a/www/app/(app)/transcripts/transcriptTitle.tsx +++ b/www/app/(app)/transcripts/transcriptTitle.tsx @@ -1,5 +1,6 @@ import { useState } from "react"; import type { components } from "../../reflector-api"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type UpdateTranscript = components["schemas"]["UpdateTranscript"]; type GetTranscriptWithParticipants = @@ -32,7 +33,7 @@ const TranscriptTitle = (props: TranscriptTitle) => { const [isEditing, setIsEditing] = useState(false); const updateTranscriptMutation = useTranscriptUpdate(); const participantsQuery = useTranscriptParticipants( - props.transcript?.id || null, + props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null, ); const updateTitle = async (newTitle: string, transcriptId: string) => { diff --git a/www/app/(app)/transcripts/useMp3.ts b/www/app/(app)/transcripts/useMp3.ts index cc0635ec..cfeafb90 100644 --- a/www/app/(app)/transcripts/useMp3.ts +++ b/www/app/(app)/transcripts/useMp3.ts @@ -1,5 +1,6 @@ import { useEffect, useState } from "react"; import { useTranscriptGet } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; import { useAuth } from "../../lib/AuthProvider"; import { API_URL } from "../../lib/apiClient"; @@ -27,7 +28,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => { data: transcript, isLoading: transcriptMetadataLoading, error: transcriptError, - } = useTranscriptGet(later ? null : transcriptId); + } = useTranscriptGet(later ? 
null : parseMaybeNonEmptyString(transcriptId)); const [serviceWorker, setServiceWorker] = useState(null); diff --git a/www/app/(app)/transcripts/useParticipants.ts b/www/app/(app)/transcripts/useParticipants.ts index a3674597..0230075b 100644 --- a/www/app/(app)/transcripts/useParticipants.ts +++ b/www/app/(app)/transcripts/useParticipants.ts @@ -1,6 +1,7 @@ import type { components } from "../../reflector-api"; type Participant = components["schemas"]["Participant"]; import { useTranscriptParticipants } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type ErrorParticipants = { error: Error; @@ -32,7 +33,7 @@ const useParticipants = (transcriptId: string): UseParticipants => { isLoading: loading, error, refetch, - } = useTranscriptParticipants(transcriptId || null); + } = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId)); // Type-safe return based on state if (error) { diff --git a/www/app/(app)/transcripts/useTopicWithWords.ts b/www/app/(app)/transcripts/useTopicWithWords.ts index 31e184cc..dcf2dd60 100644 --- a/www/app/(app)/transcripts/useTopicWithWords.ts +++ b/www/app/(app)/transcripts/useTopicWithWords.ts @@ -1,5 +1,6 @@ import type { components } from "../../reflector-api"; import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type GetTranscriptTopicWithWordsPerSpeaker = components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"]; @@ -38,7 +39,7 @@ const useTopicWithWords = ( error, refetch, } = useTranscriptTopicsWithWordsPerSpeaker( - transcriptId || null, + parseMaybeNonEmptyString(transcriptId), topicId || null, ); diff --git a/www/app/(app)/transcripts/useTopics.ts b/www/app/(app)/transcripts/useTopics.ts index 7f337582..faafcf9a 100644 --- a/www/app/(app)/transcripts/useTopics.ts +++ b/www/app/(app)/transcripts/useTopics.ts @@ -1,5 +1,6 @@ import { useTranscriptTopics } from "../../lib/apiHooks"; import type { 
components } from "../../reflector-api"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"]; @@ -10,7 +11,11 @@ type TranscriptTopics = { }; const useTopics = (id: string): TranscriptTopics => { - const { data: topics, isLoading: loading, error } = useTranscriptTopics(id); + const { + data: topics, + isLoading: loading, + error, + } = useTranscriptTopics(parseMaybeNonEmptyString(id)); return { topics: topics || null, diff --git a/www/app/(app)/transcripts/useWaveform.ts b/www/app/(app)/transcripts/useWaveform.ts index 8bb8c4c9..896aa002 100644 --- a/www/app/(app)/transcripts/useWaveform.ts +++ b/www/app/(app)/transcripts/useWaveform.ts @@ -1,5 +1,6 @@ import type { components } from "../../reflector-api"; import { useTranscriptWaveform } from "../../lib/apiHooks"; +import { parseMaybeNonEmptyString } from "../../lib/utils"; type AudioWaveform = components["schemas"]["AudioWaveform"]; @@ -14,7 +15,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => { data: waveform, isLoading: loading, error, - } = useTranscriptWaveform(skip ? null : id); + } = useTranscriptWaveform(skip ? 
null : parseMaybeNonEmptyString(id)); return { waveform: waveform || null, diff --git a/www/app/(app)/transcripts/useWebSockets.ts b/www/app/(app)/transcripts/useWebSockets.ts index ed44577e..47c036b8 100644 --- a/www/app/(app)/transcripts/useWebSockets.ts +++ b/www/app/(app)/transcripts/useWebSockets.ts @@ -7,6 +7,12 @@ type GetTranscriptSegmentTopic = components["schemas"]["GetTranscriptSegmentTopic"]; import { useQueryClient } from "@tanstack/react-query"; import { $api, WEBSOCKET_URL } from "../../lib/apiClient"; +import { + invalidateTranscript, + invalidateTranscriptTopics, + invalidateTranscriptWaveform, +} from "../../lib/apiHooks"; +import { NonEmptyString } from "../../lib/utils"; export type UseWebSockets = { transcriptTextLive: string; @@ -369,15 +375,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { }); console.debug("TOPIC event:", message.data); // Invalidate topics query to sync with WebSocket data - queryClient.invalidateQueries({ - queryKey: $api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}/topics", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscriptTopics( + queryClient, + transcriptId as NonEmptyString, + ); break; case "FINAL_SHORT_SUMMARY": @@ -388,15 +389,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { if (message.data) { setFinalSummary(message.data); // Invalidate transcript query to sync summary - queryClient.invalidateQueries({ - queryKey: $api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscript(queryClient, transcriptId as NonEmptyString); } break; @@ -405,15 +398,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { if (message.data) { setTitle(message.data.title); // Invalidate transcript query to sync title - queryClient.invalidateQueries({ - queryKey: 
$api.queryOptions( - "get", - "/v1/transcripts/{transcript_id}", - { - params: { path: { transcript_id: transcriptId } }, - }, - ).queryKey, - }); + invalidateTranscript(queryClient, transcriptId as NonEmptyString); } break; @@ -424,6 +409,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => { ); if (message.data) { setWaveForm(message.data.waveform); + invalidateTranscriptWaveform( + queryClient, + transcriptId as NonEmptyString, + ); } break; case "DURATION": diff --git a/www/app/[roomName]/MeetingSelection.tsx b/www/app/[roomName]/MeetingSelection.tsx index 63743668..e2356810 100644 --- a/www/app/[roomName]/MeetingSelection.tsx +++ b/www/app/[roomName]/MeetingSelection.tsx @@ -26,7 +26,7 @@ import { useRouter } from "next/navigation"; import { formatDateTime, formatStartedAgo } from "../lib/timeUtils"; import MeetingMinimalHeader from "../components/MeetingMinimalHeader"; import { NonEmptyString } from "../lib/utils"; -import { MeetingId } from "../lib/types"; +import { MeetingId, assertMeetingId } from "../lib/types"; type Meeting = components["schemas"]["Meeting"]; @@ -315,7 +315,9 @@ export default function MeetingSelection({ variant="outline" colorScheme="red" size="md" - onClick={() => handleEndMeeting(meeting.id)} + onClick={() => + handleEndMeeting(assertMeetingId(meeting.id)) + } loading={deactivateMeetingMutation.isPending} > @@ -460,7 +462,9 @@ export default function MeetingSelection({ variant="outline" colorScheme="red" size="md" - onClick={() => handleEndMeeting(meeting.id)} + onClick={() => + handleEndMeeting(assertMeetingId(meeting.id)) + } loading={deactivateMeetingMutation.isPending} > diff --git a/www/app/lib/apiHooks.ts b/www/app/lib/apiHooks.ts index a00eb552..788dfac6 100644 --- a/www/app/lib/apiHooks.ts +++ b/www/app/lib/apiHooks.ts @@ -6,6 +6,7 @@ import { QueryClient, useQueryClient } from "@tanstack/react-query"; import type { components } from "../reflector-api"; import { useAuth } from "./AuthProvider"; 
import { MeetingId } from "./types"; +import { NonEmptyString } from "./utils"; /* * XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other @@ -103,7 +104,7 @@ export function useTranscriptProcess() { }); } -export function useTranscriptGet(transcriptId: string | null) { +export function useTranscriptGet(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}", @@ -120,6 +121,16 @@ export function useTranscriptGet(transcriptId: string | null) { ); } +export const invalidateTranscript = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", { + params: { path: { transcript_id: transcriptId } }, + }).queryKey, + }); + export function useRoomGet(roomId: string | null) { const { isAuthenticated } = useAuthReady(); @@ -297,7 +308,7 @@ export function useTranscriptUploadAudio() { ); } -export function useTranscriptWaveform(transcriptId: string | null) { +export function useTranscriptWaveform(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}/audio/waveform", @@ -312,7 +323,21 @@ export function useTranscriptWaveform(transcriptId: string | null) { ); } -export function useTranscriptMP3(transcriptId: string | null) { +export const invalidateTranscriptWaveform = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions( + "get", + "/v1/transcripts/{transcript_id}/audio/waveform", + { + params: { path: { transcript_id: transcriptId } }, + }, + ).queryKey, + }); + +export function useTranscriptMP3(transcriptId: NonEmptyString | null) { const { isAuthenticated } = useAuthReady(); return $api.useQuery( @@ -329,7 +354,7 @@ export function useTranscriptMP3(transcriptId: string | null) { ); } -export function 
useTranscriptTopics(transcriptId: string | null) { +export function useTranscriptTopics(transcriptId: NonEmptyString | null) { return $api.useQuery( "get", "/v1/transcripts/{transcript_id}/topics", @@ -344,7 +369,23 @@ export function useTranscriptTopics(transcriptId: string | null) { ); } -export function useTranscriptTopicsWithWords(transcriptId: string | null) { +export const invalidateTranscriptTopics = ( + queryClient: QueryClient, + transcriptId: NonEmptyString, +) => + queryClient.invalidateQueries({ + queryKey: $api.queryOptions( + "get", + "/v1/transcripts/{transcript_id}/topics", + { + params: { path: { transcript_id: transcriptId } }, + }, + ).queryKey, + }); + +export function useTranscriptTopicsWithWords( + transcriptId: NonEmptyString | null, +) { const { isAuthenticated } = useAuthReady(); return $api.useQuery( @@ -362,7 +403,7 @@ export function useTranscriptTopicsWithWords(transcriptId: string | null) { } export function useTranscriptTopicsWithWordsPerSpeaker( - transcriptId: string | null, + transcriptId: NonEmptyString | null, topicId: string | null, ) { const { isAuthenticated } = useAuthReady(); @@ -384,7 +425,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker( ); } -export function useTranscriptParticipants(transcriptId: string | null) { +export function useTranscriptParticipants(transcriptId: NonEmptyString | null) { const { isAuthenticated } = useAuthReady(); return $api.useQuery(