Add Modal backend for audio mixdown

This commit is contained in:
Igor Loskutov
2026-01-21 17:06:17 -05:00
parent c8743fdf1c
commit e1b790c5a8
5 changed files with 589 additions and 26 deletions

View File

@@ -489,7 +489,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
@with_error_handling(TaskName.MIXDOWN_TRACKS)
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
"""Mix all padded tracks into single audio file using PyAV or Modal backend."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
track_result = ctx.task_output(process_tracks)
@@ -513,7 +513,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
storage = _spawn_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
# Presign URLs for padded tracks (same expiration for both backends)
padded_urls = []
for track_info in padded_tracks:
if track_info.key:
@@ -534,33 +534,104 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
logger.error("Mixdown failed - no decodable audio frames found")
raise ValueError("No decodable audio frames in any track")
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
output_key = f"{input.transcript_id}/audio.mp3"
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
# Conditional: Modal or local backend
if settings.MIXDOWN_BACKEND == "modal":
ctx.log("mixdown_tracks: using Modal backend")
writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration)
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
output_key,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS),
expected_duration_sec=recording_duration if recording_duration > 0 else None,
)
await writer.flush()
from reflector.processors.audio_mixdown_modal import ( # noqa: PLC0415
AudioMixdownModalProcessor,
)
file_size = Path(output_path).stat().st_size
storage_path = f"{input.transcript_id}/audio.mp3"
try:
processor = AudioMixdownModalProcessor()
result = await processor.mixdown(
track_urls=valid_urls,
output_url=output_url,
target_sample_rate=target_sample_rate,
expected_duration_sec=recording_duration
if recording_duration > 0
else None,
)
duration_ms = result.duration_ms
tracks_mixed = result.tracks_mixed
with open(output_path, "rb") as mixed_file:
await storage.put_file(storage_path, mixed_file)
ctx.log(
f"mixdown_tracks: Modal returned duration={duration_ms}ms, tracks={tracks_mixed}"
)
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal mixdown HTTP error",
transcript_id=input.transcript_id,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
)
raise RuntimeError(
f"Modal mixdown failed with HTTP {e.response.status_code}: {error_detail}"
)
except httpx.TimeoutException:
logger.error(
"[Hatchet] Modal mixdown timeout",
transcript_id=input.transcript_id,
timeout=settings.MIXDOWN_TIMEOUT,
)
raise RuntimeError(
f"Modal mixdown timeout after {settings.MIXDOWN_TIMEOUT}s"
)
except ValueError as e:
logger.error(
"[Hatchet] Modal mixdown validation error",
transcript_id=input.transcript_id,
error=str(e),
)
raise
else:
ctx.log("mixdown_tracks: using local backend")
Path(output_path).unlink(missing_ok=True)
# Existing local implementation
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
writer = AudioFileWriterProcessor(
path=output_path, on_duration=capture_duration
)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS),
expected_duration_sec=recording_duration
if recording_duration > 0
else None,
)
await writer.flush()
file_size = Path(output_path).stat().st_size
with open(output_path, "rb") as mixed_file:
await storage.put_file(output_key, mixed_file)
Path(output_path).unlink(missing_ok=True)
duration_ms = duration_ms_callback_capture_container[0]
tracks_mixed = len(valid_urls)
ctx.log(f"mixdown_tracks: local mixdown uploaded {file_size} bytes")
# Update DB (same for both backends)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
@@ -570,12 +641,12 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
transcript, {"audio_location": "storage"}
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
ctx.log(f"mixdown_tracks complete: uploaded to {output_key}")
return MixdownResult(
audio_key=storage_path,
duration=duration_ms_callback_capture_container[0],
tracks_mixed=len(valid_urls),
audio_key=output_key,
duration=duration_ms,
tracks_mixed=tracks_mixed,
)

View File

@@ -0,0 +1,89 @@
"""
Modal.com backend for audio mixdown.
Uses Modal's CPU containers to offload audio mixing from Hatchet workers.
Communicates via presigned S3 URLs for both input and output.
"""
import httpx
from pydantic import BaseModel
from reflector.settings import settings
class MixdownResponse(BaseModel):
"""Response from Modal mixdown endpoint."""
duration_ms: float
tracks_mixed: int
audio_uploaded: bool
class AudioMixdownModalProcessor:
"""Audio mixdown processor using Modal.com CPU backend.
Sends track URLs (presigned GET) and output URL (presigned PUT) to Modal.
Modal handles download, mixdown via PyAV, and upload.
"""
def __init__(self, modal_api_key: str | None = None):
if not settings.MIXDOWN_URL:
raise ValueError("MIXDOWN_URL required to use AudioMixdownModalProcessor")
self.mixdown_url = settings.MIXDOWN_URL + "/v1"
self.timeout = settings.MIXDOWN_TIMEOUT
self.modal_api_key = modal_api_key or settings.MIXDOWN_MODAL_API_KEY
if not self.modal_api_key:
raise ValueError(
"MIXDOWN_MODAL_API_KEY required to use AudioMixdownModalProcessor"
)
async def mixdown(
self,
track_urls: list[str],
output_url: str,
target_sample_rate: int,
expected_duration_sec: float | None = None,
) -> MixdownResponse:
"""Mix multiple audio tracks via Modal backend.
Args:
track_urls: List of presigned GET URLs for audio tracks (non-empty)
output_url: Presigned PUT URL for output MP3
target_sample_rate: Sample rate for output (Hz, must be positive)
expected_duration_sec: Optional fallback duration if container metadata unavailable
Returns:
MixdownResponse with duration_ms, tracks_mixed, audio_uploaded
Raises:
ValueError: If track_urls is empty or target_sample_rate invalid
httpx.HTTPStatusError: On HTTP errors (404, 403, 500, etc.)
httpx.TimeoutException: On timeout
"""
# Validate inputs
if not track_urls:
raise ValueError("track_urls cannot be empty")
if target_sample_rate <= 0:
raise ValueError(
f"target_sample_rate must be positive, got {target_sample_rate}"
)
if expected_duration_sec is not None and expected_duration_sec < 0:
raise ValueError(
f"expected_duration_sec cannot be negative, got {expected_duration_sec}"
)
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.post(
f"{self.mixdown_url}/audio/mixdown",
headers={"Authorization": f"Bearer {self.modal_api_key}"},
json={
"track_urls": track_urls,
"output_url": output_url,
"target_sample_rate": target_sample_rate,
"expected_duration_sec": expected_duration_sec,
},
)
response.raise_for_status()
return MixdownResponse(**response.json())

View File

@@ -98,6 +98,17 @@ class Settings(BaseSettings):
# Diarization: local pyannote.audio
DIARIZATION_PYANNOTE_AUTH_TOKEN: str | None = None
# Audio Mixdown
# backends:
# - local: in-process PyAV mixdown (runs in same process as Hatchet worker)
# - modal: HTTP API client to Modal.com CPU container
MIXDOWN_BACKEND: str = "local"
MIXDOWN_URL: str | None = None
MIXDOWN_TIMEOUT: int = 900 # 15 minutes
# Mixdown: modal backend
MIXDOWN_MODAL_API_KEY: str | None = None
# Sentry
SENTRY_DSN: str | None = None