mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-04 09:56:47 +00:00
Compare commits
18 Commits
v0.30.0
...
feature/sp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a57388723 | ||
| ddef1d4a4a | |||
| 2ca624f052 | |||
| 88e0d11ccd | |||
|
|
9f6e7b515b | ||
|
|
d0110f4dd4 | ||
|
|
7dfb37154d | ||
|
|
67679e90b2 | ||
|
|
aa4c368479 | ||
|
|
deb5ed6010 | ||
|
|
30b28eed3b | ||
|
|
1b33fba3ba | ||
| fc3ef6c893 | |||
|
|
3ce279daa4 | ||
|
|
01650be787 | ||
| f00c16a41c | |||
| 859df5513e | |||
|
|
2af9918979 |
@@ -4,3 +4,4 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
|
||||
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
|
||||
gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
|
||||
server/reflector/worker/process.py:generic-api-key:465
|
||||
server/reflector/worker/process.py:generic-api-key:594
|
||||
|
||||
@@ -1,5 +1,12 @@
|
||||
# Changelog
|
||||
|
||||
## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* mixdown optional ([#834](https://github.com/Monadical-SAS/reflector/issues/834)) ([fc3ef6c](https://github.com/Monadical-SAS/reflector/commit/fc3ef6c8933231c731fad84e7477a476a6220a5e))
|
||||
|
||||
## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23)
|
||||
|
||||
|
||||
|
||||
@@ -8,7 +8,8 @@ from enum import StrEnum
|
||||
class TaskName(StrEnum):
|
||||
GET_RECORDING = "get_recording"
|
||||
GET_PARTICIPANTS = "get_participants"
|
||||
PROCESS_TRACKS = "process_tracks"
|
||||
PROCESS_PADDINGS = "process_paddings"
|
||||
PROCESS_TRANSCRIPTIONS = "process_transcriptions"
|
||||
MIXDOWN_TRACKS = "mixdown_tracks"
|
||||
GENERATE_WAVEFORM = "generate_waveform"
|
||||
DETECT_TOPICS = "detect_topics"
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
"""
|
||||
CPU-heavy worker pool for audio processing tasks.
|
||||
Handles ONLY: mixdown_tracks
|
||||
Handles: mixdown_tracks only (serialized with max_runs=1)
|
||||
|
||||
Configuration:
|
||||
- slots=1: Only mixdown (already serialized globally with max_runs=1)
|
||||
- slots=1: Only one mixdown at a time
|
||||
- Worker affinity: pool=cpu-heavy
|
||||
"""
|
||||
|
||||
@@ -26,7 +26,7 @@ def main():
|
||||
|
||||
cpu_worker = hatchet.worker(
|
||||
"cpu-worker-pool",
|
||||
slots=1, # Only 1 mixdown at a time (already serialized globally)
|
||||
slots=1,
|
||||
labels={
|
||||
"pool": "cpu-heavy",
|
||||
},
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
"""
|
||||
LLM/I/O worker pool for all non-CPU tasks.
|
||||
Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
|
||||
Handles: all tasks except mixdown_tracks (padding, transcription, LLM inference, orchestration)
|
||||
"""
|
||||
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
|
||||
daily_multitrack_pipeline,
|
||||
)
|
||||
from reflector.hatchet.workflows.padding_workflow import padding_workflow
|
||||
from reflector.hatchet.workflows.subject_processing import subject_workflow
|
||||
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
|
||||
from reflector.hatchet.workflows.track_processing import track_workflow
|
||||
from reflector.hatchet.workflows.transcription_workflow import transcription_workflow
|
||||
from reflector.logger import logger
|
||||
|
||||
SLOTS = 10
|
||||
@@ -29,7 +30,7 @@ def main():
|
||||
|
||||
llm_worker = hatchet.worker(
|
||||
WORKER_NAME,
|
||||
slots=SLOTS, # not all slots are probably used
|
||||
slots=SLOTS,
|
||||
labels={
|
||||
"pool": POOL,
|
||||
},
|
||||
@@ -37,7 +38,8 @@ def main():
|
||||
daily_multitrack_pipeline,
|
||||
topic_chunk_workflow,
|
||||
subject_workflow,
|
||||
track_workflow,
|
||||
padding_workflow,
|
||||
transcription_workflow,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
@@ -4,6 +4,10 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
|
||||
PipelineInput,
|
||||
daily_multitrack_pipeline,
|
||||
)
|
||||
from reflector.hatchet.workflows.padding_workflow import (
|
||||
PaddingInput,
|
||||
padding_workflow,
|
||||
)
|
||||
from reflector.hatchet.workflows.subject_processing import (
|
||||
SubjectInput,
|
||||
subject_workflow,
|
||||
@@ -12,15 +16,20 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
|
||||
TopicChunkInput,
|
||||
topic_chunk_workflow,
|
||||
)
|
||||
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
|
||||
from reflector.hatchet.workflows.transcription_workflow import (
|
||||
TranscriptionInput,
|
||||
transcription_workflow,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"daily_multitrack_pipeline",
|
||||
"subject_workflow",
|
||||
"topic_chunk_workflow",
|
||||
"track_workflow",
|
||||
"padding_workflow",
|
||||
"transcription_workflow",
|
||||
"PipelineInput",
|
||||
"SubjectInput",
|
||||
"TopicChunkInput",
|
||||
"TrackInput",
|
||||
"PaddingInput",
|
||||
"TranscriptionInput",
|
||||
]
|
||||
|
||||
@@ -54,8 +54,9 @@ from reflector.hatchet.workflows.models import (
|
||||
PadTrackResult,
|
||||
ParticipantInfo,
|
||||
ParticipantsResult,
|
||||
ProcessPaddingsResult,
|
||||
ProcessSubjectsResult,
|
||||
ProcessTracksResult,
|
||||
ProcessTranscriptionsResult,
|
||||
RecapResult,
|
||||
RecordingResult,
|
||||
SubjectsResult,
|
||||
@@ -68,6 +69,7 @@ from reflector.hatchet.workflows.models import (
|
||||
WebhookResult,
|
||||
ZulipResult,
|
||||
)
|
||||
from reflector.hatchet.workflows.padding_workflow import PaddingInput, padding_workflow
|
||||
from reflector.hatchet.workflows.subject_processing import (
|
||||
SubjectInput,
|
||||
subject_workflow,
|
||||
@@ -76,7 +78,10 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
|
||||
TopicChunkInput,
|
||||
topic_chunk_workflow,
|
||||
)
|
||||
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
|
||||
from reflector.hatchet.workflows.transcription_workflow import (
|
||||
TranscriptionInput,
|
||||
transcription_workflow,
|
||||
)
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines import topic_processing
|
||||
from reflector.processors import AudioFileWriterProcessor
|
||||
@@ -404,72 +409,115 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||
retries=3,
|
||||
)
|
||||
@with_error_handling(TaskName.PROCESS_TRACKS)
|
||||
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
|
||||
"""Spawn child workflows for each track (dynamic fan-out)."""
|
||||
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
|
||||
|
||||
participants_result = ctx.task_output(get_participants)
|
||||
source_language = participants_result.source_language
|
||||
@with_error_handling(TaskName.PROCESS_PADDINGS)
|
||||
async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPaddingsResult:
|
||||
"""Spawn child workflows for each track to apply padding (dynamic fan-out)."""
|
||||
ctx.log(f"process_paddings: spawning {len(input.tracks)} padding workflows")
|
||||
|
||||
bulk_runs = [
|
||||
track_workflow.create_bulk_run_item(
|
||||
input=TrackInput(
|
||||
padding_workflow.create_bulk_run_item(
|
||||
input=PaddingInput(
|
||||
track_index=i,
|
||||
s3_key=track["s3_key"],
|
||||
bucket_name=input.bucket_name,
|
||||
transcript_id=input.transcript_id,
|
||||
language=source_language,
|
||||
)
|
||||
)
|
||||
for i, track in enumerate(input.tracks)
|
||||
]
|
||||
|
||||
results = await track_workflow.aio_run_many(bulk_runs)
|
||||
results = await padding_workflow.aio_run_many(bulk_runs)
|
||||
|
||||
target_language = participants_result.target_language
|
||||
|
||||
track_words: list[list[Word]] = []
|
||||
padded_tracks = []
|
||||
created_padded_files = set()
|
||||
created_padded_files = []
|
||||
|
||||
for result in results:
|
||||
transcribe_result = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK])
|
||||
track_words.append(transcribe_result.words)
|
||||
|
||||
pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
|
||||
|
||||
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
|
||||
if pad_result.padded_key:
|
||||
padded_tracks.append(
|
||||
PaddedTrackInfo(
|
||||
key=pad_result.padded_key, bucket_name=pad_result.bucket_name
|
||||
)
|
||||
padded_tracks.append(
|
||||
PaddedTrackInfo(
|
||||
key=pad_result.padded_key,
|
||||
bucket_name=pad_result.bucket_name,
|
||||
track_index=pad_result.track_index,
|
||||
)
|
||||
)
|
||||
|
||||
if pad_result.size > 0:
|
||||
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
|
||||
created_padded_files.add(storage_path)
|
||||
created_padded_files.append(storage_path)
|
||||
|
||||
all_words = [word for words in track_words for word in words]
|
||||
all_words.sort(key=lambda w: w.start)
|
||||
ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks")
|
||||
|
||||
ctx.log(
|
||||
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
|
||||
)
|
||||
|
||||
return ProcessTracksResult(
|
||||
all_words=all_words,
|
||||
return ProcessPaddingsResult(
|
||||
padded_tracks=padded_tracks,
|
||||
word_count=len(all_words),
|
||||
num_tracks=len(input.tracks),
|
||||
target_language=target_language,
|
||||
created_padded_files=list(created_padded_files),
|
||||
)
|
||||
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
parents=[process_tracks],
|
||||
parents=[process_paddings],
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||
retries=3,
|
||||
)
|
||||
@with_error_handling(TaskName.PROCESS_TRANSCRIPTIONS)
|
||||
async def process_transcriptions(
|
||||
input: PipelineInput, ctx: Context
|
||||
) -> ProcessTranscriptionsResult:
|
||||
"""Spawn child workflows for each padded track to transcribe (dynamic fan-out)."""
|
||||
participants_result = ctx.task_output(get_participants)
|
||||
paddings_result = ctx.task_output(process_paddings)
|
||||
|
||||
source_language = participants_result.source_language
|
||||
if not source_language:
|
||||
raise ValueError("source_language is required for transcription")
|
||||
|
||||
target_language = participants_result.target_language
|
||||
padded_tracks = paddings_result.padded_tracks
|
||||
|
||||
if not padded_tracks:
|
||||
raise ValueError("No padded tracks available for transcription")
|
||||
|
||||
ctx.log(
|
||||
f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows"
|
||||
)
|
||||
|
||||
bulk_runs = [
|
||||
transcription_workflow.create_bulk_run_item(
|
||||
input=TranscriptionInput(
|
||||
track_index=padded_track.track_index,
|
||||
padded_key=padded_track.key,
|
||||
bucket_name=padded_track.bucket_name,
|
||||
language=source_language,
|
||||
)
|
||||
)
|
||||
for padded_track in padded_tracks
|
||||
]
|
||||
|
||||
results = await transcription_workflow.aio_run_many(bulk_runs)
|
||||
|
||||
track_words: list[list[Word]] = []
|
||||
for result in results:
|
||||
transcribe_result = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK])
|
||||
track_words.append(transcribe_result.words)
|
||||
|
||||
all_words = [word for words in track_words for word in words]
|
||||
all_words.sort(key=lambda w: w.start)
|
||||
|
||||
ctx.log(
|
||||
f"process_transcriptions complete: {len(all_words)} words from {len(padded_tracks)} tracks"
|
||||
)
|
||||
|
||||
return ProcessTranscriptionsResult(
|
||||
all_words=all_words,
|
||||
word_count=len(all_words),
|
||||
num_tracks=len(input.tracks),
|
||||
target_language=target_language,
|
||||
)
|
||||
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
parents=[process_paddings],
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
|
||||
retries=3,
|
||||
desired_worker_labels={
|
||||
@@ -489,12 +537,12 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
|
||||
)
|
||||
@with_error_handling(TaskName.MIXDOWN_TRACKS)
|
||||
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
||||
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
|
||||
"""Mix all padded tracks into single audio file using PyAV."""
|
||||
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
|
||||
|
||||
track_result = ctx.task_output(process_tracks)
|
||||
paddings_result = ctx.task_output(process_paddings)
|
||||
recording_result = ctx.task_output(get_recording)
|
||||
padded_tracks = track_result.padded_tracks
|
||||
padded_tracks = paddings_result.padded_tracks
|
||||
|
||||
# Dynamic timeout: scales with track count and recording duration
|
||||
# Base 300s + 60s per track + 1s per 10s of recording
|
||||
@@ -648,7 +696,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
|
||||
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
parents=[process_tracks],
|
||||
parents=[process_transcriptions],
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||
retries=3,
|
||||
)
|
||||
@@ -657,8 +705,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||
"""Detect topics using parallel child workflows (one per chunk)."""
|
||||
ctx.log("detect_topics: analyzing transcript for topics")
|
||||
|
||||
track_result = ctx.task_output(process_tracks)
|
||||
words = track_result.all_words
|
||||
transcriptions_result = ctx.task_output(process_transcriptions)
|
||||
words = transcriptions_result.all_words
|
||||
|
||||
if not words:
|
||||
ctx.log("detect_topics: no words, returning empty topics")
|
||||
@@ -1095,7 +1143,7 @@ async def identify_action_items(
|
||||
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
|
||||
parents=[generate_title, generate_recap, identify_action_items],
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
|
||||
retries=3,
|
||||
)
|
||||
@@ -1109,13 +1157,14 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
|
||||
ctx.log("finalize: saving transcript and setting status to 'ended'")
|
||||
|
||||
mixdown_result = ctx.task_output(mixdown_tracks)
|
||||
track_result = ctx.task_output(process_tracks)
|
||||
transcriptions_result = ctx.task_output(process_transcriptions)
|
||||
paddings_result = ctx.task_output(process_paddings)
|
||||
|
||||
duration = mixdown_result.duration
|
||||
all_words = track_result.all_words
|
||||
all_words = transcriptions_result.all_words
|
||||
|
||||
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
|
||||
created_padded_files = track_result.created_padded_files
|
||||
created_padded_files = paddings_result.created_padded_files
|
||||
if created_padded_files:
|
||||
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
|
||||
storage = _spawn_storage()
|
||||
|
||||
@@ -21,12 +21,14 @@ class ParticipantInfo(BaseModel):
|
||||
|
||||
|
||||
class PadTrackResult(BaseModel):
|
||||
"""Result from pad_track task."""
|
||||
"""Result from pad_track task.
|
||||
|
||||
padded_key: NonEmptyString # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
|
||||
bucket_name: (
|
||||
NonEmptyString | None
|
||||
) # None means use default transcript storage bucket
|
||||
If size=0, track required no padding and padded_key contains original S3 key.
|
||||
If size>0, track was padded and padded_key contains new padded file S3 key.
|
||||
"""
|
||||
|
||||
padded_key: NonEmptyString
|
||||
bucket_name: NonEmptyString | None
|
||||
size: int
|
||||
track_index: int
|
||||
|
||||
@@ -59,18 +61,25 @@ class PaddedTrackInfo(BaseModel):
|
||||
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
|
||||
|
||||
key: NonEmptyString
|
||||
bucket_name: NonEmptyString | None # None = use default storage bucket
|
||||
bucket_name: NonEmptyString | None
|
||||
track_index: int
|
||||
|
||||
|
||||
class ProcessTracksResult(BaseModel):
|
||||
"""Result from process_tracks task."""
|
||||
class ProcessPaddingsResult(BaseModel):
|
||||
"""Result from process_paddings task."""
|
||||
|
||||
padded_tracks: list[PaddedTrackInfo]
|
||||
num_tracks: int
|
||||
created_padded_files: list[NonEmptyString]
|
||||
|
||||
|
||||
class ProcessTranscriptionsResult(BaseModel):
|
||||
"""Result from process_transcriptions task."""
|
||||
|
||||
all_words: list[Word]
|
||||
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
|
||||
word_count: int
|
||||
num_tracks: int
|
||||
target_language: NonEmptyString
|
||||
created_padded_files: list[NonEmptyString]
|
||||
|
||||
|
||||
class MixdownResult(BaseModel):
|
||||
|
||||
148
server/reflector/hatchet/workflows/padding_workflow.py
Normal file
148
server/reflector/hatchet/workflows/padding_workflow.py
Normal file
@@ -0,0 +1,148 @@
|
||||
"""
|
||||
Hatchet child workflow: PaddingWorkflow
|
||||
Handles individual audio track padding only.
|
||||
"""
|
||||
|
||||
import tempfile
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import av
|
||||
from hatchet_sdk import Context
|
||||
from pydantic import BaseModel
|
||||
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import TIMEOUT_AUDIO
|
||||
from reflector.hatchet.workflows.models import PadTrackResult
|
||||
from reflector.logger import logger
|
||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
||||
from reflector.utils.audio_padding import (
|
||||
apply_audio_padding_to_file,
|
||||
extract_stream_start_time_from_container,
|
||||
)
|
||||
|
||||
|
||||
class PaddingInput(BaseModel):
|
||||
"""Input for individual track padding."""
|
||||
|
||||
track_index: int
|
||||
s3_key: str
|
||||
bucket_name: str
|
||||
transcript_id: str
|
||||
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
padding_workflow = hatchet.workflow(
|
||||
name="PaddingWorkflow", input_validator=PaddingInput
|
||||
)
|
||||
|
||||
|
||||
@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
|
||||
async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
|
||||
"""Pad audio track with silence based on WebM container start_time."""
|
||||
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
|
||||
logger.info(
|
||||
"[Hatchet] pad_track",
|
||||
track_index=input.track_index,
|
||||
s3_key=input.s3_key,
|
||||
transcript_id=input.transcript_id,
|
||||
)
|
||||
|
||||
try:
|
||||
# Create fresh storage instance to avoid aioboto3 fork issues
|
||||
from reflector.settings import settings # noqa: PLC0415
|
||||
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
|
||||
|
||||
storage = AwsStorage(
|
||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
||||
)
|
||||
|
||||
source_url = await storage.get_file_url(
|
||||
input.s3_key,
|
||||
operation="get_object",
|
||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
bucket=input.bucket_name,
|
||||
)
|
||||
|
||||
with av.open(source_url) as in_container:
|
||||
with av.open(source_url) as in_container:
|
||||
if in_container.duration:
|
||||
try:
|
||||
duration = timedelta(seconds=in_container.duration // 1_000_000)
|
||||
ctx.log(
|
||||
f"pad_track: track {input.track_index}, duration={duration}"
|
||||
)
|
||||
except (ValueError, TypeError, OverflowError) as e:
|
||||
ctx.log(
|
||||
f"pad_track: track {input.track_index}, duration error: {str(e)}"
|
||||
)
|
||||
|
||||
start_time_seconds = extract_stream_start_time_from_container(
|
||||
in_container, input.track_index, logger=logger
|
||||
)
|
||||
|
||||
if start_time_seconds <= 0:
|
||||
logger.info(
|
||||
f"Track {input.track_index} requires no padding",
|
||||
track_index=input.track_index,
|
||||
)
|
||||
return PadTrackResult(
|
||||
padded_key=input.s3_key,
|
||||
bucket_name=input.bucket_name,
|
||||
size=0,
|
||||
track_index=input.track_index,
|
||||
)
|
||||
|
||||
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
|
||||
temp_path = temp_file.name
|
||||
|
||||
try:
|
||||
apply_audio_padding_to_file(
|
||||
in_container,
|
||||
temp_path,
|
||||
start_time_seconds,
|
||||
input.track_index,
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
file_size = Path(temp_path).stat().st_size
|
||||
|
||||
with open(temp_path, "rb") as padded_file:
|
||||
await storage.put_file(storage_path, padded_file)
|
||||
|
||||
logger.info(
|
||||
f"Uploaded padded track to S3",
|
||||
key=storage_path,
|
||||
size=file_size,
|
||||
)
|
||||
finally:
|
||||
Path(temp_path).unlink(missing_ok=True)
|
||||
|
||||
logger.info(
|
||||
"[Hatchet] pad_track complete",
|
||||
track_index=input.track_index,
|
||||
padded_key=storage_path,
|
||||
)
|
||||
|
||||
return PadTrackResult(
|
||||
padded_key=storage_path,
|
||||
bucket_name=None, # None = use default transcript storage bucket
|
||||
size=file_size,
|
||||
track_index=input.track_index,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"[Hatchet] pad_track failed",
|
||||
transcript_id=input.transcript_id,
|
||||
track_index=input.track_index,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
@@ -1,229 +0,0 @@
|
||||
"""
|
||||
Hatchet child workflow: TrackProcessing
|
||||
|
||||
Handles individual audio track processing: padding and transcription.
|
||||
Spawned dynamically by the main diarization pipeline for each track.
|
||||
|
||||
Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
|
||||
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
|
||||
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
|
||||
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
|
||||
|
||||
Note: This file uses deferred imports (inside tasks) intentionally.
|
||||
Hatchet workers run in forked processes; fresh imports per task ensure
|
||||
storage/DB connections are not shared across forks.
|
||||
"""
|
||||
|
||||
import tempfile
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import av
|
||||
from hatchet_sdk import Context
|
||||
from pydantic import BaseModel
|
||||
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
|
||||
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
|
||||
from reflector.logger import logger
|
||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
||||
from reflector.utils.audio_padding import (
|
||||
apply_audio_padding_to_file,
|
||||
extract_stream_start_time_from_container,
|
||||
)
|
||||
|
||||
|
||||
class TrackInput(BaseModel):
|
||||
"""Input for individual track processing."""
|
||||
|
||||
track_index: int
|
||||
s3_key: str
|
||||
bucket_name: str
|
||||
transcript_id: str
|
||||
language: str = "en"
|
||||
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
|
||||
|
||||
|
||||
@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
|
||||
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
|
||||
"""Pad single audio track with silence for alignment.
|
||||
|
||||
Extracts stream.start_time from WebM container metadata and applies
|
||||
silence padding using PyAV filter graph (adelay).
|
||||
"""
|
||||
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
|
||||
logger.info(
|
||||
"[Hatchet] pad_track",
|
||||
track_index=input.track_index,
|
||||
s3_key=input.s3_key,
|
||||
transcript_id=input.transcript_id,
|
||||
)
|
||||
|
||||
try:
|
||||
# Create fresh storage instance to avoid aioboto3 fork issues
|
||||
from reflector.settings import settings # noqa: PLC0415
|
||||
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
|
||||
|
||||
storage = AwsStorage(
|
||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
||||
)
|
||||
|
||||
source_url = await storage.get_file_url(
|
||||
input.s3_key,
|
||||
operation="get_object",
|
||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
bucket=input.bucket_name,
|
||||
)
|
||||
|
||||
with av.open(source_url) as in_container:
|
||||
if in_container.duration:
|
||||
try:
|
||||
duration = timedelta(seconds=in_container.duration // 1_000_000)
|
||||
ctx.log(
|
||||
f"pad_track: track {input.track_index}, duration={duration}"
|
||||
)
|
||||
except Exception:
|
||||
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
|
||||
|
||||
start_time_seconds = extract_stream_start_time_from_container(
|
||||
in_container, input.track_index, logger=logger
|
||||
)
|
||||
|
||||
# If no padding needed, return original S3 key
|
||||
if start_time_seconds <= 0:
|
||||
logger.info(
|
||||
f"Track {input.track_index} requires no padding",
|
||||
track_index=input.track_index,
|
||||
)
|
||||
return PadTrackResult(
|
||||
padded_key=input.s3_key,
|
||||
bucket_name=input.bucket_name,
|
||||
size=0,
|
||||
track_index=input.track_index,
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
|
||||
temp_path = temp_file.name
|
||||
|
||||
try:
|
||||
apply_audio_padding_to_file(
|
||||
in_container,
|
||||
temp_path,
|
||||
start_time_seconds,
|
||||
input.track_index,
|
||||
logger=logger,
|
||||
)
|
||||
|
||||
file_size = Path(temp_path).stat().st_size
|
||||
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
|
||||
|
||||
logger.info(
|
||||
f"About to upload padded track",
|
||||
key=storage_path,
|
||||
size=file_size,
|
||||
)
|
||||
|
||||
with open(temp_path, "rb") as padded_file:
|
||||
await storage.put_file(storage_path, padded_file)
|
||||
|
||||
logger.info(
|
||||
f"Uploaded padded track to S3",
|
||||
key=storage_path,
|
||||
size=file_size,
|
||||
)
|
||||
finally:
|
||||
Path(temp_path).unlink(missing_ok=True)
|
||||
|
||||
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
|
||||
logger.info(
|
||||
"[Hatchet] pad_track complete",
|
||||
track_index=input.track_index,
|
||||
padded_key=storage_path,
|
||||
)
|
||||
|
||||
# Return S3 key (not presigned URL) - consumer tasks presign on demand
|
||||
# This avoids stale URLs when workflow is replayed
|
||||
return PadTrackResult(
|
||||
padded_key=storage_path,
|
||||
bucket_name=None, # None = use default transcript storage bucket
|
||||
size=file_size,
|
||||
track_index=input.track_index,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
|
||||
raise
|
||||
|
||||
|
||||
@track_workflow.task(
|
||||
parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
|
||||
)
|
||||
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
|
||||
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
|
||||
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
|
||||
logger.info(
|
||||
"[Hatchet] transcribe_track",
|
||||
track_index=input.track_index,
|
||||
language=input.language,
|
||||
)
|
||||
|
||||
try:
|
||||
pad_result = ctx.task_output(pad_track)
|
||||
padded_key = pad_result.padded_key
|
||||
bucket_name = pad_result.bucket_name
|
||||
|
||||
if not padded_key:
|
||||
raise ValueError("Missing padded_key from pad_track")
|
||||
|
||||
# Presign URL on demand (avoids stale URLs on workflow replay)
|
||||
from reflector.settings import settings # noqa: PLC0415
|
||||
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
|
||||
|
||||
storage = AwsStorage(
|
||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
||||
)
|
||||
|
||||
audio_url = await storage.get_file_url(
|
||||
padded_key,
|
||||
operation="get_object",
|
||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
bucket=bucket_name,
|
||||
)
|
||||
|
||||
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
|
||||
transcribe_file_with_processor,
|
||||
)
|
||||
|
||||
transcript = await transcribe_file_with_processor(audio_url, input.language)
|
||||
|
||||
# Tag all words with speaker index
|
||||
for word in transcript.words:
|
||||
word.speaker = input.track_index
|
||||
|
||||
ctx.log(
|
||||
f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
|
||||
)
|
||||
logger.info(
|
||||
"[Hatchet] transcribe_track complete",
|
||||
track_index=input.track_index,
|
||||
word_count=len(transcript.words),
|
||||
)
|
||||
|
||||
return TranscribeTrackResult(
|
||||
words=transcript.words,
|
||||
track_index=input.track_index,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
|
||||
raise
|
||||
98
server/reflector/hatchet/workflows/transcription_workflow.py
Normal file
98
server/reflector/hatchet/workflows/transcription_workflow.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Hatchet child workflow: TranscriptionWorkflow
|
||||
Handles individual audio track transcription only.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
from hatchet_sdk import Context
|
||||
from pydantic import BaseModel
|
||||
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import TIMEOUT_HEAVY
|
||||
from reflector.hatchet.workflows.models import TranscribeTrackResult
|
||||
from reflector.logger import logger
|
||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
||||
|
||||
|
||||
class TranscriptionInput(BaseModel):
|
||||
"""Input for individual track transcription."""
|
||||
|
||||
track_index: int
|
||||
padded_key: str # S3 key from padding step
|
||||
bucket_name: str | None # None = use default bucket
|
||||
language: str = "en"
|
||||
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
transcription_workflow = hatchet.workflow(
|
||||
name="TranscriptionWorkflow", input_validator=TranscriptionInput
|
||||
)
|
||||
|
||||
|
||||
@transcription_workflow.task(
    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
    retries=3,
)
async def transcribe_track(
    input: TranscriptionInput, ctx: Context
) -> TranscribeTrackResult:
    """Transcribe one padded audio track and tag its words with the track index.

    Builds a presigned GET URL for ``input.padded_key`` and hands it to
    ``transcribe_file_with_processor`` together with ``input.language``
    (per the original author's note: GPU via Modal.com or local Whisper).
    Every returned word then gets ``speaker`` set to ``input.track_index``.

    Args:
        input: Track to transcribe (index, S3 key, optional bucket, language).
        ctx: Hatchet task context, used here only for ``ctx.log``.

    Returns:
        TranscribeTrackResult holding the transcript words and the track index.

    Raises:
        Exception: Any failure is logged with the track details and re-raised
            unchanged; the task is declared with ``retries=3``.
    """
    track_index = input.track_index

    ctx.log(f"transcribe_track: track {track_index}, language={input.language}")
    logger.info(
        "[Hatchet] transcribe_track",
        track_index=track_index,
        language=input.language,
    )

    try:
        # Imported lazily inside the task (see the PLC0415 suppressions).
        from reflector.settings import settings  # noqa: PLC0415
        from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415

        transcript_store = AwsStorage(
            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
        )

        # The transcription helper is given a URL rather than the S3 key.
        presigned_url = await transcript_store.get_file_url(
            input.padded_key,
            operation="get_object",
            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
            bucket=input.bucket_name,
        )

        from reflector.pipelines.transcription_helpers import (  # noqa: PLC0415
            transcribe_file_with_processor,
        )

        result = await transcribe_file_with_processor(presigned_url, input.language)

        # Attribute every word to this track by using its index as the speaker.
        words = result.words
        for entry in words:
            entry.speaker = track_index

        word_count = len(words)
        ctx.log(
            f"transcribe_track complete: track {track_index}, {word_count} words"
        )
        logger.info(
            "[Hatchet] transcribe_track complete",
            track_index=track_index,
            word_count=word_count,
        )

        return TranscribeTrackResult(
            words=words,
            track_index=track_index,
        )

    except Exception as exc:
        # Task boundary: record the failing track's details, then propagate.
        logger.error(
            "[Hatchet] transcribe_track failed",
            track_index=track_index,
            padded_key=input.padded_key,
            language=input.language,
            error=str(exc),
            exc_info=True,
        )
        raise
|
||||
@@ -302,10 +302,10 @@ export default function RoomsList() {
|
||||
return;
|
||||
}
|
||||
|
||||
const platform: "whereby" | "daily" | null =
|
||||
const platform: "whereby" | "daily" =
|
||||
room.platform === "whereby" || room.platform === "daily"
|
||||
? room.platform
|
||||
: null;
|
||||
: "daily";
|
||||
|
||||
const roomData = {
|
||||
name: room.name,
|
||||
|
||||
@@ -16,6 +16,7 @@ import {
|
||||
import { useError } from "../../../../(errors)/errorContext";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { Box, Grid } from "@chakra-ui/react";
|
||||
import { parseNonEmptyString } from "../../../../lib/utils";
|
||||
|
||||
export type TranscriptCorrect = {
|
||||
params: Promise<{
|
||||
@@ -25,8 +26,7 @@ export type TranscriptCorrect = {
|
||||
|
||||
export default function TranscriptCorrect(props: TranscriptCorrect) {
|
||||
const params = use(props.params);
|
||||
|
||||
const { transcriptId } = params;
|
||||
const transcriptId = parseNonEmptyString(params.transcriptId);
|
||||
|
||||
const updateTranscriptMutation = useTranscriptUpdate();
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
|
||||
@@ -9,7 +9,9 @@ import React, { useEffect, useState, use } from "react";
|
||||
import FinalSummary from "./finalSummary";
|
||||
import TranscriptTitle from "../transcriptTitle";
|
||||
import Player from "../player";
|
||||
import { useWebSockets } from "../useWebSockets";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { parseNonEmptyString } from "../../../lib/utils";
|
||||
import {
|
||||
Box,
|
||||
Flex,
|
||||
@@ -30,7 +32,7 @@ type TranscriptDetails = {
|
||||
|
||||
export default function TranscriptDetails(details: TranscriptDetails) {
|
||||
const params = use(details.params);
|
||||
const transcriptId = params.transcriptId;
|
||||
const transcriptId = parseNonEmptyString(params.transcriptId);
|
||||
const router = useRouter();
|
||||
const statusToRedirect = [
|
||||
"idle",
|
||||
@@ -49,6 +51,7 @@ export default function TranscriptDetails(details: TranscriptDetails) {
|
||||
transcriptId,
|
||||
waiting || mp3.audioDeleted === true,
|
||||
);
|
||||
useWebSockets(transcriptId);
|
||||
const useActiveTopic = useState<Topic | null>(null);
|
||||
const [finalSummaryElement, setFinalSummaryElement] =
|
||||
useState<HTMLDivElement | null>(null);
|
||||
|
||||
@@ -10,6 +10,7 @@ import {
|
||||
} from "@chakra-ui/react";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||
import { parseNonEmptyString } from "../../../../lib/utils";
|
||||
|
||||
type TranscriptProcessing = {
|
||||
params: Promise<{
|
||||
@@ -19,7 +20,7 @@ type TranscriptProcessing = {
|
||||
|
||||
export default function TranscriptProcessing(details: TranscriptProcessing) {
|
||||
const params = use(details.params);
|
||||
const transcriptId = params.transcriptId;
|
||||
const transcriptId = parseNonEmptyString(params.transcriptId);
|
||||
const router = useRouter();
|
||||
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
|
||||
@@ -12,6 +12,7 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react";
|
||||
import LiveTrancription from "../../liveTranscription";
|
||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||
import { TranscriptStatus } from "../../../../lib/transcript";
|
||||
import { parseNonEmptyString } from "../../../../lib/utils";
|
||||
|
||||
type TranscriptDetails = {
|
||||
params: Promise<{
|
||||
@@ -21,13 +22,14 @@ type TranscriptDetails = {
|
||||
|
||||
const TranscriptRecord = (details: TranscriptDetails) => {
|
||||
const params = use(details.params);
|
||||
const transcript = useTranscriptGet(params.transcriptId);
|
||||
const transcriptId = parseNonEmptyString(params.transcriptId);
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
const [transcriptStarted, setTranscriptStarted] = useState(false);
|
||||
const useActiveTopic = useState<Topic | null>(null);
|
||||
|
||||
const webSockets = useWebSockets(params.transcriptId);
|
||||
const webSockets = useWebSockets(transcriptId);
|
||||
|
||||
const mp3 = useMp3(params.transcriptId, true);
|
||||
const mp3 = useMp3(transcriptId, true);
|
||||
|
||||
const router = useRouter();
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import useMp3 from "../../useMp3";
|
||||
import { Center, VStack, Text, Heading } from "@chakra-ui/react";
|
||||
import FileUploadButton from "../../fileUploadButton";
|
||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||
import { parseNonEmptyString } from "../../../../lib/utils";
|
||||
|
||||
type TranscriptUpload = {
|
||||
params: Promise<{
|
||||
@@ -16,12 +17,13 @@ type TranscriptUpload = {
|
||||
|
||||
const TranscriptUpload = (details: TranscriptUpload) => {
|
||||
const params = use(details.params);
|
||||
const transcript = useTranscriptGet(params.transcriptId);
|
||||
const transcriptId = parseNonEmptyString(params.transcriptId);
|
||||
const transcript = useTranscriptGet(transcriptId);
|
||||
const [transcriptStarted, setTranscriptStarted] = useState(false);
|
||||
|
||||
const webSockets = useWebSockets(params.transcriptId);
|
||||
const webSockets = useWebSockets(transcriptId);
|
||||
|
||||
const mp3 = useMp3(params.transcriptId, true);
|
||||
const mp3 = useMp3(transcriptId, true);
|
||||
|
||||
const router = useRouter();
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { useState } from "react";
|
||||
import type { components } from "../../reflector-api";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
|
||||
type UpdateTranscript = components["schemas"]["UpdateTranscript"];
|
||||
type GetTranscriptWithParticipants =
|
||||
@@ -32,7 +33,7 @@ const TranscriptTitle = (props: TranscriptTitle) => {
|
||||
const [isEditing, setIsEditing] = useState(false);
|
||||
const updateTranscriptMutation = useTranscriptUpdate();
|
||||
const participantsQuery = useTranscriptParticipants(
|
||||
props.transcript?.id || null,
|
||||
props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null,
|
||||
);
|
||||
|
||||
const updateTitle = async (newTitle: string, transcriptId: string) => {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { useEffect, useState } from "react";
|
||||
import { useTranscriptGet } from "../../lib/apiHooks";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
import { useAuth } from "../../lib/AuthProvider";
|
||||
import { API_URL } from "../../lib/apiClient";
|
||||
|
||||
@@ -27,7 +28,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => {
|
||||
data: transcript,
|
||||
isLoading: transcriptMetadataLoading,
|
||||
error: transcriptError,
|
||||
} = useTranscriptGet(later ? null : transcriptId);
|
||||
} = useTranscriptGet(later ? null : parseMaybeNonEmptyString(transcriptId));
|
||||
|
||||
const [serviceWorker, setServiceWorker] =
|
||||
useState<ServiceWorkerRegistration | null>(null);
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { components } from "../../reflector-api";
|
||||
type Participant = components["schemas"]["Participant"];
|
||||
import { useTranscriptParticipants } from "../../lib/apiHooks";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
|
||||
type ErrorParticipants = {
|
||||
error: Error;
|
||||
@@ -32,7 +33,7 @@ const useParticipants = (transcriptId: string): UseParticipants => {
|
||||
isLoading: loading,
|
||||
error,
|
||||
refetch,
|
||||
} = useTranscriptParticipants(transcriptId || null);
|
||||
} = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId));
|
||||
|
||||
// Type-safe return based on state
|
||||
if (error) {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { components } from "../../reflector-api";
|
||||
import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
|
||||
type GetTranscriptTopicWithWordsPerSpeaker =
|
||||
components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"];
|
||||
@@ -38,7 +39,7 @@ const useTopicWithWords = (
|
||||
error,
|
||||
refetch,
|
||||
} = useTranscriptTopicsWithWordsPerSpeaker(
|
||||
transcriptId || null,
|
||||
parseMaybeNonEmptyString(transcriptId),
|
||||
topicId || null,
|
||||
);
|
||||
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { useTranscriptTopics } from "../../lib/apiHooks";
|
||||
import type { components } from "../../reflector-api";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
|
||||
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
|
||||
|
||||
@@ -10,7 +11,11 @@ type TranscriptTopics = {
|
||||
};
|
||||
|
||||
const useTopics = (id: string): TranscriptTopics => {
|
||||
const { data: topics, isLoading: loading, error } = useTranscriptTopics(id);
|
||||
const {
|
||||
data: topics,
|
||||
isLoading: loading,
|
||||
error,
|
||||
} = useTranscriptTopics(parseMaybeNonEmptyString(id));
|
||||
|
||||
return {
|
||||
topics: topics || null,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import type { components } from "../../reflector-api";
|
||||
import { useTranscriptWaveform } from "../../lib/apiHooks";
|
||||
import { parseMaybeNonEmptyString } from "../../lib/utils";
|
||||
|
||||
type AudioWaveform = components["schemas"]["AudioWaveform"];
|
||||
|
||||
@@ -14,7 +15,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => {
|
||||
data: waveform,
|
||||
isLoading: loading,
|
||||
error,
|
||||
} = useTranscriptWaveform(skip ? null : id);
|
||||
} = useTranscriptWaveform(skip ? null : parseMaybeNonEmptyString(id));
|
||||
|
||||
return {
|
||||
waveform: waveform || null,
|
||||
|
||||
@@ -7,6 +7,12 @@ type GetTranscriptSegmentTopic =
|
||||
components["schemas"]["GetTranscriptSegmentTopic"];
|
||||
import { useQueryClient } from "@tanstack/react-query";
|
||||
import { $api, WEBSOCKET_URL } from "../../lib/apiClient";
|
||||
import {
|
||||
invalidateTranscript,
|
||||
invalidateTranscriptTopics,
|
||||
invalidateTranscriptWaveform,
|
||||
} from "../../lib/apiHooks";
|
||||
import { NonEmptyString } from "../../lib/utils";
|
||||
|
||||
export type UseWebSockets = {
|
||||
transcriptTextLive: string;
|
||||
@@ -369,15 +375,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
|
||||
});
|
||||
console.debug("TOPIC event:", message.data);
|
||||
// Invalidate topics query to sync with WebSocket data
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}/topics",
|
||||
{
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
},
|
||||
).queryKey,
|
||||
});
|
||||
invalidateTranscriptTopics(
|
||||
queryClient,
|
||||
transcriptId as NonEmptyString,
|
||||
);
|
||||
break;
|
||||
|
||||
case "FINAL_SHORT_SUMMARY":
|
||||
@@ -388,15 +389,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
|
||||
if (message.data) {
|
||||
setFinalSummary(message.data);
|
||||
// Invalidate transcript query to sync summary
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}",
|
||||
{
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
},
|
||||
).queryKey,
|
||||
});
|
||||
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -405,15 +398,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
|
||||
if (message.data) {
|
||||
setTitle(message.data.title);
|
||||
// Invalidate transcript query to sync title
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}",
|
||||
{
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
},
|
||||
).queryKey,
|
||||
});
|
||||
invalidateTranscript(queryClient, transcriptId as NonEmptyString);
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -424,6 +409,10 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
|
||||
);
|
||||
if (message.data) {
|
||||
setWaveForm(message.data.waveform);
|
||||
invalidateTranscriptWaveform(
|
||||
queryClient,
|
||||
transcriptId as NonEmptyString,
|
||||
);
|
||||
}
|
||||
break;
|
||||
case "DURATION":
|
||||
|
||||
@@ -26,7 +26,7 @@ import { useRouter } from "next/navigation";
|
||||
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
|
||||
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
|
||||
import { NonEmptyString } from "../lib/utils";
|
||||
import { MeetingId } from "../lib/types";
|
||||
import { MeetingId, assertMeetingId } from "../lib/types";
|
||||
|
||||
type Meeting = components["schemas"]["Meeting"];
|
||||
|
||||
@@ -315,7 +315,9 @@ export default function MeetingSelection({
|
||||
variant="outline"
|
||||
colorScheme="red"
|
||||
size="md"
|
||||
onClick={() => handleEndMeeting(meeting.id)}
|
||||
onClick={() =>
|
||||
handleEndMeeting(assertMeetingId(meeting.id))
|
||||
}
|
||||
loading={deactivateMeetingMutation.isPending}
|
||||
>
|
||||
<Icon as={LuX} me={2} />
|
||||
@@ -460,7 +462,9 @@ export default function MeetingSelection({
|
||||
variant="outline"
|
||||
colorScheme="red"
|
||||
size="md"
|
||||
onClick={() => handleEndMeeting(meeting.id)}
|
||||
onClick={() =>
|
||||
handleEndMeeting(assertMeetingId(meeting.id))
|
||||
}
|
||||
loading={deactivateMeetingMutation.isPending}
|
||||
>
|
||||
<Icon as={LuX} me={2} />
|
||||
|
||||
@@ -6,6 +6,7 @@ import { QueryClient, useQueryClient } from "@tanstack/react-query";
|
||||
import type { components } from "../reflector-api";
|
||||
import { useAuth } from "./AuthProvider";
|
||||
import { MeetingId } from "./types";
|
||||
import { NonEmptyString } from "./utils";
|
||||
|
||||
/*
|
||||
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
|
||||
@@ -103,7 +104,7 @@ export function useTranscriptProcess() {
|
||||
});
|
||||
}
|
||||
|
||||
export function useTranscriptGet(transcriptId: string | null) {
|
||||
export function useTranscriptGet(transcriptId: NonEmptyString | null) {
|
||||
return $api.useQuery(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}",
|
||||
@@ -120,6 +121,16 @@ export function useTranscriptGet(transcriptId: string | null) {
|
||||
);
|
||||
}
|
||||
|
||||
export const invalidateTranscript = (
|
||||
queryClient: QueryClient,
|
||||
transcriptId: NonEmptyString,
|
||||
) =>
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", {
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
}).queryKey,
|
||||
});
|
||||
|
||||
export function useRoomGet(roomId: string | null) {
|
||||
const { isAuthenticated } = useAuthReady();
|
||||
|
||||
@@ -297,7 +308,7 @@ export function useTranscriptUploadAudio() {
|
||||
);
|
||||
}
|
||||
|
||||
export function useTranscriptWaveform(transcriptId: string | null) {
|
||||
export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
|
||||
return $api.useQuery(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}/audio/waveform",
|
||||
@@ -312,7 +323,21 @@ export function useTranscriptWaveform(transcriptId: string | null) {
|
||||
);
|
||||
}
|
||||
|
||||
export function useTranscriptMP3(transcriptId: string | null) {
|
||||
export const invalidateTranscriptWaveform = (
|
||||
queryClient: QueryClient,
|
||||
transcriptId: NonEmptyString,
|
||||
) =>
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}/audio/waveform",
|
||||
{
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
},
|
||||
).queryKey,
|
||||
});
|
||||
|
||||
export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
|
||||
const { isAuthenticated } = useAuthReady();
|
||||
|
||||
return $api.useQuery(
|
||||
@@ -329,7 +354,7 @@ export function useTranscriptMP3(transcriptId: string | null) {
|
||||
);
|
||||
}
|
||||
|
||||
export function useTranscriptTopics(transcriptId: string | null) {
|
||||
export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
|
||||
return $api.useQuery(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}/topics",
|
||||
@@ -344,7 +369,23 @@ export function useTranscriptTopics(transcriptId: string | null) {
|
||||
);
|
||||
}
|
||||
|
||||
export function useTranscriptTopicsWithWords(transcriptId: string | null) {
|
||||
export const invalidateTranscriptTopics = (
|
||||
queryClient: QueryClient,
|
||||
transcriptId: NonEmptyString,
|
||||
) =>
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: $api.queryOptions(
|
||||
"get",
|
||||
"/v1/transcripts/{transcript_id}/topics",
|
||||
{
|
||||
params: { path: { transcript_id: transcriptId } },
|
||||
},
|
||||
).queryKey,
|
||||
});
|
||||
|
||||
export function useTranscriptTopicsWithWords(
|
||||
transcriptId: NonEmptyString | null,
|
||||
) {
|
||||
const { isAuthenticated } = useAuthReady();
|
||||
|
||||
return $api.useQuery(
|
||||
@@ -362,7 +403,7 @@ export function useTranscriptTopicsWithWords(transcriptId: string | null) {
|
||||
}
|
||||
|
||||
export function useTranscriptTopicsWithWordsPerSpeaker(
|
||||
transcriptId: string | null,
|
||||
transcriptId: NonEmptyString | null,
|
||||
topicId: string | null,
|
||||
) {
|
||||
const { isAuthenticated } = useAuthReady();
|
||||
@@ -384,7 +425,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker(
|
||||
);
|
||||
}
|
||||
|
||||
export function useTranscriptParticipants(transcriptId: string | null) {
|
||||
export function useTranscriptParticipants(transcriptId: NonEmptyString | null) {
|
||||
const { isAuthenticated } = useAuthReady();
|
||||
|
||||
return $api.useQuery(
|
||||
|
||||
Reference in New Issue
Block a user