mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
* segmentation multitrack (no-mistakes) * segmentation multitrack (no-mistakes) * self review * self review * recording poll daily doc * filter cam_audio tracks to remove screensharing from daily processing * pr review --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
134 lines
4.1 KiB
Python
134 lines
4.1 KiB
Python
"""Utilities for converting transcript data to various output formats."""
|
|
|
|
import webvtt
|
|
|
|
from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic
|
|
from reflector.processors.types import (
|
|
Transcript as ProcessorTranscript,
|
|
)
|
|
from reflector.schemas.transcript_formats import TranscriptSegment
|
|
from reflector.utils.webvtt import seconds_to_timestamp
|
|
|
|
|
|
def get_speaker_name(
    speaker: int, participants: list[TranscriptParticipant] | None
) -> str:
    """Resolve a speaker number to a display name.

    Args:
        speaker: Speaker number to look up.
        participants: Known participants, or ``None``/empty when there are none.

    Returns:
        The ``name`` of the first participant whose ``speaker`` equals
        *speaker*; otherwise the generic label ``"Speaker <speaker>"``.
    """
    fallback = f"Speaker {speaker}"
    if not participants:
        return fallback
    # First match wins; the generator stops scanning as soon as one is found.
    return next(
        (entry.name for entry in participants if entry.speaker == speaker),
        fallback,
    )
|
|
|
|
|
|
def format_timestamp_mmss(seconds: float | int) -> str:
    """Render a duration in seconds as a zero-padded ``MM:SS`` string.

    Fractional seconds are truncated. Minutes are not wrapped at 60, so a
    value of an hour or more yields a minutes field above 59 (3700 becomes
    ``"61:40"``).
    """
    whole_minutes, remainder = divmod(seconds, 60)
    return f"{int(whole_minutes):02d}:{int(remainder):02d}"
|
|
|
|
|
|
def transcript_to_text(
    topics: list[TranscriptTopic],
    participants: list[TranscriptParticipant] | None,
    is_multitrack: bool = False,
) -> str:
    """Render transcript topics as plain ``"<speaker name>: <text>"`` lines.

    Topics without words are skipped. For every other topic the words are
    wrapped in a ``ProcessorTranscript`` and split with
    ``as_segments(is_multitrack)``; each segment contributes one line.

    Args:
        topics: Topics to render, in output order.
        participants: Participants used to resolve speaker names; may be
            ``None``.
        is_multitrack: Forwarded unchanged to ``as_segments``.

    Returns:
        All lines joined with newlines (an empty string when nothing is
        rendered).
    """
    output_lines: list[str] = []
    for entry in topics:
        if entry.words:
            spoken = ProcessorTranscript(words=entry.words).as_segments(
                is_multitrack
            )
            output_lines.extend(
                f"{get_speaker_name(seg.speaker, participants)}: {seg.text.strip()}"
                for seg in spoken
            )
    return "\n".join(output_lines)
|
|
|
|
|
|
def transcript_to_text_timestamped(
    topics: list[TranscriptTopic],
    participants: list[TranscriptParticipant] | None,
    is_multitrack: bool = False,
) -> str:
    """Render transcript topics as ``"[MM:SS] <speaker name>: <text>"`` lines.

    Topics without words are skipped. The timestamp of each line is the
    segment's ``start`` formatted by ``format_timestamp_mmss``.

    Args:
        topics: Topics to render, in output order.
        participants: Participants used to resolve speaker names; may be
            ``None``.
        is_multitrack: Forwarded unchanged to ``as_segments``.

    Returns:
        All lines joined with newlines (an empty string when nothing is
        rendered).
    """

    def _rendered_lines():
        # Yield one formatted line per segment of every non-empty topic.
        for entry in topics:
            if not entry.words:
                continue
            spoken = ProcessorTranscript(words=entry.words).as_segments(
                is_multitrack
            )
            for seg in spoken:
                who = get_speaker_name(seg.speaker, participants)
                stamp = format_timestamp_mmss(seg.start)
                yield f"[{stamp}] {who}: {seg.text.strip()}"

    return "\n".join(_rendered_lines())
|
|
|
|
|
|
def topics_to_webvtt_named(
    topics: list[TranscriptTopic],
    participants: list[TranscriptParticipant] | None,
    is_multitrack: bool = False,
) -> str:
    """Build a WebVTT document whose cues carry participant names.

    Topics without words are skipped. Each segment becomes one caption
    spanning ``segment.start`` to ``segment.end`` whose text is prefixed with
    a ``<v NAME>`` voice tag holding the resolved speaker name.

    Args:
        topics: Topics to render, in output order.
        participants: Participants used to resolve speaker names; may be
            ``None``.
        is_multitrack: Forwarded unchanged to ``as_segments``.

    Returns:
        The ``content`` of the assembled ``webvtt.WebVTT`` object.
    """
    document = webvtt.WebVTT()

    for entry in topics:
        if not entry.words:
            continue

        for seg in ProcessorTranscript(words=entry.words).as_segments(
            is_multitrack
        ):
            voice = get_speaker_name(seg.speaker, participants)
            # NOTE(review): name and text are inserted verbatim, with no
            # escaping of WebVTT markup characters.
            payload = f"<v {voice}>{seg.text.strip()}"
            document.captions.append(
                webvtt.Caption(
                    start=seconds_to_timestamp(seg.start),
                    end=seconds_to_timestamp(seg.end),
                    text=payload,
                )
            )

    return document.content
|
|
|
|
|
|
def transcript_to_json_segments(
    topics: list[TranscriptTopic],
    participants: list[TranscriptParticipant] | None,
    is_multitrack: bool = False,
) -> list[TranscriptSegment]:
    """Flatten transcript topics into a single list of ``TranscriptSegment``.

    Topics without words are skipped. Each segment produced by
    ``as_segments(is_multitrack)`` is copied into a ``TranscriptSegment``
    with its speaker number, resolved speaker name, stripped text, and
    start/end values.

    Args:
        topics: Topics to convert, in output order.
        participants: Participants used to resolve speaker names; may be
            ``None``.
        is_multitrack: Forwarded unchanged to ``as_segments``.

    Returns:
        Segments of all non-empty topics, in topic order then segment order.
    """
    return [
        TranscriptSegment(
            speaker=seg.speaker,
            speaker_name=get_speaker_name(seg.speaker, participants),
            text=seg.text.strip(),
            start=seg.start,
            end=seg.end,
        )
        for entry in topics
        if entry.words
        for seg in ProcessorTranscript(words=entry.words).as_segments(
            is_multitrack
        )
    ]
|