diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index b92ab49b..acc15f6b 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -392,6 +392,38 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe parse_livekit_track_filepath, # noqa: PLC0415 ) + # Look up identity → Reflector user_id mapping from Redis + # (stored at join time in rooms.py) + identity_to_user_id: dict[str, str] = {} + try: + from reflector.db.meetings import ( + meetings_controller as mc, # noqa: PLC0415 + ) + from reflector.redis_cache import get_redis # noqa: PLC0415 + + meeting = ( + await mc.get_by_id(transcript.meeting_id) + if transcript.meeting_id + else None + ) + if meeting: + redis = await get_redis() + mapping_key = f"livekit:participant_map:{meeting.room_name}" + raw_map = await redis.hgetall(mapping_key) + identity_to_user_id = { + k.decode() if isinstance(k, bytes) else k: v.decode() + if isinstance(v, bytes) + else v + for k, v in raw_map.items() + } + ctx.log( + f"get_participants: loaded {len(identity_to_user_id)} identity→user_id mappings from Redis" + ) + except Exception as e: + ctx.log( + f"get_participants: could not load identity map from Redis: {e}" + ) + for idx, track in enumerate(input.tracks): identity = track.get("participant_identity") if not identity: @@ -401,17 +433,25 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe identity = parsed.participant_identity except (ValueError, KeyError): identity = f"speaker-{idx}" + + # Strip the uuid suffix from identity for display name + # e.g., "Juan-2bcea0" → "Juan" + display_name = ( + identity.rsplit("-", 1)[0] if "-" in identity else identity + ) + reflector_user_id = identity_to_user_id.get(identity) + participant = TranscriptParticipant( id=identity, speaker=idx, - name=identity, - user_id=identity if not identity.startswith("anon-") else None, + name=display_name, + user_id=reflector_user_id, ) await transcripts_controller.upsert_participant(transcript, participant) participants_list.append( ParticipantInfo( participant_id=identity, - user_name=identity, + user_name=display_name, speaker=idx, ) ) @@ -705,13 +745,31 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: # else: modal backend already uploaded to output_url async with fresh_db_connection(): - from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 + from reflector.db.transcripts import ( # noqa: PLC0415 + TranscriptDuration, + transcripts_controller, + ) transcript = await transcripts_controller.get_by_id(input.transcript_id) if transcript: - await transcripts_controller.update( - transcript, {"audio_location": "storage"} - ) + update_data = {"audio_location": "storage"} + # Set duration from mixdown if not already set (LiveKit: duration starts at 0) + if not transcript.duration or transcript.duration == 0: + update_data["duration"] = result.duration_ms + await transcripts_controller.update(transcript, update_data) + + # Broadcast duration update if it was missing + if not transcript.duration or transcript.duration == 0: + await append_event_and_broadcast( + input.transcript_id, + transcript, + "DURATION", + TranscriptDuration(duration=result.duration_ms), + logger=logger, + ) + ctx.log( + f"mixdown_tracks: set duration={result.duration_ms}ms from mixdown" + ) ctx.log(f"mixdown_tracks complete: {result.size} bytes to {storage_path}") diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 5ff90c97..166a28a1 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -621,6 +621,17 @@ async def rooms_join_meeting( else: participant_identity = f"anon-{uid_suffix}" participant_name = display_name or participant_identity + + # Store identity → Reflector user_id mapping for the pipeline + # (so TranscriptParticipant.user_id can be set correctly) + if user_id: + from reflector.redis_cache import get_redis # noqa: PLC0415 + + redis = await get_redis() + mapping_key = f"livekit:participant_map:{meeting.room_name}" + await redis.hset(mapping_key, participant_identity, user_id) + await redis.expire(mapping_key, 7 * 86400) # 7 day TTL + token = client.create_access_token( room_name=meeting.room_name, participant_identity=participant_identity,