From 7f0b728991c1b9f9aae702c96297eae63b561ef5 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Thu, 27 Nov 2025 16:53:26 +0100 Subject: [PATCH] fix: participants update from daily (#749) * Fix participants update from daily * Use track keys from params --- server/reflector/dailyco_api/responses.py | 2 +- .../pipelines/main_multitrack_pipeline.py | 95 +++++++++++++++++++ server/reflector/worker/process.py | 85 +---------------- 3 files changed, 98 insertions(+), 84 deletions(-) diff --git a/server/reflector/dailyco_api/responses.py b/server/reflector/dailyco_api/responses.py index 3dc18815..279682ae 100644 --- a/server/reflector/dailyco_api/responses.py +++ b/server/reflector/dailyco_api/responses.py @@ -68,7 +68,7 @@ class MeetingParticipant(BaseModel): Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants """ - user_id: NonEmptyString = Field(description="User identifier") + user_id: NonEmptyString | None = Field(None, description="User identifier") participant_id: NonEmptyString = Field(description="Participant session identifier") user_name: NonEmptyString | None = Field(None, description="User display name") join_time: int = Field(description="Join timestamp (Unix epoch seconds)") diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index f91c8250..d202206c 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -9,7 +9,10 @@ from av.audio.resampler import AudioResampler from celery import chain, shared_task from reflector.asynctask import asynctask +from reflector.dailyco_api import MeetingParticipantsResponse from reflector.db.transcripts import ( + Transcript, + TranscriptParticipant, TranscriptStatus, TranscriptWaveform, transcripts_controller, @@ -29,7 +32,12 @@ from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.types import TitleSummary from reflector.processors.types import Transcript as TranscriptType from reflector.storage import Storage, get_transcripts_storage +from reflector.utils.daily import ( + filter_cam_audio_tracks, + parse_daily_recording_filename, +) from reflector.utils.string import NonEmptyString +from reflector.video_platforms.factory import create_platform_client # Audio encoding constants OPUS_STANDARD_SAMPLE_RATE = 48000 @@ -494,6 +502,90 @@ class PipelineMainMultitrack(PipelineMainBase): transcript=transcript, event="WAVEFORM", data=waveform ) + async def update_participants_from_daily( + self, transcript: Transcript, track_keys: list[str] + ) -> None: + """Update transcript participants with user_id and names from Daily.co API.""" + if not transcript.recording_id: + return + + try: + async with create_platform_client("daily") as daily_client: + id_to_name = {} + id_to_user_id = {} + + try: + rec_details = await daily_client.get_recording( + transcript.recording_id + ) + mtg_session_id = rec_details.mtgSessionId + if mtg_session_id: + try: + payload: MeetingParticipantsResponse = ( + await daily_client.get_meeting_participants( + mtg_session_id + ) + ) + for p in payload.data: + pid = p.participant_id + name = p.user_name + user_id = p.user_id + if name: + id_to_name[pid] = name + if user_id: + id_to_user_id[pid] = user_id + except Exception as e: + self.logger.warning( + "Failed to fetch Daily meeting participants", + error=str(e), + mtg_session_id=mtg_session_id, + exc_info=True, + ) + else: + self.logger.warning( + "No mtgSessionId found for recording; participant names may be generic", + recording_id=transcript.recording_id, + ) + except Exception as e: + self.logger.warning( + "Failed to fetch Daily recording details", + error=str(e), + recording_id=transcript.recording_id, + exc_info=True, + ) + return + + cam_audio_keys = filter_cam_audio_tracks(track_keys) + + for idx, key in enumerate(cam_audio_keys): + try: + parsed = parse_daily_recording_filename(key) + participant_id = parsed.participant_id + except ValueError as e: + self.logger.error( + "Failed to parse Daily recording filename", + error=str(e), + key=key, + exc_info=True, + ) + continue + + default_name = f"Speaker {idx}" + name = id_to_name.get(participant_id, default_name) + user_id = id_to_user_id.get(participant_id) + + participant = TranscriptParticipant( + id=participant_id, speaker=idx, name=name, user_id=user_id + ) + await transcripts_controller.upsert_participant( + transcript, participant + ) + + except Exception as e: + self.logger.warning( + "Failed to map participant names", error=str(e), exc_info=True + ) + async def process(self, bucket_name: str, track_keys: list[str]): transcript = await self.get_transcript() async with self.transaction(): @@ -502,9 +594,12 @@ class PipelineMainMultitrack(PipelineMainBase): { "events": [], "topics": [], + "participants": [], }, ) + await self.update_participants_from_daily(transcript, track_keys) + source_storage = get_transcripts_storage() transcript_storage = source_storage diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index adf73d15..21e73723 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -12,7 +12,7 @@ from celery import shared_task from celery.utils.log import get_task_logger from pydantic import ValidationError -from reflector.dailyco_api import MeetingParticipantsResponse, RecordingResponse +from reflector.dailyco_api import RecordingResponse from reflector.db.daily_participant_sessions import ( DailyParticipantSession, daily_participant_sessions_controller, @@ -22,7 +22,6 @@ from reflector.db.recordings import Recording, recordings_controller from reflector.db.rooms import rooms_controller from reflector.db.transcripts import ( SourceKind, - TranscriptParticipant, transcripts_controller, ) from reflector.pipelines.main_file_pipeline import task_pipeline_file_process @@ -40,7 +39,6 @@ from reflector.utils.daily import ( DailyRoomName, extract_base_room_name, filter_cam_audio_tracks, - parse_daily_recording_filename, recording_lock_key, ) from reflector.video_platforms.factory import create_platform_client @@ -275,15 +273,7 @@ async def _process_multitrack_recording_inner( # else: Recording already exists; metadata set at creation time transcript = await transcripts_controller.get_by_recording_id(recording.id) - if transcript: - await transcripts_controller.update( - transcript, - { - "topics": [], - "participants": [], - }, - ) - else: + if not transcript: transcript = await transcripts_controller.add( "", source_kind=SourceKind.ROOM, @@ -296,77 +286,6 @@ async def _process_multitrack_recording_inner( room_id=room.id, ) - try: - async with create_platform_client("daily") as daily_client: - id_to_name = {} - id_to_user_id = {} - - try: - rec_details = await daily_client.get_recording(recording_id) - mtg_session_id = rec_details.mtgSessionId - if mtg_session_id: - try: - payload: MeetingParticipantsResponse = ( - await daily_client.get_meeting_participants(mtg_session_id) - ) - for p in payload.data: - pid = p.participant_id - assert ( - pid is not None - ), "panic! participant id cannot be None" - name = p.user_name - user_id = p.user_id - if name: - id_to_name[pid] = name - if user_id: - id_to_user_id[pid] = user_id - except Exception as e: - logger.warning( - "Failed to fetch Daily meeting participants", - error=str(e), - mtg_session_id=mtg_session_id, - exc_info=True, - ) - else: - logger.warning( - "No mtgSessionId found for recording; participant names may be generic", - recording_id=recording_id, - ) - except Exception as e: - logger.warning( - "Failed to fetch Daily recording details", - error=str(e), - recording_id=recording_id, - exc_info=True, - ) - - cam_audio_keys = filter_cam_audio_tracks(track_keys) - - for idx, key in enumerate(cam_audio_keys): - try: - parsed = parse_daily_recording_filename(key) - participant_id = parsed.participant_id - except ValueError as e: - logger.error( - "Failed to parse Daily recording filename", - error=str(e), - key=key, - exc_info=True, - ) - continue - - default_name = f"Speaker {idx}" - name = id_to_name.get(participant_id, default_name) - user_id = id_to_user_id.get(participant_id) - - participant = TranscriptParticipant( - id=participant_id, speaker=idx, name=name, user_id=user_id - ) - await transcripts_controller.upsert_participant(transcript, participant) - - except Exception as e: - logger.warning("Failed to map participant names", error=str(e), exc_info=True) - task_pipeline_multitrack_process.delay( transcript_id=transcript.id, bucket_name=bucket_name,