Compare commits


3 Commits

Igor Loskutov
8373874cbd feat: add LLM_ENABLE_THINKING env var for thinking-mode LLMs
Some LLMs (e.g. GLM-4.5-Air) default to thinking mode which returns
content in reasoning_content instead of content field, breaking
structured output parsing. This setting passes enable_thinking through
extra_body to control the behavior per deployment.

Three states: None (default, don't send), True, False.
2026-02-06 20:19:53 -05:00
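
At the wire level, the flag ends up in the request body. A minimal sketch using the openai client against an OpenAI-compatible server — the base URL, API key, and model name are illustrative, not taken from this repo:

```python
# Sketch: how enable_thinking travels via extra_body (openai-python >= 1.x).
# Base URL, API key, and model name below are placeholders.
import json
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="unused")

resp = client.chat.completions.create(
    model="zai-org/GLM-4.5-Air",  # a model that defaults to thinking mode
    messages=[{"role": "user", "content": 'Reply with {"ok": true} only.'}],
    extra_body={"enable_thinking": False},  # what LLM_ENABLE_THINKING=false sends
)
# With thinking disabled, the JSON lands in .content where parsers expect it;
# in thinking mode it would land in reasoning_content and .content stays empty.
print(json.loads(resp.choices[0].message.content))
```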
cd2255cfbc chore(main): release 0.33.0 (#847) 2026-02-06 18:12:06 -05:00
15ab2e306e feat: Daily+hatchet default (#846)
* feat: set Daily as default video platform

Daily.co has been battle-tested and is ready to be the default.
Whereby remains available for rooms that explicitly set it.

* feat: enforce Hatchet for all multitrack processing

Remove use_celery option from rooms - multitrack (Daily) recordings
now always use Hatchet workflows. Celery remains for single-track
(Whereby) file processing only.

- Remove use_celery column from room table
- Simplify dispatch logic to always use Hatchet for multitracks
- Update tests to mock Hatchet instead of Celery

* fix: update whereby test to patch Hatchet instead of removed Celery import

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-05 18:38:08 -05:00
10 changed files with 266 additions and 239 deletions

View File

```diff
@@ -1,5 +1,17 @@
 # Changelog
 
+## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
+
+### Features
+
+* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
+
+### Bug Fixes
+
+* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
+
 ## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
```

View File

@@ -0,0 +1,35 @@
"""drop_use_celery_column
Revision ID: 3aa20b96d963
Revises: e69f08ead8ea
Create Date: 2026-02-05 10:12:44.065279
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "3aa20b96d963"
down_revision: Union[str, None] = "e69f08ead8ea"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_celery")
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_celery",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
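
For reference, the migration can be driven programmatically as well as from the CLI; a minimal sketch, assuming an alembic.ini next to the migrations directory (that path is an assumption, not confirmed by this diff):

```python
# Hypothetical driver for this migration; the "alembic.ini" path is an assumption.
from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")
command.upgrade(cfg, "3aa20b96d963")    # apply: drops room.use_celery
command.downgrade(cfg, "e69f08ead8ea")  # revert: restores the column, default false
```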

View File

```diff
@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
         sqlalchemy.String,
         nullable=False,
     ),
-    sqlalchemy.Column(
-        "use_celery",
-        sqlalchemy.Boolean,
-        nullable=False,
-        server_default=false(),
-    ),
     sqlalchemy.Column(
         "skip_consent",
         sqlalchemy.Boolean,
@@ -97,7 +91,6 @@ class Room(BaseModel):
     ics_last_sync: datetime | None = None
     ics_last_etag: str | None = None
     platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
-    use_celery: bool = False
     skip_consent: bool = False
```

View File

```diff
@@ -206,6 +206,12 @@ class LLM:
         """Configure llamaindex Settings with OpenAILike LLM"""
         session_id = llm_session_id.get() or f"fallback-{uuid4().hex}"
+
+        extra_body: dict = {"litellm_session_id": session_id}
+        # Only send enable_thinking when explicitly set (not None/unset).
+        # Models that don't support it will ignore the param.
+        if self.settings_obj.LLM_ENABLE_THINKING is not None:
+            extra_body["enable_thinking"] = self.settings_obj.LLM_ENABLE_THINKING
         Settings.llm = OpenAILike(
             model=self.model_name,
             api_base=self.url,
@@ -215,7 +221,7 @@ class LLM:
             is_function_calling_model=False,
             temperature=self.temperature,
             max_tokens=self.max_tokens,
-            additional_kwargs={"extra_body": {"litellm_session_id": session_id}},
+            additional_kwargs={"extra_body": extra_body},
         )
 
     async def get_response(
```
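
The tri-state logic above reduces to a small pure function; a self-contained sketch (the session id is a placeholder):

```python
# Sketch of the extra_body construction above; "session-123" is a placeholder.
def build_extra_body(enable_thinking: bool | None) -> dict:
    extra_body = {"litellm_session_id": "session-123"}
    if enable_thinking is not None:  # None means: omit the key entirely
        extra_body["enable_thinking"] = enable_thinking
    return extra_body

assert build_extra_body(None) == {"litellm_session_id": "session-123"}
assert build_extra_body(True)["enable_thinking"] is True
assert build_extra_body(False)["enable_thinking"] is False
```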

View File

```diff
@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
 from hatchet_sdk.clients.rest.models import V1TaskStatus
 
 from reflector.db.recordings import recordings_controller
-from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import Transcript, transcripts_controller
 from reflector.hatchet.client import HatchetClientManager
 from reflector.logger import logger
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
-from reflector.pipelines.main_multitrack_pipeline import (
-    task_pipeline_multitrack_process,
-)
 from reflector.utils.string import NonEmptyString
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
     if isinstance(config, MultitrackProcessingConfig):
-        use_celery = False
-        if config.room_id:
-            room = await rooms_controller.get_by_id(config.room_id)
-            use_celery = room.use_celery if room else False
-
-        use_hatchet = not use_celery
-        if use_celery:
-            logger.info(
-                "Room uses legacy Celery processing",
-                room_id=config.room_id,
-                transcript_id=config.transcript_id,
-            )
-
-        if use_hatchet:
-            # First check if we can replay (outside transaction since it's read-only)
-            transcript = await transcripts_controller.get_by_id(config.transcript_id)
-            if transcript and transcript.workflow_run_id and not force:
-                can_replay = await HatchetClientManager.can_replay(
-                    transcript.workflow_run_id
-                )
-                if can_replay:
-                    await HatchetClientManager.replay_workflow(
-                        transcript.workflow_run_id
-                    )
-                    logger.info(
-                        "Replaying Hatchet workflow",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                    return None
-                else:
-                    # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
-                    # Log and proceed to start new workflow
-                    try:
-                        status = await HatchetClientManager.get_workflow_run_status(
-                            transcript.workflow_run_id
-                        )
-                        logger.info(
-                            "Old workflow not replayable, starting new",
-                            old_workflow_id=transcript.workflow_run_id,
-                            old_status=status.value,
-                        )
-                    except NotFoundException:
-                        # Workflow deleted from Hatchet but ID still in DB
-                        logger.info(
-                            "Old workflow not found in Hatchet, starting new",
-                            old_workflow_id=transcript.workflow_run_id,
-                        )
-
-            # Force: cancel old workflow if exists
-            if force and transcript and transcript.workflow_run_id:
-                try:
-                    await HatchetClientManager.cancel_workflow(
-                        transcript.workflow_run_id
-                    )
-                    logger.info(
-                        "Cancelled old workflow (--force)",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                except NotFoundException:
-                    logger.info(
-                        "Old workflow already deleted (--force)",
-                        workflow_id=transcript.workflow_run_id,
-                    )
-                await transcripts_controller.update(
-                    transcript, {"workflow_run_id": None}
-                )
-
-            # Re-fetch and check for concurrent dispatch (optimistic approach).
-            # No database lock - worst case is duplicate dispatch, but Hatchet
-            # workflows are idempotent so this is acceptable.
-            transcript = await transcripts_controller.get_by_id(config.transcript_id)
-            if transcript and transcript.workflow_run_id:
-                # Another process started a workflow between validation and now
-                try:
-                    status = await HatchetClientManager.get_workflow_run_status(
-                        transcript.workflow_run_id
-                    )
-                    if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
-                        logger.info(
-                            "Concurrent workflow detected, skipping dispatch",
-                            workflow_id=transcript.workflow_run_id,
-                        )
-                        return None
-                except ApiException:
-                    # Workflow might be gone (404) or API issue - proceed with new workflow
-                    pass
-
-            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
-                input_data={
-                    "recording_id": config.recording_id,
-                    "tracks": [{"s3_key": k} for k in config.track_keys],
-                    "bucket_name": config.bucket_name,
-                    "transcript_id": config.transcript_id,
-                    "room_id": config.room_id,
-                },
-                additional_metadata={
-                    "transcript_id": config.transcript_id,
-                    "recording_id": config.recording_id,
-                    "daily_recording_id": config.recording_id,
-                },
-            )
-            if transcript:
-                await transcripts_controller.update(
-                    transcript, {"workflow_run_id": workflow_id}
-                )
-            logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
-            return None
-
-        # Celery pipeline (durable workflows disabled)
-        return task_pipeline_multitrack_process.delay(
-            transcript_id=config.transcript_id,
-            bucket_name=config.bucket_name,
-            track_keys=config.track_keys,
-        )
+        # Multitrack processing always uses Hatchet (no Celery fallback)
+        # First check if we can replay (outside transaction since it's read-only)
+        transcript = await transcripts_controller.get_by_id(config.transcript_id)
+        if transcript and transcript.workflow_run_id and not force:
+            can_replay = await HatchetClientManager.can_replay(
+                transcript.workflow_run_id
+            )
+            if can_replay:
+                await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
+                logger.info(
+                    "Replaying Hatchet workflow",
+                    workflow_id=transcript.workflow_run_id,
+                )
+                return None
+            else:
+                # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
+                # Log and proceed to start new workflow
+                try:
+                    status = await HatchetClientManager.get_workflow_run_status(
+                        transcript.workflow_run_id
+                    )
+                    logger.info(
+                        "Old workflow not replayable, starting new",
+                        old_workflow_id=transcript.workflow_run_id,
+                        old_status=status.value,
+                    )
+                except NotFoundException:
+                    # Workflow deleted from Hatchet but ID still in DB
+                    logger.info(
+                        "Old workflow not found in Hatchet, starting new",
+                        old_workflow_id=transcript.workflow_run_id,
+                    )
+
+        # Force: cancel old workflow if exists
+        if force and transcript and transcript.workflow_run_id:
+            try:
+                await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
+                logger.info(
+                    "Cancelled old workflow (--force)",
+                    workflow_id=transcript.workflow_run_id,
+                )
+            except NotFoundException:
+                logger.info(
+                    "Old workflow already deleted (--force)",
+                    workflow_id=transcript.workflow_run_id,
+                )
+            await transcripts_controller.update(transcript, {"workflow_run_id": None})
+
+        # Re-fetch and check for concurrent dispatch (optimistic approach).
+        # No database lock - worst case is duplicate dispatch, but Hatchet
+        # workflows are idempotent so this is acceptable.
+        transcript = await transcripts_controller.get_by_id(config.transcript_id)
+        if transcript and transcript.workflow_run_id:
+            # Another process started a workflow between validation and now
+            try:
+                status = await HatchetClientManager.get_workflow_run_status(
+                    transcript.workflow_run_id
+                )
+                if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
+                    logger.info(
+                        "Concurrent workflow detected, skipping dispatch",
+                        workflow_id=transcript.workflow_run_id,
+                    )
+                    return None
+            except ApiException:
+                # Workflow might be gone (404) or API issue - proceed with new workflow
+                pass
+
+        workflow_id = await HatchetClientManager.start_workflow(
+            workflow_name="DiarizationPipeline",
+            input_data={
+                "recording_id": config.recording_id,
+                "tracks": [{"s3_key": k} for k in config.track_keys],
+                "bucket_name": config.bucket_name,
+                "transcript_id": config.transcript_id,
+                "room_id": config.room_id,
+            },
+            additional_metadata={
+                "transcript_id": config.transcript_id,
+                "recording_id": config.recording_id,
+                "daily_recording_id": config.recording_id,
+            },
+        )
+        if transcript:
+            await transcripts_controller.update(
+                transcript, {"workflow_run_id": workflow_id}
+            )
+        logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
+        return None
     elif isinstance(config, FileProcessingConfig):
        return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
     else:
```
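
The new dispatch path compresses to a short decision ladder: replay if possible, cancel on --force, skip if a concurrent run is already live, otherwise start fresh. A simplified sketch with stub objects — `client` and `transcript` are not the repo's real types, and only the method names visible in this diff are assumed:

```python
# Simplified model of the dispatch policy above; stubs, not the real API.
async def dispatch(client, transcript, force: bool) -> str | None:
    run_id = transcript.workflow_run_id
    if run_id and not force and await client.can_replay(run_id):
        await client.replay_workflow(run_id)  # resume the existing run in place
        return None
    if force and run_id:
        await client.cancel_workflow(run_id)  # --force: discard the old run
        transcript.workflow_run_id = None
    # Optimistic concurrency: re-check rather than lock; a duplicate dispatch
    # is acceptable because the workflow itself is idempotent.
    if transcript.workflow_run_id:
        status = await client.get_workflow_run_status(transcript.workflow_run_id)
        if status in ("RUNNING", "QUEUED"):  # strings stand in for V1TaskStatus
            return None
    new_id = await client.start_workflow(workflow_name="DiarizationPipeline")
    transcript.workflow_run_id = new_id
    return new_id
```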

View File

```diff
@@ -1,7 +1,7 @@
 from pydantic.types import PositiveInt
 from pydantic_settings import BaseSettings, SettingsConfigDict
 
-from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
+from reflector.schemas.platform import DAILY_PLATFORM, Platform
 from reflector.utils.string import NonEmptyString
@@ -75,6 +75,7 @@ class Settings(BaseSettings):
     LLM_URL: str | None = None
     LLM_API_KEY: str | None = None
     LLM_CONTEXT_WINDOW: int = 16000
+    LLM_ENABLE_THINKING: bool | None = None
     LLM_PARSE_MAX_RETRIES: int = (
         3  # Max retries for JSON/validation errors (total attempts = retries + 1)
@@ -155,7 +156,7 @@ class Settings(BaseSettings):
         None  # Webhook UUID for this environment. Not used by production code
     )
 
     # Platform Configuration
-    DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
+    DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
 
     # Zulip integration
     ZULIP_REALM: str | None = None
```
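
The new field's three states parse from the environment via standard pydantic-settings boolean coercion; a self-contained sketch (the Demo class is illustrative, not the repo's Settings):

```python
# Demonstration of the three states of LLM_ENABLE_THINKING under
# pydantic-settings; Demo is a stand-in for the real Settings class.
import os
from pydantic_settings import BaseSettings

class Demo(BaseSettings):
    LLM_ENABLE_THINKING: bool | None = None

os.environ.pop("LLM_ENABLE_THINKING", None)
assert Demo().LLM_ENABLE_THINKING is None   # unset -> param omitted entirely

os.environ["LLM_ENABLE_THINKING"] = "false"
assert Demo().LLM_ENABLE_THINKING is False  # explicit off

os.environ["LLM_ENABLE_THINKING"] = "true"
assert Demo().LLM_ENABLE_THINKING is True   # explicit on
```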

View File

```diff
@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
 from reflector.hatchet.client import HatchetClientManager
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.pipelines.main_multitrack_pipeline import (
-    task_pipeline_multitrack_process,
-)
 from reflector.pipelines.topic_processing import EmptyPipeline
 from reflector.processors import AudioFileWriterProcessor
 from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
         room_id=room.id,
     )
 
-    use_celery = room and room.use_celery
-    use_hatchet = not use_celery
-
-    if use_celery:
-        logger.info(
-            "Room uses legacy Celery processing",
-            room_id=room.id,
-            transcript_id=transcript.id,
-        )
-
-    if use_hatchet:
-        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
-            input_data={
-                "recording_id": recording_id,
-                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
-                "bucket_name": bucket_name,
-                "transcript_id": transcript.id,
-                "room_id": room.id,
-            },
-            additional_metadata={
-                "transcript_id": transcript.id,
-                "recording_id": recording_id,
-                "daily_recording_id": recording_id,
-            },
-        )
-        logger.info(
-            "Started Hatchet workflow",
-            workflow_id=workflow_id,
-            transcript_id=transcript.id,
-        )
-        await transcripts_controller.update(
-            transcript, {"workflow_run_id": workflow_id}
-        )
-        return
-
-    # Celery pipeline (runs when durable workflows disabled)
-    task_pipeline_multitrack_process.delay(
-        transcript_id=transcript.id,
-        bucket_name=bucket_name,
-        track_keys=filter_cam_audio_tracks(track_keys),
-    )
+    # Multitrack processing always uses Hatchet (no Celery fallback)
+    workflow_id = await HatchetClientManager.start_workflow(
+        workflow_name="DiarizationPipeline",
+        input_data={
+            "recording_id": recording_id,
+            "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
+            "bucket_name": bucket_name,
+            "transcript_id": transcript.id,
+            "room_id": room.id,
+        },
+        additional_metadata={
+            "transcript_id": transcript.id,
+            "recording_id": recording_id,
+            "daily_recording_id": recording_id,
+        },
+    )
+    logger.info(
+        "Started Hatchet workflow",
+        workflow_id=workflow_id,
+        transcript_id=transcript.id,
+    )
+    await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
 
 
 @shared_task
@@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
             )
             continue
 
-        use_celery = room and room.use_celery
-        use_hatchet = not use_celery
-
-        if use_hatchet:
-            if not transcript:
-                logger.warning(
-                    "No transcript for Hatchet reprocessing, skipping",
-                    recording_id=recording.id,
-                )
-                continue
-
-            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
-                input_data={
-                    "recording_id": recording.id,
-                    "tracks": [
-                        {"s3_key": k}
-                        for k in filter_cam_audio_tracks(recording.track_keys)
-                    ],
-                    "bucket_name": bucket_name,
-                    "transcript_id": transcript.id,
-                    "room_id": room.id if room else None,
-                },
-                additional_metadata={
-                    "transcript_id": transcript.id,
-                    "recording_id": recording.id,
-                    "reprocess": True,
-                },
-            )
-            await transcripts_controller.update(
-                transcript, {"workflow_run_id": workflow_id}
-            )
-            logger.info(
-                "Queued Daily recording for Hatchet reprocessing",
-                recording_id=recording.id,
-                workflow_id=workflow_id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-            )
-        else:
-            logger.info(
-                "Queueing Daily recording for Celery reprocessing",
-                recording_id=recording.id,
-                room_name=meeting.room_name,
-                track_count=len(recording.track_keys),
-                transcript_status=transcript.status if transcript else None,
-            )
-
-            # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
-            # Reprocessing uses recording.meeting_id directly instead of time-based matching
-            recording_start_ts = int(recording.recorded_at.timestamp())
-
-            process_multitrack_recording.delay(
-                bucket_name=bucket_name,
-                daily_room_name=meeting.room_name,
-                recording_id=recording.id,
-                track_keys=recording.track_keys,
-                recording_start_ts=recording_start_ts,
-            )
+        # Multitrack reprocessing always uses Hatchet (no Celery fallback)
+        if not transcript:
+            logger.warning(
+                "No transcript for Hatchet reprocessing, skipping",
+                recording_id=recording.id,
+            )
+            continue
+
+        workflow_id = await HatchetClientManager.start_workflow(
+            workflow_name="DiarizationPipeline",
+            input_data={
+                "recording_id": recording.id,
+                "tracks": [
+                    {"s3_key": k}
+                    for k in filter_cam_audio_tracks(recording.track_keys)
+                ],
+                "bucket_name": bucket_name,
+                "transcript_id": transcript.id,
+                "room_id": room.id if room else None,
+            },
+            additional_metadata={
+                "transcript_id": transcript.id,
+                "recording_id": recording.id,
+                "reprocess": True,
+            },
+        )
+        await transcripts_controller.update(
+            transcript, {"workflow_run_id": workflow_id}
+        )
+        logger.info(
+            "Queued Daily recording for Hatchet reprocessing",
+            recording_id=recording.id,
+            workflow_id=workflow_id,
+            room_name=meeting.room_name,
+            track_count=len(recording.track_keys),
+        )
 
         reprocessed_count += 1
```

View File

```diff
@@ -4,7 +4,7 @@ from unittest.mock import patch
 
 import pytest
 
-from reflector.schemas.platform import WHEREBY_PLATFORM
+from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
 
 
 @pytest.fixture(scope="session", autouse=True)
@@ -14,6 +14,7 @@ def register_mock_platform():
     from reflector.video_platforms.registry import register_platform
 
     register_platform(WHEREBY_PLATFORM, MockPlatformClient)
+    register_platform(DAILY_PLATFORM, MockPlatformClient)
 
     yield
```

View File

```diff
@@ -8,6 +8,7 @@ from pydantic import BaseModel, Field
 from workflows.errors import WorkflowRuntimeError, WorkflowTimeoutError
 
 from reflector.llm import LLM, LLMParseError, StructuredOutputWorkflow
+from reflector.settings import Settings
 from reflector.utils.retry import RetryException
@@ -26,6 +27,57 @@ def make_completion_response(text: str):
     return response
 
 
+class TestLLMEnableThinking:
+    """Test that LLM_ENABLE_THINKING setting is passed through to OpenAILike"""
+
+    def test_enable_thinking_false_passed_in_extra_body(self):
+        """enable_thinking=False should be in extra_body when LLM_ENABLE_THINKING=False"""
+        settings = Settings(
+            LLM_ENABLE_THINKING=False,
+            LLM_URL="http://fake",
+            LLM_API_KEY="fake",
+        )
+        with (
+            patch("reflector.llm.OpenAILike") as mock_openai,
+            patch("reflector.llm.Settings"),
+        ):
+            LLM(settings=settings)
+            extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
+            assert extra_body["enable_thinking"] is False
+
+    def test_enable_thinking_true_passed_in_extra_body(self):
+        """enable_thinking=True should be in extra_body when LLM_ENABLE_THINKING=True"""
+        settings = Settings(
+            LLM_ENABLE_THINKING=True,
+            LLM_URL="http://fake",
+            LLM_API_KEY="fake",
+        )
+        with (
+            patch("reflector.llm.OpenAILike") as mock_openai,
+            patch("reflector.llm.Settings"),
+        ):
+            LLM(settings=settings)
+            extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
+            assert extra_body["enable_thinking"] is True
+
+    def test_enable_thinking_none_not_in_extra_body(self):
+        """enable_thinking should not be in extra_body when LLM_ENABLE_THINKING is None (default)"""
+        settings = Settings(
+            LLM_URL="http://fake",
+            LLM_API_KEY="fake",
+        )
+        with (
+            patch("reflector.llm.OpenAILike") as mock_openai,
+            patch("reflector.llm.Settings"),
+        ):
+            LLM(settings=settings)
+            extra_body = mock_openai.call_args.kwargs["additional_kwargs"]["extra_body"]
+            assert "enable_thinking" not in extra_body
+
+
 class TestLLMParseErrorRecovery:
     """Test parse error recovery with Workflow feedback loop"""
```

View File

```diff
@@ -1,6 +1,6 @@
 import asyncio
 import time
-from unittest.mock import patch
+from unittest.mock import AsyncMock, patch
 
 import pytest
 from httpx import ASGITransport, AsyncClient
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
             "reflector.services.transcript_process.task_pipeline_file_process"
         ) as mock_file_pipeline,
         patch(
-            "reflector.services.transcript_process.task_pipeline_multitrack_process"
-        ) as mock_multitrack_pipeline,
+            "reflector.services.transcript_process.HatchetClientManager"
+        ) as mock_hatchet,
     ):
         response = await client.post(f"/transcripts/{transcript.id}/process")
         assert response.status_code == 200
         assert response.json()["status"] == "ok"
 
-        # Whereby recordings should use file pipeline
+        # Whereby recordings should use file pipeline, not Hatchet
         mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
-        mock_multitrack_pipeline.delay.assert_not_called()
+        mock_hatchet.start_workflow.assert_not_called()
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
         recording_trigger="automatic-2nd-participant",
         is_shared=False,
     )
-    # Force Celery backend for test
-    await rooms_controller.update(room, {"use_celery": True})
 
     transcript = await transcripts_controller.add(
         "",
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
             "reflector.services.transcript_process.task_pipeline_file_process"
         ) as mock_file_pipeline,
         patch(
-            "reflector.services.transcript_process.task_pipeline_multitrack_process"
-        ) as mock_multitrack_pipeline,
+            "reflector.services.transcript_process.HatchetClientManager"
+        ) as mock_hatchet,
     ):
+        mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
         response = await client.post(f"/transcripts/{transcript.id}/process")
         assert response.status_code == 200
         assert response.json()["status"] == "ok"
 
-        # Daily.co multitrack recordings should use multitrack pipeline
-        mock_multitrack_pipeline.delay.assert_called_once_with(
-            transcript_id=transcript.id,
-            bucket_name="daily-bucket",
-            track_keys=track_keys,
-        )
+        # Daily.co multitrack recordings should use Hatchet workflow
+        mock_hatchet.start_workflow.assert_called_once()
+        call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
+        assert call_kwargs["workflow_name"] == "DiarizationPipeline"
+        assert call_kwargs["input_data"]["transcript_id"] == transcript.id
+        assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
+        assert call_kwargs["input_data"]["tracks"] == [
+            {"s3_key": k} for k in track_keys
+        ]
         mock_file_pipeline.delay.assert_not_called()
```