From d63040e2fdc07e7b272e85a39eb2411cd6a14798 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Wed, 26 Nov 2025 16:21:32 -0500 Subject: [PATCH] feat: Multitrack segmentation (#747) * segmentation multitrack (no-mistakes) * segmentation multitrack (no-mistakes) * self review * self review * recording poll daily doc * filter cam_audio tracks to remove screensharing from daily processing * pr review --------- Co-authored-by: Igor Loskutov --- server/reflector/db/recordings.py | 7 + server/reflector/processors/types.py | 83 +++++- server/reflector/utils/daily.py | 5 + server/reflector/utils/transcript_formats.py | 36 ++- server/reflector/views/transcripts.py | 48 ++- server/reflector/worker/process.py | 36 ++- .../test_processor_transcript_segment.py | 75 +++++ server/tests/test_transcript_formats.py | 276 +++++++++++++++--- 8 files changed, 485 insertions(+), 81 deletions(-) diff --git a/server/reflector/db/recordings.py b/server/reflector/db/recordings.py index c67b8413..18fe358b 100644 --- a/server/reflector/db/recordings.py +++ b/server/reflector/db/recordings.py @@ -35,8 +35,15 @@ class Recording(BaseModel): status: Literal["pending", "processing", "completed", "failed"] = "pending" meeting_id: str | None = None # for multitrack reprocessing + # track_keys can be empty list [] if recording finished but no audio was captured (silence/muted) + # None means not a multitrack recording, [] means multitrack with no tracks track_keys: list[str] | None = None + @property + def is_multitrack(self) -> bool: + """True if recording has separate audio tracks (1+ tracks counts as multitrack).""" + return self.track_keys is not None and len(self.track_keys) > 0 + class RecordingController: async def create(self, recording: Recording): diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py index 7096e81c..3369e09c 100644 --- a/server/reflector/processors/types.py +++ b/server/reflector/processors/types.py @@ -1,6 +1,7 @@ import io import re import tempfile +from collections import defaultdict from pathlib import Path from typing import Annotated, TypedDict @@ -16,6 +17,17 @@ class DiarizationSegment(TypedDict): PUNC_RE = re.compile(r"[.;:?!…]") +SENTENCE_END_RE = re.compile(r"[.?!…]$") + +# Max segment length for words_to_segments() - breaks on any punctuation (. ; : ? ! …) +# when segment exceeds this limit. Used for non-multitrack recordings. +MAX_SEGMENT_CHARS = 120 + +# Max segment length for words_to_segments_by_sentence() - only breaks on sentence-ending +# punctuation (. ? ! …) when segment exceeds this limit. Higher threshold allows complete +# sentences in multitrack recordings where speakers overlap. +# similar number to server/reflector/processors/transcript_liner.py +MAX_SENTENCE_SEGMENT_CHARS = 1000 class AudioFile(BaseModel): @@ -76,7 +88,6 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]: # but separate if the speaker changes, or if the punctuation is a . , ; : ? ! 
segments = [] current_segment = None - MAX_SEGMENT_LENGTH = 120 for word in words: if current_segment is None: @@ -106,7 +117,7 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]: current_segment.end = word.end have_punc = PUNC_RE.search(word.text) - if have_punc and (len(current_segment.text) > MAX_SEGMENT_LENGTH): + if have_punc and (len(current_segment.text) > MAX_SEGMENT_CHARS): segments.append(current_segment) current_segment = None @@ -116,6 +127,70 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]: return segments +def words_to_segments_by_sentence(words: list[Word]) -> list[TranscriptSegment]: + """Group words by speaker, then split into sentences. + + For multitrack recordings where words from different speakers are interleaved + by timestamp, this function first groups all words by speaker, then creates + segments based on sentence boundaries within each speaker's words. + + This produces cleaner output than words_to_segments() which breaks on every + speaker change, resulting in many tiny segments when speakers overlap. + """ + if not words: + return [] + + # Group words by speaker, preserving order within each speaker + by_speaker: dict[int, list[Word]] = defaultdict(list) + for w in words: + by_speaker[w.speaker].append(w) + + segments: list[TranscriptSegment] = [] + + for speaker, speaker_words in by_speaker.items(): + current_text = "" + current_start: float | None = None + current_end: float = 0.0 + + for word in speaker_words: + if current_start is None: + current_start = word.start + + current_text += word.text + current_end = word.end + + # Check for sentence end or max length + is_sentence_end = SENTENCE_END_RE.search(word.text.strip()) + is_too_long = len(current_text) >= MAX_SENTENCE_SEGMENT_CHARS + + if is_sentence_end or is_too_long: + segments.append( + TranscriptSegment( + text=current_text, + start=current_start, + end=current_end, + speaker=speaker, + ) + ) + current_text = "" + current_start = None + + # Flush remaining words for this speaker + if current_text and current_start is not None: + segments.append( + TranscriptSegment( + text=current_text, + start=current_start, + end=current_end, + speaker=speaker, + ) + ) + + # Sort segments by start time + segments.sort(key=lambda s: s.start) + return segments + + class Transcript(BaseModel): translation: str | None = None words: list[Word] = [] @@ -154,7 +229,9 @@ class Transcript(BaseModel): word.start += offset word.end += offset - def as_segments(self) -> list[TranscriptSegment]: + def as_segments(self, is_multitrack: bool = False) -> list[TranscriptSegment]: + if is_multitrack: + return words_to_segments_by_sentence(self.words) return words_to_segments(self.words) diff --git a/server/reflector/utils/daily.py b/server/reflector/utils/daily.py index 72242f78..91f8d782 100644 --- a/server/reflector/utils/daily.py +++ b/server/reflector/utils/daily.py @@ -64,6 +64,11 @@ def recording_lock_key(recording_id: NonEmptyString) -> NonEmptyString: return f"recording:{recording_id}" +def filter_cam_audio_tracks(track_keys: list[str]) -> list[str]: + """Filter track keys to cam-audio tracks only (skip screen-audio, etc.).""" + return [k for k in track_keys if "cam-audio" in k] + + def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString: """ Extract base room name from Daily.co timestamped room name. 
diff --git a/server/reflector/utils/transcript_formats.py b/server/reflector/utils/transcript_formats.py index 4ccf8cce..d2aa3af2 100644 --- a/server/reflector/utils/transcript_formats.py +++ b/server/reflector/utils/transcript_formats.py @@ -6,9 +6,6 @@ from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic from reflector.processors.types import ( Transcript as ProcessorTranscript, ) -from reflector.processors.types import ( - words_to_segments, -) from reflector.schemas.transcript_formats import TranscriptSegment from reflector.utils.webvtt import seconds_to_timestamp @@ -32,7 +29,9 @@ def format_timestamp_mmss(seconds: float | int) -> str: def transcript_to_text( - topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None + topics: list[TranscriptTopic], + participants: list[TranscriptParticipant] | None, + is_multitrack: bool = False, ) -> str: """Convert transcript topics to plain text with speaker names.""" lines = [] @@ -41,7 +40,7 @@ def transcript_to_text( continue transcript = ProcessorTranscript(words=topic.words) - segments = transcript.as_segments() + segments = transcript.as_segments(is_multitrack) for segment in segments: speaker_name = get_speaker_name(segment.speaker, participants) @@ -52,7 +51,9 @@ def transcript_to_text( def transcript_to_text_timestamped( - topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None + topics: list[TranscriptTopic], + participants: list[TranscriptParticipant] | None, + is_multitrack: bool = False, ) -> str: """Convert transcript topics to timestamped text with speaker names.""" lines = [] @@ -61,7 +62,7 @@ def transcript_to_text_timestamped( continue transcript = ProcessorTranscript(words=topic.words) - segments = transcript.as_segments() + segments = transcript.as_segments(is_multitrack) for segment in segments: speaker_name = get_speaker_name(segment.speaker, participants) @@ -73,7 +74,9 @@ def transcript_to_text_timestamped( def topics_to_webvtt_named( - topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None + topics: list[TranscriptTopic], + participants: list[TranscriptParticipant] | None, + is_multitrack: bool = False, ) -> str: """Convert transcript topics to WebVTT format with participant names.""" vtt = webvtt.WebVTT() @@ -82,7 +85,8 @@ def topics_to_webvtt_named( if not topic.words: continue - segments = words_to_segments(topic.words) + transcript = ProcessorTranscript(words=topic.words) + segments = transcript.as_segments(is_multitrack) for segment in segments: speaker_name = get_speaker_name(segment.speaker, participants) @@ -100,19 +104,23 @@ def topics_to_webvtt_named( def transcript_to_json_segments( - topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None + topics: list[TranscriptTopic], + participants: list[TranscriptParticipant] | None, + is_multitrack: bool = False, ) -> list[TranscriptSegment]: """Convert transcript topics to a flat list of JSON segments.""" - segments = [] + result = [] for topic in topics: if not topic.words: continue transcript = ProcessorTranscript(words=topic.words) - for segment in transcript.as_segments(): + segments = transcript.as_segments(is_multitrack) + + for segment in segments: speaker_name = get_speaker_name(segment.speaker, participants) - segments.append( + result.append( TranscriptSegment( speaker=segment.speaker, speaker_name=speaker_name, @@ -122,4 +130,4 @@ def transcript_to_json_segments( ) ) - return segments + return result diff --git 
a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index dc5ccdb7..625a9896 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -16,6 +16,7 @@ from pydantic import ( import reflector.auth as auth from reflector.db import get_database +from reflector.db.recordings import recordings_controller from reflector.db.search import ( DEFAULT_SEARCH_LIMIT, SearchLimit, @@ -60,6 +61,14 @@ ALGORITHM = "HS256" DOWNLOAD_EXPIRE_MINUTES = 60 +async def _get_is_multitrack(transcript) -> bool: + """Detect if transcript is from multitrack recording.""" + if not transcript.recording_id: + return False + recording = await recordings_controller.get_by_id(transcript.recording_id) + return recording is not None and recording.is_multitrack + + def create_access_token(data: dict, expires_delta: timedelta): to_encode = data.copy() expire = datetime.now(timezone.utc) + expires_delta @@ -360,7 +369,7 @@ class GetTranscriptTopic(BaseModel): segments: list[GetTranscriptSegmentTopic] = [] @classmethod - def from_transcript_topic(cls, topic: TranscriptTopic): + def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False): if not topic.words: # In previous version, words were missing # Just output a segment with speaker 0 @@ -384,7 +393,7 @@ class GetTranscriptTopic(BaseModel): start=segment.start, speaker=segment.speaker, ) - for segment in transcript.as_segments() + for segment in transcript.as_segments(is_multitrack) ] return cls( id=topic.id, @@ -401,8 +410,8 @@ class GetTranscriptTopicWithWords(GetTranscriptTopic): words: list[Word] = [] @classmethod - def from_transcript_topic(cls, topic: TranscriptTopic): - instance = super().from_transcript_topic(topic) + def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False): + instance = super().from_transcript_topic(topic, is_multitrack) if topic.words: instance.words = topic.words return instance @@ -417,8 +426,8 @@ class GetTranscriptTopicWithWordsPerSpeaker(GetTranscriptTopic): words_per_speaker: list[SpeakerWords] = [] @classmethod - def from_transcript_topic(cls, topic: TranscriptTopic): - instance = super().from_transcript_topic(topic) + def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False): + instance = super().from_transcript_topic(topic, is_multitrack) if topic.words: words_per_speakers = [] # group words by speaker @@ -457,6 +466,8 @@ async def transcript_get( transcript_id, user_id=user_id ) + is_multitrack = await _get_is_multitrack(transcript) + base_data = { "id": transcript.id, "user_id": transcript.user_id, @@ -483,14 +494,16 @@ async def transcript_get( return GetTranscriptWithText( **base_data, transcript_format="text", - transcript=transcript_to_text(transcript.topics, transcript.participants), + transcript=transcript_to_text( + transcript.topics, transcript.participants, is_multitrack + ), ) elif transcript_format == "text-timestamped": return GetTranscriptWithTextTimestamped( **base_data, transcript_format="text-timestamped", transcript=transcript_to_text_timestamped( - transcript.topics, transcript.participants + transcript.topics, transcript.participants, is_multitrack ), ) elif transcript_format == "webvtt-named": @@ -498,7 +511,7 @@ async def transcript_get( **base_data, transcript_format="webvtt-named", transcript=topics_to_webvtt_named( - transcript.topics, transcript.participants + transcript.topics, transcript.participants, is_multitrack ), ) elif transcript_format == "json": @@ -506,7 +519,7 @@ 
async def transcript_get( **base_data, transcript_format="json", transcript=transcript_to_json_segments( - transcript.topics, transcript.participants + transcript.topics, transcript.participants, is_multitrack ), ) else: @@ -565,9 +578,12 @@ async def transcript_get_topics( transcript_id, user_id=user_id ) + is_multitrack = await _get_is_multitrack(transcript) + # convert to GetTranscriptTopic return [ - GetTranscriptTopic.from_transcript_topic(topic) for topic in transcript.topics + GetTranscriptTopic.from_transcript_topic(topic, is_multitrack) + for topic in transcript.topics ] @@ -584,9 +600,11 @@ async def transcript_get_topics_with_words( transcript_id, user_id=user_id ) + is_multitrack = await _get_is_multitrack(transcript) + # convert to GetTranscriptTopicWithWords return [ - GetTranscriptTopicWithWords.from_transcript_topic(topic) + GetTranscriptTopicWithWords.from_transcript_topic(topic, is_multitrack) for topic in transcript.topics ] @@ -605,13 +623,17 @@ async def transcript_get_topics_with_words_per_speaker( transcript_id, user_id=user_id ) + is_multitrack = await _get_is_multitrack(transcript) + # get the topic from the transcript topic = next((t for t in transcript.topics if t.id == topic_id), None) if not topic: raise HTTPException(status_code=404, detail="Topic not found") # convert to GetTranscriptTopicWithWordsPerSpeaker - return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(topic) + return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic( + topic, is_multitrack + ) @router.post("/transcripts/{transcript_id}/zulip") diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 0e1b4d86..adf73d15 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -2,6 +2,7 @@ import json import os import re from datetime import datetime, timezone +from typing import List from urllib.parse import unquote import av @@ -11,7 +12,7 @@ from celery import shared_task from celery.utils.log import get_task_logger from pydantic import ValidationError -from reflector.dailyco_api import MeetingParticipantsResponse +from reflector.dailyco_api import MeetingParticipantsResponse, RecordingResponse from reflector.db.daily_participant_sessions import ( DailyParticipantSession, daily_participant_sessions_controller, @@ -38,6 +39,7 @@ from reflector.storage import get_transcripts_storage from reflector.utils.daily import ( DailyRoomName, extract_base_room_name, + filter_cam_audio_tracks, parse_daily_recording_filename, recording_lock_key, ) @@ -338,7 +340,9 @@ async def _process_multitrack_recording_inner( exc_info=True, ) - for idx, key in enumerate(track_keys): + cam_audio_keys = filter_cam_audio_tracks(track_keys) + + for idx, key in enumerate(cam_audio_keys): try: parsed = parse_daily_recording_filename(key) participant_id = parsed.participant_id @@ -366,7 +370,7 @@ async def _process_multitrack_recording_inner( task_pipeline_multitrack_process.delay( transcript_id=transcript.id, bucket_name=bucket_name, - track_keys=track_keys, + track_keys=filter_cam_audio_tracks(track_keys), ) @@ -391,7 +395,7 @@ async def poll_daily_recordings(): async with create_platform_client("daily") as daily_client: # latest 100. 
TODO cursor-based state - api_recordings = await daily_client.list_recordings() + api_recordings: List[RecordingResponse] = await daily_client.list_recordings() if not api_recordings: logger.debug( @@ -422,17 +426,19 @@ async def poll_daily_recordings(): for recording in missing_recordings: if not recording.tracks: - assert recording.status != "finished", ( - f"Recording {recording.id} has status='finished' but no tracks. " - f"Daily.co API guarantees finished recordings have tracks available. " - f"room_name={recording.room_name}" - ) - logger.debug( - "No tracks in recording yet", - recording_id=recording.id, - room_name=recording.room_name, - status=recording.status, - ) + if recording.status == "finished": + logger.warning( + "Finished recording has no tracks (no audio captured)", + recording_id=recording.id, + room_name=recording.room_name, + ) + else: + logger.debug( + "No tracks in recording yet", + recording_id=recording.id, + room_name=recording.room_name, + status=recording.status, + ) continue track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"] diff --git a/server/tests/test_processor_transcript_segment.py b/server/tests/test_processor_transcript_segment.py index 6fde0dd1..89cc459f 100644 --- a/server/tests/test_processor_transcript_segment.py +++ b/server/tests/test_processor_transcript_segment.py @@ -159,3 +159,78 @@ def test_processor_transcript_segment(): assert segments[3].start == 30.72 assert segments[4].start == 31.56 assert segments[5].start == 32.38 + + +def test_processor_transcript_segment_multitrack_interleaved(): + """Test as_segments(is_multitrack=True) with interleaved speakers. + + Multitrack recordings have words from different speakers sorted by start time, + causing frequent speaker alternation. The multitrack mode should group by + speaker first, then split into sentences. + """ + from reflector.processors.types import Transcript, Word + + # Simulate real multitrack data: words sorted by start time, speakers interleave + # Speaker 0 says: "Hello there." + # Speaker 1 says: "I'm good." + # When sorted by time, words interleave + transcript = Transcript( + words=[ + Word(text="Hello ", start=0.0, end=0.5, speaker=0), + Word(text="I'm ", start=0.5, end=0.8, speaker=1), + Word(text="there.", start=0.5, end=1.0, speaker=0), + Word(text="good.", start=1.0, end=1.5, speaker=1), + ] + ) + + # Default behavior (is_multitrack=False): breaks on every speaker change = 4 segments + segments_default = transcript.as_segments(is_multitrack=False) + assert len(segments_default) == 4 + + # Multitrack behavior: groups by speaker, then sentences = 2 segments + segments_multitrack = transcript.as_segments(is_multitrack=True) + assert len(segments_multitrack) == 2 + + # Check content - sorted by start time + assert segments_multitrack[0].speaker == 0 + assert segments_multitrack[0].text == "Hello there." + assert segments_multitrack[0].start == 0.0 + assert segments_multitrack[0].end == 1.0 + + assert segments_multitrack[1].speaker == 1 + assert segments_multitrack[1].text == "I'm good." 
+ assert segments_multitrack[1].start == 0.5 + assert segments_multitrack[1].end == 1.5 + + +def test_processor_transcript_segment_multitrack_overlapping_timestamps(): + """Test multitrack with exactly overlapping timestamps (real Daily.co data pattern).""" + from reflector.processors.types import Transcript, Word + + # Real pattern from transcript 38d84d57: words with identical timestamps + transcript = Transcript( + words=[ + Word(text="speaking ", start=6.71, end=7.11, speaker=0), + Word(text="Speaking ", start=6.71, end=7.11, speaker=1), + Word(text="at ", start=7.11, end=7.27, speaker=0), + Word(text="at ", start=7.11, end=7.27, speaker=1), + Word(text="the ", start=7.27, end=7.43, speaker=0), + Word(text="the ", start=7.27, end=7.43, speaker=1), + Word(text="same ", start=7.43, end=7.59, speaker=0), + Word(text="same ", start=7.43, end=7.59, speaker=1), + Word(text="time.", start=7.59, end=8.0, speaker=0), + Word(text="time.", start=7.59, end=8.0, speaker=1), + ] + ) + + # Default: 10 segments (one per speaker change) + segments_default = transcript.as_segments(is_multitrack=False) + assert len(segments_default) == 10 + + # Multitrack: 2 segments (one per speaker sentence) + segments_multitrack = transcript.as_segments(is_multitrack=True) + assert len(segments_multitrack) == 2 + + # Both should have complete sentences + assert "speaking at the same time." in segments_multitrack[0].text + assert "Speaking at the same time." in segments_multitrack[1].text diff --git a/server/tests/test_transcript_formats.py b/server/tests/test_transcript_formats.py index 62e382fe..ea44636d 100644 --- a/server/tests/test_transcript_formats.py +++ b/server/tests/test_transcript_formats.py @@ -273,8 +273,17 @@ async def test_transcript_formats_with_multiple_speakers(): @pytest.mark.asyncio -async def test_transcript_formats_with_overlapping_speakers(): - """Test format conversion when multiple speakers speak at the same time (overlapping timestamps).""" +async def test_transcript_formats_with_overlapping_speakers_multitrack(): + """Test format conversion for multitrack recordings with truly interleaved words. + + Multitrack recordings have words from different speakers sorted by start time, + causing frequent speaker alternation. This tests the sentence-based segmentation + that groups each speaker's words into complete sentences. + """ + # Real multitrack data: words sorted by start time, speakers interleave + # Alice says: "Hello there." (0.0-1.0) + # Bob says: "I'm good." (0.5-1.5) + # When sorted by time, words interleave: Hello, I'm, there., good. 
topics = [ TranscriptTopic( id="1", @@ -282,11 +291,10 @@ async def test_transcript_formats_with_overlapping_speakers(): summary="Summary 1", timestamp=0.0, words=[ - Word(text="Hello", start=0.0, end=0.5, speaker=0), - Word(text=" there.", start=0.5, end=1.0, speaker=0), - # Speaker 1 overlaps with speaker 0 at 0.5-1.0 - Word(text="I'm", start=0.5, end=1.0, speaker=1), - Word(text=" good.", start=1.0, end=1.5, speaker=1), + Word(text="Hello ", start=0.0, end=0.5, speaker=0), + Word(text="I'm ", start=0.5, end=0.8, speaker=1), + Word(text="there.", start=0.5, end=1.0, speaker=0), + Word(text="good.", start=1.0, end=1.5, speaker=1), ], ), ] @@ -296,20 +304,9 @@ async def test_transcript_formats_with_overlapping_speakers(): TranscriptParticipant(id="2", speaker=1, name="Bob"), ] - text_result = transcript_to_text(topics, participants) - lines = text_result.split("\n") - assert len(lines) >= 2 - assert any("Alice:" in line for line in lines) - assert any("Bob:" in line for line in lines) - - timestamped_result = transcript_to_text_timestamped(topics, participants) - timestamped_lines = timestamped_result.split("\n") - assert len(timestamped_lines) >= 2 - assert any("Alice:" in line for line in timestamped_lines) - assert any("Bob:" in line for line in timestamped_lines) - assert any("[00:00]" in line for line in timestamped_lines) - - webvtt_result = topics_to_webvtt_named(topics, participants) + # With is_multitrack=True, should produce 2 segments (one per speaker sentence) + # not 4 segments (one per speaker change) + webvtt_result = topics_to_webvtt_named(topics, participants, is_multitrack=True) expected_webvtt = """WEBVTT 00:00:00.000 --> 00:00:01.000 @@ -320,23 +317,26 @@ async def test_transcript_formats_with_overlapping_speakers(): """ assert webvtt_result == expected_webvtt - segments = transcript_to_json_segments(topics, participants) - assert len(segments) >= 2 - speakers = {seg.speaker for seg in segments} - assert 0 in speakers and 1 in speakers + text_result = transcript_to_text(topics, participants, is_multitrack=True) + lines = text_result.split("\n") + assert len(lines) == 2 + assert "Alice: Hello there." in lines[0] + assert "Bob: I'm good." in lines[1] - alice_seg = next(seg for seg in segments if seg.speaker == 0) - bob_seg = next(seg for seg in segments if seg.speaker == 1) + timestamped_result = transcript_to_text_timestamped( + topics, participants, is_multitrack=True + ) + timestamped_lines = timestamped_result.split("\n") + assert len(timestamped_lines) == 2 + assert "[00:00] Alice: Hello there." in timestamped_lines[0] + assert "[00:00] Bob: I'm good." in timestamped_lines[1] - # Verify timestamps overlap: Alice (0.0-1.0) and Bob (0.5-1.5) overlap at 0.5-1.0 - assert alice_seg.start < bob_seg.end, "Alice segment should start before Bob ends" - assert bob_seg.start < alice_seg.end, "Bob segment should start before Alice ends" - - overlap_start = max(alice_seg.start, bob_seg.start) - overlap_end = min(alice_seg.end, bob_seg.end) - assert ( - overlap_start < overlap_end - ), f"Segments should overlap between {overlap_start} and {overlap_end}" + segments = transcript_to_json_segments(topics, participants, is_multitrack=True) + assert len(segments) == 2 + assert segments[0].speaker_name == "Alice" + assert segments[0].text == "Hello there." + assert segments[1].speaker_name == "Bob" + assert segments[1].text == "I'm good." 
@pytest.mark.asyncio @@ -573,3 +573,207 @@ async def test_api_transcript_format_default_is_text(client): assert data["transcript_format"] == "text" assert "transcript" in data + + +@pytest.mark.asyncio +async def test_api_topics_endpoint_multitrack_segmentation(client): + """Test GET /transcripts/{id}/topics uses sentence-based segmentation for multitrack. + + This tests the fix for TASKS2.md - ensuring /topics endpoints correctly detect + multitrack recordings and use sentence-based segmentation instead of fragmenting + on every speaker change. + """ + from datetime import datetime, timezone + + from reflector.db.recordings import Recording, recordings_controller + from reflector.db.transcripts import ( + TranscriptParticipant, + TranscriptTopic, + transcripts_controller, + ) + from reflector.processors.types import Word + + # Create a multitrack recording (has track_keys) + recording = Recording( + bucket_name="test-bucket", + object_key="test-key", + recorded_at=datetime.now(timezone.utc), + track_keys=["track1.webm", "track2.webm"], # This makes it multitrack + ) + await recordings_controller.create(recording) + + # Create transcript linked to the recording + transcript = await transcripts_controller.add( + name="Multitrack Test", + source_kind="file", + recording_id=recording.id, + ) + + await transcripts_controller.update( + transcript, + { + "participants": [ + TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(), + TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(), + ] + }, + ) + + # Add interleaved words (as they appear in real multitrack data) + await transcripts_controller.upsert_topic( + transcript, + TranscriptTopic( + title="Topic 1", + summary="Summary 1", + timestamp=0, + words=[ + Word(text="Hello ", start=0.0, end=0.5, speaker=0), + Word(text="I'm ", start=0.5, end=0.8, speaker=1), + Word(text="there.", start=0.5, end=1.0, speaker=0), + Word(text="good.", start=1.0, end=1.5, speaker=1), + ], + ), + ) + + # Test /topics endpoint + response = await client.get(f"/transcripts/{transcript.id}/topics") + assert response.status_code == 200 + data = response.json() + + assert len(data) == 1 + topic = data[0] + + # Key assertion: multitrack should produce 2 segments (one per speaker sentence) + # Not 4 segments (one per speaker change) + assert len(topic["segments"]) == 2 + + # Check content + segment_texts = [s["text"] for s in topic["segments"]] + assert "Hello there." in segment_texts + assert "I'm good." in segment_texts + + +@pytest.mark.asyncio +async def test_api_topics_endpoint_non_multitrack_segmentation(client): + """Test GET /transcripts/{id}/topics uses default segmentation for non-multitrack. + + Ensures backward compatibility - transcripts without multitrack recordings + should continue using the default speaker-change-based segmentation. 
+ """ + from reflector.db.transcripts import ( + TranscriptParticipant, + TranscriptTopic, + transcripts_controller, + ) + from reflector.processors.types import Word + + # Create transcript WITHOUT recording (defaulted as not multitrack) TODO better heuristic + response = await client.post("/transcripts", json={"name": "Test transcript"}) + assert response.status_code == 200 + tid = response.json()["id"] + + transcript = await transcripts_controller.get_by_id(tid) + + await transcripts_controller.update( + transcript, + { + "participants": [ + TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(), + TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(), + ] + }, + ) + + # Add interleaved words + await transcripts_controller.upsert_topic( + transcript, + TranscriptTopic( + title="Topic 1", + summary="Summary 1", + timestamp=0, + words=[ + Word(text="Hello ", start=0.0, end=0.5, speaker=0), + Word(text="I'm ", start=0.5, end=0.8, speaker=1), + Word(text="there.", start=0.5, end=1.0, speaker=0), + Word(text="good.", start=1.0, end=1.5, speaker=1), + ], + ), + ) + + # Test /topics endpoint + response = await client.get(f"/transcripts/{tid}/topics") + assert response.status_code == 200 + data = response.json() + + assert len(data) == 1 + topic = data[0] + + # Non-multitrack: should produce 4 segments (one per speaker change) + assert len(topic["segments"]) == 4 + + +@pytest.mark.asyncio +async def test_api_topics_with_words_endpoint_multitrack(client): + """Test GET /transcripts/{id}/topics/with-words uses multitrack segmentation.""" + from datetime import datetime, timezone + + from reflector.db.recordings import Recording, recordings_controller + from reflector.db.transcripts import ( + TranscriptParticipant, + TranscriptTopic, + transcripts_controller, + ) + from reflector.processors.types import Word + + # Create multitrack recording + recording = Recording( + bucket_name="test-bucket", + object_key="test-key-2", + recorded_at=datetime.now(timezone.utc), + track_keys=["track1.webm", "track2.webm"], + ) + await recordings_controller.create(recording) + + transcript = await transcripts_controller.add( + name="Multitrack Test 2", + source_kind="file", + recording_id=recording.id, + ) + + await transcripts_controller.update( + transcript, + { + "participants": [ + TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(), + TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(), + ] + }, + ) + + await transcripts_controller.upsert_topic( + transcript, + TranscriptTopic( + title="Topic 1", + summary="Summary 1", + timestamp=0, + words=[ + Word(text="Hello ", start=0.0, end=0.5, speaker=0), + Word(text="I'm ", start=0.5, end=0.8, speaker=1), + Word(text="there.", start=0.5, end=1.0, speaker=0), + Word(text="good.", start=1.0, end=1.5, speaker=1), + ], + ), + ) + + response = await client.get(f"/transcripts/{transcript.id}/topics/with-words") + assert response.status_code == 200 + data = response.json() + + assert len(data) == 1 + topic = data[0] + + # Should have 2 segments (multitrack sentence-based) + assert len(topic["segments"]) == 2 + # Should also have words field + assert "words" in topic + assert len(topic["words"]) == 4
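
Illustrative sketch (not part of the patch): the heart of words_to_segments_by_sentence() above is "group words by speaker, split each speaker's word stream at sentence-ending punctuation or a length cap, then re-sort the resulting segments by start time". The standalone Python below restates that idea with plain tuples instead of the repo's Word/TranscriptSegment models; SENTENCE_END_RE and the 1000-character cap mirror the patch, everything else is simplified for illustration and is not the repo's API.

    import re
    from collections import defaultdict

    SENTENCE_END_RE = re.compile(r"[.?!…]$")
    MAX_SENTENCE_SEGMENT_CHARS = 1000

    # word = (text, start, end, speaker)
    def sentence_segments(words):
        by_speaker = defaultdict(list)
        for w in words:
            by_speaker[w[3]].append(w)           # group by speaker, keep per-speaker order

        segments = []
        for speaker, speaker_words in by_speaker.items():
            text, seg_start, seg_end = "", None, 0.0
            for t, start, end, _ in speaker_words:
                if seg_start is None:
                    seg_start = start
                text += t
                seg_end = end
                # close the segment at a sentence end, or if it grows too long
                if SENTENCE_END_RE.search(t.strip()) or len(text) >= MAX_SENTENCE_SEGMENT_CHARS:
                    segments.append((speaker, text, seg_start, seg_end))
                    text, seg_start = "", None
            if text and seg_start is not None:   # flush trailing words without end punctuation
                segments.append((speaker, text, seg_start, seg_end))

        segments.sort(key=lambda s: s[2])        # interleave speakers again by start time
        return segments

    # Two speakers talking over each other, interleaved by timestamp as in multitrack data.
    words = [
        ("Hello ", 0.0, 0.5, 0),
        ("I'm ", 0.5, 0.8, 1),
        ("there.", 0.5, 1.0, 0),
        ("good.", 1.0, 1.5, 1),
    ]
    print(sentence_segments(words))
    # [(0, 'Hello there.', 0.0, 1.0), (1, "I'm good.", 0.5, 1.5)]

The rest of the patch is plumbing for that switch: Recording.is_multitrack is derived from track_keys, the transcript views look the recording up once per request (_get_is_multitrack) and pass is_multitrack through the transcript_formats helpers into Transcript.as_segments(), and the original words_to_segments() (break on every speaker change, or on punctuation once a segment exceeds MAX_SEGMENT_CHARS = 120) remains the default for non-multitrack transcripts.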