durable (no-mistakes)

Igor Loskutov
2025-12-15 12:17:39 -05:00
parent 90c3ecc9c3
commit 243ff2177c
43 changed files with 7808 additions and 1660 deletions


@@ -0,0 +1,617 @@
# Conductor Migration Requirements: Daily.co Multitrack Pipeline
## Executive Summary
Migrate the Daily.co multitrack diarization pipeline from a monolithic Celery task to a decomposed Conductor workflow, enabling visual progress tracking, granular retries, and operational observability.
---
## Business Value
### 1. Visibility: Where Are We Now? (UX, DevEx)
**Current State**: Users see only three states: `idle` → `processing` → `ended/error`. A 10-minute pipeline appears frozen with no feedback.
**Target State**: Real-time visibility into which step is executing:
- "Transcribing track 2 of 3"
- "Generating summary (step 8 of 9)"
- Visual DAG in admin UI showing completed/in-progress/pending nodes
**Business Impact**:
- Reduced support tickets ("is it stuck?")
- Engineers can instantly identify bottlenecks
- Users have confidence the system is working
### 2. Progress Tracking: What's Left? (UX, DevEx)
**Current State**: No indication of remaining work. A failure at step 8 gives the same error as a failure at step 1.
**Target State**:
- Progress percentage based on completed steps
- Clear step enumeration (e.g., "Step 5/9: Transcription")
- Frontend receives structured progress events with step metadata
**Business Impact**:
- Users can estimate completion time
- Frontend can render meaningful progress bars
- Error messages include context ("Failed during summary generation")
### 3. Audit Trail & Profiling (DevEx, Ops)
**Current State**: Logs scattered across Celery workers. No unified view of a single recording's journey. Resource consumption unknown per step.
**Target State**:
- Single workflow ID traces entire recording lifecycle
- Per-step execution times recorded
- Resource consumption (GPU seconds, LLM tokens) attributable to specific steps
- Conductor UI provides complete audit history
**Business Impact**:
- Debugging: "Recording X failed at step Y after Z seconds"
- Cost attribution: "Transcription costs $X, summarization costs $Y"
- Performance optimization: identify slowest steps
### 4. Clear Event Dictionary (DevEx)
**Current State**: Frontend receives WebSocket events (`TRANSCRIPT`, `TOPIC`, `FINAL_TITLE`, etc.), but the mapping to pipeline phases is implicit. Adding new events requires tracing through Python code.
**Target State**:
- Each Conductor task explicitly defines its output events
- Event schema documented alongside task definition
- Frontend developers can reference task→event mapping directly
**Business Impact**:
- Faster frontend development
- Reduced miscommunication between backend/frontend teams
- Self-documenting pipeline
### 5. Restart Without Reprocessing (UX, DevEx)
**Current State**: Any failure restarts the entire pipeline. A timeout during summary generation re-runs transcription (wasting GPU costs).
**Target State**:
- Failures resume from last successful step
- Completed work is checkpointed (e.g., transcription results stored before summary)
- Manual retry triggers only failed step, not entire workflow
**Business Impact**:
- Reduced GPU/LLM costs on retries
- Faster recovery from transient failures
- Users don't wait for re-transcription on summary failures
### 6. Per-Step Timeouts (UX, DevEx)
**Current State**: Single task timeout for entire pipeline. A hung GPU call blocks everything. Killing the task loses all progress.
**Target State**:
- Each step has independent timeout (e.g., transcription: 5min, LLM: 30s)
- Timeout kills only the hung step
- Pipeline can retry just that step or fail gracefully
**Business Impact**:
- Faster detection of stuck external services
- Reduced blast radius from hung calls
- More granular SLAs per operation type
### 7. Native Retries with Backoff (DevEx, UX)
**Current State**: Celery retry logic is per-task, not per-external-call. Custom retry wrappers needed for each API call.
**Target State**:
- Conductor provides native retry policies per task
- Exponential backoff configured declaratively (see the sketch below)
- Retry state visible in UI (attempt 2/5)
**Business Impact**:
- Reduced boilerplate code
- Consistent retry behavior across all external calls
- Visibility into retry attempts for debugging
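For illustration, a task definition with a declarative backoff policy might look like the sketch below, written in the same Python-dict style as `definitions.py`. The `retryLogic`, `retryDelaySeconds`, and `backoffScaleFactor` fields come from Conductor's TaskDef schema and are not part of this commit's definitions, so verify them against the Conductor OSS docs before adopting them.
```python
# Sketch only: declarative retry/backoff on a task definition (fields assumed
# from Conductor's TaskDef schema; not present in reflector/conductor/tasks/definitions.py).
TRANSCRIBE_TRACK_TASKDEF = {
    "name": "transcribe_track",
    "retryCount": 3,
    "retryLogic": "EXPONENTIAL_BACKOFF",  # FIXED | EXPONENTIAL_BACKOFF | LINEAR_BACKOFF
    "retryDelaySeconds": 10,              # ~10s, 20s, 40s between attempts
    "backoffScaleFactor": 2,
    "timeoutSeconds": 1800,
    "responseTimeoutSeconds": 900,
    "ownerEmail": "reflector@example.com",
}
```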
---
## Current Architecture
### Daily.co Multitrack Pipeline Flow
```
Daily webhook (recording.ready-to-download) Polling (every 3 min)
│ │
▼ ▼
_handle_recording_ready() poll_daily_recordings()
│ │
└──────────────┬─────────────────────────┘
process_multitrack_recording.delay() ← Celery task #1
├── Daily API: GET /recordings/{id}
├── Daily API: GET /meetings/{mtgSessionId}/participants
├── DB: Create recording + transcript
task_pipeline_multitrack_process.delay() ← Celery task #2 (MONOLITH)
│ ┌─────────────────────────────────────────────────┐
│ │ pipeline.process() - ALL PHASES INSIDE HERE │
│ │ │
│ │ Phase 2: Track Padding (N tracks, sequential) │
│ │ Phase 3: Mixdown → S3 upload │
│ │ Phase 4: Waveform generation │
│ │ Phase 5: Transcription (N GPU calls, serial!) │
│ │ Phase 6: Topic Detection (C LLM calls) │
│ │ Phase 7a: Title Generation (1 LLM call) │
│ │ Phase 7b: Summary Generation (2+2M LLM calls) │
│ │ Phase 8: Finalize status │
│ └─────────────────────────────────────────────────┘
chain(cleanup → zulip → webhook).delay() ← Celery chain (3 tasks)
```
### Problem: Monolithic `pipeline.process()`
The heavy lifting happens inside a single Python function call. Celery only sees:
- Task started
- Task succeeded/failed
It cannot see or control the 8 internal phases.
---
## Target Architecture
### Decomposed Conductor Workflow
```
┌─────────────────────┐
│ get_recording │ ← Daily API
│ get_participants │
└──────────┬──────────┘
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ pad_tk_0 │ │ pad_tk_1 │ │ pad_tk_N │ ← FORK (parallel)
└────┬─────┘ └────┬─────┘ └────┬─────┘
└──────────────────┼──────────────────┘
┌─────────────────────┐
│ mixdown_tracks │ ← PyAV → S3
└──────────┬──────────┘
┌──────────┴──────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│generate_wave │ │ (continue) │ ← waveform parallel with transcription setup
└───────────────┘ └───────────────┘
┌──────────────────┼──────────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│transcribe_0│ │transcribe_1│ │transcribe_N│ ← FORK (parallel GPU!)
└─────┬──────┘ └─────┬──────┘ └─────┬──────┘
└──────────────────┼──────────────────┘
┌─────────────────────┐
│ merge_transcripts │
└──────────┬──────────┘
┌──────────┴──────────┐
▼ ▼
┌───────────────┐ ┌───────────────┐
│detect_topics │ │ (or) │ ← topic detection
└───────┬───────┘ └───────────────┘
┌──────────────┴──────────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│generate_title│ │gen_summary │ ← FORK (parallel LLM)
└──────┬──────┘ └──────┬──────┘
└──────────────┬─────────────┘
┌─────────────────────┐
│ finalize │
└──────────┬──────────┘
┌──────────────┼──────────────┐
▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐
│ consent │──▶│ zulip │──▶│ webhook │ ← sequential chain
└──────────┘ └──────────┘ └──────────┘
```
### Key Improvements
| Aspect | Current (Celery) | Target (Conductor) |
|--------|------------------|-------------------|
| Transcription parallelism | Serial (N × 30s) | Parallel (max 30s) |
| Failure granularity | Restart all | Retry failed step only |
| Progress visibility | None | Per-step status in UI |
| Timeout control | Entire pipeline | Per-step timeouts |
| Audit trail | Scattered logs | Unified workflow history |
---
## Scope of Work
### Module 1: Conductor Infrastructure Setup
**Files to Create/Modify:**
- `docker-compose.yml` - Add Conductor server container
- `server/reflector/conductor/` - New module for Conductor client
- Environment configuration for Conductor URL
**Tasks:**
- [ ] Add `conductoross/conductor-standalone:3.15.0` to docker-compose
- [ ] Create Conductor client wrapper (Python `conductor-python` SDK)
- [ ] Configure health checks and service dependencies
- [ ] Document Conductor UI access (port 8127)
### Module 2: Task Decomposition - Worker Definitions
**Files to Create:**
- `server/reflector/conductor/workers/` directory with:
- `get_recording.py` - Daily API recording fetch
- `get_participants.py` - Daily API participant fetch
- `pad_track.py` - Single track padding (PyAV)
- `mixdown_tracks.py` - Multi-track mixdown
- `generate_waveform.py` - Waveform generation
- `transcribe_track.py` - Single track GPU transcription
- `merge_transcripts.py` - Combine transcriptions
- `detect_topics.py` - LLM topic detection
- `generate_title.py` - LLM title generation
- `generate_summary.py` - LLM summary generation
- `finalize.py` - Status update and cleanup
- `cleanup_consent.py` - Consent check
- `post_zulip.py` - Zulip notification
- `send_webhook.py` - External webhook
- `generate_dynamic_fork_tasks.py` - Helper for FORK_JOIN_DYNAMIC task generation
**Reference Files (Current Implementation):**
- `server/reflector/pipelines/main_multitrack_pipeline.py`
- `server/reflector/worker/process.py`
- `server/reflector/worker/webhook.py`
**Key Considerations:**
- Each worker receives input from the previous step via Conductor (see the worker skeleton below)
- Workers must be idempotent (same input → same output)
- State serialization between steps (JSON-compatible types)
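A minimal worker skeleton in the shape used by the workers added in this commit (`example_step` and its output are placeholders; the real workers wrap their async bodies in `asyncio.run(...)`):
```python
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task


@worker_task(task_definition_name="example_step")
def example_step(task: Task) -> TaskResult:
    # Inputs arrive as JSON-compatible dicts produced by the previous step.
    transcript_id = task.input_data.get("transcript_id")
    result = TaskResult(
        task_id=task.task_id,
        workflow_instance_id=task.workflow_instance_id,
        worker_id=task.worker_id,
    )
    if not transcript_id:
        result.status = TaskResultStatus.FAILED
        result.reason_for_incompletion = "Missing transcript_id"
        return result
    # ... do idempotent work here; keep outputs small and JSON-serializable ...
    result.output_data = {"transcript_id": transcript_id}
    result.status = TaskResultStatus.COMPLETED
    return result
```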
### Module 3: Workflow Definition
**Files to Create:**
- `server/reflector/conductor/workflows/diarization_pipeline.json`
- `server/reflector/conductor/workflows/register.py` - Registration script
**Workflow Structure:**
```json
{
"name": "daily_diarization_pipeline",
"version": 1,
"tasks": [
{"name": "get_recording", "type": "SIMPLE"},
{"name": "get_participants", "type": "SIMPLE"},
{
"name": "fork_padding",
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "track_keys"
},
{"name": "mixdown_tracks", "type": "SIMPLE"},
{"name": "generate_waveform", "type": "SIMPLE"},
{
"name": "fork_transcription",
"type": "FORK_JOIN_DYNAMIC",
"dynamicForkTasksParam": "padded_urls"
},
{"name": "merge_transcripts", "type": "SIMPLE"},
{"name": "detect_topics", "type": "SIMPLE"},
{
"name": "fork_generation",
"type": "FORK_JOIN",
"forkTasks": [["generate_title"], ["generate_summary"]]
},
{"name": "finalize", "type": "SIMPLE"},
{"name": "cleanup_consent", "type": "SIMPLE"},
{"name": "post_zulip", "type": "SIMPLE"},
{"name": "send_webhook", "type": "SIMPLE"}
]
}
```
**Key Considerations:**
- Dynamic FORK for variable number of tracks (N); see the wiring sketch below
- Timeout configuration per task type
- Retry policies with exponential backoff
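One way to wire the dynamic fork through the `generate_dynamic_fork_tasks` helper is sketched below as a Python-dict equivalent of the workflow JSON. The `dynamicForkTasksParam`/`dynamicForkTasksInputParamName` pairing and the `${ref.output.*}` expressions follow Conductor's documented FORK_JOIN_DYNAMIC contract, but the reference and parameter names differ from the draft JSON above (`track_keys`), so reconcile them when registering the real workflow.
```python
# Sketch: FORK_JOIN_DYNAMIC fed by the helper's {"tasks": [...], "inputs": {...}} output.
DYNAMIC_TRANSCRIPTION_BLOCK = [
    {
        "name": "generate_dynamic_fork_tasks",
        "taskReferenceName": "gen_transcription_forks",
        "type": "SIMPLE",
        "inputParameters": {
            "tracks": "${workflow.input.tracks}",  # assumed workflow input
            "task_type": "transcribe_track",
            "transcript_id": "${workflow.input.transcript_id}",
        },
    },
    {
        "name": "fork_transcription",
        "taskReferenceName": "fork_transcription",
        "type": "FORK_JOIN_DYNAMIC",
        "dynamicForkTasksParam": "dynamicTasks",
        "dynamicForkTasksInputParamName": "dynamicTasksInput",
        "inputParameters": {
            "dynamicTasks": "${gen_transcription_forks.output.tasks}",
            "dynamicTasksInput": "${gen_transcription_forks.output.inputs}",
        },
    },
    {"name": "join_transcription", "taskReferenceName": "join_transcription", "type": "JOIN"},
]
```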
### Module 4: Pipeline Trigger Migration
**Files to Modify:**
- `server/reflector/worker/process.py`
**Changes:**
- Replace `task_pipeline_multitrack_process.delay()` with a Conductor workflow start (see the sketch below)
- Store workflow ID on Recording for status tracking
- Handle Conductor API errors
- Keep `process_multitrack_recording` as-is (creates DB entities before workflow)
**Note:** Both webhook AND polling entry points converge at `process_multitrack_recording`,
which then calls `task_pipeline_multitrack_process.delay()`. By modifying this single call site,
we capture both entry paths without duplicating integration logic.
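A minimal sketch of the new call site, using the `ConductorClientManager` added in this commit; the input keys are assumptions to align with the registered workflow, and the caller is responsible for storing the returned ID on the Recording row (the `workflow_id` column added by migration `a326252ac554`):
```python
from reflector.conductor.client import ConductorClientManager


def start_diarization_workflow(recording_id: str, transcript_id: str) -> str:
    """Replacement for task_pipeline_multitrack_process.delay(...).

    Starts the Conductor workflow and returns its ID so the caller can persist
    it on the Recording row for status tracking.
    """
    return ConductorClientManager.start_workflow(
        name="daily_diarization_pipeline",  # name from the workflow definition
        version=1,
        input_data={
            # Assumed input keys; keep in sync with the registered workflow.
            "recording_id": recording_id,
            "transcript_id": transcript_id,
        },
    )
```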
### Module 5: Task Definition Registration
**Files to Create:**
- `server/reflector/conductor/tasks/definitions.py`
**Task Definitions with Timeouts:**
| Task | Timeout | Response Timeout | Retry Count |
|------|---------|------------------|-------------|
| get_recording | 60s | 30s | 3 |
| get_participants | 60s | 30s | 3 |
| pad_track | 300s | 120s | 3 |
| mixdown_tracks | 600s | 300s | 3 |
| generate_waveform | 120s | 60s | 3 |
| transcribe_track | 1800s | 900s | 3 |
| merge_transcripts | 60s | 30s | 3 |
| detect_topics | 300s | 120s | 3 |
| generate_title | 60s | 30s | 3 |
| generate_summary | 300s | 120s | 3 |
| finalize | 60s | 30s | 3 |
| cleanup_consent | 60s | 30s | 3 |
| post_zulip | 60s | 30s | 5 |
| send_webhook | 60s | 30s | 30 |
| generate_dynamic_fork_tasks | 30s | 15s | 3 |
### Module 6: Frontend Integration
**WebSocket Events (Already Defined):**
Events continue to be broadcast exactly as they are today; the event structure does not change. A code-level sketch of the task→event mapping follows the table.
| Event | Triggered By Task | Payload |
|-------|-------------------|---------|
| STATUS | finalize | `{value: "processing"\|"ended"\|"error"}` |
| DURATION | mixdown_tracks | `{duration: float}` |
| WAVEFORM | generate_waveform | `{waveform: float[]}` |
| TRANSCRIPT | merge_transcripts | `{text: string, translation: string\|null}` |
| TOPIC | detect_topics | `{id, title, summary, timestamp, duration}` |
| FINAL_TITLE | generate_title | `{title: string}` |
| FINAL_LONG_SUMMARY | generate_summary | `{long_summary: string}` |
| FINAL_SHORT_SUMMARY | generate_summary | `{short_summary: string}` |
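To keep this table and the code in sync, the mapping could be mirrored as a constant that both teams reference; a sketch (the name and module location are assumptions, not part of this commit):
```python
# Sketch: task -> WebSocket events emitted when that task completes.
# Derived from the table above; no such constant exists in the codebase yet.
TASK_EVENTS: dict[str, list[str]] = {
    "finalize": ["STATUS"],
    "mixdown_tracks": ["DURATION"],
    "generate_waveform": ["WAVEFORM"],
    "merge_transcripts": ["TRANSCRIPT"],
    "detect_topics": ["TOPIC"],
    "generate_title": ["FINAL_TITLE"],
    "generate_summary": ["FINAL_LONG_SUMMARY", "FINAL_SHORT_SUMMARY"],
}
```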
**New: Progress Events**
Add new event type for granular progress:
```python
# PipelineProgressEvent
{
"event": "PIPELINE_PROGRESS",
"data": {
"workflow_id": str,
"current_step": str,
"step_index": int,
"total_steps": int,
"step_status": "pending" | "in_progress" | "completed" | "failed"
}
}
```
### Module 7: State Management & Checkpointing
**Current State Storage:**
- `transcript.status` - High-level status
- `transcript.events[]` - Append-only event log
- `transcript.topics[]` - Topic results
- `transcript.title`, `transcript.long_summary`, etc.
**Conductor State Storage:**
- Workflow execution state in Conductor database
- Per-task input/output in Conductor
**Checkpointing Strategy:**
1. Each task reads required state from the DB (not from the previous task's output, for large data)
2. Each task writes results to the DB before returning
3. Task output contains references (IDs, URLs), not large payloads
4. On retry, a task can check the DB for existing results (idempotency); see the sketch below
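A sketch of the checkpoint-then-skip pattern inside a worker's async body. `transcripts_controller` exists in the codebase; the short-circuit on `transcript.title` and the injected `generate` callable are illustrative assumptions, not the committed behavior:
```python
from reflector.db.transcripts import transcripts_controller


async def generate_title_checkpointed(transcript_id: str, generate) -> dict:
    """Checkpoint sketch: `generate` is any async callable returning a title."""
    transcript = await transcripts_controller.get_by_id(transcript_id)
    if transcript is None:
        raise ValueError(f"Transcript {transcript_id} not found")

    # On retry, return the stored result instead of re-running the LLM call.
    if transcript.title:
        return {"title": transcript.title}

    title = await generate(transcript)
    # Write results to the DB before returning; the task output itself stays
    # small (strings/IDs), matching points 2 and 3 above.
    await transcripts_controller.update(transcript, {"title": title})
    return {"title": title}
```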
---
## Data Flow Between Tasks
### Input/Output Contracts
```
get_recording
Input: { recording_id: string }
Output: { id, mtg_session_id, room_name, duration }
get_participants
Input: { mtg_session_id: string }
Output: { participants: [{participant_id, user_name}] }
pad_track
Input: { track_index: number, s3_key: string }
Output: { padded_url: string, size: number }
mixdown_tracks
Input: { padded_urls: string[] }
Output: { audio_key: string, duration: number }
generate_waveform
Input: { audio_key: string }
Output: { waveform: number[] }
transcribe_track
Input: { track_index: number, audio_url: string }
Output: { words: Word[] }
merge_transcripts
Input: { transcripts: Word[][] }
Output: { all_words: Word[], word_count: number }
detect_topics
Input: { words: Word[] }
Output: { topics: Topic[] }
generate_title
Input: { topics: Topic[] }
Output: { title: string }
generate_summary
Input: { words: Word[], topics: Topic[] }
Output: { summary: string, short_summary: string }
finalize
Input: { recording_id, title, summary, duration }
Output: { status: "COMPLETED" }
```
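For static checking at worker boundaries, these contracts could be written down as `TypedDict`s; a sketch for two of them (type names are illustrative, not part of the commit):
```python
from typing import TypedDict


class PadTrackInput(TypedDict):
    track_index: int
    s3_key: str


class PadTrackOutput(TypedDict):
    padded_url: str
    size: int


class MixdownTracksInput(TypedDict):
    padded_urls: list[str]


class MixdownTracksOutput(TypedDict):
    audio_key: str
    duration: float
```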
---
## External API Calls Summary
### Per-Step External Dependencies
| Task | External Service | Calls | Notes |
|------|------------------|-------|-------|
| get_recording | Daily.co API | 1 | GET /recordings/{id} |
| get_participants | Daily.co API | 1 | GET /meetings/{id}/participants |
| pad_track | S3 | 2 | presign read + PUT padded |
| mixdown_tracks | S3 | 1 | PUT audio.mp3 |
| transcribe_track | Modal.com GPU | 1 | POST /transcriptions |
| detect_topics | LLM (OpenAI) | C | C = ceil(words/300) |
| generate_title | LLM (OpenAI) | 1 | - |
| generate_summary | LLM (OpenAI) | 2+2M | M = subjects (max 6) |
| post_zulip | Zulip API | 1 | POST or PATCH |
| send_webhook | External | 1 | Customer webhook URL |
### Cost Attribution Enabled
With decomposed tasks, costs can be attributed:
- **GPU costs**: Sum of `transcribe_track` durations (see the sketch below)
- **LLM costs**: Sum of `detect_topics` + `generate_title` + `generate_summary` token usage
- **S3 costs**: Bytes uploaded by `pad_track` + `mixdown_tracks`
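As a sketch of what that attribution could look like, the snippet below sums the wall-clock time of the forked `transcribe_track_*` tasks from the workflow-status payload, using the same `endTime - startTime` fields (epoch milliseconds) queried with `jq` in the debugging notes; multiply by your own GPU rate:
```python
import httpx

CONDUCTOR_API = "http://localhost:8180/api"  # matches the docker-compose port mapping


def transcription_gpu_seconds(workflow_id: str) -> float:
    """Sum wall-clock seconds of all transcribe_track_* tasks in one workflow."""
    wf = httpx.get(f"{CONDUCTOR_API}/workflow/{workflow_id}").json()
    millis = sum(
        t["endTime"] - t["startTime"]
        for t in wf.get("tasks", [])
        if t.get("referenceTaskName", "").startswith("transcribe_track")
        and t.get("endTime")
    )
    return millis / 1000.0


# estimated_gpu_cost = transcription_gpu_seconds(workflow_id) * <your $/GPU-second>
```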
---
## Idempotency Requirements
### By Task
| Task | Idempotent? | Strategy |
|------|-------------|----------|
| get_recording | ✅ | Read-only API call |
| get_participants | ✅ | Read-only API call |
| pad_track | ⚠️ | Overwrite same S3 key |
| mixdown_tracks | ⚠️ | Overwrite same S3 key |
| generate_waveform | ✅ | Deterministic from audio |
| transcribe_track | ❌ | Cache by hash(audio_url) |
| detect_topics | ❌ | Cache by hash(words) |
| generate_title | ❌ | Cache by hash(topic_titles) |
| generate_summary | ❌ | Cache by hash(words+topics) |
| finalize | ✅ | Upsert status |
| cleanup_consent | ✅ | Idempotent deletes |
| post_zulip | ⚠️ | Use message_id for updates |
| send_webhook | ⚠️ | Receiver's responsibility |
### Caching Strategy for LLM/GPU Calls
```python
import hashlib
import json
from typing import Optional

class TaskCache:
    async def get(self, input_hash: str) -> Optional[dict]: ...
    async def set(self, input_hash: str, output: dict) -> None: ...

def stable_hash(payload: dict) -> str:
    # Built-in hash() is process-specific; use a content hash that survives retries.
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()

# Before calling the external service (inside the worker's async body):
key = stable_hash(payload)
cached = await cache.get(key)
if cached is not None:
    return cached
result = await external_service.call(payload)
await cache.set(key, result)
return result
```
---
## Migration Strategy
### Phase 1: Infrastructure (No Behavior Change)
- Add Conductor container to docker-compose
- Create Conductor client library
- Verify Conductor UI accessible
### Phase 2: Parallel Implementation
- Implement all worker tasks
- Register workflow definition
- Test with synthetic recordings
### Phase 3: Shadow Mode
- Trigger both Celery and Conductor pipelines
- Compare results for consistency
- Monitor Conductor execution in UI
### Phase 4: Cutover
- Disable Celery pipeline trigger
- Enable Conductor-only execution
- Monitor error rates and performance
### Phase 5: Cleanup
- Remove Celery task definitions
- Remove old pipeline code
- Update documentation
---
## Risks & Mitigations
| Risk | Mitigation |
|------|------------|
| Conductor server downtime | Health checks, failover to Celery (Phase 3) |
| Worker serialization issues | Extensive testing with real data |
| Performance regression | Benchmark parallel vs serial transcription |
| Data loss on migration | Shadow mode comparison (Phase 3) |
| Learning curve for team | Documentation, Conductor UI training |
---
## Success Metrics
| Metric | Current | Target |
|--------|---------|--------|
| Pipeline visibility | 3 states | 14+ steps visible |
| Transcription latency (N tracks) | N × 30s | ~30s (parallel) |
| Retry granularity | Entire pipeline | Single step |
| Cost attribution | None | Per-step breakdown |
| Debug time for failures | ~30 min | ~5 min (UI trace) |
---
## Appendix: Conductor Mock Implementation
A working Python mock demonstrating the target workflow structure is available at:
`docs/conductor-pipeline-mock/`
To run:
```bash
cd docs/conductor-pipeline-mock
docker compose up --build
./test_workflow.sh
```
UI: http://localhost:8127
This mock validates:
- Workflow definition structure
- FORK_JOIN parallelism
- Worker task patterns
- Conductor SDK usage
---
## References
- Diarization Pipeline Diagram: `DIARIZATION_PIPELINE_DIAGRAM.md`
- Current Celery Implementation: `server/reflector/pipelines/main_multitrack_pipeline.py`
- Conductor OSS Documentation: https://conductor-oss.github.io/conductor/
- Conductor Python SDK: https://github.com/conductor-sdk/conductor-python

TASKS.md (new file, 1977 lines): diff suppressed because it is too large


@@ -34,6 +34,20 @@ services:
environment:
ENTRYPOINT: beat
conductor-worker:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: conductor-worker
depends_on:
conductor:
condition: service_healthy
redis:
image: redis:7.2
ports:
@@ -64,6 +78,19 @@ services:
volumes:
- ./data/postgres:/var/lib/postgresql/data
conductor:
image: conductoross/conductor-standalone:3.15.0
ports:
- 8180:8080
- 5001:5000
environment:
- conductor.db.type=memory
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
interval: 30s
timeout: 10s
retries: 5
networks:
default:
attachable: true


@@ -0,0 +1,345 @@
# Conductor OSS Migration - LLM Debugging Observations
This document captures hard-won debugging insights from migrating the multitrack diarization pipeline from Celery to Conductor OSS. These observations are particularly relevant for LLM assistants working on this codebase.
## Architecture Context
- **Conductor Python SDK** uses multiprocessing: 1 parent process spawns 15 `TaskRunner` subprocesses
- Each task type gets its own subprocess that polls Conductor server
- Workers are identified by container hostname (e.g., `595f5ddc9711`)
- Shadow mode (`CONDUCTOR_SHADOW_MODE=true`) runs both Celery and Conductor in parallel
---
## Challenge 1: Ghost Workers - Multiple Containers Polling Same Tasks
### Symptoms
- Tasks complete but with wrong/empty output
- Worker logs show no execution for a task that API shows as COMPLETED
- `workerId` in Conductor API doesn't match expected container
### Root Cause
Multiple containers may be running Conductor workers:
- `reflector-conductor-worker-1` (dedicated worker)
- `reflector-server-1` (if shadow mode enabled or worker code imported)
### Debugging Steps
```bash
# 1. Get the mystery worker ID from Conductor API
curl -s "http://localhost:8180/api/workflow/{id}" | jq '.tasks[] | {ref: .referenceTaskName, workerId}'
# 2. Find which container has that hostname
docker ps -a | grep {workerId}
# or
docker ps -a --format "{{.ID}} {{.Names}}" | grep {first-12-chars}
# 3. Check that container's code version
docker exec {container} cat /app/reflector/conductor/workers/{worker}.py | head -50
```
### Resolution
Restart ALL containers that might be polling Conductor tasks:
```bash
docker compose restart conductor-worker server
```
### Key Insight
**Always verify `workerId` matches your expected container.** In distributed worker setups, know ALL containers that poll for tasks.
---
## Challenge 2: Multiprocessing + AsyncIO + Database Conflicts
### Symptoms
```
InterfaceError: cannot perform operation: another operation is in progress
RuntimeError: Task <Task pending...> running at /app/.../worker.py
```
### Root Cause
Conductor Python SDK forks subprocesses. When subprocess calls `asyncio.run()`:
1. New event loop is created
2. But `get_database()` returns cached connection from parent process context
3. Parent's connection is incompatible with child's event loop
### Resolution
Reset context and create fresh connection in each subprocess:
```python
async def _process():
import databases
from reflector.db import _database_context
from reflector.settings import settings
# Reset context var - don't inherit from parent
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
# ... rest of async code
```
### Key Insight
**Any singleton/cached resource (DB connections, S3 clients, HTTP sessions) must be recreated AFTER fork.** Never trust inherited state in multiprocessing workers.
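One generic guard, shown here as a sketch rather than code from this commit: key any cached resource by the current PID so a forked subprocess silently rebuilds its own instance instead of reusing the parent's.
```python
import os
from typing import Any, Callable, TypeVar

T = TypeVar("T")
_per_pid: dict[tuple[int, str], Any] = {}


def process_local(name: str, factory: Callable[[], T]) -> T:
    """Return a resource cached per process.

    os.getpid() changes after fork, so a forked Conductor subprocess builds its
    own client (S3, HTTP session, ...) instead of inheriting the parent's.
    """
    key = (os.getpid(), name)
    if key not in _per_pid:
        _per_pid[key] = factory()
    return _per_pid[key]
```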
### TODO: The Real Problem with get_database()
**Current solution is a hack.** The issue runs deeper than multiprocessing fork:
#### What's Actually Happening
1. Each Conductor subprocess calls `asyncio.run(_process())` repeatedly for each task
2. First `asyncio.run()`: creates DB connection, stores in ContextVar
3. First task completes, `asyncio.run()` exits, **event loop destroyed**
4. **But**: ContextVar still holds the connection reference (ContextVars persist across `asyncio.run()` calls)
5. Second `asyncio.run()`: Creates a **new event loop**
6. Code tries to use the **old connection** (from ContextVar) with the **new event loop**
7. Error: "another operation is in progress"
**Root issue**: `get_database()` as a global singleton is incompatible with repeated `asyncio.run()` calls in the same process.
#### Option 1: Explicit Connection Lifecycle (cleanest)
```python
async def _process():
import databases
from reflector.settings import settings
# Don't use get_database() - create explicit connection
db = databases.Database(settings.DATABASE_URL)
try:
await db.connect()
# Problem: transcripts_controller.get_by_id() uses get_database() internally
# Would need to refactor controllers to accept db parameter
# e.g., await transcripts_controller.get_by_id(transcript_id, db=db)
finally:
await db.disconnect()
```
**Pros**: Clean separation, explicit lifecycle
**Cons**: Requires refactoring all controller methods to accept `db` parameter
#### Option 2: Reset ContextVar Properly (pragmatic)
```python
async def _process():
from reflector.db import _database_context, get_database
# Ensure fresh connection per task
old_db = _database_context.get()
if old_db and old_db.is_connected:
await old_db.disconnect()
_database_context.set(None)
# Now get_database() will create fresh connection
db = get_database()
await db.connect()
try:
# ... work ...
finally:
await db.disconnect()
_database_context.set(None)
```
**Pros**: Works with existing controller code
**Cons**: Still manipulating globals, cleanup needed in every worker
#### Option 3: Fix get_database() Itself (best long-term)
```python
# In reflector/db/__init__.py
def get_database() -> databases.Database:
"""Get database instance for current event loop"""
import asyncio
db = _database_context.get()
# Check if connection is valid for current event loop
if db is not None:
try:
loop = asyncio.get_running_loop()
# If connection's event loop differs, it's stale
if db._connection and hasattr(db._connection, '_loop'):
if db._connection._loop != loop:
# Stale connection from old event loop
db = None
except RuntimeError:
# No running loop
pass
if db is None:
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
return db
```
**Pros**: Fixes root cause, no changes needed in workers
**Cons**: Relies on implementation details of `databases` library
#### Recommendation
- **Short-term**: Option 2 (explicit cleanup in workers that need DB)
- **Long-term**: Option 1 (refactor to dependency injection) is the only architecturally clean solution
---
## Challenge 3: Type Mismatches Across Serialization Boundary
### Symptoms
```
ValidationError: 1 validation error for TranscriptTopic
transcript
Input should be a valid string [type=string_type, input_value={'translation': None, 'words': [...]}]
```
### Root Cause
Conductor JSON-serializes all task inputs/outputs. Complex Pydantic models get serialized to dicts:
- `TitleSummary.transcript: Transcript` becomes `{"translation": null, "words": [...]}`
- Next task expects `TranscriptTopic.transcript: str`
### Resolution
Explicitly reconstruct types when deserializing:
```python
from reflector.processors.types import TitleSummary, Transcript as TranscriptType, Word
def normalize_topic(t):
topic = dict(t)
transcript_data = topic.get("transcript")
if isinstance(transcript_data, dict):
words_list = transcript_data.get("words", [])
word_objects = [Word(**w) for w in words_list]
topic["transcript"] = TranscriptType(
words=word_objects,
translation=transcript_data.get("translation")
)
return topic
topic_objects = [TitleSummary(**normalize_topic(t)) for t in topics]
```
### Key Insight
**Conductor task I/O is always JSON.** Design workers to handle dict inputs and reconstruct domain objects explicitly.
---
## Challenge 4: Conductor Health Check Failures
### Symptoms
```
dependency failed to start: container reflector-conductor-1 is unhealthy
```
### Root Cause
The Conductor OSS standalone container's health endpoint can be slow or flaky, especially during startup or under load.
### Resolution
Bypass docker-compose health check dependency:
```bash
# Instead of: docker compose up -d conductor-worker
docker start reflector-conductor-worker-1
```
### Key Insight
For development, consider removing `depends_on.condition: service_healthy` or increasing health check timeout.
---
## Challenge 5: JOIN Task Output Format
### Symptoms
`merge_transcripts` receives data but outputs `word_count: 0`
### Root Cause
FORK_JOIN_DYNAMIC's JOIN task outputs a **dict keyed by task reference names**, not an array:
```json
{
"transcribe_track_0": {"words": [...], "track_index": 0},
"transcribe_track_1": {"words": [...], "track_index": 1}
}
```
### Resolution
Handle both dict and array inputs:
```python
transcripts = task.input_data.get("transcripts", [])
# Handle JOIN output (dict with task refs as keys)
if isinstance(transcripts, dict):
    transcripts = list(transcripts.values())
all_words: list[dict] = []
for t in transcripts:
    if isinstance(t, dict) and "words" in t:
        all_words.extend(t["words"])
```
### Key Insight
**JOIN task output structure differs from FORK input.** Always log input types during debugging.
---
## Debugging Workflow
### 1. Add DEBUG Prints with Flush
Multiprocessing buffers stdout. Force immediate output:
```python
import sys
print("[DEBUG] worker entered", flush=True)
sys.stdout.flush()
```
### 2. Test Worker Functions Directly
Bypass Conductor entirely to verify logic:
```bash
docker compose exec conductor-worker uv run python -c "
from reflector.conductor.workers.merge_transcripts import merge_transcripts
from conductor.client.http.models import Task
mock_task = Task()
mock_task.input_data = {'transcripts': {...}, 'transcript_id': 'test'}
result = merge_transcripts(mock_task)
print(result.output_data)
"
```
### 3. Check Task Timing
Suspiciously fast completion (e.g., 10ms) indicates:
- Cached result from previous run
- Wrong worker processed it
- Task completed without actual execution
```bash
curl -s "http://localhost:8180/api/workflow/{id}" | \
jq '.tasks[] | {ref: .referenceTaskName, duration: (.endTime - .startTime)}'
```
### 4. Verify Container Code Version
```bash
docker compose exec conductor-worker cat /app/reflector/conductor/workers/{file}.py | head -50
```
### 5. Use Conductor Retry API
Retry from specific failed task without re-running entire workflow:
```bash
curl -X POST "http://localhost:8180/api/workflow/{id}/retry"
```
---
## Common Gotchas Summary
| Issue | Signal | Fix |
|-------|--------|-----|
| Wrong worker | `workerId` mismatch | Restart all worker containers |
| DB conflict | "another operation in progress" | Fresh DB connection per subprocess |
| Type mismatch | Pydantic validation error | Reconstruct objects from dicts |
| No logs | Task completes but no output | Check if different container processed |
| 0 results | JOIN output format | Convert dict.values() to list |
| Health check | Compose dependency fails | Use `docker start` directly |
---
## Files Most Likely to Need Conductor-Specific Handling
- `server/reflector/conductor/workers/*.py` - All workers need multiprocessing-safe patterns
- `server/reflector/db/__init__.py` - Database singleton, needs context reset
- `server/reflector/conductor/workflows/*.json` - Workflow definitions, check input/output mappings


@@ -0,0 +1,32 @@
"""add workflow_id to recording
Revision ID: a326252ac554
Revises: bbafedfa510c
Create Date: 2025-12-14 11:34:22.137910
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "a326252ac554"
down_revision: Union[str, None] = "bbafedfa510c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.add_column(sa.Column("workflow_id", sa.String(), nullable=True))
batch_op.create_index(
"idx_recording_workflow_id", ["workflow_id"], unique=False
)
def downgrade() -> None:
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.drop_index("idx_recording_workflow_id")
batch_op.drop_column("workflow_id")


@@ -39,6 +39,7 @@ dependencies = [
"pytest-env>=1.1.5",
"webvtt-py>=0.5.0",
"icalendar>=6.0.0",
"conductor-python>=1.2.3",
]
[dependency-groups]


@@ -12,6 +12,7 @@ from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.logger import logger
from reflector.metrics import metrics_init
from reflector.settings import settings
from reflector.views.conductor import router as conductor_router
from reflector.views.daily import router as daily_router
from reflector.views.meetings import router as meetings_router
from reflector.views.rooms import router as rooms_router
@@ -98,6 +99,7 @@ app.include_router(user_ws_router, prefix="/v1")
app.include_router(zulip_router, prefix="/v1")
app.include_router(whereby_router, prefix="/v1")
app.include_router(daily_router, prefix="/v1/daily")
app.include_router(conductor_router, prefix="/v1")
add_pagination(app)
# prepare celery


@@ -0,0 +1,5 @@
"""Conductor workflow orchestration module."""
from reflector.conductor.client import ConductorClientManager
__all__ = ["ConductorClientManager"]


@@ -0,0 +1,40 @@
"""Conductor Python client wrapper."""
from conductor.client.configuration.configuration import Configuration
from conductor.client.orkes_clients import OrkesClients
from conductor.client.workflow_client import WorkflowClient
from reflector.settings import settings
class ConductorClientManager:
"""Singleton manager for Conductor client connections."""
_instance: OrkesClients | None = None
@classmethod
def get_client(cls) -> WorkflowClient:
"""Get or create the workflow client."""
if cls._instance is None:
config = Configuration(
server_api_url=settings.CONDUCTOR_SERVER_URL,
debug=settings.CONDUCTOR_DEBUG,
)
cls._instance = OrkesClients(config)
return cls._instance.get_workflow_client()
@classmethod
def start_workflow(cls, name: str, version: int, input_data: dict) -> str:
"""Start a workflow and return the workflow ID."""
client = cls.get_client()
return client.start_workflow_by_name(name, input_data, version=version)
@classmethod
def get_workflow_status(cls, workflow_id: str) -> dict:
"""Get the current status of a workflow."""
client = cls.get_client()
return client.get_workflow(workflow_id, include_tasks=True)
@classmethod
def reset(cls) -> None:
"""Reset the client instance (for testing)."""
cls._instance = None


@@ -0,0 +1,103 @@
"""Progress event emission for Conductor workers."""
import asyncio
from typing import Literal
from reflector.db.transcripts import PipelineProgressData
from reflector.logger import logger
from reflector.ws_manager import get_ws_manager
# Step mapping for progress tracking
# Maps task names to their index in the pipeline
PIPELINE_STEPS = {
"get_recording": 1,
"get_participants": 2,
"pad_track": 3, # Fork tasks share same step
"mixdown_tracks": 4,
"generate_waveform": 5,
"transcribe_track": 6, # Fork tasks share same step
"merge_transcripts": 7,
"detect_topics": 8,
"generate_title": 9, # Fork tasks share same step
"generate_summary": 9, # Fork tasks share same step
"finalize": 10,
"cleanup_consent": 11,
"post_zulip": 12,
"send_webhook": 13,
}
TOTAL_STEPS = 13
async def _emit_progress_async(
transcript_id: str,
step: str,
status: Literal["pending", "in_progress", "completed", "failed"],
workflow_id: str | None = None,
) -> None:
"""Async implementation of progress emission."""
ws_manager = get_ws_manager()
step_index = PIPELINE_STEPS.get(step, 0)
data = PipelineProgressData(
workflow_id=workflow_id,
current_step=step,
step_index=step_index,
total_steps=TOTAL_STEPS,
step_status=status,
)
await ws_manager.send_json(
room_id=f"ts:{transcript_id}",
message={
"event": "PIPELINE_PROGRESS",
"data": data.model_dump(),
},
)
logger.debug(
"[Progress] Emitted",
transcript_id=transcript_id,
step=step,
status=status,
step_index=step_index,
)
def emit_progress(
transcript_id: str,
step: str,
status: Literal["pending", "in_progress", "completed", "failed"],
workflow_id: str | None = None,
) -> None:
"""Emit a pipeline progress event (sync wrapper for Conductor workers).
Args:
transcript_id: The transcript ID to emit progress for
step: The current step name (e.g., "transcribe_track")
status: The step status
workflow_id: Optional workflow ID
"""
try:
# Get or create event loop for sync context
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None and loop.is_running():
# Already in async context, schedule the coroutine
asyncio.create_task(
_emit_progress_async(transcript_id, step, status, workflow_id)
)
else:
# Not in async context, run synchronously
asyncio.run(_emit_progress_async(transcript_id, step, status, workflow_id))
except Exception as e:
# Progress emission should never break the pipeline
logger.warning(
"[Progress] Failed to emit progress event",
error=str(e),
transcript_id=transcript_id,
step=step,
)


@@ -0,0 +1,58 @@
"""
Run Conductor workers for the diarization pipeline.
Usage:
uv run -m reflector.conductor.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.conductor.run_workers
"""
import signal
import sys
import time
from conductor.client.automator.task_handler import TaskHandler
from conductor.client.configuration.configuration import Configuration
from reflector.conductor import workers # noqa: F401 - registers workers via decorators
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Conductor worker polling."""
if not settings.CONDUCTOR_ENABLED:
logger.error("CONDUCTOR_ENABLED is False, not starting workers")
sys.exit(1)
logger.info(
"Starting Conductor workers",
server_url=settings.CONDUCTOR_SERVER_URL,
)
config = Configuration(
server_api_url=settings.CONDUCTOR_SERVER_URL,
debug=settings.CONDUCTOR_DEBUG,
)
task_handler = TaskHandler(configuration=config)
# Handle graceful shutdown
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
task_handler.stop_processes()
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting task polling...")
task_handler.start_processes()
# Keep alive
while True:
time.sleep(1)
if __name__ == "__main__":
main()


@@ -0,0 +1,207 @@
"""Shadow mode comparison for Celery vs Conductor pipeline results."""
from dataclasses import dataclass
from typing import Any
from reflector.conductor.client import ConductorClientManager
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.logger import logger
@dataclass
class FieldDifference:
"""A difference between Celery and Conductor field values."""
field: str
celery_value: Any
conductor_value: Any
@dataclass
class ComparisonResult:
"""Result of comparing Celery and Conductor outputs."""
match: bool
differences: list[FieldDifference]
celery_status: str
conductor_status: str
error: str | None = None
async def compare_content_results(
recording_id: str, workflow_id: str
) -> ComparisonResult:
"""
Compare content results from Celery and Conductor pipelines.
Args:
recording_id: Recording ID to look up Celery transcript
workflow_id: Conductor workflow ID to get workflow output
Returns:
ComparisonResult with match status and any differences
"""
try:
# Get Celery result from DB
celery_transcript = await transcripts_controller.get_by_recording_id(
recording_id
)
if not celery_transcript:
return ComparisonResult(
match=False,
differences=[],
celery_status="not_found",
conductor_status="unknown",
error=f"No transcript found for recording_id={recording_id}",
)
# Get Conductor workflow status
workflow_status = ConductorClientManager.get_workflow_status(workflow_id)
conductor_status = workflow_status.status if workflow_status else "unknown"
# If workflow not completed, can't compare
if conductor_status != "COMPLETED":
return ComparisonResult(
match=False,
differences=[],
celery_status=celery_transcript.status,
conductor_status=conductor_status,
error=f"Conductor workflow not completed: {conductor_status}",
)
# Extract output from workflow
workflow_output = (
workflow_status.output if hasattr(workflow_status, "output") else {}
)
differences = _compare_fields(celery_transcript, workflow_output)
result = ComparisonResult(
match=len(differences) == 0,
differences=differences,
celery_status=celery_transcript.status,
conductor_status=conductor_status,
)
# Log comparison result
if result.match:
logger.info(
"Shadow mode comparison: MATCH",
recording_id=recording_id,
workflow_id=workflow_id,
)
else:
logger.warning(
"Shadow mode comparison: MISMATCH",
recording_id=recording_id,
workflow_id=workflow_id,
differences=[
{
"field": d.field,
"celery": d.celery_value,
"conductor": d.conductor_value,
}
for d in differences
],
)
return result
except Exception as e:
logger.error(
"Shadow mode comparison failed",
recording_id=recording_id,
workflow_id=workflow_id,
error=str(e),
exc_info=True,
)
return ComparisonResult(
match=False,
differences=[],
celery_status="unknown",
conductor_status="unknown",
error=str(e),
)
def _compare_fields(
celery_transcript: Transcript, workflow_output: dict
) -> list[FieldDifference]:
"""Compare specific content fields between Celery and Conductor."""
differences = []
# Compare title
conductor_title = workflow_output.get("title")
if celery_transcript.title != conductor_title:
differences.append(
FieldDifference(
field="title",
celery_value=celery_transcript.title,
conductor_value=conductor_title,
)
)
# Compare short_summary
conductor_short_summary = workflow_output.get("short_summary")
if celery_transcript.short_summary != conductor_short_summary:
differences.append(
FieldDifference(
field="short_summary",
celery_value=celery_transcript.short_summary,
conductor_value=conductor_short_summary,
)
)
# Compare long_summary
conductor_long_summary = workflow_output.get("summary")
if celery_transcript.long_summary != conductor_long_summary:
differences.append(
FieldDifference(
field="long_summary",
celery_value=celery_transcript.long_summary,
conductor_value=conductor_long_summary,
)
)
# Compare topic count
celery_topics = celery_transcript.topics or []
conductor_topics = workflow_output.get("topics", [])
if len(celery_topics) != len(conductor_topics):
differences.append(
FieldDifference(
field="topic_count",
celery_value=len(celery_topics),
conductor_value=len(conductor_topics),
)
)
# Compare word count from events
celery_events = celery_transcript.events or {}
celery_words = (
celery_events.get("words", []) if isinstance(celery_events, dict) else []
)
conductor_words = workflow_output.get("all_words", [])
if len(celery_words) != len(conductor_words):
differences.append(
FieldDifference(
field="word_count",
celery_value=len(celery_words),
conductor_value=len(conductor_words),
)
)
# Compare duration
conductor_duration = workflow_output.get("duration")
if (
conductor_duration is not None
and celery_transcript.duration != conductor_duration
):
differences.append(
FieldDifference(
field="duration",
celery_value=celery_transcript.duration,
conductor_value=conductor_duration,
)
)
return differences


@@ -0,0 +1,6 @@
"""Conductor task definitions module."""
from reflector.conductor.tasks.definitions import TASK_DEFINITIONS
from reflector.conductor.tasks.register import register_task_definitions
__all__ = ["TASK_DEFINITIONS", "register_task_definitions"]


@@ -0,0 +1,161 @@
"""Task definitions for Conductor workflow orchestration.
Timeout reference (from CONDUCTOR_MIGRATION_REQUIREMENTS.md):
| Task | Timeout (s) | Response Timeout (s) | Retry Count |
|-------------------|-------------|----------------------|-------------|
| get_recording | 60 | 30 | 3 |
| get_participants | 60 | 30 | 3 |
| pad_track | 300 | 120 | 3 |
| mixdown_tracks | 600 | 300 | 3 |
| generate_waveform | 120 | 60 | 3 |
| transcribe_track | 1800 | 900 | 3 |
| merge_transcripts | 60 | 30 | 3 |
| detect_topics | 300 | 120 | 3 |
| generate_title | 60 | 30 | 3 |
| generate_summary | 300 | 120 | 3 |
| finalize | 60 | 30 | 3 |
| cleanup_consent | 60 | 30 | 3 |
| post_zulip | 60 | 30 | 5 |
| send_webhook | 60 | 30 | 30 |
"""
OWNER_EMAIL = "reflector@example.com"
TASK_DEFINITIONS = [
{
"name": "get_recording",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["recording_id"],
"outputKeys": ["id", "mtg_session_id", "room_name", "duration"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "get_participants",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["mtg_session_id"],
"outputKeys": ["participants"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "pad_track",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["track_index", "s3_key", "bucket_name", "transcript_id"],
"outputKeys": ["padded_url", "size", "track_index"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "mixdown_tracks",
"retryCount": 3,
"timeoutSeconds": 600,
"responseTimeoutSeconds": 300,
"inputKeys": ["padded_urls", "transcript_id"],
"outputKeys": ["audio_key", "duration", "size"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_waveform",
"retryCount": 3,
"timeoutSeconds": 120,
"responseTimeoutSeconds": 60,
"inputKeys": ["audio_key", "transcript_id"],
"outputKeys": ["waveform"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "transcribe_track",
"retryCount": 3,
"timeoutSeconds": 1800,
"responseTimeoutSeconds": 900,
"inputKeys": ["track_index", "audio_url", "language"],
"outputKeys": ["words", "track_index"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "merge_transcripts",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcripts", "transcript_id"],
"outputKeys": ["all_words", "word_count"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "detect_topics",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["words", "transcript_id", "target_language"],
"outputKeys": ["topics"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_title",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["topics", "transcript_id"],
"outputKeys": ["title"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_summary",
"retryCount": 3,
"timeoutSeconds": 300,
"responseTimeoutSeconds": 120,
"inputKeys": ["words", "topics", "transcript_id"],
"outputKeys": ["summary", "short_summary"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "finalize",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id", "title", "summary", "short_summary", "duration"],
"outputKeys": ["status"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "cleanup_consent",
"retryCount": 3,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id"],
"outputKeys": ["audio_deleted", "reason"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "post_zulip",
"retryCount": 5,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id"],
"outputKeys": ["message_id"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "send_webhook",
"retryCount": 30,
"timeoutSeconds": 60,
"responseTimeoutSeconds": 30,
"inputKeys": ["transcript_id", "room_id"],
"outputKeys": ["sent", "status_code"],
"ownerEmail": OWNER_EMAIL,
},
{
"name": "generate_dynamic_fork_tasks",
"retryCount": 3,
"timeoutSeconds": 30,
"responseTimeoutSeconds": 15,
"inputKeys": ["tracks", "task_type", "transcript_id", "bucket_name"],
"outputKeys": ["tasks", "inputs"],
"ownerEmail": OWNER_EMAIL,
"description": "Helper task to generate dynamic fork structure for variable track counts",
},
]


@@ -0,0 +1,60 @@
"""Register task definitions with Conductor server."""
import httpx
from reflector.conductor.tasks.definitions import TASK_DEFINITIONS
from reflector.logger import logger
from reflector.settings import settings
def register_task_definitions() -> None:
"""Register all task definitions with Conductor server.
Raises:
httpx.HTTPStatusError: If registration fails.
"""
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/taskdefs"
logger.info(
"Registering task definitions",
count=len(TASK_DEFINITIONS),
url=url,
)
with httpx.Client(timeout=30.0) as client:
resp = client.post(
url,
json=TASK_DEFINITIONS,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Task definitions registered successfully")
async def register_task_definitions_async() -> None:
"""Async version of register_task_definitions."""
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/taskdefs"
logger.info(
"Registering task definitions",
count=len(TASK_DEFINITIONS),
url=url,
)
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
url,
json=TASK_DEFINITIONS,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Task definitions registered successfully")
if __name__ == "__main__":
register_task_definitions()
print(f"Registered {len(TASK_DEFINITIONS)} task definitions")


@@ -0,0 +1,37 @@
"""Conductor workers for the diarization pipeline."""
from reflector.conductor.workers.cleanup_consent import cleanup_consent
from reflector.conductor.workers.detect_topics import detect_topics
from reflector.conductor.workers.finalize import finalize
from reflector.conductor.workers.generate_dynamic_fork_tasks import (
generate_dynamic_fork_tasks,
)
from reflector.conductor.workers.generate_summary import generate_summary
from reflector.conductor.workers.generate_title import generate_title
from reflector.conductor.workers.generate_waveform import generate_waveform
from reflector.conductor.workers.get_participants import get_participants
from reflector.conductor.workers.get_recording import get_recording
from reflector.conductor.workers.merge_transcripts import merge_transcripts
from reflector.conductor.workers.mixdown_tracks import mixdown_tracks
from reflector.conductor.workers.pad_track import pad_track
from reflector.conductor.workers.post_zulip import post_zulip
from reflector.conductor.workers.send_webhook import send_webhook
from reflector.conductor.workers.transcribe_track import transcribe_track
__all__ = [
"get_recording",
"get_participants",
"pad_track",
"mixdown_tracks",
"generate_waveform",
"transcribe_track",
"merge_transcripts",
"detect_topics",
"generate_title",
"generate_summary",
"finalize",
"cleanup_consent",
"post_zulip",
"send_webhook",
"generate_dynamic_fork_tasks",
]


@@ -0,0 +1,126 @@
"""Conductor worker: cleanup_consent - Check consent and delete audio if denied."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="cleanup_consent")
def cleanup_consent(task: Task) -> TaskResult:
"""Check participant consent and delete audio if denied.
Input:
transcript_id: str - Transcript ID
Output:
audio_deleted: bool - Whether audio was deleted
reason: str | None - Reason for deletion
"""
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] cleanup_consent", transcript_id=transcript_id)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
# Check if any participant denied consent
# This mirrors the logic from main_live_pipeline.task_cleanup_consent
audio_deleted = False
reason = None
if transcript.participants:
for p in transcript.participants:
if hasattr(p, "consent") and p.consent == "denied":
audio_deleted = True
reason = f"Participant {p.name or p.id} denied consent"
break
if audio_deleted:
storage = get_transcripts_storage()
audio_key = f"{transcript_id}/audio.mp3"
try:
await storage.delete_file(audio_key)
await transcripts_controller.update(
transcript, {"audio_deleted": True}
)
logger.info(
"[Worker] cleanup_consent: audio deleted",
transcript_id=transcript_id,
reason=reason,
)
except Exception as e:
logger.warning(
"[Worker] cleanup_consent: failed to delete audio",
error=str(e),
)
return audio_deleted, reason
finally:
await db.disconnect()
_database_context.set(None)
try:
audio_deleted, reason = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"audio_deleted": audio_deleted,
"reason": reason,
}
logger.info(
"[Worker] cleanup_consent complete",
transcript_id=transcript_id,
audio_deleted=audio_deleted,
)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] cleanup_consent failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "cleanup_consent", "failed", task.workflow_instance_id
)
return task_result


@@ -0,0 +1,93 @@
"""Conductor worker: detect_topics - Detect topics using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="detect_topics")
def detect_topics(task: Task) -> TaskResult:
"""Detect topics using LLM.
Input:
words: list[dict] - Transcribed words
transcript_id: str - Transcript ID
target_language: str - Target language code (default: "en")
Output:
topics: list[dict] - Detected topics
"""
words = task.input_data.get("words", [])
transcript_id = task.input_data.get("transcript_id")
target_language = task.input_data.get("target_language", "en")
logger.info(
"[Worker] detect_topics",
word_count=len(words),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
import asyncio
async def _process():
from reflector.pipelines import topic_processing
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import Word
# Convert word dicts to Word objects
word_objects = [Word(**w) for w in words]
transcript = TranscriptType(words=word_objects)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async def noop_callback(t):
pass
topics = await topic_processing.detect_topics(
transcript,
target_language,
on_topic_callback=noop_callback,
empty_pipeline=empty_pipeline,
)
return [t.model_dump() for t in topics]
try:
topics = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"topics": topics}
logger.info(
"[Worker] detect_topics complete",
transcript_id=transcript_id,
topic_count=len(topics),
)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] detect_topics failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "detect_topics", "failed", task.workflow_instance_id
)
return task_result


@@ -0,0 +1,111 @@
"""Conductor worker: finalize - Finalize transcript status and update database."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="finalize")
def finalize(task: Task) -> TaskResult:
"""Finalize the transcript status and update the database.
Input:
transcript_id: str - Transcript ID
title: str - Generated title
summary: str - Long summary
short_summary: str - Short summary
duration: float - Audio duration
Output:
status: str - "COMPLETED"
"""
transcript_id = task.input_data.get("transcript_id")
title = task.input_data.get("title", "")
summary = task.input_data.get("summary", "")
short_summary = task.input_data.get("short_summary", "")
duration = task.input_data.get("duration", 0)
logger.info(
"[Worker] finalize",
transcript_id=transcript_id,
title=title,
)
if transcript_id:
emit_progress(
transcript_id, "finalize", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
await transcripts_controller.update(
transcript,
{
"status": "ended",
"title": title,
"long_summary": summary,
"short_summary": short_summary,
"duration": duration,
},
)
return True
finally:
await db.disconnect()
_database_context.set(None)
try:
asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"status": "COMPLETED"}
logger.info(
"[Worker] finalize complete",
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "finalize", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] finalize failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "finalize", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,110 @@
"""Conductor worker: generate_dynamic_fork_tasks - Helper for FORK_JOIN_DYNAMIC."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.logger import logger
@worker_task(task_definition_name="generate_dynamic_fork_tasks")
def generate_dynamic_fork_tasks(task: Task) -> TaskResult:
"""Generate dynamic fork task structure for variable track counts.
This helper task generates the task definitions and inputs needed for
FORK_JOIN_DYNAMIC to process N tracks in parallel.
Input:
tracks: list[dict] - List of track info with s3_key
task_type: str - Either "pad_track" or "transcribe_track"
transcript_id: str - Transcript ID
bucket_name: str - S3 bucket name (for pad_track)
        padded_urls: dict - JOIN output of pad_track tasks, keyed by task reference (for transcribe_track)
Output:
tasks: list[dict] - Task definitions for dynamic fork
inputs: dict - Input parameters keyed by task reference name
"""
tracks = task.input_data.get("tracks", [])
task_type = task.input_data.get("task_type")
transcript_id = task.input_data.get("transcript_id")
bucket_name = task.input_data.get("bucket_name")
padded_urls = task.input_data.get("padded_urls", {})
logger.info(
"[Worker] generate_dynamic_fork_tasks",
task_type=task_type,
track_count=len(tracks),
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not tracks or not task_type:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing tracks or task_type"
return task_result
try:
tasks = []
inputs = {}
for idx, track in enumerate(tracks):
ref_name = f"{task_type}_{idx}"
# Task definition
tasks.append(
{
"name": task_type,
"taskReferenceName": ref_name,
"type": "SIMPLE",
}
)
# Task input based on type
if task_type == "pad_track":
inputs[ref_name] = {
"track_index": idx,
"s3_key": track.get("s3_key"),
"bucket_name": bucket_name,
"transcript_id": transcript_id,
}
elif task_type == "transcribe_track":
# Get padded URL from previous fork join output
padded_url = None
if isinstance(padded_urls, dict):
# Try to get from join output structure
pad_ref = f"pad_track_{idx}"
if pad_ref in padded_urls:
padded_url = padded_urls[pad_ref].get("padded_url")
elif "padded_url" in padded_urls:
# Single track case
padded_url = padded_urls.get("padded_url")
inputs[ref_name] = {
"track_index": idx,
"audio_url": padded_url,
"language": "en",
"transcript_id": transcript_id,
}
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"tasks": tasks,
"inputs": inputs,
}
logger.info(
"[Worker] generate_dynamic_fork_tasks complete",
task_type=task_type,
task_count=len(tasks),
)
except Exception as e:
logger.error("[Worker] generate_dynamic_fork_tasks failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
return task_result
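
For reference, with two input tracks and `task_type="pad_track"` the worker above emits output along these lines; the `tasks` list feeds `dynamicTasks` and the `inputs` dict feeds `dynamicTasksInput` in the FORK_JOIN_DYNAMIC steps of the workflow definition below (S3 keys, bucket, and transcript ID are illustrative):

```python
# Illustrative output of generate_dynamic_fork_tasks for two tracks, task_type="pad_track".
expected_output = {
    "tasks": [
        {"name": "pad_track", "taskReferenceName": "pad_track_0", "type": "SIMPLE"},
        {"name": "pad_track", "taskReferenceName": "pad_track_1", "type": "SIMPLE"},
    ],
    "inputs": {
        "pad_track_0": {
            "track_index": 0,
            "s3_key": "raw/track-0.webm",
            "bucket_name": "daily-recordings",
            "transcript_id": "tr_123",
        },
        "pad_track_1": {
            "track_index": 1,
            "s3_key": "raw/track-1.webm",
            "bucket_name": "daily-recordings",
            "transcript_id": "tr_123",
        },
    },
}
```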

View File

@@ -0,0 +1,150 @@
"""Conductor worker: generate_summary - Generate meeting summaries using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="generate_summary")
def generate_summary(task: Task) -> TaskResult:
"""Generate long and short summaries from topics and words using LLM.
Input:
words: list[dict] - Transcribed words
topics: list[dict] - Detected topics
transcript_id: str - Transcript ID
Output:
summary: str - Long summary
short_summary: str - Short summary
"""
words = task.input_data.get("words", [])
topics = task.input_data.get("topics", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_summary",
word_count=len(words),
topic_count=len(topics),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "generate_summary", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines import topic_processing
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
# Create fresh database connection for subprocess (not shared from parent)
# Reset context var to ensure we get a fresh connection
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
# detect_topics returns TitleSummary objects (with transcript: Transcript)
# When serialized, transcript becomes {translation, words} dict
# We need to reconstruct TitleSummary objects with proper Transcript
def normalize_topic(t):
topic = dict(t)
transcript_data = topic.get("transcript")
if isinstance(transcript_data, dict):
# Reconstruct Transcript object from serialized dict
words_list = transcript_data.get("words", [])
word_objects = [
Word(**w) if isinstance(w, dict) else w for w in words_list
]
topic["transcript"] = TranscriptType(
words=word_objects,
translation=transcript_data.get("translation"),
)
elif transcript_data is None:
topic["transcript"] = TranscriptType(words=[])
return topic
topic_objects = [TitleSummary(**normalize_topic(t)) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
transcript = await transcripts_controller.get_by_id(transcript_id)
long_summary = ""
short_summary = ""
async def on_long(s):
nonlocal long_summary
# s is FinalLongSummary object
long_summary = s.long_summary if hasattr(s, "long_summary") else str(s)
async def on_short(s):
nonlocal short_summary
# s is FinalShortSummary object
short_summary = (
s.short_summary if hasattr(s, "short_summary") else str(s)
)
await topic_processing.generate_summaries(
topic_objects,
transcript,
on_long_summary_callback=on_long,
on_short_summary_callback=on_short,
empty_pipeline=empty_pipeline,
logger=logger,
)
return long_summary, short_summary
finally:
await db.disconnect()
_database_context.set(None)
try:
summary, short_summary = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"summary": summary,
"short_summary": short_summary,
}
logger.info(
"[Worker] generate_summary complete",
transcript_id=transcript_id,
summary_len=len(summary) if summary else 0,
)
if transcript_id:
emit_progress(
transcript_id,
"generate_summary",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] generate_summary failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_summary", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,111 @@
"""Conductor worker: generate_title - Generate meeting title using LLM."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="generate_title")
def generate_title(task: Task) -> TaskResult:
"""Generate meeting title from detected topics using LLM.
Input:
topics: list[dict] - Detected topics
transcript_id: str - Transcript ID
Output:
title: str - Generated title
"""
topics = task.input_data.get("topics", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_title",
topic_count=len(topics),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not topics:
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"title": "Untitled Meeting"}
return task_result
import asyncio
async def _process():
from reflector.pipelines import topic_processing
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
# detect_topics returns TitleSummary objects (with transcript: Transcript)
# When serialized, transcript becomes {translation, words} dict
# We need to reconstruct TitleSummary objects with proper Transcript
def normalize_topic(t):
topic = dict(t)
transcript_data = topic.get("transcript")
if isinstance(transcript_data, dict):
# Reconstruct Transcript object from serialized dict
words_list = transcript_data.get("words", [])
word_objects = [
Word(**w) if isinstance(w, dict) else w for w in words_list
]
topic["transcript"] = TranscriptType(
words=word_objects, translation=transcript_data.get("translation")
)
elif transcript_data is None:
topic["transcript"] = TranscriptType(words=[])
return topic
topic_objects = [TitleSummary(**normalize_topic(t)) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async def noop_callback(t):
pass
title = await topic_processing.generate_title(
topic_objects,
on_title_callback=noop_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
return title
try:
title = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"title": title}
logger.info(
"[Worker] generate_title complete",
transcript_id=transcript_id,
title=title,
)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] generate_title failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_title", "failed", task.workflow_instance_id
)
return task_result
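
`normalize_topic` is duplicated verbatim between `generate_title` and `generate_summary`. A shared helper would remove that duplication; a minimal sketch, assuming a new module whose path (`reflector/conductor/topic_utils.py`) is hypothetical:

```python
# Hypothetical shared helper for deserializing topics; the module path is an assumption.
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType


def normalize_topic(t: dict) -> dict:
    """Rebuild the nested Transcript object from its serialized dict form."""
    topic = dict(t)
    transcript_data = topic.get("transcript")
    if isinstance(transcript_data, dict):
        words = [
            Word(**w) if isinstance(w, dict) else w
            for w in transcript_data.get("words", [])
        ]
        topic["transcript"] = TranscriptType(
            words=words, translation=transcript_data.get("translation")
        )
    elif transcript_data is None:
        topic["transcript"] = TranscriptType(words=[])
    return topic


def deserialize_topics(topics: list[dict]) -> list[TitleSummary]:
    """Convert serialized topic dicts back into TitleSummary objects."""
    return [TitleSummary(**normalize_topic(t)) for t in topics]
```

Both workers could then call `deserialize_topics(topics)` instead of carrying their own copies.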

View File

@@ -0,0 +1,106 @@
"""Conductor worker: generate_waveform - Generate waveform visualization data."""
import tempfile
from pathlib import Path
import httpx
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.storage import get_transcripts_storage
from reflector.utils.audio_waveform import get_audio_waveform
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
@worker_task(task_definition_name="generate_waveform")
def generate_waveform(task: Task) -> TaskResult:
"""Generate waveform visualization data from mixed audio.
Input:
audio_key: str - S3 key of the audio file
transcript_id: str - Transcript ID
Output:
waveform: list[float] - Waveform peaks array
"""
audio_key = task.input_data.get("audio_key")
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] generate_waveform", audio_key=audio_key, transcript_id=transcript_id
)
if transcript_id:
emit_progress(
transcript_id, "generate_waveform", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not audio_key or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing audio_key or transcript_id"
return task_result
import asyncio
async def _process():
storage = get_transcripts_storage()
audio_url = await storage.get_file_url(
audio_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
# Download audio to temp file
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as tmp:
tmp_path = Path(tmp.name)
async with httpx.AsyncClient() as client:
resp = await client.get(audio_url)
resp.raise_for_status()
tmp.write(resp.content)
try:
waveform = get_audio_waveform(tmp_path, segments_count=255)
finally:
tmp_path.unlink(missing_ok=True)
return waveform
try:
waveform = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"waveform": waveform}
logger.info(
"[Worker] generate_waveform complete",
transcript_id=transcript_id,
peaks_count=len(waveform) if waveform else 0,
)
if transcript_id:
emit_progress(
transcript_id,
"generate_waveform",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] generate_waveform failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "generate_waveform", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,96 @@
"""Conductor worker: get_participants - Fetch meeting participants from Daily.co API."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.dailyco_api.client import DailyApiClient
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="get_participants")
def get_participants(task: Task) -> TaskResult:
"""Fetch meeting participants from Daily.co API.
Input:
mtg_session_id: str - Daily.co meeting session identifier
transcript_id: str - Transcript ID for progress tracking
Output:
participants: list[dict] - List of participant info
- participant_id: str
- user_name: str | None
- user_id: str | None
"""
mtg_session_id = task.input_data.get("mtg_session_id")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] get_participants", mtg_session_id=mtg_session_id)
if transcript_id:
emit_progress(
transcript_id, "get_participants", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not mtg_session_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing mtg_session_id"
return task_result
if not settings.DAILY_API_KEY:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "DAILY_API_KEY not configured"
return task_result
import asyncio
async def _fetch():
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
return await client.get_meeting_participants(mtg_session_id)
try:
response = asyncio.run(_fetch())
participants = [
{
"participant_id": p.participant_id,
"user_name": p.user_name,
"user_id": p.user_id,
}
for p in response.data
]
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"participants": participants}
logger.info(
"[Worker] get_participants complete",
mtg_session_id=mtg_session_id,
count=len(participants),
)
if transcript_id:
emit_progress(
transcript_id,
"get_participants",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] get_participants failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "get_participants", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,90 @@
"""Conductor worker: get_recording - Fetch recording metadata from Daily.co API."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.dailyco_api.client import DailyApiClient
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="get_recording")
def get_recording(task: Task) -> TaskResult:
"""Fetch recording metadata from Daily.co API.
Input:
recording_id: str - Daily.co recording identifier
transcript_id: str - Transcript ID for progress tracking
Output:
id: str - Recording ID
mtg_session_id: str - Meeting session ID
room_name: str - Room name
duration: int - Recording duration in seconds
"""
recording_id = task.input_data.get("recording_id")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] get_recording", recording_id=recording_id)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not recording_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing recording_id"
return task_result
if not settings.DAILY_API_KEY:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "DAILY_API_KEY not configured"
return task_result
import asyncio
async def _fetch():
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
return await client.get_recording(recording_id)
try:
recording = asyncio.run(_fetch())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"id": recording.id,
"mtg_session_id": recording.mtgSessionId,
"room_name": recording.room_name,
"duration": recording.duration,
}
logger.info(
"[Worker] get_recording complete",
recording_id=recording_id,
room_name=recording.room_name,
duration=recording.duration,
)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] get_recording failed", error=str(e))
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "get_recording", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,89 @@
"""Conductor worker: merge_transcripts - Merge multiple track transcriptions."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="merge_transcripts")
def merge_transcripts(task: Task) -> TaskResult:
"""Merge multiple track transcriptions into single timeline sorted by timestamp.
Input:
        transcripts: list[dict] | dict - Transcription results with words (a JOIN output dict keyed by task reference is also accepted)
transcript_id: str - Transcript ID
Output:
all_words: list[dict] - Merged and sorted words
word_count: int - Total word count
"""
transcripts = task.input_data.get("transcripts", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] merge_transcripts",
transcript_count=len(transcripts)
if isinstance(transcripts, (list, dict))
else 0,
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "merge_transcripts", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
try:
all_words = []
# Handle JOIN output (dict with task refs as keys)
if isinstance(transcripts, dict):
transcripts = list(transcripts.values())
for t in transcripts:
if isinstance(t, list):
all_words.extend(t)
elif isinstance(t, dict) and "words" in t:
all_words.extend(t["words"])
# Sort by start timestamp
all_words.sort(key=lambda w: w.get("start", 0))
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"all_words": all_words,
"word_count": len(all_words),
}
logger.info(
"[Worker] merge_transcripts complete",
transcript_id=transcript_id,
word_count=len(all_words),
)
if transcript_id:
emit_progress(
transcript_id,
"merge_transcripts",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] merge_transcripts failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "merge_transcripts", "failed", task.workflow_instance_id
)
return task_result
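
The merge itself is a flatten-and-sort by word start time. A small worked example of the core logic (the word dicts mirror fields used elsewhere in this diff; `text` and `end` are illustrative field names):

```python
# Core merge logic from the worker above, applied to two illustrative track outputs.
track_0 = {"words": [{"text": "hello", "start": 0.4, "end": 0.7, "speaker": 0}]}
track_1 = {"words": [{"text": "hi", "start": 0.1, "end": 0.3, "speaker": 1}]}

all_words = []
for t in (track_0, track_1):
    all_words.extend(t["words"])
all_words.sort(key=lambda w: w.get("start", 0))

assert [w["text"] for w in all_words] == ["hi", "hello"]
```

This only yields a correct global timeline because the tracks were padded to a common meeting-relative zero beforehand (see pad_track).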

View File

@@ -0,0 +1,278 @@
"""Conductor worker: mixdown_tracks - Mix multiple audio tracks into single file.
Builds PyAV filter graph with amix filter to combine N padded tracks into
a single stereo MP3 file.
"""
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.storage import get_transcripts_storage
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
MP3_BITRATE = 192000
def _build_mixdown_filter_graph(containers: list, target_sample_rate: int):
"""Build PyAV filter graph: N abuffer -> amix -> aformat -> sink.
Args:
containers: List of PyAV containers for input tracks
target_sample_rate: Output sample rate
Returns:
Tuple of (graph, inputs list, sink)
"""
graph = av.filter.Graph()
inputs = []
for idx in range(len(containers)):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
# amix with normalize=0 to prevent volume reduction
mixer = graph.add("amix", args=f"inputs={len(containers)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s16:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
for idx, in_ctx in enumerate(inputs):
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
return graph, inputs, sink
@worker_task(task_definition_name="mixdown_tracks")
def mixdown_tracks(task: Task) -> TaskResult:
"""Mix multiple audio tracks into single stereo file.
Input:
padded_urls: list[str] - Presigned URLs of padded tracks
transcript_id: str - Transcript ID for storage path
Output:
audio_key: str - S3 key of mixed audio file
duration: float - Audio duration in seconds
size: int - File size in bytes
"""
padded_urls = task.input_data.get("padded_urls", [])
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] mixdown_tracks",
track_count=len(padded_urls),
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not padded_urls or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing padded_urls or transcript_id"
return task_result
import asyncio
async def _process():
storage = get_transcripts_storage()
# Determine target sample rate from first track
target_sample_rate = None
for url in padded_urls:
if not url:
continue
try:
with av.open(url) as container:
for frame in container.decode(audio=0):
target_sample_rate = frame.sample_rate
break
except Exception:
continue
if target_sample_rate:
break
if not target_sample_rate:
raise Exception("Mixdown failed: No decodable audio frames in any track")
# Open all containers with reconnect options for S3 streaming
containers = []
valid_urls = [url for url in padded_urls if url]
for url in valid_urls:
try:
c = av.open(
url,
options={
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
logger.warning(
"Mixdown: failed to open container", url=url[:50], error=str(e)
)
if not containers:
raise Exception("Mixdown failed: Could not open any track containers")
try:
# Build filter graph
graph, inputs, sink = _build_mixdown_filter_graph(
containers, target_sample_rate
)
# Create temp file for output
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_path = temp_file.name
try:
# Open output container for MP3
with av.open(temp_path, "w", format="mp3") as out_container:
out_stream = out_container.add_stream(
"libmp3lame", rate=target_sample_rate
)
out_stream.bit_rate = MP3_BITRATE
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(
format="s32", layout="stereo", rate=target_sample_rate
)
for _ in decoders
]
duration_samples = 0
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
duration_samples += mixed.samples
for packet in out_stream.encode(mixed):
out_container.mux(packet)
# Flush remaining
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
duration_samples += mixed.samples
for packet in out_stream.encode(mixed):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
# Get file size and duration
file_size = Path(temp_path).stat().st_size
duration = (
duration_samples / target_sample_rate if target_sample_rate else 0
)
# Upload to S3
storage_path = f"{transcript_id}/audio.mp3"
with open(temp_path, "rb") as mp3_file:
await storage.put_file(storage_path, mp3_file)
finally:
Path(temp_path).unlink(missing_ok=True)
finally:
for c in containers:
try:
c.close()
except Exception:
pass
return {
"audio_key": storage_path,
"duration": duration,
"size": file_size,
}
try:
result = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = result
logger.info(
"[Worker] mixdown_tracks complete",
audio_key=result["audio_key"],
duration=result["duration"],
size=result["size"],
)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] mixdown_tracks failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "mixdown_tracks", "failed", task.workflow_instance_id
)
return task_result
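
For readers more familiar with the ffmpeg CLI: the graph built in `_build_mixdown_filter_graph` corresponds roughly to the filter chain below. This is a reference string only, not something the worker runs; a hand-run ffmpeg equivalent would also need the `-i` inputs and MP3 output options:

```python
# Roughly equivalent ffmpeg filter_complex for two padded tracks (illustrative only).
n_tracks = 2
sample_rate = 48000  # the worker derives this from the first decodable frame
filter_complex = (
    f"amix=inputs={n_tracks}:normalize=0,"
    f"aformat=sample_fmts=s16:channel_layouts=stereo:sample_rates={sample_rate}"
)
print(filter_complex)
```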

View File

@@ -0,0 +1,322 @@
"""Conductor worker: pad_track - Pad audio track with silence for alignment.
This worker extracts stream.start_time from WebM container metadata and applies
silence padding using a PyAV filter graph (adelay). The padded audio is uploaded
to S3 and a presigned URL is returned.
"""
import math
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
# Audio constants matching existing pipeline
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 64000
PRESIGNED_URL_EXPIRATION_SECONDS = 7200
def _extract_stream_start_time_from_container(container, track_idx: int) -> float:
"""Extract meeting-relative start time from WebM stream metadata.
    Uses PyAV to read stream.start_time from the WebM container. This is more
    accurate than filename-derived timestamps (typically by ~209ms), which lag
    behind the actual capture start due to network/encoding delays.
Args:
container: PyAV container object
track_idx: Track index for logging
Returns:
Start time in seconds (0.0 if not found)
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def _apply_audio_padding_to_file(
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph.
Filter chain: abuffer -> aresample -> adelay -> abuffersink
Args:
in_container: PyAV input container
output_path: Path to write padded output
start_time_seconds: Amount of silence to prepend
track_idx: Track index for logging
"""
delay_ms = math.floor(start_time_seconds * 1000)
logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next((s for s in in_container.streams if s.type == "audio"), None)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay")
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush remaining frames
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
@worker_task(task_definition_name="pad_track")
def pad_track(task: Task) -> TaskResult:
"""Pad audio track with silence for alignment.
Input:
track_index: int - Index of the track
s3_key: str - S3 key of the source audio file
bucket_name: str - S3 bucket name
transcript_id: str - Transcript ID for storage path
Output:
padded_url: str - Presigned URL of padded track
size: int - File size in bytes
track_index: int - Track index (echoed back)
"""
track_index = task.input_data.get("track_index", 0)
s3_key = task.input_data.get("s3_key")
bucket_name = task.input_data.get("bucket_name")
transcript_id = task.input_data.get("transcript_id")
logger.info(
"[Worker] pad_track",
track_index=track_index,
s3_key=s3_key,
transcript_id=transcript_id,
)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not s3_key or not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing s3_key or transcript_id"
return task_result
import asyncio
async def _process():
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
# Get presigned URL for source file
source_url = await storage.get_file_url(
s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
# Open container and extract start time
with av.open(source_url) as in_container:
start_time_seconds = _extract_stream_start_time_from_container(
in_container, track_index
)
# If no padding needed, return original URL
if start_time_seconds <= 0:
logger.info(
f"Track {track_index} requires no padding",
track_index=track_index,
)
return {
"padded_url": source_url,
"size": 0,
"track_index": track_index,
}
# Create temp file for padded output
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
try:
_apply_audio_padding_to_file(
in_container, temp_path, start_time_seconds, track_index
)
# Get file size
file_size = Path(temp_path).stat().st_size
# Upload using storage layer (use separate path in shadow mode to avoid conflicts)
storage_path = f"file_pipeline_conductor/{transcript_id}/tracks/padded_{track_index}.webm"
                logger.info(
                    "About to upload padded track",
                    key=storage_path,
                    size=file_size,
                )
                with open(temp_path, "rb") as padded_file:
                    upload_result = await storage.put_file(storage_path, padded_file)
                logger.info(
                    "storage.put_file returned",
                    result=str(upload_result),
                )
                logger.info(
                    "Uploaded padded track to S3",
                    key=storage_path,
                    size=file_size,
                )
finally:
Path(temp_path).unlink(missing_ok=True)
# Get presigned URL for padded file
padded_url = await storage.get_file_url(
storage_path,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
return {
"padded_url": padded_url,
"size": file_size,
"track_index": track_index,
}
try:
result = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = result
logger.info(
"[Worker] pad_track complete",
track_index=track_index,
padded_url=result["padded_url"][:50] + "...",
)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] pad_track failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "pad_track", "failed", task.workflow_instance_id
)
return task_result
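
The padding amount comes directly from the extracted stream start time. For example, a track whose stream metadata reports a 2.5s offset produces the following `adelay` arguments, mirroring `_apply_audio_padding_to_file` above:

```python
import math

# Same computation as in _apply_audio_padding_to_file, for start_time_seconds = 2.5.
start_time_seconds = 2.5
delay_ms = math.floor(start_time_seconds * 1000)  # 2500
delays_arg = f"{delay_ms}|{delay_ms}"             # one delay value per stereo channel
adelay_args = f"delays={delays_arg}:all=1"        # "delays=2500|2500:all=1"
print(adelay_args)
```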

View File

@@ -0,0 +1,101 @@
"""Conductor worker: post_zulip - Post or update Zulip message with transcript summary."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
from reflector.settings import settings
@worker_task(task_definition_name="post_zulip")
def post_zulip(task: Task) -> TaskResult:
"""Post or update a Zulip message with the transcript summary.
Input:
transcript_id: str - Transcript ID
Output:
message_id: str | None - Zulip message ID
"""
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] post_zulip", transcript_id=transcript_id)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
# Check if Zulip is configured
if not settings.ZULIP_REALM or not settings.ZULIP_API_KEY:
logger.info("[Worker] post_zulip: Zulip not configured, skipping")
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {"message_id": None}
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings as app_settings
from reflector.zulip import post_transcript_to_zulip
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(app_settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
message_id = await post_transcript_to_zulip(transcript)
return message_id
finally:
await db.disconnect()
_database_context.set(None)
try:
message_id = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"message_id": str(message_id) if message_id else None
}
logger.info(
"[Worker] post_zulip complete",
transcript_id=transcript_id,
message_id=message_id,
)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] post_zulip failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "post_zulip", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,115 @@
"""Conductor worker: send_webhook - Send transcript completion webhook."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="send_webhook")
def send_webhook(task: Task) -> TaskResult:
"""Send the transcript completion webhook to the configured URL.
Input:
transcript_id: str - Transcript ID
room_id: str - Room ID
Output:
sent: bool - Whether webhook was sent
status_code: int | None - HTTP status code
"""
transcript_id = task.input_data.get("transcript_id")
room_id = task.input_data.get("room_id")
logger.info("[Worker] send_webhook", transcript_id=transcript_id, room_id=room_id)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not transcript_id:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing transcript_id"
return task_result
import asyncio
async def _process():
import databases
from reflector.db import _database_context
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.settings import settings
from reflector.worker.webhook import send_transcript_webhook
# Create fresh database connection for subprocess (not shared from parent)
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript is None:
raise ValueError(f"Transcript {transcript_id} not found in database")
# Get room for webhook URL
room = None
if room_id:
try:
room = await rooms_controller.get_by_id(room_id)
except Exception:
pass
if not room or not room.webhook_url:
logger.info(
"[Worker] send_webhook: No webhook URL configured",
transcript_id=transcript_id,
)
return False, None
status_code = await send_transcript_webhook(transcript, room)
return True, status_code
finally:
await db.disconnect()
_database_context.set(None)
try:
sent, status_code = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"sent": sent,
"status_code": status_code,
}
logger.info(
"[Worker] send_webhook complete",
transcript_id=transcript_id,
sent=sent,
status_code=status_code,
)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "completed", task.workflow_instance_id
)
except Exception as e:
logger.error("[Worker] send_webhook failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "send_webhook", "failed", task.workflow_instance_id
)
return task_result

View File

@@ -0,0 +1,96 @@
"""Conductor worker: transcribe_track - Transcribe audio track using GPU service."""
from conductor.client.http.models import Task, TaskResult
from conductor.client.http.models.task_result_status import TaskResultStatus
from conductor.client.worker.worker_task import worker_task
from reflector.conductor.progress import emit_progress
from reflector.logger import logger
@worker_task(task_definition_name="transcribe_track")
def transcribe_track(task: Task) -> TaskResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper.
Input:
track_index: int - Index of the track
audio_url: str - Presigned URL of the audio file
language: str - Language code (default: "en")
transcript_id: str - Transcript ID for progress tracking
Output:
words: list[dict] - List of transcribed words with timestamps and speaker
track_index: int - Track index (echoed back)
"""
track_index = task.input_data.get("track_index", 0)
audio_url = task.input_data.get("audio_url")
language = task.input_data.get("language", "en")
transcript_id = task.input_data.get("transcript_id")
logger.info("[Worker] transcribe_track", track_index=track_index, language=language)
if transcript_id:
emit_progress(
transcript_id, "transcribe_track", "in_progress", task.workflow_instance_id
)
task_result = TaskResult(
task_id=task.task_id,
workflow_instance_id=task.workflow_instance_id,
worker_id=task.worker_id,
)
if not audio_url:
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = "Missing audio_url"
return task_result
import asyncio
async def _process():
from reflector.pipelines.transcription_helpers import (
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, language)
# Tag all words with speaker index
words = []
for word in transcript.words:
word_dict = word.model_dump()
word_dict["speaker"] = track_index
words.append(word_dict)
return words
try:
words = asyncio.run(_process())
task_result.status = TaskResultStatus.COMPLETED
task_result.output_data = {
"words": words,
"track_index": track_index,
}
logger.info(
"[Worker] transcribe_track complete",
track_index=track_index,
word_count=len(words),
)
if transcript_id:
emit_progress(
transcript_id,
"transcribe_track",
"completed",
task.workflow_instance_id,
)
except Exception as e:
logger.error("[Worker] transcribe_track failed", error=str(e), exc_info=True)
task_result.status = TaskResultStatus.FAILED
task_result.reason_for_incompletion = str(e)
if transcript_id:
emit_progress(
transcript_id, "transcribe_track", "failed", task.workflow_instance_id
)
return task_result
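
Because each participant records on a separate Daily.co track, speaker attribution is simply the track index, so no separate diarization model is needed. An illustrative entry from `output_data["words"]` for `track_index=1` (`text` and `end` are assumed field names; `start` and `speaker` match how the words are used elsewhere in this diff):

```python
# Illustrative shape of one transcribed word after speaker tagging (field names partly assumed).
word = {
    "text": "hello",  # transcribed token (name assumed)
    "start": 12.48,   # seconds on the meeting timeline (tracks were pre-padded for alignment)
    "end": 12.71,
    "speaker": 1,     # set to track_index by the worker above
}
```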

View File

@@ -0,0 +1,205 @@
{
"name": "diarization_pipeline",
"description": "Reflector multitrack diarization pipeline",
"version": 1,
"schemaVersion": 2,
"inputParameters": [
"recording_id",
"room_name",
"tracks",
"bucket_name",
"transcript_id",
"room_id"
],
"tasks": [
{
"name": "get_recording",
"taskReferenceName": "get_recording",
"type": "SIMPLE",
"inputParameters": {
"recording_id": "${workflow.input.recording_id}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "get_participants",
"taskReferenceName": "get_participants",
"type": "SIMPLE",
"inputParameters": {
"mtg_session_id": "${get_recording.output.mtg_session_id}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_dynamic_fork_tasks",
"taskReferenceName": "generate_padding_tasks",
"type": "SIMPLE",
"inputParameters": {
"tracks": "${workflow.input.tracks}",
"task_type": "pad_track",
"transcript_id": "${workflow.input.transcript_id}",
"bucket_name": "${workflow.input.bucket_name}"
}
},
{
"name": "fork_track_padding",
"taskReferenceName": "fork_track_padding",
"type": "FORK_JOIN_DYNAMIC",
"inputParameters": {
"dynamicTasks": "${generate_padding_tasks.output.tasks}",
"dynamicTasksInput": "${generate_padding_tasks.output.inputs}"
},
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "dynamicTasksInput"
},
{
"name": "join_padding",
"taskReferenceName": "join_padding",
"type": "JOIN"
},
{
"name": "mixdown_tracks",
"taskReferenceName": "mixdown_tracks",
"type": "SIMPLE",
"inputParameters": {
"padded_urls": "${join_padding.output..padded_url}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_waveform",
"taskReferenceName": "generate_waveform",
"type": "SIMPLE",
"inputParameters": {
"audio_key": "${mixdown_tracks.output.audio_key}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "generate_dynamic_fork_tasks",
"taskReferenceName": "generate_transcription_tasks",
"type": "SIMPLE",
"inputParameters": {
"tracks": "${workflow.input.tracks}",
"task_type": "transcribe_track",
"transcript_id": "${workflow.input.transcript_id}",
"padded_urls": "${join_padding.output}"
}
},
{
"name": "fork_transcription",
"taskReferenceName": "fork_transcription",
"type": "FORK_JOIN_DYNAMIC",
"inputParameters": {
"dynamicTasks": "${generate_transcription_tasks.output.tasks}",
"dynamicTasksInput": "${generate_transcription_tasks.output.inputs}"
},
"dynamicForkTasksParam": "dynamicTasks",
"dynamicForkTasksInputParamName": "dynamicTasksInput"
},
{
"name": "join_transcription",
"taskReferenceName": "join_transcription",
"type": "JOIN"
},
{
"name": "merge_transcripts",
"taskReferenceName": "merge_transcripts",
"type": "SIMPLE",
"inputParameters": {
"transcripts": "${join_transcription.output}",
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "detect_topics",
"taskReferenceName": "detect_topics",
"type": "SIMPLE",
"inputParameters": {
"words": "${merge_transcripts.output.all_words}",
"transcript_id": "${workflow.input.transcript_id}",
"target_language": "en"
}
},
{
"name": "fork_generation",
"taskReferenceName": "fork_generation",
"type": "FORK_JOIN",
"forkTasks": [
[
{
"name": "generate_title",
"taskReferenceName": "generate_title",
"type": "SIMPLE",
"inputParameters": {
"topics": "${detect_topics.output.topics}",
"transcript_id": "${workflow.input.transcript_id}"
}
}
],
[
{
"name": "generate_summary",
"taskReferenceName": "generate_summary",
"type": "SIMPLE",
"inputParameters": {
"words": "${merge_transcripts.output.all_words}",
"topics": "${detect_topics.output.topics}",
"transcript_id": "${workflow.input.transcript_id}"
}
}
]
]
},
{
"name": "join_generation",
"taskReferenceName": "join_generation",
"type": "JOIN",
"joinOn": ["generate_title", "generate_summary"]
},
{
"name": "finalize",
"taskReferenceName": "finalize",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"title": "${generate_title.output.title}",
"summary": "${generate_summary.output.summary}",
"short_summary": "${generate_summary.output.short_summary}",
"duration": "${mixdown_tracks.output.duration}"
}
},
{
"name": "cleanup_consent",
"taskReferenceName": "cleanup_consent",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "post_zulip",
"taskReferenceName": "post_zulip",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}"
}
},
{
"name": "send_webhook",
"taskReferenceName": "send_webhook",
"type": "SIMPLE",
"inputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"room_id": "${workflow.input.room_id}"
}
}
],
"outputParameters": {
"transcript_id": "${workflow.input.transcript_id}",
"title": "${generate_title.output.title}",
"summary": "${generate_summary.output.summary}",
"duration": "${mixdown_tracks.output.duration}",
"word_count": "${merge_transcripts.output.word_count}"
}
}
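
This definition is started from the application via `ConductorClientManager.start_workflow` (see the dispatch and Daily.co webhook changes later in this diff). A condensed sketch of that call, with illustrative input values:

```python
# Condensed from dispatch_transcript_processing / _process_multitrack_recording_inner
# later in this diff; the literal values are illustrative.
from reflector.conductor.client import ConductorClientManager

workflow_id = ConductorClientManager.start_workflow(
    name="diarization_pipeline",
    version=1,
    input_data={
        "recording_id": "rec_abc",
        "room_name": "daily-room-1",
        "tracks": [{"s3_key": "raw/track-0.webm"}, {"s3_key": "raw/track-1.webm"}],
        "bucket_name": "daily-recordings",
        "transcript_id": "tr_123",
        "room_id": "room_42",
    },
)
```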

View File

@@ -0,0 +1,74 @@
"""Register workflow definition with Conductor server."""
import json
from pathlib import Path
import httpx
from reflector.logger import logger
from reflector.settings import settings
def register_workflow() -> None:
"""Register the diarization pipeline workflow with Conductor server.
Raises:
httpx.HTTPStatusError: If registration fails.
"""
workflow_path = Path(__file__).parent / "diarization_pipeline.json"
with open(workflow_path) as f:
workflow = json.load(f)
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/workflow"
logger.info(
"Registering workflow",
name=workflow["name"],
version=workflow["version"],
url=url,
)
with httpx.Client(timeout=30.0) as client:
resp = client.put(
url,
json=[workflow],
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Workflow registered successfully", name=workflow["name"])
async def register_workflow_async() -> None:
"""Async version of register_workflow."""
workflow_path = Path(__file__).parent / "diarization_pipeline.json"
with open(workflow_path) as f:
workflow = json.load(f)
base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/")
url = f"{base_url}/metadata/workflow"
logger.info(
"Registering workflow",
name=workflow["name"],
version=workflow["version"],
url=url,
)
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.put(
url,
json=[workflow],
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
logger.info("Workflow registered successfully", name=workflow["name"])
if __name__ == "__main__":
register_workflow()
print("Workflow registration complete!")

View File

@@ -22,7 +22,9 @@ recordings = sa.Table(
),
sa.Column("meeting_id", sa.String),
sa.Column("track_keys", sa.JSON, nullable=True),
sa.Column("workflow_id", sa.String, nullable=True),
sa.Index("idx_recording_meeting_id", "meeting_id"),
sa.Index("idx_recording_workflow_id", "workflow_id"),
)
@@ -38,6 +40,8 @@ class Recording(BaseModel):
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
# Conductor workflow ID for tracking pipeline execution
workflow_id: str | None = None
@property
def is_multitrack(self) -> bool:

View File

@@ -181,6 +181,16 @@ class TranscriptEvent(BaseModel):
data: dict
class PipelineProgressData(BaseModel):
"""Data payload for PIPELINE_PROGRESS WebSocket events."""
workflow_id: str | None = None
current_step: str
step_index: int
total_steps: int
step_status: Literal["pending", "in_progress", "completed", "failed"]
class TranscriptParticipant(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: str = Field(default_factory=generate_uuid4)

View File

@@ -12,12 +12,15 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from reflector.conductor.client import ConductorClientManager
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import Transcript
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -37,6 +40,8 @@ class MultitrackProcessingConfig:
transcript_id: NonEmptyString
bucket_name: NonEmptyString
track_keys: list[str]
recording_id: NonEmptyString | None = None
room_id: NonEmptyString | None = None
mode: Literal["multitrack"] = "multitrack"
@@ -110,12 +115,15 @@ async def validate_transcript_for_processing(
)
async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResult:
async def prepare_transcript_processing(
validation: ValidationOk, room_id: str | None = None
) -> PrepareResult:
"""
Determine processing mode from transcript/recording data.
"""
bucket_name: str | None = None
track_keys: list[str] | None = None
recording_id: str | None = validation.recording_id
if validation.recording_id:
recording = await recordings_controller.get_by_id(validation.recording_id)
@@ -137,6 +145,8 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
recording_id=recording_id,
room_id=room_id,
)
return FileProcessingConfig(
@@ -144,8 +154,32 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
)
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult | None:
if isinstance(config, MultitrackProcessingConfig):
# Start Conductor workflow if enabled
if settings.CONDUCTOR_ENABLED:
workflow_id = ConductorClientManager.start_workflow(
name="diarization_pipeline",
version=1,
input_data={
"recording_id": config.recording_id,
"room_name": None, # Not available in reprocess path
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
)
logger.info(
"Started Conductor workflow (reprocess)",
workflow_id=workflow_id,
transcript_id=config.transcript_id,
)
if not settings.CONDUCTOR_SHADOW_MODE:
return None # Conductor-only, no Celery result
# Celery pipeline (shadow mode or Conductor disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,

View File

@@ -150,5 +150,11 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Conductor workflow orchestration
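    # CONDUCTOR_ENABLED starts the Conductor workflow for multitrack recordings.
    # CONDUCTOR_SHADOW_MODE (together with CONDUCTOR_ENABLED) also keeps the existing
    # Celery pipeline running in parallel so both paths can be compared during rollout.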
CONDUCTOR_SERVER_URL: str = "http://conductor:8080/api"
CONDUCTOR_DEBUG: bool = False
CONDUCTOR_ENABLED: bool = False
CONDUCTOR_SHADOW_MODE: bool = False
settings = Settings()

View File

@@ -37,7 +37,7 @@ async def process_transcript_inner(
) -> AsyncResult:
validation = await validate_transcript_for_processing(transcript)
on_validation(validation)
config = await prepare_transcript_processing(validation)
config = await prepare_transcript_processing(validation, room_id=transcript.room_id)
on_preprocess(config)
return dispatch_transcript_processing(config)

View File

@@ -0,0 +1,45 @@
"""Conductor health and status endpoints."""
import httpx
from fastapi import APIRouter
from reflector.settings import settings
router = APIRouter(prefix="/conductor", tags=["conductor"])
@router.get("/health")
async def conductor_health():
"""Check Conductor server connectivity and status."""
if not settings.CONDUCTOR_ENABLED:
return {"status": "disabled", "connected": False}
    # Derive the base URL for the health check by dropping a trailing /api
    # (str.rstrip strips a character set, not a suffix, so use removesuffix)
    base_url = settings.CONDUCTOR_SERVER_URL.rstrip("/").removesuffix("/api")
health_url = f"{base_url}/health"
try:
async with httpx.AsyncClient(timeout=5.0) as client:
resp = await client.get(health_url)
if resp.status_code == 200:
return {"status": "healthy", "connected": True}
else:
return {
"status": "unhealthy",
"connected": True,
"error": f"Health check returned {resp.status_code}",
}
except httpx.TimeoutException:
return {
"status": "unhealthy",
"connected": False,
"error": "Connection timeout",
}
except httpx.ConnectError as e:
return {
"status": "unhealthy",
"connected": False,
"error": f"Connection failed: {e}",
}
except Exception as e:
return {"status": "unhealthy", "connected": False, "error": str(e)}

View File

@@ -45,7 +45,7 @@ async def transcript_process(
else:
assert_never(validation)
config = await prepare_transcript_processing(validation)
config = await prepare_transcript_processing(validation, room_id=transcript.room_id)
if isinstance(config, ProcessError):
raise HTTPException(status_code=500, detail=config.detail)

View File

@@ -286,6 +286,35 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Start Conductor workflow if enabled
if settings.CONDUCTOR_ENABLED:
from reflector.conductor.client import ConductorClientManager # noqa: PLC0415
workflow_id = ConductorClientManager.start_workflow(
name="diarization_pipeline",
version=1,
input_data={
"recording_id": recording_id,
"room_name": daily_room_name,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
)
logger.info(
"Started Conductor workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
# Store workflow_id on recording for status tracking
await recordings_controller.update(recording, {"workflow_id": workflow_id})
if not settings.CONDUCTOR_SHADOW_MODE:
return # Don't trigger Celery
# Celery pipeline (runs when Conductor disabled OR in shadow mode)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,

View File

@@ -7,6 +7,8 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
uv run celery -A reflector.worker.app worker --loglevel=info
elif [ "${ENTRYPOINT}" = "beat" ]; then
uv run celery -A reflector.worker.app beat --loglevel=info
elif [ "${ENTRYPOINT}" = "conductor-worker" ]; then
uv run python -m reflector.conductor.run_workers
else
echo "Unknown command"
fi

server/uv.lock (generated, 3379 lines changed)

File diff suppressed because it is too large.