From 7fde64e2529a1d37b0f7507c62d983a7bd0b5b89 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Fri, 30 Jan 2026 13:11:51 -0500 Subject: [PATCH] feat: modal padding (#837) * Add Modal backend for audio padding - Create reflector_padding.py Modal deployment (CPU-based) - Add PaddingWorkflow with conditional Modal/local backend - Update deploy-all.sh to include padding deployment --------- Co-authored-by: Igor Loskutov --- .gitignore | 1 + gpu/modal_deployments/deploy-all.sh | 11 + gpu/modal_deployments/reflector_padding.py | 277 ++++++++++++++++++ server/pyproject.toml | 2 +- server/reflector/hatchet/constants.py | 2 +- .../hatchet/workflows/padding_workflow.py | 165 +++++++++++ .../hatchet/workflows/track_processing.py | 86 ++---- .../processors/audio_padding_modal.py | 112 +++++++ server/reflector/settings.py | 4 + server/reflector/utils/audio_constants.py | 2 + server/uv.lock | 45 ++- 11 files changed, 625 insertions(+), 82 deletions(-) create mode 100644 gpu/modal_deployments/reflector_padding.py create mode 100644 server/reflector/hatchet/workflows/padding_workflow.py create mode 100644 server/reflector/processors/audio_padding_modal.py diff --git a/.gitignore b/.gitignore index 1eb3a8bb..2cebdf5c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ .DS_Store server/.env +server/.env.production .env Caddyfile server/exportdanswer diff --git a/gpu/modal_deployments/deploy-all.sh b/gpu/modal_deployments/deploy-all.sh index 42e589a2..8eae3e66 100755 --- a/gpu/modal_deployments/deploy-all.sh +++ b/gpu/modal_deployments/deploy-all.sh @@ -131,6 +131,15 @@ if [ -z "$DIARIZER_URL" ]; then fi echo " -> $DIARIZER_URL" +echo "" +echo "Deploying padding (CPU audio processing via Modal SDK)..." +modal deploy reflector_padding.py +if [ $? -ne 0 ]; then + echo "Error: Failed to deploy padding. Check Modal dashboard for details." + exit 1 +fi +echo " -> reflector-padding.pad_track (Modal SDK function)" + # --- Output Configuration --- echo "" echo "==========================================" @@ -147,4 +156,6 @@ echo "" echo "DIARIZATION_BACKEND=modal" echo "DIARIZATION_URL=$DIARIZER_URL" echo "DIARIZATION_MODAL_API_KEY=$API_KEY" +echo "" +echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)" echo "# --- End Modal Configuration ---" diff --git a/gpu/modal_deployments/reflector_padding.py b/gpu/modal_deployments/reflector_padding.py new file mode 100644 index 00000000..809131e6 --- /dev/null +++ b/gpu/modal_deployments/reflector_padding.py @@ -0,0 +1,277 @@ +""" +Reflector GPU backend - audio padding +====================================== + +CPU-intensive audio padding service for adding silence to audio tracks. +Uses PyAV filter graph (adelay) for precise track synchronization. + +IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py +for Modal deployment isolation (Modal can't import from server/reflector/). If you modify +the PyAV filter graph or padding algorithm, you MUST update both: + - gpu/modal_deployments/reflector_padding.py (this file) + - server/reflector/utils/audio_padding.py + +Constants duplicated from server/reflector/utils/audio_constants.py for same reason. +""" + +import os +import tempfile +from fractions import Fraction +import math +import asyncio + +import modal + +S3_TIMEOUT = 60 # happens 2 times +PADDING_TIMEOUT = 600 + (S3_TIMEOUT * 2) +SCALEDOWN_WINDOW = 60 # The maximum duration (in seconds) that individual containers can remain idle when scaling down. +DISCONNECT_CHECK_INTERVAL = 2 # Check for client disconnect + + +app = modal.App("reflector-padding") + +# CPU-based image +image = ( + modal.Image.debian_slim(python_version="3.12") + .apt_install("ffmpeg") # Required by PyAV + .pip_install( + "av==13.1.0", # PyAV for audio processing + "requests==2.32.3", # HTTP for presigned URL downloads/uploads + "fastapi==0.115.12", # API framework + ) +) + +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 +OPUS_STANDARD_SAMPLE_RATE = 48000 +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 +OPUS_DEFAULT_BIT_RATE = 128000 + + +@app.function( + cpu=2.0, + timeout=PADDING_TIMEOUT, + scaledown_window=SCALEDOWN_WINDOW, + image=image, +) +@modal.asgi_app() +def web(): + from fastapi import FastAPI, Request, HTTPException + from pydantic import BaseModel + + class PaddingRequest(BaseModel): + track_url: str + output_url: str + start_time_seconds: float + track_index: int + + class PaddingResponse(BaseModel): + size: int + cancelled: bool = False + + web_app = FastAPI() + + @web_app.post("/pad") + async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse: + """Modal web endpoint for padding audio tracks with disconnect detection. + """ + import logging + + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logger = logging.getLogger(__name__) + + if not req.track_url: + raise HTTPException(status_code=400, detail="track_url cannot be empty") + if not req.output_url: + raise HTTPException(status_code=400, detail="output_url cannot be empty") + if req.start_time_seconds <= 0: + raise HTTPException(status_code=400, detail=f"start_time_seconds must be positive, got {req.start_time_seconds}") + if req.start_time_seconds > 18000: + raise HTTPException(status_code=400, detail=f"start_time_seconds exceeds maximum 18000s (5 hours)") + + logger.info(f"Padding request: track {req.track_index}, delay={req.start_time_seconds}s") + + # Thread-safe cancellation flag shared between async disconnect checker and blocking thread + import threading + cancelled = threading.Event() + + async def check_disconnect(): + """Background task to check for client disconnect every 2 seconds.""" + while not cancelled.is_set(): + await asyncio.sleep(DISCONNECT_CHECK_INTERVAL) + if await request.is_disconnected(): + logger.warning("Client disconnected, setting cancellation flag") + cancelled.set() + break + + # Start disconnect checker in background + disconnect_task = asyncio.create_task(check_disconnect()) + + try: + result = await asyncio.get_event_loop().run_in_executor( + None, _pad_track_blocking, req, cancelled, logger + ) + return PaddingResponse(**result) + finally: + cancelled.set() + disconnect_task.cancel() + try: + await disconnect_task + except asyncio.CancelledError: + pass + + def _pad_track_blocking(req, cancelled, logger) -> dict: + """Blocking CPU-bound padding work with periodic cancellation checks. + + Args: + cancelled: threading.Event for thread-safe cancellation signaling + """ + import av + import requests + from av.audio.resampler import AudioResampler + import time + + temp_dir = tempfile.mkdtemp() + input_path = None + output_path = None + last_check = time.time() + + try: + logger.info("Downloading track for padding") + response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + input_path = os.path.join(temp_dir, "track.webm") + total_bytes = 0 + chunk_count = 0 + with open(input_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + chunk_count += 1 + + # Check for cancellation every arbitrary amount of chunks + if chunk_count % 12 == 0: + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info("Cancelled during download, exiting early") + return {"size": 0, "cancelled": True} + last_check = now + logger.info(f"Track downloaded: {total_bytes} bytes") + + if cancelled.is_set(): + logger.info("Cancelled after download, exiting early") + return {"size": 0, "cancelled": True} + + # Apply padding using PyAV + output_path = os.path.join(temp_dir, "padded.webm") + delay_ms = math.floor(req.start_time_seconds * 1000) + logger.info(f"Padding track {req.track_index} with {delay_ms}ms delay using PyAV") + + in_container = av.open(input_path) + in_stream = next((s for s in in_container.streams if s.type == "audio"), None) + if in_stream is None: + raise ValueError("No audio stream in input") + + with av.open(output_path, "w", format="webm") as out_container: + out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE) + out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE + graph = av.filter.Graph() + + abuf_args = ( + f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_fmt=s16:" + f"channel_layout=stereo" + ) + src = graph.add("abuffer", args=abuf_args, name="src") + aresample_f = graph.add("aresample", args="async=1", name="ares") + delays_arg = f"{delay_ms}|{delay_ms}" + adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay") + sink = graph.add("abuffersink", name="sink") + + src.link_to(aresample_f) + aresample_f.link_to(adelay_f) + adelay_f.link_to(sink) + graph.configure() + + resampler = AudioResampler( + format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE + ) + + for frame in in_container.decode(in_stream): + # Check for cancellation periodically + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info("Cancelled during processing, exiting early") + in_container.close() + return {"size": 0, "cancelled": True} + last_check = now + + out_frames = resampler.resample(frame) or [] + for rframe in out_frames: + rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE + rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + src.push(rframe) + + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush filter graph + src.push(None) + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + + in_container.close() + + file_size = os.path.getsize(output_path) + logger.info(f"Padding complete: {file_size} bytes") + + logger.info("Uploading padded track to S3") + + with open(output_path, "rb") as f: + upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT) + + upload_response.raise_for_status() + logger.info(f"Upload complete: {file_size} bytes") + + return {"size": file_size} + + finally: + if input_path and os.path.exists(input_path): + try: + os.unlink(input_path) + except Exception as e: + logger.warning(f"Failed to cleanup input file: {e}") + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + logger.warning(f"Failed to cleanup output file: {e}") + try: + os.rmdir(temp_dir) + except Exception as e: + logger.warning(f"Failed to cleanup temp directory: {e}") + + return web_app + diff --git a/server/pyproject.toml b/server/pyproject.toml index df6cf536..1cbb5320 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -8,7 +8,7 @@ readme = "README.md" dependencies = [ "aiohttp>=3.9.0", "aiohttp-cors>=0.7.0", - "av>=10.0.0", + "av>=15.0.0", "requests>=2.31.0", "aiortc>=1.5.0", "sortedcontainers>=2.4.0", diff --git a/server/reflector/hatchet/constants.py b/server/reflector/hatchet/constants.py index fbe6d25b..209d1bd1 100644 --- a/server/reflector/hatchet/constants.py +++ b/server/reflector/hatchet/constants.py @@ -37,5 +37,5 @@ LLM_RATE_LIMIT_PER_SECOND = 10 TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation TIMEOUT_LONG = 180 # Action items (larger context LLM) -TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown +TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks diff --git a/server/reflector/hatchet/workflows/padding_workflow.py b/server/reflector/hatchet/workflows/padding_workflow.py new file mode 100644 index 00000000..229da5e2 --- /dev/null +++ b/server/reflector/hatchet/workflows/padding_workflow.py @@ -0,0 +1,165 @@ +""" +Hatchet child workflow: PaddingWorkflow +Handles individual audio track padding via Modal.com backend. +""" + +from datetime import timedelta + +import av +from hatchet_sdk import Context +from pydantic import BaseModel + +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.constants import TIMEOUT_AUDIO +from reflector.hatchet.workflows.models import PadTrackResult +from reflector.logger import logger +from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS +from reflector.utils.audio_padding import extract_stream_start_time_from_container + + +class PaddingInput(BaseModel): + """Input for individual track padding.""" + + track_index: int + s3_key: str + bucket_name: str + transcript_id: str + + +hatchet = HatchetClientManager.get_client() + +padding_workflow = hatchet.workflow( + name="PaddingWorkflow", input_validator=PaddingInput +) + + +@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3) +async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: + """Pad audio track with silence based on WebM container start_time.""" + ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}") + logger.info( + "[Hatchet] pad_track", + track_index=input.track_index, + s3_key=input.s3_key, + transcript_id=input.transcript_id, + ) + + try: + # Create fresh storage instance to avoid aioboto3 fork issues + from reflector.settings import settings # noqa: PLC0415 + from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415 + + storage = AwsStorage( + aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, + aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + source_url = await storage.get_file_url( + input.s3_key, + operation="get_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + bucket=input.bucket_name, + ) + + # Extract start_time to determine if padding needed + with av.open(source_url) as in_container: + if in_container.duration: + try: + duration = timedelta(seconds=in_container.duration // 1_000_000) + ctx.log( + f"pad_track: track {input.track_index}, duration={duration}" + ) + except (ValueError, TypeError, OverflowError) as e: + ctx.log( + f"pad_track: track {input.track_index}, duration error: {str(e)}" + ) + + start_time_seconds = extract_stream_start_time_from_container( + in_container, input.track_index, logger=logger + ) + + if start_time_seconds <= 0: + logger.info( + f"Track {input.track_index} requires no padding", + track_index=input.track_index, + ) + return PadTrackResult( + padded_key=input.s3_key, + bucket_name=input.bucket_name, + size=0, + track_index=input.track_index, + ) + + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" + + # Presign PUT URL for output (Modal will upload directly) + output_url = await storage.get_file_url( + storage_path, + operation="put_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) + + import httpx # noqa: PLC0415 + + from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 + AudioPaddingModalProcessor, + ) + + try: + processor = AudioPaddingModalProcessor() + result = await processor.pad_track( + track_url=source_url, + output_url=output_url, + start_time_seconds=start_time_seconds, + track_index=input.track_index, + ) + file_size = result.size + + ctx.log(f"pad_track: Modal returned size={file_size}") + except httpx.HTTPStatusError as e: + error_detail = e.response.text if hasattr(e.response, "text") else str(e) + logger.error( + "[Hatchet] Modal padding HTTP error", + transcript_id=input.transcript_id, + track_index=input.track_index, + status_code=e.response.status_code if hasattr(e, "response") else None, + error=error_detail, + exc_info=True, + ) + raise Exception( + f"Modal padding failed: HTTP {e.response.status_code}" + ) from e + except httpx.TimeoutException as e: + logger.error( + "[Hatchet] Modal padding timeout", + transcript_id=input.transcript_id, + track_index=input.track_index, + error=str(e), + exc_info=True, + ) + raise Exception("Modal padding timeout") from e + + logger.info( + "[Hatchet] pad_track complete", + track_index=input.track_index, + padded_key=storage_path, + ) + + return PadTrackResult( + padded_key=storage_path, + bucket_name=None, # None = use default transcript storage bucket + size=file_size, + track_index=input.track_index, + ) + + except Exception as e: + logger.error( + "[Hatchet] pad_track failed", + transcript_id=input.transcript_id, + track_index=input.track_index, + error=str(e), + exc_info=True, + ) + raise diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index dd3aea3a..5a9e7505 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure storage/DB connections are not shared across forks. """ -import tempfile from datetime import timedelta -from pathlib import Path import av from hatchet_sdk import Context @@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult from reflector.logger import logger from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS -from reflector.utils.audio_padding import ( - apply_audio_padding_to_file, - extract_stream_start_time_from_container, -) +from reflector.utils.audio_padding import extract_stream_start_time_from_container class TrackInput(BaseModel): @@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: ) with av.open(source_url) as in_container: - if in_container.duration: - try: - duration = timedelta(seconds=in_container.duration // 1_000_000) - ctx.log( - f"pad_track: track {input.track_index}, duration={duration}" - ) - except Exception: - ctx.log(f"pad_track: track {input.track_index}, duration=ERROR") - start_time_seconds = extract_stream_start_time_from_container( in_container, input.track_index, logger=logger ) - # If no padding needed, return original S3 key - if start_time_seconds <= 0: - logger.info( - f"Track {input.track_index} requires no padding", - track_index=input.track_index, - ) - return PadTrackResult( - padded_key=input.s3_key, - bucket_name=input.bucket_name, - size=0, - track_index=input.track_index, - ) + # If no padding needed, return original S3 key + if start_time_seconds <= 0: + logger.info( + f"Track {input.track_index} requires no padding", + track_index=input.track_index, + ) + return PadTrackResult( + padded_key=input.s3_key, + bucket_name=input.bucket_name, + size=0, + track_index=input.track_index, + ) - with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file: - temp_path = temp_file.name + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" - try: - apply_audio_padding_to_file( - in_container, - temp_path, - start_time_seconds, - input.track_index, - logger=logger, - ) + # Presign PUT URL for output (Modal uploads directly) + output_url = await storage.get_file_url( + storage_path, + operation="put_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) - file_size = Path(temp_path).stat().st_size - storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" + from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 + AudioPaddingModalProcessor, + ) - logger.info( - f"About to upload padded track", - key=storage_path, - size=file_size, - ) - - with open(temp_path, "rb") as padded_file: - await storage.put_file(storage_path, padded_file) - - logger.info( - f"Uploaded padded track to S3", - key=storage_path, - size=file_size, - ) - finally: - Path(temp_path).unlink(missing_ok=True) + processor = AudioPaddingModalProcessor() + result = await processor.pad_track( + track_url=source_url, + output_url=output_url, + start_time_seconds=start_time_seconds, + track_index=input.track_index, + ) + file_size = result.size ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}") logger.info( diff --git a/server/reflector/processors/audio_padding_modal.py b/server/reflector/processors/audio_padding_modal.py new file mode 100644 index 00000000..128133db --- /dev/null +++ b/server/reflector/processors/audio_padding_modal.py @@ -0,0 +1,112 @@ +""" +Modal.com backend for audio padding. +""" + +import asyncio +import os + +import httpx +from pydantic import BaseModel + +from reflector.logger import logger + + +class PaddingResponse(BaseModel): + size: int + cancelled: bool = False + + +class AudioPaddingModalProcessor: + """Audio padding processor using Modal.com CPU backend via HTTP.""" + + def __init__( + self, padding_url: str | None = None, modal_api_key: str | None = None + ): + self.padding_url = padding_url or os.getenv("PADDING_URL") + if not self.padding_url: + raise ValueError( + "PADDING_URL required to use AudioPaddingModalProcessor. " + "Set PADDING_URL environment variable or pass padding_url parameter." + ) + + self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY") + + async def pad_track( + self, + track_url: str, + output_url: str, + start_time_seconds: float, + track_index: int, + ) -> PaddingResponse: + """Pad audio track with silence via Modal backend. + + Args: + track_url: Presigned GET URL for source audio track + output_url: Presigned PUT URL for output WebM + start_time_seconds: Amount of silence to prepend + track_index: Track index for logging + """ + if not track_url: + raise ValueError("track_url cannot be empty") + if start_time_seconds <= 0: + raise ValueError( + f"start_time_seconds must be positive, got {start_time_seconds}" + ) + + log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds) + log.info("Sending Modal padding HTTP request") + + url = f"{self.padding_url}/pad" + + headers = {} + if self.modal_api_key: + headers["Authorization"] = f"Bearer {self.modal_api_key}" + + try: + async with httpx.AsyncClient() as client: + response = await client.post( + url, + headers=headers, + json={ + "track_url": track_url, + "output_url": output_url, + "start_time_seconds": start_time_seconds, + "track_index": track_index, + }, + follow_redirects=True, + ) + + if response.status_code != 200: + error_body = response.text + log.error( + "Modal padding API error", + status_code=response.status_code, + error_body=error_body, + ) + + response.raise_for_status() + result = response.json() + + # Check if work was cancelled + if result.get("cancelled"): + log.warning("Modal padding was cancelled by disconnect detection") + raise asyncio.CancelledError( + "Padding cancelled due to client disconnect" + ) + + log.info("Modal padding complete", size=result["size"]) + return PaddingResponse(**result) + except asyncio.CancelledError: + log.warning( + "Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)" + ) + raise + except httpx.TimeoutException as e: + log.error("Modal padding timeout", error=str(e), exc_info=True) + raise Exception(f"Modal padding timeout: {e}") from e + except httpx.HTTPStatusError as e: + log.error("Modal padding HTTP error", error=str(e), exc_info=True) + raise Exception(f"Modal padding HTTP error: {e}") from e + except Exception as e: + log.error("Modal padding unexpected error", error=str(e), exc_info=True) + raise diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 6fc808d2..3a200d4a 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -98,6 +98,10 @@ class Settings(BaseSettings): # Diarization: local pyannote.audio DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None + # Audio Padding (Modal.com backend) + PADDING_URL: str | None = None + PADDING_MODAL_API_KEY: str | None = None + # Sentry SENTRY_DSN: str | None = None diff --git a/server/reflector/utils/audio_constants.py b/server/reflector/utils/audio_constants.py index 0ad964a9..03270976 100644 --- a/server/reflector/utils/audio_constants.py +++ b/server/reflector/utils/audio_constants.py @@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin """ # Opus codec settings +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 OPUS_STANDARD_SAMPLE_RATE = 48000 +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality # S3 presigned URL expiration diff --git a/server/uv.lock b/server/uv.lock index fd8389d3..ac2849f1 100644 --- a/server/uv.lock +++ b/server/uv.lock @@ -159,21 +159,20 @@ wheels = [ [[package]] name = "aiortc" -version = "1.13.0" +version = "1.14.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aioice" }, { name = "av" }, - { name = "cffi" }, { name = "cryptography" }, { name = "google-crc32c" }, { name = "pyee" }, { name = "pylibsrtp" }, { name = "pyopenssl" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/62/03/bc947d74c548e0c17cf94e5d5bdacaed0ee9e5b2bb7b8b8cf1ac7a7c01ec/aiortc-1.13.0.tar.gz", hash = "sha256:5d209975c22d0910fb5a0f0e2caa828f2da966c53580f7c7170ac3a16a871620", size = 1179894 } +sdist = { url = "https://files.pythonhosted.org/packages/51/9c/4e027bfe0195de0442da301e2389329496745d40ae44d2d7c4571c4290ce/aiortc-1.14.0.tar.gz", hash = "sha256:adc8a67ace10a085721e588e06a00358ed8eaf5f6b62f0a95358ff45628dd762", size = 1180864 } wheels = [ - { url = "https://files.pythonhosted.org/packages/87/29/765633cab5f1888890f5f172d1d53009b9b14e079cdfa01a62d9896a9ea9/aiortc-1.13.0-py3-none-any.whl", hash = "sha256:9ccccec98796f6a96bd1c3dd437a06da7e0f57521c96bd56e4b965a91b03a0a0", size = 92910 }, + { url = "https://files.pythonhosted.org/packages/57/ab/31646a49209568cde3b97eeade0d28bb78b400e6645c56422c101df68932/aiortc-1.14.0-py3-none-any.whl", hash = "sha256:4b244d7e482f4e1f67e685b3468269628eca1ec91fa5b329ab517738cfca086e", size = 93183 }, ] [[package]] @@ -327,28 +326,24 @@ wheels = [ [[package]] name = "av" -version = "14.4.0" +version = "16.1.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 } +sdist = { url = "https://files.pythonhosted.org/packages/78/cd/3a83ffbc3cc25b39721d174487fb0d51a76582f4a1703f98e46170ce83d4/av-16.1.0.tar.gz", hash = "sha256:a094b4fd87a3721dacf02794d3d2c82b8d712c85b9534437e82a8a978c175ffd", size = 4285203 } wheels = [ - { url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 }, - { url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 }, - { url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 }, - { url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 }, - { url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 }, - { url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 }, - { url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 }, - { url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 }, - { url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 }, - { url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 }, - { url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 }, - { url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 }, - { url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 }, - { url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 }, - { url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 }, - { url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 }, - { url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 }, - { url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 }, + { url = "https://files.pythonhosted.org/packages/48/d0/b71b65d1b36520dcb8291a2307d98b7fc12329a45614a303ff92ada4d723/av-16.1.0-cp311-cp311-macosx_11_0_x86_64.whl", hash = "sha256:e88ad64ee9d2b9c4c5d891f16c22ae78e725188b8926eb88187538d9dd0b232f", size = 26927747 }, + { url = "https://files.pythonhosted.org/packages/2f/79/720a5a6ccdee06eafa211b945b0a450e3a0b8fc3d12922f0f3c454d870d2/av-16.1.0-cp311-cp311-macosx_14_0_arm64.whl", hash = "sha256:cb296073fa6935724de72593800ba86ae49ed48af03960a4aee34f8a611f442b", size = 21492232 }, + { url = "https://files.pythonhosted.org/packages/8e/4f/a1ba8d922f2f6d1a3d52419463ef26dd6c4d43ee364164a71b424b5ae204/av-16.1.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:720edd4d25aa73723c1532bb0597806d7b9af5ee34fc02358782c358cfe2f879", size = 39291737 }, + { url = "https://files.pythonhosted.org/packages/1a/31/fc62b9fe8738d2693e18d99f040b219e26e8df894c10d065f27c6b4f07e3/av-16.1.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:c7f2bc703d0df260a1fdf4de4253c7f5500ca9fc57772ea241b0cb241bcf972e", size = 40846822 }, + { url = "https://files.pythonhosted.org/packages/53/10/ab446583dbce730000e8e6beec6ec3c2753e628c7f78f334a35cad0317f4/av-16.1.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d69c393809babada7d54964d56099e4b30a3e1f8b5736ca5e27bd7be0e0f3c83", size = 40675604 }, + { url = "https://files.pythonhosted.org/packages/31/d7/1003be685277005f6d63fd9e64904ee222fe1f7a0ea70af313468bb597db/av-16.1.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:441892be28582356d53f282873c5a951592daaf71642c7f20165e3ddcb0b4c63", size = 42015955 }, + { url = "https://files.pythonhosted.org/packages/2f/4a/fa2a38ee9306bf4579f556f94ecbc757520652eb91294d2a99c7cf7623b9/av-16.1.0-cp311-cp311-win_amd64.whl", hash = "sha256:273a3e32de64819e4a1cd96341824299fe06f70c46f2288b5dc4173944f0fd62", size = 31750339 }, + { url = "https://files.pythonhosted.org/packages/9c/84/2535f55edcd426cebec02eb37b811b1b0c163f26b8d3f53b059e2ec32665/av-16.1.0-cp312-cp312-macosx_11_0_x86_64.whl", hash = "sha256:640f57b93f927fba8689f6966c956737ee95388a91bd0b8c8b5e0481f73513d6", size = 26945785 }, + { url = "https://files.pythonhosted.org/packages/b6/17/ffb940c9e490bf42e86db4db1ff426ee1559cd355a69609ec1efe4d3a9eb/av-16.1.0-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:ae3fb658eec00852ebd7412fdc141f17f3ddce8afee2d2e1cf366263ad2a3b35", size = 21481147 }, + { url = "https://files.pythonhosted.org/packages/15/c1/e0d58003d2d83c3921887d5c8c9b8f5f7de9b58dc2194356a2656a45cfdc/av-16.1.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:27ee558d9c02a142eebcbe55578a6d817fedfde42ff5676275504e16d07a7f86", size = 39517197 }, + { url = "https://files.pythonhosted.org/packages/32/77/787797b43475d1b90626af76f80bfb0c12cfec5e11eafcfc4151b8c80218/av-16.1.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:7ae547f6d5fa31763f73900d43901e8c5fa6367bb9a9840978d57b5a7ae14ed2", size = 41174337 }, + { url = "https://files.pythonhosted.org/packages/8e/ac/d90df7f1e3b97fc5554cf45076df5045f1e0a6adf13899e10121229b826c/av-16.1.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:8cf065f9d438e1921dc31fc7aa045790b58aee71736897866420d80b5450f62a", size = 40817720 }, + { url = "https://files.pythonhosted.org/packages/80/6f/13c3a35f9dbcebafd03fe0c4cbd075d71ac8968ec849a3cfce406c35a9d2/av-16.1.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:a345877a9d3cc0f08e2bc4ec163ee83176864b92587afb9d08dff50f37a9a829", size = 42267396 }, + { url = "https://files.pythonhosted.org/packages/c8/b9/275df9607f7fb44317ccb1d4be74827185c0d410f52b6e2cd770fe209118/av-16.1.0-cp312-cp312-win_amd64.whl", hash = "sha256:f49243b1d27c91cd8c66fdba90a674e344eb8eb917264f36117bf2b6879118fd", size = 31752045 }, ] [[package]] @@ -3267,7 +3262,7 @@ requires-dist = [ { name = "aiohttp-cors", specifier = ">=0.7.0" }, { name = "aiortc", specifier = ">=1.5.0" }, { name = "alembic", specifier = ">=1.11.3" }, - { name = "av", specifier = ">=10.0.0" }, + { name = "av", specifier = ">=15.0.0" }, { name = "celery", specifier = ">=5.3.4" }, { name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },