Mirror of https://github.com/Monadical-SAS/reflector.git, synced 2026-02-05 10:26:48 +00:00
Compare commits: feature/sp...feature-le (5 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 485b455c69 |  |
|  | 74c9ec2ff1 |  |
|  | aac89e8d03 |  |
|  | 13088e72f8 |  |
|  | 775c9b667d |  |
```diff
@@ -4,4 +4,3 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
 server/reflector/worker/process.py:generic-api-key:465
-server/reflector/worker/process.py:generic-api-key:594
```
```diff
@@ -1,12 +1,5 @@
 # Changelog
 
-## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)
-
-
-### Features
-
-* mixdown optional ([#834](https://github.com/Monadical-SAS/reflector/issues/834)) ([fc3ef6c](https://github.com/Monadical-SAS/reflector/commit/fc3ef6c8933231c731fad84e7477a476a6220a5e))
-
 ## [0.30.0](https://github.com/Monadical-SAS/reflector/compare/v0.29.0...v0.30.0) (2026-01-23)
 
 
```
```diff
@@ -8,8 +8,7 @@ from enum import StrEnum
 class TaskName(StrEnum):
     GET_RECORDING = "get_recording"
     GET_PARTICIPANTS = "get_participants"
-    PROCESS_PADDINGS = "process_paddings"
-    PROCESS_TRANSCRIPTIONS = "process_transcriptions"
+    PROCESS_TRACKS = "process_tracks"
     MIXDOWN_TRACKS = "mixdown_tracks"
     GENERATE_WAVEFORM = "generate_waveform"
     DETECT_TOPICS = "detect_topics"
```
```diff
@@ -1,9 +1,9 @@
 """
 CPU-heavy worker pool for audio processing tasks.
-Handles: mixdown_tracks only (serialized with max_runs=1)
+Handles ONLY: mixdown_tracks
 
 Configuration:
-- slots=1: Only one mixdown at a time
+- slots=1: Only mixdown (already serialized globally with max_runs=1)
 - Worker affinity: pool=cpu-heavy
 """
 
@@ -26,7 +26,7 @@ def main():
 
     cpu_worker = hatchet.worker(
         "cpu-worker-pool",
-        slots=1,
+        slots=1,  # Only 1 mixdown at a time (already serialized globally)
         labels={
             "pool": "cpu-heavy",
         },
```
```diff
@@ -1,16 +1,15 @@
 """
 LLM/I/O worker pool for all non-CPU tasks.
-Handles: all tasks except mixdown_tracks (padding, transcription, LLM inference, orchestration)
+Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
 """
 
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
-from reflector.hatchet.workflows.padding_workflow import padding_workflow
 from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
-from reflector.hatchet.workflows.transcription_workflow import transcription_workflow
+from reflector.hatchet.workflows.track_processing import track_workflow
 from reflector.logger import logger
 
 SLOTS = 10
@@ -30,7 +29,7 @@ def main():
 
     llm_worker = hatchet.worker(
         WORKER_NAME,
-        slots=SLOTS,
+        slots=SLOTS,  # not all slots are probably used
         labels={
             "pool": POOL,
         },
@@ -38,8 +37,7 @@ def main():
         daily_multitrack_pipeline,
         topic_chunk_workflow,
         subject_workflow,
-        padding_workflow,
-        transcription_workflow,
+        track_workflow,
         ],
     )
 
```
```diff
@@ -4,10 +4,6 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
     daily_multitrack_pipeline,
 )
-from reflector.hatchet.workflows.padding_workflow import (
-    PaddingInput,
-    padding_workflow,
-)
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
     subject_workflow,
@@ -16,20 +12,15 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
     TopicChunkInput,
     topic_chunk_workflow,
 )
-from reflector.hatchet.workflows.transcription_workflow import (
-    TranscriptionInput,
-    transcription_workflow,
-)
+from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 
 __all__ = [
     "daily_multitrack_pipeline",
     "subject_workflow",
     "topic_chunk_workflow",
-    "padding_workflow",
-    "transcription_workflow",
+    "track_workflow",
     "PipelineInput",
     "SubjectInput",
     "TopicChunkInput",
-    "PaddingInput",
-    "TranscriptionInput",
+    "TrackInput",
 ]
```
```diff
@@ -54,9 +54,8 @@ from reflector.hatchet.workflows.models import (
     PadTrackResult,
     ParticipantInfo,
     ParticipantsResult,
-    ProcessPaddingsResult,
     ProcessSubjectsResult,
-    ProcessTranscriptionsResult,
+    ProcessTracksResult,
     RecapResult,
     RecordingResult,
     SubjectsResult,
@@ -69,7 +68,6 @@ from reflector.hatchet.workflows.models import (
     WebhookResult,
     ZulipResult,
 )
-from reflector.hatchet.workflows.padding_workflow import PaddingInput, padding_workflow
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
     subject_workflow,
@@ -78,10 +76,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
     TopicChunkInput,
     topic_chunk_workflow,
 )
-from reflector.hatchet.workflows.transcription_workflow import (
-    TranscriptionInput,
-    transcription_workflow,
-)
+from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
 from reflector.processors import AudioFileWriterProcessor
```
```diff
@@ -409,115 +404,72 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
 )
-@with_error_handling(TaskName.PROCESS_PADDINGS)
-async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPaddingsResult:
-    """Spawn child workflows for each track to apply padding (dynamic fan-out)."""
-    ctx.log(f"process_paddings: spawning {len(input.tracks)} padding workflows")
+@with_error_handling(TaskName.PROCESS_TRACKS)
+async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
+    """Spawn child workflows for each track (dynamic fan-out)."""
+    ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
 
+    participants_result = ctx.task_output(get_participants)
+    source_language = participants_result.source_language
+
     bulk_runs = [
-        padding_workflow.create_bulk_run_item(
-            input=PaddingInput(
+        track_workflow.create_bulk_run_item(
+            input=TrackInput(
                 track_index=i,
                 s3_key=track["s3_key"],
                 bucket_name=input.bucket_name,
                 transcript_id=input.transcript_id,
+                language=source_language,
             )
         )
         for i, track in enumerate(input.tracks)
     ]
 
-    results = await padding_workflow.aio_run_many(bulk_runs)
+    results = await track_workflow.aio_run_many(bulk_runs)
 
-    padded_tracks = []
-    created_padded_files = []
-
-    for result in results:
-        pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
-
-        padded_tracks.append(
-            PaddedTrackInfo(
-                key=pad_result.padded_key,
-                bucket_name=pad_result.bucket_name,
-                track_index=pad_result.track_index,
-            )
-        )
-
-        if pad_result.size > 0:
-            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
-            created_padded_files.append(storage_path)
-
-    ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks")
-
-    return ProcessPaddingsResult(
-        padded_tracks=padded_tracks,
-        num_tracks=len(input.tracks),
-        created_padded_files=list(created_padded_files),
-    )
-
-
-@daily_multitrack_pipeline.task(
-    parents=[process_paddings],
-    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
-    retries=3,
-)
-@with_error_handling(TaskName.PROCESS_TRANSCRIPTIONS)
-async def process_transcriptions(
-    input: PipelineInput, ctx: Context
-) -> ProcessTranscriptionsResult:
-    """Spawn child workflows for each padded track to transcribe (dynamic fan-out)."""
-    participants_result = ctx.task_output(get_participants)
-    paddings_result = ctx.task_output(process_paddings)
-
-    source_language = participants_result.source_language
-    if not source_language:
-        raise ValueError("source_language is required for transcription")
-
     target_language = participants_result.target_language
-    padded_tracks = paddings_result.padded_tracks
-
-    if not padded_tracks:
-        raise ValueError("No padded tracks available for transcription")
-
-    ctx.log(
-        f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows"
-    )
-
-    bulk_runs = [
-        transcription_workflow.create_bulk_run_item(
-            input=TranscriptionInput(
-                track_index=padded_track.track_index,
-                padded_key=padded_track.key,
-                bucket_name=padded_track.bucket_name,
-                language=source_language,
-            )
-        )
-        for padded_track in padded_tracks
-    ]
-
-    results = await transcription_workflow.aio_run_many(bulk_runs)
-
     track_words: list[list[Word]] = []
+    padded_tracks = []
+    created_padded_files = set()
 
     for result in results:
         transcribe_result = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK])
         track_words.append(transcribe_result.words)
 
+        pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
+
+        # Store S3 key info (not presigned URL) - consumer tasks presign on demand
+        if pad_result.padded_key:
+            padded_tracks.append(
+                PaddedTrackInfo(
+                    key=pad_result.padded_key, bucket_name=pad_result.bucket_name
+                )
+            )
+
+        if pad_result.size > 0:
+            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
+            created_padded_files.add(storage_path)
+
     all_words = [word for words in track_words for word in words]
     all_words.sort(key=lambda w: w.start)
 
     ctx.log(
-        f"process_transcriptions complete: {len(all_words)} words from {len(padded_tracks)} tracks"
+        f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
     )
 
-    return ProcessTranscriptionsResult(
+    return ProcessTracksResult(
         all_words=all_words,
+        padded_tracks=padded_tracks,
         word_count=len(all_words),
         num_tracks=len(input.tracks),
         target_language=target_language,
+        created_padded_files=list(created_padded_files),
     )
 
 
 @daily_multitrack_pipeline.task(
-    parents=[process_paddings],
+    parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
     desired_worker_labels={
```
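Taken together, the new `process_tracks` is a straight fan-out/fan-in: one `TrackProcessing` child run per track, then a merge of both child task outputs. Below is a minimal sketch of that pattern, using only the SDK surface visible in this hunk (`create_bulk_run_item`, `aio_run_many`, results keyed by task name); the standalone function and its argument names are illustrative, not part of the diff:

```python
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow


async def fan_out_tracks(tracks, bucket_name, transcript_id, language):
    # One child workflow run per track (dynamic fan-out).
    bulk_runs = [
        track_workflow.create_bulk_run_item(
            input=TrackInput(
                track_index=i,
                s3_key=track["s3_key"],
                bucket_name=bucket_name,
                transcript_id=transcript_id,
                language=language,
            )
        )
        for i, track in enumerate(tracks)
    ]
    # Fan-in: each result dict is keyed by child task name, so the parent
    # can read pad_track and transcribe_track outputs from the same run.
    return await track_workflow.aio_run_many(bulk_runs)
```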
```diff
@@ -537,12 +489,12 @@ async def process_transcriptions(
 )
 @with_error_handling(TaskName.MIXDOWN_TRACKS)
 async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
-    """Mix all padded tracks into single audio file using PyAV."""
+    """Mix all padded tracks into single audio file using PyAV (same as Celery)."""
     ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
 
-    paddings_result = ctx.task_output(process_paddings)
+    track_result = ctx.task_output(process_tracks)
     recording_result = ctx.task_output(get_recording)
-    padded_tracks = paddings_result.padded_tracks
+    padded_tracks = track_result.padded_tracks
 
     # Dynamic timeout: scales with track count and recording duration
     # Base 300s + 60s per track + 1s per 10s of recording
```
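The timeout comment above pins down the arithmetic. As a worked example (the helper name is hypothetical; the formula is taken verbatim from the comment):

```python
def mixdown_timeout_seconds(num_tracks: int, recording_seconds: int) -> int:
    # Base 300s + 60s per track + 1s per 10s of recording
    return 300 + 60 * num_tracks + recording_seconds // 10


# 4 tracks, 1-hour recording: 300 + 240 + 360 = 900 seconds
assert mixdown_timeout_seconds(4, 3600) == 900
```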
```diff
@@ -696,7 +648,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
 
 
 @daily_multitrack_pipeline.task(
-    parents=[process_transcriptions],
+    parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
 )
@@ -705,8 +657,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     """Detect topics using parallel child workflows (one per chunk)."""
     ctx.log("detect_topics: analyzing transcript for topics")
 
-    transcriptions_result = ctx.task_output(process_transcriptions)
-    words = transcriptions_result.all_words
+    track_result = ctx.task_output(process_tracks)
+    words = track_result.all_words
 
     if not words:
         ctx.log("detect_topics: no words, returning empty topics")
@@ -1143,7 +1095,7 @@ async def identify_action_items(
 
 
 @daily_multitrack_pipeline.task(
-    parents=[generate_title, generate_recap, identify_action_items],
+    parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
 )
@@ -1157,14 +1109,13 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     ctx.log("finalize: saving transcript and setting status to 'ended'")
 
     mixdown_result = ctx.task_output(mixdown_tracks)
-    transcriptions_result = ctx.task_output(process_transcriptions)
-    paddings_result = ctx.task_output(process_paddings)
+    track_result = ctx.task_output(process_tracks)
 
     duration = mixdown_result.duration
-    all_words = transcriptions_result.all_words
+    all_words = track_result.all_words
 
     # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
-    created_padded_files = paddings_result.created_padded_files
+    created_padded_files = track_result.created_padded_files
     if created_padded_files:
         ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
         storage = _spawn_storage()
```
```diff
@@ -21,14 +21,12 @@ class ParticipantInfo(BaseModel):
 
 
 class PadTrackResult(BaseModel):
-    """Result from pad_track task.
-
-    If size=0, track required no padding and padded_key contains original S3 key.
-    If size>0, track was padded and padded_key contains new padded file S3 key.
-    """
-
-    padded_key: NonEmptyString
-    bucket_name: NonEmptyString | None
+    """Result from pad_track task."""
+
+    padded_key: NonEmptyString  # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
+    bucket_name: (
+        NonEmptyString | None
+    )  # None means use default transcript storage bucket
     size: int
     track_index: int
 
@@ -61,25 +59,18 @@ class PaddedTrackInfo(BaseModel):
     """Info for a padded track - S3 key + bucket for on-demand presigning."""
 
     key: NonEmptyString
-    bucket_name: NonEmptyString | None
-    track_index: int
+    bucket_name: NonEmptyString | None  # None = use default storage bucket
 
 
-class ProcessPaddingsResult(BaseModel):
-    """Result from process_paddings task."""
-
-    padded_tracks: list[PaddedTrackInfo]
-    num_tracks: int
-    created_padded_files: list[NonEmptyString]
-
-
-class ProcessTranscriptionsResult(BaseModel):
-    """Result from process_transcriptions task."""
+class ProcessTracksResult(BaseModel):
+    """Result from process_tracks task."""
 
     all_words: list[Word]
+    padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
     word_count: int
     num_tracks: int
     target_language: NonEmptyString
+    created_padded_files: list[NonEmptyString]
 
 
 class MixdownResult(BaseModel):
```
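The comments in this hunk record the design choice behind `PadTrackResult` and `PaddedTrackInfo`: child results carry durable S3 keys, never presigned URLs, so a replayed workflow never holds an expired link. A consumer then presigns at the moment of use. A sketch of that consumption side, reusing the `get_file_url` call that appears in `track_processing.py` further down (the wrapper function itself is hypothetical):

```python
async def presign(track, storage):
    # track.key / track.bucket_name come from PaddedTrackInfo; presigning
    # only on demand means replays always mint a fresh URL.
    return await storage.get_file_url(
        track.key,
        operation="get_object",
        expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
        bucket=track.bucket_name,  # None = default transcript storage bucket
    )
```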
```diff
@@ -1,148 +0,0 @@
-"""
-Hatchet child workflow: PaddingWorkflow
-Handles individual audio track padding only.
-"""
-
-import tempfile
-from datetime import timedelta
-from pathlib import Path
-
-import av
-from hatchet_sdk import Context
-from pydantic import BaseModel
-
-from reflector.hatchet.client import HatchetClientManager
-from reflector.hatchet.constants import TIMEOUT_AUDIO
-from reflector.hatchet.workflows.models import PadTrackResult
-from reflector.logger import logger
-from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
-from reflector.utils.audio_padding import (
-    apply_audio_padding_to_file,
-    extract_stream_start_time_from_container,
-)
-
-
-class PaddingInput(BaseModel):
-    """Input for individual track padding."""
-
-    track_index: int
-    s3_key: str
-    bucket_name: str
-    transcript_id: str
-
-
-hatchet = HatchetClientManager.get_client()
-
-padding_workflow = hatchet.workflow(
-    name="PaddingWorkflow", input_validator=PaddingInput
-)
-
-
-@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
-async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
-    """Pad audio track with silence based on WebM container start_time."""
-    ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
-    logger.info(
-        "[Hatchet] pad_track",
-        track_index=input.track_index,
-        s3_key=input.s3_key,
-        transcript_id=input.transcript_id,
-    )
-
-    try:
-        # Create fresh storage instance to avoid aioboto3 fork issues
-        from reflector.settings import settings  # noqa: PLC0415
-        from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
-
-        storage = AwsStorage(
-            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
-            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
-            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
-            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
-        )
-
-        source_url = await storage.get_file_url(
-            input.s3_key,
-            operation="get_object",
-            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
-            bucket=input.bucket_name,
-        )
-
-        with av.open(source_url) as in_container:
-            if in_container.duration:
-                try:
-                    duration = timedelta(seconds=in_container.duration // 1_000_000)
-                    ctx.log(
-                        f"pad_track: track {input.track_index}, duration={duration}"
-                    )
-                except (ValueError, TypeError, OverflowError) as e:
-                    ctx.log(
-                        f"pad_track: track {input.track_index}, duration error: {str(e)}"
-                    )
-
-            start_time_seconds = extract_stream_start_time_from_container(
-                in_container, input.track_index, logger=logger
-            )
-
-            if start_time_seconds <= 0:
-                logger.info(
-                    f"Track {input.track_index} requires no padding",
-                    track_index=input.track_index,
-                )
-                return PadTrackResult(
-                    padded_key=input.s3_key,
-                    bucket_name=input.bucket_name,
-                    size=0,
-                    track_index=input.track_index,
-                )
-
-            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
-
-            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
-                temp_path = temp_file.name
-
-            try:
-                apply_audio_padding_to_file(
-                    in_container,
-                    temp_path,
-                    start_time_seconds,
-                    input.track_index,
-                    logger=logger,
-                )
-
-                file_size = Path(temp_path).stat().st_size
-
-                with open(temp_path, "rb") as padded_file:
-                    await storage.put_file(storage_path, padded_file)
-
-                logger.info(
-                    f"Uploaded padded track to S3",
-                    key=storage_path,
-                    size=file_size,
-                )
-            finally:
-                Path(temp_path).unlink(missing_ok=True)
-
-        logger.info(
-            "[Hatchet] pad_track complete",
-            track_index=input.track_index,
-            padded_key=storage_path,
-        )
-
-        return PadTrackResult(
-            padded_key=storage_path,
-            bucket_name=None,  # None = use default transcript storage bucket
-            size=file_size,
-            track_index=input.track_index,
-        )
-
-    except Exception as e:
-        logger.error(
-            "[Hatchet] pad_track failed",
-            transcript_id=input.transcript_id,
-            track_index=input.track_index,
-            error=str(e),
-            exc_info=True,
-        )
-        raise
```
server/reflector/hatchet/workflows/track_processing.py (new file, 229 lines)

```diff
@@ -0,0 +1,229 @@
+"""
+Hatchet child workflow: TrackProcessing
+
+Handles individual audio track processing: padding and transcription.
+Spawned dynamically by the main diarization pipeline for each track.
+
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
+because Hatchet workflow DAGs are defined statically, but the number of tracks varies
+at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
+
+Note: This file uses deferred imports (inside tasks) intentionally.
+Hatchet workers run in forked processes; fresh imports per task ensure
+storage/DB connections are not shared across forks.
+"""
+
+import tempfile
+from datetime import timedelta
+from pathlib import Path
+
+import av
+from hatchet_sdk import Context
+from pydantic import BaseModel
+
+from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
+from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
+from reflector.logger import logger
+from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
+from reflector.utils.audio_padding import (
+    apply_audio_padding_to_file,
+    extract_stream_start_time_from_container,
+)
+
+
+class TrackInput(BaseModel):
+    """Input for individual track processing."""
+
+    track_index: int
+    s3_key: str
+    bucket_name: str
+    transcript_id: str
+    language: str = "en"
+
+
+hatchet = HatchetClientManager.get_client()
+
+track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
+
+
+@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
+async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
+    """Pad single audio track with silence for alignment.
+
+    Extracts stream.start_time from WebM container metadata and applies
+    silence padding using PyAV filter graph (adelay).
+    """
+    ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
+    logger.info(
+        "[Hatchet] pad_track",
+        track_index=input.track_index,
+        s3_key=input.s3_key,
+        transcript_id=input.transcript_id,
+    )
+
+    try:
+        # Create fresh storage instance to avoid aioboto3 fork issues
+        from reflector.settings import settings  # noqa: PLC0415
+        from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
+
+        storage = AwsStorage(
+            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
+            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
+            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
+            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        )
+
+        source_url = await storage.get_file_url(
+            input.s3_key,
+            operation="get_object",
+            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
+            bucket=input.bucket_name,
+        )
+
+        with av.open(source_url) as in_container:
+            if in_container.duration:
+                try:
+                    duration = timedelta(seconds=in_container.duration // 1_000_000)
+                    ctx.log(
+                        f"pad_track: track {input.track_index}, duration={duration}"
+                    )
+                except Exception:
+                    ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
+
+            start_time_seconds = extract_stream_start_time_from_container(
+                in_container, input.track_index, logger=logger
+            )
+
+            # If no padding needed, return original S3 key
+            if start_time_seconds <= 0:
+                logger.info(
+                    f"Track {input.track_index} requires no padding",
+                    track_index=input.track_index,
+                )
+                return PadTrackResult(
+                    padded_key=input.s3_key,
+                    bucket_name=input.bucket_name,
+                    size=0,
+                    track_index=input.track_index,
+                )
+
+            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
+                temp_path = temp_file.name
+
+            try:
+                apply_audio_padding_to_file(
+                    in_container,
+                    temp_path,
+                    start_time_seconds,
+                    input.track_index,
+                    logger=logger,
+                )
+
+                file_size = Path(temp_path).stat().st_size
+                storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
+
+                logger.info(
+                    f"About to upload padded track",
+                    key=storage_path,
+                    size=file_size,
+                )
+
+                with open(temp_path, "rb") as padded_file:
+                    await storage.put_file(storage_path, padded_file)
+
+                logger.info(
+                    f"Uploaded padded track to S3",
+                    key=storage_path,
+                    size=file_size,
+                )
+            finally:
+                Path(temp_path).unlink(missing_ok=True)
+
+        ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
+        logger.info(
+            "[Hatchet] pad_track complete",
+            track_index=input.track_index,
+            padded_key=storage_path,
+        )
+
+        # Return S3 key (not presigned URL) - consumer tasks presign on demand
+        # This avoids stale URLs when workflow is replayed
+        return PadTrackResult(
+            padded_key=storage_path,
+            bucket_name=None,  # None = use default transcript storage bucket
+            size=file_size,
+            track_index=input.track_index,
+        )
+
+    except Exception as e:
+        logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
+        raise
+
+
+@track_workflow.task(
+    parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
+)
+async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
+    """Transcribe audio track using GPU (Modal.com) or local Whisper."""
+    ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
+    logger.info(
+        "[Hatchet] transcribe_track",
+        track_index=input.track_index,
+        language=input.language,
+    )
+
+    try:
+        pad_result = ctx.task_output(pad_track)
+        padded_key = pad_result.padded_key
+        bucket_name = pad_result.bucket_name
+
+        if not padded_key:
+            raise ValueError("Missing padded_key from pad_track")
+
+        # Presign URL on demand (avoids stale URLs on workflow replay)
+        from reflector.settings import settings  # noqa: PLC0415
+        from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
+
+        storage = AwsStorage(
+            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
+            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
+            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
+            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        )
+
+        audio_url = await storage.get_file_url(
+            padded_key,
+            operation="get_object",
+            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
+            bucket=bucket_name,
+        )
+
+        from reflector.pipelines.transcription_helpers import (  # noqa: PLC0415
+            transcribe_file_with_processor,
+        )
+
+        transcript = await transcribe_file_with_processor(audio_url, input.language)
+
+        # Tag all words with speaker index
+        for word in transcript.words:
+            word.speaker = input.track_index
+
+        ctx.log(
+            f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
+        )
+        logger.info(
+            "[Hatchet] transcribe_track complete",
+            track_index=input.track_index,
+            word_count=len(transcript.words),
+        )
+
+        return TranscribeTrackResult(
+            words=transcript.words,
+            track_index=input.track_index,
+        )
+
+    except Exception as e:
+        logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
+        raise
```
```diff
@@ -1,98 +0,0 @@
-"""
-Hatchet child workflow: TranscriptionWorkflow
-Handles individual audio track transcription only.
-"""
-
-from datetime import timedelta
-
-from hatchet_sdk import Context
-from pydantic import BaseModel
-
-from reflector.hatchet.client import HatchetClientManager
-from reflector.hatchet.constants import TIMEOUT_HEAVY
-from reflector.hatchet.workflows.models import TranscribeTrackResult
-from reflector.logger import logger
-from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
-
-
-class TranscriptionInput(BaseModel):
-    """Input for individual track transcription."""
-
-    track_index: int
-    padded_key: str  # S3 key from padding step
-    bucket_name: str | None  # None = use default bucket
-    language: str = "en"
-
-
-hatchet = HatchetClientManager.get_client()
-
-transcription_workflow = hatchet.workflow(
-    name="TranscriptionWorkflow", input_validator=TranscriptionInput
-)
-
-
-@transcription_workflow.task(
-    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
-)
-async def transcribe_track(
-    input: TranscriptionInput, ctx: Context
-) -> TranscribeTrackResult:
-    """Transcribe audio track using GPU (Modal.com) or local Whisper."""
-    ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
-    logger.info(
-        "[Hatchet] transcribe_track",
-        track_index=input.track_index,
-        language=input.language,
-    )
-
-    try:
-        from reflector.settings import settings  # noqa: PLC0415
-        from reflector.storage.storage_aws import AwsStorage  # noqa: PLC0415
-
-        storage = AwsStorage(
-            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
-            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
-            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
-            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
-        )
-
-        audio_url = await storage.get_file_url(
-            input.padded_key,
-            operation="get_object",
-            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
-            bucket=input.bucket_name,
-        )
-
-        from reflector.pipelines.transcription_helpers import (  # noqa: PLC0415
-            transcribe_file_with_processor,
-        )
-
-        transcript = await transcribe_file_with_processor(audio_url, input.language)
-
-        for word in transcript.words:
-            word.speaker = input.track_index
-
-        ctx.log(
-            f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
-        )
-        logger.info(
-            "[Hatchet] transcribe_track complete",
-            track_index=input.track_index,
-            word_count=len(transcript.words),
-        )
-
-        return TranscribeTrackResult(
-            words=transcript.words,
-            track_index=input.track_index,
-        )
-
-    except Exception as e:
-        logger.error(
-            "[Hatchet] transcribe_track failed",
-            track_index=input.track_index,
-            padded_key=input.padded_key,
-            language=input.language,
-            error=str(e),
-            exc_info=True,
-        )
-        raise
```
```diff
@@ -129,6 +129,10 @@ class DailyClient(VideoPlatformClient):
         """Get room presence/session data for a Daily.co room."""
         return await self._api_client.get_room_presence(room_name)
 
+    async def delete_room(self, room_name: str) -> None:
+        """Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
+        return await self._api_client.delete_room(room_name)
+
     async def get_meeting_participants(
         self, meeting_id: str
     ) -> MeetingParticipantsResponse:
```
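The docstring promises idempotency, and `test_delete_room_idempotent_on_404` in the new test file below pins that down: a 404 from Daily.co counts as success. The underlying `DailyApiClient.delete_room` is not part of this diff; a sketch consistent with the test (the method body, route, and status handling are assumptions):

```python
async def delete_room(self, room_name: str) -> None:
    # Assumed shape: HTTP DELETE against the Daily.co REST API, treating
    # 404 (room already gone) the same as success so deletion is idempotent.
    client = await self._get_client()
    response = await client.delete(f"/rooms/{room_name}")
    if response.status_code not in (200, 404):
        response.raise_for_status()
```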
```diff
@@ -20,6 +20,7 @@ from reflector.services.ics_sync import ics_sync_service
 from reflector.settings import settings
 from reflector.utils.url import add_query_param
 from reflector.video_platforms.factory import create_platform_client
+from reflector.worker.process import poll_daily_room_presence_task
 from reflector.worker.webhook import test_webhook
 
 logger = logging.getLogger(__name__)
@@ -365,6 +366,53 @@ async def rooms_create_meeting(
     return meeting
 
 
+@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
+async def rooms_joined_meeting(
+    room_name: str,
+    meeting_id: str,
+):
+    """Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
+    room = await rooms_controller.get_by_name(room_name)
+    if not room:
+        raise HTTPException(status_code=404, detail="Room not found")
+
+    meeting = await meetings_controller.get_by_id(meeting_id, room=room)
+    if not meeting:
+        raise HTTPException(status_code=404, detail="Meeting not found")
+
+    if meeting.platform == "daily":
+        poll_daily_room_presence_task.delay(meeting_id)
+
+    return {"status": "ok"}
+
+
+@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
+async def rooms_leave_meeting(
+    room_name: str,
+    meeting_id: str,
+    delay_seconds: int = 2,
+):
+    """Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
+
+    Queues presence poll with optional delay to allow Daily.co to detect disconnect.
+    """
+    room = await rooms_controller.get_by_name(room_name)
+    if not room:
+        raise HTTPException(status_code=404, detail="Room not found")
+
+    meeting = await meetings_controller.get_by_id(meeting_id, room=room)
+    if not meeting:
+        raise HTTPException(status_code=404, detail="Meeting not found")
+
+    if meeting.platform == "daily":
+        poll_daily_room_presence_task.apply_async(
+            args=[meeting_id],
+            countdown=delay_seconds,
+        )
+
+    return {"status": "ok"}
+
+
 @router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
 async def rooms_test_webhook(
     room_id: str,
```
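From a caller's perspective the two new hooks are fire-and-forget POSTs: `/joined` queues a presence poll immediately, while `/leave` defers it by `delay_seconds` (a query parameter, default 2) so Daily.co has time to register the disconnect. A sketch of a client (base URL, room name, and meeting id are placeholders):

```python
import httpx


async def notify_presence(base_url: str, room_name: str, meeting_id: str) -> None:
    async with httpx.AsyncClient(base_url=base_url) as client:
        # Poll presence as soon as the user joins the Daily iframe.
        await client.post(f"/rooms/{room_name}/meetings/{meeting_id}/joined")
        # On tab close, wait a beat before rechecking presence.
        await client.post(
            f"/rooms/{room_name}/meetings/{meeting_id}/leave",
            params={"delay_seconds": 5},
        )
```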
```diff
@@ -845,15 +845,47 @@ async def process_meetings():
             end_date = end_date.replace(tzinfo=timezone.utc)
 
             client = create_platform_client(meeting.platform)
-            room_sessions = await client.get_room_sessions(meeting.room_name)
+            has_active_sessions = False
+            has_had_sessions = False
 
-            has_active_sessions = bool(
-                room_sessions and any(s.ended_at is None for s in room_sessions)
-            )
-            has_had_sessions = bool(room_sessions)
-            logger_.info(
-                f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
-            )
+            if meeting.platform == "daily":
+                try:
+                    presence = await client.get_room_presence(meeting.room_name)
+                    has_active_sessions = presence.total_count > 0
+                    room_sessions = await client.get_room_sessions(
+                        meeting.room_name
+                    )
+                    has_had_sessions = bool(room_sessions)
+
+                    logger_.info(
+                        "Daily.co presence check",
+                        has_active_sessions=has_active_sessions,
+                        has_had_sessions=has_had_sessions,
+                        presence_count=presence.total_count,
+                    )
+                except Exception:
+                    logger_.warning(
+                        "Daily.co presence API failed, falling back to DB sessions",
+                        exc_info=True,
+                    )
+                    room_sessions = await client.get_room_sessions(
+                        meeting.room_name
+                    )
+                    has_active_sessions = bool(
+                        room_sessions
+                        and any(s.ended_at is None for s in room_sessions)
+                    )
+                    has_had_sessions = bool(room_sessions)
+            else:
+                room_sessions = await client.get_room_sessions(meeting.room_name)
+                has_active_sessions = bool(
+                    room_sessions and any(s.ended_at is None for s in room_sessions)
+                )
+                has_had_sessions = bool(room_sessions)
+                logger_.info(
+                    f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
+                )
 
             if has_active_sessions:
                 logger_.debug("Meeting still has active sessions, keep it")
```
```diff
@@ -872,7 +904,20 @@ async def process_meetings():
                     await meetings_controller.update_meeting(
                         meeting.id, is_active=False
                     )
-                    logger_.info("Meeting is deactivated")
+                    logger_.info("Meeting deactivated in database")
+
+                    if meeting.platform == "daily":
+                        try:
+                            await client.delete_room(meeting.room_name)
+                            logger_.info(
+                                "Daily.co room deleted", room_name=meeting.room_name
+                            )
+                        except Exception:
+                            logger_.warning(
+                                "Failed to delete Daily.co room",
+                                room_name=meeting.room_name,
+                                exc_info=True,
+                            )
 
             processed_count += 1
 
```
server/tests/test_daily_presence_deactivation.py (new file, 286 lines)

```diff
@@ -0,0 +1,286 @@
+"""Unit tests for Daily.co presence-based meeting deactivation logic.
+
+Tests the fix for split room race condition by verifying:
+1. Real-time presence checking via Daily.co API
+2. Room deletion when meetings deactivate
+"""
+
+from datetime import datetime, timedelta, timezone
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from reflector.dailyco_api.responses import (
+    RoomPresenceParticipant,
+    RoomPresenceResponse,
+)
+from reflector.db.daily_participant_sessions import (
+    DailyParticipantSession,
+    daily_participant_sessions_controller,
+)
+from reflector.db.meetings import meetings_controller
+from reflector.db.rooms import rooms_controller
+from reflector.video_platforms.daily import DailyClient
+
+
+@pytest.fixture
+async def daily_room_and_meeting():
+    """Create test room and meeting for Daily platform."""
+    room = await rooms_controller.add(
+        name="test-daily",
+        user_id="test-user",
+        platform="daily",
+        zulip_auto_post=False,
+        zulip_stream="",
+        zulip_topic="",
+        is_locked=False,
+        room_mode="normal",
+        recording_type="cloud",
+        recording_trigger="automatic-2nd-participant",
+        is_shared=False,
+    )
+
+    current_time = datetime.now(timezone.utc)
+    end_time = current_time + timedelta(hours=2)
+
+    meeting = await meetings_controller.create(
+        id="test-meeting-id",
+        room_name="test-daily-20260129120000",
+        room_url="https://daily.co/test",
+        host_room_url="https://daily.co/test",
+        start_date=current_time,
+        end_date=end_time,
+        room=room,
+    )
+
+    return room, meeting
+
+
+@pytest.mark.asyncio
+async def test_daily_client_has_delete_room_method():
+    """Verify DailyClient has delete_room method for cleanup."""
+    # Create a mock DailyClient
+    with patch("reflector.dailyco_api.client.DailyApiClient"):
+        from reflector.video_platforms.models import VideoPlatformConfig
+
+        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
+        client = DailyClient(config)
+
+        # Verify delete_room method exists
+        assert hasattr(client, "delete_room")
+        assert callable(getattr(client, "delete_room"))
+
+
+@pytest.mark.asyncio
+async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
+    """Test that get_room_presence returns real-time participant data."""
+    room, meeting = daily_room_and_meeting
+
+    # Mock Daily.co API response
+    mock_presence = RoomPresenceResponse(
+        total_count=2,
+        data=[
+            RoomPresenceParticipant(
+                room=meeting.room_name,
+                id="session-1",
+                userId="user-1",
+                userName="User One",
+                joinTime="2026-01-29T12:00:00.000Z",
+                duration=120,
+            ),
+            RoomPresenceParticipant(
+                room=meeting.room_name,
+                id="session-2",
+                userId="user-2",
+                userName="User Two",
+                joinTime="2026-01-29T12:05:00.000Z",
+                duration=60,
+            ),
+        ],
+    )
+
+    with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
+        from reflector.video_platforms.models import VideoPlatformConfig
+
+        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
+        client = DailyClient(config)
+
+        # Mock the API client method
+        client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
+
+        # Call get_room_presence
+        result = await client.get_room_presence(meeting.room_name)
+
+        # Verify it calls Daily.co API
+        client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
+
+        # Verify result contains real-time data
+        assert result.total_count == 2
+        assert len(result.data) == 2
+        assert result.data[0].id == "session-1"
+        assert result.data[1].id == "session-2"
+
+
+@pytest.mark.asyncio
+async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
+    """Test that Daily.co presence API is source of truth, not stale DB sessions."""
+    room, meeting = daily_room_and_meeting
+    current_time = datetime.now(timezone.utc)
+
+    # Create stale DB session (left_at=NULL but user actually left)
+    session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
+    await daily_participant_sessions_controller.upsert_joined(
+        DailyParticipantSession(
+            id=session_id,
+            meeting_id=meeting.id,
+            room_id=room.id,
+            session_id="stale-daily-session",
+            user_name="Stale User",
+            user_id="stale-user",
+            joined_at=current_time - timedelta(minutes=5),
+            left_at=None,  # Stale - shows active but user left
+        )
+    )
+
+    # Verify DB shows active session
+    db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
+        meeting.id
+    )
+    assert len(db_sessions) == 1
+
+    # But Daily.co API shows room is empty
+    mock_presence = RoomPresenceResponse(total_count=0, data=[])
+
+    with patch("reflector.dailyco_api.client.DailyApiClient"):
+        from reflector.video_platforms.models import VideoPlatformConfig
+
+        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
+        client = DailyClient(config)
+        client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
+
+        # Get real-time presence
+        presence = await client.get_room_presence(meeting.room_name)
+
+        # Real-time API shows no participants (truth)
+        assert presence.total_count == 0
+        assert len(presence.data) == 0
+
+        # DB shows 1 participant (stale)
+        assert len(db_sessions) == 1
+
+        # Implementation should trust presence API, not DB
+
+
+@pytest.mark.asyncio
+async def test_meeting_deactivation_logic_with_presence_empty():
+    """Test the core deactivation decision logic when presence shows room empty."""
+    # This tests the logic that will be in process_meetings
+
+    # Simulate: DB shows stale active session
+    has_active_db_sessions = True  # DB is stale
+
+    # Simulate: Daily.co presence API shows room empty
+    presence_count = 0  # Real-time truth
+
+    # Simulate: Meeting has been used before
+    has_had_sessions = True
+
+    # Decision logic (what process_meetings should do):
+    # - If presence API available: trust it
+    # - If presence shows empty AND has_had_sessions: deactivate
+
+    if presence_count == 0 and has_had_sessions:
+        should_deactivate = True
+    else:
+        should_deactivate = False
+
+    assert should_deactivate is True  # Should deactivate despite stale DB
+
+
+@pytest.mark.asyncio
+async def test_meeting_deactivation_logic_with_presence_active():
+    """Test that meetings stay active when presence shows participants."""
+    # Simulate: DB shows no sessions (not yet updated)
+    has_active_db_sessions = False  # DB hasn't caught up
+
+    # Simulate: Daily.co presence API shows active participant
+    presence_count = 1  # Real-time truth
+
+    # Decision logic: presence shows activity, keep meeting active
+    if presence_count > 0:
+        should_deactivate = False
+    else:
+        should_deactivate = True
+
+    assert should_deactivate is False  # Should stay active
+
+
+@pytest.mark.asyncio
+async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
+    """Test that Daily.co room is deleted when meeting deactivates."""
+    room, meeting = daily_room_and_meeting
+
+    with patch("reflector.dailyco_api.client.DailyApiClient"):
+        from reflector.video_platforms.models import VideoPlatformConfig
+
+        config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
+        client = DailyClient(config)
+
+        # Mock delete_room API call
+        client._api_client.delete_room = AsyncMock()
+
+        # Simulate deactivation - should delete room
+        await client._api_client.delete_room(meeting.room_name)
+
+        # Verify delete was called
+        client._api_client.delete_room.assert_called_once_with(meeting.room_name)
+
+
+@pytest.mark.asyncio
+async def test_delete_room_idempotent_on_404():
+    """Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
+    from reflector.dailyco_api.client import DailyApiClient
+
+    # Create real client to test delete_room logic
+    client = DailyApiClient(api_key="test-key")
+
+    # Mock the HTTP client
+    mock_http_client = AsyncMock()
+    mock_response = AsyncMock()
+    mock_response.status_code = 404  # Room not found
+    mock_http_client.delete = AsyncMock(return_value=mock_response)
+
+    # Mock _get_client to return our mock
+    async def mock_get_client():
+        return mock_http_client
+
+    client._get_client = mock_get_client
+
+    # delete_room should succeed even on 404 (idempotent)
+    await client.delete_room("nonexistent-room")
+
+    # Verify delete was attempted
+    mock_http_client.delete.assert_called_once()
+
+
+@pytest.mark.asyncio
+async def test_api_failure_fallback_to_db_sessions():
+    """Test that system falls back to DB sessions if Daily.co API fails."""
+    # Simulate: Daily.co API throws exception
+    api_exception = Exception("API unavailable")
+
+    # Simulate: DB shows active session
+    has_active_db_sessions = True
+
+    # Decision logic with fallback:
+    try:
+        presence_count = None
+        raise api_exception  # Simulating API failure
+    except Exception:
+        # Fallback: use DB sessions (conservative - don't deactivate if unsure)
+        if has_active_db_sessions:
+            should_deactivate = False
+        else:
+            should_deactivate = True
+
+    assert should_deactivate is False  # Conservative: keep active on API failure
```
@@ -302,10 +302,10 @@ export default function RoomsList() {
      return;
    }

    const platform: "whereby" | "daily" =
    const platform: "whereby" | "daily" | null =
      room.platform === "whereby" || room.platform === "daily"
        ? room.platform
        : "daily";
        : null;

    const roomData = {
      name: room.name,
@@ -16,7 +16,6 @@ import {
import { useError } from "../../../../(errors)/errorContext";
import { useRouter } from "next/navigation";
import { Box, Grid } from "@chakra-ui/react";
import { parseNonEmptyString } from "../../../../lib/utils";

export type TranscriptCorrect = {
  params: Promise<{

@@ -26,7 +25,8 @@ export type TranscriptCorrect = {

export default function TranscriptCorrect(props: TranscriptCorrect) {
  const params = use(props.params);
  const transcriptId = parseNonEmptyString(params.transcriptId);
  const { transcriptId } = params;

  const updateTranscriptMutation = useTranscriptUpdate();
  const transcript = useTranscriptGet(transcriptId);
@@ -9,9 +9,7 @@ import React, { useEffect, useState, use } from "react";
import FinalSummary from "./finalSummary";
import TranscriptTitle from "../transcriptTitle";
import Player from "../player";
import { useWebSockets } from "../useWebSockets";
import { useRouter } from "next/navigation";
import { parseNonEmptyString } from "../../../lib/utils";
import {
  Box,
  Flex,

@@ -32,7 +30,7 @@ type TranscriptDetails = {

export default function TranscriptDetails(details: TranscriptDetails) {
  const params = use(details.params);
  const transcriptId = parseNonEmptyString(params.transcriptId);
  const transcriptId = params.transcriptId;
  const router = useRouter();
  const statusToRedirect = [
    "idle",

@@ -51,7 +49,6 @@ export default function TranscriptDetails(details: TranscriptDetails) {
    transcriptId,
    waiting || mp3.audioDeleted === true,
  );
  useWebSockets(transcriptId);
  const useActiveTopic = useState<Topic | null>(null);
  const [finalSummaryElement, setFinalSummaryElement] =
    useState<HTMLDivElement | null>(null);
@@ -10,7 +10,6 @@ import {
} from "@chakra-ui/react";
import { useRouter } from "next/navigation";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { parseNonEmptyString } from "../../../../lib/utils";

type TranscriptProcessing = {
  params: Promise<{

@@ -20,7 +19,7 @@ type TranscriptProcessing = {

export default function TranscriptProcessing(details: TranscriptProcessing) {
  const params = use(details.params);
  const transcriptId = parseNonEmptyString(params.transcriptId);
  const transcriptId = params.transcriptId;
  const router = useRouter();

  const transcript = useTranscriptGet(transcriptId);
@@ -12,7 +12,6 @@ import { Box, Text, Grid, Heading, VStack, Flex } from "@chakra-ui/react";
import LiveTrancription from "../../liveTranscription";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { TranscriptStatus } from "../../../../lib/transcript";
import { parseNonEmptyString } from "../../../../lib/utils";

type TranscriptDetails = {
  params: Promise<{

@@ -22,14 +21,13 @@ type TranscriptDetails = {

const TranscriptRecord = (details: TranscriptDetails) => {
  const params = use(details.params);
  const transcriptId = parseNonEmptyString(params.transcriptId);
  const transcript = useTranscriptGet(transcriptId);
  const transcript = useTranscriptGet(params.transcriptId);
  const [transcriptStarted, setTranscriptStarted] = useState(false);
  const useActiveTopic = useState<Topic | null>(null);

  const webSockets = useWebSockets(transcriptId);
  const webSockets = useWebSockets(params.transcriptId);

  const mp3 = useMp3(transcriptId, true);
  const mp3 = useMp3(params.transcriptId, true);

  const router = useRouter();

@@ -7,7 +7,6 @@ import useMp3 from "../../useMp3";
import { Center, VStack, Text, Heading } from "@chakra-ui/react";
import FileUploadButton from "../../fileUploadButton";
import { useTranscriptGet } from "../../../../lib/apiHooks";
import { parseNonEmptyString } from "../../../../lib/utils";

type TranscriptUpload = {
  params: Promise<{

@@ -17,13 +16,12 @@ type TranscriptUpload = {

const TranscriptUpload = (details: TranscriptUpload) => {
  const params = use(details.params);
  const transcriptId = parseNonEmptyString(params.transcriptId);
  const transcript = useTranscriptGet(transcriptId);
  const transcript = useTranscriptGet(params.transcriptId);
  const [transcriptStarted, setTranscriptStarted] = useState(false);

  const webSockets = useWebSockets(transcriptId);
  const webSockets = useWebSockets(params.transcriptId);

  const mp3 = useMp3(transcriptId, true);
  const mp3 = useMp3(params.transcriptId, true);

  const router = useRouter();

@@ -1,6 +1,5 @@
import { useState } from "react";
import type { components } from "../../reflector-api";
import { parseMaybeNonEmptyString } from "../../lib/utils";

type UpdateTranscript = components["schemas"]["UpdateTranscript"];
type GetTranscriptWithParticipants =

@@ -33,7 +32,7 @@ const TranscriptTitle = (props: TranscriptTitle) => {
  const [isEditing, setIsEditing] = useState(false);
  const updateTranscriptMutation = useTranscriptUpdate();
  const participantsQuery = useTranscriptParticipants(
    props.transcript?.id ? parseMaybeNonEmptyString(props.transcript.id) : null,
    props.transcript?.id || null,
  );

  const updateTitle = async (newTitle: string, transcriptId: string) => {
@@ -1,6 +1,5 @@
import { useEffect, useState } from "react";
import { useTranscriptGet } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";
import { useAuth } from "../../lib/AuthProvider";
import { API_URL } from "../../lib/apiClient";

@@ -28,7 +27,7 @@ const useMp3 = (transcriptId: string, waiting?: boolean): Mp3Response => {
    data: transcript,
    isLoading: transcriptMetadataLoading,
    error: transcriptError,
  } = useTranscriptGet(later ? null : parseMaybeNonEmptyString(transcriptId));
  } = useTranscriptGet(later ? null : transcriptId);

  const [serviceWorker, setServiceWorker] =
    useState<ServiceWorkerRegistration | null>(null);
@@ -1,7 +1,6 @@
import type { components } from "../../reflector-api";
type Participant = components["schemas"]["Participant"];
import { useTranscriptParticipants } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";

type ErrorParticipants = {
  error: Error;

@@ -33,7 +32,7 @@ const useParticipants = (transcriptId: string): UseParticipants => {
    isLoading: loading,
    error,
    refetch,
  } = useTranscriptParticipants(parseMaybeNonEmptyString(transcriptId));
  } = useTranscriptParticipants(transcriptId || null);

  // Type-safe return based on state
  if (error) {
@@ -1,6 +1,5 @@
import type { components } from "../../reflector-api";
import { useTranscriptTopicsWithWordsPerSpeaker } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";

type GetTranscriptTopicWithWordsPerSpeaker =
  components["schemas"]["GetTranscriptTopicWithWordsPerSpeaker"];

@@ -39,7 +38,7 @@ const useTopicWithWords = (
    error,
    refetch,
  } = useTranscriptTopicsWithWordsPerSpeaker(
    parseMaybeNonEmptyString(transcriptId),
    transcriptId || null,
    topicId || null,
  );

@@ -1,6 +1,5 @@
import { useTranscriptTopics } from "../../lib/apiHooks";
import type { components } from "../../reflector-api";
import { parseMaybeNonEmptyString } from "../../lib/utils";

type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];

@@ -11,11 +10,7 @@ type TranscriptTopics = {
};

const useTopics = (id: string): TranscriptTopics => {
  const {
    data: topics,
    isLoading: loading,
    error,
  } = useTranscriptTopics(parseMaybeNonEmptyString(id));
  const { data: topics, isLoading: loading, error } = useTranscriptTopics(id);

  return {
    topics: topics || null,
@@ -1,6 +1,5 @@
import type { components } from "../../reflector-api";
import { useTranscriptWaveform } from "../../lib/apiHooks";
import { parseMaybeNonEmptyString } from "../../lib/utils";

type AudioWaveform = components["schemas"]["AudioWaveform"];

@@ -15,7 +14,7 @@ const useWaveform = (id: string, skip: boolean): AudioWaveFormResponse => {
    data: waveform,
    isLoading: loading,
    error,
  } = useTranscriptWaveform(skip ? null : parseMaybeNonEmptyString(id));
  } = useTranscriptWaveform(skip ? null : id);

  return {
    waveform: waveform || null,
@@ -7,12 +7,6 @@ type GetTranscriptSegmentTopic =
  components["schemas"]["GetTranscriptSegmentTopic"];
import { useQueryClient } from "@tanstack/react-query";
import { $api, WEBSOCKET_URL } from "../../lib/apiClient";
import {
  invalidateTranscript,
  invalidateTranscriptTopics,
  invalidateTranscriptWaveform,
} from "../../lib/apiHooks";
import { NonEmptyString } from "../../lib/utils";

export type UseWebSockets = {
  transcriptTextLive: string;

@@ -375,10 +369,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
          });
          console.debug("TOPIC event:", message.data);
          // Invalidate topics query to sync with WebSocket data
          invalidateTranscriptTopics(
            queryClient,
            transcriptId as NonEmptyString,
          );
          queryClient.invalidateQueries({
            queryKey: $api.queryOptions(
              "get",
              "/v1/transcripts/{transcript_id}/topics",
              {
                params: { path: { transcript_id: transcriptId } },
              },
            ).queryKey,
          });
          break;

        case "FINAL_SHORT_SUMMARY":

@@ -389,7 +388,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
          if (message.data) {
            setFinalSummary(message.data);
            // Invalidate transcript query to sync summary
            invalidateTranscript(queryClient, transcriptId as NonEmptyString);
            queryClient.invalidateQueries({
              queryKey: $api.queryOptions(
                "get",
                "/v1/transcripts/{transcript_id}",
                {
                  params: { path: { transcript_id: transcriptId } },
                },
              ).queryKey,
            });
          }
          break;

@@ -398,7 +405,15 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
          if (message.data) {
            setTitle(message.data.title);
            // Invalidate transcript query to sync title
            invalidateTranscript(queryClient, transcriptId as NonEmptyString);
            queryClient.invalidateQueries({
              queryKey: $api.queryOptions(
                "get",
                "/v1/transcripts/{transcript_id}",
                {
                  params: { path: { transcript_id: transcriptId } },
                },
              ).queryKey,
            });
          }
          break;

@@ -409,10 +424,6 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
          );
          if (message.data) {
            setWaveForm(message.data.waveform);
            invalidateTranscriptWaveform(
              queryClient,
              transcriptId as NonEmptyString,
            );
          }
          break;
        case "DURATION":
@@ -26,7 +26,7 @@ import { useRouter } from "next/navigation";
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
import { NonEmptyString } from "../lib/utils";
import { MeetingId, assertMeetingId } from "../lib/types";
import { MeetingId } from "../lib/types";

type Meeting = components["schemas"]["Meeting"];

@@ -315,9 +315,7 @@ export default function MeetingSelection({
                variant="outline"
                colorScheme="red"
                size="md"
                onClick={() =>
                  handleEndMeeting(assertMeetingId(meeting.id))
                }
                onClick={() => handleEndMeeting(meeting.id)}
                loading={deactivateMeetingMutation.isPending}
              >
                <Icon as={LuX} me={2} />

@@ -462,9 +460,7 @@ export default function MeetingSelection({
                variant="outline"
                colorScheme="red"
                size="md"
                onClick={() =>
                  handleEndMeeting(assertMeetingId(meeting.id))
                }
                onClick={() => handleEndMeeting(meeting.id)}
                loading={deactivateMeetingMutation.isPending}
              >
                <Icon as={LuX} me={2} />
@@ -24,15 +24,24 @@ import { useAuth } from "../../lib/AuthProvider";
import { useConsentDialog } from "../../lib/consent";
import {
  useRoomJoinMeeting,
  useRoomJoinedMeeting,
  useRoomLeaveMeeting,
  useMeetingStartRecording,
  leaveRoomPostUrl,
  LeaveRoomBody,
} from "../../lib/apiHooks";
import { omit } from "remeda";
import {
  assertExists,
  assertExistsAndNonEmptyString,
  NonEmptyString,
  parseNonEmptyString,
} from "../../lib/utils";
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
import {
  assertMeetingId,
  DailyRecordingType,
  MeetingId,
} from "../../lib/types";
import { useUuidV5 } from "react-uuid-hook";

const CONSENT_BUTTON_ID = "recording-consent";

@@ -179,6 +188,58 @@ const useFrame = (
  ] as const;
};

const leaveDaily = () => {
  const frame = DailyIframe.getCallInstance();
  frame?.leave();
};

const useDirtyDisconnects = (
  meetingId: NonEmptyString,
  roomName: NonEmptyString,
) => {
  useEffect(() => {
    if (!meetingId || !roomName) return;

    const handleBeforeUnload = () => {
      leaveDaily();
      navigator.sendBeacon(
        leaveRoomPostUrl(
          {
            room_name: roomName,
            meeting_id: meetingId,
          },
          {
            delay_seconds: 5,
          },
        ),
        undefined satisfies LeaveRoomBody,
      );
    };
    window.addEventListener("beforeunload", handleBeforeUnload);
    return () => window.removeEventListener("beforeunload", handleBeforeUnload);
  }, [meetingId, roomName]);
};

const useDisconnects = (
  meetingId: NonEmptyString,
  roomName: NonEmptyString,
  leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
) => {
  useDirtyDisconnects(meetingId, roomName);

  useEffect(() => {
    return () => {
      leaveDaily();
      leaveMutation.mutate({
        params: {
          path: { meeting_id: meetingId, room_name: roomName },
          query: { delay_seconds: 5 },
        },
      });
    };
  }, [meetingId, roomName]);
};

export default function DailyRoom({ meeting, room }: DailyRoomProps) {
  const router = useRouter();
  const params = useParams();

@@ -186,6 +247,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
  const authLastUserId = auth.lastUserId;
  const [container, setContainer] = useState<HTMLDivElement | null>(null);
  const joinMutation = useRoomJoinMeeting();
  const joinedMutation = useRoomJoinedMeeting();
  const leaveMutation = useRoomLeaveMeeting();
  const startRecordingMutation = useMeetingStartRecording();
  const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);

@@ -195,7 +258,9 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
    useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
  );

  const roomName = params?.roomName as string;
  if (typeof params.roomName === "object")
    throw new Error(`Invalid room name in params. array? ${params.roomName}`);
  const roomName = assertExistsAndNonEmptyString(params.roomName);

  const {
    showConsentModal,

@@ -237,6 +302,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
    router.push("/browse");
  }, [router]);

  useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);

  const handleCustomButtonClick = useCallback(
    (ev: DailyEventObjectCustomButtonClick) => {
      if (ev.button_id === CONSENT_BUTTON_ID) {

@@ -249,6 +316,15 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
  );

  const handleFrameJoinMeeting = useCallback(() => {
    joinedMutation.mutate({
      params: {
        path: {
          room_name: roomName,
          meeting_id: meeting.id,
        },
      },
    });

    if (meeting.recording_type === "cloud") {
      console.log("Starting dual recording via REST API", {
        cloudInstanceId,

@@ -308,8 +384,10 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
      startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
    }
  }, [
    meeting.recording_type,
    joinedMutation,
    roomName,
    meeting.id,
    meeting.recording_type,
    startRecordingMutation,
    cloudInstanceId,
    rawTracksInstanceId,
@@ -1,12 +1,13 @@
"use client";

import { $api } from "./apiClient";
import { $api, API_URL } from "./apiClient";
import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import type { components, operations } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
import { NonEmptyString } from "./utils";
import { createFinalURL, createQuerySerializer } from "openapi-fetch";

/*
 * XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other

@@ -104,7 +105,7 @@ export function useTranscriptProcess() {
  });
}

export function useTranscriptGet(transcriptId: NonEmptyString | null) {
export function useTranscriptGet(transcriptId: string | null) {
  return $api.useQuery(
    "get",
    "/v1/transcripts/{transcript_id}",

@@ -121,16 +122,6 @@ export function useTranscriptGet(transcriptId: NonEmptyString | null) {
  );
}

export const invalidateTranscript = (
  queryClient: QueryClient,
  transcriptId: NonEmptyString,
) =>
  queryClient.invalidateQueries({
    queryKey: $api.queryOptions("get", "/v1/transcripts/{transcript_id}", {
      params: { path: { transcript_id: transcriptId } },
    }).queryKey,
  });

export function useRoomGet(roomId: string | null) {
  const { isAuthenticated } = useAuthReady();

@@ -308,7 +299,7 @@ export function useTranscriptUploadAudio() {
  );
}

export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
export function useTranscriptWaveform(transcriptId: string | null) {
  return $api.useQuery(
    "get",
    "/v1/transcripts/{transcript_id}/audio/waveform",

@@ -323,21 +314,7 @@ export function useTranscriptWaveform(transcriptId: NonEmptyString | null) {
  );
}

export const invalidateTranscriptWaveform = (
  queryClient: QueryClient,
  transcriptId: NonEmptyString,
) =>
  queryClient.invalidateQueries({
    queryKey: $api.queryOptions(
      "get",
      "/v1/transcripts/{transcript_id}/audio/waveform",
      {
        params: { path: { transcript_id: transcriptId } },
      },
    ).queryKey,
  });

export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
export function useTranscriptMP3(transcriptId: string | null) {
  const { isAuthenticated } = useAuthReady();

  return $api.useQuery(

@@ -354,7 +331,7 @@ export function useTranscriptMP3(transcriptId: NonEmptyString | null) {
  );
}

export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
export function useTranscriptTopics(transcriptId: string | null) {
  return $api.useQuery(
    "get",
    "/v1/transcripts/{transcript_id}/topics",

@@ -369,23 +346,7 @@ export function useTranscriptTopics(transcriptId: NonEmptyString | null) {
  );
}

export const invalidateTranscriptTopics = (
  queryClient: QueryClient,
  transcriptId: NonEmptyString,
) =>
  queryClient.invalidateQueries({
    queryKey: $api.queryOptions(
      "get",
      "/v1/transcripts/{transcript_id}/topics",
      {
        params: { path: { transcript_id: transcriptId } },
      },
    ).queryKey,
  });

export function useTranscriptTopicsWithWords(
  transcriptId: NonEmptyString | null,
) {
export function useTranscriptTopicsWithWords(transcriptId: string | null) {
  const { isAuthenticated } = useAuthReady();

  return $api.useQuery(

@@ -403,7 +364,7 @@ export function useTranscriptTopicsWithWords(
}

export function useTranscriptTopicsWithWordsPerSpeaker(
  transcriptId: NonEmptyString | null,
  transcriptId: string | null,
  topicId: string | null,
) {
  const { isAuthenticated } = useAuthReady();

@@ -425,7 +386,7 @@ export function useTranscriptTopicsWithWordsPerSpeaker(
  );
}

export function useTranscriptParticipants(transcriptId: NonEmptyString | null) {
export function useTranscriptParticipants(transcriptId: string | null) {
  const { isAuthenticated } = useAuthReady();

  return $api.useQuery(

@@ -807,6 +768,44 @@ export function useRoomJoinMeeting() {
  );
}

export const LEAVE_ROOM_POST_URL_TEMPLATE =
  "/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;

export const leaveRoomPostUrl = (
  path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
  query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
): string =>
  createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
    baseUrl: API_URL,
    params: { path, query },
    querySerializer: createQuerySerializer(),
  });

export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];

export function useRoomLeaveMeeting() {
  return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
}

export const JOINED_ROOM_POST_URL_TEMPLATE =
  "/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;

export const joinedRoomPostUrl = (
  params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
): string =>
  createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
    baseUrl: API_URL,
    params: { path: params },
    querySerializer: () => "",
  });

export type JoinedRoomBody =
  operations["v1_rooms_joined_meeting"]["requestBody"];

export function useRoomJoinedMeeting() {
  return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
}

export function useRoomIcsSync() {
  const { setError } = useError();

www/app/reflector-api.d.ts (vendored, 108 changed lines)
@@ -171,6 +171,48 @@ export interface paths {
    patch?: never;
    trace?: never;
  };
  "/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
    parameters: {
      query?: never;
      header?: never;
      path?: never;
      cookie?: never;
    };
    get?: never;
    put?: never;
    /**
     * Rooms Joined Meeting
     * @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
     */
    post: operations["v1_rooms_joined_meeting"];
    delete?: never;
    options?: never;
    head?: never;
    patch?: never;
    trace?: never;
  };
  "/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
    parameters: {
      query?: never;
      header?: never;
      path?: never;
      cookie?: never;
    };
    get?: never;
    put?: never;
    /**
     * Rooms Leave Meeting
     * @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
     *
     * Queues presence poll with optional delay to allow Daily.co to detect disconnect.
     */
    post: operations["v1_rooms_leave_meeting"];
    delete?: never;
    options?: never;
    head?: never;
    patch?: never;
    trace?: never;
  };
  "/v1/rooms/{room_id}/webhook/test": {
    parameters: {
      query?: never;

@@ -2435,6 +2477,72 @@ export interface operations {
      };
    };
  };
  v1_rooms_joined_meeting: {
    parameters: {
      query?: never;
      header?: never;
      path: {
        room_name: string;
        meeting_id: string;
      };
      cookie?: never;
    };
    requestBody?: never;
    responses: {
      /** @description Successful Response */
      200: {
        headers: {
          [name: string]: unknown;
        };
        content: {
          "application/json": unknown;
        };
      };
      /** @description Validation Error */
      422: {
        headers: {
          [name: string]: unknown;
        };
        content: {
          "application/json": components["schemas"]["HTTPValidationError"];
        };
      };
    };
  };
  v1_rooms_leave_meeting: {
    parameters: {
      query?: {
        delay_seconds?: number;
      };
      header?: never;
      path: {
        room_name: string;
        meeting_id: string;
      };
      cookie?: never;
    };
    requestBody?: never;
    responses: {
      /** @description Successful Response */
      200: {
        headers: {
          [name: string]: unknown;
        };
        content: {
          "application/json": unknown;
        };
      };
      /** @description Validation Error */
      422: {
        headers: {
          [name: string]: unknown;
        };
        content: {
          "application/json": components["schemas"]["HTTPValidationError"];
        };
      };
    };
  };
  v1_rooms_test_webhook: {
    parameters: {
      query?: never;