mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-06 02:36:47 +00:00
Compare commits
2 Commits
fix-presen
...
release-pl
| Author | SHA1 | Date | |
|---|---|---|---|
| 46a10af349 | |||
| 15ab2e306e |
12
CHANGELOG.md
12
CHANGELOG.md
@@ -1,5 +1,17 @@
|
||||
# Changelog
|
||||
|
||||
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
|
||||
|
||||
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
"""drop_use_celery_column
|
||||
|
||||
Revision ID: 3aa20b96d963
|
||||
Revises: e69f08ead8ea
|
||||
Create Date: 2026-02-05 10:12:44.065279
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "3aa20b96d963"
|
||||
down_revision: Union[str, None] = "e69f08ead8ea"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||
batch_op.drop_column("use_celery")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||
batch_op.add_column(
|
||||
sa.Column(
|
||||
"use_celery",
|
||||
sa.Boolean(),
|
||||
server_default=sa.text("false"),
|
||||
nullable=False,
|
||||
)
|
||||
)
|
||||
@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
|
||||
sqlalchemy.String,
|
||||
nullable=False,
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"use_celery",
|
||||
sqlalchemy.Boolean,
|
||||
nullable=False,
|
||||
server_default=false(),
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"skip_consent",
|
||||
sqlalchemy.Boolean,
|
||||
@@ -97,7 +91,6 @@ class Room(BaseModel):
|
||||
ics_last_sync: datetime | None = None
|
||||
ics_last_etag: str | None = None
|
||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||
use_celery: bool = False
|
||||
skip_consent: bool = False
|
||||
|
||||
|
||||
|
||||
@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
|
||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
||||
|
||||
from reflector.db.recordings import recordings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import Transcript, transcripts_controller
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_multitrack_pipeline import (
|
||||
task_pipeline_multitrack_process,
|
||||
)
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
|
||||
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
|
||||
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
||||
"""
|
||||
if isinstance(config, MultitrackProcessingConfig):
|
||||
use_celery = False
|
||||
if config.room_id:
|
||||
room = await rooms_controller.get_by_id(config.room_id)
|
||||
use_celery = room.use_celery if room else False
|
||||
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if use_celery:
|
||||
logger.info(
|
||||
"Room uses legacy Celery processing",
|
||||
room_id=config.room_id,
|
||||
transcript_id=config.transcript_id,
|
||||
# Multitrack processing always uses Hatchet (no Celery fallback)
|
||||
# First check if we can replay (outside transaction since it's read-only)
|
||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||
if transcript and transcript.workflow_run_id and not force:
|
||||
can_replay = await HatchetClientManager.can_replay(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
|
||||
if use_hatchet:
|
||||
# First check if we can replay (outside transaction since it's read-only)
|
||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||
if transcript and transcript.workflow_run_id and not force:
|
||||
can_replay = await HatchetClientManager.can_replay(
|
||||
transcript.workflow_run_id
|
||||
if can_replay:
|
||||
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
|
||||
logger.info(
|
||||
"Replaying Hatchet workflow",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
if can_replay:
|
||||
await HatchetClientManager.replay_workflow(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
logger.info(
|
||||
"Replaying Hatchet workflow",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
return None
|
||||
else:
|
||||
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
|
||||
# Log and proceed to start new workflow
|
||||
try:
|
||||
status = await HatchetClientManager.get_workflow_run_status(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
logger.info(
|
||||
"Old workflow not replayable, starting new",
|
||||
old_workflow_id=transcript.workflow_run_id,
|
||||
old_status=status.value,
|
||||
)
|
||||
except NotFoundException:
|
||||
# Workflow deleted from Hatchet but ID still in DB
|
||||
logger.info(
|
||||
"Old workflow not found in Hatchet, starting new",
|
||||
old_workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
|
||||
# Force: cancel old workflow if exists
|
||||
if force and transcript and transcript.workflow_run_id:
|
||||
try:
|
||||
await HatchetClientManager.cancel_workflow(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
logger.info(
|
||||
"Cancelled old workflow (--force)",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
except NotFoundException:
|
||||
logger.info(
|
||||
"Old workflow already deleted (--force)",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": None}
|
||||
)
|
||||
|
||||
# Re-fetch and check for concurrent dispatch (optimistic approach).
|
||||
# No database lock - worst case is duplicate dispatch, but Hatchet
|
||||
# workflows are idempotent so this is acceptable.
|
||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||
if transcript and transcript.workflow_run_id:
|
||||
# Another process started a workflow between validation and now
|
||||
return None
|
||||
else:
|
||||
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
|
||||
# Log and proceed to start new workflow
|
||||
try:
|
||||
status = await HatchetClientManager.get_workflow_run_status(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
||||
logger.info(
|
||||
"Concurrent workflow detected, skipping dispatch",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
return None
|
||||
except ApiException:
|
||||
# Workflow might be gone (404) or API issue - proceed with new workflow
|
||||
pass
|
||||
logger.info(
|
||||
"Old workflow not replayable, starting new",
|
||||
old_workflow_id=transcript.workflow_run_id,
|
||||
old_status=status.value,
|
||||
)
|
||||
except NotFoundException:
|
||||
# Workflow deleted from Hatchet but ID still in DB
|
||||
logger.info(
|
||||
"Old workflow not found in Hatchet, starting new",
|
||||
old_workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": config.recording_id,
|
||||
"tracks": [{"s3_key": k} for k in config.track_keys],
|
||||
"bucket_name": config.bucket_name,
|
||||
"transcript_id": config.transcript_id,
|
||||
"room_id": config.room_id,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": config.transcript_id,
|
||||
"recording_id": config.recording_id,
|
||||
"daily_recording_id": config.recording_id,
|
||||
},
|
||||
# Force: cancel old workflow if exists
|
||||
if force and transcript and transcript.workflow_run_id:
|
||||
try:
|
||||
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
|
||||
logger.info(
|
||||
"Cancelled old workflow (--force)",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
except NotFoundException:
|
||||
logger.info(
|
||||
"Old workflow already deleted (--force)",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
await transcripts_controller.update(transcript, {"workflow_run_id": None})
|
||||
|
||||
# Re-fetch and check for concurrent dispatch (optimistic approach).
|
||||
# No database lock - worst case is duplicate dispatch, but Hatchet
|
||||
# workflows are idempotent so this is acceptable.
|
||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||
if transcript and transcript.workflow_run_id:
|
||||
# Another process started a workflow between validation and now
|
||||
try:
|
||||
status = await HatchetClientManager.get_workflow_run_status(
|
||||
transcript.workflow_run_id
|
||||
)
|
||||
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
||||
logger.info(
|
||||
"Concurrent workflow detected, skipping dispatch",
|
||||
workflow_id=transcript.workflow_run_id,
|
||||
)
|
||||
return None
|
||||
except ApiException:
|
||||
# Workflow might be gone (404) or API issue - proceed with new workflow
|
||||
pass
|
||||
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": config.recording_id,
|
||||
"tracks": [{"s3_key": k} for k in config.track_keys],
|
||||
"bucket_name": config.bucket_name,
|
||||
"transcript_id": config.transcript_id,
|
||||
"room_id": config.room_id,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": config.transcript_id,
|
||||
"recording_id": config.recording_id,
|
||||
"daily_recording_id": config.recording_id,
|
||||
},
|
||||
)
|
||||
|
||||
if transcript:
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": workflow_id}
|
||||
)
|
||||
|
||||
if transcript:
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": workflow_id}
|
||||
)
|
||||
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
|
||||
return None
|
||||
|
||||
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
|
||||
return None
|
||||
|
||||
# Celery pipeline (durable workflows disabled)
|
||||
return task_pipeline_multitrack_process.delay(
|
||||
transcript_id=config.transcript_id,
|
||||
bucket_name=config.bucket_name,
|
||||
track_keys=config.track_keys,
|
||||
)
|
||||
elif isinstance(config, FileProcessingConfig):
|
||||
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
|
||||
else:
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
from pydantic.types import PositiveInt
|
||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||
|
||||
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
|
||||
from reflector.schemas.platform import DAILY_PLATFORM, Platform
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
|
||||
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
|
||||
None # Webhook UUID for this environment. Not used by production code
|
||||
)
|
||||
# Platform Configuration
|
||||
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
|
||||
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
|
||||
|
||||
# Zulip integration
|
||||
ZULIP_REALM: str | None = None
|
||||
|
||||
@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_live_pipeline import asynctask
|
||||
from reflector.pipelines.main_multitrack_pipeline import (
|
||||
task_pipeline_multitrack_process,
|
||||
)
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline
|
||||
from reflector.processors import AudioFileWriterProcessor
|
||||
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
||||
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if use_celery:
|
||||
logger.info(
|
||||
"Room uses legacy Celery processing",
|
||||
room_id=room.id,
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
|
||||
if use_hatchet:
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": recording_id,
|
||||
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
|
||||
"bucket_name": bucket_name,
|
||||
"transcript_id": transcript.id,
|
||||
"room_id": room.id,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": transcript.id,
|
||||
"recording_id": recording_id,
|
||||
"daily_recording_id": recording_id,
|
||||
},
|
||||
)
|
||||
logger.info(
|
||||
"Started Hatchet workflow",
|
||||
workflow_id=workflow_id,
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": workflow_id}
|
||||
)
|
||||
return
|
||||
|
||||
# Celery pipeline (runs when durable workflows disabled)
|
||||
task_pipeline_multitrack_process.delay(
|
||||
transcript_id=transcript.id,
|
||||
bucket_name=bucket_name,
|
||||
track_keys=filter_cam_audio_tracks(track_keys),
|
||||
# Multitrack processing always uses Hatchet (no Celery fallback)
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": recording_id,
|
||||
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
|
||||
"bucket_name": bucket_name,
|
||||
"transcript_id": transcript.id,
|
||||
"room_id": room.id,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": transcript.id,
|
||||
"recording_id": recording_id,
|
||||
"daily_recording_id": recording_id,
|
||||
},
|
||||
)
|
||||
logger.info(
|
||||
"Started Hatchet workflow",
|
||||
workflow_id=workflow_id,
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
|
||||
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
|
||||
|
||||
|
||||
@shared_task
|
||||
@@ -1072,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
|
||||
)
|
||||
continue
|
||||
|
||||
use_celery = room and room.use_celery
|
||||
use_hatchet = not use_celery
|
||||
|
||||
if use_hatchet:
|
||||
if not transcript:
|
||||
logger.warning(
|
||||
"No transcript for Hatchet reprocessing, skipping",
|
||||
recording_id=recording.id,
|
||||
)
|
||||
continue
|
||||
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": recording.id,
|
||||
"tracks": [
|
||||
{"s3_key": k}
|
||||
for k in filter_cam_audio_tracks(recording.track_keys)
|
||||
],
|
||||
"bucket_name": bucket_name,
|
||||
"transcript_id": transcript.id,
|
||||
"room_id": room.id if room else None,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": transcript.id,
|
||||
"recording_id": recording.id,
|
||||
"reprocess": True,
|
||||
},
|
||||
)
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": workflow_id}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Queued Daily recording for Hatchet reprocessing",
|
||||
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
|
||||
if not transcript:
|
||||
logger.warning(
|
||||
"No transcript for Hatchet reprocessing, skipping",
|
||||
recording_id=recording.id,
|
||||
workflow_id=workflow_id,
|
||||
room_name=meeting.room_name,
|
||||
track_count=len(recording.track_keys),
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
"Queueing Daily recording for Celery reprocessing",
|
||||
recording_id=recording.id,
|
||||
room_name=meeting.room_name,
|
||||
track_count=len(recording.track_keys),
|
||||
transcript_status=transcript.status if transcript else None,
|
||||
)
|
||||
continue
|
||||
|
||||
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
|
||||
# Reprocessing uses recording.meeting_id directly instead of time-based matching
|
||||
recording_start_ts = int(recording.recorded_at.timestamp())
|
||||
workflow_id = await HatchetClientManager.start_workflow(
|
||||
workflow_name="DiarizationPipeline",
|
||||
input_data={
|
||||
"recording_id": recording.id,
|
||||
"tracks": [
|
||||
{"s3_key": k}
|
||||
for k in filter_cam_audio_tracks(recording.track_keys)
|
||||
],
|
||||
"bucket_name": bucket_name,
|
||||
"transcript_id": transcript.id,
|
||||
"room_id": room.id if room else None,
|
||||
},
|
||||
additional_metadata={
|
||||
"transcript_id": transcript.id,
|
||||
"recording_id": recording.id,
|
||||
"reprocess": True,
|
||||
},
|
||||
)
|
||||
await transcripts_controller.update(
|
||||
transcript, {"workflow_run_id": workflow_id}
|
||||
)
|
||||
|
||||
process_multitrack_recording.delay(
|
||||
bucket_name=bucket_name,
|
||||
daily_room_name=meeting.room_name,
|
||||
recording_id=recording.id,
|
||||
track_keys=recording.track_keys,
|
||||
recording_start_ts=recording_start_ts,
|
||||
)
|
||||
logger.info(
|
||||
"Queued Daily recording for Hatchet reprocessing",
|
||||
recording_id=recording.id,
|
||||
workflow_id=workflow_id,
|
||||
room_name=meeting.room_name,
|
||||
track_count=len(recording.track_keys),
|
||||
)
|
||||
|
||||
reprocessed_count += 1
|
||||
|
||||
|
||||
@@ -4,7 +4,7 @@ from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from reflector.schemas.platform import WHEREBY_PLATFORM
|
||||
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
@@ -14,6 +14,7 @@ def register_mock_platform():
|
||||
from reflector.video_platforms.registry import register_platform
|
||||
|
||||
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
|
||||
register_platform(DAILY_PLATFORM, MockPlatformClient)
|
||||
yield
|
||||
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import asyncio
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import AsyncMock, patch
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
|
||||
"reflector.services.transcript_process.task_pipeline_file_process"
|
||||
) as mock_file_pipeline,
|
||||
patch(
|
||||
"reflector.services.transcript_process.task_pipeline_multitrack_process"
|
||||
) as mock_multitrack_pipeline,
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet,
|
||||
):
|
||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "ok"
|
||||
|
||||
# Whereby recordings should use file pipeline
|
||||
# Whereby recordings should use file pipeline, not Hatchet
|
||||
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
|
||||
mock_multitrack_pipeline.delay.assert_not_called()
|
||||
mock_hatchet.start_workflow.assert_not_called()
|
||||
|
||||
|
||||
@pytest.mark.usefixtures("setup_database")
|
||||
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
||||
recording_trigger="automatic-2nd-participant",
|
||||
is_shared=False,
|
||||
)
|
||||
# Force Celery backend for test
|
||||
await rooms_controller.update(room, {"use_celery": True})
|
||||
|
||||
transcript = await transcripts_controller.add(
|
||||
"",
|
||||
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
||||
"reflector.services.transcript_process.task_pipeline_file_process"
|
||||
) as mock_file_pipeline,
|
||||
patch(
|
||||
"reflector.services.transcript_process.task_pipeline_multitrack_process"
|
||||
) as mock_multitrack_pipeline,
|
||||
"reflector.services.transcript_process.HatchetClientManager"
|
||||
) as mock_hatchet,
|
||||
):
|
||||
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
|
||||
|
||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||
|
||||
assert response.status_code == 200
|
||||
assert response.json()["status"] == "ok"
|
||||
|
||||
# Daily.co multitrack recordings should use multitrack pipeline
|
||||
mock_multitrack_pipeline.delay.assert_called_once_with(
|
||||
transcript_id=transcript.id,
|
||||
bucket_name="daily-bucket",
|
||||
track_keys=track_keys,
|
||||
)
|
||||
# Daily.co multitrack recordings should use Hatchet workflow
|
||||
mock_hatchet.start_workflow.assert_called_once()
|
||||
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
|
||||
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
|
||||
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
|
||||
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
|
||||
assert call_kwargs["input_data"]["tracks"] == [
|
||||
{"s3_key": k} for k in track_keys
|
||||
]
|
||||
mock_file_pipeline.delay.assert_not_called()
|
||||
|
||||
Reference in New Issue
Block a user