Compare commits

..

5 Commits

Author          SHA1        Message                                                  Date
                a08f94a5bf  chore(main): release 0.32.1 (#840)                       2026-01-30 17:34:48 -05:00
Igor Loskutov   c05d1f03cd  fix: match httpx pad with hatchet audio timeout          2026-01-30 15:56:18 -05:00
Igor Loskutov   23eb1371cb  fix: daily multitrack pipeline finalze dependency fix    2026-01-30 15:19:27 -05:00
                2592e369f6  chore(main): release 0.32.0 (#838)                       2026-01-30 13:13:59 -05:00
                7fde64e252  feat: modal padding (#837)                               2026-01-30 13:11:51 -05:00

    * Add Modal backend for audio padding

      - Create reflector_padding.py Modal deployment (CPU-based)
      - Add PaddingWorkflow with conditional Modal/local backend
      - Update deploy-all.sh to include padding deployment

    ---------

    Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
19 changed files with 800 additions and 328 deletions

.gitignore

@@ -1,5 +1,6 @@
 .DS_Store
 server/.env
+server/.env.production
 .env
 Caddyfile
 server/exportdanswer


@@ -4,4 +4,3 @@ docs/docs/installation/daily-setup.md:curl-auth-header:277
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:74
 gpu/self_hosted/DEV_SETUP.md:curl-auth-header:83
 server/reflector/worker/process.py:generic-api-key:465
-server/reflector/worker/process.py:generic-api-key:594


@@ -1,5 +1,20 @@
 # Changelog
+
+## [0.32.1](https://github.com/Monadical-SAS/reflector/compare/v0.32.0...v0.32.1) (2026-01-30)
+
+### Bug Fixes
+
+* daily multitrack pipeline finalze dependency fix ([23eb137](https://github.com/Monadical-SAS/reflector/commit/23eb1371cb9348c4b81eb12ad506b582f8a4799e))
+* match httpx pad with hatchet audio timeout ([c05d1f0](https://github.com/Monadical-SAS/reflector/commit/c05d1f03cd8369fc06efd455527e50246887efd0))
+
+## [0.32.0](https://github.com/Monadical-SAS/reflector/compare/v0.31.0...v0.32.0) (2026-01-30)
+
+### Features
+
+* modal padding ([#837](https://github.com/Monadical-SAS/reflector/issues/837)) ([7fde64e](https://github.com/Monadical-SAS/reflector/commit/7fde64e2529a1d37b0f7507c62d983a7bd0b5b89))
+
 ## [0.31.0](https://github.com/Monadical-SAS/reflector/compare/v0.30.0...v0.31.0) (2026-01-23)

deploy-all.sh

@@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then
 fi
 echo " -> $DIARIZER_URL"
+echo ""
+echo "Deploying padding (CPU audio processing via Modal SDK)..."
+modal deploy reflector_padding.py
+if [ $? -ne 0 ]; then
+    echo "Error: Failed to deploy padding. Check Modal dashboard for details."
+    exit 1
+fi
+echo " -> reflector-padding.pad_track (Modal SDK function)"
 # --- Output Configuration ---
 echo ""
 echo "=========================================="
@@ -147,4 +156,6 @@ echo ""
echo "DIARIZATION_BACKEND=modal" echo "DIARIZATION_BACKEND=modal"
echo "DIARIZATION_URL=$DIARIZER_URL" echo "DIARIZATION_URL=$DIARIZER_URL"
echo "DIARIZATION_MODAL_API_KEY=$API_KEY" echo "DIARIZATION_MODAL_API_KEY=$API_KEY"
echo ""
echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)"
echo "# --- End Modal Configuration ---" echo "# --- End Modal Configuration ---"

gpu/modal_deployments/reflector_padding.py

@@ -0,0 +1,277 @@
"""
Reflector GPU backend - audio padding
======================================
CPU-intensive audio padding service for adding silence to audio tracks.
Uses PyAV filter graph (adelay) for precise track synchronization.
IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py
for Modal deployment isolation (Modal can't import from server/reflector/). If you modify
the PyAV filter graph or padding algorithm, you MUST update both:
- gpu/modal_deployments/reflector_padding.py (this file)
- server/reflector/utils/audio_padding.py
Constants duplicated from server/reflector/utils/audio_constants.py for same reason.
"""
import os
import tempfile
from fractions import Fraction
import math
import asyncio
import modal
S3_TIMEOUT = 60 # happens 2 times
PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2)
SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down.
DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect
app = modal.App("reflector-padding")
# CPU-based image
image = (
modal.Image.debian_slim(python_version="3.12")
.apt_install("ffmpeg") # Required by PyAV
.pip_install(
"av==13.1.0", # PyAV for audio processing
"requests==2.32.3", # HTTP for presigned URL downloads/uploads
"fastapi==0.115.12", # API framework
)
)
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000
@app.function(
cpu=2.0,
timeout=PADDING_TIMEOUT,
scaledown_window=SCALEDOWN_WINDOW,
image=image,
)
@modal.asgi_app()
def web():
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
class PaddingRequest(BaseModel):
track_url: str
output_url: str
start_time_seconds: float
track_index: int
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
web_app = FastAPI()
@web_app.post("/pad")
async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse:
"""Modal web endpoint for padding audio tracks with disconnect detection.
"""
import logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
if not req.track_url:
raise HTTPException(status_code=400, detail="track_url cannot be empty")
if not req.output_url:
raise HTTPException(status_code=400, detail="output_url cannot be empty")
if req.start_time_seconds <= 0:
raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}")
if req.start_time_seconds > 18000:
raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)")
logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s")
# Thread-safe cancellation flag shared between async disconnect checker and blocking thread
import threading
cancelled = threading.Event()
async def check_disconnect():
"""Background task to check for client disconnect every 2 seconds."""
while not cancelled.is_set():
await asyncio.sleep(DISCONNECT_CHECK_INTERVAL)
if await request.is_disconnected():
logger.warning("Client disconnected, setting cancellation flag")
cancelled.set()
break
# Start disconnect checker in background
disconnect_task = asyncio.create_task(check_disconnect())
try:
result = await asyncio.get_event_loop().run_in_executor(
None, _pad_track_blocking, req, cancelled, logger
)
return PaddingResponse(**result)
finally:
cancelled.set()
disconnect_task.cancel()
try:
await disconnect_task
except asyncio.CancelledError:
pass
def _pad_track_blocking(req, cancelled, logger) -> dict:
"""Blocking CPU-bound padding work with periodic cancellation checks.
Args:
cancelled: threading.Event for thread-safe cancellation signaling
"""
import av
import requests
from av.audio.resampler import AudioResampler
import time
temp_dir = tempfile.mkdtemp()
input_path = None
output_path = None
last_check = time.time()
try:
logger.info("Downloading track for padding")
response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT)
response.raise_for_status()
input_path = os.path.join(temp_dir, "track.webm")
total_bytes = 0
chunk_count = 0
with open(input_path, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
total_bytes += len(chunk)
chunk_count += 1
# Check for cancellation every arbitrary amount of chunks
if chunk_count % 12 == 0:
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during download, exiting early")
return {"size": 0, "cancelled": True}
last_check = now
logger.info(f"Track downloaded: {total_bytes} bytes")
if cancelled.is_set():
logger.info("Cancelled after download, exiting early")
return {"size": 0, "cancelled": True}
# Apply padding using PyAV
output_path = os.path.join(temp_dir, "padded.webm")
delay_ms = math.floor(req.start_time_seconds * 1000)
logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV")
in_container = av.open(input_path)
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise ValueError("No audio stream in input")
with av.open(output_path, "w", format="webm") as out_container:
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
for frame in in_container.decode(in_stream):
# Check for cancellation periodically
now = time.time()
if now - last_check >= DISCONNECT_CHECK_INTERVAL:
if cancelled.is_set():
logger.info("Cancelled during processing, exiting early")
in_container.close()
return {"size": 0, "cancelled": True}
last_check = now
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
in_container.close()
file_size = os.path.getsize(output_path)
logger.info(f"Padding complete: {file_size} bytes")
logger.info("Uploading padded track to S3")
with open(output_path, "rb") as f:
upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT)
upload_response.raise_for_status()
logger.info(f"Upload complete: {file_size} bytes")
return {"size": file_size}
finally:
if input_path and os.path.exists(input_path):
try:
os.unlink(input_path)
except Exception as e:
logger.warning(f"Failed to cleanup input file: {e}")
if output_path and os.path.exists(output_path):
try:
os.unlink(output_path)
except Exception as e:
logger.warning(f"Failed to cleanup output file: {e}")
try:
os.rmdir(temp_dir)
except Exception as e:
logger.warning(f"Failed to cleanup temp directory: {e}")
return web_app
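For reference, a minimal client-side sketch of the /pad contract this file defines (PaddingRequest in, PaddingResponse out). The endpoint URL and the presigned S3 URLs below are placeholders, not values from this change:

```python
# Hypothetical call against the /pad endpoint above; all URLs are placeholders.
import httpx

PADDING_URL = "https://<your-workspace>--reflector-padding-web.modal.run"  # placeholder

payload = {
    "track_url": "https://s3.example.com/presigned-get",   # presigned GET for the source track
    "output_url": "https://s3.example.com/presigned-put",  # presigned PUT for the padded output
    "start_time_seconds": 1.25,                            # silence to prepend
    "track_index": 0,
}

resp = httpx.post(f"{PADDING_URL}/pad", json=payload, timeout=720)
resp.raise_for_status()
print(resp.json())  # e.g. {"size": 123456, "cancelled": false}
```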

server/pyproject.toml

@@ -8,7 +8,7 @@ readme = "README.md"
 dependencies = [
     "aiohttp>=3.9.0",
     "aiohttp-cors>=0.7.0",
-    "av>=10.0.0",
+    "av>=15.0.0",
     "requests>=2.31.0",
     "aiortc>=1.5.0",
     "sortedcontainers>=2.4.0",

server/reflector/hatchet/constants.py

@@ -8,8 +8,7 @@ from enum import StrEnum
 class TaskName(StrEnum):
     GET_RECORDING = "get_recording"
     GET_PARTICIPANTS = "get_participants"
-    PROCESS_PADDINGS = "process_paddings"
-    PROCESS_TRANSCRIPTIONS = "process_transcriptions"
+    PROCESS_TRACKS = "process_tracks"
     MIXDOWN_TRACKS = "mixdown_tracks"
     GENERATE_WAVEFORM = "generate_waveform"
     DETECT_TOPICS = "detect_topics"
@@ -38,5 +37,5 @@ LLM_RATE_LIMIT_PER_SECOND = 10
 TIMEOUT_SHORT = 60  # Quick operations: API calls, DB updates
 TIMEOUT_MEDIUM = 120  # Single LLM calls, waveform generation
 TIMEOUT_LONG = 180  # Action items (larger context LLM)
-TIMEOUT_AUDIO = 300  # Audio processing: padding, mixdown
+TIMEOUT_AUDIO = 720  # Audio processing: padding, mixdown
 TIMEOUT_HEAVY = 600  # Transcription, fan-out LLM tasks
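The bump to TIMEOUT_AUDIO lines up with the Modal deployment's own budget (PADDING_TIMEOUT in reflector_padding.py above), which is what the "match httpx pad with hatchet audio timeout" fix is about; a quick sanity check:

```python
# Modal side allows 600s of padding work plus two 60s S3 transfers (download + upload);
# the Hatchet-side audio timeout now matches that budget.
S3_TIMEOUT = 60
PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2)  # from reflector_padding.py
TIMEOUT_AUDIO = 720                       # constant above

assert PADDING_TIMEOUT == TIMEOUT_AUDIO == 720
```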


@@ -1,9 +1,9 @@
""" """
CPU-heavy worker pool for audio processing tasks. CPU-heavy worker pool for audio processing tasks.
Handles: mixdown_tracks only (serialized with max_runs=1) Handles ONLY: mixdown_tracks
Configuration: Configuration:
- slots=1: Only one mixdown at a time - slots=1: Only mixdown (already serialized globally with max_runs=1)
- Worker affinity: pool=cpu-heavy - Worker affinity: pool=cpu-heavy
""" """
@@ -26,7 +26,7 @@ def main():
     cpu_worker = hatchet.worker(
         "cpu-worker-pool",
-        slots=1,
+        slots=1,  # Only 1 mixdown at a time (already serialized globally)
         labels={
             "pool": "cpu-heavy",
         },


@@ -1,16 +1,15 @@
""" """
LLM/I/O worker pool for all non-CPU tasks. LLM/I/O worker pool for all non-CPU tasks.
Handles: all tasks except mixdown_tracks (padding, transcription, LLM inference, orchestration) Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
""" """
from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import ( from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline, daily_multitrack_pipeline,
) )
from reflector.hatchet.workflows.padding_workflow import padding_workflow
from reflector.hatchet.workflows.subject_processing import subject_workflow from reflector.hatchet.workflows.subject_processing import subject_workflow
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
from reflector.hatchet.workflows.transcription_workflow import transcription_workflow from reflector.hatchet.workflows.track_processing import track_workflow
from reflector.logger import logger from reflector.logger import logger
SLOTS = 10 SLOTS = 10
@@ -30,7 +29,7 @@ def main():
     llm_worker = hatchet.worker(
         WORKER_NAME,
-        slots=SLOTS,
+        slots=SLOTS,  # not all slots are probably used
         labels={
             "pool": POOL,
         },
@@ -38,8 +37,7 @@ def main():
             daily_multitrack_pipeline,
             topic_chunk_workflow,
             subject_workflow,
-            padding_workflow,
-            transcription_workflow,
+            track_workflow,
         ],
     )

server/reflector/hatchet/workflows/__init__.py

@@ -4,10 +4,6 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
     daily_multitrack_pipeline,
 )
-from reflector.hatchet.workflows.padding_workflow import (
-    PaddingInput,
-    padding_workflow,
-)
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
     subject_workflow,
@@ -16,20 +12,15 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
     TopicChunkInput,
     topic_chunk_workflow,
 )
-from reflector.hatchet.workflows.transcription_workflow import (
-    TranscriptionInput,
-    transcription_workflow,
-)
+from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 __all__ = [
     "daily_multitrack_pipeline",
     "subject_workflow",
     "topic_chunk_workflow",
-    "padding_workflow",
-    "transcription_workflow",
+    "track_workflow",
     "PipelineInput",
     "SubjectInput",
     "TopicChunkInput",
-    "PaddingInput",
-    "TranscriptionInput",
+    "TrackInput",
 ]

server/reflector/hatchet/workflows/daily_multitrack_pipeline.py

@@ -54,9 +54,8 @@ from reflector.hatchet.workflows.models import (
     PadTrackResult,
     ParticipantInfo,
     ParticipantsResult,
-    ProcessPaddingsResult,
     ProcessSubjectsResult,
-    ProcessTranscriptionsResult,
+    ProcessTracksResult,
     RecapResult,
     RecordingResult,
     SubjectsResult,
@@ -69,7 +68,6 @@ from reflector.hatchet.workflows.models import (
     WebhookResult,
     ZulipResult,
 )
-from reflector.hatchet.workflows.padding_workflow import PaddingInput, padding_workflow
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
     subject_workflow,
@@ -78,10 +76,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
     TopicChunkInput,
     topic_chunk_workflow,
 )
-from reflector.hatchet.workflows.transcription_workflow import (
-    TranscriptionInput,
-    transcription_workflow,
-)
+from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
 from reflector.processors import AudioFileWriterProcessor
@@ -409,115 +404,72 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
 )
-@with_error_handling(TaskName.PROCESS_PADDINGS)
-async def process_paddings(input: PipelineInput, ctx: Context) -> ProcessPaddingsResult:
-    """Spawn child workflows for each track to apply padding (dynamic fan-out)."""
-    ctx.log(f"process_paddings: spawning {len(input.tracks)} padding workflows")
+@with_error_handling(TaskName.PROCESS_TRACKS)
+async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
+    """Spawn child workflows for each track (dynamic fan-out)."""
+    ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
+    participants_result = ctx.task_output(get_participants)
+    source_language = participants_result.source_language
     bulk_runs = [
-        padding_workflow.create_bulk_run_item(
-            input=PaddingInput(
+        track_workflow.create_bulk_run_item(
+            input=TrackInput(
                 track_index=i,
                 s3_key=track["s3_key"],
                 bucket_name=input.bucket_name,
                 transcript_id=input.transcript_id,
+                language=source_language,
             )
         )
         for i, track in enumerate(input.tracks)
     ]
-    results = await padding_workflow.aio_run_many(bulk_runs)
-    padded_tracks = []
-    created_padded_files = []
-    for result in results:
-        pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
-        padded_tracks.append(
-            PaddedTrackInfo(
-                key=pad_result.padded_key,
-                bucket_name=pad_result.bucket_name,
-                track_index=pad_result.track_index,
-            )
-        )
-        if pad_result.size > 0:
-            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
-            created_padded_files.append(storage_path)
-    ctx.log(f"process_paddings complete: {len(padded_tracks)} padded tracks")
-    return ProcessPaddingsResult(
-        padded_tracks=padded_tracks,
-        num_tracks=len(input.tracks),
-        created_padded_files=list(created_padded_files),
-    )
-@daily_multitrack_pipeline.task(
-    parents=[process_paddings],
-    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
-    retries=3,
-)
-@with_error_handling(TaskName.PROCESS_TRANSCRIPTIONS)
-async def process_transcriptions(
-    input: PipelineInput, ctx: Context
-) -> ProcessTranscriptionsResult:
-    """Spawn child workflows for each padded track to transcribe (dynamic fan-out)."""
-    participants_result = ctx.task_output(get_participants)
-    paddings_result = ctx.task_output(process_paddings)
-    source_language = participants_result.source_language
-    if not source_language:
-        raise ValueError("source_language is required for transcription")
+    results = await track_workflow.aio_run_many(bulk_runs)
     target_language = participants_result.target_language
-    padded_tracks = paddings_result.padded_tracks
-    if not padded_tracks:
-        raise ValueError("No padded tracks available for transcription")
-    ctx.log(
-        f"process_transcriptions: spawning {len(padded_tracks)} transcription workflows"
-    )
-    bulk_runs = [
-        transcription_workflow.create_bulk_run_item(
-            input=TranscriptionInput(
-                track_index=padded_track.track_index,
-                padded_key=padded_track.key,
-                bucket_name=padded_track.bucket_name,
-                language=source_language,
-            )
-        )
-        for padded_track in padded_tracks
-    ]
-    results = await transcription_workflow.aio_run_many(bulk_runs)
     track_words: list[list[Word]] = []
+    padded_tracks = []
+    created_padded_files = set()
     for result in results:
         transcribe_result = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK])
         track_words.append(transcribe_result.words)
+        pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
+        # Store S3 key info (not presigned URL) - consumer tasks presign on demand
+        if pad_result.padded_key:
+            padded_tracks.append(
+                PaddedTrackInfo(
+                    key=pad_result.padded_key, bucket_name=pad_result.bucket_name
+                )
+            )
+        if pad_result.size > 0:
+            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
+            created_padded_files.add(storage_path)
     all_words = [word for words in track_words for word in words]
     all_words.sort(key=lambda w: w.start)
     ctx.log(
-        f"process_transcriptions complete: {len(all_words)} words from {len(padded_tracks)} tracks"
+        f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
     )
-    return ProcessTranscriptionsResult(
+    return ProcessTracksResult(
         all_words=all_words,
+        padded_tracks=padded_tracks,
         word_count=len(all_words),
         num_tracks=len(input.tracks),
         target_language=target_language,
+        created_padded_files=list(created_padded_files),
     )
 @daily_multitrack_pipeline.task(
-    parents=[process_paddings],
+    parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
     desired_worker_labels={
@@ -537,12 +489,12 @@ async def process_transcriptions(
     )
 @with_error_handling(TaskName.MIXDOWN_TRACKS)
 async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
-    """Mix all padded tracks into single audio file using PyAV."""
+    """Mix all padded tracks into single audio file using PyAV (same as Celery)."""
     ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
-    paddings_result = ctx.task_output(process_paddings)
+    track_result = ctx.task_output(process_tracks)
     recording_result = ctx.task_output(get_recording)
-    padded_tracks = paddings_result.padded_tracks
+    padded_tracks = track_result.padded_tracks
     # Dynamic timeout: scales with track count and recording duration
     # Base 300s + 60s per track + 1s per 10s of recording
@@ -609,13 +561,27 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     Path(output_path).unlink(missing_ok=True)
+    duration = duration_ms_callback_capture_container[0]
     async with fresh_db_connection():
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.db.transcripts import (  # noqa: PLC0415
+            TranscriptDuration,
+            transcripts_controller,
+        )
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
             await transcripts_controller.update(
-                transcript, {"audio_location": "storage"}
+                transcript, {"audio_location": "storage", "duration": duration}
+            )
+            duration_data = TranscriptDuration(duration=duration)
+            await append_event_and_broadcast(
+                input.transcript_id,
+                transcript,
+                "DURATION",
+                duration_data,
+                logger=logger,
             )
     ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
@@ -696,7 +662,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
 @daily_multitrack_pipeline.task(
-    parents=[process_transcriptions],
+    parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
 )
@@ -705,8 +671,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using parallel child workflows (one per chunk).""" """Detect topics using parallel child workflows (one per chunk)."""
ctx.log("detect_topics: analyzing transcript for topics") ctx.log("detect_topics: analyzing transcript for topics")
transcriptions_result = ctx.task_output(process_transcriptions) track_result = ctx.task_output(process_tracks)
words = transcriptions_result.all_words words = track_result.all_words
if not words: if not words:
ctx.log("detect_topics: no words, returning empty topics") ctx.log("detect_topics: no words, returning empty topics")
@@ -1143,7 +1109,7 @@ async def identify_action_items(
 @daily_multitrack_pipeline.task(
-    parents=[generate_title, generate_recap, identify_action_items],
+    parents=[process_tracks, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
 )
@@ -1156,15 +1122,10 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
""" """
ctx.log("finalize: saving transcript and setting status to 'ended'") ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks) track_result = ctx.task_output(process_tracks)
transcriptions_result = ctx.task_output(process_transcriptions)
paddings_result = ctx.task_output(process_paddings)
duration = mixdown_result.duration
all_words = transcriptions_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery) # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = paddings_result.created_padded_files created_padded_files = track_result.created_padded_files
if created_padded_files: if created_padded_files:
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files") ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage() storage = _spawn_storage()
@@ -1182,7 +1143,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     async with fresh_db_connection():
         from reflector.db.transcripts import (  # noqa: PLC0415
-            TranscriptDuration,
             TranscriptText,
             transcripts_controller,
         )
@@ -1191,8 +1151,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
         if transcript is None:
             raise ValueError(f"Transcript {input.transcript_id} not found in database")
-        merged_transcript = TranscriptType(words=all_words, translation=None)
         await append_event_and_broadcast(
             input.transcript_id,
             transcript,
@@ -1204,21 +1162,15 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
             logger=logger,
         )
-        # Save duration and clear workflow_run_id (workflow completed successfully)
-        # Note: title/long_summary/short_summary already saved by their callbacks
+        # Clear workflow_run_id (workflow completed successfully)
+        # Note: title/long_summary/short_summary/duration already saved by their callbacks
         await transcripts_controller.update(
             transcript,
             {
-                "duration": duration,
                 "workflow_run_id": None,  # Clear on success - no need to resume
             },
         )
-        duration_data = TranscriptDuration(duration=duration)
-        await append_event_and_broadcast(
-            input.transcript_id, transcript, "DURATION", duration_data, logger=logger
-        )
         await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
     ctx.log(
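The dynamic fan-out used by process_tracks above, reduced to its core; a sketch only, with the surrounding Hatchet task plumbing and error handling omitted:

```python
# Sketch of the process_tracks fan-out: one TrackProcessing child run per track,
# results read back by task name. Assumes the same imports as the pipeline module.
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow


async def fan_out_tracks(tracks, bucket_name, transcript_id, source_language):
    bulk_runs = [
        track_workflow.create_bulk_run_item(
            input=TrackInput(
                track_index=i,
                s3_key=track["s3_key"],
                bucket_name=bucket_name,
                transcript_id=transcript_id,
                language=source_language,
            )
        )
        for i, track in enumerate(tracks)
    ]
    results = await track_workflow.aio_run_many(bulk_runs)
    # Each child result is keyed by task name: padding output plus transcription output.
    return [
        (
            PadTrackResult(**r[TaskName.PAD_TRACK]),
            TranscribeTrackResult(**r[TaskName.TRANSCRIBE_TRACK]),
        )
        for r in results
    ]
```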

server/reflector/hatchet/workflows/models.py

@@ -21,14 +21,12 @@ class ParticipantInfo(BaseModel):
 class PadTrackResult(BaseModel):
-    """Result from pad_track task.
-    If size=0, track required no padding and padded_key contains original S3 key.
-    If size>0, track was padded and padded_key contains new padded file S3 key.
-    """
-    padded_key: NonEmptyString
-    bucket_name: NonEmptyString | None
+    """Result from pad_track task."""
+    padded_key: NonEmptyString  # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
+    bucket_name: (
+        NonEmptyString | None
+    )  # None means use default transcript storage bucket
     size: int
     track_index: int
@@ -61,25 +59,18 @@ class PaddedTrackInfo(BaseModel):
"""Info for a padded track - S3 key + bucket for on-demand presigning.""" """Info for a padded track - S3 key + bucket for on-demand presigning."""
key: NonEmptyString key: NonEmptyString
bucket_name: NonEmptyString | None bucket_name: NonEmptyString | None # None = use default storage bucket
track_index: int
class ProcessPaddingsResult(BaseModel): class ProcessTracksResult(BaseModel):
"""Result from process_paddings task.""" """Result from process_tracks task."""
padded_tracks: list[PaddedTrackInfo]
num_tracks: int
created_padded_files: list[NonEmptyString]
class ProcessTranscriptionsResult(BaseModel):
"""Result from process_transcriptions task."""
all_words: list[Word] all_words: list[Word]
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
word_count: int word_count: int
num_tracks: int num_tracks: int
target_language: NonEmptyString target_language: NonEmptyString
created_padded_files: list[NonEmptyString]
class MixdownResult(BaseModel): class MixdownResult(BaseModel):
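The contract spelled out in the removed PadTrackResult docstring still holds in the new models: size=0 means no padding was applied and padded_key is the original track's S3 key, while size>0 means padded_key points at the newly written padded file. Illustrative values only:

```python
# Placeholder values; bucket_name=None selects the default transcript storage bucket.
from reflector.hatchet.workflows.models import PadTrackResult

no_padding_needed = PadTrackResult(
    padded_key="daily-raw/room/track_0.webm",  # original key passed through
    bucket_name="daily-recordings",
    size=0,
    track_index=0,
)

padded = PadTrackResult(
    padded_key="file_pipeline_hatchet/<transcript_id>/tracks/padded_1.webm",
    bucket_name=None,
    size=2_348_112,  # placeholder byte count
    track_index=1,
)
```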

server/reflector/hatchet/workflows/padding_workflow.py

@@ -1,11 +1,9 @@
""" """
Hatchet child workflow: PaddingWorkflow Hatchet child workflow: PaddingWorkflow
Handles individual audio track padding only. Handles individual audio track padding via Modal.com backend.
""" """
import tempfile
from datetime import timedelta from datetime import timedelta
from pathlib import Path
import av import av
from hatchet_sdk import Context from hatchet_sdk import Context
@@ -16,10 +14,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO
 from reflector.hatchet.workflows.models import PadTrackResult
 from reflector.logger import logger
 from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
-from reflector.utils.audio_padding import (
-    apply_audio_padding_to_file,
-    extract_stream_start_time_from_container,
-)
+from reflector.utils.audio_padding import extract_stream_start_time_from_container
 class PaddingInput(BaseModel):
@@ -68,61 +63,83 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
             bucket=input.bucket_name,
         )
-        # Extract start_time to determine if padding needed
         with av.open(source_url) as in_container:
-            with av.open(source_url) as in_container:
-                if in_container.duration:
-                    try:
-                        duration = timedelta(seconds=in_container.duration // 1_000_000)
-                        ctx.log(
-                            f"pad_track: track {input.track_index}, duration={duration}"
-                        )
-                    except (ValueError, TypeError, OverflowError) as e:
-                        ctx.log(
-                            f"pad_track: track {input.track_index}, duration error: {str(e)}"
-                        )
+            if in_container.duration:
+                try:
+                    duration = timedelta(seconds=in_container.duration // 1_000_000)
+                    ctx.log(
+                        f"pad_track: track {input.track_index}, duration={duration}"
+                    )
+                except (ValueError, TypeError, OverflowError) as e:
+                    ctx.log(
+                        f"pad_track: track {input.track_index}, duration error: {str(e)}"
+                    )
             start_time_seconds = extract_stream_start_time_from_container(
                 in_container, input.track_index, logger=logger
             )
             if start_time_seconds <= 0:
                 logger.info(
                     f"Track {input.track_index} requires no padding",
                     track_index=input.track_index,
                 )
                 return PadTrackResult(
                     padded_key=input.s3_key,
                     bucket_name=input.bucket_name,
                     size=0,
                     track_index=input.track_index,
                 )
             storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
-            with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
-                temp_path = temp_file.name
-            try:
-                apply_audio_padding_to_file(
-                    in_container,
-                    temp_path,
-                    start_time_seconds,
-                    input.track_index,
-                    logger=logger,
-                )
-                file_size = Path(temp_path).stat().st_size
-                with open(temp_path, "rb") as padded_file:
-                    await storage.put_file(storage_path, padded_file)
-                logger.info(
-                    f"Uploaded padded track to S3",
-                    key=storage_path,
-                    size=file_size,
-                )
-            finally:
-                Path(temp_path).unlink(missing_ok=True)
+            # Presign PUT URL for output (Modal will upload directly)
+            output_url = await storage.get_file_url(
+                storage_path,
+                operation="put_object",
+                expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
+            )
+            import httpx  # noqa: PLC0415
+            from reflector.processors.audio_padding_modal import (  # noqa: PLC0415
+                AudioPaddingModalProcessor,
+            )
+            try:
+                processor = AudioPaddingModalProcessor()
+                result = await processor.pad_track(
+                    track_url=source_url,
+                    output_url=output_url,
+                    start_time_seconds=start_time_seconds,
+                    track_index=input.track_index,
+                )
+                file_size = result.size
+                ctx.log(f"pad_track: Modal returned size={file_size}")
+            except httpx.HTTPStatusError as e:
+                error_detail = e.response.text if hasattr(e.response, "text") else str(e)
+                logger.error(
+                    "[Hatchet] Modal padding HTTP error",
+                    transcript_id=input.transcript_id,
+                    track_index=input.track_index,
+                    status_code=e.response.status_code if hasattr(e, "response") else None,
+                    error=error_detail,
+                    exc_info=True,
+                )
+                raise Exception(
+                    f"Modal padding failed: HTTP {e.response.status_code}"
+                ) from e
+            except httpx.TimeoutException as e:
+                logger.error(
+                    "[Hatchet] Modal padding timeout",
+                    transcript_id=input.transcript_id,
+                    track_index=input.track_index,
+                    error=str(e),
+                    exc_info=True,
+                )
+                raise Exception("Modal padding timeout") from e
         logger.info(
             "[Hatchet] pad_track complete",

server/reflector/hatchet/workflows/track_processing.py

@@ -0,0 +1,205 @@
"""
Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
from datetime import timedelta
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class TrackInput(BaseModel):
"""Input for individual track processing."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
language: str = "en"
hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal uploads directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
# Return S3 key (not presigned URL) - consumer tasks presign on demand
# This avoids stale URLs when workflow is replayed
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
raise
@track_workflow.task(
parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
language=input.language,
)
try:
pad_result = ctx.task_output(pad_track)
padded_key = pad_result.padded_key
bucket_name = pad_result.bucket_name
if not padded_key:
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index
for word in transcript.words:
word.speaker = input.track_index
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
word_count=len(transcript.words),
)
return TranscribeTrackResult(
words=transcript.words,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
raise
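A minimal sketch of running this child workflow for a single track outside the bulk fan-out, using the aio_run() path mentioned in the module docstring; the input values are placeholders, and the per-task result mapping is assumed to match what aio_run_many() returns:

```python
# Hypothetical single-track run of TrackProcessing; all input values are placeholders.
from reflector.hatchet.constants import TaskName
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow


async def run_single_track() -> None:
    result = await track_workflow.aio_run(
        TrackInput(
            track_index=0,
            s3_key="daily-raw/room/track_0.webm",  # placeholder
            bucket_name="daily-recordings",        # placeholder
            transcript_id="abc123",                # placeholder
            language="en",
        )
    )
    pad = PadTrackResult(**result[TaskName.PAD_TRACK])
    words = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK]).words
    print(pad.padded_key, len(words))
```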

server/reflector/hatchet/workflows/transcription_workflow.py

@@ -1,98 +0,0 @@
"""
Hatchet child workflow: TranscriptionWorkflow
Handles individual audio track transcription only.
"""
from datetime import timedelta
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
class TranscriptionInput(BaseModel):
"""Input for individual track transcription."""
track_index: int
padded_key: str # S3 key from padding step
bucket_name: str | None # None = use default bucket
language: str = "en"
hatchet = HatchetClientManager.get_client()
transcription_workflow = hatchet.workflow(
name="TranscriptionWorkflow", input_validator=TranscriptionInput
)
@transcription_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
)
async def transcribe_track(
input: TranscriptionInput, ctx: Context
) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
language=input.language,
)
try:
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
input.padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
for word in transcript.words:
word.speaker = input.track_index
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
word_count=len(transcript.words),
)
return TranscribeTrackResult(
words=transcript.words,
track_index=input.track_index,
)
except Exception as e:
logger.error(
"[Hatchet] transcribe_track failed",
track_index=input.track_index,
padded_key=input.padded_key,
language=input.language,
error=str(e),
exc_info=True,
)
raise

server/reflector/processors/audio_padding_modal.py

@@ -0,0 +1,113 @@
"""
Modal.com backend for audio padding.
"""
import asyncio
import os
import httpx
from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
class AudioPaddingModalProcessor:
"""Audio padding processor using Modal.com CPU backend via HTTP."""
def __init__(
self, padding_url: str | None = None, modal_api_key: str | None = None
):
self.padding_url = padding_url or os.getenv("PADDING_URL")
if not self.padding_url:
raise ValueError(
"PADDING_URL required to use AudioPaddingModalProcessor. "
"Set PADDING_URL environment variable or pass padding_url parameter."
)
self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY")
async def pad_track(
self,
track_url: str,
output_url: str,
start_time_seconds: float,
track_index: int,
) -> PaddingResponse:
"""Pad audio track with silence via Modal backend.
Args:
track_url: Presigned GET URL for source audio track
output_url: Presigned PUT URL for output WebM
start_time_seconds: Amount of silence to prepend
track_index: Track index for logging
"""
if not track_url:
raise ValueError("track_url cannot be empty")
if start_time_seconds <= 0:
raise ValueError(
f"start_time_seconds must be positive, got {start_time_seconds}"
)
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
log.info("Sending Modal padding HTTP request")
url = f"{self.padding_url}/pad"
headers = {}
if self.modal_api_key:
headers["Authorization"] = f"Bearer {self.modal_api_key}"
try:
async with httpx.AsyncClient(timeout=TIMEOUT_AUDIO) as client:
response = await client.post(
url,
headers=headers,
json={
"track_url": track_url,
"output_url": output_url,
"start_time_seconds": start_time_seconds,
"track_index": track_index,
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
log.error(
"Modal padding API error",
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()
# Check if work was cancelled
if result.get("cancelled"):
log.warning("Modal padding was cancelled by disconnect detection")
raise asyncio.CancelledError(
"Padding cancelled due to client disconnect"
)
log.info("Modal padding complete", size=result["size"])
return PaddingResponse(**result)
except asyncio.CancelledError:
log.warning(
"Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)"
)
raise
except httpx.TimeoutException as e:
log.error("Modal padding timeout", error=str(e), exc_info=True)
raise Exception(f"Modal padding timeout: {e}") from e
except httpx.HTTPStatusError as e:
log.error("Modal padding HTTP error", error=str(e), exc_info=True)
raise Exception(f"Modal padding HTTP error: {e}") from e
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise
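A minimal usage sketch for the processor above; the presigned URLs are placeholders, and PADDING_URL (plus optionally MODAL_API_KEY) is assumed to be set in the environment or passed explicitly:

```python
# Placeholder URLs; requires PADDING_URL (or an explicit padding_url argument).
import asyncio

from reflector.processors.audio_padding_modal import AudioPaddingModalProcessor


async def main() -> None:
    processor = AudioPaddingModalProcessor()
    result = await processor.pad_track(
        track_url="https://s3.example.com/presigned-get",   # presigned GET (placeholder)
        output_url="https://s3.example.com/presigned-put",  # presigned PUT (placeholder)
        start_time_seconds=1.25,
        track_index=0,
    )
    print(result.size, result.cancelled)


asyncio.run(main())
```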

server/reflector/settings.py

@@ -98,6 +98,10 @@ class Settings(BaseSettings):
     # Diarization: local pyannote.audio
     DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
+
+    # Audio Padding (Modal.com backend)
+    PADDING_URL: str | None = None
+    PADDING_MODAL_API_KEY: str | None = None
     # Sentry
     SENTRY_DSN: str | None = None
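The new settings can also be wired into the Modal padding processor explicitly instead of relying on its environment-variable fallback; a brief sketch, assuming settings are configured:

```python
# Construct the processor from the new settings fields rather than env vars.
from reflector.processors.audio_padding_modal import AudioPaddingModalProcessor
from reflector.settings import settings

processor = AudioPaddingModalProcessor(
    padding_url=settings.PADDING_URL,
    modal_api_key=settings.PADDING_MODAL_API_KEY,
)
```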

server/reflector/utils/audio_constants.py

@@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin
""" """
# Opus codec settings # Opus codec settings
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000 OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration # S3 presigned URL expiration

server/uv.lock (generated)

@@ -159,21 +159,20 @@ wheels = [
[[package]] [[package]]
name = "aiortc" name = "aiortc"
version = "1.13.0" version = "1.14.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "aioice" }, { name = "aioice" },
{ name = "av" }, { name = "av" },
{ name = "cffi" },
{ name = "cryptography" }, { name = "cryptography" },
{ name = "google-crc32c" }, { name = "google-crc32c" },
{ name = "pyee" }, { name = "pyee" },
{ name = "pylibsrtp" }, { name = "pylibsrtp" },
{ name = "pyopenssl" }, { name = "pyopenssl" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 } sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 }, { url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 },
] ]
[[package]] [[package]]
@@ -327,28 +326,24 @@ wheels = [
[[package]] [[package]]
name = "av" name = "av"
version = "14.4.0" version = "16.1.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 } sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 }, { url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 },
{ url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 }, { url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 },
{ url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 }, { url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 },
{ url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 }, { url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 },
{ url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 }, { url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 },
{ url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 }, { url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 },
{ url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 }, { url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 },
{ url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 }, { url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 },
{ url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 }, { url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 },
{ url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 }, { url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 },
{ url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 }, { url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 },
{ url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 }, { url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 },
{ url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 }, { url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 },
{ url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 }, { url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 },
{ url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
{ url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
{ url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
{ url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
] ]
[[package]] [[package]]
@@ -3267,7 +3262,7 @@ requires-dist = [
{ name = "aiohttp-cors", specifier = ">=0.7.0" }, { name = "aiohttp-cors", specifier = ">=0.7.0" },
{ name = "aiortc", specifier = ">=1.5.0" }, { name = "aiortc", specifier = ">=1.5.0" },
{ name = "alembic", specifier = ">=1.11.3" }, { name = "alembic", specifier = ">=1.11.3" },
{ name = "av", specifier = ">=10.0.0" }, { name = "av", specifier = ">=15.0.0" },
{ name = "celery", specifier = ">=5.3.4" }, { name = "celery", specifier = ">=5.3.4" },
{ name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" }, { name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" },
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },