mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-07 03:06:46 +00:00
feat: set hatchet as default for multitracks (#822)
* set hatchet as default for multitracks * fix: pipeline routing tests for hatchet-default branch - Create room with use_celery=True to force Celery backend in tests - Link transcript to room to enable multitrack pipeline routing - Fixes test failures caused by missing HATCHET_CLIENT_TOKEN in test env * Update server/reflector/services/transcript_process.py Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com> --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com> Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
This commit is contained in:
@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
|
||||
nullable=False,
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"use_hatchet",
|
||||
"use_celery",
|
||||
sqlalchemy.Boolean,
|
||||
nullable=False,
|
||||
server_default=false(),
|
||||
@@ -97,7 +97,7 @@ class Room(BaseModel):
|
||||
ics_last_sync: datetime | None = None
|
||||
ics_last_etag: str | None = None
|
||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||
use_hatchet: bool = False
|
||||
use_celery: bool = False
|
||||
skip_consent: bool = False
|
||||
|
||||
|
||||
|
||||
@@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
|
||||
daily_multitrack_pipeline,
|
||||
)
|
||||
from reflector.logger import logger
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
def main():
|
||||
if not settings.HATCHET_ENABLED:
|
||||
logger.error("HATCHET_ENABLED is False, not starting CPU workers")
|
||||
return
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
|
||||
from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
|
||||
from reflector.hatchet.workflows.track_processing import track_workflow
|
||||
from reflector.logger import logger
|
||||
from reflector.settings import settings
|
||||
|
||||
SLOTS = 10
|
||||
WORKER_NAME = "llm-worker-pool"
|
||||
@@ -19,10 +18,6 @@ POOL = "llm-io"
|
||||
|
||||
|
||||
def main():
|
||||
if not settings.HATCHET_ENABLED:
|
||||
logger.error("HATCHET_ENABLED is False, not starting LLM workers")
|
||||
return
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
|
||||
logger.info(
|
||||
|
||||
@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_multitrack_pipeline import (
|
||||
task_pipeline_multitrack_process,
|
||||
)
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
|
||||
@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
|
||||
if transcript.locked:
|
||||
return ValidationLocked(detail="Recording is locked")
|
||||
|
||||
# hatchet is idempotent anyways + if it wasn't dispatched successfully
|
||||
if transcript.status == "idle" and not settings.HATCHET_ENABLED:
|
||||
# Check if recording is ready for processing
|
||||
if transcript.status == "idle" and not transcript.workflow_run_id:
|
||||
return ValidationNotReady(detail="Recording is not ready for processing")
|
||||
|
||||
# Check Celery tasks
|
||||
@@ -116,7 +115,8 @@ async def validate_transcript_for_processing(
|
||||
):
|
||||
return ValidationAlreadyScheduled(detail="already running")
|
||||
|
||||
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
|
||||
# Check Hatchet workflow status if workflow_run_id exists
|
||||
if transcript.workflow_run_id:
|
||||
try:
|
||||
status = await HatchetClientManager.get_workflow_run_status(
|
||||
transcript.workflow_run_id
|
||||
@@ -181,19 +181,16 @@ async def dispatch_transcript_processing(
|
||||
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
||||
"""
|
||||
if isinstance(config, MultitrackProcessingConfig):
|
||||
# Check if room has use_hatchet=True (overrides env vars)
|
||||
room_forces_hatchet = False
|
||||
use_celery = False
|
||||
if config.room_id:
|
||||
room = await rooms_controller.get_by_id(config.room_id)
|
||||
room_forces_hatchet = room.use_hatchet if room else False
|
||||
use_celery = room.use_celery if room else False
|
||||
|
||||
# Start durable workflow if enabled (Hatchet)
|
||||
# and if room has use_hatchet=True
|
||||
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if room_forces_hatchet:
|
||||
if use_celery:
|
||||
logger.info(
|
||||
"Room forces Hatchet workflow",
|
||||
"Room uses legacy Celery processing",
|
||||
room_id=config.room_id,
|
||||
transcript_id=config.transcript_id,
|
||||
)
|
||||
|
||||
@@ -158,19 +158,10 @@ class Settings(BaseSettings):
|
||||
ZULIP_API_KEY: str | None = None
|
||||
ZULIP_BOT_EMAIL: str | None = None
|
||||
|
||||
# Durable workflow orchestration
|
||||
# Provider: "hatchet" (or "none" to disable)
|
||||
DURABLE_WORKFLOW_PROVIDER: str = "none"
|
||||
|
||||
# Hatchet workflow orchestration
|
||||
# Hatchet workflow orchestration (always enabled for multitrack processing)
|
||||
HATCHET_CLIENT_TOKEN: str | None = None
|
||||
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
|
||||
HATCHET_DEBUG: bool = False
|
||||
|
||||
@property
|
||||
def HATCHET_ENABLED(self) -> bool:
|
||||
"""True if Hatchet is the active provider."""
|
||||
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
|
||||
|
||||
|
||||
settings = Settings()
|
||||
|
||||
@@ -287,11 +287,12 @@ async def _process_multitrack_recording_inner(
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
|
||||
if use_celery:
|
||||
logger.info(
|
||||
"Room forces Hatchet workflow",
|
||||
"Room uses legacy Celery processing",
|
||||
room_id=room.id,
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
@@ -810,7 +811,6 @@ async def reprocess_failed_daily_recordings():
|
||||
)
|
||||
continue
|
||||
|
||||
# Fetch room to check use_hatchet flag
|
||||
room = None
|
||||
if meeting.room_id:
|
||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||
@@ -834,10 +834,10 @@ async def reprocess_failed_daily_recordings():
|
||||
)
|
||||
continue
|
||||
|
||||
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if use_hatchet:
|
||||
# Hatchet requires a transcript for workflow_run_id tracking
|
||||
if not transcript:
|
||||
logger.warning(
|
||||
"No transcript for Hatchet reprocessing, skipping",
|
||||
|
||||
Reference in New Issue
Block a user