diff --git a/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py b/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py
new file mode 100644
index 00000000..778fa001
--- /dev/null
+++ b/server/migrations/versions/80beb1ea3269_replace_use_hatchet_with_use_celery.py
@@ -0,0 +1,44 @@
+"""replace_use_hatchet_with_use_celery
+
+Revision ID: 80beb1ea3269
+Revises: bd3a729bb379
+Create Date: 2026-01-20 16:26:25.555869
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "80beb1ea3269"
+down_revision: Union[str, None] = "bd3a729bb379"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("room", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "use_celery",
+                sa.Boolean(),
+                server_default=sa.text("false"),
+                nullable=False,
+            )
+        )
+        batch_op.drop_column("use_hatchet")
+
+
+def downgrade() -> None:
+    with op.batch_alter_table("room", schema=None) as batch_op:
+        batch_op.add_column(
+            sa.Column(
+                "use_hatchet",
+                sa.Boolean(),
+                server_default=sa.text("false"),
+                nullable=False,
+            )
+        )
+        batch_op.drop_column("use_celery")
diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py
index 478c096e..8228144c 100644
--- a/server/reflector/db/rooms.py
+++ b/server/reflector/db/rooms.py
@@ -58,7 +58,7 @@ rooms = sqlalchemy.Table(
         nullable=False,
     ),
     sqlalchemy.Column(
-        "use_hatchet",
+        "use_celery",
         sqlalchemy.Boolean,
         nullable=False,
         server_default=false(),
@@ -97,7 +97,7 @@ class Room(BaseModel):
     ics_last_sync: datetime | None = None
     ics_last_etag: str | None = None
     platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
-    use_hatchet: bool = False
+    use_celery: bool = False
     skip_consent: bool = False
diff --git a/server/reflector/hatchet/run_workers_cpu.py b/server/reflector/hatchet/run_workers_cpu.py
index 3fa1106d..ad946578 100644
--- a/server/reflector/hatchet/run_workers_cpu.py
+++ b/server/reflector/hatchet/run_workers_cpu.py
@@ -12,14 +12,9 @@ from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     daily_multitrack_pipeline,
 )
 from reflector.logger import logger
-from reflector.settings import settings


 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting CPU workers")
-        return
-
     hatchet = HatchetClientManager.get_client()

     logger.info(
diff --git a/server/reflector/hatchet/run_workers_llm.py b/server/reflector/hatchet/run_workers_llm.py
index 00c3a115..3ab0529a 100644
--- a/server/reflector/hatchet/run_workers_llm.py
+++ b/server/reflector/hatchet/run_workers_llm.py
@@ -11,7 +11,6 @@ from reflector.hatchet.workflows.subject_processing import subject_workflow
 from reflector.hatchet.workflows.topic_chunk_processing import topic_chunk_workflow
 from reflector.hatchet.workflows.track_processing import track_workflow
 from reflector.logger import logger
-from reflector.settings import settings

 SLOTS = 10
 WORKER_NAME = "llm-worker-pool"
@@ -19,10 +18,6 @@ POOL = "llm-io"


 def main():
-    if not settings.HATCHET_ENABLED:
-        logger.error("HATCHET_ENABLED is False, not starting LLM workers")
-        return
-
     hatchet = HatchetClientManager.get_client()

     logger.info(
diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py
index 0e0ebc7c..0341304f 100644
--- a/server/reflector/services/transcript_process.py
+++ b/server/reflector/services/transcript_process.py
@@ -23,7 +23,6 @@ from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_multitrack_pipeline import (
     task_pipeline_multitrack_process,
 )
-from reflector.settings import settings
 from reflector.utils.string import NonEmptyString


@@ -102,8 +101,8 @@ async def validate_transcript_for_processing(
     if transcript.locked:
         return ValidationLocked(detail="Recording is locked")

-    # hatchet is idempotent anyways + if it wasn't dispatched successfully
-    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
+    # Check if recording is ready for processing
+    if transcript.status == "idle" and not transcript.workflow_run_id:
         return ValidationNotReady(detail="Recording is not ready for processing")

     # Check Celery tasks
@@ -116,7 +115,8 @@
     ):
         return ValidationAlreadyScheduled(detail="already running")

-    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
+    # Check Hatchet workflow status if workflow_run_id exists
+    if transcript.workflow_run_id:
         try:
             status = await HatchetClientManager.get_workflow_run_status(
                 transcript.workflow_run_id
@@ -181,19 +181,16 @@
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
     if isinstance(config, MultitrackProcessingConfig):
-        # Check if room has use_hatchet=True (overrides env vars)
-        room_forces_hatchet = False
+        use_celery = False
         if config.room_id:
             room = await rooms_controller.get_by_id(config.room_id)
-            room_forces_hatchet = room.use_hatchet if room else False
+            use_celery = room.use_celery if room else False

-        # Start durable workflow if enabled (Hatchet)
-        # and if room has use_hatchet=True
-        use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
+        use_hatchet = not use_celery

-        if room_forces_hatchet:
+        if use_celery:
             logger.info(
-                "Room forces Hatchet workflow",
+                "Room uses legacy Celery processing",
                 room_id=config.room_id,
                 transcript_id=config.transcript_id,
             )
diff --git a/server/reflector/settings.py b/server/reflector/settings.py
index 0057479d..6fc808d2 100644
--- a/server/reflector/settings.py
+++ b/server/reflector/settings.py
@@ -158,19 +158,10 @@ class Settings(BaseSettings):
     ZULIP_API_KEY: str | None = None
     ZULIP_BOT_EMAIL: str | None = None

-    # Durable workflow orchestration
-    # Provider: "hatchet" (or "none" to disable)
-    DURABLE_WORKFLOW_PROVIDER: str = "none"
-
-    # Hatchet workflow orchestration
+    # Hatchet workflow orchestration (always enabled for multitrack processing)
     HATCHET_CLIENT_TOKEN: str | None = None
     HATCHET_CLIENT_TLS_STRATEGY: str = "none"  # none, tls, mtls
     HATCHET_DEBUG: bool = False

-    @property
-    def HATCHET_ENABLED(self) -> bool:
-        """True if Hatchet is the active provider."""
-        return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
-

 settings = Settings()
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index 466cdef0..a1fa3a3c 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -287,11 +287,12 @@ async def _process_multitrack_recording_inner(
         room_id=room.id,
     )

-    use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+    use_celery = room and room.use_celery
+    use_hatchet = not use_celery

-    if room and room.use_hatchet and not settings.HATCHET_ENABLED:
+    if use_celery:
         logger.info(
-            "Room forces Hatchet workflow",
+            "Room uses legacy Celery processing",
             room_id=room.id,
             transcript_id=transcript.id,
         )
@@ -810,7 +811,6 @@ async def reprocess_failed_daily_recordings():
             )
             continue

-        # Fetch room to check use_hatchet flag
         room = None
         if meeting.room_id:
             room = await rooms_controller.get_by_id(meeting.room_id)
@@ -834,10 +834,10 @@
             )
             continue

-        use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
+        use_celery = room and room.use_celery
+        use_hatchet = not use_celery

         if use_hatchet:
-            # Hatchet requires a transcript for workflow_run_id tracking
             if not transcript:
                 logger.warning(
                     "No transcript for Hatchet reprocessing, skipping",
diff --git a/server/tests/test_hatchet_dispatch.py b/server/tests/test_hatchet_dispatch.py
index 57a699ee..157f2b5c 100644
--- a/server/tests/test_hatchet_dispatch.py
+++ b/server/tests/test_hatchet_dispatch.py
@@ -2,10 +2,9 @@
 Tests for Hatchet workflow dispatch and routing logic.

 These tests verify:
-1. Routing to Hatchet when HATCHET_ENABLED=True
-2. Replay logic for failed workflows
-3. Force flag to cancel and restart
-4. Validation prevents concurrent workflows
+1. Hatchet workflow validation and replay logic
+2. Force flag to cancel and restart workflows
+3. Validation prevents concurrent workflows
 """

 from unittest.mock import AsyncMock, patch
