feat: durable (#794)

* durable (no-mistakes)

* hatchet no-mistake

* hatchet no-mistake

* hatchet no-mistake, better logging

* remove conductor and add hatchet tests (no-mistakes)

* self-review (no-mistakes)

* hatched logs

* remove shadow mode for hatchet

* and add hatchet processor setting to room

* .

* cleanup

* hatchet init db

* self-review (no-mistakes)

* self-review (no-mistakes)

* hatchet: restore zullip report

* self-review round

* self-review round

* self-review round

* dry hatchet with celery

* dry hatched with celery - 2

* self-review round

* more NES instead of str

* self-review wip

* self-review round

* self-review round

* self-review round

* can_replay cancelled

* add forgotten file

* pr autoreviewer fixes

* better log webhook events

* durable_started return

* migration sync

* latest changes feature parity

* migration merge

* pr review

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
Igor Monadical
2025-12-22 12:09:20 -05:00
committed by GitHub
parent f580b996ee
commit 1dac999b56
36 changed files with 4908 additions and 2009 deletions

View File

@@ -0,0 +1,5 @@
"""Hatchet workflow orchestration for Reflector."""
from reflector.hatchet.client import HatchetClientManager
__all__ = ["HatchetClientManager"]

View File

@@ -0,0 +1,98 @@
"""WebSocket broadcasting helpers for Hatchet workflows.
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
decorator behavior. Events are broadcast to transcript rooms and user rooms.
"""
from typing import Any
import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(
transcript_id: NonEmptyString,
event: TranscriptEvent,
logger: structlog.BoundLogger,
) -> None:
"""Broadcast a TranscriptEvent to WebSocket subscribers.
Fire-and-forget: errors are logged but don't interrupt workflow execution.
"""
logger.info(
"Broadcasting event",
transcript_id=transcript_id,
event_type=event.event,
)
try:
ws_manager = get_ws_manager()
await ws_manager.send_json(
room_id=f"ts:{transcript_id}",
message=event.model_dump(mode="json"),
)
logger.info(
"Event sent to transcript room",
transcript_id=transcript_id,
event_type=event.event,
)
if event.event in USER_ROOM_EVENTS:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript and transcript.user_id:
await ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
message={
"event": f"TRANSCRIPT_{event.event}",
"data": {"id": transcript_id, **event.data},
},
)
except Exception as e:
logger.warning(
"Failed to broadcast event",
error=str(e),
transcript_id=transcript_id,
event_type=event.event,
)
async def set_status_and_broadcast(
transcript_id: NonEmptyString,
status: str,
logger: structlog.BoundLogger,
) -> None:
"""Set transcript status and broadcast to WebSocket.
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
"""
event = await transcripts_controller.set_status(transcript_id, status)
if event:
await broadcast_event(transcript_id, event, logger=logger)
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: str,
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:
"""Append event to transcript and broadcast to WebSocket.
Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
"""
event = await transcripts_controller.append_event(
transcript=transcript,
event=event_name,
data=data,
)
await broadcast_event(transcript_id, event, logger=logger)
return event

View File

