feat: modal padding (#837)

* Add Modal backend for audio padding

- Create reflector_padding.py Modal deployment (CPU-based)
- Add PaddingWorkflow with conditional Modal/local backend
- Update deploy-all.sh to include padding deployment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
2026-01-30 13:11:51 -05:00
committed by GitHub
parent 2ca624f052
commit 7fde64e252
11 changed files with 625 additions and 82 deletions

View File

@@ -37,5 +37,5 @@ LLM_RATE_LIMIT_PER_SECOND = 10
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown
TIMEOUT_AUDIO = 720 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -0,0 +1,165 @@
"""
Hatchet child workflow: PaddingWorkflow
Handles individual audio track padding via Modal.com backend.
"""
from datetime import timedelta
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.hatchet.workflows.models import PadTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class PaddingInput(BaseModel):
"""Input for individual track padding."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
hatchet = HatchetClientManager.get_client()
padding_workflow = hatchet.workflow(
name="PaddingWorkflow", input_validator=PaddingInput
)
@padding_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
"""Pad audio track with silence based on WebM container start_time."""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
# Extract start_time to determine if padding needed
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except (ValueError, TypeError, OverflowError) as e:
ctx.log(
f"pad_track: track {input.track_index}, duration error: {str(e)}"
)
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
import httpx # noqa: PLC0415
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
try:
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track: Modal returned size={file_size}")
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal padding HTTP error",
transcript_id=input.transcript_id,
track_index=input.track_index,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
exc_info=True,
)
raise Exception(
f"Modal padding failed: HTTP {e.response.status_code}"
) from e
except httpx.TimeoutException as e:
logger.error(
"[Hatchet] Modal padding timeout",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise Exception("Modal padding timeout") from e
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error(
"[Hatchet] pad_track failed",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise

View File

@@ -14,9 +14,7 @@ Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
@@ -27,10 +25,7 @@ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
from reflector.utils.audio_padding import extract_stream_start_time_from_container
class TrackInput(BaseModel):
@@ -83,63 +78,44 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except Exception:
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
# Presign PUT URL for output (Modal uploads directly)
output_url = await storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
)
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(

View File

@@ -0,0 +1,112 @@
"""
Modal.com backend for audio padding.
"""
import asyncio
import os
import httpx
from pydantic import BaseModel
from reflector.logger import logger
class PaddingResponse(BaseModel):
size: int
cancelled: bool = False
class AudioPaddingModalProcessor:
"""Audio padding processor using Modal.com CPU backend via HTTP."""
def __init__(
self, padding_url: str | None = None, modal_api_key: str | None = None
):
self.padding_url = padding_url or os.getenv("PADDING_URL")
if not self.padding_url:
raise ValueError(
"PADDING_URL required to use AudioPaddingModalProcessor. "
"Set PADDING_URL environment variable or pass padding_url parameter."
)
self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY")
async def pad_track(
self,
track_url: str,
output_url: str,
start_time_seconds: float,
track_index: int,
) -> PaddingResponse:
"""Pad audio track with silence via Modal backend.
Args:
track_url: Presigned GET URL for source audio track
output_url: Presigned PUT URL for output WebM
start_time_seconds: Amount of silence to prepend
track_index: Track index for logging
"""
if not track_url:
raise ValueError("track_url cannot be empty")
if start_time_seconds <= 0:
raise ValueError(
f"start_time_seconds must be positive, got {start_time_seconds}"
)
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
log.info("Sending Modal padding HTTP request")
url = f"{self.padding_url}/pad"
headers = {}
if self.modal_api_key:
headers["Authorization"] = f"Bearer {self.modal_api_key}"
try:
async with httpx.AsyncClient() as client:
response = await client.post(
url,
headers=headers,
json={
"track_url": track_url,
"output_url": output_url,
"start_time_seconds": start_time_seconds,
"track_index": track_index,
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
log.error(
"Modal padding API error",
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()
# Check if work was cancelled
if result.get("cancelled"):
log.warning("Modal padding was cancelled by disconnect detection")
raise asyncio.CancelledError(
"Padding cancelled due to client disconnect"
)
log.info("Modal padding complete", size=result["size"])
return PaddingResponse(**result)
except asyncio.CancelledError:
log.warning(
"Modal padding cancelled (Hatchet timeout, disconnect detected on Modal side)"
)
raise
except httpx.TimeoutException as e:
log.error("Modal padding timeout", error=str(e), exc_info=True)
raise Exception(f"Modal padding timeout: {e}") from e
except httpx.HTTPStatusError as e:
log.error("Modal padding HTTP error", error=str(e), exc_info=True)
raise Exception(f"Modal padding HTTP error: {e}") from e
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise

View File

@@ -98,6 +98,10 @@ class Settings(BaseSettings):
# Diarization: local pyannote.audio
DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
# Audio Padding (Modal.com backend)
PADDING_URL: str | None = None
PADDING_MODAL_API_KEY: str | None = None
# Sentry
SENTRY_DSN: str | None = None

View File

@@ -5,7 +5,9 @@ Used by both Hatchet workflows and Celery pipelines for consistent audio encodin
"""
# Opus codec settings
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_STANDARD_SAMPLE_RATE = 48000
# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration