diff --git a/docsv2/selfhosted-architecture.md b/docsv2/selfhosted-architecture.md index a0845e55..3a47a74d 100644 --- a/docsv2/selfhosted-architecture.md +++ b/docsv2/selfhosted-architecture.md @@ -24,6 +24,8 @@ This document explains the internals of the self-hosted deployment: how the setu The self-hosted deployment runs the entire Reflector platform on a single server using Docker Compose. A single bash script (`scripts/setup-selfhosted.sh`) handles all configuration and orchestration. The key design principles are: - **One command to deploy** — flags select which features to enable +- **Config memory** — CLI args are saved to `data/.selfhosted-last-args`; re-run with no flags to replay +- **Per-service overrides** — individual ML backends (transcript, diarization, translation, padding, mixdown) can be overridden independently from the base mode - **Idempotent** — safe to re-run without losing existing configuration - **Profile-based composition** — Docker Compose profiles activate optional services - **No external dependencies required** — with `--garage` and `--ollama-*`, everything runs locally @@ -61,8 +63,9 @@ Creates or updates the backend environment file from `server/.env.selfhosted.exa - **Infrastructure** — PostgreSQL URL, Redis host, Celery broker (all pointing to Docker-internal hostnames) - **Public URLs** — `BASE_URL` and `CORS_ORIGIN` computed from the domain (if `--domain`), IP (if detected on Linux), or `localhost` - **WebRTC** — `WEBRTC_HOST` set to the server's LAN IP so browsers can reach UDP ICE candidates -- **Specialized models** — always points to `http://transcription:8000` (the Docker network alias shared by GPU and CPU containers) -- **HuggingFace token** — prompts interactively for pyannote model access; writes to root `.env` so Docker Compose can inject it into GPU/CPU containers +- **ML backends (per-service)** — Each ML service (transcript, diarization, translation, padding, mixdown) is configured independently using "effective backends" (`EFF_TRANSCRIPT`, `EFF_DIARIZATION`, `EFF_TRANSLATION`, `EFF_PADDING`, `EFF_MIXDOWN`). These are resolved from the base mode default + any `--transcript`/`--diarization`/`--translation`/`--padding`/`--mixdown` overrides. For `modal` backends, the URL is `http://transcription:8000` (GPU mode), user-provided (hosted mode), or read from existing env (CPU mode with override). For CPU backends, no URL is needed (in-process). If a service is overridden to `modal` in CPU mode without a URL configured, the script warns the user to set `TRANSCRIPT_URL` in `server/.env` +- **CPU timeouts** — `TRANSCRIPT_FILE_TIMEOUT` and `DIARIZATION_FILE_TIMEOUT` are increased to 3600s only for services actually using CPU backends (whisper/pyannote), not blanket for the whole mode +- **HuggingFace token** — prompted when diarization uses `pyannote` (in-process) or when GPU mode is active (GPU container needs it). Writes to root `.env` so Docker Compose can inject it into GPU/CPU containers - **LLM** — if `--ollama-*` is used, configures `LLM_URL` pointing to the Ollama container. Otherwise, warns that the user needs to configure an external LLM - **Public mode** — sets `PUBLIC_MODE=true` so the app is accessible without authentication by default - **Password auth** — if `--password` is passed, sets `AUTH_BACKEND=password`, `PUBLIC_MODE=false`, `ADMIN_EMAIL=admin@localhost`, and `ADMIN_PASSWORD_HASH` (the hash generated in Step 1). The admin user is provisioned in the database on container startup via `runserver.sh` @@ -228,11 +231,19 @@ Both the `gpu` and `cpu` services define a Docker network alias of `transcriptio Environment variables flow through multiple layers. Understanding this prevents confusion when debugging: ``` -Flags (--gpu, --garage, etc.) +CLI args (--gpu, --garage, --padding modal, --mixdown modal, etc.) │ - ├── setup-selfhosted.sh interprets flags + ├── Config memory: saved to data/.selfhosted-last-args + │ (replayed on next run if no args provided) + │ + ├── setup-selfhosted.sh resolves effective backends: + │ EFF_TRANSCRIPT = override or base mode default + │ EFF_DIARIZATION = override or base mode default + │ EFF_TRANSLATION = override or base mode default + │ EFF_PADDING = override or base mode default + │ EFF_MIXDOWN = override or base mode default │ │ - │ ├── Writes server/.env (backend config) + │ ├── Writes server/.env (backend config, per-service backends) │ ├── Writes www/.env (frontend config) │ ├── Writes .env (HF_TOKEN for compose interpolation) │ └── Writes Caddyfile (proxy routes) diff --git a/docsv2/selfhosted-production.md b/docsv2/selfhosted-production.md index 5e65b78f..443c2eda 100644 --- a/docsv2/selfhosted-production.md +++ b/docsv2/selfhosted-production.md @@ -70,7 +70,7 @@ That's it. The script generates env files, secrets, starts all containers, waits ## ML Processing Modes (Required) -Pick `--gpu`, `--cpu`, or `--hosted`. This determines how **transcription, diarization, translation, and audio padding** run: +Pick `--gpu`, `--cpu`, or `--hosted`. This determines how **transcription, diarization, translation, audio padding, and audio mixdown** run: | Flag | What it does | Requires | |------|-------------|----------| @@ -158,6 +158,56 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr **Without a domain:** `--caddy` alone uses a self-signed certificate. Browsers will show a security warning that must be accepted. +## Per-Service Backend Overrides + +Override individual ML services without changing the base mode. Useful when you want most services on one backend but need specific services on another. + +| Flag | Valid backends | Default (`--gpu`/`--hosted`) | Default (`--cpu`) | +|------|---------------|------------------------------|-------------------| +| `--transcript BACKEND` | `whisper`, `modal` | `modal` | `whisper` | +| `--diarization BACKEND` | `pyannote`, `modal` | `modal` | `pyannote` | +| `--translation BACKEND` | `marian`, `modal`, `passthrough` | `modal` | `marian` | +| `--padding BACKEND` | `pyav`, `modal` | `modal` | `pyav` | +| `--mixdown BACKEND` | `pyav`, `modal` | `modal` | `pyav` | + +**Examples:** + +```bash +# CPU base, but use a remote modal service for padding only +./scripts/setup-selfhosted.sh --cpu --padding modal --garage --caddy + +# GPU base, but skip translation entirely (passthrough) +./scripts/setup-selfhosted.sh --gpu --translation passthrough --garage --caddy + +# CPU base with remote modal diarization and translation +./scripts/setup-selfhosted.sh --cpu --diarization modal --translation modal --garage +``` + +When overriding a service to `modal` in `--cpu` mode, the script will warn you to configure the service URL (`TRANSCRIPT_URL` etc.) in `server/.env` to point to your GPU service, then re-run. + +When overriding a service to a CPU backend (e.g., `--transcript whisper`) in `--gpu` mode, that service runs in-process on the server/worker containers while the GPU container still serves the remaining `modal` services. + +## Config Memory (No-Flag Re-run) + +After a successful run, the script saves your CLI arguments to `data/.selfhosted-last-args`. On subsequent runs with no arguments, the saved configuration is automatically replayed: + +```bash +# First run — saves the config +./scripts/setup-selfhosted.sh --gpu --ollama-gpu --garage --caddy + +# Later re-runs — same config, no flags needed +./scripts/setup-selfhosted.sh +# => "No flags provided — replaying saved configuration:" +# => " --gpu --ollama-gpu --garage --caddy" +``` + +To change the configuration, pass new flags — they override and replace the saved config: + +```bash +# Switch to CPU mode with overrides — this becomes the new saved config +./scripts/setup-selfhosted.sh --cpu --padding modal --garage --caddy +``` + ## What the Script Does 1. **Prerequisites check** — Docker, NVIDIA GPU (if needed), compose file exists @@ -189,6 +239,8 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr | `TRANSCRIPT_URL` | Specialized model endpoint | `http://transcription:8000` | | `PADDING_BACKEND` | Audio padding backend (`pyav` or `modal`) | `modal` (selfhosted), `pyav` (default) | | `PADDING_URL` | Audio padding endpoint (when `PADDING_BACKEND=modal`) | `http://transcription:8000` | +| `MIXDOWN_BACKEND` | Audio mixdown backend (`pyav` or `modal`) | `modal` (selfhosted), `pyav` (default) | +| `MIXDOWN_URL` | Audio mixdown endpoint (when `MIXDOWN_BACKEND=modal`) | `http://transcription:8000` | | `LLM_URL` | OpenAI-compatible LLM endpoint | Auto-set for Ollama modes | | `LLM_API_KEY` | LLM API key | `not-needed` for Ollama | | `LLM_MODEL` | LLM model name | `qwen2.5:14b` for Ollama (override with `--llm-model`) | @@ -576,9 +628,9 @@ docker compose -f docker-compose.selfhosted.yml exec gpu curl http://localhost:8 ## Updating ```bash -# Option A: Pull latest prebuilt images and restart +# Option A: Pull latest prebuilt images and restart (replays saved config automatically) docker compose -f docker-compose.selfhosted.yml down -./scripts/setup-selfhosted.sh +./scripts/setup-selfhosted.sh # Option B: Build from source (after git pull) and restart git pull @@ -589,6 +641,8 @@ docker compose -f docker-compose.selfhosted.yml down docker compose -f docker-compose.selfhosted.yml build gpu # or cpu ``` +> **Note on config memory:** Running with no flags replays the saved config from your last run. Running with *any* flags replaces the saved config entirely — the script always saves the complete set of flags you provide. See [Config Memory](#config-memory-no-flag-re-run). + The setup script is idempotent — it won't overwrite existing secrets or env vars that are already set. ## Architecture Overview diff --git a/gpu/modal_deployments/deploy-all.sh b/gpu/modal_deployments/deploy-all.sh index 8eae3e66..93303604 100755 --- a/gpu/modal_deployments/deploy-all.sh +++ b/gpu/modal_deployments/deploy-all.sh @@ -132,13 +132,22 @@ fi echo " -> $DIARIZER_URL" echo "" -echo "Deploying padding (CPU audio processing via Modal SDK)..." -modal deploy reflector_padding.py -if [ $? -ne 0 ]; then +echo "Deploying padding (CPU audio processing)..." +PADDING_URL=$(modal deploy reflector_padding.py 2>&1 | grep -o 'https://[^ ]*web.modal.run' | head -1) +if [ -z "$PADDING_URL" ]; then echo "Error: Failed to deploy padding. Check Modal dashboard for details." exit 1 fi -echo " -> reflector-padding.pad_track (Modal SDK function)" +echo " -> $PADDING_URL" + +echo "" +echo "Deploying mixdown (CPU multi-track audio mixing)..." +MIXDOWN_URL=$(modal deploy reflector_mixdown.py 2>&1 | grep -o 'https://[^ ]*web.modal.run' | head -1) +if [ -z "$MIXDOWN_URL" ]; then + echo "Error: Failed to deploy mixdown. Check Modal dashboard for details." + exit 1 +fi +echo " -> $MIXDOWN_URL" # --- Output Configuration --- echo "" @@ -157,5 +166,11 @@ echo "DIARIZATION_BACKEND=modal" echo "DIARIZATION_URL=$DIARIZER_URL" echo "DIARIZATION_MODAL_API_KEY=$API_KEY" echo "" -echo "# Padding uses Modal SDK (requires MODAL_TOKEN_ID/SECRET in worker containers)" +echo "PADDING_BACKEND=modal" +echo "PADDING_URL=$PADDING_URL" +echo "PADDING_MODAL_API_KEY=$API_KEY" +echo "" +echo "MIXDOWN_BACKEND=modal" +echo "MIXDOWN_URL=$MIXDOWN_URL" +echo "MIXDOWN_MODAL_API_KEY=$API_KEY" echo "# --- End Modal Configuration ---" diff --git a/gpu/modal_deployments/reflector_mixdown.py b/gpu/modal_deployments/reflector_mixdown.py new file mode 100644 index 00000000..fe7ff141 --- /dev/null +++ b/gpu/modal_deployments/reflector_mixdown.py @@ -0,0 +1,385 @@ +""" +Reflector GPU backend - audio mixdown +===================================== + +CPU-intensive multi-track audio mixdown service. +Mixes N audio tracks into a single MP3 using PyAV amix filter graph. + +IMPORTANT: This mixdown logic is duplicated from server/reflector/utils/audio_mixdown.py +for Modal deployment isolation (Modal can't import from server/reflector/). If you modify +the PyAV filter graph or mixdown algorithm, you MUST update both: + - gpu/modal_deployments/reflector_mixdown.py (this file) + - server/reflector/utils/audio_mixdown.py + +Constants duplicated from server/reflector/utils/audio_constants.py for same reason. +""" + +import os +import tempfile +from fractions import Fraction +import asyncio + +import modal + +S3_TIMEOUT = 120 # Higher than padding (60s) — multiple track downloads +MIXDOWN_TIMEOUT = 1200 + (S3_TIMEOUT * 2) # 1440s total +SCALEDOWN_WINDOW = 60 +DISCONNECT_CHECK_INTERVAL = 2 + +app = modal.App("reflector-mixdown") + +# CPU-based image (mixdown is CPU-bound, no GPU needed) +image = ( + modal.Image.debian_slim(python_version="3.12") + .apt_install("ffmpeg") # Required by PyAV + .pip_install( + "av==13.1.0", # PyAV for audio processing + "requests==2.32.3", # HTTP for presigned URL downloads/uploads + "fastapi==0.115.12", # API framework + ) +) + + +@app.function( + cpu=4.0, # Higher than padding (2.0) for multi-track mixing + timeout=MIXDOWN_TIMEOUT, + scaledown_window=SCALEDOWN_WINDOW, + image=image, + secrets=[modal.Secret.from_name("reflector-gpu")], +) +@modal.asgi_app() +def web(): + from fastapi import Depends, FastAPI, HTTPException, Request, status + from fastapi.security import OAuth2PasswordBearer + from pydantic import BaseModel + + class MixdownRequest(BaseModel): + track_urls: list[str] + output_url: str + target_sample_rate: int | None = None + offsets_seconds: list[float] | None = None + + class MixdownResponse(BaseModel): + size: int + duration_ms: float = 0.0 + cancelled: bool = False + + web_app = FastAPI() + + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + def apikey_auth(apikey: str = Depends(oauth2_scheme)): + if apikey == os.environ["REFLECTOR_GPU_APIKEY"]: + return + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid API key", + headers={"WWW-Authenticate": "Bearer"}, + ) + + @web_app.post("/mixdown", dependencies=[Depends(apikey_auth)]) + async def mixdown_endpoint(request: Request, req: MixdownRequest) -> MixdownResponse: + """Modal web endpoint for mixing audio tracks with disconnect detection.""" + import logging + import threading + + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") + logger = logging.getLogger(__name__) + + valid_urls = [u for u in req.track_urls if u] + if not valid_urls: + raise HTTPException(status_code=400, detail="No valid track URLs provided") + if req.offsets_seconds is not None: + if len(req.offsets_seconds) != len(req.track_urls): + raise HTTPException( + status_code=400, + detail=f"offsets_seconds length ({len(req.offsets_seconds)}) " + f"must match track_urls ({len(req.track_urls)})", + ) + if any(o > 18000 for o in req.offsets_seconds): + raise HTTPException(status_code=400, detail="offsets_seconds exceeds maximum 18000s (5 hours)") + if not req.output_url: + raise HTTPException(status_code=400, detail="output_url cannot be empty") + + logger.info(f"Mixdown request: {len(valid_urls)} tracks") + + # Thread-safe cancellation flag + cancelled = threading.Event() + + async def check_disconnect(): + """Background task to check for client disconnect.""" + while not cancelled.is_set(): + await asyncio.sleep(DISCONNECT_CHECK_INTERVAL) + if await request.is_disconnected(): + logger.warning("Client disconnected, setting cancellation flag") + cancelled.set() + break + + disconnect_task = asyncio.create_task(check_disconnect()) + + try: + result = await asyncio.get_event_loop().run_in_executor( + None, _mixdown_tracks_blocking, req, cancelled, logger + ) + return MixdownResponse(**result) + finally: + cancelled.set() + disconnect_task.cancel() + try: + await disconnect_task + except asyncio.CancelledError: + pass + + def _mixdown_tracks_blocking(req, cancelled, logger) -> dict: + """Blocking CPU-bound mixdown work with periodic cancellation checks. + + Downloads all tracks, builds PyAV amix filter graph, encodes to MP3, + and uploads the result to the presigned output URL. + """ + import av + import requests + from av.audio.resampler import AudioResampler + import time + + temp_dir = tempfile.mkdtemp() + track_paths = [] + output_path = None + last_check = time.time() + + try: + # --- Download all tracks --- + valid_urls = [u for u in req.track_urls if u] + for i, url in enumerate(valid_urls): + if cancelled.is_set(): + logger.info("Cancelled during download phase") + return {"size": 0, "duration_ms": 0.0, "cancelled": True} + + logger.info(f"Downloading track {i}") + response = requests.get(url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + track_path = os.path.join(temp_dir, f"track_{i}.webm") + total_bytes = 0 + chunk_count = 0 + with open(track_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + chunk_count += 1 + if chunk_count % 12 == 0: + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info(f"Cancelled during track {i} download") + return {"size": 0, "duration_ms": 0.0, "cancelled": True} + last_check = now + + track_paths.append(track_path) + logger.info(f"Track {i} downloaded: {total_bytes} bytes") + + if not track_paths: + raise ValueError("No tracks downloaded") + + # --- Detect sample rate --- + target_sample_rate = req.target_sample_rate + if target_sample_rate is None: + for path in track_paths: + try: + container = av.open(path) + for frame in container.decode(audio=0): + target_sample_rate = frame.sample_rate + container.close() + break + else: + container.close() + continue + break + except Exception: + continue + if target_sample_rate is None: + raise ValueError("Could not detect sample rate from any track") + + logger.info(f"Target sample rate: {target_sample_rate}") + + # --- Calculate per-input delays --- + input_offsets_seconds = None + if req.offsets_seconds is not None: + input_offsets_seconds = [ + req.offsets_seconds[i] for i, url in enumerate(req.track_urls) if url + ] + + delays_ms = [] + if input_offsets_seconds is not None: + base = min(input_offsets_seconds) if input_offsets_seconds else 0.0 + delays_ms = [max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds] + else: + delays_ms = [0 for _ in track_paths] + + # --- Build filter graph --- + # N abuffer -> optional adelay -> amix -> aformat -> abuffersink + graph = av.filter.Graph() + inputs = [] + + for idx in range(len(track_paths)): + args = ( + f"time_base=1/{target_sample_rate}:" + f"sample_rate={target_sample_rate}:" + f"sample_fmt=s32:" + f"channel_layout=stereo" + ) + in_ctx = graph.add("abuffer", args=args, name=f"in{idx}") + inputs.append(in_ctx) + + mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix") + fmt = graph.add( + "aformat", + args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}", + name="fmt", + ) + sink = graph.add("abuffersink", name="out") + + for idx, in_ctx in enumerate(inputs): + delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0 + if delay_ms > 0: + adelay = graph.add( + "adelay", + args=f"delays={delay_ms}|{delay_ms}:all=1", + name=f"delay{idx}", + ) + in_ctx.link_to(adelay) + adelay.link_to(mixer, 0, idx) + else: + in_ctx.link_to(mixer, 0, idx) + + mixer.link_to(fmt) + fmt.link_to(sink) + graph.configure() + + # --- Open all containers and decode --- + containers = [] + output_path = os.path.join(temp_dir, "mixed.mp3") + + try: + for path in track_paths: + containers.append(av.open(path)) + + decoders = [c.decode(audio=0) for c in containers] + active = [True] * len(decoders) + resamplers = [ + AudioResampler(format="s32", layout="stereo", rate=target_sample_rate) + for _ in decoders + ] + + # Open output MP3 + out_container = av.open(output_path, "w", format="mp3") + out_stream = out_container.add_stream("libmp3lame", rate=target_sample_rate) + total_duration = 0 + + while any(active): + # Check cancellation periodically + now = time.time() + if now - last_check >= DISCONNECT_CHECK_INTERVAL: + if cancelled.is_set(): + logger.info("Cancelled during mixing") + out_container.close() + return {"size": 0, "duration_ms": 0.0, "cancelled": True} + last_check = now + + for i, (dec, is_active) in enumerate(zip(decoders, active)): + if not is_active: + continue + try: + frame = next(dec) + except StopIteration: + active[i] = False + inputs[i].push(None) + continue + + if frame.sample_rate != target_sample_rate: + continue + + out_frames = resamplers[i].resample(frame) or [] + for rf in out_frames: + rf.sample_rate = target_sample_rate + rf.time_base = Fraction(1, target_sample_rate) + inputs[i].push(rf) + + while True: + try: + mixed = sink.pull() + except Exception: + break + mixed.sample_rate = target_sample_rate + mixed.time_base = Fraction(1, target_sample_rate) + for packet in out_stream.encode(mixed): + out_container.mux(packet) + total_duration += packet.duration + + # Flush filter graph + while True: + try: + mixed = sink.pull() + except Exception: + break + mixed.sample_rate = target_sample_rate + mixed.time_base = Fraction(1, target_sample_rate) + for packet in out_stream.encode(mixed): + out_container.mux(packet) + total_duration += packet.duration + + # Flush encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + total_duration += packet.duration + + # Calculate duration in ms + last_tb = out_stream.time_base + duration_ms = 0.0 + if last_tb and total_duration > 0: + duration_ms = round(float(total_duration * last_tb * 1000), 2) + + out_container.close() + + finally: + for c in containers: + try: + c.close() + except Exception: + pass + + file_size = os.path.getsize(output_path) + logger.info(f"Mixdown complete: {file_size} bytes, {duration_ms}ms") + + if cancelled.is_set(): + logger.info("Cancelled after mixing, before upload") + return {"size": 0, "duration_ms": 0.0, "cancelled": True} + + # --- Upload result --- + logger.info("Uploading mixed audio to S3") + with open(output_path, "rb") as f: + upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT) + upload_response.raise_for_status() + logger.info(f"Upload complete: {file_size} bytes") + + return {"size": file_size, "duration_ms": duration_ms} + + finally: + # Cleanup all temp files + for path in track_paths: + if os.path.exists(path): + try: + os.unlink(path) + except Exception as e: + logger.warning(f"Failed to cleanup track file: {e}") + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + logger.warning(f"Failed to cleanup output file: {e}") + try: + os.rmdir(temp_dir) + except Exception as e: + logger.warning(f"Failed to cleanup temp directory: {e}") + + return web_app diff --git a/gpu/modal_deployments/reflector_padding.py b/gpu/modal_deployments/reflector_padding.py index 809131e6..7d0e45ac 100644 --- a/gpu/modal_deployments/reflector_padding.py +++ b/gpu/modal_deployments/reflector_padding.py @@ -52,10 +52,12 @@ OPUS_DEFAULT_BIT_RATE = 128000 timeout=PADDING_TIMEOUT, scaledown_window=SCALEDOWN_WINDOW, image=image, + secrets=[modal.Secret.from_name("reflector-gpu")], ) @modal.asgi_app() def web(): - from fastapi import FastAPI, Request, HTTPException + from fastapi import Depends, FastAPI, HTTPException, Request, status + from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel class PaddingRequest(BaseModel): @@ -70,7 +72,18 @@ def web(): web_app = FastAPI() - @web_app.post("/pad") + oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + def apikey_auth(apikey: str = Depends(oauth2_scheme)): + if apikey == os.environ["REFLECTOR_GPU_APIKEY"]: + return + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid API key", + headers={"WWW-Authenticate": "Bearer"}, + ) + + @web_app.post("/pad", dependencies=[Depends(apikey_auth)]) async def pad_track_endpoint(request: Request, req: PaddingRequest) -> PaddingResponse: """Modal web endpoint for padding audio tracks with disconnect detection. """ diff --git a/gpu/self_hosted/app/factory.py b/gpu/self_hosted/app/factory.py index c13fc753..2579aff1 100644 --- a/gpu/self_hosted/app/factory.py +++ b/gpu/self_hosted/app/factory.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from .routers.diarization import router as diarization_router +from .routers.mixdown import router as mixdown_router from .routers.padding import router as padding_router from .routers.transcription import router as transcription_router from .routers.translation import router as translation_router @@ -29,4 +30,5 @@ def create_app() -> FastAPI: app.include_router(translation_router) app.include_router(diarization_router) app.include_router(padding_router) + app.include_router(mixdown_router) return app diff --git a/gpu/self_hosted/app/routers/mixdown.py b/gpu/self_hosted/app/routers/mixdown.py new file mode 100644 index 00000000..19d5f0e6 --- /dev/null +++ b/gpu/self_hosted/app/routers/mixdown.py @@ -0,0 +1,288 @@ +""" +Audio mixdown endpoint for selfhosted GPU service. + +CPU-intensive multi-track audio mixing service for combining N audio tracks +into a single MP3 using PyAV amix filter graph. + +IMPORTANT: This mixdown logic is duplicated from server/reflector/utils/audio_mixdown.py +for deployment isolation (self_hosted can't import from server/reflector/). If you modify +the PyAV filter graph or mixdown algorithm, you MUST update both: + - gpu/self_hosted/app/routers/mixdown.py (this file) + - server/reflector/utils/audio_mixdown.py + +Constants duplicated from server/reflector/utils/audio_constants.py for same reason. +""" + +import logging +import os +import tempfile +from fractions import Fraction + +import av +import requests +from av.audio.resampler import AudioResampler +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from ..auth import apikey_auth + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["mixdown"]) + +S3_TIMEOUT = 120 + + +class MixdownRequest(BaseModel): + track_urls: list[str] + output_url: str + target_sample_rate: int | None = None + offsets_seconds: list[float] | None = None + + +class MixdownResponse(BaseModel): + size: int + duration_ms: float = 0.0 + cancelled: bool = False + + +@router.post("/mixdown", dependencies=[Depends(apikey_auth)], response_model=MixdownResponse) +def mixdown_tracks(req: MixdownRequest): + """Mix multiple audio tracks into single MP3 using PyAV amix filter graph.""" + valid_urls = [u for u in req.track_urls if u] + if not valid_urls: + raise HTTPException(status_code=400, detail="No valid track URLs provided") + if req.offsets_seconds is not None: + if len(req.offsets_seconds) != len(req.track_urls): + raise HTTPException( + status_code=400, + detail=f"offsets_seconds length ({len(req.offsets_seconds)}) " + f"must match track_urls ({len(req.track_urls)})", + ) + if any(o > 18000 for o in req.offsets_seconds): + raise HTTPException( + status_code=400, detail="offsets_seconds exceeds maximum 18000s (5 hours)" + ) + if not req.output_url: + raise HTTPException(status_code=400, detail="output_url cannot be empty") + + logger.info("Mixdown request: %d tracks", len(valid_urls)) + + temp_dir = tempfile.mkdtemp() + track_paths = [] + output_path = None + + try: + # --- Download all tracks --- + for i, url in enumerate(valid_urls): + logger.info("Downloading track %d", i) + response = requests.get(url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + track_path = os.path.join(temp_dir, f"track_{i}.webm") + total_bytes = 0 + with open(track_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + + track_paths.append(track_path) + logger.info("Track %d downloaded: %d bytes", i, total_bytes) + + if not track_paths: + raise HTTPException(status_code=400, detail="No tracks could be downloaded") + + # --- Detect sample rate --- + target_sample_rate = req.target_sample_rate + if target_sample_rate is None: + for path in track_paths: + try: + container = av.open(path) + for frame in container.decode(audio=0): + target_sample_rate = frame.sample_rate + container.close() + break + else: + container.close() + continue + break + except Exception: + continue + if target_sample_rate is None: + raise HTTPException( + status_code=400, detail="Could not detect sample rate from any track" + ) + + logger.info("Target sample rate: %d", target_sample_rate) + + # --- Calculate per-input delays --- + input_offsets_seconds = None + if req.offsets_seconds is not None: + input_offsets_seconds = [ + req.offsets_seconds[i] for i, url in enumerate(req.track_urls) if url + ] + + delays_ms = [] + if input_offsets_seconds is not None: + base = min(input_offsets_seconds) if input_offsets_seconds else 0.0 + delays_ms = [max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds] + else: + delays_ms = [0 for _ in track_paths] + + # --- Build filter graph --- + # N abuffer -> optional adelay -> amix -> aformat -> abuffersink + graph = av.filter.Graph() + inputs = [] + + for idx in range(len(track_paths)): + args = ( + f"time_base=1/{target_sample_rate}:" + f"sample_rate={target_sample_rate}:" + f"sample_fmt=s32:" + f"channel_layout=stereo" + ) + in_ctx = graph.add("abuffer", args=args, name=f"in{idx}") + inputs.append(in_ctx) + + mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix") + fmt = graph.add( + "aformat", + args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}", + name="fmt", + ) + sink = graph.add("abuffersink", name="out") + + for idx, in_ctx in enumerate(inputs): + delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0 + if delay_ms > 0: + adelay = graph.add( + "adelay", + args=f"delays={delay_ms}|{delay_ms}:all=1", + name=f"delay{idx}", + ) + in_ctx.link_to(adelay) + adelay.link_to(mixer, 0, idx) + else: + in_ctx.link_to(mixer, 0, idx) + + mixer.link_to(fmt) + fmt.link_to(sink) + graph.configure() + + # --- Open all containers and decode --- + containers = [] + output_path = os.path.join(temp_dir, "mixed.mp3") + + try: + for path in track_paths: + containers.append(av.open(path)) + + decoders = [c.decode(audio=0) for c in containers] + active = [True] * len(decoders) + resamplers = [ + AudioResampler(format="s32", layout="stereo", rate=target_sample_rate) + for _ in decoders + ] + + # Open output MP3 + out_container = av.open(output_path, "w", format="mp3") + out_stream = out_container.add_stream("libmp3lame", rate=target_sample_rate) + total_duration = 0 + + while any(active): + for i, (dec, is_active) in enumerate(zip(decoders, active)): + if not is_active: + continue + try: + frame = next(dec) + except StopIteration: + active[i] = False + inputs[i].push(None) + continue + + if frame.sample_rate != target_sample_rate: + continue + + out_frames = resamplers[i].resample(frame) or [] + for rf in out_frames: + rf.sample_rate = target_sample_rate + rf.time_base = Fraction(1, target_sample_rate) + inputs[i].push(rf) + + while True: + try: + mixed = sink.pull() + except Exception: + break + mixed.sample_rate = target_sample_rate + mixed.time_base = Fraction(1, target_sample_rate) + for packet in out_stream.encode(mixed): + out_container.mux(packet) + total_duration += packet.duration + + # Flush filter graph + while True: + try: + mixed = sink.pull() + except Exception: + break + mixed.sample_rate = target_sample_rate + mixed.time_base = Fraction(1, target_sample_rate) + for packet in out_stream.encode(mixed): + out_container.mux(packet) + total_duration += packet.duration + + # Flush encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + total_duration += packet.duration + + # Calculate duration in ms + last_tb = out_stream.time_base + duration_ms = 0.0 + if last_tb and total_duration > 0: + duration_ms = round(float(total_duration * last_tb * 1000), 2) + + out_container.close() + + finally: + for c in containers: + try: + c.close() + except Exception: + pass + + file_size = os.path.getsize(output_path) + logger.info("Mixdown complete: %d bytes, %.2fms", file_size, duration_ms) + + # --- Upload result --- + logger.info("Uploading mixed audio to S3") + with open(output_path, "rb") as f: + upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT) + upload_response.raise_for_status() + logger.info("Upload complete: %d bytes", file_size) + + return MixdownResponse(size=file_size, duration_ms=duration_ms) + + except HTTPException: + raise + except Exception as e: + logger.error("Mixdown failed: %s", e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Mixdown failed: {e}") from e + finally: + for path in track_paths: + if os.path.exists(path): + try: + os.unlink(path) + except Exception as e: + logger.warning("Failed to cleanup track file: %s", e) + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + logger.warning("Failed to cleanup output file: %s", e) + try: + os.rmdir(temp_dir) + except Exception as e: + logger.warning("Failed to cleanup temp directory: %s", e) diff --git a/scripts/setup-selfhosted.sh b/scripts/setup-selfhosted.sh index ac519f93..c716ebf0 100755 --- a/scripts/setup-selfhosted.sh +++ b/scripts/setup-selfhosted.sh @@ -4,13 +4,21 @@ # Single script to configure and launch everything on one server. # # Usage: -# ./scripts/setup-selfhosted.sh <--gpu|--cpu|--hosted> [--ollama-gpu|--ollama-cpu] [--llm-model MODEL] [--garage] [--caddy] [--domain DOMAIN] [--custom-ca PATH] [--password PASSWORD] [--build] +# ./scripts/setup-selfhosted.sh <--gpu|--cpu|--hosted> [options] [--transcript BACKEND] [--diarization BACKEND] [--translation BACKEND] [--padding BACKEND] [--mixdown BACKEND] +# ./scripts/setup-selfhosted.sh (re-run with saved config from last run) # -# ML processing modes (pick ONE — required): +# ML processing modes (pick ONE — required on first run): # --gpu NVIDIA GPU container for transcription/diarization/translation # --cpu In-process CPU processing (no ML container, slower) # --hosted Remote GPU service URL (no ML container) # +# Per-service backend overrides (optional — override individual services from the base mode): +# --transcript BACKEND whisper | modal (default: whisper for --cpu, modal for --gpu/--hosted) +# --diarization BACKEND pyannote | modal (default: pyannote for --cpu, modal for --gpu/--hosted) +# --translation BACKEND marian | modal | passthrough (default: marian for --cpu, modal for --gpu/--hosted) +# --padding BACKEND pyav | modal (default: pyav for --cpu, modal for --gpu/--hosted) +# --mixdown BACKEND pyav | modal (default: pyav for --cpu, modal for --gpu/--hosted) +# # Local LLM (optional — for summarization & topic detection): # --ollama-gpu Local Ollama with NVIDIA GPU acceleration # --ollama-cpu Local Ollama on CPU only @@ -38,12 +46,17 @@ # ./scripts/setup-selfhosted.sh --gpu --ollama-gpu --garage --caddy --domain reflector.example.com # ./scripts/setup-selfhosted.sh --cpu --ollama-cpu --garage --caddy # ./scripts/setup-selfhosted.sh --hosted --garage --caddy +# ./scripts/setup-selfhosted.sh --cpu --padding modal --garage --caddy +# ./scripts/setup-selfhosted.sh --gpu --translation passthrough --garage --caddy +# ./scripts/setup-selfhosted.sh --cpu --diarization modal --translation modal --garage # ./scripts/setup-selfhosted.sh --gpu --ollama-gpu --llm-model mistral --garage --caddy # ./scripts/setup-selfhosted.sh --gpu --garage --caddy --password mysecretpass -# ./scripts/setup-selfhosted.sh --gpu --garage --caddy -# ./scripts/setup-selfhosted.sh --cpu # ./scripts/setup-selfhosted.sh --gpu --caddy --domain reflector.local --custom-ca certs/ # ./scripts/setup-selfhosted.sh --hosted --custom-ca /path/to/corporate-ca.crt +# ./scripts/setup-selfhosted.sh # re-run with saved config +# +# Config memory: after a successful run, flags are saved to data/.selfhosted-last-args. +# Re-running with no arguments replays the saved configuration automatically. # # The script auto-detects Daily.co (DAILY_API_KEY) and Whereby (WHEREBY_API_KEY) # from server/.env. If Daily.co is configured, Hatchet workflow services are @@ -59,6 +72,7 @@ ROOT_DIR="$(cd "$SCRIPT_DIR/.." && pwd)" COMPOSE_FILE="$ROOT_DIR/docker-compose.selfhosted.yml" SERVER_ENV="$ROOT_DIR/server/.env" WWW_ENV="$ROOT_DIR/www/.env" +LAST_ARGS_FILE="$ROOT_DIR/data/.selfhosted-last-args" OLLAMA_MODEL="qwen2.5:14b" OS="$(uname -s)" @@ -178,6 +192,17 @@ compose_garage_cmd() { docker compose $files --profile garage "$@" } +# --- Config memory: replay last args if none provided --- +if [[ $# -eq 0 ]] && [[ -f "$LAST_ARGS_FILE" ]]; then + SAVED_ARGS="$(cat "$LAST_ARGS_FILE")" + if [[ -n "$SAVED_ARGS" ]]; then + info "No flags provided — replaying saved configuration:" + info " $SAVED_ARGS" + echo "" + eval "set -- $SAVED_ARGS" + fi +fi + # --- Parse arguments --- MODEL_MODE="" # gpu or cpu (required, mutually exclusive) OLLAMA_MODE="" # ollama-gpu or ollama-cpu (optional) @@ -189,6 +214,19 @@ ADMIN_PASSWORD="" # optional admin password for password auth CUSTOM_CA="" # --custom-ca: path to dir or CA cert file USE_CUSTOM_CA=false # derived flag: true when --custom-ca is provided EXTRA_CA_FILES=() # --extra-ca: additional CA certs to trust (can be repeated) +OVERRIDE_TRANSCRIPT="" # per-service override: whisper | modal +OVERRIDE_DIARIZATION="" # per-service override: pyannote | modal +OVERRIDE_TRANSLATION="" # per-service override: marian | modal | passthrough +OVERRIDE_PADDING="" # per-service override: pyav | modal +OVERRIDE_MIXDOWN="" # per-service override: pyav | modal + +# Validate per-service backend override values +validate_backend() { + local service="$1" value="$2"; shift 2; local valid=("$@") + for v in "${valid[@]}"; do [[ "$value" == "$v" ]] && return 0; done + err "--$service value '$value' is not valid. Choose one of: ${valid[*]}" + exit 1 +} SKIP_NEXT=false ARGS=("$@") @@ -265,14 +303,65 @@ for i in "${!ARGS[@]}"; do EXTRA_CA_FILES+=("$extra_ca_file") USE_CUSTOM_CA=true SKIP_NEXT=true ;; + --transcript) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--transcript requires a backend (whisper | modal)" + exit 1 + fi + validate_backend "transcript" "${ARGS[$next_i]}" whisper modal + OVERRIDE_TRANSCRIPT="${ARGS[$next_i]}" + SKIP_NEXT=true ;; + --diarization) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--diarization requires a backend (pyannote | modal)" + exit 1 + fi + validate_backend "diarization" "${ARGS[$next_i]}" pyannote modal + OVERRIDE_DIARIZATION="${ARGS[$next_i]}" + SKIP_NEXT=true ;; + --translation) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--translation requires a backend (marian | modal | passthrough)" + exit 1 + fi + validate_backend "translation" "${ARGS[$next_i]}" marian modal passthrough + OVERRIDE_TRANSLATION="${ARGS[$next_i]}" + SKIP_NEXT=true ;; + --padding) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--padding requires a backend (pyav | modal)" + exit 1 + fi + validate_backend "padding" "${ARGS[$next_i]}" pyav modal + OVERRIDE_PADDING="${ARGS[$next_i]}" + SKIP_NEXT=true ;; + --mixdown) + next_i=$((i + 1)) + if [[ $next_i -ge ${#ARGS[@]} ]] || [[ "${ARGS[$next_i]}" == --* ]]; then + err "--mixdown requires a backend (pyav | modal)" + exit 1 + fi + validate_backend "mixdown" "${ARGS[$next_i]}" pyav modal + OVERRIDE_MIXDOWN="${ARGS[$next_i]}" + SKIP_NEXT=true ;; *) err "Unknown argument: $arg" - err "Usage: $0 <--gpu|--cpu|--hosted> [--ollama-gpu|--ollama-cpu] [--llm-model MODEL] [--garage] [--caddy] [--domain DOMAIN] [--custom-ca PATH] [--password PASS] [--build]" + err "Usage: $0 <--gpu|--cpu|--hosted> [options] [--transcript BACKEND] [--diarization BACKEND] [--translation BACKEND] [--padding BACKEND] [--mixdown BACKEND]" exit 1 ;; esac done +# --- Save CLI args for config memory (re-run without flags) --- +if [[ $# -gt 0 ]]; then + mkdir -p "$ROOT_DIR/data" + printf '%q ' "$@" > "$LAST_ARGS_FILE" +fi + # --- Resolve --custom-ca flag --- CA_CERT_PATH="" # resolved path to CA certificate TLS_CERT_PATH="" # resolved path to server cert (optional, for Caddy TLS) @@ -330,13 +419,20 @@ fi if [[ -z "$MODEL_MODE" ]]; then err "No model mode specified. You must choose --gpu, --cpu, or --hosted." err "" - err "Usage: $0 <--gpu|--cpu|--hosted> [--ollama-gpu|--ollama-cpu] [--llm-model MODEL] [--garage] [--caddy] [--domain DOMAIN] [--custom-ca PATH] [--password PASS] [--build]" + err "Usage: $0 <--gpu|--cpu|--hosted> [options] [--transcript BACKEND] [--diarization BACKEND] [--translation BACKEND] [--padding BACKEND] [--mixdown BACKEND]" err "" err "ML processing modes (required):" err " --gpu NVIDIA GPU container for transcription/diarization/translation" err " --cpu In-process CPU processing (no ML container, slower)" err " --hosted Remote GPU service URL (no ML container)" err "" + err "Per-service backend overrides (optional — override individual services):" + err " --transcript BACKEND whisper | modal (default: whisper for --cpu, modal for --gpu/--hosted)" + err " --diarization BACKEND pyannote | modal (default: pyannote for --cpu, modal for --gpu/--hosted)" + err " --translation BACKEND marian | modal | passthrough (default: marian for --cpu, modal for --gpu/--hosted)" + err " --padding BACKEND pyav | modal (default: pyav for --cpu, modal for --gpu/--hosted)" + err " --mixdown BACKEND pyav | modal (default: pyav for --cpu, modal for --gpu/--hosted)" + err "" err "Local LLM (optional):" err " --ollama-gpu Local Ollama with GPU (for summarization/topics)" err " --ollama-cpu Local Ollama on CPU (for summarization/topics)" @@ -351,6 +447,8 @@ if [[ -z "$MODEL_MODE" ]]; then err " --extra-ca FILE Additional CA cert to trust (repeatable for multiple CAs)" err " --password PASS Enable password auth (admin@localhost) instead of public mode" err " --build Build backend/frontend images from source instead of pulling" + err "" + err "Tip: After your first run, re-run with no flags to reuse the same configuration." exit 1 fi @@ -374,9 +472,38 @@ OLLAMA_SVC="" [[ "$OLLAMA_MODE" == "ollama-gpu" ]] && USES_OLLAMA=true && OLLAMA_SVC="ollama" [[ "$OLLAMA_MODE" == "ollama-cpu" ]] && USES_OLLAMA=true && OLLAMA_SVC="ollama-cpu" +# Resolve effective backend per service (override wins over base mode default) +case "$MODEL_MODE" in + gpu|hosted) + EFF_TRANSCRIPT="${OVERRIDE_TRANSCRIPT:-modal}" + EFF_DIARIZATION="${OVERRIDE_DIARIZATION:-modal}" + EFF_TRANSLATION="${OVERRIDE_TRANSLATION:-modal}" + EFF_PADDING="${OVERRIDE_PADDING:-modal}" + EFF_MIXDOWN="${OVERRIDE_MIXDOWN:-modal}" + ;; + cpu) + EFF_TRANSCRIPT="${OVERRIDE_TRANSCRIPT:-whisper}" + EFF_DIARIZATION="${OVERRIDE_DIARIZATION:-pyannote}" + EFF_TRANSLATION="${OVERRIDE_TRANSLATION:-marian}" + EFF_PADDING="${OVERRIDE_PADDING:-pyav}" + EFF_MIXDOWN="${OVERRIDE_MIXDOWN:-pyav}" + ;; +esac + +# Check if any per-service overrides were provided +HAS_OVERRIDES=false +[[ -n "$OVERRIDE_TRANSCRIPT" ]] && HAS_OVERRIDES=true +[[ -n "$OVERRIDE_DIARIZATION" ]] && HAS_OVERRIDES=true +[[ -n "$OVERRIDE_TRANSLATION" ]] && HAS_OVERRIDES=true +[[ -n "$OVERRIDE_PADDING" ]] && HAS_OVERRIDES=true +[[ -n "$OVERRIDE_MIXDOWN" ]] && HAS_OVERRIDES=true + # Human-readable mode string for display MODE_DISPLAY="$MODEL_MODE" [[ -n "$OLLAMA_MODE" ]] && MODE_DISPLAY="$MODEL_MODE + $OLLAMA_MODE" +if [[ "$HAS_OVERRIDES" == "true" ]]; then + MODE_DISPLAY="$MODE_DISPLAY (overrides: transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION, translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN)" +fi # ========================================================= # Step 0: Prerequisites @@ -623,54 +750,30 @@ step_server_env() { env_set "$SERVER_ENV" "WEBRTC_HOST" "$PRIMARY_IP" fi - # Specialized models — backend configuration per mode + # Specialized models — backend configuration per service env_set "$SERVER_ENV" "DIARIZATION_ENABLED" "true" + + # Resolve the URL for modal backends + local modal_url="" case "$MODEL_MODE" in gpu) - # GPU container aliased as "transcription" on docker network - env_set "$SERVER_ENV" "TRANSCRIPT_BACKEND" "modal" - env_set "$SERVER_ENV" "TRANSCRIPT_URL" "http://transcription:8000" - env_set "$SERVER_ENV" "TRANSCRIPT_MODAL_API_KEY" "selfhosted" - env_set "$SERVER_ENV" "DIARIZATION_BACKEND" "modal" - env_set "$SERVER_ENV" "DIARIZATION_URL" "http://transcription:8000" - env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "modal" - env_set "$SERVER_ENV" "TRANSLATE_URL" "http://transcription:8000" - env_set "$SERVER_ENV" "PADDING_BACKEND" "modal" - env_set "$SERVER_ENV" "PADDING_URL" "http://transcription:8000" - ok "ML backends: GPU container (modal)" - ;; - cpu) - # In-process backends — no ML service container needed - env_set "$SERVER_ENV" "TRANSCRIPT_BACKEND" "whisper" - env_set "$SERVER_ENV" "DIARIZATION_BACKEND" "pyannote" - env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "marian" - env_set "$SERVER_ENV" "PADDING_BACKEND" "pyav" - ok "ML backends: in-process CPU (whisper/pyannote/marian/pyav)" + modal_url="http://transcription:8000" ;; hosted) # Remote GPU service — user provides URL - local gpu_url="" if env_has_key "$SERVER_ENV" "TRANSCRIPT_URL"; then - gpu_url=$(env_get "$SERVER_ENV" "TRANSCRIPT_URL") + modal_url=$(env_get "$SERVER_ENV" "TRANSCRIPT_URL") fi - if [[ -z "$gpu_url" ]] && [[ -t 0 ]]; then + if [[ -z "$modal_url" ]] && [[ -t 0 ]]; then echo "" info "Enter the URL of your remote GPU service (e.g. https://gpu.example.com)" - read -rp " GPU service URL: " gpu_url + read -rp " GPU service URL: " modal_url fi - if [[ -z "$gpu_url" ]]; then + if [[ -z "$modal_url" ]]; then err "GPU service URL required for --hosted mode." err "Set TRANSCRIPT_URL in server/.env or provide it interactively." exit 1 fi - env_set "$SERVER_ENV" "TRANSCRIPT_BACKEND" "modal" - env_set "$SERVER_ENV" "TRANSCRIPT_URL" "$gpu_url" - env_set "$SERVER_ENV" "DIARIZATION_BACKEND" "modal" - env_set "$SERVER_ENV" "DIARIZATION_URL" "$gpu_url" - env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "modal" - env_set "$SERVER_ENV" "TRANSLATE_URL" "$gpu_url" - env_set "$SERVER_ENV" "PADDING_BACKEND" "modal" - env_set "$SERVER_ENV" "PADDING_URL" "$gpu_url" # API key for remote service local gpu_api_key="" if env_has_key "$SERVER_ENV" "TRANSCRIPT_MODAL_API_KEY"; then @@ -682,15 +785,106 @@ step_server_env() { if [[ -n "$gpu_api_key" ]]; then env_set "$SERVER_ENV" "TRANSCRIPT_MODAL_API_KEY" "$gpu_api_key" fi - ok "ML backends: remote hosted ($gpu_url)" + ;; + cpu) + # CPU mode: modal_url stays empty. If services are overridden to modal, + # the user must configure the URL (TRANSCRIPT_URL etc.) in server/.env manually. + # We intentionally do NOT read from existing env here to avoid overwriting + # per-service URLs with a stale TRANSCRIPT_URL from a previous --gpu run. ;; esac + # Set each service backend independently using effective backends + # Transcript + case "$EFF_TRANSCRIPT" in + modal) + env_set "$SERVER_ENV" "TRANSCRIPT_BACKEND" "modal" + if [[ -n "$modal_url" ]]; then + env_set "$SERVER_ENV" "TRANSCRIPT_URL" "$modal_url" + fi + [[ "$MODEL_MODE" == "gpu" ]] && env_set "$SERVER_ENV" "TRANSCRIPT_MODAL_API_KEY" "selfhosted" + ;; + whisper) + env_set "$SERVER_ENV" "TRANSCRIPT_BACKEND" "whisper" + ;; + esac + + # Diarization + case "$EFF_DIARIZATION" in + modal) + env_set "$SERVER_ENV" "DIARIZATION_BACKEND" "modal" + if [[ -n "$modal_url" ]]; then + env_set "$SERVER_ENV" "DIARIZATION_URL" "$modal_url" + fi + ;; + pyannote) + env_set "$SERVER_ENV" "DIARIZATION_BACKEND" "pyannote" + ;; + esac + + # Translation + case "$EFF_TRANSLATION" in + modal) + env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "modal" + if [[ -n "$modal_url" ]]; then + env_set "$SERVER_ENV" "TRANSLATE_URL" "$modal_url" + fi + ;; + marian) + env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "marian" + ;; + passthrough) + env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "passthrough" + ;; + esac + + # Padding + case "$EFF_PADDING" in + modal) + env_set "$SERVER_ENV" "PADDING_BACKEND" "modal" + if [[ -n "$modal_url" ]]; then + env_set "$SERVER_ENV" "PADDING_URL" "$modal_url" + fi + ;; + pyav) + env_set "$SERVER_ENV" "PADDING_BACKEND" "pyav" + ;; + esac + + # Mixdown + case "$EFF_MIXDOWN" in + modal) + env_set "$SERVER_ENV" "MIXDOWN_BACKEND" "modal" + if [[ -n "$modal_url" ]]; then + env_set "$SERVER_ENV" "MIXDOWN_URL" "$modal_url" + fi + ;; + pyav) + env_set "$SERVER_ENV" "MIXDOWN_BACKEND" "pyav" + ;; + esac + + # Warn about modal overrides in CPU mode that need URL configuration + if [[ "$MODEL_MODE" == "cpu" ]] && [[ -z "$modal_url" ]]; then + local needs_url=false + [[ "$EFF_TRANSCRIPT" == "modal" ]] && needs_url=true + [[ "$EFF_DIARIZATION" == "modal" ]] && needs_url=true + [[ "$EFF_TRANSLATION" == "modal" ]] && needs_url=true + [[ "$EFF_PADDING" == "modal" ]] && needs_url=true + [[ "$EFF_MIXDOWN" == "modal" ]] && needs_url=true + if [[ "$needs_url" == "true" ]]; then + warn "One or more services are set to 'modal' but no service URL is configured." + warn "Set TRANSCRIPT_URL (and optionally TRANSCRIPT_MODAL_API_KEY) in server/.env" + warn "to point to your GPU service, then re-run this script." + fi + fi + + ok "ML backends: transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION, translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN" + # HuggingFace token for gated models (pyannote diarization) - # --gpu: written to root .env (docker compose passes to GPU container) - # --cpu: written to both root .env and server/.env (in-process pyannote needs it) - # --hosted: not needed (remote service handles its own auth) - if [[ "$MODEL_MODE" != "hosted" ]]; then + # Needed when: GPU container is running (MODEL_MODE=gpu), or diarization uses pyannote in-process + # Not needed when: all modal services point to a remote hosted URL with its own auth + if [[ "$MODEL_MODE" == "gpu" ]] || [[ "$EFF_DIARIZATION" == "pyannote" ]]; then local root_env="$ROOT_DIR/.env" local current_hf_token="${HF_TOKEN:-}" if [[ -f "$root_env" ]] && env_has_key "$root_env" "HF_TOKEN"; then @@ -709,8 +903,8 @@ step_server_env() { touch "$root_env" env_set "$root_env" "HF_TOKEN" "$current_hf_token" export HF_TOKEN="$current_hf_token" - # In CPU mode, server process needs HF_TOKEN directly - if [[ "$MODEL_MODE" == "cpu" ]]; then + # When diarization runs in-process (pyannote), server process needs HF_TOKEN directly + if [[ "$EFF_DIARIZATION" == "pyannote" ]]; then env_set "$SERVER_ENV" "HF_TOKEN" "$current_hf_token" fi ok "HF_TOKEN configured" @@ -743,11 +937,15 @@ step_server_env() { fi fi - # CPU mode: increase file processing timeouts (default 600s is too short for long audio on CPU) - if [[ "$MODEL_MODE" == "cpu" ]]; then + # Increase file processing timeouts for CPU backends (default 600s is too short for long audio on CPU) + if [[ "$EFF_TRANSCRIPT" == "whisper" ]]; then env_set "$SERVER_ENV" "TRANSCRIPT_FILE_TIMEOUT" "3600" + fi + if [[ "$EFF_DIARIZATION" == "pyannote" ]]; then env_set "$SERVER_ENV" "DIARIZATION_FILE_TIMEOUT" "3600" - ok "CPU mode — file processing timeouts set to 3600s (1 hour)" + fi + if [[ "$EFF_TRANSCRIPT" == "whisper" ]] || [[ "$EFF_DIARIZATION" == "pyannote" ]]; then + ok "CPU backend(s) detected — file processing timeouts set to 3600s (1 hour)" fi # Hatchet is always required (file, live, and multitrack pipelines all use it) @@ -1175,9 +1373,9 @@ step_health() { warn "Check with: docker compose -f docker-compose.selfhosted.yml logs gpu" fi elif [[ "$MODEL_MODE" == "cpu" ]]; then - ok "CPU mode — ML processing runs in-process on server/worker (no separate service)" + ok "CPU mode — in-process backends run on server/worker (transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION, translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN)" elif [[ "$MODEL_MODE" == "hosted" ]]; then - ok "Hosted mode — ML processing via remote GPU service (no local health check)" + ok "Hosted mode — ML processing via remote GPU service (transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION, translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN)" fi # Ollama (if applicable) @@ -1375,6 +1573,10 @@ main() { echo "==========================================" echo "" echo " Models: $MODEL_MODE" + if [[ "$HAS_OVERRIDES" == "true" ]]; then + echo " transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION" + echo " translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN" + fi echo " LLM: ${OLLAMA_MODE:-external}" echo " Garage: $USE_GARAGE" echo " Caddy: $USE_CADDY" @@ -1487,7 +1689,13 @@ EOF echo " API: server:1250 (or localhost:1250 from host)" fi echo "" - echo " Models: $MODEL_MODE (transcription/diarization/translation)" + if [[ "$HAS_OVERRIDES" == "true" ]]; then + echo " Models: $MODEL_MODE base + overrides" + echo " transcript=$EFF_TRANSCRIPT, diarization=$EFF_DIARIZATION" + echo " translation=$EFF_TRANSLATION, padding=$EFF_PADDING, mixdown=$EFF_MIXDOWN" + else + echo " Models: $MODEL_MODE (transcription/diarization/translation/padding)" + fi [[ "$USE_GARAGE" == "true" ]] && echo " Storage: Garage (local S3)" [[ "$USE_GARAGE" != "true" ]] && echo " Storage: External S3" [[ "$USES_OLLAMA" == "true" ]] && echo " LLM: Ollama ($OLLAMA_MODEL) for summarization/topics" @@ -1507,7 +1715,8 @@ EOF echo "" fi echo " To stop: docker compose -f docker-compose.selfhosted.yml down" - echo " To re-run: ./scripts/setup-selfhosted.sh $*" + echo " To re-run: ./scripts/setup-selfhosted.sh (replays saved config)" + echo " Last args: $*" echo "" } diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 35a1003b..9a49990e 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -84,7 +84,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import ( from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow from reflector.logger import logger from reflector.pipelines import topic_processing -from reflector.processors import AudioFileWriterProcessor +from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor from reflector.processors.summary.models import ActionItemsResponse from reflector.processors.summary.prompts import ( RECAP_PROMPT, @@ -99,10 +99,6 @@ from reflector.utils.audio_constants import ( PRESIGNED_URL_EXPIRATION_SECONDS, WAVEFORM_SEGMENTS, ) -from reflector.utils.audio_mixdown import ( - detect_sample_rate_from_tracks, - mixdown_tracks_pyav, -) from reflector.utils.audio_waveform import get_audio_waveform from reflector.utils.daily import ( filter_cam_audio_tracks, @@ -539,7 +535,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes ) @with_error_handling(TaskName.MIXDOWN_TRACKS) async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: - """Mix all padded tracks into single audio file using PyAV (same as Celery).""" + """Mix all padded tracks into single audio file via configured backend.""" ctx.log("mixdown_tracks: mixing padded tracks into single audio file") track_result = ctx.task_output(process_tracks) @@ -579,37 +575,33 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: if not valid_urls: raise ValueError("No valid padded tracks to mixdown") - target_sample_rate = detect_sample_rate_from_tracks(valid_urls, logger=logger) - if not target_sample_rate: - logger.error("Mixdown failed - no decodable audio frames found") - raise ValueError("No decodable audio frames in any track") - - output_path = tempfile.mktemp(suffix=".mp3") - duration_ms_callback_capture_container = [0.0] - - async def capture_duration(d): - duration_ms_callback_capture_container[0] = d - - writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration) - - await mixdown_tracks_pyav( - valid_urls, - writer, - target_sample_rate, - offsets_seconds=None, - logger=logger, - progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS), - expected_duration_sec=recording_duration if recording_duration > 0 else None, - ) - await writer.flush() - - file_size = Path(output_path).stat().st_size storage_path = f"{input.transcript_id}/audio.mp3" - with open(output_path, "rb") as mixed_file: - await storage.put_file(storage_path, mixed_file) + # Generate presigned PUT URL for the output (used by modal backend; + # pyav backend ignores it and writes locally instead) + output_url = await storage.get_file_url( + storage_path, + operation="put_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) - Path(output_path).unlink(missing_ok=True) + processor = AudioMixdownAutoProcessor() + result = await processor.mixdown_tracks( + valid_urls, output_url, offsets_seconds=None + ) + + if result.output_path: + # Pyav backend wrote locally — upload to storage ourselves + output_file = Path(result.output_path) + with open(output_file, "rb") as mixed_file: + await storage.put_file(storage_path, mixed_file) + output_file.unlink(missing_ok=True) + # Clean up the temp directory the pyav processor created + try: + output_file.parent.rmdir() + except OSError: + pass + # else: modal backend already uploaded to output_url async with fresh_db_connection(): from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 @@ -620,11 +612,11 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: transcript, {"audio_location": "storage"} ) - ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}") + ctx.log(f"mixdown_tracks complete: {result.size} bytes to {storage_path}") return MixdownResult( audio_key=storage_path, - duration=duration_ms_callback_capture_container[0], + duration=result.duration_ms, tracks_mixed=len(valid_urls), ) diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py index 5f2d2496..7477a8c7 100644 --- a/server/reflector/processors/__init__.py +++ b/server/reflector/processors/__init__.py @@ -4,6 +4,8 @@ from .audio_diarization_auto import AudioDiarizationAutoProcessor # noqa: F401 from .audio_downscale import AudioDownscaleProcessor # noqa: F401 from .audio_file_writer import AudioFileWriterProcessor # noqa: F401 from .audio_merge import AudioMergeProcessor # noqa: F401 +from .audio_mixdown import AudioMixdownProcessor # noqa: F401 +from .audio_mixdown_auto import AudioMixdownAutoProcessor # noqa: F401 from .audio_padding import AudioPaddingProcessor # noqa: F401 from .audio_padding_auto import AudioPaddingAutoProcessor # noqa: F401 from .audio_transcript import AudioTranscriptProcessor # noqa: F401 diff --git a/server/reflector/processors/audio_mixdown.py b/server/reflector/processors/audio_mixdown.py new file mode 100644 index 00000000..40e01672 --- /dev/null +++ b/server/reflector/processors/audio_mixdown.py @@ -0,0 +1,27 @@ +""" +Base class for audio mixdown processors. +""" + +from pydantic import BaseModel + + +class MixdownResponse(BaseModel): + size: int + duration_ms: float = 0.0 + cancelled: bool = False + output_path: str | None = ( + None # Local file path (pyav sets this; modal leaves None) + ) + + +class AudioMixdownProcessor: + """Base class for audio mixdown processors.""" + + async def mixdown_tracks( + self, + track_urls: list[str], + output_url: str, + target_sample_rate: int | None = None, + offsets_seconds: list[float] | None = None, + ) -> MixdownResponse: + raise NotImplementedError diff --git a/server/reflector/processors/audio_mixdown_auto.py b/server/reflector/processors/audio_mixdown_auto.py new file mode 100644 index 00000000..d47663b8 --- /dev/null +++ b/server/reflector/processors/audio_mixdown_auto.py @@ -0,0 +1,32 @@ +import importlib + +from reflector.processors.audio_mixdown import AudioMixdownProcessor +from reflector.settings import settings + + +class AudioMixdownAutoProcessor(AudioMixdownProcessor): + _registry = {} + + @classmethod + def register(cls, name, kclass): + cls._registry[name] = kclass + + def __new__(cls, name: str | None = None, **kwargs): + if name is None: + name = settings.MIXDOWN_BACKEND + if name not in cls._registry: + module_name = f"reflector.processors.audio_mixdown_{name}" + importlib.import_module(module_name) + + # gather specific configuration for the processor + # search `MIXDOWN_XXX_YYY`, push to constructor as `xxx_yyy` + config = {} + name_upper = name.upper() + settings_prefix = "MIXDOWN_" + config_prefix = f"{settings_prefix}{name_upper}_" + for key, value in settings: + if key.startswith(config_prefix): + config_name = key[len(settings_prefix) :].lower() + config[config_name] = value + + return cls._registry[name](**config | kwargs) diff --git a/server/reflector/processors/audio_mixdown_modal.py b/server/reflector/processors/audio_mixdown_modal.py new file mode 100644 index 00000000..b1569be3 --- /dev/null +++ b/server/reflector/processors/audio_mixdown_modal.py @@ -0,0 +1,110 @@ +""" +Modal.com backend for audio mixdown. +""" + +import asyncio +import os + +import httpx + +from reflector.hatchet.constants import TIMEOUT_HEAVY_HTTP +from reflector.logger import logger +from reflector.processors.audio_mixdown import AudioMixdownProcessor, MixdownResponse +from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor + + +class AudioMixdownModalProcessor(AudioMixdownProcessor): + """Audio mixdown processor using Modal.com/self-hosted backend via HTTP.""" + + def __init__( + self, mixdown_url: str | None = None, modal_api_key: str | None = None + ): + self.mixdown_url = mixdown_url or os.getenv("MIXDOWN_URL") + if not self.mixdown_url: + raise ValueError( + "MIXDOWN_URL required to use AudioMixdownModalProcessor. " + "Set MIXDOWN_URL environment variable or pass mixdown_url parameter." + ) + + self.modal_api_key = modal_api_key or os.getenv("MODAL_API_KEY") + + async def mixdown_tracks( + self, + track_urls: list[str], + output_url: str, + target_sample_rate: int | None = None, + offsets_seconds: list[float] | None = None, + ) -> MixdownResponse: + """Mix audio tracks via remote Modal/self-hosted backend. + + Args: + track_urls: Presigned GET URLs for source audio tracks + output_url: Presigned PUT URL for output MP3 + target_sample_rate: Sample rate for output (Hz), auto-detected if None + offsets_seconds: Optional per-track delays in seconds for alignment + """ + valid_count = len([u for u in track_urls if u]) + log = logger.bind(track_count=valid_count) + log.info("Sending Modal mixdown HTTP request") + + url = f"{self.mixdown_url}/mixdown" + + headers = {} + if self.modal_api_key: + headers["Authorization"] = f"Bearer {self.modal_api_key}" + + # Scale timeout with track count: base TIMEOUT_HEAVY_HTTP + 60s per track beyond 2 + extra_timeout = max(0, (valid_count - 2)) * 60 + timeout = TIMEOUT_HEAVY_HTTP + extra_timeout + + try: + async with httpx.AsyncClient(timeout=timeout) as client: + response = await client.post( + url, + headers=headers, + json={ + "track_urls": track_urls, + "output_url": output_url, + "target_sample_rate": target_sample_rate, + "offsets_seconds": offsets_seconds, + }, + follow_redirects=True, + ) + + if response.status_code != 200: + error_body = response.text + log.error( + "Modal mixdown API error", + status_code=response.status_code, + error_body=error_body, + ) + + response.raise_for_status() + result = response.json() + + # Check if work was cancelled + if result.get("cancelled"): + log.warning("Modal mixdown was cancelled by disconnect detection") + raise asyncio.CancelledError( + "Mixdown cancelled due to client disconnect" + ) + + log.info("Modal mixdown complete", size=result["size"]) + return MixdownResponse(**result) + except asyncio.CancelledError: + log.warning( + "Modal mixdown cancelled (Hatchet timeout, disconnect detected on Modal side)" + ) + raise + except httpx.TimeoutException as e: + log.error("Modal mixdown timeout", error=str(e), exc_info=True) + raise Exception(f"Modal mixdown timeout: {e}") from e + except httpx.HTTPStatusError as e: + log.error("Modal mixdown HTTP error", error=str(e), exc_info=True) + raise Exception(f"Modal mixdown HTTP error: {e}") from e + except Exception as e: + log.error("Modal mixdown unexpected error", error=str(e), exc_info=True) + raise + + +AudioMixdownAutoProcessor.register("modal", AudioMixdownModalProcessor) diff --git a/server/reflector/processors/audio_mixdown_pyav.py b/server/reflector/processors/audio_mixdown_pyav.py new file mode 100644 index 00000000..05ff44d8 --- /dev/null +++ b/server/reflector/processors/audio_mixdown_pyav.py @@ -0,0 +1,101 @@ +""" +PyAV audio mixdown processor. + +Mixes N tracks in-process using the existing utility from reflector.utils.audio_mixdown. +Writes to a local temp file (does NOT upload to S3 — the pipeline handles upload). +""" + +import os +import tempfile + +from reflector.logger import logger +from reflector.processors.audio_file_writer import AudioFileWriterProcessor +from reflector.processors.audio_mixdown import AudioMixdownProcessor, MixdownResponse +from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor +from reflector.utils.audio_mixdown import ( + detect_sample_rate_from_tracks, + mixdown_tracks_pyav, +) + + +class AudioMixdownPyavProcessor(AudioMixdownProcessor): + """Audio mixdown processor using PyAV (no HTTP backend). + + Writes the mixed output to a local temp file and returns its path + in MixdownResponse.output_path. The caller is responsible for + uploading the file and cleaning it up. + """ + + async def mixdown_tracks( + self, + track_urls: list[str], + output_url: str, + target_sample_rate: int | None = None, + offsets_seconds: list[float] | None = None, + ) -> MixdownResponse: + log = logger.bind(track_count=len(track_urls)) + log.info("Starting local PyAV mixdown") + + valid_urls = [url for url in track_urls if url] + if not valid_urls: + raise ValueError("No valid track URLs provided") + + # Auto-detect sample rate if not provided + if target_sample_rate is None: + target_sample_rate = detect_sample_rate_from_tracks( + valid_urls, logger=logger + ) + if not target_sample_rate: + raise ValueError("No decodable audio frames in any track") + + # Write to temp MP3 file + temp_dir = tempfile.mkdtemp() + output_path = os.path.join(temp_dir, "mixed.mp3") + duration_ms_container = [0.0] + + async def capture_duration(d): + duration_ms_container[0] = d + + writer = AudioFileWriterProcessor( + path=output_path, on_duration=capture_duration + ) + + try: + await mixdown_tracks_pyav( + valid_urls, + writer, + target_sample_rate, + offsets_seconds=offsets_seconds, + logger=logger, + ) + await writer.flush() + + file_size = os.path.getsize(output_path) + log.info( + "Local mixdown complete", + size=file_size, + duration_ms=duration_ms_container[0], + ) + + return MixdownResponse( + size=file_size, + duration_ms=duration_ms_container[0], + output_path=output_path, + ) + + except Exception as e: + # Cleanup on failure + if os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception: + pass + try: + os.rmdir(temp_dir) + except Exception: + pass + log.error("Local mixdown failed", error=str(e), exc_info=True) + raise + + +AudioMixdownAutoProcessor.register("pyav", AudioMixdownPyavProcessor) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 0e2ab634..5cbbe8c1 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -127,6 +127,14 @@ class Settings(BaseSettings): PADDING_URL: str | None = None PADDING_MODAL_API_KEY: str | None = None + # Audio Mixdown + # backends: + # - pyav: in-process PyAV mixdown (no HTTP, runs in same process) + # - modal: HTTP API client (works with Modal.com OR self-hosted gpu/self_hosted/) + MIXDOWN_BACKEND: str = "pyav" + MIXDOWN_URL: str | None = None + MIXDOWN_MODAL_API_KEY: str | None = None + # Sentry SENTRY_DSN: str | None = None