@@ -0,0 +1,115 @@
"""Hatchet Python client wrapper.
Uses singleton pattern because:
1. Hatchet client maintains persistent gRPC connections for workflow registration
2. Creating multiple clients would cause registration conflicts and resource leaks
3. The SDK is designed for a single client instance per process
4. Tests use `HatchetClientManager.reset()` to isolate state between tests
"""
import logging
import threading
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.logger import logger
from reflector.settings import settings
class HatchetClientManager:
"""Singleton manager for Hatchet client connections.
See module docstring for rationale. For test isolation, use `reset()`.
"""
_instance: Hatchet | None = None
_lock = threading.Lock()
@classmethod
def get_client(cls) -> Hatchet:
"""Get or create the Hatchet client (thread-safe singleton)."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
if not settings.HATCHET_CLIENT_TOKEN:
raise ValueError("HATCHET_CLIENT_TOKEN must be set")
# Pass root logger to Hatchet so workflow logs appear in dashboard
root_logger = logging.getLogger()
cls._instance = Hatchet(
debug=settings.HATCHET_DEBUG,
config=ClientConfig(logger=root_logger),
)
return cls._instance
@classmethod
async def start_workflow(
cls,
workflow_name: str,
input_data: dict,
additional_metadata: dict | None = None,
) -> str:
"""Start a workflow and return the workflow run ID.
Args:
workflow_name: Name of the workflow to trigger.
input_data: Input data for the workflow run.
additional_metadata: Optional metadata for filtering in dashboard
(e.g., transcript_id, recording_id).
"""
client = cls.get_client()
result = await client.runs.aio_create(
workflow_name,
input_data,
additional_metadata=additional_metadata,
)
return result.run.metadata.id
@classmethod
async def get_workflow_run_status(cls, workflow_run_id: str) -> V1TaskStatus:
client = cls.get_client()
return await client.runs.aio_get_status(workflow_run_id)
@classmethod
async def cancel_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_cancel(workflow_run_id)
logger.info("[Hatchet] Cancelled workflow", workflow_run_id=workflow_run_id)
@classmethod
async def replay_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_replay(workflow_run_id)
logger.info("[Hatchet] Replaying workflow", workflow_run_id=workflow_run_id)
@classmethod
async def can_replay(cls, workflow_run_id: str) -> bool:
"""Check if workflow can be replayed (is FAILED only).
CANCELLED workflows should start fresh (new run ID) rather than replay,
since cancellation indicates user intent to abort.
"""
try:
status = await cls.get_workflow_run_status(workflow_run_id)
return status == V1TaskStatus.FAILED
except Exception as e:
logger.warning(
"[Hatchet] Failed to check replay status",
workflow_run_id=workflow_run_id,
error=str(e),
)
return False
@classmethod
async def get_workflow_status(cls, workflow_run_id: str) -> dict:
"""Get the full workflow run details as dict."""
client = cls.get_client()
run = await client.runs.aio_get(workflow_run_id)
return run.to_dict()
@classmethod
def reset(cls) -> None:
"""Reset the client instance (for testing)."""
with cls._lock:
cls._instance = None

View File

@@ -0,0 +1,63 @@
"""
Run Hatchet workers for the diarization pipeline.
Runs as a separate process, just like Celery workers.
Usage:
uv run -m reflector.hatchet.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.hatchet.run_workers
"""
import signal
import sys
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Hatchet worker polling."""
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting workers")
sys.exit(1)
if not settings.HATCHET_CLIENT_TOKEN:
logger.error("HATCHET_CLIENT_TOKEN is not set")
sys.exit(1)
logger.info(
"Starting Hatchet workers",
debug=settings.HATCHET_DEBUG,
)
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
worker = hatchet.worker(
"reflector-diarization-worker",
workflows=[diarization_pipeline, track_workflow],
)
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
# Worker cleanup happens automatically on exit
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting Hatchet worker polling...")
worker.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,14 @@
"""Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import (
PipelineInput,
diarization_pipeline,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
"track_workflow",
"PipelineInput",
"TrackInput",
]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,124 @@
"""
Pydantic models for Hatchet workflow task return types.
Provides static typing for all task outputs, enabling type checking
and better IDE support.
"""
from typing import Any
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
class PadTrackResult(BaseModel):
"""Result from pad_track task."""
padded_key: NonEmptyString # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
bucket_name: (
NonEmptyString | None
) # None means use default transcript storage bucket
size: int
track_index: int
class TranscribeTrackResult(BaseModel):
"""Result from transcribe_track task."""
words: list[dict[str, Any]]
track_index: int
class RecordingResult(BaseModel):
"""Result from get_recording task."""
id: NonEmptyString | None
mtg_session_id: NonEmptyString | None
duration: float
class ParticipantsResult(BaseModel):
"""Result from get_participants task."""
participants: list[dict[str, Any]]
num_tracks: int
source_language: NonEmptyString
target_language: NonEmptyString
class PaddedTrackInfo(BaseModel):
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
key: NonEmptyString
bucket_name: NonEmptyString | None # None = use default storage bucket
class ProcessTracksResult(BaseModel):
"""Result from process_tracks task."""
all_words: list[dict[str, Any]]
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
word_count: int
num_tracks: int
target_language: NonEmptyString
created_padded_files: list[NonEmptyString]
class MixdownResult(BaseModel):
"""Result from mixdown_tracks task."""
audio_key: NonEmptyString
duration: float
tracks_mixed: int
class WaveformResult(BaseModel):
"""Result from generate_waveform task."""
waveform_generated: bool
class TopicsResult(BaseModel):
"""Result from detect_topics task."""
topics: list[dict[str, Any]]
class TitleResult(BaseModel):
"""Result from generate_title task."""
title: str | None
class SummaryResult(BaseModel):
"""Result from generate_summary task."""
summary: str | None
short_summary: str | None
action_items: dict | None = None
class FinalizeResult(BaseModel):
"""Result from finalize task."""
status: NonEmptyString
class ConsentResult(BaseModel):
"""Result from cleanup_consent task."""
class ZulipResult(BaseModel):
"""Result from post_zulip task."""
zulip_message_id: int | None = None
skipped: bool = False
class WebhookResult(BaseModel):
"""Result from send_webhook task."""
webhook_sent: bool
skipped: bool = False
response_code: int | None = None

View File

@@ -0,0 +1,222 @@
"""
Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
class TrackInput(BaseModel):
"""Input for individual track processing."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
language: str = "en"
hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
# Return S3 key (not presigned URL) - consumer tasks presign on demand
# This avoids stale URLs when workflow is replayed
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
raise
@track_workflow.task(
parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
language=input.language,
)
try:
pad_result = ctx.task_output(pad_track)
padded_key = pad_result.padded_key
bucket_name = pad_result.bucket_name
if not padded_key:
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index
words = []
for word in transcript.words:
word_dict = word.model_dump()
word_dict["speaker"] = input.track_index
words.append(word_dict)
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
word_count=len(words),
)
return TranscribeTrackResult(
words=words,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
raise