diff --git a/server/migrations/versions/3aa20b96d963_drop_use_celery_column.py b/server/migrations/versions/3aa20b96d963_drop_use_celery_column.py new file mode 100644 index 00000000..76a44235 --- /dev/null +++ b/server/migrations/versions/3aa20b96d963_drop_use_celery_column.py @@ -0,0 +1,35 @@ +"""drop_use_celery_column + +Revision ID: 3aa20b96d963 +Revises: e69f08ead8ea +Create Date: 2026-02-05 10:12:44.065279 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "3aa20b96d963" +down_revision: Union[str, None] = "e69f08ead8ea" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + with op.batch_alter_table("room", schema=None) as batch_op: + batch_op.drop_column("use_celery") + + +def downgrade() -> None: + with op.batch_alter_table("room", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "use_celery", + sa.Boolean(), + server_default=sa.text("false"), + nullable=False, + ) + ) diff --git a/server/reflector/db/rooms.py b/server/reflector/db/rooms.py index 308817f0..86f90218 100644 --- a/server/reflector/db/rooms.py +++ b/server/reflector/db/rooms.py @@ -57,12 +57,6 @@ rooms = sqlalchemy.Table( sqlalchemy.String, nullable=False, ), - sqlalchemy.Column( - "use_celery", - sqlalchemy.Boolean, - nullable=False, - server_default=false(), - ), sqlalchemy.Column( "skip_consent", sqlalchemy.Boolean, @@ -97,7 +91,6 @@ class Room(BaseModel): ics_last_sync: datetime | None = None ics_last_etag: str | None = None platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM) - use_celery: bool = False skip_consent: bool = False diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 6a7fdbea..13847a49 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException from hatchet_sdk.clients.rest.models import V1TaskStatus from reflector.db.recordings import recordings_controller -from reflector.db.rooms import rooms_controller from reflector.db.transcripts import Transcript, transcripts_controller from reflector.hatchet.client import HatchetClientManager from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_pipeline_file_process -from reflector.pipelines.main_multitrack_pipeline import ( - task_pipeline_multitrack_process, -) from reflector.utils.string import NonEmptyString @@ -181,124 +177,98 @@ async def dispatch_transcript_processing( Returns AsyncResult for Celery tasks, None for Hatchet workflows. """ if isinstance(config, MultitrackProcessingConfig): - use_celery = False - if config.room_id: - room = await rooms_controller.get_by_id(config.room_id) - use_celery = room.use_celery if room else False - - use_hatchet = not use_celery - - if use_celery: - logger.info( - "Room uses legacy Celery processing", - room_id=config.room_id, - transcript_id=config.transcript_id, + # Multitrack processing always uses Hatchet (no Celery fallback) + # First check if we can replay (outside transaction since it's read-only) + transcript = await transcripts_controller.get_by_id(config.transcript_id) + if transcript and transcript.workflow_run_id and not force: + can_replay = await HatchetClientManager.can_replay( + transcript.workflow_run_id ) - - if use_hatchet: - # First check if we can replay (outside transaction since it's read-only) - transcript = await transcripts_controller.get_by_id(config.transcript_id) - if transcript and transcript.workflow_run_id and not force: - can_replay = await HatchetClientManager.can_replay( - transcript.workflow_run_id + if can_replay: + await HatchetClientManager.replay_workflow(transcript.workflow_run_id) + logger.info( + "Replaying Hatchet workflow", + workflow_id=transcript.workflow_run_id, ) - if can_replay: - await HatchetClientManager.replay_workflow( - transcript.workflow_run_id - ) - logger.info( - "Replaying Hatchet workflow", - workflow_id=transcript.workflow_run_id, - ) - return None - else: - # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted) - # Log and proceed to start new workflow - try: - status = await HatchetClientManager.get_workflow_run_status( - transcript.workflow_run_id - ) - logger.info( - "Old workflow not replayable, starting new", - old_workflow_id=transcript.workflow_run_id, - old_status=status.value, - ) - except NotFoundException: - # Workflow deleted from Hatchet but ID still in DB - logger.info( - "Old workflow not found in Hatchet, starting new", - old_workflow_id=transcript.workflow_run_id, - ) - - # Force: cancel old workflow if exists - if force and transcript and transcript.workflow_run_id: - try: - await HatchetClientManager.cancel_workflow( - transcript.workflow_run_id - ) - logger.info( - "Cancelled old workflow (--force)", - workflow_id=transcript.workflow_run_id, - ) - except NotFoundException: - logger.info( - "Old workflow already deleted (--force)", - workflow_id=transcript.workflow_run_id, - ) - await transcripts_controller.update( - transcript, {"workflow_run_id": None} - ) - - # Re-fetch and check for concurrent dispatch (optimistic approach). - # No database lock - worst case is duplicate dispatch, but Hatchet - # workflows are idempotent so this is acceptable. - transcript = await transcripts_controller.get_by_id(config.transcript_id) - if transcript and transcript.workflow_run_id: - # Another process started a workflow between validation and now + return None + else: + # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted) + # Log and proceed to start new workflow try: status = await HatchetClientManager.get_workflow_run_status( transcript.workflow_run_id ) - if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED): - logger.info( - "Concurrent workflow detected, skipping dispatch", - workflow_id=transcript.workflow_run_id, - ) - return None - except ApiException: - # Workflow might be gone (404) or API issue - proceed with new workflow - pass + logger.info( + "Old workflow not replayable, starting new", + old_workflow_id=transcript.workflow_run_id, + old_status=status.value, + ) + except NotFoundException: + # Workflow deleted from Hatchet but ID still in DB + logger.info( + "Old workflow not found in Hatchet, starting new", + old_workflow_id=transcript.workflow_run_id, + ) - workflow_id = await HatchetClientManager.start_workflow( - workflow_name="DiarizationPipeline", - input_data={ - "recording_id": config.recording_id, - "tracks": [{"s3_key": k} for k in config.track_keys], - "bucket_name": config.bucket_name, - "transcript_id": config.transcript_id, - "room_id": config.room_id, - }, - additional_metadata={ - "transcript_id": config.transcript_id, - "recording_id": config.recording_id, - "daily_recording_id": config.recording_id, - }, + # Force: cancel old workflow if exists + if force and transcript and transcript.workflow_run_id: + try: + await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) + logger.info( + "Cancelled old workflow (--force)", + workflow_id=transcript.workflow_run_id, + ) + except NotFoundException: + logger.info( + "Old workflow already deleted (--force)", + workflow_id=transcript.workflow_run_id, + ) + await transcripts_controller.update(transcript, {"workflow_run_id": None}) + + # Re-fetch and check for concurrent dispatch (optimistic approach). + # No database lock - worst case is duplicate dispatch, but Hatchet + # workflows are idempotent so this is acceptable. + transcript = await transcripts_controller.get_by_id(config.transcript_id) + if transcript and transcript.workflow_run_id: + # Another process started a workflow between validation and now + try: + status = await HatchetClientManager.get_workflow_run_status( + transcript.workflow_run_id + ) + if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED): + logger.info( + "Concurrent workflow detected, skipping dispatch", + workflow_id=transcript.workflow_run_id, + ) + return None + except ApiException: + # Workflow might be gone (404) or API issue - proceed with new workflow + pass + + workflow_id = await HatchetClientManager.start_workflow( + workflow_name="DiarizationPipeline", + input_data={ + "recording_id": config.recording_id, + "tracks": [{"s3_key": k} for k in config.track_keys], + "bucket_name": config.bucket_name, + "transcript_id": config.transcript_id, + "room_id": config.room_id, + }, + additional_metadata={ + "transcript_id": config.transcript_id, + "recording_id": config.recording_id, + "daily_recording_id": config.recording_id, + }, + ) + + if transcript: + await transcripts_controller.update( + transcript, {"workflow_run_id": workflow_id} ) - if transcript: - await transcripts_controller.update( - transcript, {"workflow_run_id": workflow_id} - ) + logger.info("Hatchet workflow dispatched", workflow_id=workflow_id) + return None - logger.info("Hatchet workflow dispatched", workflow_id=workflow_id) - return None - - # Celery pipeline (durable workflows disabled) - return task_pipeline_multitrack_process.delay( - transcript_id=config.transcript_id, - bucket_name=config.bucket_name, - track_keys=config.track_keys, - ) elif isinstance(config, FileProcessingConfig): return task_pipeline_file_process.delay(transcript_id=config.transcript_id) else: diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 3a200d4a..4955d568 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -1,7 +1,7 @@ from pydantic.types import PositiveInt from pydantic_settings import BaseSettings, SettingsConfigDict -from reflector.schemas.platform import WHEREBY_PLATFORM, Platform +from reflector.schemas.platform import DAILY_PLATFORM, Platform from reflector.utils.string import NonEmptyString @@ -155,7 +155,7 @@ class Settings(BaseSettings): None # Webhook UUID for this environment. Not used by production code ) # Platform Configuration - DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM + DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM # Zulip integration ZULIP_REALM: str | None = None diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 8d88de43..c1d26d96 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -27,9 +27,6 @@ from reflector.db.transcripts import ( from reflector.hatchet.client import HatchetClientManager from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_live_pipeline import asynctask -from reflector.pipelines.main_multitrack_pipeline import ( - task_pipeline_multitrack_process, -) from reflector.pipelines.topic_processing import EmptyPipeline from reflector.processors import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor @@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner( room_id=room.id, ) - use_celery = room and room.use_celery - use_hatchet = not use_celery - - if use_celery: - logger.info( - "Room uses legacy Celery processing", - room_id=room.id, - transcript_id=transcript.id, - ) - - if use_hatchet: - workflow_id = await HatchetClientManager.start_workflow( - workflow_name="DiarizationPipeline", - input_data={ - "recording_id": recording_id, - "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)], - "bucket_name": bucket_name, - "transcript_id": transcript.id, - "room_id": room.id, - }, - additional_metadata={ - "transcript_id": transcript.id, - "recording_id": recording_id, - "daily_recording_id": recording_id, - }, - ) - logger.info( - "Started Hatchet workflow", - workflow_id=workflow_id, - transcript_id=transcript.id, - ) - - await transcripts_controller.update( - transcript, {"workflow_run_id": workflow_id} - ) - return - - # Celery pipeline (runs when durable workflows disabled) - task_pipeline_multitrack_process.delay( - transcript_id=transcript.id, - bucket_name=bucket_name, - track_keys=filter_cam_audio_tracks(track_keys), + # Multitrack processing always uses Hatchet (no Celery fallback) + workflow_id = await HatchetClientManager.start_workflow( + workflow_name="DiarizationPipeline", + input_data={ + "recording_id": recording_id, + "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)], + "bucket_name": bucket_name, + "transcript_id": transcript.id, + "room_id": room.id, + }, + additional_metadata={ + "transcript_id": transcript.id, + "recording_id": recording_id, + "daily_recording_id": recording_id, + }, ) + logger.info( + "Started Hatchet workflow", + workflow_id=workflow_id, + transcript_id=transcript.id, + ) + + await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id}) @shared_task @@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings(): ) continue - use_celery = room and room.use_celery - use_hatchet = not use_celery - - if use_hatchet: - if not transcript: - logger.warning( - "No transcript for Hatchet reprocessing, skipping", - recording_id=recording.id, - ) - continue - - workflow_id = await HatchetClientManager.start_workflow( - workflow_name="DiarizationPipeline", - input_data={ - "recording_id": recording.id, - "tracks": [ - {"s3_key": k} - for k in filter_cam_audio_tracks(recording.track_keys) - ], - "bucket_name": bucket_name, - "transcript_id": transcript.id, - "room_id": room.id if room else None, - }, - additional_metadata={ - "transcript_id": transcript.id, - "recording_id": recording.id, - "reprocess": True, - }, - ) - await transcripts_controller.update( - transcript, {"workflow_run_id": workflow_id} - ) - - logger.info( - "Queued Daily recording for Hatchet reprocessing", + # Multitrack reprocessing always uses Hatchet (no Celery fallback) + if not transcript: + logger.warning( + "No transcript for Hatchet reprocessing, skipping", recording_id=recording.id, - workflow_id=workflow_id, - room_name=meeting.room_name, - track_count=len(recording.track_keys), - ) - else: - logger.info( - "Queueing Daily recording for Celery reprocessing", - recording_id=recording.id, - room_name=meeting.room_name, - track_count=len(recording.track_keys), - transcript_status=transcript.status if transcript else None, ) + continue - # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner) - # Reprocessing uses recording.meeting_id directly instead of time-based matching - recording_start_ts = int(recording.recorded_at.timestamp()) + workflow_id = await HatchetClientManager.start_workflow( + workflow_name="DiarizationPipeline", + input_data={ + "recording_id": recording.id, + "tracks": [ + {"s3_key": k} + for k in filter_cam_audio_tracks(recording.track_keys) + ], + "bucket_name": bucket_name, + "transcript_id": transcript.id, + "room_id": room.id if room else None, + }, + additional_metadata={ + "transcript_id": transcript.id, + "recording_id": recording.id, + "reprocess": True, + }, + ) + await transcripts_controller.update( + transcript, {"workflow_run_id": workflow_id} + ) - process_multitrack_recording.delay( - bucket_name=bucket_name, - daily_room_name=meeting.room_name, - recording_id=recording.id, - track_keys=recording.track_keys, - recording_start_ts=recording_start_ts, - ) + logger.info( + "Queued Daily recording for Hatchet reprocessing", + recording_id=recording.id, + workflow_id=workflow_id, + room_name=meeting.room_name, + track_count=len(recording.track_keys), + ) reprocessed_count += 1 diff --git a/server/tests/conftest.py b/server/tests/conftest.py index 1f4469ea..0dace1b9 100644 --- a/server/tests/conftest.py +++ b/server/tests/conftest.py @@ -4,7 +4,7 @@ from unittest.mock import patch import pytest -from reflector.schemas.platform import WHEREBY_PLATFORM +from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM @pytest.fixture(scope="session", autouse=True) @@ -14,6 +14,7 @@ def register_mock_platform(): from reflector.video_platforms.registry import register_platform register_platform(WHEREBY_PLATFORM, MockPlatformClient) + register_platform(DAILY_PLATFORM, MockPlatformClient) yield diff --git a/server/tests/test_transcripts_process.py b/server/tests/test_transcripts_process.py index 3e2aed2b..1adf996e 100644 --- a/server/tests/test_transcripts_process.py +++ b/server/tests/test_transcripts_process.py @@ -1,6 +1,6 @@ import asyncio import time -from unittest.mock import patch +from unittest.mock import AsyncMock, patch import pytest from httpx import ASGITransport, AsyncClient @@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client): "reflector.services.transcript_process.task_pipeline_file_process" ) as mock_file_pipeline, patch( - "reflector.services.transcript_process.task_pipeline_multitrack_process" - ) as mock_multitrack_pipeline, + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet, ): response = await client.post(f"/transcripts/{transcript.id}/process") assert response.status_code == 200 assert response.json()["status"] == "ok" - # Whereby recordings should use file pipeline + # Whereby recordings should use file pipeline, not Hatchet mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) - mock_multitrack_pipeline.delay.assert_not_called() + mock_hatchet.start_workflow.assert_not_called() @pytest.mark.usefixtures("setup_database") @@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client): recording_trigger="automatic-2nd-participant", is_shared=False, ) - # Force Celery backend for test - await rooms_controller.update(room, {"use_celery": True}) transcript = await transcripts_controller.add( "", @@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client): "reflector.services.transcript_process.task_pipeline_file_process" ) as mock_file_pipeline, patch( - "reflector.services.transcript_process.task_pipeline_multitrack_process" - ) as mock_multitrack_pipeline, + "reflector.services.transcript_process.HatchetClientManager" + ) as mock_hatchet, ): + mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id") + response = await client.post(f"/transcripts/{transcript.id}/process") assert response.status_code == 200 assert response.json()["status"] == "ok" - # Daily.co multitrack recordings should use multitrack pipeline - mock_multitrack_pipeline.delay.assert_called_once_with( - transcript_id=transcript.id, - bucket_name="daily-bucket", - track_keys=track_keys, - ) + # Daily.co multitrack recordings should use Hatchet workflow + mock_hatchet.start_workflow.assert_called_once() + call_kwargs = mock_hatchet.start_workflow.call_args.kwargs + assert call_kwargs["workflow_name"] == "DiarizationPipeline" + assert call_kwargs["input_data"]["transcript_id"] == transcript.id + assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket" + assert call_kwargs["input_data"]["tracks"] == [ + {"s3_key": k} for k in track_keys + ] mock_file_pipeline.delay.assert_not_called()