fix: participants update from daily (#749)

* Fix participants update from daily

* Use track keys from params
This commit is contained in:
2025-11-27 16:53:26 +01:00
committed by GitHub
parent 692895c859
commit 7f0b728991
3 changed files with 98 additions and 84 deletions

View File

@@ -12,7 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import MeetingParticipantsResponse, RecordingResponse
from reflector.dailyco_api import RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
@@ -22,7 +22,6 @@ from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
TranscriptParticipant,
transcripts_controller,
)
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
@@ -40,7 +39,6 @@ from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
filter_cam_audio_tracks,
parse_daily_recording_filename,
recording_lock_key,
)
from reflector.video_platforms.factory import create_platform_client
@@ -275,15 +273,7 @@ async def _process_multitrack_recording_inner(
# else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript:
await transcripts_controller.update(
transcript,
{
"topics": [],
"participants": [],
},
)
else:
if not transcript:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
@@ -296,77 +286,6 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(recording_id)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(mtg_session_id)
)
for p in payload.data:
pid = p.participant_id
assert (
pid is not None
), "panic! participant id cannot be None"
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=recording_id,
)
except Exception as e:
logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=recording_id,
exc_info=True,
)
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
except Exception as e:
logger.warning("Failed to map participant names", error=str(e), exc_info=True)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,