Compare commits


2 Commits

Author SHA1 Message Date
8fc8d8bf4a chore(main): release 0.29.0 (#828) 2026-01-22 12:52:39 -05:00
c723752b7e feat: set hatchet as default for multitracks (#822)
* set hatchet as default for multitracks

* fix: pipeline routing tests for hatchet-default branch

- Create room with use_celery=True to force Celery backend in tests
- Link transcript to room to enable multitrack pipeline routing
- Fixes test failures caused by missing HATCHET_CLIENT_TOKEN in test env

* Update server/reflector/services/transcript_process.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2026-01-21 17:05:03 -05:00
10 changed files with 155 additions and 161 deletions

View File

@@ -1,5 +1,12 @@
 # Changelog
 
+## [0.29.0](https://github.com/Monadical-SAS/reflector/compare/v0.28.1...v0.29.0) (2026-01-21)
+
+### Features
+
+* set hatchet as default for multitracks ([#822](https://github.com/Monadical-SAS/reflector/issues/822)) ([c723752](https://github.com/Monadical-SAS/reflector/commit/c723752b7e15aa48a41ad22856f147a5517d3f46))
+
 ## [0.28.1](https://github.com/Monadical-SAS/reflector/compare/v0.28.0...v0.28.1) (2026-01-21)

View File

@@ -0,0 +1,44 @@
"""replace_use_hatchet_with_use_celery
Revision ID: 80beb1ea3269
Revises: bd3a729bb379
Create Date: 2026-01-20 16:26:25.555869
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "80beb1ea3269"
down_revision: Union[str, None] = "bd3a729bb379"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_hatchet")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.drop_column("use_celery")

View File

@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
         nullable=False,
     ),
     sqlalchemy.Column(
-        "use_hatchet",
+        "use_celery",
         sqlalchemy.Boolean,
         nullable=False,
         server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
     ics_last_sync: datetime | None = None
     ics_last_etag: str | None = None
     platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
-    use_hatchet: bool = False
+    use_celery: bool = False
     skip_consent: bool = False
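
With the column and model field renamed, Hatchet becomes the default and a room opts back into the legacy Celery pipeline by flipping use_celery. A rough sketch of doing that through the existing controller — the helper name is illustrative, and the update call mirrors the test change further down:

    from reflector.db.rooms import rooms_controller

    async def force_celery(room_id: str) -> None:
        # Illustrative helper: flip one room to the legacy Celery backend.
        room = await rooms_controller.get_by_id(room_id)
        if room:
            await rooms_controller.update(room, {"use_celery": True})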

View File

@@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
 from reflector.logger import logger
-from reflector.settings import settings
 
 
 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting CPU workers")
-        return
-
     hatchet = HatchetClientManager.get_client()
     logger.info(

View File

@@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
 from reflector.hatchet.workflows.track_processing import track_workflow
 from reflector.logger import logger
-from reflector.settings import settings
 
 SLOTS = 10
 WORKER_NAME = "llm-worker-pool"
@@ -19,10 +18,6 @@ POOL = "llm-io"
 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting LLM workers")
-        return
-
     hatchet = HatchetClientManager.get_client()
     logger.info(

View File

@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_multitrack_pipeline import (
     task_pipeline_multitrack_process,
 )
-from reflector.settings import settings
 from reflector.utils.string import NonEmptyString
@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
     if transcript.locked:
         return ValidationLocked(detail="Recording is locked")
 
-    # hatchet is idempotent anyways + if it wasn't dispatched successfully
-    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
+    # Check if recording is ready for processing
+    if transcript.status == "idle" and not transcript.workflow_run_id:
         return ValidationNotReady(detail="Recording is not ready for processing")
 
     # Check Celery tasks
@@ -116,7 +115,8 @@ async def validate_transcript_for_processing(
     ):
         return ValidationAlreadyScheduled(detail="already running")
 
-    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
+    # Check Hatchet workflow status if workflow_run_id exists
+    if transcript.workflow_run_id:
         try:
             status = await HatchetClientManager.get_workflow_run_status(
                 transcript.workflow_run_id
@@ -181,19 +181,16 @@ async def dispatch_transcript_processing(
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
     if isinstance(config, MultitrackProcessingConfig):
-        # Check if room has use_hatchet=True (overrides env vars)
-        room_forces_hatchet = False
+        use_celery = False
         if config.room_id:
             room = await rooms_controller.get_by_id(config.room_id)
-            room_forces_hatchet = room.use_hatchet if room else False
+            use_celery = room.use_celery if room else False
 
-        # Start durable workflow if enabled (Hatchet)
-        # and if room has use_hatchet=True
-        use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
+        use_hatchet = not use_celery
 
-        if room_forces_hatchet:
+        if use_celery:
             logger.info(
-                "Room forces Hatchet workflow",
+                "Room uses legacy Celery processing",
                 room_id=config.room_id,
                 transcript_id=config.transcript_id,
             )
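
Stripped of logging, the new dispatch decision is a single per-room boolean with Hatchet as the fallback default. A condensed sketch of how the branch now reads — the helper name and string return value are illustrative, not code from this change:

    from reflector.db.rooms import rooms_controller

    async def choose_backend(config) -> str:
        # Hatchet is the default for multitrack processing; a room can still
        # opt into the legacy Celery pipeline via its use_celery flag.
        use_celery = False
        if config.room_id:
            room = await rooms_controller.get_by_id(config.room_id)
            use_celery = room.use_celery if room else False
        return "celery" if use_celery else "hatchet"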

View File

@@ -158,19 +158,10 @@ class Settings(BaseSettings):
     ZULIP_API_KEY: str | None = None
     ZULIP_BOT_EMAIL: str | None = None
 
-    # Durable workflow orchestration
-    # Provider: "hatchet" (or "none" to disable)
-    DURABLE_WORKFLOW_PROVIDER: str = "none"
-
-    # Hatchet workflow orchestration
+    # Hatchet workflow orchestration (always enabled for multitrack processing)
     HATCHET_CLIENT_TOKEN: str | None = None
     HATCHET_CLIENT_TLS_STRATEGY: str = "none"  # none, tls, mtls
     HATCHET_DEBUG: bool = False
 
-    @property
-    def HATCHET_ENABLED(self) -> bool:
-        """True if Hatchet is the active provider."""
-        return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
-
 
 settings = Settings()
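
With DURABLE_WORKFLOW_PROVIDER and the HATCHET_ENABLED property removed, no setting turns Hatchet off any more; HATCHET_CLIENT_TOKEN is effectively required wherever multitrack recordings are processed (the commit message notes a missing token is what broke the tests). A hypothetical startup guard, not part of this change:

    from reflector.settings import settings

    def check_hatchet_config() -> None:
        # Illustrative only: the codebase no longer gates on an enable flag.
        if not settings.HATCHET_CLIENT_TOKEN:
            raise RuntimeError(
                "HATCHET_CLIENT_TOKEN is unset; multitrack processing defaults to Hatchet"
            )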

View File

@@ -287,11 +287,12 @@ async def _process_multitrack_recording_inner(
         room_id=room.id,
     )
 
-    use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+    use_celery = room and room.use_celery
+    use_hatchet = not use_celery
 
-    if room and room.use_hatchet and not settings.HATCHET_ENABLED:
+    if use_celery:
         logger.info(
-            "Room forces Hatchet workflow",
+            "Room uses legacy Celery processing",
             room_id=room.id,
             transcript_id=transcript.id,
         )
@@ -810,7 +811,6 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        # Fetch room to check use_hatchet flag
         room = None
         if meeting.room_id:
             room = await rooms_controller.get_by_id(meeting.room_id)
@@ -834,10 +834,10 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+        use_celery = room and room.use_celery
+        use_hatchet = not use_celery
 
         if use_hatchet:
-            # Hatchet requires a transcript for workflow_run_id tracking
             if not transcript:
                 logger.warning(
                     "No transcript for Hatchet reprocessing, skipping",

View File

@@ -2,10 +2,9 @@
 Tests for Hatchet workflow dispatch and routing logic.
 
 These tests verify:
-1. Routing to Hatchet when HATCHET_ENABLED=True
-2. Replay logic for failed workflows
-3. Force flag to cancel and restart
-4. Validation prevents concurrent workflows
+1. Hatchet workflow validation and replay logic
+2. Force flag to cancel and restart workflows
+3. Validation prevents concurrent workflows
 """
 
 from unittest.mock import AsyncMock, patch
@@ -34,25 +33,22 @@ async def test_hatchet_validation_blocks_running_workflow():
workflow_run_id="running-workflow-123", workflow_run_id="running-workflow-123",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
with patch( with patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_hatchet: ) as mock_celery_check:
mock_hatchet.get_workflow_run_status = AsyncMock( mock_celery_check.return_value = False
return_value=V1TaskStatus.RUNNING
)
with patch( result = await validate_transcript_for_processing(mock_transcript)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript) assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -72,24 +68,21 @@ async def test_hatchet_validation_blocks_queued_workflow():
workflow_run_id="queued-workflow-123", workflow_run_id="queued-workflow-123",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
with patch( with patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_hatchet: ) as mock_celery_check:
mock_hatchet.get_workflow_run_status = AsyncMock( mock_celery_check.return_value = False
return_value=V1TaskStatus.QUEUED
)
with patch( result = await validate_transcript_for_processing(mock_transcript)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript) assert isinstance(result, ValidationAlreadyScheduled)
assert isinstance(result, ValidationAlreadyScheduled)
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -110,25 +103,22 @@ async def test_hatchet_validation_allows_failed_workflow():
recording_id="test-recording-id", recording_id="test-recording-id",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
with patch( with patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_hatchet: ) as mock_celery_check:
mock_hatchet.get_workflow_run_status = AsyncMock( mock_celery_check.return_value = False
return_value=V1TaskStatus.FAILED
)
with patch( result = await validate_transcript_for_processing(mock_transcript)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript) assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -149,24 +139,21 @@ async def test_hatchet_validation_allows_completed_workflow():
recording_id="test-recording-id", recording_id="test-recording-id",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
with patch( with patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_hatchet: ) as mock_celery_check:
mock_hatchet.get_workflow_run_status = AsyncMock( mock_celery_check.return_value = False
return_value=V1TaskStatus.COMPLETED
)
with patch( result = await validate_transcript_for_processing(mock_transcript)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript) assert isinstance(result, ValidationOk)
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -187,26 +174,23 @@ async def test_hatchet_validation_allows_when_status_check_fails():
recording_id="test-recording-id", recording_id="test-recording-id",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch( with patch(
"reflector.services.transcript_process.HatchetClientManager" "reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_hatchet: ) as mock_celery_check:
# Status check fails (workflow might be deleted) mock_celery_check.return_value = False
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch( result = await validate_transcript_for_processing(mock_transcript)
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript) # Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database") @pytest.mark.usefixtures("setup_database")
@@ -227,47 +211,11 @@ async def test_hatchet_validation_skipped_when_no_workflow_id():
recording_id="test-recording-id", recording_id="test-recording-id",
) )
with patch("reflector.services.transcript_process.settings") as mock_settings: with patch(
mock_settings.HATCHET_ENABLED = True "reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
with patch( # Should not be called
"reflector.services.transcript_process.HatchetClientManager" mock_hatchet.get_workflow_run_status = AsyncMock()
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch( with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active" "reflector.services.transcript_process.task_is_scheduled_or_active"
@@ -276,7 +224,8 @@ async def test_hatchet_validation_skipped_when_disabled():
             result = await validate_transcript_for_processing(mock_transcript)
 
-            # Should not check Hatchet at all
+            # Should not check Hatchet status
+            mock_hatchet.get_workflow_run_status.assert_not_called()
             assert isinstance(result, ValidationOk)

View File

@@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
     from datetime import datetime, timezone
 
     from reflector.db.recordings import Recording, recordings_controller
+    from reflector.db.rooms import rooms_controller
     from reflector.db.transcripts import transcripts_controller
 
-    # Create transcript with Daily.co multitrack recording
+    room = await rooms_controller.add(
+        name="test-room",
+        user_id="test-user",
+        zulip_auto_post=False,
+        zulip_stream="",
+        zulip_topic="",
+        is_locked=False,
+        room_mode="normal",
+        recording_type="cloud",
+        recording_trigger="automatic-2nd-participant",
+        is_shared=False,
+    )
+    # Force Celery backend for test
+    await rooms_controller.update(room, {"use_celery": True})
+
     transcript = await transcripts_controller.add(
         "",
         source_kind="room",
@@ -172,6 +187,7 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
target_language="en", target_language="en",
user_id="test-user", user_id="test-user",
share_mode="public", share_mode="public",
room_id=room.id,
) )
track_keys = [ track_keys = [