fix: reflector user id on participants and duration fix

This commit is contained in:
Juan
2026-04-06 09:43:45 -05:00
parent d2ec7a0b24
commit 379473e94a
2 changed files with 76 additions and 7 deletions

View File

@@ -392,6 +392,38 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
parse_livekit_track_filepath, # noqa: PLC0415
)
# Look up identity → Reflector user_id mapping from Redis
# (stored at join time in rooms.py)
identity_to_user_id: dict[str, str] = {}
try:
from reflector.db.meetings import (
meetings_controller as mc, # noqa: PLC0415
)
from reflector.redis_cache import get_redis # noqa: PLC0415
meeting = (
await mc.get_by_id(transcript.meeting_id)
if transcript.meeting_id
else None
)
if meeting:
redis = await get_redis()
mapping_key = f"livekit:participant_map:{meeting.room_name}"
raw_map = await redis.hgetall(mapping_key)
identity_to_user_id = {
k.decode() if isinstance(k, bytes) else k: v.decode()
if isinstance(v, bytes)
else v
for k, v in raw_map.items()
}
ctx.log(
f"get_participants: loaded {len(identity_to_user_id)} identity→user_id mappings from Redis"
)
except Exception as e:
ctx.log(
f"get_participants: could not load identity map from Redis: {e}"
)
for idx, track in enumerate(input.tracks):
identity = track.get("participant_identity")
if not identity:
@@ -401,17 +433,25 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
identity = parsed.participant_identity
except (ValueError, KeyError):
identity = f"speaker-{idx}"
# Strip the uuid suffix from identity for display name
# e.g., "Juan-2bcea0" → "Juan"
display_name = (
identity.rsplit("-", 1)[0] if "-" in identity else identity
)
reflector_user_id = identity_to_user_id.get(identity)
participant = TranscriptParticipant(
id=identity,
speaker=idx,
name=identity,
user_id=identity if not identity.startswith("anon-") else None,
name=display_name,
user_id=reflector_user_id,
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
ParticipantInfo(
participant_id=identity,
user_name=identity,
user_name=display_name,
speaker=idx,
)
)
@@ -705,13 +745,31 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
# else: modal backend already uploaded to output_url
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"audio_location": "storage"}
)
update_data = {"audio_location": "storage"}
# Set duration from mixdown if not already set (LiveKit: duration starts at 0)
if not transcript.duration or transcript.duration == 0:
update_data["duration"] = result.duration_ms
await transcripts_controller.update(transcript, update_data)
# Broadcast duration update if it was missing
if not transcript.duration or transcript.duration == 0:
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=result.duration_ms),
logger=logger,
)
ctx.log(
f"mixdown_tracks: set duration={result.duration_ms}ms from mixdown"
)
ctx.log(f"mixdown_tracks complete: {result.size} bytes to {storage_path}")

View File

@@ -621,6 +621,17 @@ async def rooms_join_meeting(
else:
participant_identity = f"anon-{uid_suffix}"
participant_name = display_name or participant_identity
# Store identity → Reflector user_id mapping for the pipeline
# (so TranscriptParticipant.user_id can be set correctly)
if user_id:
from reflector.redis_cache import get_redis # noqa: PLC0415
redis = await get_redis()
mapping_key = f"livekit:participant_map:{meeting.room_name}"
await redis.hset(mapping_key, participant_identity, user_id)
await redis.expire(mapping_key, 7 * 86400) # 7 day TTL
token = client.create_access_token(
room_name=meeting.room_name,
participant_identity=participant_identity,