From 9b8f76929e06927a301599ce1e1abe828947bcff Mon Sep 17 00:00:00 2001
From: Igor Loskutov
Date: Tue, 16 Dec 2025 16:39:52 -0500
Subject: [PATCH] remove shadow mode for hatchet

---
 server/HATCHET_LLM_OBSERVATIONS.md            | 376 ------
 server/reflector/db/rooms.py                  |   7 +
 .../reflector/services/transcript_process.py  |  58 ++-
 server/reflector/settings.py                  |   1 -
 server/reflector/worker/process.py            |   6 +-
 5 files changed, 62 insertions(+), 386 deletions(-)
 delete mode 100644 server/HATCHET_LLM_OBSERVATIONS.md

diff --git a/server/HATCHET_LLM_OBSERVATIONS.md b/server/HATCHET_LLM_OBSERVATIONS.md
deleted file mode 100644
index cab8e740..00000000
--- a/server/HATCHET_LLM_OBSERVATIONS.md
+++ /dev/null
@@ -1,376 +0,0 @@
-# Hatchet Migration - LLM Debugging Observations

This document captures hard-won debugging insights from implementing the multitrack diarization pipeline with Hatchet. These observations are particularly relevant for LLM assistants working on this codebase.

## Architecture Context

- **Hatchet SDK v1.21+** uses async workers with gRPC for task polling
- Workers connect to the Hatchet server via gRPC (port 7077) and trigger workflows via REST (port 8888)
- The `hatchet-lite` image bundles server, engine, and database in one container
- Tasks are decorated with `@workflow.task()` (not `@hatchet.step()` as in older examples)
- Workflow input is validated via Pydantic models passed as the `input_validator=` parameter

---

## Challenge 1: SDK Version API Breaking Changes

### Symptoms
```
AttributeError: 'V1WorkflowRunDetails' object has no attribute 'workflow_run_id'
```

### Root Cause
Hatchet SDK v1.21+ changed the response structure for workflow creation. Old examples show:
```python
result = await client.runs.aio_create(workflow_name, input_data)
return result.workflow_run_id  # OLD - doesn't work
```

### Resolution
Access the run ID through the new nested structure:
```python
result = await client.runs.aio_create(workflow_name, input_data)
return result.run.metadata.id  # NEW - SDK v1.21+
```

### Key Insight
**Don't trust documentation or examples.** Read the SDK source code or use IDE autocomplete to discover actual attribute names. The SDK evolves faster than the docs.

---

## Challenge 2: Worker Appears Hung at "starting runner..."

### Symptoms
```
[INFO] Starting Hatchet workers
[INFO] Starting Hatchet worker polling...
[INFO] STARTING HATCHET...
[INFO] starting runner...
# ... nothing else, appears stuck
```

### Root Cause
Without debug mode, the Hatchet SDK doesn't log:
- Workflow registration
- gRPC connection status
- Heartbeat activity
- Action listener acquisition

The worker IS working; you just can't see it.

### Resolution
Always enable debug mode during development:
```bash
HATCHET_DEBUG=true
```

With debug enabled, you'll see the actual activity:
```
[DEBUG] 'worker-name' waiting for ['workflow:task1', 'workflow:task2']
[DEBUG] starting action listener: worker-name
[DEBUG] acquired action listener: 562d00a8-8895-42a1-b65b-46f905c902f9
[DEBUG] sending heartbeat
```

### Key Insight
**Start every Hatchet debugging session with `HATCHET_DEBUG=true`.** Silent workers waste hours of debugging time.
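The same switch can be flipped programmatically while iterating locally. A minimal sketch, assuming the v1 Python SDK's `Hatchet(debug=True)` constructor flag (the worker name below is made up for illustration):

```python
# Sketch: programmatic equivalent of HATCHET_DEBUG=true.
# Assumes hatchet_sdk v1.x, where the Hatchet constructor accepts debug=True.
from hatchet_sdk import Hatchet

hatchet = Hatchet(debug=True)

# Worker name is illustrative; register real workflows before starting.
worker = hatchet.worker("reflector-debug-worker")
worker.start()  # debug logs now surface registration, listener acquisition, heartbeats
```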
- ---- - -## Challenge 3: Docker Networking + JWT Token URL Conflicts - -### Symptoms -``` -grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: - status = StatusCode.UNAVAILABLE - details = "failed to connect to all addresses" -``` - -### Root Cause -The Hatchet API token embeds URLs: -```json -{ - "aud": "http://localhost:8889", - "grpc_broadcast_address": "localhost:7077", - "server_url": "http://localhost:8889" -} -``` - -Inside Docker containers, `localhost` refers to the container itself, not the Hatchet server. - -### Resolution -Override the token-embedded URLs with environment variables: -```bash -# In .env or docker-compose environment -HATCHET_CLIENT_HOST_PORT=hatchet:7077 -HATCHET_CLIENT_SERVER_URL=http://hatchet:8888 -HATCHET_CLIENT_TLS_STRATEGY=none -``` - -### Key Insight -**The JWT token is not the final word on connection settings.** Environment variables override token-embedded URLs, which is essential for Docker networking. - ---- - -## Challenge 4: Workflow Name Case Sensitivity - -### Symptoms -``` -BadRequestException: (400) -HTTP response body: errors=[APIError(description='workflow names not found: diarizationpipeline')] -``` - -### Root Cause -Hatchet uses the exact workflow name you define for triggering: -```python -diarization_pipeline = hatchet.workflow( - name="DiarizationPipeline", # Use THIS exact name to trigger - input_validator=PipelineInput -) -``` - -Internally, task identifiers are lowercased (`diarizationpipeline:get_recording`), but workflow triggers must match the defined name. - -### Resolution -```python -# Correct -await client.start_workflow('DiarizationPipeline', input_data) - -# Wrong -await client.start_workflow('diarizationpipeline', input_data) -``` - -### Key Insight -**Workflow names are case-sensitive for triggering, but task refs are lowercase.** Don't conflate the two. - ---- - -## Challenge 5: Pydantic Response Object Iteration - -### Symptoms -``` -AttributeError: 'tuple' object has no attribute 'participant_id' -``` - -### Root Cause -When API responses return Pydantic models with list fields: -```python -class MeetingParticipantsResponse(BaseModel): - data: List[MeetingParticipant] -``` - -Iterating the response object directly is wrong: -```python -for p in participants: # WRONG - iterates over model fields as tuples -``` - -### Resolution -Access the `.data` attribute explicitly: -```python -for p in participants.data: # CORRECT - iterates over list items - print(p.participant_id) -``` - -### Key Insight -**Pydantic models with list fields require explicit `.data` access.** The model itself is not iterable in the expected way. - ---- - -## Challenge 6: Database Connections in Async Workers - -### Symptoms -``` -InterfaceError: cannot perform operation: another operation is in progress -``` - -### Root Cause -Similar to Conductor, Hatchet workers may inherit stale database connections. Each task runs in an async context that may not share the same event loop as cached connections. 
- -### Resolution -Create fresh database connections per task: -```python -async def _get_fresh_db_connection(): - """Create fresh database connection for worker task.""" - import databases - from reflector.db import _database_context - from reflector.settings import settings - - _database_context.set(None) - db = databases.Database(settings.DATABASE_URL) - _database_context.set(db) - await db.connect() - return db - -async def _close_db_connection(db): - await db.disconnect() - _database_context.set(None) -``` - -### Key Insight -**Cached singletons (DB, HTTP clients) are unsafe in workflow workers.** Always create fresh connections. - ---- - -## Challenge 7: Child Workflow Fan-out Pattern - -### Symptoms -Child workflows spawn but parent doesn't wait for completion, or results aren't collected. - -### Root Cause -Hatchet child workflows need explicit spawning and result collection: -```python -# Spawning children -child_runs = await asyncio.gather(*[ - child_workflow.aio_run(child_input) - for child_input in inputs -]) - -# Results are returned directly from aio_run() -``` - -### Resolution -Use `aio_run()` for child workflows and `asyncio.gather()` for parallelism: -```python -@parent_workflow.task(parents=[setup_task]) -async def process_tracks(input: ParentInput, ctx: Context) -> dict: - child_coroutines = [ - track_workflow.aio_run(TrackInput(track_index=i, ...)) - for i in range(len(input.tracks)) - ] - - results = await asyncio.gather(*child_coroutines, return_exceptions=True) - - # Handle failures - for i, result in enumerate(results): - if isinstance(result, Exception): - logger.error(f"Track {i} failed: {result}") - - return {"track_results": [r for r in results if not isinstance(r, Exception)]} -``` - -### Key Insight -**Child workflows in Hatchet return results directly.** No need to poll for completion like in Conductor. - ---- - -## Challenge 8: Workflow Replay and Code Updates - -### Symptoms -After fixing a bug in workflow code, clicking "Replay Event" in Hatchet UI still shows the old error/behavior. - -### Root Cause -Hatchet replay creates a **new workflow instance** with the latest registered workflow version. However, the worker must be restarted to register the new code version. Without restart, the worker is still running the old Python module in memory. - -From [Hatchet docs](https://github.com/hatchet-dev/hatchet/blob/059a4e562cd7feb5cc5728c14b04974decd72400/frontend/v0-docs/pages/home/features/retries/manual.mdx): -> "To retry a failed step, simply click on the step in the run details view and then click the 'Replay Event' button. This will create a new instance of the workflow, starting from the failed step, and using the same input data as the original run." - -### Resolution -**Required steps to see code changes in replayed workflows:** - -1. **Edit the code** - make your changes to workflow files -2. **Restart the worker** - registers new workflow version: - ```bash - docker compose restart hatchet-worker - ``` -3. **Replay in Hatchet UI** - click "Replay Event" on failed step -4. 
**Verify new code runs** - check logs for your changes

**For CLI-based reprocessing:**
```bash
# Default: replays existing workflow with latest code (after worker restart)
docker compose exec server uv run -m reflector.tools.process_transcript

# Force: cancels old workflow and starts fresh
docker compose exec server uv run -m reflector.tools.process_transcript --force
```

### Key Insight
**Replay uses updated code, but ONLY after worker restart.** Python module caching means the worker process must be restarted to pick up code changes. Simply rebuilding the container is not enough if the worker process is still running old bytecode.

---

## Debugging Workflow

### 1. Enable Debug Mode First
```bash
HATCHET_DEBUG=true
```

### 2. Verify Worker Registration
Look for this in the debug logs:
```
[DEBUG] 'worker-name' waiting for ['workflow:task1', 'workflow:task2', ...]
[DEBUG] acquired action listener: {uuid}
```

### 3. Test Workflow Trigger Separately
```bash
docker exec server uv run python -c "
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.diarization_pipeline import PipelineInput
import asyncio

async def test():
    input_data = PipelineInput(
        transcript_id='test',
        recording_id=None,
        room_name='test-room',
        bucket_name='bucket',
        tracks=[],
    )
    run_id = await HatchetClientManager.start_workflow(
        'DiarizationPipeline',
        input_data.model_dump()
    )
    print(f'Triggered: {run_id}')

asyncio.run(test())
"
```

### 4. Check Hatchet Server Logs
```bash
docker logs reflector-hatchet-1 --tail 50
```

Look for `WRN` entries indicating API errors or connection issues.

### 5. Verify gRPC Connectivity
```bash
docker exec worker python -c "
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
result = sock.connect_ex(('hatchet', 7077))
print(f'gRPC port 7077: {\"reachable\" if result == 0 else \"blocked\"}')"
```

### 6. Force Container Rebuild
Volume mounts may cache old bytecode:
```bash
docker compose up -d --build --force-recreate hatchet-worker
```

---

## Common Gotchas Summary

| Issue | Signal | Fix |
|-------|--------|-----|
| SDK API changed | `AttributeError` on result | Check SDK source for actual attributes |
| Worker appears stuck | Only "starting runner..." 
| Enable `HATCHET_DEBUG=true` | -| Can't connect from Docker | gRPC unavailable | Set `HATCHET_CLIENT_HOST_PORT` and `_SERVER_URL` | -| Workflow not found | 400 Bad Request | Use exact case-sensitive workflow name | -| Tuple iteration error | `'tuple' has no attribute` | Access `.data` on Pydantic response models | -| DB conflicts | "another operation in progress" | Fresh DB connection per task | -| Old code running | Fixed code but same error | Restart worker: `docker compose restart hatchet-worker` | -| Replay shows old behavior | Code changed but replay unchanged | Restart worker, then replay in UI | - ---- - -## Files Most Likely to Need Hatchet-Specific Handling - -- `server/reflector/hatchet/workflows/*.py` - Workflow and task definitions -- `server/reflector/hatchet/client.py` - Client wrapper, SDK version compatibility -- `server/reflector/hatchet/run_workers.py` - Worker startup and registration -- `server/reflector/hatchet/progress.py` - Progress emission for UI updates -- `docker-compose.yml` - Hatchet infrastructure services diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py index fc6194e3..0ebc615a 100644 --- a/server/reflector/db/rooms.py +++ b/server/reflector/db/rooms.py @@ -57,6 +57,12 @@ rooms = sqlalchemy.Table( sqlalchemy.String, nullable=False, ), + sqlalchemy.Column( + "use_hatchet", + sqlalchemy.Boolean, + nullable=False, + server_default=false(), + ), sqlalchemy.Index("idx_room_is_shared", "is_shared"), sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"), ) @@ -85,6 +91,7 @@ class Room(BaseModel): ics_last_sync: datetime | None = None ics_last_etag: str | None = None platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) + use_hatchet: bool = False class RoomController: diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 62d0d30f..cfe7a280 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -176,10 +176,56 @@ def dispatch_transcript_processing( config: ProcessingConfig, force: bool = False ) -> AsyncResult | None: if isinstance(config, MultitrackProcessingConfig): - # Start durable workflow if enabled (Hatchet or Conductor) - durable_started = False + # Check if room has use_hatchet=True (overrides env vars) + room_forces_hatchet = False + if config.room_id: + import asyncio - if settings.HATCHET_ENABLED: + from reflector.db.rooms import rooms_controller + + async def _check_room_hatchet(): + import databases + + from reflector.db import _database_context + + db = databases.Database(settings.DATABASE_URL) + _database_context.set(db) + await db.connect() + try: + room = await rooms_controller.get_by_id(config.room_id) + return room.use_hatchet if room else False + finally: + await db.disconnect() + _database_context.set(None) + + try: + loop = asyncio.get_running_loop() + except RuntimeError: + loop = None + + if loop and loop.is_running(): + import concurrent.futures + + with concurrent.futures.ThreadPoolExecutor() as pool: + room_forces_hatchet = pool.submit( + asyncio.run, _check_room_hatchet() + ).result() + else: + room_forces_hatchet = asyncio.run(_check_room_hatchet()) + + # Start durable workflow if enabled (Hatchet or Conductor) + # or if room has use_hatchet=True + durable_started = False + use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet + + if room_forces_hatchet: + logger.info( + "Room forces Hatchet workflow", + room_id=config.room_id, + 
transcript_id=config.transcript_id, + ) + + if use_hatchet: import asyncio import databases @@ -287,11 +333,11 @@ def dispatch_transcript_processing( logger.info("Hatchet workflow dispatched", workflow_id=workflow_id) durable_started = True - # If durable workflow started and not in shadow mode, skip Celery - if durable_started and not settings.DURABLE_WORKFLOW_SHADOW_MODE: + # If durable workflow started, skip Celery + if durable_started: return None - # Celery pipeline (shadow mode or durable workflows disabled) + # Celery pipeline (durable workflows disabled) return task_pipeline_multitrack_process.delay( transcript_id=config.transcript_id, bucket_name=config.bucket_name, diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 1d7b3e45..bd62c217 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -153,7 +153,6 @@ class Settings(BaseSettings): # Durable workflow orchestration # Provider: "hatchet" (or "none" to disable) DURABLE_WORKFLOW_PROVIDER: str = "none" - DURABLE_WORKFLOW_SHADOW_MODE: bool = False # Run both provider + Celery # Hatchet workflow orchestration HATCHET_CLIENT_TOKEN: str | None = None diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 043684e5..3f27db7b 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -320,11 +320,11 @@ async def _process_multitrack_recording_inner( ) durable_started = True - # If durable workflow started and not in shadow mode, skip Celery - if durable_started and not settings.DURABLE_WORKFLOW_SHADOW_MODE: + # If durable workflow started, skip Celery + if durable_started: return - # Celery pipeline (runs when durable workflows disabled OR in shadow mode) + # Celery pipeline (runs when durable workflows disabled) task_pipeline_multitrack_process.delay( transcript_id=transcript.id, bucket_name=bucket_name,