@@ -34,25 +33,22 @@
         workflow_run_id="running-workflow-123",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.RUNNING
+        )

         with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.RUNNING
-            )
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False

-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
+            result = await validate_transcript_for_processing(mock_transcript)

-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationAlreadyScheduled)
-                assert "running" in result.detail.lower()
+            assert isinstance(result, ValidationAlreadyScheduled)
+            assert "running" in result.detail.lower()


 @pytest.mark.usefixtures("setup_database")
@@ -72,24 +68,21 @@
         workflow_run_id="queued-workflow-123",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.QUEUED
+        )

         with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.QUEUED
-            )
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False

-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
+            result = await validate_transcript_for_processing(mock_transcript)

-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationAlreadyScheduled)
+            assert isinstance(result, ValidationAlreadyScheduled)


 @pytest.mark.usefixtures("setup_database")
@@ -110,25 +103,22 @@
         recording_id="test-recording-id",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.FAILED
+        )

         with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.FAILED
-            )
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False

-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
+            result = await validate_transcript_for_processing(mock_transcript)

-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationOk)
-                assert result.transcript_id == "test-transcript-id"
+            assert isinstance(result, ValidationOk)
+            assert result.transcript_id == "test-transcript-id"


 @pytest.mark.usefixtures("setup_database")
