diff --git a/TASKS.md b/TASKS.md index 6e871af7..d038dc18 100644 --- a/TASKS.md +++ b/TASKS.md @@ -1,1977 +1,174 @@ -# Conductor Migration Tasks +# Durable Workflow Migration Tasks -This document defines atomic, isolated work items for migrating the Daily.co multitrack diarization pipeline from Celery to Conductor. Each task is self-contained with clear dependencies, acceptance criteria, and references to the codebase. +This document defines atomic, isolated work items for migrating the Daily.co multitrack diarization pipeline from Celery to durable workflow orchestration. Supports both **Conductor** and **Hatchet** via `DURABLE_WORKFLOW_PROVIDER` env var. + +--- + +## Provider Selection + +```bash +# .env +DURABLE_WORKFLOW_PROVIDER=none # Celery only (default) +DURABLE_WORKFLOW_PROVIDER=conductor # Use Conductor +DURABLE_WORKFLOW_PROVIDER=hatchet # Use Hatchet +DURABLE_WORKFLOW_SHADOW_MODE=true # Run both provider + Celery (for comparison) +``` --- ## Task Index -| ID | Title | Phase | Dependencies | Complexity | -|----|-------|-------|--------------|------------| -| INFRA-001 | Add Conductor container to docker-compose | 1 | None | Low | -| INFRA-002 | Create Conductor Python client wrapper | 1 | INFRA-001 | Medium | -| INFRA-003 | Add Conductor environment configuration | 1 | INFRA-001 | Low | -| INFRA-004 | Create health check endpoint for Conductor | 1 | INFRA-002 | Low | -| TASK-001 | Create task definitions registry module | 2 | INFRA-002 | Medium | -| TASK-002 | Implement get_recording worker | 2 | TASK-001 | Low | -| TASK-003 | Implement get_participants worker | 2 | TASK-001 | Low | -| TASK-004a | Implement pad_track: extract stream metadata | 2 | TASK-001 | Medium | -| TASK-004b | Implement pad_track: PyAV padding filter | 2 | TASK-004a | Medium | -| TASK-004c | Implement pad_track: S3 upload padded file | 2 | TASK-004b | Low | -| TASK-005a | Implement mixdown_tracks: build filter graph | 2 | TASK-001 | Medium | -| TASK-005b | Implement mixdown_tracks: S3 streaming + upload | 2 | TASK-005a | Medium | -| TASK-006 | Implement generate_waveform worker | 2 | TASK-001 | Medium | -| TASK-007 | Implement transcribe_track worker | 2 | TASK-001 | Medium | -| TASK-008 | Implement merge_transcripts worker | 2 | TASK-001 | Medium | -| TASK-009 | Implement detect_topics worker | 2 | TASK-001 | Medium | -| TASK-010 | Implement generate_title worker | 2 | TASK-001 | Low | -| TASK-011 | Implement generate_summary worker | 2 | TASK-001 | Medium | -| TASK-012 | Implement finalize worker | 2 | TASK-001 | Medium | -| TASK-013 | Implement cleanup_consent worker | 2 | TASK-001 | Low | -| TASK-014 | Implement post_zulip worker | 2 | TASK-001 | Low | -| TASK-015 | Implement send_webhook worker | 2 | TASK-001 | Low | -| TASK-016 | Implement generate_dynamic_fork_tasks helper | 2 | TASK-001 | Low | -| STATE-001 | Add workflow_id to Recording model | 2 | INFRA-002 | Low | -| WFLOW-001 | Create workflow definition JSON with FORK_JOIN_DYNAMIC | 3 | TASK-002..015 | High | -| WFLOW-002 | Implement workflow registration script | 3 | WFLOW-001 | Medium | -| EVENT-001 | Add PIPELINE_PROGRESS WebSocket event (requires frontend ticket) | 2 | None | Medium | -| EVENT-002 | Emit progress events from workers (requires frontend ticket) | 2 | EVENT-001, TASK-002..015 | Medium | -| INTEG-001 | Modify pipeline trigger to start Conductor workflow | 4 | WFLOW-002, STATE-001 | Medium | -| SHADOW-001 | Implement shadow mode toggle | 4 | INTEG-001 | Medium | -| SHADOW-002 | Add result comparison: content fields | 4 | 
SHADOW-001 | Medium | -| CUTOVER-001 | Create feature flag for Conductor-only mode | 5 | SHADOW-001 | Low | -| CUTOVER-002 | Add fallback to Celery on Conductor failure | 5 | CUTOVER-001 | Medium | -| CLEANUP-001 | Remove deprecated Celery task code | 6 | CUTOVER-001 | Medium | -| CLEANUP-002 | Update documentation | 6 | CLEANUP-001 | Low | -| TEST-001a | Integration tests: API workers (defer to human if complex) | 2 | TASK-002, TASK-003 | Low | -| TEST-001b | Integration tests: audio workers (defer to human if complex) | 2 | TASK-004c, TASK-005b, TASK-006 | Medium | -| TEST-001c | Integration tests: transcription workers (defer to human if complex) | 2 | TASK-007, TASK-008 | Medium | -| TEST-001d | Integration tests: LLM workers (defer to human if complex) | 2 | TASK-009..011 | Medium | -| TEST-001e | Integration tests: finalization workers (defer to human if complex) | 2 | TASK-012..015 | Low | -| TEST-002 | E2E test for complete workflow (defer to human if complex) | 3 | WFLOW-002 | High | -| TEST-003 | Shadow mode comparison tests (defer to human tester if too complex) | 4 | SHADOW-002 | Medium | +| ID | Title | Status | Conductor | Hatchet | +|----|-------|--------|-----------|---------| +| INFRA-001 | Add container to docker-compose | Done | ✓ | ✓ | +| INFRA-002 | Create Python client wrapper | Done | ✓ | ✓ | +| INFRA-003 | Add environment configuration | Done | ✓ | ✓ | +| TASK-001 | Create task definitions/workflow | Done | ✓ JSON | ✓ Python | +| TASK-002 | get_recording worker | Done | ✓ | ✓ | +| TASK-003 | get_participants worker | Done | ✓ | ✓ | +| TASK-004 | pad_track worker | Done | ✓ | ✓ | +| TASK-005 | mixdown_tracks worker | Done | ✓ | ✓ | +| TASK-006 | generate_waveform worker | Done | ✓ | ✓ | +| TASK-007 | transcribe_track worker | Done | ✓ | ✓ | +| TASK-008 | merge_transcripts worker | Done | ✓ | ✓ (in process_tracks) | +| TASK-009 | detect_topics worker | Done | ✓ | ✓ | +| TASK-010 | generate_title worker | Done | ✓ | ✓ | +| TASK-011 | generate_summary worker | Done | ✓ | ✓ | +| TASK-012 | finalize worker | Done | ✓ | ✓ | +| TASK-013 | cleanup_consent worker | Done | ✓ | ✓ | +| TASK-014 | post_zulip worker | Done | ✓ | ✓ | +| TASK-015 | send_webhook worker | Done | ✓ | ✓ | +| EVENT-001 | Progress WebSocket events | Done | ✓ | ✓ | +| INTEG-001 | Pipeline trigger integration | Done | ✓ | ✓ | +| SHADOW-001 | Shadow mode toggle | Done | ✓ | ✓ | +| TEST-001 | Integration tests | Pending | - | - | +| TEST-002 | E2E workflow test | Pending | - | - | +| CUTOVER-001 | Production cutover | Pending | - | - | +| CLEANUP-001 | Remove Celery code | Pending | - | - | --- -## Phase 1: Infrastructure Setup +## Architecture Differences -### INFRA-001: Add Conductor Container to docker-compose - -**Description:** -Add the Conductor OSS standalone container to the docker-compose configuration. 
- -**Files to Modify:** -- `docker-compose.yml` - -**Implementation Details:** -```yaml -conductor: - image: conductoross/conductor-standalone:3.15.0 - ports: - - 8127:8080 - - 5001:5000 - environment: - - conductor.db.type=memory # Use postgres in production - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8080/health"] - interval: 30s - timeout: 10s - retries: 5 -``` - -**Acceptance Criteria:** -- [ ] Conductor UI accessible at http://localhost:8127 -- [ ] Swagger docs available at http://localhost:8127/swagger-ui/index.html -- [ ] Health endpoint returns 200 - -**Dependencies:** None - -**Reference Files:** -- `docs/conductor-pipeline-mock/docker-compose.yml` +| Aspect | Conductor | Hatchet | +|--------|-----------|---------| +| Worker model | Multiprocessing (fork) | Async (single process) | +| Task communication | REST polling | gRPC streaming | +| Workflow definition | JSON files | Python decorators | +| Child workflows | FORK_JOIN_DYNAMIC + JOIN task | `aio_run()` returns directly | +| Task definitions | Separate worker files | Embedded in workflow | +| Debug logging | Limited | Excellent with `HATCHET_DEBUG=true` | --- -### INFRA-002: Create Conductor Python Client Wrapper +## File Structure -**Description:** -Create a reusable client wrapper module for interacting with the Conductor server using the `conductor-python` SDK. - -**Files to Create:** -- `server/reflector/conductor/__init__.py` -- `server/reflector/conductor/client.py` - -**Implementation Details:** -```python -# server/reflector/conductor/client.py -from conductor.client.configuration.configuration import Configuration -from conductor.client.orkes_clients import OrkesClients -from conductor.client.workflow_client import WorkflowClient -from reflector.settings import settings - -class ConductorClientManager: - _instance = None - - @classmethod - def get_client(cls) -> WorkflowClient: - if cls._instance is None: - config = Configuration( - server_api_url=settings.CONDUCTOR_SERVER_URL, - debug=settings.CONDUCTOR_DEBUG, - ) - cls._instance = OrkesClients(config) - return cls._instance.get_workflow_client() - - @classmethod - def start_workflow(cls, name: str, version: int, input_data: dict) -> str: - """Start a workflow and return the workflow ID.""" - client = cls.get_client() - return client.start_workflow_by_name(name, input_data, version=version) - - @classmethod - def get_workflow_status(cls, workflow_id: str) -> dict: - """Get the current status of a workflow.""" - client = cls.get_client() - return client.get_workflow(workflow_id, include_tasks=True) +### Conductor +``` +server/reflector/conductor/ +├── client.py # SDK wrapper +├── progress.py # WebSocket progress emission +├── run_workers.py # Worker startup +├── shadow_compare.py # Shadow mode comparison +├── tasks/ +│ ├── definitions.py # Task definitions with timeouts +│ └── register.py # Registration script +├── workers/ +│ ├── get_recording.py +│ ├── get_participants.py +│ ├── pad_track.py +│ ├── mixdown_tracks.py +│ ├── generate_waveform.py +│ ├── transcribe_track.py +│ ├── merge_transcripts.py +│ ├── detect_topics.py +│ ├── generate_title.py +│ ├── generate_summary.py +│ ├── finalize.py +│ ├── cleanup_consent.py +│ ├── post_zulip.py +│ ├── send_webhook.py +│ └── generate_dynamic_fork_tasks.py +└── workflows/ + └── register.py ``` -**Acceptance Criteria:** -- [ ] Can connect to Conductor server -- [ ] Can start a workflow -- [ ] Can retrieve workflow status -- [ ] Proper error handling for connection failures - -**Dependencies:** INFRA-001 - 
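With the provider switch introduced under Provider Selection, call sites can stay provider-agnostic by routing through a thin dispatch layer on top of this wrapper. A minimal sketch, assuming the settings attribute mirrors the env var and that `reflector/hatchet/client.py` exposes a comparable `start_workflow` entry point (the dispatch helper itself is hypothetical):

```python
from reflector.settings import settings


def start_durable_workflow(name: str, input_data: dict) -> str | None:
    """Dispatch to the configured provider; returns a workflow/run id, or None for Celery-only."""
    provider = settings.DURABLE_WORKFLOW_PROVIDER  # "none" | "conductor" | "hatchet"

    if provider == "conductor":
        from reflector.conductor.client import ConductorClientManager

        return ConductorClientManager.start_workflow(name, version=1, input_data=input_data)

    if provider == "hatchet":
        # assumed entry point exposed by server/reflector/hatchet/client.py
        from reflector.hatchet.client import start_workflow

        return start_workflow(name, input_data)

    return None  # DURABLE_WORKFLOW_PROVIDER=none: caller falls back to the Celery task
```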
-**Reference Files:** -- `docs/conductor-pipeline-mock/src/main.py` -- `docs/conductor-pipeline-mock/src/register_workflow.py` +### Hatchet +``` +server/reflector/hatchet/ +├── client.py # SDK wrapper +├── progress.py # WebSocket progress emission +├── run_workers.py # Worker startup +└── workflows/ + ├── diarization_pipeline.py # Main workflow with all tasks + └── track_processing.py # Child workflow (pad + transcribe) +``` --- -### INFRA-003: Add Conductor Environment Configuration +## Remaining Work -**Description:** -Add environment variables for Conductor configuration to the settings module. +### TEST-001: Integration Tests +- [ ] Test each worker with mocked external services +- [ ] Test error handling and retries +- [ ] Test both Conductor and Hatchet paths -**Files to Modify:** -- `server/reflector/settings.py` -- `server/.env_template` +### TEST-002: E2E Workflow Test +- [ ] Complete workflow run with real Daily.co recording +- [ ] Verify output matches Celery pipeline +- [ ] Performance comparison -**Implementation Details:** -```python -# Add to settings.py -CONDUCTOR_SERVER_URL: str = "http://conductor:8080/api" -CONDUCTOR_DEBUG: bool = False -CONDUCTOR_ENABLED: bool = False # Feature flag -CONDUCTOR_SHADOW_MODE: bool = False # Run both Celery and Conductor -``` +### CUTOVER-001: Production Cutover +- [ ] Deploy with `DURABLE_WORKFLOW_PROVIDER=conductor` or `hatchet` +- [ ] Monitor for failures +- [ ] Compare results with shadow mode if needed -**Acceptance Criteria:** -- [ ] Settings load from environment variables -- [ ] Default values work for local development -- [ ] Docker container uses internal hostname - -**Dependencies:** INFRA-001 - -**Reference Files:** -- `server/reflector/settings.py` +### CLEANUP-001: Remove Celery Code +- [ ] Remove `main_multitrack_pipeline.py` +- [ ] Remove Celery task triggers +- [ ] Update documentation --- -### INFRA-004: Create Health Check Endpoint for Conductor +## Known Issues -**Description:** -Add an endpoint to check Conductor server connectivity and status. +### Conductor +- See `CONDUCTOR_LLM_OBSERVATIONS.md` for debugging notes +- Ghost workers issue (multiple containers polling) +- Multiprocessing + AsyncIO conflicts -**Files to Create:** -- `server/reflector/views/conductor.py` - -**Files to Modify:** -- `server/reflector/app.py` (register router) - -**Implementation Details:** -```python -from fastapi import APIRouter -from reflector.conductor.client import ConductorClientManager - -router = APIRouter(prefix="/conductor", tags=["conductor"]) - -@router.get("/health") -async def conductor_health(): - try: - client = ConductorClientManager.get_client() - # Conductor SDK health check - return {"status": "healthy", "connected": True} - except Exception as e: - return {"status": "unhealthy", "error": str(e)} -``` - -**Acceptance Criteria:** -- [ ] Endpoint returns healthy when Conductor is up -- [ ] Endpoint returns unhealthy with error when Conductor is down -- [ ] Does not block on slow responses - -**Dependencies:** INFRA-002 +### Hatchet +- See `HATCHET_LLM_OBSERVATIONS.md` for debugging notes +- SDK v1.21+ API changes (breaking) +- JWT token Docker networking issues +- Worker appears hung without debug mode --- -## Phase 2: Task Decomposition - Worker Definitions +## Quick Start -### TASK-001: Create Task Definitions Registry Module - -**Description:** -Create a module that registers all task definitions with the Conductor server on startup. 
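For contrast, the Hatchet provider needs no separate registry: per the Architecture Differences table, task definitions are embedded in the workflow module as Python decorators. A rough sketch of that shape; decorator and parameter names are best-effort against the v1 `hatchet-sdk` and may not match the pinned version (Known Issues notes breaking changes around v1.21):

```python
# hatchet-sdk names below are assumptions against the v1 SDK, not a verified API surface
from hatchet_sdk import Context, Hatchet
from pydantic import BaseModel

hatchet = Hatchet()


class PipelineInput(BaseModel):
    recording_id: str
    transcript_id: str


diarization_pipeline = hatchet.workflow(
    name="diarization_pipeline", input_validator=PipelineInput
)


@diarization_pipeline.task()
async def get_recording(input: PipelineInput, ctx: Context) -> dict:
    # same body as the Conductor worker, without the TaskResult plumbing
    return {"recording_id": input.recording_id}


@diarization_pipeline.task(parents=[get_recording])
async def get_participants(input: PipelineInput, ctx: Context) -> dict:
    # ordering is expressed with `parents=` instead of a JSON DAG
    return {"participants": []}
```

Per-task timeouts and retries would then live on the decorators rather than in a central `definitions.py` (exact parameter names vary by SDK version).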
- -**Files to Create:** -- `server/reflector/conductor/tasks/__init__.py` -- `server/reflector/conductor/tasks/definitions.py` -- `server/reflector/conductor/tasks/register.py` - -**Implementation Details:** - -Task definition schema: -```python -TASK_DEFINITIONS = [ - { - "name": "get_recording", - "retryCount": 3, - "timeoutSeconds": 60, - "responseTimeoutSeconds": 30, - "inputKeys": ["recording_id"], - "outputKeys": ["id", "mtg_session_id", "room_name", "duration"], - "ownerEmail": "reflector@example.com", - }, - # ... all other tasks -] -``` - -**Acceptance Criteria:** -- [ ] All 16 task types defined with correct timeouts -- [ ] Registration script runs successfully -- [ ] Tasks visible in Conductor UI - -**Dependencies:** INFRA-002 - -**Reference Files:** -- `docs/conductor-pipeline-mock/src/register_workflow.py` (lines 10-112) -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Module 5 section) - ---- - -### TASK-002: Implement get_recording Worker - -**Description:** -Create a Conductor worker that fetches recording metadata from the Daily.co API. - -**Files to Create:** -- `server/reflector/conductor/workers/__init__.py` -- `server/reflector/conductor/workers/get_recording.py` - -**Implementation Details:** -```python -from conductor.client.worker.worker_task import worker_task -from conductor.client.http.models import Task, TaskResult -from conductor.client.http.models.task_result_status import TaskResultStatus -from reflector.video_platforms.factory import create_platform_client - -@worker_task(task_definition_name="get_recording") -async def get_recording(task: Task) -> TaskResult: - recording_id = task.input_data.get("recording_id") - - async with create_platform_client("daily") as client: - recording = await client.get_recording(recording_id) - - result = TaskResult( - task_id=task.task_id, - workflow_instance_id=task.workflow_instance_id, - worker_id=task.worker_id, - ) - result.status = TaskResultStatus.COMPLETED - result.output_data = { - "id": recording.id, - "mtg_session_id": recording.mtgSessionId, - "room_name": recording.roomName, - "duration": recording.duration, - } - return result -``` - -**Input Contract:** -```json -{"recording_id": "string"} -``` - -**Output Contract:** -```json -{"id": "string", "mtg_session_id": "string", "room_name": "string", "duration": "number"} -``` - -**Acceptance Criteria:** -- [ ] Worker polls for tasks correctly -- [ ] Handles Daily.co API errors gracefully -- [ ] Returns correct output schema -- [ ] Timeout: 60s, Response timeout: 30s, Retries: 3 - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/worker/process.py` (lines 218-294) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 13-26) - ---- - -### TASK-003: Implement get_participants Worker - -**Description:** -Create a Conductor worker that fetches meeting participants from the Daily.co API. - -**Files to Create:** -- `server/reflector/conductor/workers/get_participants.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="get_participants") -async def get_participants(task: Task) -> TaskResult: - mtg_session_id = task.input_data.get("mtg_session_id") - - async with create_platform_client("daily") as client: - payload = await client.get_meeting_participants(mtg_session_id) - - participants = [ - {"participant_id": p.participant_id, "user_name": p.user_name, "user_id": p.user_id} - for p in payload.data - ] - - result = TaskResult(...) 
- result.output_data = {"participants": participants} - return result -``` - -**Input Contract:** -```json -{"mtg_session_id": "string"} -``` - -**Output Contract:** -```json -{"participants": [{"participant_id": "string", "user_name": "string", "user_id": "string|null"}]} -``` - -**Acceptance Criteria:** -- [ ] Fetches participants from Daily.co API -- [ ] Maps participant IDs to names correctly -- [ ] Handles missing mtg_session_id - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 513-596) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 29-42) - ---- - -### TASK-004a: Implement pad_track - Extract Stream Metadata - -**Description:** -Extract stream.start_time from WebM container metadata for timestamp alignment. - -**Files to Create:** -- `server/reflector/conductor/workers/pad_track.py` (partial - metadata extraction) - -**Implementation Details:** -```python -def _extract_stream_start_time_from_container(source_url: str) -> float: - """Extract start_time from WebM stream metadata using PyAV.""" - container = av.open(source_url, options={ - "reconnect": "1", - "reconnect_streamed": "1", - "reconnect_delay_max": "30", - }) - audio_stream = container.streams.audio[0] - start_time = float(audio_stream.start_time * audio_stream.time_base) - container.close() - return start_time -``` - -**Acceptance Criteria:** -- [ ] Opens WebM container from S3 presigned URL -- [ ] Extracts start_time from audio stream metadata -- [ ] Handles missing/invalid start_time (returns 0) -- [ ] Closes container properly - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 56-85) - - `_extract_stream_start_time_from_container()` method - ---- - -### TASK-004b: Implement pad_track - PyAV Padding Filter - -**Description:** -Apply adelay filter using PyAV filter graph to pad audio with silence. - -**Files to Modify:** -- `server/reflector/conductor/workers/pad_track.py` (add filter logic) - -**Implementation Details:** -```python -def _apply_audio_padding_to_file(in_container, output_path: str, start_time_seconds: float): - """Apply adelay filter to pad audio with silence.""" - delay_ms = math.floor(start_time_seconds * 1000) - - graph = av.filter.Graph() - src = graph.add("abuffer", args=abuf_args, name="src") - aresample_f = graph.add("aresample", args="async=1", name="ares") - delays_arg = f"{delay_ms}|{delay_ms}" - adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay") - sink = graph.add("abuffersink", name="sink") - - src.link_to(aresample_f) - aresample_f.link_to(adelay_f) - adelay_f.link_to(sink) - graph.configure() - - # Process frames through filter graph - # Write to output file -``` - -**Acceptance Criteria:** -- [ ] Constructs correct filter graph chain -- [ ] Calculates delay_ms correctly (start_time * 1000) -- [ ] Handles stereo audio (delay per channel) -- [ ] Edge case: skip if start_time <= 0 - -**Dependencies:** TASK-004a - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 87-188) - - `_apply_audio_padding_to_file()` method - -**Technical Notes:** -- Filter chain: `abuffer` -> `aresample` -> `adelay` -> `abuffersink` -- adelay format: `delays={ms}|{ms}:all=1` - ---- - -### TASK-004c: Implement pad_track - S3 Upload - -**Description:** -Complete the pad_track worker by uploading padded file to S3 and returning presigned URL. 
- -**Files to Modify:** -- `server/reflector/conductor/workers/pad_track.py` (complete worker) - -**Implementation Details:** -```python -@worker_task(task_definition_name="pad_track") -async def pad_track(task: Task) -> TaskResult: - track_index = task.input_data.get("track_index") - s3_key = task.input_data.get("s3_key") - bucket_name = task.input_data.get("bucket_name") - transcript_id = task.input_data.get("transcript_id") - - storage = get_transcripts_storage() - source_url = await storage.get_file_url(s3_key, expires_in=7200, bucket=bucket_name) - - # Use helpers from 004a and 004b - start_time = _extract_stream_start_time_from_container(source_url) - padded_path = _apply_audio_padding_to_file(source_url, start_time) - - # Upload to S3 - storage_key = f"{transcript_id}/padded_track_{track_index}.webm" - await storage.put_file(storage_key, padded_path) - padded_url = await storage.get_file_url(storage_key, expires_in=7200) - - result.output_data = {"padded_url": padded_url, "size": file_size, "track_index": track_index} - return result -``` - -**Input Contract:** -```json -{"track_index": "number", "s3_key": "string", "bucket_name": "string", "transcript_id": "string"} -``` - -**Output Contract:** -```json -{"padded_url": "string", "size": "number", "track_index": "number"} -``` - -**Acceptance Criteria:** -- [ ] Uploads padded file to S3 -- [ ] Returns presigned URL (7200s expiry) -- [ ] Timeout: 300s, Response timeout: 120s, Retries: 3 - -**Dependencies:** TASK-004b - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 190-210) - ---- - -### TASK-005a: Implement mixdown_tracks - Build Filter Graph - -**Description:** -Build PyAV filter graph for mixing N audio tracks with amix filter. - -**Files to Create:** -- `server/reflector/conductor/workers/mixdown_tracks.py` (partial - filter graph) - -**Implementation Details:** -```python -def _build_mixdown_filter_graph(containers: list, out_stream) -> av.filter.Graph: - """Build filter graph: N abuffer -> amix -> aformat -> sink.""" - graph = av.filter.Graph() - - # Create abuffer for each input - abuffers = [] - for i, container in enumerate(containers): - audio_stream = container.streams.audio[0] - abuf_args = f"time_base={...}:sample_rate=48000:sample_fmt=fltp:channel_layout=stereo" - abuffers.append(graph.add("abuffer", args=abuf_args, name=f"src{i}")) - - # amix with normalize=0 to prevent volume reduction - amix = graph.add("amix", args=f"inputs={len(containers)}:normalize=0", name="amix") - aformat = graph.add("aformat", args="sample_fmts=s16:channel_layouts=stereo", name="aformat") - sink = graph.add("abuffersink", name="sink") - - # Link all sources to amix - for abuf in abuffers: - abuf.link_to(amix) - amix.link_to(aformat) - aformat.link_to(sink) - graph.configure() - return graph -``` - -**Acceptance Criteria:** -- [ ] Creates abuffer per input track -- [ ] Uses amix with normalize=0 -- [ ] Outputs stereo s16 format -- [ ] Handles variable number of inputs (1-N tracks) - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 324-420) - -**Technical Notes:** -- amix normalize=0 prevents volume reduction when mixing -- Output format: stereo, s16 for MP3 encoding - ---- - -### TASK-005b: Implement mixdown_tracks - S3 Streaming and Upload - -**Description:** -Complete mixdown worker with S3 streaming input and upload output. 
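The "Encode to MP3 and upload" step in the implementation below is left as a comment; the encode/mux half is conventional PyAV and might look like the following sketch, where `filtered_frames` stands for frames pulled from the amix sink:

```python
import av


def encode_frames_to_mp3(filtered_frames, output_path: str) -> None:
    """Encode filtered frames from the mixdown graph into an MP3 file."""
    with av.open(output_path, mode="w") as out_container:
        # frame format must already match what the encoder expects;
        # that is what the aformat stage in the graph is for
        out_stream = out_container.add_stream("mp3", rate=48000)
        for frame in filtered_frames:
            for packet in out_stream.encode(frame):
                out_container.mux(packet)
        for packet in out_stream.encode(None):  # flush encoder
            out_container.mux(packet)
```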
- -**Files to Modify:** -- `server/reflector/conductor/workers/mixdown_tracks.py` (complete worker) - -**Implementation Details:** -```python -@worker_task(task_definition_name="mixdown_tracks") -async def mixdown_tracks(task: Task) -> TaskResult: - padded_urls = task.input_data.get("padded_urls", []) - transcript_id = task.input_data.get("transcript_id") - - # Open containers with reconnect options for S3 streaming - containers = [] - for url in padded_urls: - containers.append(av.open(url, options={ - "reconnect": "1", "reconnect_streamed": "1", "reconnect_delay_max": "30" - })) - - # Build filter graph and process - graph = _build_mixdown_filter_graph(containers, ...) - # Encode to MP3 and upload - - storage = get_transcripts_storage() - storage_path = f"{transcript_id}/audio.mp3" - await storage.put_file(storage_path, mp3_file) - - result.output_data = {"audio_key": storage_path, "duration": duration, "size": file_size} - return result -``` - -**Input Contract:** -```json -{"padded_urls": ["string"], "transcript_id": "string"} -``` - -**Output Contract:** -```json -{"audio_key": "string", "duration": "number", "size": "number"} -``` - -**Acceptance Criteria:** -- [ ] Opens all padded tracks via presigned URLs -- [ ] Handles S3 streaming with reconnect options -- [ ] Encodes to MP3 format -- [ ] Uploads to `{transcript_id}/audio.mp3` -- [ ] Returns duration for broadcast -- [ ] Timeout: 600s, Response timeout: 300s, Retries: 3 - -**Dependencies:** TASK-005a - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 420-498) - ---- - -### TASK-006: Implement generate_waveform Worker - -**Description:** -Create a Conductor worker that generates waveform visualization data from the mixed audio. - -**Files to Create:** -- `server/reflector/conductor/workers/generate_waveform.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="generate_waveform") -async def generate_waveform(task: Task) -> TaskResult: - audio_key = task.input_data.get("audio_key") - transcript_id = task.input_data.get("transcript_id") - - # Use AudioWaveformProcessor to generate peaks - # This processor uses librosa/scipy internally - - result.output_data = {"waveform": waveform_peaks} - return result -``` - -**Input Contract:** -```json -{"audio_key": "string", "transcript_id": "string"} -``` - -**Output Contract:** -```json -{"waveform": ["number"]} -``` - -**Acceptance Criteria:** -- [ ] Generates waveform peaks array -- [ ] Broadcasts WAVEFORM event to WebSocket -- [ ] Stores waveform JSON locally -- [ ] Timeout: 120s, Response timeout: 60s, Retries: 3 - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 670-678) -- `server/reflector/processors/audio_waveform_processor.py` -- `docs/conductor-pipeline-mock/src/workers.py` (lines 79-92) - ---- - -### TASK-007: Implement transcribe_track Worker - -**Description:** -Create a Conductor worker that transcribes a single audio track using GPU (Modal.com) or local Whisper. 
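Because this is the most expensive step and the task allows 3 retries, the technical notes below suggest caching so a retry does not repeat the GPU call. A sketch of that guard; the cache accessors are passed in as hypothetical callables and the helper import path is assumed:

```python
from typing import Awaitable, Callable

from reflector.pipelines.transcription_helpers import transcribe_file_with_processor  # path assumed


async def transcribe_track_cached(
    track_index: int,
    audio_url: str,
    language: str,
    transcript_id: str,
    cache_get: Callable[[str], Awaitable[list[dict] | None]],
    cache_set: Callable[[str, list[dict]], Awaitable[None]],
) -> list[dict]:
    """Skip the GPU call on retry if a previous attempt already stored this track's words."""
    cache_key = f"{transcript_id}/transcribe/track_{track_index}.json"

    cached = await cache_get(cache_key)
    if cached is not None:
        return cached

    transcript = await transcribe_file_with_processor(audio_url, language)
    words = [w.model_dump() for w in transcript.words]
    for word in words:
        word["speaker"] = track_index  # speaker == track index, as in TASK-007

    await cache_set(cache_key, words)
    return words
```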
- -**Files to Create:** -- `server/reflector/conductor/workers/transcribe_track.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="transcribe_track") -async def transcribe_track(task: Task) -> TaskResult: - track_index = task.input_data.get("track_index") - audio_url = task.input_data.get("audio_url") - language = task.input_data.get("language", "en") - - transcript = await transcribe_file_with_processor(audio_url, language) - - # Tag all words with speaker index - for word in transcript.words: - word.speaker = track_index - - result.output_data = { - "words": [w.model_dump() for w in transcript.words], - "track_index": track_index, - } - return result -``` - -**Input Contract:** -```json -{ - "track_index": "number", - "audio_url": "string", - "language": "string" -} -``` - -**Output Contract:** -```json -{ - "words": [{"word": "string", "start": "number", "end": "number", "speaker": "number"}], - "track_index": "number" -} -``` - -**Acceptance Criteria:** -- [ ] Calls Modal.com GPU transcription service -- [ ] Tags words with correct speaker index -- [ ] Handles empty transcription results -- [ ] Timeout: 1800s, Response timeout: 900s, Retries: 3 - -**Dependencies:** TASK-001, CACHE-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 747-748) -- `server/reflector/pipelines/transcription_helpers.py` -- `server/reflector/processors/file_transcript_auto.py` -- `docs/conductor-pipeline-mock/src/workers.py` (lines 95-109) - -**Technical Notes:** -- This is the most expensive operation (GPU time) -- Should implement caching to avoid re-transcription on retries (see CACHE-002) -- Environment variable: `TRANSCRIPT_MODAL_API_KEY` - ---- - -### TASK-008: Implement merge_transcripts Worker - -**Description:** -Create a Conductor worker that merges multiple track transcriptions into a single timeline sorted by timestamp. - -**Files to Create:** -- `server/reflector/conductor/workers/merge_transcripts.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="merge_transcripts") -async def merge_transcripts(task: Task) -> TaskResult: - transcripts = task.input_data.get("transcripts", []) - transcript_id = task.input_data.get("transcript_id") - - all_words = [] - for t in transcripts: - if isinstance(t, dict) and "words" in t: - all_words.extend(t["words"]) - - # Sort by start timestamp - all_words.sort(key=lambda w: w.get("start", 0)) - - # Broadcast TRANSCRIPT event - await broadcast_transcript_event(transcript_id, all_words) - - result.output_data = { - "all_words": all_words, - "word_count": len(all_words), - } - return result -``` - -**Input Contract:** -```json -{ - "transcripts": [{"words": [...]}], - "transcript_id": "string" -} -``` - -**Output Contract:** -```json -{"all_words": [...], "word_count": "number"} -``` - -**Acceptance Criteria:** -- [ ] Merges words from all tracks -- [ ] Sorts by start timestamp -- [ ] Preserves speaker attribution -- [ ] Broadcasts TRANSCRIPT event -- [ ] Updates transcript.events in DB - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 727-736) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 112-131) - ---- - -### TASK-009: Implement detect_topics Worker - -**Description:** -Create a Conductor worker that detects topics using LLM calls. 
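The detector below chunks the merged word list into groups of 300 and makes one LLM call per group; the grouping itself is small enough to pin down as a sketch (the real `TranscriptTopicDetectorProcessor` may split on different boundaries):

```python
from typing import Iterator


def chunk_words(words: list[dict], size: int = 300) -> Iterator[list[dict]]:
    """Yield consecutive groups of `size` words; topic detection makes one LLM call per group."""
    for start in range(0, len(words), size):
        yield words[start:start + size]
```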
- -**Files to Create:** -- `server/reflector/conductor/workers/detect_topics.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="detect_topics") -async def detect_topics(task: Task) -> TaskResult: - words = task.input_data.get("words", []) - transcript_id = task.input_data.get("transcript_id") - target_language = task.input_data.get("target_language", "en") - - # Uses TranscriptTopicDetectorProcessor - # Chunks words into groups of 300, calls LLM per chunk - topics = await topic_processing.detect_topics( - TranscriptType(words=words), - target_language, - on_topic_callback=lambda t: broadcast_topic_event(transcript_id, t), - empty_pipeline=EmptyPipeline(logger), - ) - - result.output_data = { - "topics": [t.model_dump() for t in topics] - } - return result -``` - -**Input Contract:** -```json -{ - "words": [...], - "transcript_id": "string", - "target_language": "string" -} -``` - -**Output Contract:** -```json -{"topics": [{"id": "string", "title": "string", "summary": "string", "timestamp": "number", "duration": "number"}]} -``` - -**Acceptance Criteria:** -- [ ] Chunks words in groups of 300 -- [ ] Calls LLM for each chunk -- [ ] Broadcasts TOPIC event for each detected topic -- [ ] Returns complete topics list -- [ ] Timeout: 300s, Response timeout: 120s, Retries: 3 - -**Dependencies:** TASK-001, CACHE-001 - -**Reference Files:** -- `server/reflector/pipelines/topic_processing.py` (lines 34-63) -- `server/reflector/processors/transcript_topic_detector.py` -- `docs/conductor-pipeline-mock/src/workers.py` (lines 134-147) - -**Technical Notes:** -- Number of LLM calls: `ceil(word_count / 300)` -- Uses `TranscriptTopicDetectorProcessor` - ---- - -### TASK-010: Implement generate_title Worker - -**Description:** -Create a Conductor worker that generates a meeting title from detected topics using LLM. - -**Files to Create:** -- `server/reflector/conductor/workers/generate_title.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="generate_title") -async def generate_title(task: Task) -> TaskResult: - topics = task.input_data.get("topics", []) - transcript_id = task.input_data.get("transcript_id") - - if not topics: - result.output_data = {"title": "Untitled Meeting"} - return result - - # Uses TranscriptFinalTitleProcessor - title = await topic_processing.generate_title( - topics, - on_title_callback=lambda t: broadcast_title_event(transcript_id, t), - empty_pipeline=EmptyPipeline(logger), - logger=logger, - ) - - result.output_data = {"title": title} - return result -``` - -**Input Contract:** -```json -{"topics": [...], "transcript_id": "string"} -``` - -**Output Contract:** -```json -{"title": "string"} -``` - -**Acceptance Criteria:** -- [ ] Generates title from topic summaries -- [ ] Broadcasts FINAL_TITLE event -- [ ] Updates transcript.title in DB -- [ ] Handles empty topics list -- [ ] Timeout: 60s, Response timeout: 30s, Retries: 3 - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/topic_processing.py` (lines 66-84) -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 760-766) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 150-163) - ---- - -### TASK-011: Implement generate_summary Worker - -**Description:** -Create a Conductor worker that generates long and short summaries from topics and words using LLM. 
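Taken together with the notes in TASK-009 (one call per 300-word chunk) and the note below this section (2 + 2·M summary calls, M ≤ 6 subjects), and assuming a single call for the title, a rough per-recording LLM budget can be sketched:

```python
import math


def estimate_llm_calls(word_count: int, subject_count: int) -> int:
    """Rough LLM call count for one recording, per the notes in TASK-009/010/011."""
    topic_calls = math.ceil(word_count / 300)       # detect_topics: one call per 300-word chunk
    title_calls = 1                                 # generate_title: assumed single call
    summary_calls = 2 + 2 * min(subject_count, 6)   # generate_summary: 2 + 2*M, M capped at 6
    return topic_calls + title_calls + summary_calls
```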
- -**Files to Create:** -- `server/reflector/conductor/workers/generate_summary.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="generate_summary") -async def generate_summary(task: Task) -> TaskResult: - words = task.input_data.get("words", []) - topics = task.input_data.get("topics", []) - transcript_id = task.input_data.get("transcript_id") - - transcript = await transcripts_controller.get_by_id(transcript_id) - - # Uses TranscriptFinalSummaryProcessor - await topic_processing.generate_summaries( - topics, transcript, - on_long_summary_callback=lambda s: broadcast_long_summary_event(transcript_id, s), - on_short_summary_callback=lambda s: broadcast_short_summary_event(transcript_id, s), - empty_pipeline=EmptyPipeline(logger), - logger=logger, - ) - - result.output_data = { - "summary": long_summary, - "short_summary": short_summary, - } - return result -``` - -**Input Contract:** -```json -{ - "words": [...], - "topics": [...], - "transcript_id": "string" -} -``` - -**Output Contract:** -```json -{"summary": "string", "short_summary": "string"} -``` - -**Acceptance Criteria:** -- [ ] Generates long summary -- [ ] Generates short summary -- [ ] Broadcasts FINAL_LONG_SUMMARY event -- [ ] Broadcasts FINAL_SHORT_SUMMARY event -- [ ] Updates transcript.long_summary and transcript.short_summary in DB -- [ ] Timeout: 300s, Response timeout: 120s, Retries: 3 - -**Dependencies:** TASK-001, CACHE-001 - -**Reference Files:** -- `server/reflector/pipelines/topic_processing.py` (lines 86-109) -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 768-777) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 166-180) - -**Technical Notes:** -- LLM calls: 2 + 2*M where M = number of subjects (max 6) - ---- - -### TASK-012: Implement finalize Worker - -**Description:** -Create a Conductor worker that finalizes the transcript status and updates the database. 
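The acceptance criteria below ask for this update to be retry-safe; an early exit when a previous attempt already marked the transcript as ended is one way to get there. A minimal sketch, with the controller import path assumed:

```python
from reflector.db.transcripts import transcripts_controller  # import path assumed


async def finalize_transcript(transcript_id: str, fields: dict) -> dict:
    """Idempotent finalize: a retried task sees status == 'ended' and exits without rewriting."""
    transcript = await transcripts_controller.get_by_id(transcript_id)
    if transcript.status == "ended":
        return {"status": "COMPLETED", "already_finalized": True}

    await transcripts_controller.update(transcript, {"status": "ended", **fields})
    return {"status": "COMPLETED", "already_finalized": False}
```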
- -**Files to Create:** -- `server/reflector/conductor/workers/finalize.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="finalize") -async def finalize(task: Task) -> TaskResult: - transcript_id = task.input_data.get("transcript_id") - title = task.input_data.get("title") - summary = task.input_data.get("summary") - short_summary = task.input_data.get("short_summary") - duration = task.input_data.get("duration") - - transcript = await transcripts_controller.get_by_id(transcript_id) - await transcripts_controller.update(transcript, { - "status": "ended", - "title": title, - "long_summary": summary, - "short_summary": short_summary, - "duration": duration, - }) - - # Broadcast STATUS event - await broadcast_status_event(transcript_id, "ended") - - result.output_data = {"status": "COMPLETED"} - return result -``` - -**Input Contract:** -```json -{ - "transcript_id": "string", - "title": "string", - "summary": "string", - "short_summary": "string", - "duration": "number" -} -``` - -**Output Contract:** -```json -{"status": "string"} -``` - -**Acceptance Criteria:** -- [ ] Updates transcript status to "ended" -- [ ] Persists title, summaries, duration -- [ ] Broadcasts STATUS event with "ended" -- [ ] Idempotent (can be retried safely) - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` (lines 745, 787-791) -- `docs/conductor-pipeline-mock/src/workers.py` (lines 183-196) - ---- - -### TASK-013: Implement cleanup_consent Worker - -**Description:** -Create a Conductor worker that checks participant consent and deletes audio if denied. - -**Files to Create:** -- `server/reflector/conductor/workers/cleanup_consent.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="cleanup_consent") -async def cleanup_consent(task: Task) -> TaskResult: - transcript_id = task.input_data.get("transcript_id") - - # Check if any participant denied consent - # Delete audio from S3 if so - # Implementation mirrors task_cleanup_consent from main_live_pipeline - - result.output_data = { - "audio_deleted": deleted, - "reason": reason, - } - return result -``` - -**Input Contract:** -```json -{"transcript_id": "string"} -``` - -**Output Contract:** -```json -{"audio_deleted": "boolean", "reason": "string|null"} -``` - -**Acceptance Criteria:** -- [ ] Checks all participant consent statuses -- [ ] Deletes audio from S3 if any denied -- [ ] Updates transcript.audio_deleted flag -- [ ] Idempotent deletes - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_live_pipeline.py` - `task_cleanup_consent` -- `server/reflector/pipelines/main_multitrack_pipeline.py` (line 794) - ---- - -### TASK-014: Implement post_zulip Worker - -**Description:** -Create a Conductor worker that posts or updates a Zulip message with the transcript summary. 
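One addition to the consent worker above before the Zulip details: its body is left as comments, so here is a hedged expansion of the check it mirrors from `task_cleanup_consent`. The participant/consent field names and the storage delete call are assumptions; only `transcript.audio_deleted` and the `{transcript_id}/audio.mp3` key come from this document:

```python
from reflector.db.transcripts import transcripts_controller   # import path assumed
from reflector.storage import get_transcripts_storage         # import path assumed


async def cleanup_consent_audio(transcript_id: str) -> dict:
    """Delete the mixed audio when any participant denied consent; safe to re-run."""
    transcript = await transcripts_controller.get_by_id(transcript_id)

    denied = [p for p in transcript.participants if getattr(p, "consent", None) == "denied"]
    if not denied or transcript.audio_deleted:
        return {"audio_deleted": bool(transcript.audio_deleted), "reason": None}

    storage = get_transcripts_storage()
    await storage.delete_file(f"{transcript_id}/audio.mp3")   # key written by mixdown_tracks; method name assumed
    await transcripts_controller.update(transcript, {"audio_deleted": True})
    return {"audio_deleted": True, "reason": "participant_consent_denied"}
```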
- -**Files to Create:** -- `server/reflector/conductor/workers/post_zulip.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="post_zulip") -async def post_zulip(task: Task) -> TaskResult: - transcript_id = task.input_data.get("transcript_id") - - # Uses existing Zulip integration - # Post new message or update existing using message_id - - result.output_data = {"message_id": message_id} - return result -``` - -**Input Contract:** -```json -{"transcript_id": "string"} -``` - -**Output Contract:** -```json -{"message_id": "string|null"} -``` - -**Acceptance Criteria:** -- [ ] Posts to configured Zulip channel -- [ ] Updates existing message if message_id exists -- [ ] Handles Zulip API errors gracefully -- [ ] Timeout: 60s, Response timeout: 30s, Retries: 5 - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/pipelines/main_live_pipeline.py` - `task_pipeline_post_to_zulip` -- `server/reflector/pipelines/main_multitrack_pipeline.py` (line 795) -- `server/reflector/zulip.py` - ---- - -### TASK-015: Implement send_webhook Worker - -**Description:** -Create a Conductor worker that sends the transcript completion webhook to the configured URL. - -**Files to Create:** -- `server/reflector/conductor/workers/send_webhook.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="send_webhook") -async def send_webhook(task: Task) -> TaskResult: - transcript_id = task.input_data.get("transcript_id") - room_id = task.input_data.get("room_id") - - # Uses existing webhook logic from webhook.py - # Includes HMAC signature if secret configured - - result.output_data = { - "sent": success, - "status_code": status_code, - } - return result -``` - -**Input Contract:** -```json -{"transcript_id": "string", "room_id": "string"} -``` - -**Output Contract:** -```json -{"sent": "boolean", "status_code": "number|null"} -``` - -**Acceptance Criteria:** -- [ ] Sends webhook with correct payload schema -- [ ] Includes HMAC signature -- [ ] Retries on 5xx, not on 4xx -- [ ] Timeout: 60s, Response timeout: 30s, Retries: 30 - -**Dependencies:** TASK-001 - -**Reference Files:** -- `server/reflector/worker/webhook.py` -- `server/reflector/pipelines/main_file_pipeline.py` - `task_send_webhook_if_needed` -- `server/reflector/pipelines/main_multitrack_pipeline.py` (line 796) - ---- - -### TASK-016: Implement generate_dynamic_fork_tasks Helper - -**Description:** -Create a helper worker that generates dynamic task definitions for FORK_JOIN_DYNAMIC. This is required because Conductor's FORK_JOIN_DYNAMIC needs pre-computed task lists and input maps. 
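To make the contract concrete, this is what the helper's output would look like for a two-track recording with `task_type="pad_track"` (ids and keys are placeholders):

```python
# Two parallel SIMPLE tasks plus their per-task inputs, fed straight into FORK_JOIN_DYNAMIC
EXAMPLE_FORK_OUTPUT = {
    "tasks": [
        {"name": "pad_track", "taskReferenceName": "pad_track_0", "type": "SIMPLE"},
        {"name": "pad_track", "taskReferenceName": "pad_track_1", "type": "SIMPLE"},
    ],
    "inputs": {
        "pad_track_0": {
            "track_index": 0, "transcript_id": "tr_1",
            "s3_key": "<track-0 key>", "bucket_name": "<bucket>",
        },
        "pad_track_1": {
            "track_index": 1, "transcript_id": "tr_1",
            "s3_key": "<track-1 key>", "bucket_name": "<bucket>",
        },
    },
}
```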
- -**Files to Create:** -- `server/reflector/conductor/workers/generate_dynamic_fork_tasks.py` - -**Implementation Details:** -```python -@worker_task(task_definition_name="generate_dynamic_fork_tasks") -def generate_dynamic_fork_tasks(task: Task) -> TaskResult: - tracks = task.input_data.get("tracks", []) - task_type = task.input_data.get("task_type") # "pad_track" or "transcribe_track" - transcript_id = task.input_data.get("transcript_id") - - tasks = [] - inputs = {} - for idx, track in enumerate(tracks): - ref_name = f"{task_type}_{idx}" - tasks.append({ - "name": task_type, - "taskReferenceName": ref_name, - "type": "SIMPLE" - }) - inputs[ref_name] = { - "track_index": idx, - "transcript_id": transcript_id, - # Additional task-specific inputs based on task_type - } - - result.output_data = {"tasks": tasks, "inputs": inputs} - return result -``` - -**Input Contract:** -```json -{ - "tracks": [{"s3_key": "string"}], - "task_type": "pad_track" | "transcribe_track", - "transcript_id": "string", - "bucket_name": "string" -} -``` - -**Output Contract:** -```json -{ - "tasks": [{"name": "string", "taskReferenceName": "string", "type": "SIMPLE"}], - "inputs": {"ref_name": {...input_data...}} -} -``` - -**Acceptance Criteria:** -- [ ] Generates correct task list for variable track counts (1, 2, ... N) -- [ ] Generates correct input map with task-specific parameters -- [ ] Supports both pad_track and transcribe_track task types -- [ ] Timeout: 30s, Response timeout: 15s, Retries: 3 - -**Dependencies:** TASK-001 - -**Technical Notes:** -- This helper is required because FORK_JOIN_DYNAMIC expects `dynamicTasks` and `dynamicTasksInput` parameters -- The workflow uses this helper twice: once for padding, once for transcription -- Each invocation has different task_type and additional inputs - ---- - -## Phase 2 (Continued): State Management - -### STATE-001: Add workflow_id to Recording Model - -**Description:** -Add a `workflow_id` field to the Recording model to track the Conductor workflow associated with each recording. - -**Files to Modify:** -- `server/reflector/db/recordings.py` -- Create migration file - -**Implementation Details:** -```python -# In Recording model -workflow_id: Optional[str] = Column(String, nullable=True, index=True) -``` - -**Acceptance Criteria:** -- [ ] Migration adds nullable workflow_id column -- [ ] Index created for workflow_id lookups -- [ ] Recording can be queried by workflow_id - -**Dependencies:** INFRA-002 - -**Reference Files:** -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Module 7: State Management) - ---- - -## Phase 3: Workflow Definition - -### WFLOW-001: Create Workflow Definition JSON with FORK_JOIN_DYNAMIC - -**Description:** -Define the complete workflow DAG in Conductor's workflow definition format, including dynamic forking for variable track counts. - -**Files to Create:** -- `server/reflector/conductor/workflows/diarization_pipeline.json` - -**Implementation Details:** - -The workflow must include: -1. Sequential: get_recording -> get_participants -2. FORK_JOIN_DYNAMIC: pad_track for each track -3. Sequential: mixdown_tracks -> generate_waveform -4. FORK_JOIN_DYNAMIC: transcribe_track for each track (parallel!) -5. Sequential: merge_transcripts -> detect_topics -6. FORK_JOIN: generate_title || generate_summary -7. 
Sequential: finalize -> cleanup_consent -> post_zulip -> send_webhook - -**FORK_JOIN_DYNAMIC Pattern:** -```json -{ - "name": "fork_track_padding", - "taskReferenceName": "fork_track_padding", - "type": "FORK_JOIN_DYNAMIC", - "inputParameters": { - "dynamicTasks": "${generate_padding_tasks.output.tasks}", - "dynamicTasksInput": "${generate_padding_tasks.output.inputs}" - }, - "dynamicForkTasksParam": "dynamicTasks", - "dynamicForkTasksInputParamName": "dynamicTasksInput" -} -``` - -This requires a helper task that generates the dynamic fork structure based on track count. - -**Acceptance Criteria:** -- [ ] Valid Conductor workflow schema -- [ ] All task references match registered task definitions -- [ ] Input/output parameter mappings correct -- [ ] FORK_JOIN_DYNAMIC works with 1, 2, ... N tracks -- [ ] JOIN correctly collects all parallel results -- [ ] DAG renders correctly in Conductor UI - -**Dependencies:** TASK-002 through TASK-015 - -**Reference Files:** -- `docs/conductor-pipeline-mock/src/register_workflow.py` (lines 125-304) -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Module 3 section, Target Architecture diagram) - ---- - -### WFLOW-002: Implement Workflow Registration Script - -**Description:** -Create a script that registers the workflow definition with the Conductor server. - -**Files to Create:** -- `server/reflector/conductor/workflows/register.py` - -**Implementation Details:** -```python -import requests -from reflector.settings import settings - -def register_workflow(): - with open("diarization_pipeline.json") as f: - workflow = json.load(f) - - resp = requests.put( - f"{settings.CONDUCTOR_SERVER_URL}/metadata/workflow", - json=[workflow], - headers={"Content-Type": "application/json"}, - ) - resp.raise_for_status() -``` - -**Acceptance Criteria:** -- [ ] Workflow visible in Conductor UI -- [ ] Can start workflow via API -- [ ] DAG renders correctly in UI - -**Dependencies:** WFLOW-001 - -**Reference Files:** -- `docs/conductor-pipeline-mock/src/register_workflow.py` (lines 317-327) - ---- - -## Phase 2 (Continued): WebSocket Events - -### EVENT-001: Add PIPELINE_PROGRESS WebSocket Event - -**Description:** -Define a new WebSocket event type for granular pipeline progress tracking. - -**⚠️ Note:** Requires separate frontend ticket to add UI consumer for this event. - -**Files to Modify:** -- `server/reflector/db/transcripts.py` (add event type) -- `server/reflector/ws_manager.py` (ensure broadcast support) - -**Implementation Details:** -```python -# New event schema -class PipelineProgressEvent(BaseModel): - event: str = "PIPELINE_PROGRESS" - data: PipelineProgressData - -class PipelineProgressData(BaseModel): - workflow_id: str - current_step: str - step_index: int - total_steps: int - step_status: Literal["pending", "in_progress", "completed", "failed"] -``` - -**Acceptance Criteria:** -- [ ] Event schema defined -- [ ] Works with existing WebSocket infrastructure -- [ ] Frontend ticket created for progress UI consumer - -**Dependencies:** None - -**Reference Files:** -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Module 6 section) -- `server/reflector/pipelines/main_live_pipeline.py` (broadcast_to_sockets decorator) - ---- - -### EVENT-002: Emit Progress Events from Workers - -**Description:** -Modify workers to emit PIPELINE_PROGRESS events at start and completion of each task. - -**⚠️ Note:** Requires separate frontend ticket to add UI consumer (see EVENT-001). 
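Rather than hard-coding `step_index` and `total_steps` as the snippet below does (6 of 14 for transcribe_track), an ordered step table derived from the WFLOW-001 sequence keeps the numbers consistent across workers; the ordering here is illustrative but reproduces that 6-of-14 example:

```python
PIPELINE_STEPS = [
    "get_recording", "get_participants", "pad_track", "mixdown_tracks",
    "generate_waveform", "transcribe_track", "merge_transcripts", "detect_topics",
    "generate_title", "generate_summary", "finalize", "cleanup_consent",
    "post_zulip", "send_webhook",
]

STEP_INDEX = {name: i + 1 for i, name in enumerate(PIPELINE_STEPS)}  # 1-based
TOTAL_STEPS = len(PIPELINE_STEPS)                                    # 14

# e.g. emit_progress(transcript_id, "transcribe_track", "in_progress",
#                    STEP_INDEX["transcribe_track"], TOTAL_STEPS)
```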
- -**Files to Modify:** -- All worker files in `server/reflector/conductor/workers/` - -**Implementation Details:** -```python -async def emit_progress(transcript_id: str, step: str, status: str, index: int, total: int): - ws_manager = get_ws_manager() - await ws_manager.send_json( - room_id=f"ts:{transcript_id}", - message={ - "event": "PIPELINE_PROGRESS", - "data": { - "current_step": step, - "step_index": index, - "total_steps": total, - "step_status": status, - } - } - ) - -@worker_task(task_definition_name="transcribe_track") -async def transcribe_track(task: Task) -> TaskResult: - await emit_progress(transcript_id, "transcribe_track", "in_progress", 6, 14) - # ... processing ... - await emit_progress(transcript_id, "transcribe_track", "completed", 6, 14) -``` - -**Acceptance Criteria:** -- [ ] Progress emitted at task start -- [ ] Progress emitted at task completion - -**Dependencies:** EVENT-001, TASK-002 through TASK-015 - ---- - -## Phase 4: Integration - -### INTEG-001: Modify Pipeline Trigger to Start Conductor Workflow - -**Description:** -Replace `task_pipeline_multitrack_process.delay()` with Conductor workflow start in `process_multitrack_recording`. -This single change captures BOTH webhook AND polling entry paths, since both converge at this function. - -**Files to Modify:** -- `server/reflector/worker/process.py` - -**Implementation Details:** -```python -# In _process_multitrack_recording_inner(), around line 289 -# Replace: -# task_pipeline_multitrack_process.delay( -# transcript_id=transcript.id, -# bucket_name=bucket_name, -# track_keys=filter_cam_audio_tracks(track_keys), -# ) -# With: - -if settings.CONDUCTOR_ENABLED: - from reflector.conductor.client import ConductorClientManager - from reflector.db.recordings import recordings_controller - - workflow_id = ConductorClientManager.start_workflow( - name="diarization_pipeline", - version=1, - input_data={ - "recording_id": recording_id, - "room_name": daily_room_name, - "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)], - "bucket_name": bucket_name, - "transcript_id": transcript.id, - "room_id": room.id, - } - ) - logger.info("Started Conductor workflow", workflow_id=workflow_id, transcript_id=transcript.id) - - # Store workflow_id on recording for status tracking - await recordings_controller.update(recording, {"workflow_id": workflow_id}) - - if not settings.CONDUCTOR_SHADOW_MODE: - return # Don't trigger Celery - -# Existing Celery trigger (runs in shadow mode or when Conductor disabled) -task_pipeline_multitrack_process.delay( - transcript_id=transcript.id, - bucket_name=bucket_name, - track_keys=filter_cam_audio_tracks(track_keys), -) -``` - -**Acceptance Criteria:** -- [ ] Conductor workflow started from process_multitrack_recording -- [ ] Workflow ID stored on Recording model -- [ ] Both webhook and polling paths covered (single integration point) -- [ ] Celery still triggered in shadow mode - -**Dependencies:** WFLOW-002, STATE-001 - -**Reference Files:** -- `server/reflector/worker/process.py` (lines 172-293) -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Module 4 section) - ---- - -### SHADOW-001: Implement Shadow Mode Toggle - -**Description:** -Add configuration and logic to run both Celery and Conductor pipelines simultaneously for comparison. 
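For the comparison half of shadow mode (SHADOW-002 below), a small field-diff helper keeps the per-field checks uniform. A sketch that treats both results as plain dicts, which is an assumption about the workflow output shape:

```python
def diff_fields(celery_result: dict, workflow_result: dict, fields: list[str]) -> list[dict]:
    """One entry per field whose value differs between the Celery and workflow outputs."""
    return [
        {"field": f, "celery": celery_result.get(f), "workflow": workflow_result.get(f)}
        for f in fields
        if celery_result.get(f) != workflow_result.get(f)
    ]


# usage inside compare_content_results():
#   differences = diff_fields(celery_dump, workflow_output,
#                             ["title", "long_summary", "short_summary", "word_count"])
```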
- -**Files to Modify:** -- `server/reflector/settings.py` (already has CONDUCTOR_SHADOW_MODE from INFRA-003) -- `server/reflector/worker/process.py` (INTEG-001 already implements shadow mode logic) - -**Implementation Details:** -```python -# settings.py (already done in INFRA-003) -CONDUCTOR_SHADOW_MODE: bool = False - -# worker/process.py (in _process_multitrack_recording_inner) -if settings.CONDUCTOR_ENABLED: - workflow_id = ConductorClientManager.start_workflow(...) - await recordings_controller.update(recording, {"workflow_id": workflow_id}) - - if not settings.CONDUCTOR_SHADOW_MODE: - return # Conductor only - skip Celery - # If shadow mode, fall through to Celery trigger below - -# Celery trigger (runs when Conductor disabled OR in shadow mode) -task_pipeline_multitrack_process.delay(...) -``` - -**Acceptance Criteria:** -- [ ] Both pipelines triggered when CONDUCTOR_SHADOW_MODE=True -- [ ] Only Conductor triggered when CONDUCTOR_ENABLED=True and SHADOW_MODE=False -- [ ] Only Celery triggered when CONDUCTOR_ENABLED=False -- [ ] workflow_id stored on Recording model for comparison - -**Dependencies:** INTEG-001 - -**Note:** INTEG-001 already implements the shadow mode toggle logic. This task verifies -the implementation and adds any missing comparison/monitoring infrastructure. - -**Reference Files:** -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (Phase 3: Shadow Mode) - ---- - -### SHADOW-002: Add Result Comparison - Content Fields - -**Description:** -Compare content fields (title, summaries, topics, word counts) between Celery and Conductor outputs. - -**Files to Create:** -- `server/reflector/conductor/shadow_compare.py` - -**Implementation Details:** -```python -async def compare_content_results(recording_id: str, workflow_id: str) -> dict: - """Compare content results from Celery and Conductor pipelines.""" - celery_transcript = await transcripts_controller.get_by_recording_id(recording_id) - workflow_status = ConductorClientManager.get_workflow_status(workflow_id) - - differences = [] - # Compare title - if celery_transcript.title != workflow_status.output.get("title"): - differences.append({"field": "title", ...}) - # Compare summaries, topics, word_count - ... - - return {"match": len(differences) == 0, "differences": differences} -``` - -**Acceptance Criteria:** -- [ ] Compares title, long_summary, short_summary -- [ ] Compares topic count and content -- [ ] Compares word_count -- [ ] Logs differences for debugging - -**Dependencies:** SHADOW-001 - ---- - -## Phase 5: Cutover - -### CUTOVER-001: Create Feature Flag for Conductor-Only Mode - -**Description:** -Enable Conductor-only mode by setting environment variables. No code changes required. - -**Files to Modify:** -- `.env` or environment configuration - -**Implementation Details:** +### Conductor ```bash -# .env (production) -CONDUCTOR_ENABLED=true # Enable Conductor -CONDUCTOR_SHADOW_MODE=false # Disable shadow mode (Conductor only) +# Start infrastructure +docker compose up -d conductor conductor-worker + +# Register workflow +docker compose exec conductor-worker uv run python -m reflector.conductor.workflows.register ``` -The logic is already implemented in INTEG-001: -```python -# worker/process.py (_process_multitrack_recording_inner) -if settings.CONDUCTOR_ENABLED: - workflow_id = ConductorClientManager.start_workflow(...) 
- if not settings.CONDUCTOR_SHADOW_MODE: - return # Conductor only - Celery not triggered -# Celery only reached if Conductor disabled or shadow mode enabled -task_pipeline_multitrack_process.delay(...) +### Hatchet +```bash +# Start infrastructure +docker compose up -d hatchet hatchet-worker + +# Workers auto-register on startup ``` -**Acceptance Criteria:** -- [ ] Set CONDUCTOR_ENABLED=true in production environment -- [ ] Set CONDUCTOR_SHADOW_MODE=false -- [ ] Verify Celery not triggered (check logs for "Started Conductor workflow") -- [ ] Can toggle back via environment variables without code changes +### Trigger Workflow +```bash +# Set provider in .env +DURABLE_WORKFLOW_PROVIDER=hatchet # or conductor -**Dependencies:** SHADOW-001 - -**Note:** This is primarily a configuration change. The code logic is already in place from INTEG-001. - ---- - -### CUTOVER-002: Add Fallback to Celery on Conductor Failure - -**Description:** -Implement automatic fallback to Celery pipeline if Conductor fails to start or process a workflow. - -**Files to Modify:** -- `server/reflector/worker/process.py` -- `server/reflector/conductor/client.py` - -**Implementation Details:** -```python -# In _process_multitrack_recording_inner() -if settings.CONDUCTOR_ENABLED: - try: - workflow_id = ConductorClientManager.start_workflow( - name="diarization_pipeline", - version=1, - input_data={...} - ) - logger.info("Conductor workflow started", workflow_id=workflow_id, transcript_id=transcript.id) - await recordings_controller.update(recording, {"workflow_id": workflow_id}) - - if not settings.CONDUCTOR_SHADOW_MODE: - return # Success - don't trigger Celery - except Exception as e: - logger.error( - "Conductor workflow start failed, falling back to Celery", - error=str(e), - transcript_id=transcript.id, - exc_info=True, - ) - # Fall through to Celery trigger below - -# Celery fallback (runs on Conductor failure, or when disabled, or in shadow mode) -task_pipeline_multitrack_process.delay( - transcript_id=transcript.id, - bucket_name=bucket_name, - track_keys=filter_cam_audio_tracks(track_keys), -) -``` - -**Acceptance Criteria:** -- [ ] Celery triggered on Conductor connection failure -- [ ] Celery triggered on workflow start failure -- [ ] Errors logged with full context for debugging -- [ ] workflow_id still stored if partially successful - -**Dependencies:** CUTOVER-001 - ---- - -## Phase 6: Cleanup - -### CLEANUP-001: Remove Deprecated Celery Task Code - -**Description:** -After successful migration, remove the old Celery-based pipeline code. - -**Files to Modify:** -- `server/reflector/pipelines/main_multitrack_pipeline.py` - Remove entire file -- `server/reflector/worker/process.py` - Remove `task_pipeline_multitrack_process.delay()` call -- `server/reflector/pipelines/main_live_pipeline.py` - Remove shared utilities if unused - -**Implementation Details:** -```python -# worker/process.py - Remove Celery fallback entirely -if settings.CONDUCTOR_ENABLED: - workflow_id = ConductorClientManager.start_workflow(...) - await recordings_controller.update(recording, {"workflow_id": workflow_id}) - return # No Celery fallback - -# Delete this: -# task_pipeline_multitrack_process.delay(...) 
-``` - -**Acceptance Criteria:** -- [ ] `main_multitrack_pipeline.py` deleted -- [ ] Celery trigger removed from `worker/process.py` -- [ ] Old task imports removed -- [ ] No new recordings processed via Celery -- [ ] Code removed after stability period (1-2 weeks) - -**Dependencies:** CUTOVER-001 - ---- - -### CLEANUP-002: Update Documentation - -**Description:** -Update all documentation to reflect the new Conductor-based architecture. - -**Files to Modify:** -- `CLAUDE.md` -- `README.md` -- `docs/` (if applicable) - -**Files to Archive:** -- `CONDUCTOR_MIGRATION_REQUIREMENTS.md` (move to docs/archive/) - -**Acceptance Criteria:** -- [ ] Architecture diagrams updated -- [ ] API documentation reflects new endpoints -- [ ] Runbooks updated for Conductor operations - -**Dependencies:** CLEANUP-001 - ---- - -## Testing Tasks - -**⚠️ Note:** All test tasks should be deferred to human tester if automated testing proves too complex or time-consuming. - -### TEST-001a: Integration Tests - API Workers - -**Description:** -Write integration tests for get_recording and get_participants workers. - -**Files to Create:** -- `server/tests/conductor/test_workers_api.py` - -**Implementation Details:** -```python -@pytest.mark.asyncio -async def test_get_recording_worker(): - with patch("reflector.conductor.workers.get_recording.create_platform_client") as mock: - mock.return_value.__aenter__.return_value.get_recording.return_value = MockRecording() - - task = Task(input_data={"recording_id": "rec_123"}) - result = await get_recording(task) - - assert result.status == TaskResultStatus.COMPLETED - assert result.output_data["id"] == "rec_123" -``` - -**Acceptance Criteria:** -- [ ] get_recording worker tested with mock Daily.co API -- [ ] get_participants worker tested with mock response -- [ ] Error handling tested (API failures) - -**Dependencies:** TASK-002, TASK-003 - ---- - -### TEST-001b: Integration Tests - Audio Processing Workers - -**Description:** -Write integration tests for pad_track, mixdown_tracks, and generate_waveform workers. - -**Files to Create:** -- `server/tests/conductor/test_workers_audio.py` - -**Acceptance Criteria:** -- [ ] pad_track worker tested with mock S3 and sample WebM -- [ ] mixdown_tracks worker tested with mock audio streams -- [ ] generate_waveform worker tested -- [ ] PyAV filter graph execution verified - -**Dependencies:** TASK-004c, TASK-005b, TASK-006 - ---- - -### TEST-001c: Integration Tests - Transcription Workers - -**Description:** -Write integration tests for transcribe_track and merge_transcripts workers. - -**Files to Create:** -- `server/tests/conductor/test_workers_transcription.py` - -**Acceptance Criteria:** -- [ ] transcribe_track worker tested with mock Modal.com response -- [ ] merge_transcripts worker tested with multiple track inputs -- [ ] Word sorting by timestamp verified - -**Dependencies:** TASK-007, TASK-008 - ---- - -### TEST-001d: Integration Tests - LLM Workers - -**Description:** -Write integration tests for detect_topics, generate_title, and generate_summary workers. 
- -**Files to Create:** -- `server/tests/conductor/test_workers_llm.py` - -**Acceptance Criteria:** -- [ ] detect_topics worker tested with mock LLM response -- [ ] generate_title worker tested -- [ ] generate_summary worker tested -- [ ] WebSocket event broadcasting verified - -**Dependencies:** TASK-009, TASK-010, TASK-011 - ---- - -### TEST-001e: Integration Tests - Finalization Workers - -**Description:** -Write integration tests for finalize, cleanup_consent, post_zulip, and send_webhook workers. - -**Files to Create:** -- `server/tests/conductor/test_workers_finalization.py` - -**Acceptance Criteria:** -- [ ] finalize worker tested (DB update) -- [ ] cleanup_consent worker tested (S3 deletion) -- [ ] post_zulip worker tested with mock API -- [ ] send_webhook worker tested with HMAC verification - -**Dependencies:** TASK-012, TASK-013, TASK-014, TASK-015 - ---- - -### TEST-002: E2E Test for Complete Workflow - -**Description:** -Create an end-to-end test that runs the complete Conductor workflow with mock services. - -**Files to Create:** -- `server/tests/conductor/test_workflow_e2e.py` - -**Implementation Details:** -```python -@pytest.mark.asyncio -async def test_complete_diarization_workflow(): - # Start Conductor in test mode - workflow_id = ConductorClientManager.start_workflow( - "diarization_pipeline", 1, - {"recording_id": "test_123", "tracks": [...]} - ) - - # Wait for completion - status = await wait_for_workflow(workflow_id, timeout=60) - - assert status.status == "COMPLETED" - assert status.output["title"] is not None -``` - -**Acceptance Criteria:** -- [ ] Complete workflow runs successfully -- [ ] All tasks execute in correct order -- [ ] FORK_JOIN_DYNAMIC parallelism works -- [ ] Output matches expected schema - -**Dependencies:** WFLOW-002 - ---- - -### TEST-003: Shadow Mode Comparison Tests - -**Description:** -Write tests that verify Celery and Conductor produce equivalent results. 
- -**Files to Create:** -- `server/tests/conductor/test_shadow_compare.py` - -**Acceptance Criteria:** -- [ ] Same input produces same output -- [ ] Timing differences documented -- [ ] Edge cases handled - -**Dependencies:** SHADOW-002b - ---- - -## Appendix: Task Timeout Reference - -| Task | Timeout (s) | Response Timeout (s) | Retry Count | -|------|-------------|---------------------|-------------| -| get_recording | 60 | 30 | 3 | -| get_participants | 60 | 30 | 3 | -| pad_track | 300 | 120 | 3 | -| mixdown_tracks | 600 | 300 | 3 | -| generate_waveform | 120 | 60 | 3 | -| transcribe_track | 1800 | 900 | 3 | -| merge_transcripts | 60 | 30 | 3 | -| detect_topics | 300 | 120 | 3 | -| generate_title | 60 | 30 | 3 | -| generate_summary | 300 | 120 | 3 | -| finalize | 60 | 30 | 3 | -| cleanup_consent | 60 | 30 | 3 | -| post_zulip | 60 | 30 | 5 | -| send_webhook | 60 | 30 | 30 | - ---- - -## Appendix: File Structure After Migration - -``` -server/reflector/ -├── conductor/ -│ ├── __init__.py -│ ├── client.py # Conductor SDK wrapper -│ ├── cache.py # Idempotency cache -│ ├── shadow_compare.py # Shadow mode comparison -│ ├── tasks/ -│ │ ├── __init__.py -│ │ ├── definitions.py # Task definitions with timeouts -│ │ └── register.py # Registration script -│ ├── workers/ -│ │ ├── __init__.py -│ │ ├── get_recording.py -│ │ ├── get_participants.py -│ │ ├── pad_track.py -│ │ ├── mixdown_tracks.py -│ │ ├── generate_waveform.py -│ │ ├── transcribe_track.py -│ │ ├── merge_transcripts.py -│ │ ├── detect_topics.py -│ │ ├── generate_title.py -│ │ ├── generate_summary.py -│ │ ├── finalize.py -│ │ ├── cleanup_consent.py -│ │ ├── post_zulip.py -│ │ └── send_webhook.py -│ └── workflows/ -│ ├── diarization_pipeline.json -│ └── register.py -├── views/ -│ └── conductor.py # Health & status endpoints -└── ...existing files... 
+# Process a Daily.co recording via webhook or API +# The pipeline trigger automatically uses the configured provider ``` diff --git a/docker-compose.yml b/docker-compose.yml index a0dbc068..d2dc6e5a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -48,6 +48,20 @@ services: conductor: condition: service_healthy + hatchet-worker: + build: + context: server + volumes: + - ./server/:/app/ + - /app/.venv + env_file: + - ./server/.env + environment: + ENTRYPOINT: hatchet-worker + depends_on: + hatchet: + condition: service_healthy + redis: image: redis:7.2 ports: @@ -81,8 +95,8 @@ services: conductor: image: conductoross/conductor-standalone:3.15.0 ports: - - 8180:8080 - - 5001:5000 + - "8180:8080" + - "5001:5000" environment: - conductor.db.type=memory healthcheck: @@ -91,6 +105,54 @@ services: timeout: 10s retries: 5 + hatchet-postgres: + image: postgres:15.6 + command: postgres -c 'max_connections=200' + restart: always + environment: + - POSTGRES_USER=hatchet + - POSTGRES_PASSWORD=hatchet + - POSTGRES_DB=hatchet + ports: + - "5436:5432" + volumes: + - ./data/hatchet-postgres:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -d hatchet -U hatchet"] + interval: 10s + timeout: 10s + retries: 5 + start_period: 10s + + hatchet: + image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest + ports: + - "8889:8888" + - "7078:7077" + depends_on: + hatchet-postgres: + condition: service_healthy + environment: + DATABASE_URL: "postgresql://hatchet:hatchet@hatchet-postgres:5432/hatchet?sslmode=disable" + SERVER_AUTH_COOKIE_DOMAIN: localhost + SERVER_AUTH_COOKIE_INSECURE: "t" + SERVER_GRPC_BIND_ADDRESS: "0.0.0.0" + SERVER_GRPC_INSECURE: "t" + SERVER_GRPC_BROADCAST_ADDRESS: hatchet:7077 + SERVER_GRPC_PORT: "7077" + SERVER_URL: http://localhost:8889 + SERVER_AUTH_SET_EMAIL_VERIFIED: "t" + SERVER_DEFAULT_ENGINE_VERSION: "V1" + SERVER_INTERNAL_CLIENT_INTERNAL_GRPC_BROADCAST_ADDRESS: hatchet:7077 + volumes: + - ./data/hatchet-config:/config + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8888/api/live"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + networks: default: attachable: true diff --git a/server/HATCHET_LLM_OBSERVATIONS.md b/server/HATCHET_LLM_OBSERVATIONS.md new file mode 100644 index 00000000..299110d2 --- /dev/null +++ b/server/HATCHET_LLM_OBSERVATIONS.md @@ -0,0 +1,339 @@ +# Hatchet Migration - LLM Debugging Observations + +This document captures hard-won debugging insights from implementing the multitrack diarization pipeline with Hatchet. These observations are particularly relevant for LLM assistants working on this codebase. + +## Architecture Context + +- **Hatchet SDK v1.21+** uses async workers with gRPC for task polling +- Workers connect to Hatchet server via gRPC (port 7077) and trigger workflows via REST (port 8888) +- `hatchet-lite` image bundles server, engine, and database in one container +- Tasks are decorated with `@workflow.task()` (not `@hatchet.step()` as in older examples) +- Workflow input is validated via Pydantic models with `input_validator=` parameter + +--- + +## Challenge 1: SDK Version API Breaking Changes + +### Symptoms +``` +AttributeError: 'V1WorkflowRunDetails' object has no attribute 'workflow_run_id' +``` + +### Root Cause +Hatchet SDK v1.21+ changed the response structure for workflow creation. 
Old examples show: +```python +result = await client.runs.aio_create(workflow_name, input_data) +return result.workflow_run_id # OLD - doesn't work +``` + +### Resolution +Access the run ID through the new nested structure: +```python +result = await client.runs.aio_create(workflow_name, input_data) +return result.run.metadata.id # NEW - SDK v1.21+ +``` + +### Key Insight +**Don't trust documentation or examples.** Read the SDK source code or use IDE autocomplete to discover actual attribute names. The SDK evolves faster than docs. + +--- + +## Challenge 2: Worker Appears Hung at "starting runner..." + +### Symptoms +``` +[INFO] Starting Hatchet workers +[INFO] Starting Hatchet worker polling... +[INFO] STARTING HATCHET... +[INFO] starting runner... +# ... nothing else, appears stuck +``` + +### Root Cause +Without debug mode, Hatchet SDK doesn't log: +- Workflow registration +- gRPC connection status +- Heartbeat activity +- Action listener acquisition + +The worker IS working, you just can't see it. + +### Resolution +Always enable debug mode during development: +```bash +HATCHET_DEBUG=true +``` + +With debug enabled, you'll see the actual activity: +``` +[DEBUG] 'worker-name' waiting for ['workflow:task1', 'workflow:task2'] +[DEBUG] starting action listener: worker-name +[DEBUG] acquired action listener: 562d00a8-8895-42a1-b65b-46f905c902f9 +[DEBUG] sending heartbeat +``` + +### Key Insight +**Start every Hatchet debugging session with `HATCHET_DEBUG=true`.** Silent workers waste hours of debugging time. + +--- + +## Challenge 3: Docker Networking + JWT Token URL Conflicts + +### Symptoms +``` +grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: + status = StatusCode.UNAVAILABLE + details = "failed to connect to all addresses" +``` + +### Root Cause +The Hatchet API token embeds URLs: +```json +{ + "aud": "http://localhost:8889", + "grpc_broadcast_address": "localhost:7077", + "server_url": "http://localhost:8889" +} +``` + +Inside Docker containers, `localhost` refers to the container itself, not the Hatchet server. + +### Resolution +Override the token-embedded URLs with environment variables: +```bash +# In .env or docker-compose environment +HATCHET_CLIENT_HOST_PORT=hatchet:7077 +HATCHET_CLIENT_SERVER_URL=http://hatchet:8888 +HATCHET_CLIENT_TLS_STRATEGY=none +``` + +### Key Insight +**The JWT token is not the final word on connection settings.** Environment variables override token-embedded URLs, which is essential for Docker networking. + +--- + +## Challenge 4: Workflow Name Case Sensitivity + +### Symptoms +``` +BadRequestException: (400) +HTTP response body: errors=[APIError(description='workflow names not found: diarizationpipeline')] +``` + +### Root Cause +Hatchet uses the exact workflow name you define for triggering: +```python +diarization_pipeline = hatchet.workflow( + name="DiarizationPipeline", # Use THIS exact name to trigger + input_validator=PipelineInput +) +``` + +Internally, task identifiers are lowercased (`diarizationpipeline:get_recording`), but workflow triggers must match the defined name. + +### Resolution +```python +# Correct +await client.start_workflow('DiarizationPipeline', input_data) + +# Wrong +await client.start_workflow('diarizationpipeline', input_data) +``` + +### Key Insight +**Workflow names are case-sensitive for triggering, but task refs are lowercase.** Don't conflate the two. 
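+
+One way to keep the trigger and the definition in sync (a sketch, not something the current code does; the constant name and module path below are hypothetical) is to define the workflow name once as a shared constant and import it on both sides:
+
+```python
+# hypothetical shared constant, e.g. in reflector/hatchet/workflows/names.py
+DIARIZATION_WORKFLOW_NAME = "DiarizationPipeline"
+
+# definition side
+diarization_pipeline = hatchet.workflow(
+    name=DIARIZATION_WORKFLOW_NAME, input_validator=PipelineInput
+)
+
+# trigger side
+run_id = await HatchetClientManager.start_workflow(
+    DIARIZATION_WORKFLOW_NAME, input_data.model_dump()
+)
+```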
+ +--- + +## Challenge 5: Pydantic Response Object Iteration + +### Symptoms +``` +AttributeError: 'tuple' object has no attribute 'participant_id' +``` + +### Root Cause +When API responses return Pydantic models with list fields: +```python +class MeetingParticipantsResponse(BaseModel): + data: List[MeetingParticipant] +``` + +Iterating the response object directly is wrong: +```python +for p in participants: # WRONG - iterates over model fields as tuples +``` + +### Resolution +Access the `.data` attribute explicitly: +```python +for p in participants.data: # CORRECT - iterates over list items + print(p.participant_id) +``` + +### Key Insight +**Pydantic models with list fields require explicit `.data` access.** The model itself is not iterable in the expected way. + +--- + +## Challenge 6: Database Connections in Async Workers + +### Symptoms +``` +InterfaceError: cannot perform operation: another operation is in progress +``` + +### Root Cause +Similar to Conductor, Hatchet workers may inherit stale database connections. Each task runs in an async context that may not share the same event loop as cached connections. + +### Resolution +Create fresh database connections per task: +```python +async def _get_fresh_db_connection(): + """Create fresh database connection for worker task.""" + import databases + from reflector.db import _database_context + from reflector.settings import settings + + _database_context.set(None) + db = databases.Database(settings.DATABASE_URL) + _database_context.set(db) + await db.connect() + return db + +async def _close_db_connection(db): + await db.disconnect() + _database_context.set(None) +``` + +### Key Insight +**Cached singletons (DB, HTTP clients) are unsafe in workflow workers.** Always create fresh connections. + +--- + +## Challenge 7: Child Workflow Fan-out Pattern + +### Symptoms +Child workflows spawn but parent doesn't wait for completion, or results aren't collected. + +### Root Cause +Hatchet child workflows need explicit spawning and result collection: +```python +# Spawning children +child_runs = await asyncio.gather(*[ + child_workflow.aio_run(child_input) + for child_input in inputs +]) + +# Results are returned directly from aio_run() +``` + +### Resolution +Use `aio_run()` for child workflows and `asyncio.gather()` for parallelism: +```python +@parent_workflow.task(parents=[setup_task]) +async def process_tracks(input: ParentInput, ctx: Context) -> dict: + child_coroutines = [ + track_workflow.aio_run(TrackInput(track_index=i, ...)) + for i in range(len(input.tracks)) + ] + + results = await asyncio.gather(*child_coroutines, return_exceptions=True) + + # Handle failures + for i, result in enumerate(results): + if isinstance(result, Exception): + logger.error(f"Track {i} failed: {result}") + + return {"track_results": [r for r in results if not isinstance(r, Exception)]} +``` + +### Key Insight +**Child workflows in Hatchet return results directly.** No need to poll for completion like in Conductor. + +--- + +## Debugging Workflow + +### 1. Enable Debug Mode First +```bash +HATCHET_DEBUG=true +``` + +### 2. Verify Worker Registration +Look for this in debug logs: +``` +[DEBUG] 'worker-name' waiting for ['workflow:task1', 'workflow:task2', ...] +[DEBUG] acquired action listener: {uuid} +``` + +### 3. 
Test Workflow Trigger Separately +```python +docker exec server uv run python -c " +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.workflows.diarization_pipeline import PipelineInput +import asyncio + +async def test(): + input_data = PipelineInput( + transcript_id='test', + recording_id=None, + room_name='test-room', + bucket_name='bucket', + tracks=[], + ) + run_id = await HatchetClientManager.start_workflow( + 'DiarizationPipeline', + input_data.model_dump() + ) + print(f'Triggered: {run_id}') + +asyncio.run(test()) +" +``` + +### 4. Check Hatchet Server Logs +```bash +docker logs reflector-hatchet-1 --tail 50 +``` + +Look for `WRN` entries indicating API errors or connection issues. + +### 5. Verify gRPC Connectivity +```python +docker exec worker python -c " +import socket +sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +result = sock.connect_ex(('hatchet', 7077)) +print(f'gRPC port 7077: {\"reachable\" if result == 0 else \"blocked\"}')" +``` + +### 6. Force Container Rebuild +Volume mounts may cache old bytecode: +```bash +docker compose up -d --build --force-recreate hatchet-worker +``` + +--- + +## Common Gotchas Summary + +| Issue | Signal | Fix | +|-------|--------|-----| +| SDK API changed | `AttributeError` on result | Check SDK source for actual attributes | +| Worker appears stuck | Only "starting runner..." | Enable `HATCHET_DEBUG=true` | +| Can't connect from Docker | gRPC unavailable | Set `HATCHET_CLIENT_HOST_PORT` and `_SERVER_URL` | +| Workflow not found | 400 Bad Request | Use exact case-sensitive workflow name | +| Tuple iteration error | `'tuple' has no attribute` | Access `.data` on Pydantic response models | +| DB conflicts | "another operation in progress" | Fresh DB connection per task | +| Old code running | Fixed code but same error | Force rebuild container, clear `__pycache__` | + +--- + +## Files Most Likely to Need Hatchet-Specific Handling + +- `server/reflector/hatchet/workflows/*.py` - Workflow and task definitions +- `server/reflector/hatchet/client.py` - Client wrapper, SDK version compatibility +- `server/reflector/hatchet/run_workers.py` - Worker startup and registration +- `server/reflector/hatchet/progress.py` - Progress emission for UI updates +- `docker-compose.yml` - Hatchet infrastructure services diff --git a/server/pyproject.toml b/server/pyproject.toml index 3cea72e5..a3702a7e 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -40,6 +40,7 @@ dependencies = [ "webvtt-py>=0.5.0", "icalendar>=6.0.0", "conductor-python>=1.2.3", + "hatchet-sdk>=0.47.0", ] [dependency-groups] @@ -135,5 +136,10 @@ select = [ "reflector/processors/summary/summary_builder.py" = ["E501"] "gpu/modal_deployments/**.py" = ["PLC0415"] "reflector/tools/**.py" = ["PLC0415"] +"reflector/hatchet/run_workers.py" = ["PLC0415"] +"reflector/hatchet/workflows/**.py" = ["PLC0415"] +"reflector/conductor/run_workers.py" = ["PLC0415"] +"reflector/conductor/workers/**.py" = ["PLC0415"] +"reflector/views/hatchet.py" = ["PLC0415"] "migrations/versions/**.py" = ["PLC0415"] "tests/**.py" = ["PLC0415"] diff --git a/server/reflector/app.py b/server/reflector/app.py index 4913d366..f37e19bd 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -14,6 +14,7 @@ from reflector.metrics import metrics_init from reflector.settings import settings from reflector.views.conductor import router as conductor_router from reflector.views.daily import router as daily_router +from reflector.views.hatchet import router as 
hatchet_router from reflector.views.meetings import router as meetings_router from reflector.views.rooms import router as rooms_router from reflector.views.rtc_offer import router as rtc_offer_router @@ -100,6 +101,7 @@ app.include_router(zulip_router, prefix="/v1") app.include_router(whereby_router, prefix="/v1") app.include_router(daily_router, prefix="/v1/daily") app.include_router(conductor_router, prefix="/v1") +app.include_router(hatchet_router, prefix="/v1") add_pagination(app) # prepare celery diff --git a/server/reflector/hatchet/__init__.py b/server/reflector/hatchet/__init__.py new file mode 100644 index 00000000..08055550 --- /dev/null +++ b/server/reflector/hatchet/__init__.py @@ -0,0 +1,6 @@ +"""Hatchet workflow orchestration for Reflector.""" + +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.progress import emit_progress, emit_progress_async + +__all__ = ["HatchetClientManager", "emit_progress", "emit_progress_async"] diff --git a/server/reflector/hatchet/client.py b/server/reflector/hatchet/client.py new file mode 100644 index 00000000..bc3a63f0 --- /dev/null +++ b/server/reflector/hatchet/client.py @@ -0,0 +1,48 @@ +"""Hatchet Python client wrapper.""" + +from hatchet_sdk import Hatchet + +from reflector.settings import settings + + +class HatchetClientManager: + """Singleton manager for Hatchet client connections.""" + + _instance: Hatchet | None = None + + @classmethod + def get_client(cls) -> Hatchet: + """Get or create the Hatchet client.""" + if cls._instance is None: + if not settings.HATCHET_CLIENT_TOKEN: + raise ValueError("HATCHET_CLIENT_TOKEN must be set") + + cls._instance = Hatchet( + debug=settings.HATCHET_DEBUG, + ) + return cls._instance + + @classmethod + async def start_workflow( + cls, workflow_name: str, input_data: dict, key: str | None = None + ) -> str: + """Start a workflow and return the workflow run ID.""" + client = cls.get_client() + result = await client.runs.aio_create( + workflow_name, + input_data, + ) + # SDK v1.21+ returns V1WorkflowRunDetails with run.metadata.id + return result.run.metadata.id + + @classmethod + async def get_workflow_status(cls, workflow_run_id: str) -> dict: + """Get the current status of a workflow run.""" + client = cls.get_client() + run = await client.runs.aio_get(workflow_run_id) + return run.to_dict() + + @classmethod + def reset(cls) -> None: + """Reset the client instance (for testing).""" + cls._instance = None diff --git a/server/reflector/hatchet/progress.py b/server/reflector/hatchet/progress.py new file mode 100644 index 00000000..2d6d54aa --- /dev/null +++ b/server/reflector/hatchet/progress.py @@ -0,0 +1,120 @@ +"""Progress event emission for Hatchet workers.""" + +import asyncio +from typing import Literal + +from reflector.db.transcripts import PipelineProgressData +from reflector.logger import logger +from reflector.ws_manager import get_ws_manager + +# Step mapping for progress tracking (matches Conductor pipeline) +PIPELINE_STEPS = { + "get_recording": 1, + "get_participants": 2, + "pad_track": 3, # Fork tasks share same step + "mixdown_tracks": 4, + "generate_waveform": 5, + "transcribe_track": 6, # Fork tasks share same step + "merge_transcripts": 7, + "detect_topics": 8, + "generate_title": 9, # Fork tasks share same step + "generate_summary": 9, # Fork tasks share same step + "finalize": 10, + "cleanup_consent": 11, + "post_zulip": 12, + "send_webhook": 13, +} + +TOTAL_STEPS = 13 + + +async def _emit_progress_async( + transcript_id: str, + step: str, + status: 
Literal["pending", "in_progress", "completed", "failed"], + workflow_id: str | None = None, +) -> None: + """Async implementation of progress emission.""" + ws_manager = get_ws_manager() + step_index = PIPELINE_STEPS.get(step, 0) + + data = PipelineProgressData( + workflow_id=workflow_id, + current_step=step, + step_index=step_index, + total_steps=TOTAL_STEPS, + step_status=status, + ) + + await ws_manager.send_json( + room_id=f"ts:{transcript_id}", + message={ + "event": "PIPELINE_PROGRESS", + "data": data.model_dump(), + }, + ) + + logger.debug( + "[Hatchet Progress] Emitted", + transcript_id=transcript_id, + step=step, + status=status, + step_index=step_index, + ) + + +def emit_progress( + transcript_id: str, + step: str, + status: Literal["pending", "in_progress", "completed", "failed"], + workflow_id: str | None = None, +) -> None: + """Emit a pipeline progress event (sync wrapper for Hatchet workers). + + Args: + transcript_id: The transcript ID to emit progress for + step: The current step name (e.g., "transcribe_track") + status: The step status + workflow_id: Optional workflow run ID + """ + try: + # Get or create event loop for sync context + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop is not None and loop.is_running(): + # Already in async context, schedule the coroutine + asyncio.create_task( + _emit_progress_async(transcript_id, step, status, workflow_id) + ) + else: + # Not in async context, run synchronously + asyncio.run(_emit_progress_async(transcript_id, step, status, workflow_id)) + except Exception as e: + # Progress emission should never break the pipeline + logger.warning( + "[Hatchet Progress] Failed to emit progress event", + error=str(e), + transcript_id=transcript_id, + step=step, + ) + + +async def emit_progress_async( + transcript_id: str, + step: str, + status: Literal["pending", "in_progress", "completed", "failed"], + workflow_id: str | None = None, +) -> None: + """Async version of emit_progress for use in async Hatchet tasks.""" + try: + await _emit_progress_async(transcript_id, step, status, workflow_id) + except Exception as e: + logger.warning( + "[Hatchet Progress] Failed to emit progress event", + error=str(e), + transcript_id=transcript_id, + step=step, + ) diff --git a/server/reflector/hatchet/run_workers.py b/server/reflector/hatchet/run_workers.py new file mode 100644 index 00000000..96eb6bfa --- /dev/null +++ b/server/reflector/hatchet/run_workers.py @@ -0,0 +1,59 @@ +""" +Run Hatchet workers for the diarization pipeline. 
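+
+Registers the DiarizationPipeline and TrackProcessing workflows on a single
+worker named "reflector-diarization-worker".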
+ +Usage: + uv run -m reflector.hatchet.run_workers + + # Or via docker: + docker compose exec server uv run -m reflector.hatchet.run_workers +""" + +import signal +import sys + +from reflector.logger import logger +from reflector.settings import settings + + +def main() -> None: + """Start Hatchet worker polling.""" + if not settings.HATCHET_ENABLED: + logger.error("HATCHET_ENABLED is False, not starting workers") + sys.exit(1) + + if not settings.HATCHET_CLIENT_TOKEN: + logger.error("HATCHET_CLIENT_TOKEN is not set") + sys.exit(1) + + logger.info( + "Starting Hatchet workers", + debug=settings.HATCHET_DEBUG, + ) + + # Import workflows to register them + from reflector.hatchet.client import HatchetClientManager + from reflector.hatchet.workflows import diarization_pipeline, track_workflow + + hatchet = HatchetClientManager.get_client() + + # Create worker with both workflows + worker = hatchet.worker( + "reflector-diarization-worker", + workflows=[diarization_pipeline, track_workflow], + ) + + # Handle graceful shutdown + def shutdown_handler(signum: int, frame) -> None: + logger.info("Received shutdown signal, stopping workers...") + # Worker cleanup happens automatically on exit + sys.exit(0) + + signal.signal(signal.SIGINT, shutdown_handler) + signal.signal(signal.SIGTERM, shutdown_handler) + + logger.info("Starting Hatchet worker polling...") + worker.start() + + +if __name__ == "__main__": + main() diff --git a/server/reflector/hatchet/workflows/__init__.py b/server/reflector/hatchet/workflows/__init__.py new file mode 100644 index 00000000..17d3876d --- /dev/null +++ b/server/reflector/hatchet/workflows/__init__.py @@ -0,0 +1,14 @@ +"""Hatchet workflow definitions.""" + +from reflector.hatchet.workflows.diarization_pipeline import ( + PipelineInput, + diarization_pipeline, +) +from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow + +__all__ = [ + "diarization_pipeline", + "track_workflow", + "PipelineInput", + "TrackInput", +] diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py new file mode 100644 index 00000000..4bbae444 --- /dev/null +++ b/server/reflector/hatchet/workflows/diarization_pipeline.py @@ -0,0 +1,808 @@ +""" +Hatchet main workflow: DiarizationPipeline + +Multitrack diarization pipeline for Daily.co recordings. +Orchestrates the full processing flow from recording metadata to final transcript. 
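+
+Task graph (mirroring the parents= declarations below):
+    get_recording -> get_participants -> process_tracks (fan-out to TrackProcessing
+    child workflows) -> mixdown_tracks -> {generate_waveform, detect_topics};
+    detect_topics -> {generate_title, generate_summary};
+    finalize waits on generate_waveform, generate_title and generate_summary,
+    then -> cleanup_consent -> post_zulip -> send_webhook.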
+""" + +import asyncio +import tempfile +from datetime import timedelta +from pathlib import Path + +import av +from hatchet_sdk import Context +from pydantic import BaseModel + +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.progress import emit_progress_async +from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow +from reflector.logger import logger + +# Audio constants +OPUS_STANDARD_SAMPLE_RATE = 48000 +OPUS_DEFAULT_BIT_RATE = 64000 +PRESIGNED_URL_EXPIRATION_SECONDS = 7200 + + +class PipelineInput(BaseModel): + """Input to trigger the diarization pipeline.""" + + recording_id: str | None + room_name: str | None + tracks: list[dict] # List of {"s3_key": str} + bucket_name: str + transcript_id: str + room_id: str | None = None + + +# Get hatchet client and define workflow +hatchet = HatchetClientManager.get_client() + +diarization_pipeline = hatchet.workflow( + name="DiarizationPipeline", input_validator=PipelineInput +) + + +# ============================================================================ +# Helper Functions +# ============================================================================ + + +async def _get_fresh_db_connection(): + """Create fresh database connection for subprocess.""" + import databases + + from reflector.db import _database_context + from reflector.settings import settings + + _database_context.set(None) + db = databases.Database(settings.DATABASE_URL) + _database_context.set(db) + await db.connect() + return db + + +async def _close_db_connection(db): + """Close database connection.""" + from reflector.db import _database_context + + await db.disconnect() + _database_context.set(None) + + +def _get_storage(): + """Create fresh storage instance.""" + from reflector.settings import settings + from reflector.storage.storage_aws import AwsStorage + + return AwsStorage( + aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, + aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + +# ============================================================================ +# Pipeline Tasks +# ============================================================================ + + +@diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3) +async def get_recording(input: PipelineInput, ctx: Context) -> dict: + """Fetch recording metadata from Daily.co API.""" + logger.info("[Hatchet] get_recording", recording_id=input.recording_id) + + await emit_progress_async( + input.transcript_id, "get_recording", "in_progress", ctx.workflow_run_id + ) + + try: + from reflector.dailyco_api.client import DailyApiClient + from reflector.settings import settings + + if not input.recording_id: + # No recording_id in reprocess path - return minimal data + await emit_progress_async( + input.transcript_id, "get_recording", "completed", ctx.workflow_run_id + ) + return { + "id": None, + "mtg_session_id": None, + "room_name": input.room_name, + "duration": 0, + } + + if not settings.DAILY_API_KEY: + raise ValueError("DAILY_API_KEY not configured") + + async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client: + recording = await client.get_recording(input.recording_id) + + logger.info( + "[Hatchet] get_recording complete", + recording_id=input.recording_id, + room_name=recording.room_name, + duration=recording.duration, + ) + + await emit_progress_async( + 
input.transcript_id, "get_recording", "completed", ctx.workflow_run_id + ) + + return { + "id": recording.id, + "mtg_session_id": recording.mtgSessionId, + "room_name": recording.room_name, + "duration": recording.duration, + } + + except Exception as e: + logger.error("[Hatchet] get_recording failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "get_recording", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3 +) +async def get_participants(input: PipelineInput, ctx: Context) -> dict: + """Fetch participant list from Daily.co API.""" + logger.info("[Hatchet] get_participants", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "get_participants", "in_progress", ctx.workflow_run_id + ) + + try: + recording_data = ctx.task_output(get_recording) + mtg_session_id = recording_data.get("mtg_session_id") + + from reflector.dailyco_api.client import DailyApiClient + from reflector.settings import settings + + if not mtg_session_id or not settings.DAILY_API_KEY: + # Return empty participants if no session ID + await emit_progress_async( + input.transcript_id, + "get_participants", + "completed", + ctx.workflow_run_id, + ) + return {"participants": [], "num_tracks": len(input.tracks)} + + async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client: + participants = await client.get_meeting_participants(mtg_session_id) + + participants_list = [ + {"participant_id": p.participant_id, "user_name": p.user_name} + for p in participants.data + ] + + logger.info( + "[Hatchet] get_participants complete", + participant_count=len(participants_list), + ) + + await emit_progress_async( + input.transcript_id, "get_participants", "completed", ctx.workflow_run_id + ) + + return {"participants": participants_list, "num_tracks": len(input.tracks)} + + except Exception as e: + logger.error("[Hatchet] get_participants failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "get_participants", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3 +) +async def process_tracks(input: PipelineInput, ctx: Context) -> dict: + """Spawn child workflows for each track (dynamic fan-out). + + Processes pad_track and transcribe_track for each audio track in parallel. 
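+
+    Each child run returns a dict keyed by task name: "pad_track" supplies the
+    padded audio URL and "transcribe_track" supplies per-word results, which are
+    merged across tracks and sorted by start time for the downstream tasks.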
+ """ + logger.info( + "[Hatchet] process_tracks", + num_tracks=len(input.tracks), + transcript_id=input.transcript_id, + ) + + # Spawn child workflows for each track + child_coroutines = [ + track_workflow.aio_run( + TrackInput( + track_index=i, + s3_key=track["s3_key"], + bucket_name=input.bucket_name, + transcript_id=input.transcript_id, + ) + ) + for i, track in enumerate(input.tracks) + ] + + # Wait for all child workflows to complete + results = await asyncio.gather(*child_coroutines) + + # Collect all track results + all_words = [] + padded_urls = [] + + for result in results: + transcribe_result = result.get("transcribe_track", {}) + all_words.extend(transcribe_result.get("words", [])) + + pad_result = result.get("pad_track", {}) + padded_urls.append(pad_result.get("padded_url")) + + # Sort words by start time + all_words.sort(key=lambda w: w.get("start", 0)) + + logger.info( + "[Hatchet] process_tracks complete", + num_tracks=len(input.tracks), + total_words=len(all_words), + ) + + return { + "all_words": all_words, + "padded_urls": padded_urls, + "word_count": len(all_words), + "num_tracks": len(input.tracks), + } + + +@diarization_pipeline.task( + parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3 +) +async def mixdown_tracks(input: PipelineInput, ctx: Context) -> dict: + """Mix all padded tracks into single audio file.""" + logger.info("[Hatchet] mixdown_tracks", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "mixdown_tracks", "in_progress", ctx.workflow_run_id + ) + + try: + track_data = ctx.task_output(process_tracks) + padded_urls = track_data.get("padded_urls", []) + + if not padded_urls: + raise ValueError("No padded tracks to mixdown") + + storage = _get_storage() + + # Download all tracks and mix + temp_inputs = [] + try: + for i, url in enumerate(padded_urls): + if not url: + continue + temp_input = tempfile.NamedTemporaryFile(suffix=".webm", delete=False) + temp_inputs.append(temp_input.name) + + # Download track + import httpx + + async with httpx.AsyncClient() as client: + response = await client.get(url) + response.raise_for_status() + with open(temp_input.name, "wb") as f: + f.write(response.content) + + # Mix using PyAV amix filter + if len(temp_inputs) == 0: + raise ValueError("No valid tracks to mixdown") + + output_path = tempfile.mktemp(suffix=".mp3") + + try: + # Use ffmpeg-style mixing via PyAV + containers = [av.open(path) for path in temp_inputs] + + # Get the longest duration + max_duration = 0.0 + for container in containers: + if container.duration: + duration = float(container.duration * av.time_base) + max_duration = max(max_duration, duration) + + # Close containers for now + for container in containers: + container.close() + + # Use subprocess for mixing (simpler than complex PyAV graph) + import subprocess + + # Build ffmpeg command + cmd = ["ffmpeg", "-y"] + for path in temp_inputs: + cmd.extend(["-i", path]) + + # Build filter for N inputs + n = len(temp_inputs) + filter_str = f"amix=inputs={n}:duration=longest:normalize=0" + cmd.extend(["-filter_complex", filter_str]) + cmd.extend(["-ac", "2", "-ar", "48000", "-b:a", "128k", output_path]) + + subprocess.run(cmd, check=True, capture_output=True) + + # Upload mixed file + file_size = Path(output_path).stat().st_size + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/mixed.mp3" + + with open(output_path, "rb") as mixed_file: + await storage.put_file(storage_path, mixed_file) + + logger.info( + "[Hatchet] mixdown_tracks 
uploaded", + key=storage_path, + size=file_size, + ) + + finally: + Path(output_path).unlink(missing_ok=True) + + finally: + for path in temp_inputs: + Path(path).unlink(missing_ok=True) + + await emit_progress_async( + input.transcript_id, "mixdown_tracks", "completed", ctx.workflow_run_id + ) + + return { + "audio_key": storage_path, + "duration": max_duration, + "tracks_mixed": len(temp_inputs), + } + + except Exception as e: + logger.error("[Hatchet] mixdown_tracks failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "mixdown_tracks", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3 +) +async def generate_waveform(input: PipelineInput, ctx: Context) -> dict: + """Generate audio waveform visualization.""" + logger.info("[Hatchet] generate_waveform", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "generate_waveform", "in_progress", ctx.workflow_run_id + ) + + try: + mixdown_data = ctx.task_output(mixdown_tracks) + audio_key = mixdown_data.get("audio_key") + + storage = _get_storage() + audio_url = await storage.get_file_url( + audio_key, + operation="get_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) + + from reflector.pipelines.waveform_helpers import generate_waveform_data + + waveform = await generate_waveform_data(audio_url) + + # Store waveform + waveform_key = f"file_pipeline_hatchet/{input.transcript_id}/waveform.json" + import json + + waveform_bytes = json.dumps(waveform).encode() + import io + + await storage.put_file(waveform_key, io.BytesIO(waveform_bytes)) + + logger.info("[Hatchet] generate_waveform complete") + + await emit_progress_async( + input.transcript_id, "generate_waveform", "completed", ctx.workflow_run_id + ) + + return {"waveform_key": waveform_key} + + except Exception as e: + logger.error("[Hatchet] generate_waveform failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "generate_waveform", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3 +) +async def detect_topics(input: PipelineInput, ctx: Context) -> dict: + """Detect topics using LLM.""" + logger.info("[Hatchet] detect_topics", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "detect_topics", "in_progress", ctx.workflow_run_id + ) + + try: + track_data = ctx.task_output(process_tracks) + words = track_data.get("all_words", []) + + from reflector.pipelines import topic_processing + from reflector.processors.types import Transcript as TranscriptType + from reflector.processors.types import Word + + # Convert word dicts to Word objects + word_objects = [Word(**w) for w in words] + transcript = TranscriptType(words=word_objects) + + empty_pipeline = topic_processing.EmptyPipeline(logger=logger) + + async def noop_callback(t): + pass + + topics = await topic_processing.detect_topics( + transcript, + "en", # target_language + on_topic_callback=noop_callback, + empty_pipeline=empty_pipeline, + ) + + topics_list = [t.model_dump() for t in topics] + + logger.info("[Hatchet] detect_topics complete", topic_count=len(topics_list)) + + await emit_progress_async( + input.transcript_id, "detect_topics", "completed", ctx.workflow_run_id + ) + + return {"topics": topics_list} + + except Exception as e: + logger.error("[Hatchet] detect_topics 
failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "detect_topics", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3 +) +async def generate_title(input: PipelineInput, ctx: Context) -> dict: + """Generate meeting title using LLM.""" + logger.info("[Hatchet] generate_title", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "generate_title", "in_progress", ctx.workflow_run_id + ) + + try: + topics_data = ctx.task_output(detect_topics) + topics = topics_data.get("topics", []) + + from reflector.pipelines import topic_processing + from reflector.processors.types import Topic + + topic_objects = [Topic(**t) for t in topics] + + title = await topic_processing.generate_title(topic_objects) + + logger.info("[Hatchet] generate_title complete", title=title) + + await emit_progress_async( + input.transcript_id, "generate_title", "completed", ctx.workflow_run_id + ) + + return {"title": title} + + except Exception as e: + logger.error("[Hatchet] generate_title failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "generate_title", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[detect_topics], execution_timeout=timedelta(seconds=300), retries=3 +) +async def generate_summary(input: PipelineInput, ctx: Context) -> dict: + """Generate meeting summary using LLM.""" + logger.info("[Hatchet] generate_summary", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "generate_summary", "in_progress", ctx.workflow_run_id + ) + + try: + track_data = ctx.task_output(process_tracks) + topics_data = ctx.task_output(detect_topics) + + words = track_data.get("all_words", []) + topics = topics_data.get("topics", []) + + from reflector.pipelines import topic_processing + from reflector.processors.types import Topic, Word + from reflector.processors.types import Transcript as TranscriptType + + word_objects = [Word(**w) for w in words] + transcript = TranscriptType(words=word_objects) + topic_objects = [Topic(**t) for t in topics] + + summary, short_summary = await topic_processing.generate_summary( + transcript, topic_objects + ) + + logger.info("[Hatchet] generate_summary complete") + + await emit_progress_async( + input.transcript_id, "generate_summary", "completed", ctx.workflow_run_id + ) + + return {"summary": summary, "short_summary": short_summary} + + except Exception as e: + logger.error("[Hatchet] generate_summary failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "generate_summary", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[generate_waveform, generate_title, generate_summary], + execution_timeout=timedelta(seconds=60), + retries=3, +) +async def finalize(input: PipelineInput, ctx: Context) -> dict: + """Finalize transcript status and update database.""" + logger.info("[Hatchet] finalize", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "finalize", "in_progress", ctx.workflow_run_id + ) + + try: + title_data = ctx.task_output(generate_title) + summary_data = ctx.task_output(generate_summary) + mixdown_data = ctx.task_output(mixdown_tracks) + track_data = ctx.task_output(process_tracks) + + title = title_data.get("title", "") + summary = summary_data.get("summary", "") + short_summary = 
summary_data.get("short_summary", "") + duration = mixdown_data.get("duration", 0) + all_words = track_data.get("all_words", []) + + db = await _get_fresh_db_connection() + + try: + from reflector.db.transcripts import transcripts_controller + from reflector.processors.types import Word + + transcript = await transcripts_controller.get_by_id(input.transcript_id) + if transcript is None: + raise ValueError( + f"Transcript {input.transcript_id} not found in database" + ) + + # Convert words back to Word objects for storage + word_objects = [Word(**w) for w in all_words] + + await transcripts_controller.update( + transcript, + { + "status": "ended", + "title": title, + "long_summary": summary, + "short_summary": short_summary, + "duration": duration, + "words": word_objects, + }, + ) + + logger.info( + "[Hatchet] finalize complete", transcript_id=input.transcript_id + ) + + finally: + await _close_db_connection(db) + + await emit_progress_async( + input.transcript_id, "finalize", "completed", ctx.workflow_run_id + ) + + return {"status": "COMPLETED"} + + except Exception as e: + logger.error("[Hatchet] finalize failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "finalize", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3 +) +async def cleanup_consent(input: PipelineInput, ctx: Context) -> dict: + """Check and handle consent requirements.""" + logger.info("[Hatchet] cleanup_consent", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "cleanup_consent", "in_progress", ctx.workflow_run_id + ) + + try: + db = await _get_fresh_db_connection() + + try: + from reflector.db.meetings import meetings_controller + from reflector.db.transcripts import transcripts_controller + + transcript = await transcripts_controller.get_by_id(input.transcript_id) + if transcript and transcript.meeting_id: + meeting = await meetings_controller.get_by_id(transcript.meeting_id) + if meeting: + # Check consent logic here + # For now just mark as checked + pass + + logger.info( + "[Hatchet] cleanup_consent complete", transcript_id=input.transcript_id + ) + + finally: + await _close_db_connection(db) + + await emit_progress_async( + input.transcript_id, "cleanup_consent", "completed", ctx.workflow_run_id + ) + + return {"consent_checked": True} + + except Exception as e: + logger.error("[Hatchet] cleanup_consent failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "cleanup_consent", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5 +) +async def post_zulip(input: PipelineInput, ctx: Context) -> dict: + """Post notification to Zulip.""" + logger.info("[Hatchet] post_zulip", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "post_zulip", "in_progress", ctx.workflow_run_id + ) + + try: + from reflector.settings import settings + + if not settings.ZULIP_REALM: + logger.info("[Hatchet] post_zulip skipped (Zulip not configured)") + await emit_progress_async( + input.transcript_id, "post_zulip", "completed", ctx.workflow_run_id + ) + return {"zulip_message_id": None, "skipped": True} + + from reflector.zulip import post_transcript_notification + + db = await _get_fresh_db_connection() + + try: + from reflector.db.transcripts import transcripts_controller + + transcript = await 
transcripts_controller.get_by_id(input.transcript_id) + if transcript: + message_id = await post_transcript_notification(transcript) + logger.info( + "[Hatchet] post_zulip complete", zulip_message_id=message_id + ) + else: + message_id = None + + finally: + await _close_db_connection(db) + + await emit_progress_async( + input.transcript_id, "post_zulip", "completed", ctx.workflow_run_id + ) + + return {"zulip_message_id": message_id} + + except Exception as e: + logger.error("[Hatchet] post_zulip failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "post_zulip", "failed", ctx.workflow_run_id + ) + raise + + +@diarization_pipeline.task( + parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30 +) +async def send_webhook(input: PipelineInput, ctx: Context) -> dict: + """Send completion webhook to external service.""" + logger.info("[Hatchet] send_webhook", transcript_id=input.transcript_id) + + await emit_progress_async( + input.transcript_id, "send_webhook", "in_progress", ctx.workflow_run_id + ) + + try: + if not input.room_id: + logger.info("[Hatchet] send_webhook skipped (no room_id)") + await emit_progress_async( + input.transcript_id, "send_webhook", "completed", ctx.workflow_run_id + ) + return {"webhook_sent": False, "skipped": True} + + db = await _get_fresh_db_connection() + + try: + from reflector.db.rooms import rooms_controller + from reflector.db.transcripts import transcripts_controller + + room = await rooms_controller.get_by_id(input.room_id) + transcript = await transcripts_controller.get_by_id(input.transcript_id) + + if room and room.webhook_url and transcript: + import httpx + + webhook_payload = { + "event": "transcript.completed", + "transcript_id": input.transcript_id, + "title": transcript.title, + "duration": transcript.duration, + } + + async with httpx.AsyncClient() as client: + response = await client.post( + room.webhook_url, json=webhook_payload, timeout=30 + ) + response.raise_for_status() + + logger.info( + "[Hatchet] send_webhook complete", status_code=response.status_code + ) + + await emit_progress_async( + input.transcript_id, + "send_webhook", + "completed", + ctx.workflow_run_id, + ) + + return {"webhook_sent": True, "response_code": response.status_code} + + finally: + await _close_db_connection(db) + + await emit_progress_async( + input.transcript_id, "send_webhook", "completed", ctx.workflow_run_id + ) + + return {"webhook_sent": False, "skipped": True} + + except Exception as e: + logger.error("[Hatchet] send_webhook failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "send_webhook", "failed", ctx.workflow_run_id + ) + raise diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py new file mode 100644 index 00000000..304d6b37 --- /dev/null +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -0,0 +1,337 @@ +""" +Hatchet child workflow: TrackProcessing + +Handles individual audio track processing: padding and transcription. +Spawned dynamically by the main diarization pipeline for each track. 
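+
+Tasks: pad_track -> transcribe_track. transcribe_track consumes the presigned
+padded_url produced by pad_track and tags every word with the track index as
+its speaker label.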
+""" + +import math +import tempfile +from datetime import timedelta +from fractions import Fraction +from pathlib import Path + +import av +from av.audio.resampler import AudioResampler +from hatchet_sdk import Context +from pydantic import BaseModel + +from reflector.hatchet.client import HatchetClientManager +from reflector.hatchet.progress import emit_progress_async +from reflector.logger import logger + +# Audio constants matching existing pipeline +OPUS_STANDARD_SAMPLE_RATE = 48000 +OPUS_DEFAULT_BIT_RATE = 64000 +PRESIGNED_URL_EXPIRATION_SECONDS = 7200 + + +class TrackInput(BaseModel): + """Input for individual track processing.""" + + track_index: int + s3_key: str + bucket_name: str + transcript_id: str + language: str = "en" + + +# Get hatchet client and define workflow +hatchet = HatchetClientManager.get_client() + +track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput) + + +def _extract_stream_start_time_from_container(container, track_idx: int) -> float: + """Extract meeting-relative start time from WebM stream metadata. + + Uses PyAV to read stream.start_time from WebM container. + More accurate than filename timestamps by ~209ms due to network/encoding delays. + """ + start_time_seconds = 0.0 + try: + audio_streams = [s for s in container.streams if s.type == "audio"] + stream = audio_streams[0] if audio_streams else container.streams[0] + + # 1) Try stream-level start_time (most reliable for Daily.co tracks) + if stream.start_time is not None and stream.time_base is not None: + start_time_seconds = float(stream.start_time * stream.time_base) + + # 2) Fallback to container-level start_time + if (start_time_seconds <= 0) and (container.start_time is not None): + start_time_seconds = float(container.start_time * av.time_base) + + # 3) Fallback to first packet DTS + if start_time_seconds <= 0: + for packet in container.demux(stream): + if packet.dts is not None: + start_time_seconds = float(packet.dts * stream.time_base) + break + except Exception as e: + logger.warning( + "PyAV metadata read failed; assuming 0 start_time", + track_idx=track_idx, + error=str(e), + ) + start_time_seconds = 0.0 + + logger.info( + f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s", + track_idx=track_idx, + ) + return start_time_seconds + + +def _apply_audio_padding_to_file( + in_container, + output_path: str, + start_time_seconds: float, + track_idx: int, +) -> None: + """Apply silence padding to audio track using PyAV filter graph.""" + delay_ms = math.floor(start_time_seconds * 1000) + + logger.info( + f"Padding track {track_idx} with {delay_ms}ms delay using PyAV", + track_idx=track_idx, + delay_ms=delay_ms, + ) + + with av.open(output_path, "w", format="webm") as out_container: + in_stream = next((s for s in in_container.streams if s.type == "audio"), None) + if in_stream is None: + raise Exception("No audio stream in input") + + out_stream = out_container.add_stream("libopus", rate=OPUS_STANDARD_SAMPLE_RATE) + out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE + graph = av.filter.Graph() + + abuf_args = ( + f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:" + f"sample_fmt=s16:" + f"channel_layout=stereo" + ) + src = graph.add("abuffer", args=abuf_args, name="src") + aresample_f = graph.add("aresample", args="async=1", name="ares") + delays_arg = f"{delay_ms}|{delay_ms}" + adelay_f = graph.add("adelay", args=f"delays={delays_arg}:all=1", name="delay") + sink = graph.add("abuffersink", name="sink") + + 
src.link_to(aresample_f) + aresample_f.link_to(adelay_f) + adelay_f.link_to(sink) + graph.configure() + + resampler = AudioResampler( + format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE + ) + + for frame in in_container.decode(in_stream): + out_frames = resampler.resample(frame) or [] + for rframe in out_frames: + rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE + rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + src.push(rframe) + + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + # Flush remaining frames + src.push(None) + while True: + try: + f_out = sink.pull() + except Exception: + break + f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE + f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE) + for packet in out_stream.encode(f_out): + out_container.mux(packet) + + for packet in out_stream.encode(None): + out_container.mux(packet) + + +@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3) +async def pad_track(input: TrackInput, ctx: Context) -> dict: + """Pad single audio track with silence for alignment. + + Extracts stream.start_time from WebM container metadata and applies + silence padding using PyAV filter graph (adelay). + """ + logger.info( + "[Hatchet] pad_track", + track_index=input.track_index, + s3_key=input.s3_key, + transcript_id=input.transcript_id, + ) + + await emit_progress_async( + input.transcript_id, "pad_track", "in_progress", ctx.workflow_run_id + ) + + try: + # Create fresh storage instance to avoid aioboto3 fork issues + from reflector.settings import settings + from reflector.storage.storage_aws import AwsStorage + + storage = AwsStorage( + aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, + aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, + aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID, + aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY, + ) + + # Get presigned URL for source file + source_url = await storage.get_file_url( + input.s3_key, + operation="get_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + bucket=input.bucket_name, + ) + + # Open container and extract start time + with av.open(source_url) as in_container: + start_time_seconds = _extract_stream_start_time_from_container( + in_container, input.track_index + ) + + # If no padding needed, return original URL + if start_time_seconds <= 0: + logger.info( + f"Track {input.track_index} requires no padding", + track_index=input.track_index, + ) + await emit_progress_async( + input.transcript_id, "pad_track", "completed", ctx.workflow_run_id + ) + return { + "padded_url": source_url, + "size": 0, + "track_index": input.track_index, + } + + # Create temp file for padded output + with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file: + temp_path = temp_file.name + + try: + _apply_audio_padding_to_file( + in_container, temp_path, start_time_seconds, input.track_index + ) + + file_size = Path(temp_path).stat().st_size + storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm" + + logger.info( + f"About to upload padded track", + key=storage_path, + size=file_size, + ) + + with open(temp_path, "rb") as padded_file: + await storage.put_file(storage_path, padded_file) + + logger.info( + f"Uploaded padded track to S3", + key=storage_path, + 
size=file_size, + ) + finally: + Path(temp_path).unlink(missing_ok=True) + + # Get presigned URL for padded file + padded_url = await storage.get_file_url( + storage_path, + operation="get_object", + expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, + ) + + logger.info( + "[Hatchet] pad_track complete", + track_index=input.track_index, + padded_url=padded_url[:50] + "...", + ) + + await emit_progress_async( + input.transcript_id, "pad_track", "completed", ctx.workflow_run_id + ) + + return { + "padded_url": padded_url, + "size": file_size, + "track_index": input.track_index, + } + + except Exception as e: + logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "pad_track", "failed", ctx.workflow_run_id + ) + raise + + +@track_workflow.task( + parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3 +) +async def transcribe_track(input: TrackInput, ctx: Context) -> dict: + """Transcribe audio track using GPU (Modal.com) or local Whisper.""" + logger.info( + "[Hatchet] transcribe_track", + track_index=input.track_index, + language=input.language, + ) + + await emit_progress_async( + input.transcript_id, "transcribe_track", "in_progress", ctx.workflow_run_id + ) + + try: + pad_result = ctx.task_output(pad_track) + audio_url = pad_result.get("padded_url") + + if not audio_url: + raise ValueError("Missing padded_url from pad_track") + + from reflector.pipelines.transcription_helpers import ( + transcribe_file_with_processor, + ) + + transcript = await transcribe_file_with_processor(audio_url, input.language) + + # Tag all words with speaker index + words = [] + for word in transcript.words: + word_dict = word.model_dump() + word_dict["speaker"] = input.track_index + words.append(word_dict) + + logger.info( + "[Hatchet] transcribe_track complete", + track_index=input.track_index, + word_count=len(words), + ) + + await emit_progress_async( + input.transcript_id, "transcribe_track", "completed", ctx.workflow_run_id + ) + + return { + "words": words, + "track_index": input.track_index, + } + + except Exception as e: + logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True) + await emit_progress_async( + input.transcript_id, "transcribe_track", "failed", ctx.workflow_run_id + ) + raise diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 14754077..379d5aae 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -15,6 +15,7 @@ from celery.result import AsyncResult from reflector.conductor.client import ConductorClientManager from reflector.db.recordings import recordings_controller from reflector.db.transcripts import Transcript +from reflector.hatchet.client import HatchetClientManager from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_multitrack_pipeline import ( @@ -156,8 +157,47 @@ async def prepare_transcript_processing( def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult | None: if isinstance(config, MultitrackProcessingConfig): - # Start Conductor workflow if enabled - if settings.CONDUCTOR_ENABLED: + # Start durable workflow if enabled (Hatchet or Conductor) + durable_started = False + + if settings.HATCHET_ENABLED: + import asyncio + + async def _start_hatchet(): + return await HatchetClientManager.start_workflow( + 
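+                # Mirror the input schema used by the recording path in
+                # reflector/worker/process.py so both entry points stay in sync.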
workflow_name="DiarizationPipeline", + input_data={ + "recording_id": config.recording_id, + "room_name": None, # Not available in reprocess path + "tracks": [{"s3_key": k} for k in config.track_keys], + "bucket_name": config.bucket_name, + "transcript_id": config.transcript_id, + "room_id": config.room_id, + }, + ) + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + # Already in async context + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as pool: + workflow_id = pool.submit(asyncio.run, _start_hatchet()).result() + else: + workflow_id = asyncio.run(_start_hatchet()) + + logger.info( + "Started Hatchet workflow (reprocess)", + workflow_id=workflow_id, + transcript_id=config.transcript_id, + ) + durable_started = True + + elif settings.CONDUCTOR_ENABLED: workflow_id = ConductorClientManager.start_workflow( name="diarization_pipeline", version=1, @@ -175,11 +215,13 @@ def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult | No workflow_id=workflow_id, transcript_id=config.transcript_id, ) + durable_started = True - if not settings.CONDUCTOR_SHADOW_MODE: - return None # Conductor-only, no Celery result + # If durable workflow started and not in shadow mode, skip Celery + if durable_started and not settings.DURABLE_WORKFLOW_SHADOW_MODE: + return None - # Celery pipeline (shadow mode or Conductor disabled) + # Celery pipeline (shadow mode or durable workflows disabled) return task_pipeline_multitrack_process.delay( transcript_id=config.transcript_id, bucket_name=config.bucket_name, diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 6d5e27da..b0c49907 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -150,11 +150,34 @@ class Settings(BaseSettings): ZULIP_API_KEY: str | None = None ZULIP_BOT_EMAIL: str | None = None + # Durable workflow orchestration + # Provider: "hatchet" or "conductor" (or "none" to disable) + DURABLE_WORKFLOW_PROVIDER: str = "none" + DURABLE_WORKFLOW_SHADOW_MODE: bool = False # Run both provider + Celery + # Conductor workflow orchestration CONDUCTOR_SERVER_URL: str = "http://conductor:8080/api" CONDUCTOR_DEBUG: bool = False - CONDUCTOR_ENABLED: bool = False - CONDUCTOR_SHADOW_MODE: bool = False + + # Hatchet workflow orchestration + HATCHET_CLIENT_TOKEN: str | None = None + HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls + HATCHET_DEBUG: bool = False + + @property + def CONDUCTOR_ENABLED(self) -> bool: + """Legacy compatibility: True if Conductor is the active provider.""" + return self.DURABLE_WORKFLOW_PROVIDER == "conductor" + + @property + def HATCHET_ENABLED(self) -> bool: + """True if Hatchet is the active provider.""" + return self.DURABLE_WORKFLOW_PROVIDER == "hatchet" + + @property + def CONDUCTOR_SHADOW_MODE(self) -> bool: + """Legacy compatibility for shadow mode.""" + return self.DURABLE_WORKFLOW_SHADOW_MODE and self.CONDUCTOR_ENABLED settings = Settings() diff --git a/server/reflector/views/hatchet.py b/server/reflector/views/hatchet.py new file mode 100644 index 00000000..1f73cfdb --- /dev/null +++ b/server/reflector/views/hatchet.py @@ -0,0 +1,57 @@ +"""Hatchet health and status endpoints.""" + +from fastapi import APIRouter + +from reflector.settings import settings + +router = APIRouter(prefix="/hatchet", tags=["hatchet"]) + + +@router.get("/health") +async def hatchet_health(): + """Check Hatchet connectivity and status.""" + if not settings.HATCHET_ENABLED: + return 
{"status": "disabled", "connected": False} + + if not settings.HATCHET_CLIENT_TOKEN: + return { + "status": "unhealthy", + "connected": False, + "error": "HATCHET_CLIENT_TOKEN not configured", + } + + try: + from reflector.hatchet.client import HatchetClientManager + + # Get client to verify token is valid + client = HatchetClientManager.get_client() + + # Try to get the client's gRPC connection status + # The SDK doesn't have a simple health check, so we just verify we can create the client + if client is not None: + return {"status": "healthy", "connected": True} + else: + return { + "status": "unhealthy", + "connected": False, + "error": "Failed to create client", + } + except ValueError as e: + return {"status": "unhealthy", "connected": False, "error": str(e)} + except Exception as e: + return {"status": "unhealthy", "connected": False, "error": str(e)} + + +@router.get("/workflow/{workflow_run_id}") +async def get_workflow_status(workflow_run_id: str): + """Get the status of a workflow run.""" + if not settings.HATCHET_ENABLED: + return {"error": "Hatchet is disabled"} + + try: + from reflector.hatchet.client import HatchetClientManager + + status = await HatchetClientManager.get_workflow_status(workflow_run_id) + return status + except Exception as e: + return {"error": str(e)} diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 764d35cb..26d4e1d9 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -286,8 +286,34 @@ async def _process_multitrack_recording_inner( room_id=room.id, ) - # Start Conductor workflow if enabled - if settings.CONDUCTOR_ENABLED: + # Start durable workflow if enabled (Hatchet or Conductor) + durable_started = False + + if settings.HATCHET_ENABLED: + from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 + + workflow_id = await HatchetClientManager.start_workflow( + workflow_name="DiarizationPipeline", + input_data={ + "recording_id": recording_id, + "room_name": daily_room_name, + "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)], + "bucket_name": bucket_name, + "transcript_id": transcript.id, + "room_id": room.id, + }, + ) + logger.info( + "Started Hatchet workflow", + workflow_id=workflow_id, + transcript_id=transcript.id, + ) + + # Store workflow_id on recording for status tracking + await recordings_controller.update(recording, {"workflow_id": workflow_id}) + durable_started = True + + elif settings.CONDUCTOR_ENABLED: from reflector.conductor.client import ConductorClientManager # noqa: PLC0415 workflow_id = ConductorClientManager.start_workflow( @@ -310,11 +336,13 @@ async def _process_multitrack_recording_inner( # Store workflow_id on recording for status tracking await recordings_controller.update(recording, {"workflow_id": workflow_id}) + durable_started = True - if not settings.CONDUCTOR_SHADOW_MODE: - return # Don't trigger Celery + # If durable workflow started and not in shadow mode, skip Celery + if durable_started and not settings.DURABLE_WORKFLOW_SHADOW_MODE: + return - # Celery pipeline (runs when Conductor disabled OR in shadow mode) + # Celery pipeline (runs when durable workflows disabled OR in shadow mode) task_pipeline_multitrack_process.delay( transcript_id=transcript.id, bucket_name=bucket_name, diff --git a/server/runserver.sh b/server/runserver.sh index 65605ea6..b1b52ab2 100755 --- a/server/runserver.sh +++ b/server/runserver.sh @@ -9,6 +9,8 @@ elif [ "${ENTRYPOINT}" = "beat" ]; then uv run celery -A 
reflector.worker.app beat --loglevel=info elif [ "${ENTRYPOINT}" = "conductor-worker" ]; then uv run python -m reflector.conductor.run_workers +elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then + uv run python -m reflector.hatchet.run_workers else echo "Unknown command" fi diff --git a/server/uv.lock b/server/uv.lock index f47a968f..b6ddf93e 100644 --- a/server/uv.lock +++ b/server/uv.lock @@ -1218,6 +1218,70 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bf/c4/a839fcc28bebfa72925d9121c4d39398f77f95bcba0cf26c972a0cfb1de7/griffe-1.8.0-py3-none-any.whl", hash = "sha256:110faa744b2c5c84dd432f4fa9aa3b14805dd9519777dd55e8db214320593b02", size = 132487 }, ] +[[package]] +name = "grpcio" +version = "1.76.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b6/e0/318c1ce3ae5a17894d5791e87aea147587c9e702f24122cc7a5c8bbaeeb1/grpcio-1.76.0.tar.gz", hash = "sha256:7be78388d6da1a25c0d5ec506523db58b18be22d9c37d8d3a32c08be4987bd73", size = 12785182 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a0/00/8163a1beeb6971f66b4bbe6ac9457b97948beba8dd2fc8e1281dce7f79ec/grpcio-1.76.0-cp311-cp311-linux_armv7l.whl", hash = "sha256:2e1743fbd7f5fa713a1b0a8ac8ebabf0ec980b5d8809ec358d488e273b9cf02a", size = 5843567 }, + { url = "https://files.pythonhosted.org/packages/10/c1/934202f5cf335e6d852530ce14ddb0fef21be612ba9ecbbcbd4d748ca32d/grpcio-1.76.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:a8c2cf1209497cf659a667d7dea88985e834c24b7c3b605e6254cbb5076d985c", size = 11848017 }, + { url = "https://files.pythonhosted.org/packages/11/0b/8dec16b1863d74af6eb3543928600ec2195af49ca58b16334972f6775663/grpcio-1.76.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:08caea849a9d3c71a542827d6df9d5a69067b0a1efbea8a855633ff5d9571465", size = 6412027 }, + { url = "https://files.pythonhosted.org/packages/d7/64/7b9e6e7ab910bea9d46f2c090380bab274a0b91fb0a2fe9b0cd399fffa12/grpcio-1.76.0-cp311-cp311-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:f0e34c2079d47ae9f6188211db9e777c619a21d4faba6977774e8fa43b085e48", size = 7075913 }, + { url = "https://files.pythonhosted.org/packages/68/86/093c46e9546073cefa789bd76d44c5cb2abc824ca62af0c18be590ff13ba/grpcio-1.76.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:8843114c0cfce61b40ad48df65abcfc00d4dba82eae8718fab5352390848c5da", size = 6615417 }, + { url = "https://files.pythonhosted.org/packages/f7/b6/5709a3a68500a9c03da6fb71740dcdd5ef245e39266461a03f31a57036d8/grpcio-1.76.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:8eddfb4d203a237da6f3cc8a540dad0517d274b5a1e9e636fd8d2c79b5c1d397", size = 7199683 }, + { url = "https://files.pythonhosted.org/packages/91/d3/4b1f2bf16ed52ce0b508161df3a2d186e4935379a159a834cb4a7d687429/grpcio-1.76.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:32483fe2aab2c3794101c2a159070584e5db11d0aa091b2c0ea9c4fc43d0d749", size = 8163109 }, + { url = "https://files.pythonhosted.org/packages/5c/61/d9043f95f5f4cf085ac5dd6137b469d41befb04bd80280952ffa2a4c3f12/grpcio-1.76.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:dcfe41187da8992c5f40aa8c5ec086fa3672834d2be57a32384c08d5a05b4c00", size = 7626676 }, + { url = "https://files.pythonhosted.org/packages/36/95/fd9a5152ca02d8881e4dd419cdd790e11805979f499a2e5b96488b85cf27/grpcio-1.76.0-cp311-cp311-win32.whl", hash = 
"sha256:2107b0c024d1b35f4083f11245c0e23846ae64d02f40b2b226684840260ed054", size = 3997688 }, + { url = "https://files.pythonhosted.org/packages/60/9c/5c359c8d4c9176cfa3c61ecd4efe5affe1f38d9bae81e81ac7186b4c9cc8/grpcio-1.76.0-cp311-cp311-win_amd64.whl", hash = "sha256:522175aba7af9113c48ec10cc471b9b9bd4f6ceb36aeb4544a8e2c80ed9d252d", size = 4709315 }, + { url = "https://files.pythonhosted.org/packages/bf/05/8e29121994b8d959ffa0afd28996d452f291b48cfc0875619de0bde2c50c/grpcio-1.76.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:81fd9652b37b36f16138611c7e884eb82e0cec137c40d3ef7c3f9b3ed00f6ed8", size = 5799718 }, + { url = "https://files.pythonhosted.org/packages/d9/75/11d0e66b3cdf998c996489581bdad8900db79ebd83513e45c19548f1cba4/grpcio-1.76.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:04bbe1bfe3a68bbfd4e52402ab7d4eb59d72d02647ae2042204326cf4bbad280", size = 11825627 }, + { url = "https://files.pythonhosted.org/packages/28/50/2f0aa0498bc188048f5d9504dcc5c2c24f2eb1a9337cd0fa09a61a2e75f0/grpcio-1.76.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:d388087771c837cdb6515539f43b9d4bf0b0f23593a24054ac16f7a960be16f4", size = 6359167 }, + { url = "https://files.pythonhosted.org/packages/66/e5/bbf0bb97d29ede1d59d6588af40018cfc345b17ce979b7b45424628dc8bb/grpcio-1.76.0-cp312-cp312-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:9f8f757bebaaea112c00dba718fc0d3260052ce714e25804a03f93f5d1c6cc11", size = 7044267 }, + { url = "https://files.pythonhosted.org/packages/f5/86/f6ec2164f743d9609691115ae8ece098c76b894ebe4f7c94a655c6b03e98/grpcio-1.76.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:980a846182ce88c4f2f7e2c22c56aefd515daeb36149d1c897f83cf57999e0b6", size = 6573963 }, + { url = "https://files.pythonhosted.org/packages/60/bc/8d9d0d8505feccfdf38a766d262c71e73639c165b311c9457208b56d92ae/grpcio-1.76.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:f92f88e6c033db65a5ae3d97905c8fea9c725b63e28d5a75cb73b49bda5024d8", size = 7164484 }, + { url = "https://files.pythonhosted.org/packages/67/e6/5d6c2fc10b95edf6df9b8f19cf10a34263b7fd48493936fffd5085521292/grpcio-1.76.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:4baf3cbe2f0be3289eb68ac8ae771156971848bb8aaff60bad42005539431980", size = 8127777 }, + { url = "https://files.pythonhosted.org/packages/3f/c8/dce8ff21c86abe025efe304d9e31fdb0deaaa3b502b6a78141080f206da0/grpcio-1.76.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:615ba64c208aaceb5ec83bfdce7728b80bfeb8be97562944836a7a0a9647d882", size = 7594014 }, + { url = "https://files.pythonhosted.org/packages/e0/42/ad28191ebf983a5d0ecef90bab66baa5a6b18f2bfdef9d0a63b1973d9f75/grpcio-1.76.0-cp312-cp312-win32.whl", hash = "sha256:45d59a649a82df5718fd9527ce775fd66d1af35e6d31abdcdc906a49c6822958", size = 3984750 }, + { url = "https://files.pythonhosted.org/packages/9e/00/7bd478cbb851c04a48baccaa49b75abaa8e4122f7d86da797500cccdd771/grpcio-1.76.0-cp312-cp312-win_amd64.whl", hash = "sha256:c088e7a90b6017307f423efbb9d1ba97a22aa2170876223f9709e9d1de0b5347", size = 4704003 }, +] + +[[package]] +name = "grpcio-tools" +version = "1.76.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "grpcio" }, + { name = "protobuf" }, + { name = "setuptools" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/77/17d60d636ccd86a0db0eccc24d02967bbc3eea86b9db7324b04507ebaa40/grpcio_tools-1.76.0.tar.gz", hash = "sha256:ce80169b5e6adf3e8302f3ebb6cb0c3a9f08089133abca4b76ad67f751f5ad88", size = 5390807 
} +wheels = [ + { url = "https://files.pythonhosted.org/packages/73/d1/efbeed1a864c846228c0a3b322e7a2d6545f025e35246aebf96496a36004/grpcio_tools-1.76.0-cp311-cp311-linux_armv7l.whl", hash = "sha256:c6480f6af6833850a85cca1c6b435ef4ffd2ac8e88ef683b4065233827950243", size = 2545931 }, + { url = "https://files.pythonhosted.org/packages/af/8e/f257c0f565d9d44658301238b01a9353bc6f3b272bb4191faacae042579d/grpcio_tools-1.76.0-cp311-cp311-macosx_11_0_universal2.whl", hash = "sha256:c7c23fe1dc09818e16a48853477806ad77dd628b33996f78c05a293065f8210c", size = 5844794 }, + { url = "https://files.pythonhosted.org/packages/c7/c0/6c1e89c67356cb20e19ed670c5099b13e40fd678cac584c778f931666a86/grpcio_tools-1.76.0-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:fcdce7f7770ff052cd4e60161764b0b3498c909bde69138f8bd2e7b24a3ecd8f", size = 2591772 }, + { url = "https://files.pythonhosted.org/packages/c0/10/5f33aa7bc3ddaad0cfd2f4e950ac4f1a310e8d0c7b1358622a581e8b7a2f/grpcio_tools-1.76.0-cp311-cp311-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:b598fdcebffa931c7da5c9e90b5805fff7e9bc6cf238319358a1b85704c57d33", size = 2905140 }, + { url = "https://files.pythonhosted.org/packages/f4/3e/23e3a52a77368f47188ed83c34eb53866d3ce0f73835b2f6764844ae89eb/grpcio_tools-1.76.0-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:6a9818ff884796b12dcf8db32126e40ec1098cacf5697f27af9cfccfca1c1fae", size = 2656475 }, + { url = "https://files.pythonhosted.org/packages/51/85/a74ae87ec7dbd3d2243881f5c548215aed1148660df7945be3a125ba9a21/grpcio_tools-1.76.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:105e53435b2eed3961da543db44a2a34479d98d18ea248219856f30a0ca4646b", size = 3106158 }, + { url = "https://files.pythonhosted.org/packages/54/d5/a6ed1e5823bc5d55a1eb93e0c14ccee0b75951f914832ab51fb64d522a0f/grpcio_tools-1.76.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:454a1232c7f99410d92fa9923c7851fd4cdaf657ee194eac73ea1fe21b406d6e", size = 3654980 }, + { url = "https://files.pythonhosted.org/packages/f9/29/c05d5501ba156a242079ef71d073116d2509c195b5e5e74c545f0a3a3a69/grpcio_tools-1.76.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ca9ccf667afc0268d45ab202af4556c72e57ea36ebddc93535e1a25cbd4f8aba", size = 3322658 }, + { url = "https://files.pythonhosted.org/packages/02/b6/ee0317b91da19a7537d93c4161cbc2a45a165c8893209b0bbd470d830ffa/grpcio_tools-1.76.0-cp311-cp311-win32.whl", hash = "sha256:a83c87513b708228b4cad7619311daba65b40937745103cadca3db94a6472d9c", size = 993837 }, + { url = "https://files.pythonhosted.org/packages/81/63/9623cadf0406b264737f16d4ed273bb2d65001d87fbd803b565c45d665d1/grpcio_tools-1.76.0-cp311-cp311-win_amd64.whl", hash = "sha256:2ce5e87ec71f2e4041dce4351f2a8e3b713e3bca6b54c69c3fbc6c7ad1f4c386", size = 1158634 }, + { url = "https://files.pythonhosted.org/packages/4f/ca/a931c1439cabfe305c9afd07e233150cd0565aa062c20d1ee412ed188852/grpcio_tools-1.76.0-cp312-cp312-linux_armv7l.whl", hash = "sha256:4ad555b8647de1ebaffb25170249f89057721ffb74f7da96834a07b4855bb46a", size = 2546852 }, + { url = "https://files.pythonhosted.org/packages/4c/07/935cfbb7dccd602723482a86d43fbd992f91e9867bca0056a1e9f348473e/grpcio_tools-1.76.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:243af7c8fc7ff22a40a42eb8e0f6f66963c1920b75aae2a2ec503a9c3c8b31c1", size = 5841777 }, + { url = 
"https://files.pythonhosted.org/packages/e4/92/8fcb5acebdccb647e0fa3f002576480459f6cf81e79692d7b3c4d6e29605/grpcio_tools-1.76.0-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.whl", hash = "sha256:8207b890f423142cc0025d041fb058f7286318df6a049565c27869d73534228b", size = 2594004 }, + { url = "https://files.pythonhosted.org/packages/9d/ea/64838e8113b7bfd4842b15c815a7354cb63242fdce9d6648d894b5d50897/grpcio_tools-1.76.0-cp312-cp312-manylinux2014_i686.manylinux_2_17_i686.whl", hash = "sha256:3dafa34c2626a6691d103877e8a145f54c34cf6530975f695b396ed2fc5c98f8", size = 2905563 }, + { url = "https://files.pythonhosted.org/packages/a6/d6/53798827d821098219e58518b6db52161ce4985620850aa74ce3795da8a7/grpcio_tools-1.76.0-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.whl", hash = "sha256:30f1d2dda6ece285b3d9084e94f66fa721ebdba14ae76b2bc4c581c8a166535c", size = 2656936 }, + { url = "https://files.pythonhosted.org/packages/89/a3/d9c1cefc46a790eec520fe4e70e87279abb01a58b1a3b74cf93f62b824a2/grpcio_tools-1.76.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:a889af059dc6dbb82d7b417aa581601316e364fe12eb54c1b8d95311ea50916d", size = 3109811 }, + { url = "https://files.pythonhosted.org/packages/50/75/5997752644b73b5d59377d333a51c8a916606df077f5a487853e37dca289/grpcio_tools-1.76.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:c3f2c3c44c56eb5d479ab178f0174595d0a974c37dade442f05bb73dfec02f31", size = 3658786 }, + { url = "https://files.pythonhosted.org/packages/84/47/dcf8380df4bd7931ffba32fc6adc2de635b6569ca27fdec7121733797062/grpcio_tools-1.76.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:479ce02dff684046f909a487d452a83a96b4231f7c70a3b218a075d54e951f56", size = 3325144 }, + { url = "https://files.pythonhosted.org/packages/04/88/ea3e5fdb874d8c2d04488e4b9d05056537fba70915593f0c283ac77df188/grpcio_tools-1.76.0-cp312-cp312-win32.whl", hash = "sha256:9ba4bb539936642a44418b38ee6c3e8823c037699e2cb282bd8a44d76a4be833", size = 993523 }, + { url = "https://files.pythonhosted.org/packages/de/b1/ce7d59d147675ec191a55816be46bc47a343b5ff07279eef5817c09cc53e/grpcio_tools-1.76.0-cp312-cp312-win_amd64.whl", hash = "sha256:0cd489016766b05f9ed8a6b6596004b62c57d323f49593eac84add032a6d43f7", size = 1158493 }, +] + [[package]] name = "h11" version = "0.16.0" @@ -1227,6 +1291,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515 }, ] +[[package]] +name = "hatchet-sdk" +version = "1.21.6" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "grpcio" }, + { name = "grpcio-tools" }, + { name = "prometheus-client" }, + { name = "protobuf" }, + { name = "pydantic" }, + { name = "pydantic-settings" }, + { name = "python-dateutil" }, + { name = "tenacity" }, + { name = "urllib3" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7c/df/75dd02e1dc6b99f7151a57f084876c50f739ad4d643b060078f65d51d717/hatchet_sdk-1.21.6.tar.gz", hash = "sha256:b65741324ad721ce57f5fe3f960e2942c4ac2ceec6ca483dd35f84137ff7c46c", size = 219345 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/00/86/e4cd7928bcabd33c634c33d4e878e2454e03f97c87b72947c7ff5762d813/hatchet_sdk-1.21.6-py3-none-any.whl", hash = "sha256:589fba9104a6517e1ba677b9865fa0a20e221863a8c2a2724051198994c11399", size = 529167 }, +] + [[package]] name = "hf-xet" version = "1.1.5" @@ -3150,6 
+3235,7 @@ dependencies = [ { name = "databases", extra = ["aiosqlite", "asyncpg"] }, { name = "fastapi", extra = ["standard"] }, { name = "fastapi-pagination" }, + { name = "hatchet-sdk" }, { name = "httpx" }, { name = "icalendar" }, { name = "jsonschema" }, @@ -3227,6 +3313,7 @@ requires-dist = [ { name = "databases", extras = ["aiosqlite", "asyncpg"], specifier = ">=0.7.0" }, { name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" }, { name = "fastapi-pagination", specifier = ">=0.12.6" }, + { name = "hatchet-sdk", specifier = ">=0.47.0" }, { name = "httpx", specifier = ">=0.24.1" }, { name = "icalendar", specifier = ">=6.0.0" }, { name = "jsonschema", specifier = ">=4.23.0" },
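
Local smoke test (illustrative, not part of the patch): a minimal sketch assuming a Hatchet client token has already been issued, `runserver.sh` is invoked from `server/`, and the API is served on localhost:8000 with the `/hatchet` router mounted at the application root; adjust names and ports to your deployment.

```bash
# Select Hatchet as the durable workflow provider (read by settings.py above).
export DURABLE_WORKFLOW_PROVIDER=hatchet
export HATCHET_CLIENT_TOKEN="<token from the Hatchet engine>"  # assumed pre-generated

# Start the worker entrypoint added to runserver.sh.
ENTRYPOINT=hatchet-worker ./runserver.sh

# Probe the new health endpoint (base URL is an assumption).
curl -s http://localhost:8000/hatchet/health
# -> {"status":"healthy","connected":true} when the client can be constructed
```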