feat: Multitrack segmentation (#747)

* segmentation multitrack (no-mistakes)

* segmentation multitrack (no-mistakes)

* self review

* self review

* recording poll daily doc

* filter cam_audio tracks to remove screensharing from daily processing

* pr review

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
Igor Monadical
2025-11-26 16:21:32 -05:00
committed by GitHub
parent 8d696aa775
commit d63040e2fd
8 changed files with 485 additions and 81 deletions

View File

@@ -35,8 +35,15 @@ class Recording(BaseModel):
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
@property
def is_multitrack(self) -> bool:
"""True if recording has separate audio tracks (1+ tracks counts as multitrack)."""
return self.track_keys is not None and len(self.track_keys) > 0
class RecordingController:
async def create(self, recording: Recording):

View File

@@ -1,6 +1,7 @@
import io
import re
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Annotated, TypedDict
@@ -16,6 +17,17 @@ class DiarizationSegment(TypedDict):
PUNC_RE = re.compile(r"[.;:?!…]")
SENTENCE_END_RE = re.compile(r"[.?!…]$")
# Max segment length for words_to_segments() - breaks on any punctuation (. ; : ? ! …)
# when segment exceeds this limit. Used for non-multitrack recordings.
MAX_SEGMENT_CHARS = 120
# Max segment length for words_to_segments_by_sentence() - only breaks on sentence-ending
# punctuation (. ? ! …) when segment exceeds this limit. Higher threshold allows complete
# sentences in multitrack recordings where speakers overlap.
# similar number to server/reflector/processors/transcript_liner.py
MAX_SENTENCE_SEGMENT_CHARS = 1000
class AudioFile(BaseModel):
@@ -76,7 +88,6 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
# but separate if the speaker changes, or if the punctuation is a . , ; : ? !
segments = []
current_segment = None
MAX_SEGMENT_LENGTH = 120
for word in words:
if current_segment is None:
@@ -106,7 +117,7 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
current_segment.end = word.end
have_punc = PUNC_RE.search(word.text)
if have_punc and (len(current_segment.text) > MAX_SEGMENT_LENGTH):
if have_punc and (len(current_segment.text) > MAX_SEGMENT_CHARS):
segments.append(current_segment)
current_segment = None
@@ -116,6 +127,70 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
return segments
def words_to_segments_by_sentence(words: list[Word]) -> list[TranscriptSegment]:
"""Group words by speaker, then split into sentences.
For multitrack recordings where words from different speakers are interleaved
by timestamp, this function first groups all words by speaker, then creates
segments based on sentence boundaries within each speaker's words.
This produces cleaner output than words_to_segments() which breaks on every
speaker change, resulting in many tiny segments when speakers overlap.
"""
if not words:
return []
# Group words by speaker, preserving order within each speaker
by_speaker: dict[int, list[Word]] = defaultdict(list)
for w in words:
by_speaker[w.speaker].append(w)
segments: list[TranscriptSegment] = []
for speaker, speaker_words in by_speaker.items():
current_text = ""
current_start: float | None = None
current_end: float = 0.0
for word in speaker_words:
if current_start is None:
current_start = word.start
current_text += word.text
current_end = word.end
# Check for sentence end or max length
is_sentence_end = SENTENCE_END_RE.search(word.text.strip())
is_too_long = len(current_text) >= MAX_SENTENCE_SEGMENT_CHARS
if is_sentence_end or is_too_long:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
current_text = ""
current_start = None
# Flush remaining words for this speaker
if current_text and current_start is not None:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
# Sort segments by start time
segments.sort(key=lambda s: s.start)
return segments
class Transcript(BaseModel):
translation: str | None = None
words: list[Word] = []
@@ -154,7 +229,9 @@ class Transcript(BaseModel):
word.start += offset
word.end += offset
def as_segments(self) -> list[TranscriptSegment]:
def as_segments(self, is_multitrack: bool = False) -> list[TranscriptSegment]:
if is_multitrack:
return words_to_segments_by_sentence(self.words)
return words_to_segments(self.words)

View File

@@ -64,6 +64,11 @@ def recording_lock_key(recording_id: NonEmptyString) -> NonEmptyString:
return f"recording:{recording_id}"
def filter_cam_audio_tracks(track_keys: list[str]) -> list[str]:
"""Filter track keys to cam-audio tracks only (skip screen-audio, etc.)."""
return [k for k in track_keys if "cam-audio" in k]
def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString:
"""
Extract base room name from Daily.co timestamped room name.

View File

@@ -6,9 +6,6 @@ from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic
from reflector.processors.types import (
Transcript as ProcessorTranscript,
)
from reflector.processors.types import (
words_to_segments,
)
from reflector.schemas.transcript_formats import TranscriptSegment
from reflector.utils.webvtt import seconds_to_timestamp
@@ -32,7 +29,9 @@ def format_timestamp_mmss(seconds: float | int) -> str:
def transcript_to_text(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to plain text with speaker names."""
lines = []
@@ -41,7 +40,7 @@ def transcript_to_text(
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments()
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -52,7 +51,9 @@ def transcript_to_text(
def transcript_to_text_timestamped(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to timestamped text with speaker names."""
lines = []
@@ -61,7 +62,7 @@ def transcript_to_text_timestamped(
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments()
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -73,7 +74,9 @@ def transcript_to_text_timestamped(
def topics_to_webvtt_named(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to WebVTT format with participant names."""
vtt = webvtt.WebVTT()
@@ -82,7 +85,8 @@ def topics_to_webvtt_named(
if not topic.words:
continue
segments = words_to_segments(topic.words)
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -100,19 +104,23 @@ def topics_to_webvtt_named(
def transcript_to_json_segments(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> list[TranscriptSegment]:
"""Convert transcript topics to a flat list of JSON segments."""
segments = []
result = []
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
for segment in transcript.as_segments():
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
segments.append(
result.append(
TranscriptSegment(
speaker=segment.speaker,
speaker_name=speaker_name,
@@ -122,4 +130,4 @@ def transcript_to_json_segments(
)
)
return segments
return result

View File

@@ -16,6 +16,7 @@ from pydantic import (
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.recordings import recordings_controller
from reflector.db.search import (
DEFAULT_SEARCH_LIMIT,
SearchLimit,
@@ -60,6 +61,14 @@ ALGORITHM = "HS256"
DOWNLOAD_EXPIRE_MINUTES = 60
async def _get_is_multitrack(transcript) -> bool:
"""Detect if transcript is from multitrack recording."""
if not transcript.recording_id:
return False
recording = await recordings_controller.get_by_id(transcript.recording_id)
return recording is not None and recording.is_multitrack
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
@@ -360,7 +369,7 @@ class GetTranscriptTopic(BaseModel):
segments: list[GetTranscriptSegmentTopic] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
if not topic.words:
# In previous version, words were missing
# Just output a segment with speaker 0
@@ -384,7 +393,7 @@ class GetTranscriptTopic(BaseModel):
start=segment.start,
speaker=segment.speaker,
)
for segment in transcript.as_segments()
for segment in transcript.as_segments(is_multitrack)
]
return cls(
id=topic.id,
@@ -401,8 +410,8 @@ class GetTranscriptTopicWithWords(GetTranscriptTopic):
words: list[Word] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
instance.words = topic.words
return instance
@@ -417,8 +426,8 @@ class GetTranscriptTopicWithWordsPerSpeaker(GetTranscriptTopic):
words_per_speaker: list[SpeakerWords] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
words_per_speakers = []
# group words by speaker
@@ -457,6 +466,8 @@ async def transcript_get(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
base_data = {
"id": transcript.id,
"user_id": transcript.user_id,
@@ -483,14 +494,16 @@ async def transcript_get(
return GetTranscriptWithText(
**base_data,
transcript_format="text",
transcript=transcript_to_text(transcript.topics, transcript.participants),
transcript=transcript_to_text(
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "text-timestamped":
return GetTranscriptWithTextTimestamped(
**base_data,
transcript_format="text-timestamped",
transcript=transcript_to_text_timestamped(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "webvtt-named":
@@ -498,7 +511,7 @@ async def transcript_get(
**base_data,
transcript_format="webvtt-named",
transcript=topics_to_webvtt_named(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "json":
@@ -506,7 +519,7 @@ async def transcript_get(
**base_data,
transcript_format="json",
transcript=transcript_to_json_segments(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
else:
@@ -565,9 +578,12 @@ async def transcript_get_topics(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopic
return [
GetTranscriptTopic.from_transcript_topic(topic) for topic in transcript.topics
GetTranscriptTopic.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -584,9 +600,11 @@ async def transcript_get_topics_with_words(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopicWithWords
return [
GetTranscriptTopicWithWords.from_transcript_topic(topic)
GetTranscriptTopicWithWords.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -605,13 +623,17 @@ async def transcript_get_topics_with_words_per_speaker(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# get the topic from the transcript
topic = next((t for t in transcript.topics if t.id == topic_id), None)
if not topic:
raise HTTPException(status_code=404, detail="Topic not found")
# convert to GetTranscriptTopicWithWordsPerSpeaker
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(topic)
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(
topic, is_multitrack
)
@router.post("/transcripts/{transcript_id}/zulip")

View File

@@ -2,6 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List
from urllib.parse import unquote
import av
@@ -11,7 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.dailyco_api import MeetingParticipantsResponse, RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
@@ -38,6 +39,7 @@ from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
filter_cam_audio_tracks,
parse_daily_recording_filename,
recording_lock_key,
)
@@ -338,7 +340,9 @@ async def _process_multitrack_recording_inner(
exc_info=True,
)
for idx, key in enumerate(track_keys):
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
@@ -366,7 +370,7 @@ async def _process_multitrack_recording_inner(
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=track_keys,
track_keys=filter_cam_audio_tracks(track_keys),
)
@@ -391,7 +395,7 @@ async def poll_daily_recordings():
async with create_platform_client("daily") as daily_client:
# latest 100. TODO cursor-based state
api_recordings = await daily_client.list_recordings()
api_recordings: List[RecordingResponse] = await daily_client.list_recordings()
if not api_recordings:
logger.debug(
@@ -422,17 +426,19 @@ async def poll_daily_recordings():
for recording in missing_recordings:
if not recording.tracks:
assert recording.status != "finished", (
f"Recording {recording.id} has status='finished' but no tracks. "
f"Daily.co API guarantees finished recordings have tracks available. "
f"room_name={recording.room_name}"
)
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]