@@ -149,24 +139,21 @@
         recording_id="test-recording-id",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            return_value=V1TaskStatus.COMPLETED
+        )

         with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                return_value=V1TaskStatus.COMPLETED
-            )
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False

-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
+            result = await validate_transcript_for_processing(mock_transcript)

-                result = await validate_transcript_for_processing(mock_transcript)
-
-                assert isinstance(result, ValidationOk)
+            assert isinstance(result, ValidationOk)


 @pytest.mark.usefixtures("setup_database")
@@ -187,26 +174,23 @@
         recording_id="test-recording-id",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        # Status check fails (workflow might be deleted)
+        mock_hatchet.get_workflow_run_status = AsyncMock(
+            side_effect=ApiException("Workflow not found")
+        )

         with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            # Status check fails (workflow might be deleted)
-            mock_hatchet.get_workflow_run_status = AsyncMock(
-                side_effect=ApiException("Workflow not found")
-            )
+            "reflector.services.transcript_process.task_is_scheduled_or_active"
+        ) as mock_celery_check:
+            mock_celery_check.return_value = False

-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
+            result = await validate_transcript_for_processing(mock_transcript)

-                result = await validate_transcript_for_processing(mock_transcript)
-
-                # Should allow processing when we can't get status
-                assert isinstance(result, ValidationOk)
+            # Should allow processing when we can't get status
+            assert isinstance(result, ValidationOk)


 @pytest.mark.usefixtures("setup_database")
@@ -227,47 +211,11 @@
         recording_id="test-recording-id",
     )

-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = True
-
-        with patch(
-            "reflector.services.transcript_process.HatchetClientManager"
-        ) as mock_hatchet:
-            # Should not be called
-            mock_hatchet.get_workflow_run_status = AsyncMock()
-
-            with patch(
-                "reflector.services.transcript_process.task_is_scheduled_or_active"
-            ) as mock_celery_check:
-                mock_celery_check.return_value = False
-
-                result = await validate_transcript_for_processing(mock_transcript)
-
-                # Should not check Hatchet status
-                mock_hatchet.get_workflow_run_status.assert_not_called()
-                assert isinstance(result, ValidationOk)
-
-
-@pytest.mark.usefixtures("setup_database")
-@pytest.mark.asyncio
-async def test_hatchet_validation_skipped_when_disabled():
-    """Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
-    from reflector.services.transcript_process import (
-        ValidationOk,
-        validate_transcript_for_processing,
-    )
-
-    mock_transcript = Transcript(
-        id="test-transcript-id",
-        name="Test",
-        status="uploaded",
-        source_kind="room",
-        workflow_run_id="some-workflow-123",
-        recording_id="test-recording-id",
-    )
-
-    with patch("reflector.services.transcript_process.settings") as mock_settings:
-        mock_settings.HATCHET_ENABLED = False  # Hatchet disabled
+    with patch(
+        "reflector.services.transcript_process.HatchetClientManager"
+    ) as mock_hatchet:
+        # Should not be called
+        mock_hatchet.get_workflow_run_status = AsyncMock()

         with patch(
             "reflector.services.transcript_process.task_is_scheduled_or_active"
@@ -276,7 +224,8 @@
         ) as mock_celery_check:
             mock_celery_check.return_value = False

             result = await validate_transcript_for_processing(mock_transcript)

-            # Should not check Hatchet at all
+            # Should not check Hatchet status
+            mock_hatchet.get_workflow_run_status.assert_not_called()
             assert isinstance(result, ValidationOk)
diff --git a/server/tests/test_transcripts_process.py b/server/tests/test_transcripts_process.py
index e3d749df..3e2aed2b 100644
--- a/server/tests/test_transcripts_process.py
+++ b/server/tests/test_transcripts_process.py
@@ -162,9 +162,24 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
     from datetime import datetime, timezone

     from reflector.db.recordings import Recording, recordings_controller
+    from reflector.db.rooms import rooms_controller
     from reflector.db.transcripts import transcripts_controller

-    # Create transcript with Daily.co multitrack recording
+    room = await rooms_controller.add(
+        name="test-room",
+        user_id="test-user",
+        zulip_auto_post=False,
+        zulip_stream="",
+        zulip_topic="",
+        is_locked=False,
+        room_mode="normal",
+        recording_type="cloud",
+        recording_trigger="automatic-2nd-participant",
+        is_shared=False,
+    )
+    # Force Celery backend for test
+    await rooms_controller.update(room, {"use_celery": True})
+
     transcript = await transcripts_controller.add(
         "",
         source_kind="room",
@@ -172,6 +187,7 @@
         target_language="en",
         user_id="test-user",
         share_mode="public",
+        room_id=room.id,
     )

     track_keys = [