From 045eae8ff2014a7b83061045e3c8cb25cce9d60a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Diego=20Garc=C3=ADa?= Date: Mon, 2 Mar 2026 11:08:20 -0500 Subject: [PATCH] feat: enable daily co in selfhosted + only schedule tasks when necessary (#883) * feat: enable daily co in selfhosted + only schedule tasks when necessary * feat: refactor aws storage to be platform agnostic + add local pad tracking with selfhosted support --- .gitignore | 1 + docker-compose.selfhosted.yml | 98 ++++++- docsv2/selfhosted-production.md | 122 ++++++++- gpu/self_hosted/app/factory.py | 2 + gpu/self_hosted/app/routers/padding.py | 199 ++++++++++++++ gpu/self_hosted/pyproject.toml | 1 + gpu/self_hosted/uv.lock | 5 +- scripts/setup-selfhosted.sh | 215 ++++++++++++++- server/.env.example | 16 ++ server/.env.selfhosted.example | 36 ++- .../workflows/daily_multitrack_pipeline.py | 23 +- .../hatchet/workflows/padding_workflow.py | 71 ++--- .../hatchet/workflows/track_processing.py | 50 ++-- .../processors/audio_padding_auto.py | 31 +++ .../processors/audio_padding_local.py | 133 ++++++++++ .../processors/audio_padding_modal.py | 4 + .../reflector/services/transcript_process.py | 2 + server/reflector/settings.py | 9 +- server/reflector/storage/__init__.py | 43 +++ server/reflector/worker/app.py | 174 ++++++++---- server/reflector/worker/process.py | 2 + server/tests/test_beat_schedule.py | 247 ++++++++++++++++++ server/tests/test_storage.py | 123 +++++++++ 23 files changed, 1442 insertions(+), 165 deletions(-) create mode 100644 gpu/self_hosted/app/routers/padding.py create mode 100644 server/reflector/processors/audio_padding_auto.py create mode 100644 server/reflector/processors/audio_padding_local.py create mode 100644 server/tests/test_beat_schedule.py diff --git a/.gitignore b/.gitignore index d6532d82..4955cb90 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ server/.env server/.env.production .env Caddyfile +.env.hatchet server/exportdanswer .vercel .env*.local diff --git 
a/docker-compose.selfhosted.yml b/docker-compose.selfhosted.yml index 26018a46..a301364c 100644 --- a/docker-compose.selfhosted.yml +++ b/docker-compose.selfhosted.yml @@ -11,6 +11,9 @@ # --profile ollama-gpu Local Ollama with NVIDIA GPU # --profile ollama-cpu Local Ollama on CPU only # +# Daily.co multitrack processing (auto-detected from server/.env): +# --profile dailyco Hatchet workflow engine + CPU/LLM workers +# # Other optional services: # --profile garage Local S3-compatible storage (Garage) # --profile caddy Reverse proxy with auto-SSL @@ -32,7 +35,7 @@ services: restart: unless-stopped ports: - "127.0.0.1:1250:1250" - - "50000-50100:50000-50100/udp" + - "51000-51100:51000-51100/udp" env_file: - ./server/.env environment: @@ -42,8 +45,6 @@ services: REDIS_HOST: redis CELERY_BROKER_URL: redis://redis:6379/1 CELERY_RESULT_BACKEND: redis://redis:6379/1 - HATCHET_CLIENT_SERVER_URL: "" - HATCHET_CLIENT_HOST_PORT: "" # Specialized models via gpu/cpu container (aliased as "transcription") TRANSCRIPT_BACKEND: modal TRANSCRIPT_URL: http://transcription:8000 @@ -52,8 +53,10 @@ services: DIARIZATION_URL: http://transcription:8000 TRANSLATION_BACKEND: modal TRANSLATE_URL: http://transcription:8000 + PADDING_BACKEND: modal + PADDING_URL: http://transcription:8000 # WebRTC: fixed UDP port range for ICE candidates (mapped above) - WEBRTC_PORT_RANGE: "50000-50100" + WEBRTC_PORT_RANGE: "51000-51100" depends_on: postgres: condition: service_healthy @@ -76,8 +79,6 @@ services: REDIS_HOST: redis CELERY_BROKER_URL: redis://redis:6379/1 CELERY_RESULT_BACKEND: redis://redis:6379/1 - HATCHET_CLIENT_SERVER_URL: "" - HATCHET_CLIENT_HOST_PORT: "" TRANSCRIPT_BACKEND: modal TRANSCRIPT_URL: http://transcription:8000 TRANSCRIPT_MODAL_API_KEY: selfhosted @@ -85,6 +86,8 @@ services: DIARIZATION_URL: http://transcription:8000 TRANSLATION_BACKEND: modal TRANSLATE_URL: http://transcription:8000 + PADDING_BACKEND: modal + PADDING_URL: http://transcription:8000 depends_on: postgres: condition: 
service_healthy @@ -153,6 +156,7 @@ services: POSTGRES_DB: reflector volumes: - postgres_data:/var/lib/postgresql/data + - ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro healthcheck: test: ["CMD-SHELL", "pg_isready -U reflector"] interval: 30s @@ -305,6 +309,87 @@ services: - web - server + + # =========================================================== + # Hatchet + Daily.co workers (optional — for Daily.co multitrack processing) + # Auto-enabled when DAILY_API_KEY is configured in server/.env + # =========================================================== + + hatchet: + image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest + profiles: [dailyco] + restart: on-failure + depends_on: + postgres: + condition: service_healthy + ports: + - "8888:8888" + - "7078:7077" + env_file: + - ./.env.hatchet + environment: + DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable&connect_timeout=30" + SERVER_AUTH_COOKIE_INSECURE: "t" + SERVER_GRPC_BIND_ADDRESS: "0.0.0.0" + SERVER_GRPC_INSECURE: "t" + SERVER_GRPC_BROADCAST_ADDRESS: hatchet:7077 + SERVER_GRPC_PORT: "7077" + SERVER_AUTH_SET_EMAIL_VERIFIED: "t" + SERVER_INTERNAL_CLIENT_INTERNAL_GRPC_BROADCAST_ADDRESS: hatchet:7077 + volumes: + - hatchet_config:/config + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8888/api/live"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + + hatchet-worker-cpu: + build: + context: ./server + dockerfile: Dockerfile + image: monadicalsas/reflector-backend:latest + profiles: [dailyco] + restart: unless-stopped + env_file: + - ./server/.env + environment: + ENTRYPOINT: hatchet-worker-cpu + DATABASE_URL: postgresql+asyncpg://reflector:reflector@postgres:5432/reflector + REDIS_HOST: redis + CELERY_BROKER_URL: redis://redis:6379/1 + CELERY_RESULT_BACKEND: redis://redis:6379/1 + HATCHET_CLIENT_SERVER_URL: http://hatchet:8888 + HATCHET_CLIENT_HOST_PORT: hatchet:7077 + depends_on: + hatchet: + condition: 
service_healthy + volumes: + - server_data:/app/data + + hatchet-worker-llm: + build: + context: ./server + dockerfile: Dockerfile + image: monadicalsas/reflector-backend:latest + profiles: [dailyco] + restart: unless-stopped + env_file: + - ./server/.env + environment: + ENTRYPOINT: hatchet-worker-llm + DATABASE_URL: postgresql+asyncpg://reflector:reflector@postgres:5432/reflector + REDIS_HOST: redis + CELERY_BROKER_URL: redis://redis:6379/1 + CELERY_RESULT_BACKEND: redis://redis:6379/1 + HATCHET_CLIENT_SERVER_URL: http://hatchet:8888 + HATCHET_CLIENT_HOST_PORT: hatchet:7077 + depends_on: + hatchet: + condition: service_healthy + volumes: + - server_data:/app/data + volumes: postgres_data: redis_data: @@ -315,6 +400,7 @@ volumes: ollama_data: caddy_data: caddy_config: + hatchet_config: networks: default: diff --git a/docsv2/selfhosted-production.md b/docsv2/selfhosted-production.md index bb5385ce..42d3b4d9 100644 --- a/docsv2/selfhosted-production.md +++ b/docsv2/selfhosted-production.md @@ -67,7 +67,7 @@ That's it. The script generates env files, secrets, starts all containers, waits ## Specialized Models (Required) -Pick `--gpu` or `--cpu`. This determines how **transcription, diarization, and translation** run: +Pick `--gpu` or `--cpu`. This determines how **transcription, diarization, translation, and audio padding** run: | Flag | What it does | Requires | |------|-------------|----------| @@ -161,7 +161,8 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr 5. **Storage setup** — Either initializes Garage (bucket, keys, permissions) or prompts for external S3 credentials 6. **Caddyfile** — Generates domain-specific (Let's Encrypt) or IP-specific (self-signed) configuration 7. **Build & start** — Always builds GPU/CPU model image from source. With `--build`, also builds backend and frontend from source; otherwise pulls prebuilt images from the registry -8. 
**Health checks** — Waits for each service, pulls Ollama model if needed, warns about missing LLM config +8. **Auto-detects video platforms** — If `DAILY_API_KEY` is found in `server/.env`, generates `.env.hatchet` (dashboard URL/cookie config), starts Hatchet workflow engine, and generates an API token. If any video platform is configured, enables the Rooms feature +9. **Health checks** — Waits for each service, pulls Ollama model if needed, warns about missing LLM config > For a deeper dive into each step, see [How the Self-Hosted Setup Works](selfhosted-architecture.md). @@ -180,12 +181,23 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr | `ADMIN_PASSWORD_HASH` | PBKDF2 hash for password auth | *(unset)* | | `WEBRTC_HOST` | IP advertised in WebRTC ICE candidates | Auto-detected (server IP) | | `TRANSCRIPT_URL` | Specialized model endpoint | `http://transcription:8000` | +| `PADDING_BACKEND` | Audio padding backend (`local` or `modal`) | `modal` (selfhosted), `local` (default) | +| `PADDING_URL` | Audio padding endpoint (when `PADDING_BACKEND=modal`) | `http://transcription:8000` | | `LLM_URL` | OpenAI-compatible LLM endpoint | Auto-set for Ollama modes | | `LLM_API_KEY` | LLM API key | `not-needed` for Ollama | | `LLM_MODEL` | LLM model name | `qwen2.5:14b` for Ollama (override with `--llm-model`) | | `CELERY_BEAT_POLL_INTERVAL` | Override all worker polling intervals (seconds). 
`0` = use individual defaults | `300` (selfhosted), `0` (other) | | `TRANSCRIPT_STORAGE_BACKEND` | Storage backend | `aws` | | `TRANSCRIPT_STORAGE_AWS_*` | S3 credentials | Auto-set for Garage | +| `DAILY_API_KEY` | Daily.co API key (enables live rooms) | *(unset)* | +| `DAILY_SUBDOMAIN` | Daily.co subdomain | *(unset)* | +| `DAILYCO_STORAGE_AWS_ACCESS_KEY_ID` | AWS access key for reading Daily's recording bucket | *(unset)* | +| `DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY` | AWS secret key for reading Daily's recording bucket | *(unset)* | +| `HATCHET_CLIENT_TOKEN` | Hatchet API token (auto-generated) | *(unset)* | +| `HATCHET_CLIENT_SERVER_URL` | Hatchet server URL | Auto-set when Daily.co configured | +| `HATCHET_CLIENT_HOST_PORT` | Hatchet gRPC address | Auto-set when Daily.co configured | +| `TRANSCRIPT_FILE_TIMEOUT` | HTTP timeout (seconds) for file transcription requests | `600` (`3600` in CPU mode) | +| `DIARIZATION_FILE_TIMEOUT` | HTTP timeout (seconds) for file diarization requests | `600` (`3600` in CPU mode) | ### Frontend Environment (`www/.env`) @@ -197,6 +209,7 @@ Without `--caddy` or `--domain`, no ports are exposed. Point your own reverse pr | `NEXTAUTH_SECRET` | Auth secret | Auto-generated | | `FEATURE_REQUIRE_LOGIN` | Require authentication | `false` | | `AUTH_PROVIDER` | Auth provider (`authentik` or `credentials`) | *(unset)* | +| `FEATURE_ROOMS` | Enable meeting rooms UI | Auto-set when video platform configured | ## Storage Options @@ -353,6 +366,87 @@ By default, authentication is disabled (`AUTH_BACKEND=none`, `FEATURE_REQUIRE_LO ``` 5. Restart: `docker compose -f docker-compose.selfhosted.yml down && ./scripts/setup-selfhosted.sh ` +## Enabling Daily.co Live Rooms + +Daily.co enables real-time meeting rooms with automatic recording and per-participant +audio tracks for improved diarization. When configured, the setup script automatically +starts the Hatchet workflow engine for multitrack recording processing. 
+ +### Prerequisites + +- **Daily.co account** — Sign up at https://www.daily.co/ +- **API key** — From Daily.co Dashboard → Developers → API Keys +- **Subdomain** — The `yourname` part of `yourname.daily.co` +- **AWS S3 bucket** — For Daily.co to store recordings. See [Daily.co recording storage docs](https://docs.daily.co/guides/products/live-streaming-recording/storing-recordings-in-a-custom-s3-bucket) +- **IAM role ARN** — An AWS IAM role that Daily.co assumes to write recordings to your bucket + +### Setup + +1. Configure Daily.co env vars in `server/.env` **before** running the setup script: + + ```env + DAILY_API_KEY=your-daily-api-key + DAILY_SUBDOMAIN=your-subdomain + DEFAULT_VIDEO_PLATFORM=daily + DAILYCO_STORAGE_AWS_BUCKET_NAME=your-recordings-bucket + DAILYCO_STORAGE_AWS_REGION=us-east-1 + DAILYCO_STORAGE_AWS_ROLE_ARN=arn:aws:iam::123456789:role/DailyCoAccess + # Worker credentials for reading/deleting recordings from Daily's S3 bucket. + # Required when transcript storage is separate from Daily's bucket + # (e.g., selfhosted with Garage or a different S3 account). + DAILYCO_STORAGE_AWS_ACCESS_KEY_ID=your-aws-access-key + DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY=your-aws-secret-key + ``` + + > **Important:** The `DAILYCO_STORAGE_AWS_ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` are AWS IAM + > credentials that allow the Hatchet workers to **read and delete** recording files from Daily's + > S3 bucket. These are separate from the `ROLE_ARN` (which Daily's API uses to *write* recordings). + > Without these keys, multitrack processing will fail with 404 errors when transcript storage + > (e.g., Garage) uses different credentials than the Daily recording bucket. + +2. 
Run the setup script as normal: + + ```bash + ./scripts/setup-selfhosted.sh --gpu --ollama-gpu --garage --caddy + ``` + + The script detects `DAILY_API_KEY` and automatically: + - Starts the Hatchet workflow engine (`hatchet` container) + - Starts Hatchet CPU and LLM workers (`hatchet-worker-cpu`, `hatchet-worker-llm`) + - Generates a `HATCHET_CLIENT_TOKEN` and saves it to `server/.env` + - Sets `HATCHET_CLIENT_SERVER_URL` and `HATCHET_CLIENT_HOST_PORT` + - Enables `FEATURE_ROOMS=true` in `www/.env` + - Registers Daily.co beat tasks (recording polling, presence reconciliation) + +3. (Optional) For faster recording discovery, configure a Daily.co webhook: + - In the Daily.co dashboard, add a webhook pointing to `https://your-domain/v1/daily/webhook` + - Set `DAILY_WEBHOOK_SECRET` in `server/.env` (the signing secret from Daily.co) + - Without webhooks, the system polls the Daily.co API every 15 seconds + +### What Gets Started + +| Service | Purpose | +|---------|---------| +| `hatchet` | Workflow orchestration engine (manages multitrack processing pipelines) | +| `hatchet-worker-cpu` | CPU-heavy audio tasks (track mixdown, waveform generation) | +| `hatchet-worker-llm` | Transcription, LLM inference (summaries, topics, titles), orchestration | + +### Hatchet Dashboard + +The Hatchet workflow engine includes a web dashboard for monitoring workflow runs and debugging. The setup script auto-generates `.env.hatchet` at the project root with the dashboard URL and cookie domain configuration. This file is git-ignored. 
+ +- **With Caddy**: Accessible at `https://your-domain:8888` (TLS via Caddy) +- **Without Caddy**: Accessible at `http://your-ip:8888` (direct port mapping) + +### Conditional Beat Tasks + +Beat tasks are registered based on which services are configured: + +- **Whereby tasks** (only if `WHEREBY_API_KEY` or `AWS_PROCESS_RECORDING_QUEUE_URL`): `process_messages`, `reprocess_failed_recordings` +- **Daily.co tasks** (only if `DAILY_API_KEY`): `poll_daily_recordings`, `trigger_daily_reconciliation`, `reprocess_failed_daily_recordings` +- **Platform tasks** (if any video platform configured): `process_meetings`, `sync_all_ics_calendars`, `create_upcoming_meetings` +- **Always registered**: `cleanup_old_public_data` (if `PUBLIC_MODE`), `healthcheck_ping` (if `HEALTHCHECK_URL`) + ## Enabling Real Domain with Let's Encrypt By default, Caddy uses self-signed certificates. For a real domain: @@ -446,6 +540,15 @@ docker compose -f docker-compose.selfhosted.yml logs server --tail 50 For self-signed certs, your browser will warn. Click Advanced > Proceed. For Let's Encrypt, ensure ports 80/443 are open and DNS is pointed correctly. +### File processing timeout on CPU +CPU transcription and diarization are significantly slower than GPU. A 20-minute audio file can take 20-40 minutes to process on CPU. The setup script automatically sets `TRANSCRIPT_FILE_TIMEOUT=3600` and `DIARIZATION_FILE_TIMEOUT=3600` (1 hour) for `--cpu` mode. 
If you still hit timeouts with very long files, increase these values in `server/.env`: +```bash +# Increase to 2 hours for files over 1 hour +TRANSCRIPT_FILE_TIMEOUT=7200 +DIARIZATION_FILE_TIMEOUT=7200 +``` +Then restart the worker: `docker compose -f docker-compose.selfhosted.yml restart worker` + ### Summaries/topics not generating Check LLM configuration: ```bash @@ -511,12 +614,15 @@ The setup script is idempotent — it won't overwrite existing secrets or env va │ (optional)│ │(optional│ │ :11435 │ │ S3) │ └───────────┘ └─────────┘ + + ┌───────────────────────────────────┐ + │ Hatchet (optional — Daily.co) │ + │ ┌─────────┐ ┌───────────────┐ │ + │ │ hatchet │ │ hatchet-worker│ │ + │ │ :8888 │──│ -cpu / -llm │ │ + │ └─────────┘ └───────────────┘ │ + └───────────────────────────────────┘ ``` -All services communicate over Docker's internal network. Only Caddy (if enabled) exposes ports to the internet. +All services communicate over Docker's internal network. Only Caddy (if enabled) exposes ports to the internet. Hatchet services are only started when `DAILY_API_KEY` is configured. -## Future Plans for the Self-Hosted Script - -The following features are supported by Reflector but are **not yet integrated into the self-hosted setup script** and require manual configuration: - -- **Daily.co live rooms with multitrack processing**: Daily.co enables real-time meeting rooms with automatic recording and per-participant audio tracks for improved diarization. Requires a Daily.co account, API key, and an AWS S3 bucket for recording storage. Currently not automated in the script because the worker orchestration (hatchet) is not yet supported in the selfhosted compose setup. 
diff --git a/gpu/self_hosted/app/factory.py b/gpu/self_hosted/app/factory.py index 72dadcd7..c13fc753 100644 --- a/gpu/self_hosted/app/factory.py +++ b/gpu/self_hosted/app/factory.py @@ -3,6 +3,7 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from .routers.diarization import router as diarization_router +from .routers.padding import router as padding_router from .routers.transcription import router as transcription_router from .routers.translation import router as translation_router from .services.transcriber import WhisperService @@ -27,4 +28,5 @@ def create_app() -> FastAPI: app.include_router(transcription_router) app.include_router(translation_router) app.include_router(diarization_router) + app.include_router(padding_router) return app diff --git a/gpu/self_hosted/app/routers/padding.py b/gpu/self_hosted/app/routers/padding.py new file mode 100644 index 00000000..dae8c1fd --- /dev/null +++ b/gpu/self_hosted/app/routers/padding.py @@ -0,0 +1,199 @@ +""" +Audio padding endpoint for selfhosted GPU service. + +CPU-intensive audio padding service for adding silence to audio tracks. +Uses PyAV filter graph (adelay) for precise track synchronization. + +IMPORTANT: This padding logic is duplicated from server/reflector/utils/audio_padding.py +for deployment isolation (self_hosted can't import from server/reflector/). If you modify +the PyAV filter graph or padding algorithm, you MUST update both: + - gpu/self_hosted/app/routers/padding.py (this file) + - server/reflector/utils/audio_padding.py + +Constants duplicated from server/reflector/utils/audio_constants.py for same reason. 
+""" + +import logging +import math +import os +import tempfile +from fractions import Fraction + +import av +import requests +from av.audio.resampler import AudioResampler +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +from ..auth import apikey_auth + +logger = logging.getLogger(__name__) + +router = APIRouter(tags=["padding"]) + +# ref B0F71CE8-FC59-4AA5-8414-DAFB836DB711 +OPUS_STANDARD_SAMPLE_RATE = 48000 +OPUS_DEFAULT_BIT_RATE = 128000 + +S3_TIMEOUT = 60 + + +class PaddingRequest(BaseModel): + track_url: str + output_url: str + start_time_seconds: float + track_index: int + + +class PaddingResponse(BaseModel): + size: int + cancelled: bool = False + + +@router.post("/pad", dependencies=[Depends(apikey_auth)], response_model=PaddingResponse) +def pad_track(req: PaddingRequest): + """Pad audio track with silence using PyAV adelay filter graph.""" + if not req.track_url: + raise HTTPException(status_code=400, detail="track_url cannot be empty") + if not req.output_url: + raise HTTPException(status_code=400, detail="output_url cannot be empty") + if req.start_time_seconds <= 0: + raise HTTPException( + status_code=400, + detail=f"start_time_seconds must be positive, got {req.start_time_seconds}", + ) + if req.start_time_seconds > 18000: + raise HTTPException( + status_code=400, + detail="start_time_seconds exceeds maximum 18000s (5 hours)", + ) + + logger.info( + "Padding request: track %d, delay=%.3fs", req.track_index, req.start_time_seconds + ) + + temp_dir = tempfile.mkdtemp() + input_path = None + output_path = None + + try: + # Download source audio + logger.info("Downloading track for padding") + response = requests.get(req.track_url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + input_path = os.path.join(temp_dir, "track.webm") + total_bytes = 0 + with open(input_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + 
logger.info("Track downloaded: %d bytes", total_bytes) + + # Apply padding using PyAV + output_path = os.path.join(temp_dir, "padded.webm") + delay_ms = math.floor(req.start_time_seconds * 1000) + logger.info("Padding track %d with %dms delay using PyAV", req.track_index, delay_ms) + + in_container = av.open(input_path) + in_stream = next((s for s in in_container.streams if s.type == "audio"), None) + if in_stream is None: + in_container.close() + raise HTTPException(status_code=400, detail="No audio stream in input") + + with av.open(output_path, "w", format="webm") as out_container: + out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE) + out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE + graph = av.filter.Graph() + + abuf_args = ( + f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_fmt=s16:" + f"channel_layout=stereo" + ) + src = graph.add("abuffer", args=abuf_args, name="src") + aresample_f = graph.add("aresample", args="async=1", name="ares") + delays_arg = f"{delay_ms}|{delay_ms}" + adelay_f = graph.add( + "adelay", args=f"delays={delays_arg}:all=1", name="delay" + ) + sink = graph.add("abuffersink", name="sink") + + src.link_to(aresample_f) + aresample_f.link_to(adelay_f) + adelay_f.link_to(sink) + graph.configure() + + resampler = AudioResampler( + format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE + ) + + for frame in in_container.decode(in_stream): + out_frames = resampler.resample(frame) or [] + for rframe in out_frames: + rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE + rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + src.push(rframe) + + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush filter graph + src.push(None) + while True: + try: + f_out = 
sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + + in_container.close() + + file_size = os.path.getsize(output_path) + logger.info("Padding complete: %d bytes", file_size) + + # Upload padded track + logger.info("Uploading padded track to S3") + with open(output_path, "rb") as f: + upload_response = requests.put(req.output_url, data=f, timeout=S3_TIMEOUT) + upload_response.raise_for_status() + logger.info("Upload complete: %d bytes", file_size) + + return PaddingResponse(size=file_size) + + except HTTPException: + raise + except Exception as e: + logger.error("Padding failed for track %d: %s", req.track_index, e, exc_info=True) + raise HTTPException(status_code=500, detail=f"Padding failed: {e}") from e + finally: + if input_path and os.path.exists(input_path): + try: + os.unlink(input_path) + except Exception as e: + logger.warning("Failed to cleanup input file: %s", e) + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + logger.warning("Failed to cleanup output file: %s", e) + try: + os.rmdir(temp_dir) + except Exception as e: + logger.warning("Failed to cleanup temp directory: %s", e) diff --git a/gpu/self_hosted/pyproject.toml b/gpu/self_hosted/pyproject.toml index 7cd3007d..48237c66 100644 --- a/gpu/self_hosted/pyproject.toml +++ b/gpu/self_hosted/pyproject.toml @@ -16,4 +16,5 @@ dependencies = [ "sentencepiece", "pyannote.audio==3.1.0", "torchaudio>=2.3.0", + "av>=13.1.0", ] diff --git a/gpu/self_hosted/uv.lock b/gpu/self_hosted/uv.lock index eb0ebffc..f3fdbd60 100644 --- a/gpu/self_hosted/uv.lock +++ b/gpu/self_hosted/uv.lock @@ -726,7 +726,6 @@ wheels = [ { url = 
"https://files.pythonhosted.org/packages/44/69/9b804adb5fd0671f367781560eb5eb586c4d495277c93bde4307b9e28068/greenlet-3.2.4-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:3b67ca49f54cede0186854a008109d6ee71f66bd57bb36abd6d0a0267b540cdd", size = 274079, upload-time = "2025-08-07T13:15:45.033Z" }, { url = "https://files.pythonhosted.org/packages/46/e9/d2a80c99f19a153eff70bc451ab78615583b8dac0754cfb942223d2c1a0d/greenlet-3.2.4-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:ddf9164e7a5b08e9d22511526865780a576f19ddd00d62f8a665949327fde8bb", size = 640997, upload-time = "2025-08-07T13:42:56.234Z" }, { url = "https://files.pythonhosted.org/packages/3b/16/035dcfcc48715ccd345f3a93183267167cdd162ad123cd93067d86f27ce4/greenlet-3.2.4-cp312-cp312-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:f28588772bb5fb869a8eb331374ec06f24a83a9c25bfa1f38b6993afe9c1e968", size = 655185, upload-time = "2025-08-07T13:45:27.624Z" }, - { url = "https://files.pythonhosted.org/packages/31/da/0386695eef69ffae1ad726881571dfe28b41970173947e7c558d9998de0f/greenlet-3.2.4-cp312-cp312-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:5c9320971821a7cb77cfab8d956fa8e39cd07ca44b6070db358ceb7f8797c8c9", size = 649926, upload-time = "2025-08-07T13:53:15.251Z" }, { url = "https://files.pythonhosted.org/packages/68/88/69bf19fd4dc19981928ceacbc5fd4bb6bc2215d53199e367832e98d1d8fe/greenlet-3.2.4-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:c60a6d84229b271d44b70fb6e5fa23781abb5d742af7b808ae3f6efd7c9c60f6", size = 651839, upload-time = "2025-08-07T13:18:30.281Z" }, { url = "https://files.pythonhosted.org/packages/19/0d/6660d55f7373b2ff8152401a83e02084956da23ae58cddbfb0b330978fe9/greenlet-3.2.4-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3b3812d8d0c9579967815af437d96623f45c0f2ae5f04e366de62a12d83a8fb0", size = 607586, upload-time = "2025-08-07T13:18:28.544Z" }, { url = 
"https://files.pythonhosted.org/packages/8e/1a/c953fdedd22d81ee4629afbb38d2f9d71e37d23caace44775a3a969147d4/greenlet-3.2.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:abbf57b5a870d30c4675928c37278493044d7c14378350b3aa5d484fa65575f0", size = 1123281, upload-time = "2025-08-07T13:42:39.858Z" }, @@ -737,7 +736,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/49/e8/58c7f85958bda41dafea50497cbd59738c5c43dbbea5ee83d651234398f4/greenlet-3.2.4-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:1a921e542453fe531144e91e1feedf12e07351b1cf6c9e8a3325ea600a715a31", size = 272814, upload-time = "2025-08-07T13:15:50.011Z" }, { url = "https://files.pythonhosted.org/packages/62/dd/b9f59862e9e257a16e4e610480cfffd29e3fae018a68c2332090b53aac3d/greenlet-3.2.4-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:cd3c8e693bff0fff6ba55f140bf390fa92c994083f838fece0f63be121334945", size = 641073, upload-time = "2025-08-07T13:42:57.23Z" }, { url = "https://files.pythonhosted.org/packages/f7/0b/bc13f787394920b23073ca3b6c4a7a21396301ed75a655bcb47196b50e6e/greenlet-3.2.4-cp313-cp313-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:710638eb93b1fa52823aa91bf75326f9ecdfd5e0466f00789246a5280f4ba0fc", size = 655191, upload-time = "2025-08-07T13:45:29.752Z" }, - { url = "https://files.pythonhosted.org/packages/f2/d6/6adde57d1345a8d0f14d31e4ab9c23cfe8e2cd39c3baf7674b4b0338d266/greenlet-3.2.4-cp313-cp313-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:c5111ccdc9c88f423426df3fd1811bfc40ed66264d35aa373420a34377efc98a", size = 649516, upload-time = "2025-08-07T13:53:16.314Z" }, { url = "https://files.pythonhosted.org/packages/7f/3b/3a3328a788d4a473889a2d403199932be55b1b0060f4ddd96ee7cdfcad10/greenlet-3.2.4-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:d76383238584e9711e20ebe14db6c88ddcedc1829a9ad31a584389463b5aa504", size = 652169, upload-time = "2025-08-07T13:18:32.861Z" }, { url = 
"https://files.pythonhosted.org/packages/ee/43/3cecdc0349359e1a527cbf2e3e28e5f8f06d3343aaf82ca13437a9aa290f/greenlet-3.2.4-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:23768528f2911bcd7e475210822ffb5254ed10d71f4028387e5a99b4c6699671", size = 610497, upload-time = "2025-08-07T13:18:31.636Z" }, { url = "https://files.pythonhosted.org/packages/b8/19/06b6cf5d604e2c382a6f31cafafd6f33d5dea706f4db7bdab184bad2b21d/greenlet-3.2.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:00fadb3fedccc447f517ee0d3fd8fe49eae949e1cd0f6a611818f4f6fb7dc83b", size = 1121662, upload-time = "2025-08-07T13:42:41.117Z" }, @@ -748,7 +746,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/22/5c/85273fd7cc388285632b0498dbbab97596e04b154933dfe0f3e68156c68c/greenlet-3.2.4-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:49a30d5fda2507ae77be16479bdb62a660fa51b1eb4928b524975b3bde77b3c0", size = 273586, upload-time = "2025-08-07T13:16:08.004Z" }, { url = "https://files.pythonhosted.org/packages/d1/75/10aeeaa3da9332c2e761e4c50d4c3556c21113ee3f0afa2cf5769946f7a3/greenlet-3.2.4-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:299fd615cd8fc86267b47597123e3f43ad79c9d8a22bebdce535e53550763e2f", size = 686346, upload-time = "2025-08-07T13:42:59.944Z" }, { url = "https://files.pythonhosted.org/packages/c0/aa/687d6b12ffb505a4447567d1f3abea23bd20e73a5bed63871178e0831b7a/greenlet-3.2.4-cp314-cp314-manylinux2014_ppc64le.manylinux_2_17_ppc64le.whl", hash = "sha256:c17b6b34111ea72fc5a4e4beec9711d2226285f0386ea83477cbb97c30a3f3a5", size = 699218, upload-time = "2025-08-07T13:45:30.969Z" }, - { url = "https://files.pythonhosted.org/packages/dc/8b/29aae55436521f1d6f8ff4e12fb676f3400de7fcf27fccd1d4d17fd8fecd/greenlet-3.2.4-cp314-cp314-manylinux2014_s390x.manylinux_2_17_s390x.whl", hash = "sha256:b4a1870c51720687af7fa3e7cda6d08d801dae660f75a76f3845b642b4da6ee1", size = 694659, upload-time = "2025-08-07T13:53:17.759Z" }, { url = 
"https://files.pythonhosted.org/packages/92/2e/ea25914b1ebfde93b6fc4ff46d6864564fba59024e928bdc7de475affc25/greenlet-3.2.4-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:061dc4cf2c34852b052a8620d40f36324554bc192be474b9e9770e8c042fd735", size = 695355, upload-time = "2025-08-07T13:18:34.517Z" }, { url = "https://files.pythonhosted.org/packages/72/60/fc56c62046ec17f6b0d3060564562c64c862948c9d4bc8aa807cf5bd74f4/greenlet-3.2.4-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:44358b9bf66c8576a9f57a590d5f5d6e72fa4228b763d0e43fee6d3b06d3a337", size = 657512, upload-time = "2025-08-07T13:18:33.969Z" }, { url = "https://files.pythonhosted.org/packages/23/6e/74407aed965a4ab6ddd93a7ded3180b730d281c77b765788419484cdfeef/greenlet-3.2.4-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:2917bdf657f5859fbf3386b12d68ede4cf1f04c90c3a6bc1f013dd68a22e2269", size = 1612508, upload-time = "2025-11-04T12:42:23.427Z" }, @@ -2072,6 +2069,7 @@ name = "reflector-gpu" version = "0.1.0" source = { virtual = "." } dependencies = [ + { name = "av" }, { name = "fastapi", extra = ["standard"] }, { name = "faster-whisper" }, { name = "librosa" }, @@ -2087,6 +2085,7 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "av", specifier = ">=13.1.0" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.116.1" }, { name = "faster-whisper", specifier = ">=1.1.0" }, { name = "librosa", specifier = "==0.10.1" }, diff --git a/scripts/setup-selfhosted.sh b/scripts/setup-selfhosted.sh index 2fba2b65..6eee6a38 100755 --- a/scripts/setup-selfhosted.sh +++ b/scripts/setup-selfhosted.sh @@ -34,6 +34,10 @@ # ./scripts/setup-selfhosted.sh --gpu --garage --caddy # ./scripts/setup-selfhosted.sh --cpu # +# The script auto-detects Daily.co (DAILY_API_KEY) and Whereby (WHEREBY_API_KEY) +# from server/.env. If Daily.co is configured, Hatchet workflow services are +# started automatically for multitrack recording processing. 
+# # Idempotent — safe to re-run at any time. # set -euo pipefail @@ -427,6 +431,8 @@ step_server_env() { env_set "$SERVER_ENV" "DIARIZATION_URL" "http://transcription:8000" env_set "$SERVER_ENV" "TRANSLATION_BACKEND" "modal" env_set "$SERVER_ENV" "TRANSLATE_URL" "http://transcription:8000" + env_set "$SERVER_ENV" "PADDING_BACKEND" "modal" + env_set "$SERVER_ENV" "PADDING_URL" "http://transcription:8000" # HuggingFace token for gated models (pyannote diarization) # Written to root .env so docker compose picks it up for gpu/cpu containers @@ -466,7 +472,7 @@ step_server_env() { if env_has_key "$SERVER_ENV" "LLM_URL"; then current_llm_url=$(env_get "$SERVER_ENV" "LLM_URL") fi - if [[ -z "$current_llm_url" ]] || [[ "$current_llm_url" == "http://host.docker.internal"* ]]; then + if [[ -z "$current_llm_url" ]]; then warn "LLM not configured. Summarization and topic detection will NOT work." warn "Edit server/.env and set LLM_URL, LLM_API_KEY, LLM_MODEL" warn "Example: LLM_URL=https://api.openai.com/v1 LLM_MODEL=gpt-4o-mini" @@ -475,6 +481,20 @@ step_server_env() { fi fi + # CPU mode: increase file processing timeouts (default 600s is too short for long audio on CPU) + if [[ "$MODEL_MODE" == "cpu" ]]; then + env_set "$SERVER_ENV" "TRANSCRIPT_FILE_TIMEOUT" "3600" + env_set "$SERVER_ENV" "DIARIZATION_FILE_TIMEOUT" "3600" + ok "CPU mode — file processing timeouts set to 3600s (1 hour)" + fi + + # If Daily.co is manually configured, ensure Hatchet connectivity vars are set + if env_has_key "$SERVER_ENV" "DAILY_API_KEY" && [[ -n "$(env_get "$SERVER_ENV" "DAILY_API_KEY")" ]]; then + env_set "$SERVER_ENV" "HATCHET_CLIENT_SERVER_URL" "http://hatchet:8888" + env_set "$SERVER_ENV" "HATCHET_CLIENT_HOST_PORT" "hatchet:7077" + ok "Daily.co detected — Hatchet connectivity configured" + fi + ok "server/.env ready" } @@ -535,6 +555,19 @@ step_www_env() { fi fi + # Enable rooms if any video platform is configured in server/.env + local _daily_key="" _whereby_key="" + if env_has_key 
"$SERVER_ENV" "DAILY_API_KEY"; then + _daily_key=$(env_get "$SERVER_ENV" "DAILY_API_KEY") + fi + if env_has_key "$SERVER_ENV" "WHEREBY_API_KEY"; then + _whereby_key=$(env_get "$SERVER_ENV" "WHEREBY_API_KEY") + fi + if [[ -n "$_daily_key" ]] || [[ -n "$_whereby_key" ]]; then + env_set "$WWW_ENV" "FEATURE_ROOMS" "true" + ok "Rooms feature enabled (video platform configured)" + fi + ok "www/.env ready (URL=$base_url)" } @@ -739,6 +772,23 @@ CADDYEOF else ok "Caddyfile already exists" fi + + # Add Hatchet dashboard route if Daily.co is detected + if [[ "$DAILY_DETECTED" == "true" ]]; then + if ! grep -q "hatchet" "$caddyfile" 2>/dev/null; then + cat >> "$caddyfile" << CADDYEOF + +# Hatchet workflow dashboard (Daily.co multitrack processing) +:8888 { + tls internal + reverse_proxy hatchet:8888 +} +CADDYEOF + ok "Added Hatchet dashboard route to Caddyfile (port 8888)" + else + ok "Hatchet dashboard route already in Caddyfile" + fi + fi } # ========================================================= @@ -766,6 +816,37 @@ step_services() { compose_cmd pull server web || warn "Pull failed — using cached images" fi + # Build hatchet workers if Daily.co is configured (same backend image) + if [[ "$DAILY_DETECTED" == "true" ]] && [[ "$BUILD_IMAGES" == "true" ]]; then + info "Building Hatchet worker images..." + compose_cmd build hatchet-worker-cpu hatchet-worker-llm + ok "Hatchet worker images built" + fi + + # Ensure hatchet database exists before starting hatchet (init-hatchet-db.sql only runs on fresh postgres volumes) + if [[ "$DAILY_DETECTED" == "true" ]]; then + info "Ensuring postgres is running for Hatchet database setup..." 
+ compose_cmd up -d postgres + local pg_ready=false + for i in $(seq 1 30); do + if compose_cmd exec -T postgres pg_isready -U reflector > /dev/null 2>&1; then + pg_ready=true + break + fi + sleep 2 + done + if [[ "$pg_ready" == "true" ]]; then + compose_cmd exec -T postgres psql -U reflector -tc \ + "SELECT 1 FROM pg_database WHERE datname = 'hatchet'" 2>/dev/null \ + | grep -q 1 \ + || compose_cmd exec -T postgres psql -U reflector -c "CREATE DATABASE hatchet" 2>/dev/null \ + || true + ok "Hatchet database ready" + else + warn "Postgres not ready — hatchet database may need to be created manually" + fi + fi + # Start all services compose_cmd up -d ok "Containers started" @@ -894,6 +975,26 @@ step_health() { fi fi + # Hatchet (if Daily.co detected) + if [[ "$DAILY_DETECTED" == "true" ]]; then + info "Waiting for Hatchet workflow engine..." + local hatchet_ok=false + for i in $(seq 1 60); do + if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then + hatchet_ok=true + break + fi + echo -ne "\r Waiting for Hatchet... ($i/60)" + sleep 3 + done + echo "" + if [[ "$hatchet_ok" == "true" ]]; then + ok "Hatchet workflow engine healthy" + else + warn "Hatchet not ready yet. 
Check: docker compose logs hatchet" + fi + fi + # LLM warning for non-Ollama modes if [[ "$USES_OLLAMA" == "false" ]]; then local llm_url="" @@ -911,6 +1012,71 @@ step_health() { fi } +# ========================================================= +# Step 8: Hatchet token generation (Daily.co only) +# ========================================================= +step_hatchet_token() { + if [[ "$DAILY_DETECTED" != "true" ]]; then + return + fi + + # Skip if token already set + if env_has_key "$SERVER_ENV" "HATCHET_CLIENT_TOKEN" && [[ -n "$(env_get "$SERVER_ENV" "HATCHET_CLIENT_TOKEN")" ]]; then + ok "HATCHET_CLIENT_TOKEN already set — skipping generation" + return + fi + + info "Step 8: Generating Hatchet API token" + + # Wait for hatchet to be healthy + local hatchet_ok=false + for i in $(seq 1 60); do + if curl -sf http://localhost:8888/api/live > /dev/null 2>&1; then + hatchet_ok=true + break + fi + echo -ne "\r Waiting for Hatchet API... ($i/60)" + sleep 3 + done + echo "" + + if [[ "$hatchet_ok" != "true" ]]; then + err "Hatchet not responding — cannot generate token" + err "Check: docker compose logs hatchet" + return + fi + + # Get tenant ID from hatchet database + local tenant_id + tenant_id=$(compose_cmd exec -T postgres psql -U reflector -d hatchet -t -c \ + "SELECT id FROM \"Tenant\" WHERE slug = 'default';" 2>/dev/null | tr -d ' \n') + + if [[ -z "$tenant_id" ]]; then + err "Could not find default tenant in Hatchet database" + err "Hatchet may still be initializing. Try re-running the script." 
+ return + fi + + # Generate token via hatchet-admin + local token + token=$(compose_cmd exec -T hatchet /hatchet-admin token create \ + --config /config --tenant-id "$tenant_id" 2>/dev/null | tr -d '\n') + + if [[ -z "$token" ]]; then + err "Failed to generate Hatchet token" + err "Try generating manually: see server/README.md" + return + fi + + env_set "$SERVER_ENV" "HATCHET_CLIENT_TOKEN" "$token" + ok "HATCHET_CLIENT_TOKEN generated and saved to server/.env" + + # Restart services that need the token + info "Restarting services with new Hatchet token..." + compose_cmd restart server worker hatchet-worker-cpu hatchet-worker-llm + ok "Services restarted with Hatchet token" +} + # ========================================================= # Main # ========================================================= @@ -957,6 +1123,48 @@ main() { echo "" step_server_env echo "" + + # Auto-detect video platforms from server/.env (after step_server_env so file exists) + DAILY_DETECTED=false + WHEREBY_DETECTED=false + if env_has_key "$SERVER_ENV" "DAILY_API_KEY" && [[ -n "$(env_get "$SERVER_ENV" "DAILY_API_KEY")" ]]; then + DAILY_DETECTED=true + fi + if env_has_key "$SERVER_ENV" "WHEREBY_API_KEY" && [[ -n "$(env_get "$SERVER_ENV" "WHEREBY_API_KEY")" ]]; then + WHEREBY_DETECTED=true + fi + ANY_PLATFORM_DETECTED=false + [[ "$DAILY_DETECTED" == "true" || "$WHEREBY_DETECTED" == "true" ]] && ANY_PLATFORM_DETECTED=true + + # Conditional profile activation for Daily.co + if [[ "$DAILY_DETECTED" == "true" ]]; then + COMPOSE_PROFILES+=("dailyco") + ok "Daily.co detected — enabling Hatchet workflow services" + fi + + # Generate .env.hatchet for hatchet dashboard config + if [[ "$DAILY_DETECTED" == "true" ]]; then + local hatchet_server_url hatchet_cookie_domain + if [[ -n "$CUSTOM_DOMAIN" ]]; then + hatchet_server_url="https://${CUSTOM_DOMAIN}:8888" + hatchet_cookie_domain="$CUSTOM_DOMAIN" + elif [[ -n "$PRIMARY_IP" ]]; then + hatchet_server_url="http://${PRIMARY_IP}:8888" + 
hatchet_cookie_domain="$PRIMARY_IP" + else + hatchet_server_url="http://localhost:8888" + hatchet_cookie_domain="localhost" + fi + cat > "$ROOT_DIR/.env.hatchet" << EOF +SERVER_URL=$hatchet_server_url +SERVER_AUTH_COOKIE_DOMAIN=$hatchet_cookie_domain +EOF + ok "Generated .env.hatchet (dashboard URL=$hatchet_server_url)" + else + # Create empty .env.hatchet so compose doesn't fail if dailyco profile is ever activated manually + touch "$ROOT_DIR/.env.hatchet" + fi + step_www_env echo "" step_storage @@ -966,6 +1174,8 @@ main() { step_services echo "" step_health + echo "" + step_hatchet_token echo "" echo "==========================================" @@ -995,6 +1205,9 @@ main() { [[ "$USE_GARAGE" != "true" ]] && echo " Storage: External S3" [[ "$USES_OLLAMA" == "true" ]] && echo " LLM: Ollama ($OLLAMA_MODEL) for summarization/topics" [[ "$USES_OLLAMA" != "true" ]] && echo " LLM: External (configure in server/.env)" + [[ "$DAILY_DETECTED" == "true" ]] && echo " Video: Daily.co (live rooms + multitrack processing via Hatchet)" + [[ "$WHEREBY_DETECTED" == "true" ]] && echo " Video: Whereby (live rooms)" + [[ "$ANY_PLATFORM_DETECTED" != "true" ]] && echo " Video: None (rooms disabled)" echo "" echo " To stop: docker compose -f docker-compose.selfhosted.yml down" echo " To re-run: ./scripts/setup-selfhosted.sh $*" diff --git a/server/.env.example b/server/.env.example index 5ef8c489..08b9b392 100644 --- a/server/.env.example +++ b/server/.env.example @@ -86,6 +86,18 @@ LLM_API_KEY=not-needed ## Context size for summary generation (tokens) LLM_CONTEXT_WINDOW=16000 +## ======================================================= +## Audio Padding +## +## backends: local (in-process PyAV), modal (HTTP API client) +## Default is "local" — no external service needed. +## Set to "modal" when using Modal.com or self-hosted gpu/self_hosted/ container. 
+## ======================================================= +#PADDING_BACKEND=local +#PADDING_BACKEND=modal +#PADDING_URL=https://xxxxx--reflector-padding-web.modal.run +#PADDING_MODAL_API_KEY=xxxxx + ## ======================================================= ## Diarization ## @@ -137,6 +149,10 @@ TRANSCRIPT_STORAGE_AWS_REGION=us-east-1 #DAILYCO_STORAGE_AWS_ROLE_ARN=... # IAM role ARN for Daily.co S3 access #DAILYCO_STORAGE_AWS_BUCKET_NAME=reflector-dailyco #DAILYCO_STORAGE_AWS_REGION=us-west-2 +# Worker credentials for reading/deleting from Daily's recording bucket +# Required when transcript storage is separate from Daily's bucket (e.g., selfhosted with Garage) +#DAILYCO_STORAGE_AWS_ACCESS_KEY_ID=your-aws-access-key +#DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY=your-aws-secret-key ## Whereby (optional separate bucket) #WHEREBY_STORAGE_AWS_BUCKET_NAME=reflector-whereby diff --git a/server/.env.selfhosted.example b/server/.env.selfhosted.example index 96c520fc..eb96599f 100644 --- a/server/.env.selfhosted.example +++ b/server/.env.selfhosted.example @@ -47,6 +47,9 @@ DIARIZATION_URL=http://transcription:8000 TRANSLATION_BACKEND=modal TRANSLATE_URL=http://transcription:8000 +PADDING_BACKEND=modal +PADDING_URL=http://transcription:8000 + # HuggingFace token — optional, for gated models (e.g. pyannote). # Falls back to public S3 model bundle if not set. # HF_TOKEN=hf_xxxxx @@ -93,15 +96,42 @@ TRANSCRIPT_STORAGE_AWS_REGION=us-east-1 # ======================================================= # Daily.co Live Rooms (Optional) # Enable real-time meeting rooms with Daily.co integration. -# Requires a Daily.co account: https://www.daily.co/ +# Configure these BEFORE running setup-selfhosted.sh and the +# script will auto-detect and start Hatchet workflow services. +# +# Prerequisites: +# 1. Daily.co account: https://www.daily.co/ +# 2. API key: Dashboard → Developers → API Keys +# 3. 
S3 bucket for recordings: https://docs.daily.co/guides/products/live-streaming-recording/storing-recordings-in-a-custom-s3-bucket +# 4. IAM role ARN for Daily.co to write recordings to your bucket +# +# After configuring, run: ./scripts/setup-selfhosted.sh +# The script will detect DAILY_API_KEY and automatically: +# - Start Hatchet workflow engine + CPU/LLM workers +# - Generate a Hatchet API token +# - Enable FEATURE_ROOMS in the frontend # ======================================================= -# DEFAULT_VIDEO_PLATFORM=daily # DAILY_API_KEY=your-daily-api-key # DAILY_SUBDOMAIN=your-subdomain -# DAILY_WEBHOOK_SECRET=your-daily-webhook-secret +# DEFAULT_VIDEO_PLATFORM=daily # DAILYCO_STORAGE_AWS_BUCKET_NAME=reflector-dailyco # DAILYCO_STORAGE_AWS_REGION=us-east-1 # DAILYCO_STORAGE_AWS_ROLE_ARN=arn:aws:iam::role/DailyCoAccess +# Worker credentials for reading/deleting from Daily's recording bucket +# Required when transcript storage is separate from Daily's bucket (e.g., selfhosted with Garage) +# DAILYCO_STORAGE_AWS_ACCESS_KEY_ID=your-aws-access-key +# DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY=your-aws-secret-key +# DAILY_WEBHOOK_SECRET=your-daily-webhook-secret # optional, for faster recording discovery + +# ======================================================= +# Hatchet Workflow Engine (Auto-configured for Daily.co) +# Required for Daily.co multitrack recording processing. +# The setup script generates HATCHET_CLIENT_TOKEN automatically. +# Do not set these manually unless you know what you're doing. 
+# ======================================================= +# HATCHET_CLIENT_TOKEN= +# HATCHET_CLIENT_SERVER_URL=http://hatchet:8888 +# HATCHET_CLIENT_HOST_PORT=hatchet:7077 # ======================================================= # Feature Flags diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index aead2a35..3fa725b6 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -90,7 +90,6 @@ from reflector.processors.summary.summary_builder import SummaryBuilder from reflector.processors.types import TitleSummary, Word from reflector.processors.types import Transcript as TranscriptType from reflector.settings import settings -from reflector.storage.storage_aws import AwsStorage from reflector.utils.audio_constants import ( PRESIGNED_URL_EXPIRATION_SECONDS, WAVEFORM_SEGMENTS, @@ -117,6 +116,7 @@ class PipelineInput(BaseModel): bucket_name: NonEmptyString transcript_id: NonEmptyString room_id: NonEmptyString | None = None + source_platform: str = "daily" hatchet = HatchetClientManager.get_client() @@ -170,15 +170,10 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool: def _spawn_storage(): - """Create fresh storage instance.""" - # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory - return AwsStorage( - aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, - aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, - aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, - aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL, - ) + """Create fresh storage instance for writing to our transcript bucket.""" + from reflector.storage import get_transcripts_storage # noqa: PLC0415 + + return get_transcripts_storage() class 
Loggable(Protocol): @@ -434,6 +429,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes bucket_name=input.bucket_name, transcript_id=input.transcript_id, language=source_language, + source_platform=input.source_platform, ) ) for i, track in enumerate(input.tracks) @@ -1195,7 +1191,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult: ) from reflector.db.recordings import recordings_controller # noqa: PLC0415 from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.storage import get_transcripts_storage # noqa: PLC0415 + from reflector.storage import ( # noqa: PLC0415 + get_source_storage, + get_transcripts_storage, + ) transcript = await transcripts_controller.get_by_id(input.transcript_id) if not transcript: @@ -1245,7 +1244,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult: deletion_errors = [] if input_track_keys and input.bucket_name: - master_storage = get_transcripts_storage() + master_storage = get_source_storage(input.source_platform) for key in input_track_keys: try: await master_storage.delete_file(key, bucket=input.bucket_name) diff --git a/server/reflector/hatchet/workflows/padding_workflow.py b/server/reflector/hatchet/workflows/padding_workflow.py index a75a15a3..0e0056ed 100644 --- a/server/reflector/hatchet/workflows/padding_workflow.py +++ b/server/reflector/hatchet/workflows/padding_workflow.py @@ -24,6 +24,7 @@ class PaddingInput(BaseModel): s3_key: str bucket_name: str transcript_id: str + source_platform: str = "daily" hatchet = HatchetClientManager.get_client() @@ -45,20 +46,14 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: ) try: - # Create fresh storage instance to avoid aioboto3 fork issues - from reflector.settings import settings # noqa: PLC0415 - from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415 - - # TODO: replace direct AwsStorage construction with 
get_transcripts_storage() factory - storage = AwsStorage( - aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, - aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, - aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, - aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL, + from reflector.storage import ( # noqa: PLC0415 + get_source_storage, + get_transcripts_storage, ) - source_url = await storage.get_file_url( + # Source reads: use platform-specific credentials + source_storage = get_source_storage(input.source_platform) + source_url = await source_storage.get_file_url( input.s3_key, operation="get_object", expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, @@ -96,52 +91,28 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" - # Presign PUT URL for output (Modal will upload directly) - output_url = await storage.get_file_url( + # Output writes: use transcript storage (our own bucket) + output_storage = get_transcripts_storage() + output_url = await output_storage.get_file_url( storage_path, operation="put_object", expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, ) - import httpx # noqa: PLC0415 - - from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 - AudioPaddingModalProcessor, + from reflector.processors.audio_padding_auto import ( # noqa: PLC0415 + AudioPaddingAutoProcessor, ) - try: - processor = AudioPaddingModalProcessor() - result = await processor.pad_track( - track_url=source_url, - output_url=output_url, - start_time_seconds=start_time_seconds, - track_index=input.track_index, - ) - file_size = result.size + processor = AudioPaddingAutoProcessor() + result = await processor.pad_track( + track_url=source_url, + output_url=output_url, + start_time_seconds=start_time_seconds, + track_index=input.track_index, + ) + 
file_size = result.size - ctx.log(f"pad_track: Modal returned size={file_size}") - except httpx.HTTPStatusError as e: - error_detail = e.response.text if hasattr(e.response, "text") else str(e) - logger.error( - "[Hatchet] Modal padding HTTP error", - transcript_id=input.transcript_id, - track_index=input.track_index, - status_code=e.response.status_code if hasattr(e, "response") else None, - error=error_detail, - exc_info=True, - ) - raise Exception( - f"Modal padding failed: HTTP {e.response.status_code}" - ) from e - except httpx.TimeoutException as e: - logger.error( - "[Hatchet] Modal padding timeout", - transcript_id=input.transcript_id, - track_index=input.track_index, - error=str(e), - exc_info=True, - ) - raise Exception("Modal padding timeout") from e + ctx.log(f"pad_track: padding returned size={file_size}") logger.info( "[Hatchet] pad_track complete", diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index 7b08ecb1..f2ca2d6b 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -36,6 +36,7 @@ class TrackInput(BaseModel): bucket_name: str transcript_id: str language: str = "en" + source_platform: str = "daily" hatchet = HatchetClientManager.get_client() @@ -59,20 +60,14 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: ) try: - # Create fresh storage instance to avoid aioboto3 fork issues - # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory - from reflector.settings import settings # noqa: PLC0415 - from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415 - - storage = AwsStorage( - aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, - aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, - aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, 
- aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL, + from reflector.storage import ( # noqa: PLC0415 + get_source_storage, + get_transcripts_storage, ) - source_url = await storage.get_file_url( + # Source reads: use platform-specific credentials + source_storage = get_source_storage(input.source_platform) + source_url = await source_storage.get_file_url( input.s3_key, operation="get_object", expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, @@ -99,18 +94,19 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" - # Presign PUT URL for output (Modal uploads directly) - output_url = await storage.get_file_url( + # Output writes: use transcript storage (our own bucket) + output_storage = get_transcripts_storage() + output_url = await output_storage.get_file_url( storage_path, operation="put_object", expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, ) - from reflector.processors.audio_padding_modal import ( # noqa: PLC0415 - AudioPaddingModalProcessor, + from reflector.processors.audio_padding_auto import ( # noqa: PLC0415 + AudioPaddingAutoProcessor, ) - processor = AudioPaddingModalProcessor() + processor = AudioPaddingAutoProcessor() result = await processor.pad_track( track_url=source_url, output_url=output_url, @@ -161,18 +157,18 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe raise ValueError("Missing padded_key from pad_track") # Presign URL on demand (avoids stale URLs on workflow replay) - # TODO: replace direct AwsStorage construction with get_transcripts_storage() factory - from reflector.settings import settings # noqa: PLC0415 - from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415 - - storage = AwsStorage( - aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, - aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, - 
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, - aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, - aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL, + from reflector.storage import ( # noqa: PLC0415 + get_source_storage, + get_transcripts_storage, ) + # If bucket_name is set, file is still in the platform's source bucket (no padding applied). + # If bucket_name is None, padded file was written to our transcript storage. + if bucket_name: + storage = get_source_storage(input.source_platform) + else: + storage = get_transcripts_storage() + audio_url = await storage.get_file_url( padded_key, operation="get_object", diff --git a/server/reflector/processors/audio_padding_auto.py b/server/reflector/processors/audio_padding_auto.py new file mode 100644 index 00000000..2e191470 --- /dev/null +++ b/server/reflector/processors/audio_padding_auto.py @@ -0,0 +1,31 @@ +import importlib + +from reflector.settings import settings + + +class AudioPaddingAutoProcessor: + _registry = {} + + @classmethod + def register(cls, name, kclass): + cls._registry[name] = kclass + + def __new__(cls, name: str | None = None, **kwargs): + if name is None: + name = settings.PADDING_BACKEND + if name not in cls._registry: + module_name = f"reflector.processors.audio_padding_{name}" + importlib.import_module(module_name) + + # gather specific configuration for the processor + # search `PADDING_XXX_YYY`, push to constructor as `xxx_yyy` + config = {} + name_upper = name.upper() + settings_prefix = "PADDING_" + config_prefix = f"{settings_prefix}{name_upper}_" + for key, value in settings: + if key.startswith(config_prefix): + config_name = key[len(settings_prefix) :].lower() + config[config_name] = value + + return cls._registry[name](**config | kwargs) diff --git a/server/reflector/processors/audio_padding_local.py b/server/reflector/processors/audio_padding_local.py new file mode 100644 index 00000000..e6646660 --- /dev/null +++ 
b/server/reflector/processors/audio_padding_local.py @@ -0,0 +1,133 @@ +""" +Local audio padding processor using PyAV. + +Pads audio tracks with silence directly in-process (no HTTP). +Reuses the shared PyAV utilities from reflector.utils.audio_padding. +""" + +import asyncio +import os +import tempfile + +import av + +from reflector.logger import logger +from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor +from reflector.processors.audio_padding_modal import PaddingResponse +from reflector.utils.audio_padding import apply_audio_padding_to_file + +S3_TIMEOUT = 60 + + +class AudioPaddingLocalProcessor: + """Audio padding processor using local PyAV (no HTTP backend).""" + + async def pad_track( + self, + track_url: str, + output_url: str, + start_time_seconds: float, + track_index: int, + ) -> PaddingResponse: + """Pad audio track with silence locally via PyAV. + + Args: + track_url: Presigned GET URL for source audio track + output_url: Presigned PUT URL for output WebM + start_time_seconds: Amount of silence to prepend + track_index: Track index for logging + """ + if not track_url: + raise ValueError("track_url cannot be empty") + if start_time_seconds <= 0: + raise ValueError( + f"start_time_seconds must be positive, got {start_time_seconds}" + ) + + log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds) + log.info("Starting local PyAV padding") + + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, + self._pad_track_blocking, + track_url, + output_url, + start_time_seconds, + track_index, + ) + + def _pad_track_blocking( + self, + track_url: str, + output_url: str, + start_time_seconds: float, + track_index: int, + ) -> PaddingResponse: + """Blocking padding work: download, pad with PyAV, upload.""" + import requests + + log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds) + temp_dir = tempfile.mkdtemp() + input_path = None + output_path = None + + try: + # 
Download source audio + log.info("Downloading track for local padding") + response = requests.get(track_url, stream=True, timeout=S3_TIMEOUT) + response.raise_for_status() + + input_path = os.path.join(temp_dir, "track.webm") + total_bytes = 0 + with open(input_path, "wb") as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + total_bytes += len(chunk) + log.info("Track downloaded", bytes=total_bytes) + + # Apply padding using shared PyAV utility + output_path = os.path.join(temp_dir, "padded.webm") + with av.open(input_path) as in_container: + apply_audio_padding_to_file( + in_container, + output_path, + start_time_seconds, + track_index, + logger=logger, + ) + + file_size = os.path.getsize(output_path) + log.info("Local padding complete", size=file_size) + + # Upload padded track + log.info("Uploading padded track to S3") + with open(output_path, "rb") as f: + upload_response = requests.put(output_url, data=f, timeout=S3_TIMEOUT) + upload_response.raise_for_status() + log.info("Upload complete", size=file_size) + + return PaddingResponse(size=file_size) + + except Exception as e: + log.error("Local padding failed", error=str(e), exc_info=True) + raise + finally: + if input_path and os.path.exists(input_path): + try: + os.unlink(input_path) + except Exception as e: + log.warning("Failed to cleanup input file", error=str(e)) + if output_path and os.path.exists(output_path): + try: + os.unlink(output_path) + except Exception as e: + log.warning("Failed to cleanup output file", error=str(e)) + try: + os.rmdir(temp_dir) + except Exception as e: + log.warning("Failed to cleanup temp directory", error=str(e)) + + +AudioPaddingAutoProcessor.register("local", AudioPaddingLocalProcessor) diff --git a/server/reflector/processors/audio_padding_modal.py b/server/reflector/processors/audio_padding_modal.py index 49227f1d..289058f1 100644 --- a/server/reflector/processors/audio_padding_modal.py +++ 
b/server/reflector/processors/audio_padding_modal.py @@ -10,6 +10,7 @@ from pydantic import BaseModel from reflector.hatchet.constants import TIMEOUT_AUDIO from reflector.logger import logger +from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor class PaddingResponse(BaseModel): @@ -111,3 +112,6 @@ class AudioPaddingModalProcessor: except Exception as e: log.error("Modal padding unexpected error", error=str(e), exc_info=True) raise + + +AudioPaddingAutoProcessor.register("modal", AudioPaddingModalProcessor) diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index fb36cfeb..3a8343d8 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -40,6 +40,7 @@ class MultitrackProcessingConfig: track_keys: list[str] recording_id: NonEmptyString | None = None room_id: NonEmptyString | None = None + source_platform: str = "daily" mode: Literal["multitrack"] = "multitrack" @@ -256,6 +257,7 @@ async def dispatch_transcript_processing( "bucket_name": config.bucket_name, "transcript_id": config.transcript_id, "room_id": config.room_id, + "source_platform": config.source_platform, }, additional_metadata={ "transcript_id": config.transcript_id, diff --git a/server/reflector/settings.py b/server/reflector/settings.py index fad1f4d4..47845f58 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -73,6 +73,9 @@ class Settings(BaseSettings): DAILYCO_STORAGE_AWS_BUCKET_NAME: str | None = None DAILYCO_STORAGE_AWS_REGION: str | None = None DAILYCO_STORAGE_AWS_ROLE_ARN: str | None = None + # Worker credentials for reading/deleting from Daily's recording bucket + DAILYCO_STORAGE_AWS_ACCESS_KEY_ID: str | None = None + DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None # Translate into the target language TRANSLATION_BACKEND: str = "passthrough" @@ -106,7 +109,11 @@ class Settings(BaseSettings): # Diarization: modal 
backend DIARIZATION_MODAL_API_KEY: str | None = None - # Audio Padding (Modal.com backend) + # Audio Padding + # backends: + # - local: in-process PyAV padding (no HTTP, runs in same process) + # - modal: HTTP API client (works with Modal.com OR self-hosted gpu/self_hosted/) + PADDING_BACKEND: str = "local" PADDING_URL: str | None = None PADDING_MODAL_API_KEY: str | None = None diff --git a/server/reflector/storage/__init__.py b/server/reflector/storage/__init__.py index aff6c767..16528396 100644 --- a/server/reflector/storage/__init__.py +++ b/server/reflector/storage/__init__.py @@ -17,6 +17,49 @@ def get_transcripts_storage() -> Storage: ) +def get_source_storage(platform: str) -> Storage: + """Get storage for reading/deleting source recording files from the platform's bucket. + + Returns an AwsStorage configured with the platform's worker credentials + (access keys), or falls back to get_transcripts_storage() when platform-specific + credentials aren't configured (e.g., single-bucket setups). + + Args: + platform: Recording platform name ("daily", "whereby", or other). 
+ """ + if platform == "daily": + if ( + settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID + and settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY + and settings.DAILYCO_STORAGE_AWS_BUCKET_NAME + ): + from reflector.storage.storage_aws import AwsStorage + + return AwsStorage( + aws_bucket_name=settings.DAILYCO_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.DAILYCO_STORAGE_AWS_REGION or "us-east-1", + aws_access_key_id=settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + elif platform == "whereby": + if ( + settings.WHEREBY_STORAGE_AWS_ACCESS_KEY_ID + and settings.WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY + and settings.WHEREBY_STORAGE_AWS_BUCKET_NAME + ): + from reflector.storage.storage_aws import AwsStorage + + return AwsStorage( + aws_bucket_name=settings.WHEREBY_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.WHEREBY_STORAGE_AWS_REGION or "us-east-1", + aws_access_key_id=settings.WHEREBY_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + return get_transcripts_storage() + + def get_whereby_storage() -> Storage: """ Get storage config for Whereby (for passing to Whereby API). diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index adcefe5e..2c9d2ae8 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -24,6 +24,118 @@ RECONCILIATION_INTERVAL = _override or 30.0 ICS_SYNC_INTERVAL = _override or 60.0 UPCOMING_MEETINGS_INTERVAL = _override or 30.0 + +def build_beat_schedule( + *, + whereby_api_key=None, + aws_process_recording_queue_url=None, + daily_api_key=None, + public_mode=False, + public_data_retention_days=None, + healthcheck_url=None, +): + """Build the Celery beat schedule based on configured services. + + Only registers tasks for services that are actually configured, + avoiding unnecessary worker wake-ups in selfhosted deployments. 
+ """ + beat_schedule = {} + + _whereby_enabled = bool(whereby_api_key) or bool(aws_process_recording_queue_url) + if _whereby_enabled: + beat_schedule["process_messages"] = { + "task": "reflector.worker.process.process_messages", + "schedule": SQS_POLL_INTERVAL, + } + beat_schedule["reprocess_failed_recordings"] = { + "task": "reflector.worker.process.reprocess_failed_recordings", + "schedule": crontab(hour=5, minute=0), # Midnight EST + } + logger.info( + "Whereby beat tasks enabled", + tasks=["process_messages", "reprocess_failed_recordings"], + ) + else: + logger.info("Whereby beat tasks disabled (no WHEREBY_API_KEY or SQS URL)") + + _daily_enabled = bool(daily_api_key) + if _daily_enabled: + beat_schedule["poll_daily_recordings"] = { + "task": "reflector.worker.process.poll_daily_recordings", + "schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC, + } + beat_schedule["trigger_daily_reconciliation"] = { + "task": "reflector.worker.process.trigger_daily_reconciliation", + "schedule": RECONCILIATION_INTERVAL, + } + beat_schedule["reprocess_failed_daily_recordings"] = { + "task": "reflector.worker.process.reprocess_failed_daily_recordings", + "schedule": crontab(hour=5, minute=0), # Midnight EST + } + logger.info( + "Daily.co beat tasks enabled", + tasks=[ + "poll_daily_recordings", + "trigger_daily_reconciliation", + "reprocess_failed_daily_recordings", + ], + ) + else: + logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)") + + _any_platform = _whereby_enabled or _daily_enabled + if _any_platform: + beat_schedule["process_meetings"] = { + "task": "reflector.worker.process.process_meetings", + "schedule": SQS_POLL_INTERVAL, + } + beat_schedule["sync_all_ics_calendars"] = { + "task": "reflector.worker.ics_sync.sync_all_ics_calendars", + "schedule": ICS_SYNC_INTERVAL, + } + beat_schedule["create_upcoming_meetings"] = { + "task": "reflector.worker.ics_sync.create_upcoming_meetings", + "schedule": UPCOMING_MEETINGS_INTERVAL, + } + logger.info( + "Platform tasks 
enabled", + tasks=[ + "process_meetings", + "sync_all_ics_calendars", + "create_upcoming_meetings", + ], + ) + else: + logger.info("Platform tasks disabled (no video platform configured)") + + if public_mode: + beat_schedule["cleanup_old_public_data"] = { + "task": "reflector.worker.cleanup.cleanup_old_public_data_task", + "schedule": crontab(hour=3, minute=0), + } + logger.info( + "Public mode cleanup enabled", + retention_days=public_data_retention_days, + ) + + if healthcheck_url: + beat_schedule["healthcheck_ping"] = { + "task": "reflector.worker.healthcheck.healthcheck_ping", + "schedule": 60.0 * 10, + } + logger.info("Healthcheck enabled", url=healthcheck_url) + else: + logger.warning("Healthcheck disabled, no url configured") + + logger.info( + "Beat schedule configured", + total_tasks=len(beat_schedule), + task_names=sorted(beat_schedule.keys()), + ) + + return beat_schedule + + if celery.current_app.main != "default": logger.info(f"Celery already configured ({celery.current_app})") app = celery.current_app @@ -42,57 +154,11 @@ else: ] ) - # crontab - app.conf.beat_schedule = { - "process_messages": { - "task": "reflector.worker.process.process_messages", - "schedule": SQS_POLL_INTERVAL, - }, - "process_meetings": { - "task": "reflector.worker.process.process_meetings", - "schedule": SQS_POLL_INTERVAL, - }, - "reprocess_failed_recordings": { - "task": "reflector.worker.process.reprocess_failed_recordings", - "schedule": crontab(hour=5, minute=0), # Midnight EST - }, - "reprocess_failed_daily_recordings": { - "task": "reflector.worker.process.reprocess_failed_daily_recordings", - "schedule": crontab(hour=5, minute=0), # Midnight EST - }, - "poll_daily_recordings": { - "task": "reflector.worker.process.poll_daily_recordings", - "schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC, - }, - "trigger_daily_reconciliation": { - "task": "reflector.worker.process.trigger_daily_reconciliation", - "schedule": RECONCILIATION_INTERVAL, - }, - "sync_all_ics_calendars": { - 
"task": "reflector.worker.ics_sync.sync_all_ics_calendars", - "schedule": ICS_SYNC_INTERVAL, - }, - "create_upcoming_meetings": { - "task": "reflector.worker.ics_sync.create_upcoming_meetings", - "schedule": UPCOMING_MEETINGS_INTERVAL, - }, - } - - if settings.PUBLIC_MODE: - app.conf.beat_schedule["cleanup_old_public_data"] = { - "task": "reflector.worker.cleanup.cleanup_old_public_data_task", - "schedule": crontab(hour=3, minute=0), - } - logger.info( - "Public mode cleanup enabled", - retention_days=settings.PUBLIC_DATA_RETENTION_DAYS, - ) - - if settings.HEALTHCHECK_URL: - app.conf.beat_schedule["healthcheck_ping"] = { - "task": "reflector.worker.healthcheck.healthcheck_ping", - "schedule": 60.0 * 10, - } - logger.info("Healthcheck enabled", url=settings.HEALTHCHECK_URL) - else: - logger.warning("Healthcheck disabled, no url configured") + app.conf.beat_schedule = build_beat_schedule( + whereby_api_key=settings.WHEREBY_API_KEY, + aws_process_recording_queue_url=settings.AWS_PROCESS_RECORDING_QUEUE_URL, + daily_api_key=settings.DAILY_API_KEY, + public_mode=settings.PUBLIC_MODE, + public_data_retention_days=settings.PUBLIC_DATA_RETENTION_DAYS, + healthcheck_url=settings.HEALTHCHECK_URL, + ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index c1d26d96..ede72947 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -357,6 +357,7 @@ async def _process_multitrack_recording_inner( "bucket_name": bucket_name, "transcript_id": transcript.id, "room_id": room.id, + "source_platform": "daily", }, additional_metadata={ "transcript_id": transcript.id, @@ -1068,6 +1069,7 @@ async def reprocess_failed_daily_recordings(): "bucket_name": bucket_name, "transcript_id": transcript.id, "room_id": room.id if room else None, + "source_platform": "daily", }, additional_metadata={ "transcript_id": transcript.id, diff --git a/server/tests/test_beat_schedule.py b/server/tests/test_beat_schedule.py new file mode 
100644 index 00000000..382af9a2 --- /dev/null +++ b/server/tests/test_beat_schedule.py @@ -0,0 +1,247 @@ +"""Tests for conditional Celery beat schedule registration. + +Verifies that beat tasks are only registered when their corresponding +services are configured (WHEREBY_API_KEY, DAILY_API_KEY, etc.). +""" + +import pytest + +from reflector.worker.app import build_beat_schedule + + +# Override autouse fixtures from conftest — these tests don't need database or websockets +@pytest.fixture(autouse=True) +def setup_database(): + yield + + +@pytest.fixture(autouse=True) +def ws_manager_in_memory(): + yield + + +@pytest.fixture(autouse=True) +def reset_hatchet_client(): + yield + + +# Task name sets for each group +WHEREBY_TASKS = {"process_messages", "reprocess_failed_recordings"} +DAILY_TASKS = { + "poll_daily_recordings", + "trigger_daily_reconciliation", + "reprocess_failed_daily_recordings", +} +PLATFORM_TASKS = { + "process_meetings", + "sync_all_ics_calendars", + "create_upcoming_meetings", +} + + +class TestNoPlatformConfigured: + """When no video platform is configured, no platform tasks should be registered.""" + + def test_no_platform_tasks(self): + schedule = build_beat_schedule() + task_names = set(schedule.keys()) + assert not task_names & WHEREBY_TASKS + assert not task_names & DAILY_TASKS + assert not task_names & PLATFORM_TASKS + + def test_only_healthcheck_disabled_warning(self): + """With no config at all, schedule should be empty (healthcheck needs URL).""" + schedule = build_beat_schedule() + assert len(schedule) == 0 + + def test_healthcheck_only(self): + schedule = build_beat_schedule(healthcheck_url="https://hc.example.com/ping") + assert set(schedule.keys()) == {"healthcheck_ping"} + + def test_public_mode_only(self): + schedule = build_beat_schedule(public_mode=True) + assert set(schedule.keys()) == {"cleanup_old_public_data"} + + +class TestWherebyOnly: + """When only Whereby is configured.""" + + def test_whereby_api_key(self): + schedule = 
build_beat_schedule(whereby_api_key="test-key") + task_names = set(schedule.keys()) + assert WHEREBY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + assert not task_names & DAILY_TASKS + + def test_whereby_sqs_url(self): + schedule = build_beat_schedule( + aws_process_recording_queue_url="https://sqs.us-east-1.amazonaws.com/123/queue" + ) + task_names = set(schedule.keys()) + assert WHEREBY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + assert not task_names & DAILY_TASKS + + def test_whereby_task_count(self): + schedule = build_beat_schedule(whereby_api_key="test-key") + # Whereby (2) + Platform (3) = 5 + assert len(schedule) == 5 + + +class TestDailyOnly: + """When only Daily.co is configured.""" + + def test_daily_api_key(self): + schedule = build_beat_schedule(daily_api_key="test-daily-key") + task_names = set(schedule.keys()) + assert DAILY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + assert not task_names & WHEREBY_TASKS + + def test_daily_task_count(self): + schedule = build_beat_schedule(daily_api_key="test-daily-key") + # Daily (3) + Platform (3) = 6 + assert len(schedule) == 6 + + +class TestBothPlatforms: + """When both Whereby and Daily.co are configured.""" + + def test_all_tasks_registered(self): + schedule = build_beat_schedule( + whereby_api_key="test-key", + daily_api_key="test-daily-key", + ) + task_names = set(schedule.keys()) + assert WHEREBY_TASKS <= task_names + assert DAILY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + + def test_combined_task_count(self): + schedule = build_beat_schedule( + whereby_api_key="test-key", + daily_api_key="test-daily-key", + ) + # Whereby (2) + Daily (3) + Platform (3) = 8 + assert len(schedule) == 8 + + +class TestConditionalFlags: + """Test PUBLIC_MODE and HEALTHCHECK_URL interact correctly with platform tasks.""" + + def test_all_flags_enabled(self): + schedule = build_beat_schedule( + whereby_api_key="test-key", + daily_api_key="test-daily-key", + 
public_mode=True, + healthcheck_url="https://hc.example.com/ping", + ) + task_names = set(schedule.keys()) + assert "cleanup_old_public_data" in task_names + assert "healthcheck_ping" in task_names + assert WHEREBY_TASKS <= task_names + assert DAILY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + # Whereby (2) + Daily (3) + Platform (3) + cleanup (1) + healthcheck (1) = 10 + assert len(schedule) == 10 + + def test_public_mode_with_whereby(self): + schedule = build_beat_schedule( + whereby_api_key="test-key", + public_mode=True, + ) + task_names = set(schedule.keys()) + assert "cleanup_old_public_data" in task_names + assert WHEREBY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + + def test_healthcheck_with_daily(self): + schedule = build_beat_schedule( + daily_api_key="test-daily-key", + healthcheck_url="https://hc.example.com/ping", + ) + task_names = set(schedule.keys()) + assert "healthcheck_ping" in task_names + assert DAILY_TASKS <= task_names + assert PLATFORM_TASKS <= task_names + + +class TestTaskDefinitions: + """Verify task definitions have correct structure.""" + + def test_whereby_task_paths(self): + schedule = build_beat_schedule(whereby_api_key="test-key") + assert ( + schedule["process_messages"]["task"] + == "reflector.worker.process.process_messages" + ) + assert ( + schedule["reprocess_failed_recordings"]["task"] + == "reflector.worker.process.reprocess_failed_recordings" + ) + + def test_daily_task_paths(self): + schedule = build_beat_schedule(daily_api_key="test-daily-key") + assert ( + schedule["poll_daily_recordings"]["task"] + == "reflector.worker.process.poll_daily_recordings" + ) + assert ( + schedule["trigger_daily_reconciliation"]["task"] + == "reflector.worker.process.trigger_daily_reconciliation" + ) + assert ( + schedule["reprocess_failed_daily_recordings"]["task"] + == "reflector.worker.process.reprocess_failed_daily_recordings" + ) + + def test_platform_task_paths(self): + schedule = 
build_beat_schedule(daily_api_key="test-daily-key") + assert ( + schedule["process_meetings"]["task"] + == "reflector.worker.process.process_meetings" + ) + assert ( + schedule["sync_all_ics_calendars"]["task"] + == "reflector.worker.ics_sync.sync_all_ics_calendars" + ) + assert ( + schedule["create_upcoming_meetings"]["task"] + == "reflector.worker.ics_sync.create_upcoming_meetings" + ) + + def test_all_tasks_have_schedule(self): + """Every registered task must have a 'schedule' key.""" + schedule = build_beat_schedule( + whereby_api_key="test-key", + daily_api_key="test-daily-key", + public_mode=True, + healthcheck_url="https://hc.example.com/ping", + ) + for name, config in schedule.items(): + assert "schedule" in config, f"Task '{name}' missing 'schedule' key" + assert "task" in config, f"Task '{name}' missing 'task' key" + + +class TestEmptyStringValues: + """Empty strings should be treated as not configured (falsy).""" + + def test_empty_whereby_key(self): + schedule = build_beat_schedule(whereby_api_key="") + assert not set(schedule.keys()) & WHEREBY_TASKS + + def test_empty_daily_key(self): + schedule = build_beat_schedule(daily_api_key="") + assert not set(schedule.keys()) & DAILY_TASKS + + def test_empty_sqs_url(self): + schedule = build_beat_schedule(aws_process_recording_queue_url="") + assert not set(schedule.keys()) & WHEREBY_TASKS + + def test_none_values(self): + schedule = build_beat_schedule( + whereby_api_key=None, + daily_api_key=None, + aws_process_recording_queue_url=None, + ) + assert len(schedule) == 0 diff --git a/server/tests/test_storage.py b/server/tests/test_storage.py index 2ba1f012..0e7ce809 100644 --- a/server/tests/test_storage.py +++ b/server/tests/test_storage.py @@ -367,3 +367,126 @@ async def test_aws_storage_none_endpoint_url(): assert storage.base_url == "https://reflector-bucket.s3.amazonaws.com/" # No s3 addressing_style override — boto_config should only have retries assert not hasattr(storage.boto_config, "s3") or 
storage.boto_config.s3 is None + + +# --- Tests for get_source_storage() --- + + +def test_get_source_storage_daily_with_credentials(): + """Daily platform with access keys returns AwsStorage with Daily credentials.""" + with patch("reflector.storage.settings") as mock_settings: + mock_settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID = "daily-key" + mock_settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY = "daily-secret" + mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "daily-bucket" + mock_settings.DAILYCO_STORAGE_AWS_REGION = "us-west-2" + + from reflector.storage import get_source_storage + + storage = get_source_storage("daily") + + assert isinstance(storage, AwsStorage) + assert storage._bucket_name == "daily-bucket" + assert storage._region == "us-west-2" + assert storage._access_key_id == "daily-key" + assert storage._secret_access_key == "daily-secret" + assert storage._endpoint_url is None + + +def test_get_source_storage_daily_falls_back_without_credentials(): + """Daily platform without access keys falls back to transcript storage.""" + with patch("reflector.storage.settings") as mock_settings: + mock_settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID = None + mock_settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY = None + mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "daily-bucket" + mock_settings.TRANSCRIPT_STORAGE_BACKEND = "aws" + mock_settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME = "transcript-bucket" + mock_settings.TRANSCRIPT_STORAGE_AWS_REGION = "us-east-1" + mock_settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID = "transcript-key" + mock_settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY = "transcript-secret" + mock_settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL = None + + from reflector.storage import get_source_storage + + with patch("reflector.storage.get_transcripts_storage") as mock_get_transcripts: + fallback = AwsStorage( + aws_bucket_name="transcript-bucket", + aws_region="us-east-1", + aws_access_key_id="transcript-key", + 
aws_secret_access_key="transcript-secret", + ) + mock_get_transcripts.return_value = fallback + + storage = get_source_storage("daily") + + mock_get_transcripts.assert_called_once() + assert storage is fallback + + +def test_get_source_storage_whereby_with_credentials(): + """Whereby platform with access keys returns AwsStorage with Whereby credentials.""" + with patch("reflector.storage.settings") as mock_settings: + mock_settings.WHEREBY_STORAGE_AWS_ACCESS_KEY_ID = "whereby-key" + mock_settings.WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY = "whereby-secret" + mock_settings.WHEREBY_STORAGE_AWS_BUCKET_NAME = "whereby-bucket" + mock_settings.WHEREBY_STORAGE_AWS_REGION = "eu-west-1" + + from reflector.storage import get_source_storage + + storage = get_source_storage("whereby") + + assert isinstance(storage, AwsStorage) + assert storage._bucket_name == "whereby-bucket" + assert storage._region == "eu-west-1" + assert storage._access_key_id == "whereby-key" + assert storage._secret_access_key == "whereby-secret" + + +def test_get_source_storage_unknown_platform_falls_back(): + """Unknown platform falls back to transcript storage.""" + with patch("reflector.storage.settings"): + from reflector.storage import get_source_storage + + with patch("reflector.storage.get_transcripts_storage") as mock_get_transcripts: + fallback = MagicMock() + mock_get_transcripts.return_value = fallback + + storage = get_source_storage("unknown-platform") + + mock_get_transcripts.assert_called_once() + assert storage is fallback + + +@pytest.mark.asyncio +async def test_source_storage_presigns_for_correct_bucket(): + """Source storage presigns URLs using the platform's credentials and the override bucket.""" + with patch("reflector.storage.settings") as mock_settings: + mock_settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID = "daily-key" + mock_settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY = "daily-secret" + mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "daily-bucket" + 
mock_settings.DAILYCO_STORAGE_AWS_REGION = "us-west-2" + + from reflector.storage import get_source_storage + + storage = get_source_storage("daily") + + mock_client = AsyncMock() + mock_client.generate_presigned_url = AsyncMock( + return_value="https://daily-bucket.s3.amazonaws.com/track.webm?signed" + ) + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=None) + + with patch.object(storage.session, "client", return_value=mock_client): + url = await storage.get_file_url( + "track.webm", + operation="get_object", + expires_in=3600, + bucket="override-bucket", + ) + + assert "track.webm" in url + mock_client.generate_presigned_url.assert_called_once() + call_kwargs = mock_client.generate_presigned_url.call_args + params = call_kwargs[1].get("Params") or call_kwargs[0][1] + assert params["Bucket"] == "override-bucket" + assert params["Key"] == "track.webm"