mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-06 10:46:46 +00:00
Compare commits
2 Commits
feature-le
...
release-pl
| Author | SHA1 | Date | |
|---|---|---|---|
| 46a10af349 | |||
| 15ab2e306e |
12
CHANGELOG.md
12
CHANGELOG.md
@@ -1,5 +1,17 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
|
## [0.33.0](https://github.com/Monadical-SAS/reflector/compare/v0.32.2...v0.33.0) (2026-02-05)
|
||||||
|
|
||||||
|
|
||||||
|
### Features
|
||||||
|
|
||||||
|
* Daily+hatchet default ([#846](https://github.com/Monadical-SAS/reflector/issues/846)) ([15ab2e3](https://github.com/Monadical-SAS/reflector/commit/15ab2e306eacf575494b4b5d2b2ad779d44a1c7f))
|
||||||
|
|
||||||
|
|
||||||
|
### Bug Fixes
|
||||||
|
|
||||||
|
* websocket tests ([#825](https://github.com/Monadical-SAS/reflector/issues/825)) ([1ce1c7a](https://github.com/Monadical-SAS/reflector/commit/1ce1c7a910b6c374115d2437b17f9d288ef094dc))
|
||||||
|
|
||||||
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
|
## [0.32.2](https://github.com/Monadical-SAS/reflector/compare/v0.32.1...v0.32.2) (2026-02-03)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,35 @@
|
|||||||
|
"""drop_use_celery_column
|
||||||
|
|
||||||
|
Revision ID: 3aa20b96d963
|
||||||
|
Revises: e69f08ead8ea
|
||||||
|
Create Date: 2026-02-05 10:12:44.065279
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "3aa20b96d963"
|
||||||
|
down_revision: Union[str, None] = "e69f08ead8ea"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||||
|
batch_op.drop_column("use_celery")
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||||
|
batch_op.add_column(
|
||||||
|
sa.Column(
|
||||||
|
"use_celery",
|
||||||
|
sa.Boolean(),
|
||||||
|
server_default=sa.text("false"),
|
||||||
|
nullable=False,
|
||||||
|
)
|
||||||
|
)
|
||||||
@@ -57,12 +57,6 @@ rooms = sqlalchemy.Table(
|
|||||||
sqlalchemy.String,
|
sqlalchemy.String,
|
||||||
nullable=False,
|
nullable=False,
|
||||||
),
|
),
|
||||||
sqlalchemy.Column(
|
|
||||||
"use_celery",
|
|
||||||
sqlalchemy.Boolean,
|
|
||||||
nullable=False,
|
|
||||||
server_default=false(),
|
|
||||||
),
|
|
||||||
sqlalchemy.Column(
|
sqlalchemy.Column(
|
||||||
"skip_consent",
|
"skip_consent",
|
||||||
sqlalchemy.Boolean,
|
sqlalchemy.Boolean,
|
||||||
@@ -97,7 +91,6 @@ class Room(BaseModel):
|
|||||||
ics_last_sync: datetime | None = None
|
ics_last_sync: datetime | None = None
|
||||||
ics_last_etag: str | None = None
|
ics_last_etag: str | None = None
|
||||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||||
use_celery: bool = False
|
|
||||||
skip_consent: bool = False
|
skip_consent: bool = False
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -15,14 +15,10 @@ from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
|
|||||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
||||||
|
|
||||||
from reflector.db.recordings import recordings_controller
|
from reflector.db.recordings import recordings_controller
|
||||||
from reflector.db.rooms import rooms_controller
|
|
||||||
from reflector.db.transcripts import Transcript, transcripts_controller
|
from reflector.db.transcripts import Transcript, transcripts_controller
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
from reflector.hatchet.client import HatchetClientManager
|
||||||
from reflector.logger import logger
|
from reflector.logger import logger
|
||||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||||
from reflector.pipelines.main_multitrack_pipeline import (
|
|
||||||
task_pipeline_multitrack_process,
|
|
||||||
)
|
|
||||||
from reflector.utils.string import NonEmptyString
|
from reflector.utils.string import NonEmptyString
|
||||||
|
|
||||||
|
|
||||||
@@ -181,124 +177,98 @@ async def dispatch_transcript_processing(
|
|||||||
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
||||||
"""
|
"""
|
||||||
if isinstance(config, MultitrackProcessingConfig):
|
if isinstance(config, MultitrackProcessingConfig):
|
||||||
use_celery = False
|
# Multitrack processing always uses Hatchet (no Celery fallback)
|
||||||
if config.room_id:
|
# First check if we can replay (outside transaction since it's read-only)
|
||||||
room = await rooms_controller.get_by_id(config.room_id)
|
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||||
use_celery = room.use_celery if room else False
|
if transcript and transcript.workflow_run_id and not force:
|
||||||
|
can_replay = await HatchetClientManager.can_replay(
|
||||||
use_hatchet = not use_celery
|
transcript.workflow_run_id
|
||||||
|
|
||||||
if use_celery:
|
|
||||||
logger.info(
|
|
||||||
"Room uses legacy Celery processing",
|
|
||||||
room_id=config.room_id,
|
|
||||||
transcript_id=config.transcript_id,
|
|
||||||
)
|
)
|
||||||
|
if can_replay:
|
||||||
if use_hatchet:
|
await HatchetClientManager.replay_workflow(transcript.workflow_run_id)
|
||||||
# First check if we can replay (outside transaction since it's read-only)
|
logger.info(
|
||||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
"Replaying Hatchet workflow",
|
||||||
if transcript and transcript.workflow_run_id and not force:
|
workflow_id=transcript.workflow_run_id,
|
||||||
can_replay = await HatchetClientManager.can_replay(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
)
|
||||||
if can_replay:
|
return None
|
||||||
await HatchetClientManager.replay_workflow(
|
else:
|
||||||
transcript.workflow_run_id
|
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
|
||||||
)
|
# Log and proceed to start new workflow
|
||||||
logger.info(
|
|
||||||
"Replaying Hatchet workflow",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
else:
|
|
||||||
# Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
|
|
||||||
# Log and proceed to start new workflow
|
|
||||||
try:
|
|
||||||
status = await HatchetClientManager.get_workflow_run_status(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Old workflow not replayable, starting new",
|
|
||||||
old_workflow_id=transcript.workflow_run_id,
|
|
||||||
old_status=status.value,
|
|
||||||
)
|
|
||||||
except NotFoundException:
|
|
||||||
# Workflow deleted from Hatchet but ID still in DB
|
|
||||||
logger.info(
|
|
||||||
"Old workflow not found in Hatchet, starting new",
|
|
||||||
old_workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Force: cancel old workflow if exists
|
|
||||||
if force and transcript and transcript.workflow_run_id:
|
|
||||||
try:
|
|
||||||
await HatchetClientManager.cancel_workflow(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Cancelled old workflow (--force)",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
except NotFoundException:
|
|
||||||
logger.info(
|
|
||||||
"Old workflow already deleted (--force)",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": None}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Re-fetch and check for concurrent dispatch (optimistic approach).
|
|
||||||
# No database lock - worst case is duplicate dispatch, but Hatchet
|
|
||||||
# workflows are idempotent so this is acceptable.
|
|
||||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
|
||||||
if transcript and transcript.workflow_run_id:
|
|
||||||
# Another process started a workflow between validation and now
|
|
||||||
try:
|
try:
|
||||||
status = await HatchetClientManager.get_workflow_run_status(
|
status = await HatchetClientManager.get_workflow_run_status(
|
||||||
transcript.workflow_run_id
|
transcript.workflow_run_id
|
||||||
)
|
)
|
||||||
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
logger.info(
|
||||||
logger.info(
|
"Old workflow not replayable, starting new",
|
||||||
"Concurrent workflow detected, skipping dispatch",
|
old_workflow_id=transcript.workflow_run_id,
|
||||||
workflow_id=transcript.workflow_run_id,
|
old_status=status.value,
|
||||||
)
|
)
|
||||||
return None
|
except NotFoundException:
|
||||||
except ApiException:
|
# Workflow deleted from Hatchet but ID still in DB
|
||||||
# Workflow might be gone (404) or API issue - proceed with new workflow
|
logger.info(
|
||||||
pass
|
"Old workflow not found in Hatchet, starting new",
|
||||||
|
old_workflow_id=transcript.workflow_run_id,
|
||||||
|
)
|
||||||
|
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
# Force: cancel old workflow if exists
|
||||||
workflow_name="DiarizationPipeline",
|
if force and transcript and transcript.workflow_run_id:
|
||||||
input_data={
|
try:
|
||||||
"recording_id": config.recording_id,
|
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
|
||||||
"tracks": [{"s3_key": k} for k in config.track_keys],
|
logger.info(
|
||||||
"bucket_name": config.bucket_name,
|
"Cancelled old workflow (--force)",
|
||||||
"transcript_id": config.transcript_id,
|
workflow_id=transcript.workflow_run_id,
|
||||||
"room_id": config.room_id,
|
)
|
||||||
},
|
except NotFoundException:
|
||||||
additional_metadata={
|
logger.info(
|
||||||
"transcript_id": config.transcript_id,
|
"Old workflow already deleted (--force)",
|
||||||
"recording_id": config.recording_id,
|
workflow_id=transcript.workflow_run_id,
|
||||||
"daily_recording_id": config.recording_id,
|
)
|
||||||
},
|
await transcripts_controller.update(transcript, {"workflow_run_id": None})
|
||||||
|
|
||||||
|
# Re-fetch and check for concurrent dispatch (optimistic approach).
|
||||||
|
# No database lock - worst case is duplicate dispatch, but Hatchet
|
||||||
|
# workflows are idempotent so this is acceptable.
|
||||||
|
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
||||||
|
if transcript and transcript.workflow_run_id:
|
||||||
|
# Another process started a workflow between validation and now
|
||||||
|
try:
|
||||||
|
status = await HatchetClientManager.get_workflow_run_status(
|
||||||
|
transcript.workflow_run_id
|
||||||
|
)
|
||||||
|
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
||||||
|
logger.info(
|
||||||
|
"Concurrent workflow detected, skipping dispatch",
|
||||||
|
workflow_id=transcript.workflow_run_id,
|
||||||
|
)
|
||||||
|
return None
|
||||||
|
except ApiException:
|
||||||
|
# Workflow might be gone (404) or API issue - proceed with new workflow
|
||||||
|
pass
|
||||||
|
|
||||||
|
workflow_id = await HatchetClientManager.start_workflow(
|
||||||
|
workflow_name="DiarizationPipeline",
|
||||||
|
input_data={
|
||||||
|
"recording_id": config.recording_id,
|
||||||
|
"tracks": [{"s3_key": k} for k in config.track_keys],
|
||||||
|
"bucket_name": config.bucket_name,
|
||||||
|
"transcript_id": config.transcript_id,
|
||||||
|
"room_id": config.room_id,
|
||||||
|
},
|
||||||
|
additional_metadata={
|
||||||
|
"transcript_id": config.transcript_id,
|
||||||
|
"recording_id": config.recording_id,
|
||||||
|
"daily_recording_id": config.recording_id,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
if transcript:
|
||||||
|
await transcripts_controller.update(
|
||||||
|
transcript, {"workflow_run_id": workflow_id}
|
||||||
)
|
)
|
||||||
|
|
||||||
if transcript:
|
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
|
||||||
await transcripts_controller.update(
|
return None
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Celery pipeline (durable workflows disabled)
|
|
||||||
return task_pipeline_multitrack_process.delay(
|
|
||||||
transcript_id=config.transcript_id,
|
|
||||||
bucket_name=config.bucket_name,
|
|
||||||
track_keys=config.track_keys,
|
|
||||||
)
|
|
||||||
elif isinstance(config, FileProcessingConfig):
|
elif isinstance(config, FileProcessingConfig):
|
||||||
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
|
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
|
||||||
else:
|
else:
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
from pydantic.types import PositiveInt
|
from pydantic.types import PositiveInt
|
||||||
from pydantic_settings import BaseSettings, SettingsConfigDict
|
from pydantic_settings import BaseSettings, SettingsConfigDict
|
||||||
|
|
||||||
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
|
from reflector.schemas.platform import DAILY_PLATFORM, Platform
|
||||||
from reflector.utils.string import NonEmptyString
|
from reflector.utils.string import NonEmptyString
|
||||||
|
|
||||||
|
|
||||||
@@ -155,7 +155,7 @@ class Settings(BaseSettings):
|
|||||||
None # Webhook UUID for this environment. Not used by production code
|
None # Webhook UUID for this environment. Not used by production code
|
||||||
)
|
)
|
||||||
# Platform Configuration
|
# Platform Configuration
|
||||||
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
|
DEFAULT_VIDEO_PLATFORM: Platform = DAILY_PLATFORM
|
||||||
|
|
||||||
# Zulip integration
|
# Zulip integration
|
||||||
ZULIP_REALM: str | None = None
|
ZULIP_REALM: str | None = None
|
||||||
|
|||||||
@@ -129,10 +129,6 @@ class DailyClient(VideoPlatformClient):
|
|||||||
"""Get room presence/session data for a Daily.co room."""
|
"""Get room presence/session data for a Daily.co room."""
|
||||||
return await self._api_client.get_room_presence(room_name)
|
return await self._api_client.get_room_presence(room_name)
|
||||||
|
|
||||||
async def delete_room(self, room_name: str) -> None:
|
|
||||||
"""Delete a Daily.co room (idempotent - succeeds even if room doesn't exist)."""
|
|
||||||
return await self._api_client.delete_room(room_name)
|
|
||||||
|
|
||||||
async def get_meeting_participants(
|
async def get_meeting_participants(
|
||||||
self, meeting_id: str
|
self, meeting_id: str
|
||||||
) -> MeetingParticipantsResponse:
|
) -> MeetingParticipantsResponse:
|
||||||
|
|||||||
@@ -20,7 +20,6 @@ from reflector.services.ics_sync import ics_sync_service
|
|||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
from reflector.utils.url import add_query_param
|
from reflector.utils.url import add_query_param
|
||||||
from reflector.video_platforms.factory import create_platform_client
|
from reflector.video_platforms.factory import create_platform_client
|
||||||
from reflector.worker.process import poll_daily_room_presence_task
|
|
||||||
from reflector.worker.webhook import test_webhook
|
from reflector.worker.webhook import test_webhook
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
@@ -366,53 +365,6 @@ async def rooms_create_meeting(
|
|||||||
return meeting
|
return meeting
|
||||||
|
|
||||||
|
|
||||||
@router.post("/rooms/{room_name}/meetings/{meeting_id}/joined")
|
|
||||||
async def rooms_joined_meeting(
|
|
||||||
room_name: str,
|
|
||||||
meeting_id: str,
|
|
||||||
):
|
|
||||||
"""Trigger presence poll (ideally when user actually joins meeting in Daily iframe)"""
|
|
||||||
room = await rooms_controller.get_by_name(room_name)
|
|
||||||
if not room:
|
|
||||||
raise HTTPException(status_code=404, detail="Room not found")
|
|
||||||
|
|
||||||
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
|
|
||||||
if not meeting:
|
|
||||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
|
||||||
|
|
||||||
if meeting.platform == "daily":
|
|
||||||
poll_daily_room_presence_task.delay(meeting_id)
|
|
||||||
|
|
||||||
return {"status": "ok"}
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/rooms/{room_name}/meetings/{meeting_id}/leave")
|
|
||||||
async def rooms_leave_meeting(
|
|
||||||
room_name: str,
|
|
||||||
meeting_id: str,
|
|
||||||
delay_seconds: int = 2,
|
|
||||||
):
|
|
||||||
"""Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
|
|
||||||
|
|
||||||
Queues presence poll with optional delay to allow Daily.co to detect disconnect.
|
|
||||||
"""
|
|
||||||
room = await rooms_controller.get_by_name(room_name)
|
|
||||||
if not room:
|
|
||||||
raise HTTPException(status_code=404, detail="Room not found")
|
|
||||||
|
|
||||||
meeting = await meetings_controller.get_by_id(meeting_id, room=room)
|
|
||||||
if not meeting:
|
|
||||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
|
||||||
|
|
||||||
if meeting.platform == "daily":
|
|
||||||
poll_daily_room_presence_task.apply_async(
|
|
||||||
args=[meeting_id],
|
|
||||||
countdown=delay_seconds,
|
|
||||||
)
|
|
||||||
|
|
||||||
return {"status": "ok"}
|
|
||||||
|
|
||||||
|
|
||||||
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
|
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
|
||||||
async def rooms_test_webhook(
|
async def rooms_test_webhook(
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
|||||||
@@ -27,9 +27,6 @@ from reflector.db.transcripts import (
|
|||||||
from reflector.hatchet.client import HatchetClientManager
|
from reflector.hatchet.client import HatchetClientManager
|
||||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||||
from reflector.pipelines.main_live_pipeline import asynctask
|
from reflector.pipelines.main_live_pipeline import asynctask
|
||||||
from reflector.pipelines.main_multitrack_pipeline import (
|
|
||||||
task_pipeline_multitrack_process,
|
|
||||||
)
|
|
||||||
from reflector.pipelines.topic_processing import EmptyPipeline
|
from reflector.pipelines.topic_processing import EmptyPipeline
|
||||||
from reflector.processors import AudioFileWriterProcessor
|
from reflector.processors import AudioFileWriterProcessor
|
||||||
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
||||||
@@ -351,49 +348,29 @@ async def _process_multitrack_recording_inner(
|
|||||||
room_id=room.id,
|
room_id=room.id,
|
||||||
)
|
)
|
||||||
|
|
||||||
use_celery = room and room.use_celery
|
# Multitrack processing always uses Hatchet (no Celery fallback)
|
||||||
use_hatchet = not use_celery
|
workflow_id = await HatchetClientManager.start_workflow(
|
||||||
|
workflow_name="DiarizationPipeline",
|
||||||
if use_celery:
|
input_data={
|
||||||
logger.info(
|
"recording_id": recording_id,
|
||||||
"Room uses legacy Celery processing",
|
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
|
||||||
room_id=room.id,
|
"bucket_name": bucket_name,
|
||||||
transcript_id=transcript.id,
|
"transcript_id": transcript.id,
|
||||||
)
|
"room_id": room.id,
|
||||||
|
},
|
||||||
if use_hatchet:
|
additional_metadata={
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
"transcript_id": transcript.id,
|
||||||
workflow_name="DiarizationPipeline",
|
"recording_id": recording_id,
|
||||||
input_data={
|
"daily_recording_id": recording_id,
|
||||||
"recording_id": recording_id,
|
},
|
||||||
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
|
|
||||||
"bucket_name": bucket_name,
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"room_id": room.id,
|
|
||||||
},
|
|
||||||
additional_metadata={
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"recording_id": recording_id,
|
|
||||||
"daily_recording_id": recording_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Started Hatchet workflow",
|
|
||||||
workflow_id=workflow_id,
|
|
||||||
transcript_id=transcript.id,
|
|
||||||
)
|
|
||||||
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Celery pipeline (runs when durable workflows disabled)
|
|
||||||
task_pipeline_multitrack_process.delay(
|
|
||||||
transcript_id=transcript.id,
|
|
||||||
bucket_name=bucket_name,
|
|
||||||
track_keys=filter_cam_audio_tracks(track_keys),
|
|
||||||
)
|
)
|
||||||
|
logger.info(
|
||||||
|
"Started Hatchet workflow",
|
||||||
|
workflow_id=workflow_id,
|
||||||
|
transcript_id=transcript.id,
|
||||||
|
)
|
||||||
|
|
||||||
|
await transcripts_controller.update(transcript, {"workflow_run_id": workflow_id})
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
@@ -845,47 +822,15 @@ async def process_meetings():
|
|||||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||||
|
|
||||||
client = create_platform_client(meeting.platform)
|
client = create_platform_client(meeting.platform)
|
||||||
has_active_sessions = False
|
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||||
has_had_sessions = False
|
|
||||||
|
|
||||||
if meeting.platform == "daily":
|
has_active_sessions = bool(
|
||||||
try:
|
room_sessions and any(s.ended_at is None for s in room_sessions)
|
||||||
presence = await client.get_room_presence(meeting.room_name)
|
)
|
||||||
has_active_sessions = presence.total_count > 0
|
has_had_sessions = bool(room_sessions)
|
||||||
|
logger_.info(
|
||||||
room_sessions = await client.get_room_sessions(
|
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
|
||||||
meeting.room_name
|
)
|
||||||
)
|
|
||||||
has_had_sessions = bool(room_sessions)
|
|
||||||
|
|
||||||
logger_.info(
|
|
||||||
"Daily.co presence check",
|
|
||||||
has_active_sessions=has_active_sessions,
|
|
||||||
has_had_sessions=has_had_sessions,
|
|
||||||
presence_count=presence.total_count,
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger_.warning(
|
|
||||||
"Daily.co presence API failed, falling back to DB sessions",
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
room_sessions = await client.get_room_sessions(
|
|
||||||
meeting.room_name
|
|
||||||
)
|
|
||||||
has_active_sessions = bool(
|
|
||||||
room_sessions
|
|
||||||
and any(s.ended_at is None for s in room_sessions)
|
|
||||||
)
|
|
||||||
has_had_sessions = bool(room_sessions)
|
|
||||||
else:
|
|
||||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
|
||||||
has_active_sessions = bool(
|
|
||||||
room_sessions and any(s.ended_at is None for s in room_sessions)
|
|
||||||
)
|
|
||||||
has_had_sessions = bool(room_sessions)
|
|
||||||
logger_.info(
|
|
||||||
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
|
|
||||||
)
|
|
||||||
|
|
||||||
if has_active_sessions:
|
if has_active_sessions:
|
||||||
logger_.debug("Meeting still has active sessions, keep it")
|
logger_.debug("Meeting still has active sessions, keep it")
|
||||||
@@ -904,20 +849,7 @@ async def process_meetings():
|
|||||||
await meetings_controller.update_meeting(
|
await meetings_controller.update_meeting(
|
||||||
meeting.id, is_active=False
|
meeting.id, is_active=False
|
||||||
)
|
)
|
||||||
logger_.info("Meeting deactivated in database")
|
logger_.info("Meeting is deactivated")
|
||||||
|
|
||||||
if meeting.platform == "daily":
|
|
||||||
try:
|
|
||||||
await client.delete_room(meeting.room_name)
|
|
||||||
logger_.info(
|
|
||||||
"Daily.co room deleted", room_name=meeting.room_name
|
|
||||||
)
|
|
||||||
except Exception:
|
|
||||||
logger_.warning(
|
|
||||||
"Failed to delete Daily.co room",
|
|
||||||
room_name=meeting.room_name,
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
processed_count += 1
|
processed_count += 1
|
||||||
|
|
||||||
@@ -1117,66 +1049,43 @@ async def reprocess_failed_daily_recordings():
|
|||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
use_celery = room and room.use_celery
|
# Multitrack reprocessing always uses Hatchet (no Celery fallback)
|
||||||
use_hatchet = not use_celery
|
if not transcript:
|
||||||
|
logger.warning(
|
||||||
if use_hatchet:
|
"No transcript for Hatchet reprocessing, skipping",
|
||||||
if not transcript:
|
|
||||||
logger.warning(
|
|
||||||
"No transcript for Hatchet reprocessing, skipping",
|
|
||||||
recording_id=recording.id,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
|
||||||
workflow_name="DiarizationPipeline",
|
|
||||||
input_data={
|
|
||||||
"recording_id": recording.id,
|
|
||||||
"tracks": [
|
|
||||||
{"s3_key": k}
|
|
||||||
for k in filter_cam_audio_tracks(recording.track_keys)
|
|
||||||
],
|
|
||||||
"bucket_name": bucket_name,
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"room_id": room.id if room else None,
|
|
||||||
},
|
|
||||||
additional_metadata={
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"recording_id": recording.id,
|
|
||||||
"reprocess": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Queued Daily recording for Hatchet reprocessing",
|
|
||||||
recording_id=recording.id,
|
recording_id=recording.id,
|
||||||
workflow_id=workflow_id,
|
|
||||||
room_name=meeting.room_name,
|
|
||||||
track_count=len(recording.track_keys),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.info(
|
|
||||||
"Queueing Daily recording for Celery reprocessing",
|
|
||||||
recording_id=recording.id,
|
|
||||||
room_name=meeting.room_name,
|
|
||||||
track_count=len(recording.track_keys),
|
|
||||||
transcript_status=transcript.status if transcript else None,
|
|
||||||
)
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
# For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
|
workflow_id = await HatchetClientManager.start_workflow(
|
||||||
# Reprocessing uses recording.meeting_id directly instead of time-based matching
|
workflow_name="DiarizationPipeline",
|
||||||
recording_start_ts = int(recording.recorded_at.timestamp())
|
input_data={
|
||||||
|
"recording_id": recording.id,
|
||||||
|
"tracks": [
|
||||||
|
{"s3_key": k}
|
||||||
|
for k in filter_cam_audio_tracks(recording.track_keys)
|
||||||
|
],
|
||||||
|
"bucket_name": bucket_name,
|
||||||
|
"transcript_id": transcript.id,
|
||||||
|
"room_id": room.id if room else None,
|
||||||
|
},
|
||||||
|
additional_metadata={
|
||||||
|
"transcript_id": transcript.id,
|
||||||
|
"recording_id": recording.id,
|
||||||
|
"reprocess": True,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
await transcripts_controller.update(
|
||||||
|
transcript, {"workflow_run_id": workflow_id}
|
||||||
|
)
|
||||||
|
|
||||||
process_multitrack_recording.delay(
|
logger.info(
|
||||||
bucket_name=bucket_name,
|
"Queued Daily recording for Hatchet reprocessing",
|
||||||
daily_room_name=meeting.room_name,
|
recording_id=recording.id,
|
||||||
recording_id=recording.id,
|
workflow_id=workflow_id,
|
||||||
track_keys=recording.track_keys,
|
room_name=meeting.room_name,
|
||||||
recording_start_ts=recording_start_ts,
|
track_count=len(recording.track_keys),
|
||||||
)
|
)
|
||||||
|
|
||||||
reprocessed_count += 1
|
reprocessed_count += 1
|
||||||
|
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ from unittest.mock import patch
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from reflector.schemas.platform import WHEREBY_PLATFORM
|
from reflector.schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session", autouse=True)
|
@pytest.fixture(scope="session", autouse=True)
|
||||||
@@ -14,6 +14,7 @@ def register_mock_platform():
|
|||||||
from reflector.video_platforms.registry import register_platform
|
from reflector.video_platforms.registry import register_platform
|
||||||
|
|
||||||
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
|
register_platform(WHEREBY_PLATFORM, MockPlatformClient)
|
||||||
|
register_platform(DAILY_PLATFORM, MockPlatformClient)
|
||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,286 +0,0 @@
|
|||||||
"""Unit tests for Daily.co presence-based meeting deactivation logic.
|
|
||||||
|
|
||||||
Tests the fix for split room race condition by verifying:
|
|
||||||
1. Real-time presence checking via Daily.co API
|
|
||||||
2. Room deletion when meetings deactivate
|
|
||||||
"""
|
|
||||||
|
|
||||||
from datetime import datetime, timedelta, timezone
|
|
||||||
from unittest.mock import AsyncMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from reflector.dailyco_api.responses import (
|
|
||||||
RoomPresenceParticipant,
|
|
||||||
RoomPresenceResponse,
|
|
||||||
)
|
|
||||||
from reflector.db.daily_participant_sessions import (
|
|
||||||
DailyParticipantSession,
|
|
||||||
daily_participant_sessions_controller,
|
|
||||||
)
|
|
||||||
from reflector.db.meetings import meetings_controller
|
|
||||||
from reflector.db.rooms import rooms_controller
|
|
||||||
from reflector.video_platforms.daily import DailyClient
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
async def daily_room_and_meeting():
|
|
||||||
"""Create test room and meeting for Daily platform."""
|
|
||||||
room = await rooms_controller.add(
|
|
||||||
name="test-daily",
|
|
||||||
user_id="test-user",
|
|
||||||
platform="daily",
|
|
||||||
zulip_auto_post=False,
|
|
||||||
zulip_stream="",
|
|
||||||
zulip_topic="",
|
|
||||||
is_locked=False,
|
|
||||||
room_mode="normal",
|
|
||||||
recording_type="cloud",
|
|
||||||
recording_trigger="automatic-2nd-participant",
|
|
||||||
is_shared=False,
|
|
||||||
)
|
|
||||||
|
|
||||||
current_time = datetime.now(timezone.utc)
|
|
||||||
end_time = current_time + timedelta(hours=2)
|
|
||||||
|
|
||||||
meeting = await meetings_controller.create(
|
|
||||||
id="test-meeting-id",
|
|
||||||
room_name="test-daily-20260129120000",
|
|
||||||
room_url="https://daily.co/test",
|
|
||||||
host_room_url="https://daily.co/test",
|
|
||||||
start_date=current_time,
|
|
||||||
end_date=end_time,
|
|
||||||
room=room,
|
|
||||||
)
|
|
||||||
|
|
||||||
return room, meeting
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_daily_client_has_delete_room_method():
|
|
||||||
"""Verify DailyClient has delete_room method for cleanup."""
|
|
||||||
# Create a mock DailyClient
|
|
||||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
|
||||||
from reflector.video_platforms.models import VideoPlatformConfig
|
|
||||||
|
|
||||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
|
||||||
client = DailyClient(config)
|
|
||||||
|
|
||||||
# Verify delete_room method exists
|
|
||||||
assert hasattr(client, "delete_room")
|
|
||||||
assert callable(getattr(client, "delete_room"))
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_get_room_presence_returns_realtime_data(daily_room_and_meeting):
|
|
||||||
"""Test that get_room_presence returns real-time participant data."""
|
|
||||||
room, meeting = daily_room_and_meeting
|
|
||||||
|
|
||||||
# Mock Daily.co API response
|
|
||||||
mock_presence = RoomPresenceResponse(
|
|
||||||
total_count=2,
|
|
||||||
data=[
|
|
||||||
RoomPresenceParticipant(
|
|
||||||
room=meeting.room_name,
|
|
||||||
id="session-1",
|
|
||||||
userId="user-1",
|
|
||||||
userName="User One",
|
|
||||||
joinTime="2026-01-29T12:00:00.000Z",
|
|
||||||
duration=120,
|
|
||||||
),
|
|
||||||
RoomPresenceParticipant(
|
|
||||||
room=meeting.room_name,
|
|
||||||
id="session-2",
|
|
||||||
userId="user-2",
|
|
||||||
userName="User Two",
|
|
||||||
joinTime="2026-01-29T12:05:00.000Z",
|
|
||||||
duration=60,
|
|
||||||
),
|
|
||||||
],
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.dailyco_api.client.DailyApiClient") as mock_api:
|
|
||||||
from reflector.video_platforms.models import VideoPlatformConfig
|
|
||||||
|
|
||||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
|
||||||
client = DailyClient(config)
|
|
||||||
|
|
||||||
# Mock the API client method
|
|
||||||
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
|
|
||||||
|
|
||||||
# Call get_room_presence
|
|
||||||
result = await client.get_room_presence(meeting.room_name)
|
|
||||||
|
|
||||||
# Verify it calls Daily.co API
|
|
||||||
client._api_client.get_room_presence.assert_called_once_with(meeting.room_name)
|
|
||||||
|
|
||||||
# Verify result contains real-time data
|
|
||||||
assert result.total_count == 2
|
|
||||||
assert len(result.data) == 2
|
|
||||||
assert result.data[0].id == "session-1"
|
|
||||||
assert result.data[1].id == "session-2"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_presence_shows_active_even_when_db_stale(daily_room_and_meeting):
|
|
||||||
"""Test that Daily.co presence API is source of truth, not stale DB sessions."""
|
|
||||||
room, meeting = daily_room_and_meeting
|
|
||||||
current_time = datetime.now(timezone.utc)
|
|
||||||
|
|
||||||
# Create stale DB session (left_at=NULL but user actually left)
|
|
||||||
session_id = f"{meeting.id}:stale-user:{int((current_time - timedelta(minutes=5)).timestamp() * 1000)}"
|
|
||||||
await daily_participant_sessions_controller.upsert_joined(
|
|
||||||
DailyParticipantSession(
|
|
||||||
id=session_id,
|
|
||||||
meeting_id=meeting.id,
|
|
||||||
room_id=room.id,
|
|
||||||
session_id="stale-daily-session",
|
|
||||||
user_name="Stale User",
|
|
||||||
user_id="stale-user",
|
|
||||||
joined_at=current_time - timedelta(minutes=5),
|
|
||||||
left_at=None, # Stale - shows active but user left
|
|
||||||
)
|
|
||||||
)
|
|
||||||
|
|
||||||
# Verify DB shows active session
|
|
||||||
db_sessions = await daily_participant_sessions_controller.get_active_by_meeting(
|
|
||||||
meeting.id
|
|
||||||
)
|
|
||||||
assert len(db_sessions) == 1
|
|
||||||
|
|
||||||
# But Daily.co API shows room is empty
|
|
||||||
mock_presence = RoomPresenceResponse(total_count=0, data=[])
|
|
||||||
|
|
||||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
|
||||||
from reflector.video_platforms.models import VideoPlatformConfig
|
|
||||||
|
|
||||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
|
||||||
client = DailyClient(config)
|
|
||||||
client._api_client.get_room_presence = AsyncMock(return_value=mock_presence)
|
|
||||||
|
|
||||||
# Get real-time presence
|
|
||||||
presence = await client.get_room_presence(meeting.room_name)
|
|
||||||
|
|
||||||
# Real-time API shows no participants (truth)
|
|
||||||
assert presence.total_count == 0
|
|
||||||
assert len(presence.data) == 0
|
|
||||||
|
|
||||||
# DB shows 1 participant (stale)
|
|
||||||
assert len(db_sessions) == 1
|
|
||||||
|
|
||||||
# Implementation should trust presence API, not DB
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_meeting_deactivation_logic_with_presence_empty():
|
|
||||||
"""Test the core deactivation decision logic when presence shows room empty."""
|
|
||||||
# This tests the logic that will be in process_meetings
|
|
||||||
|
|
||||||
# Simulate: DB shows stale active session
|
|
||||||
has_active_db_sessions = True # DB is stale
|
|
||||||
|
|
||||||
# Simulate: Daily.co presence API shows room empty
|
|
||||||
presence_count = 0 # Real-time truth
|
|
||||||
|
|
||||||
# Simulate: Meeting has been used before
|
|
||||||
has_had_sessions = True
|
|
||||||
|
|
||||||
# Decision logic (what process_meetings should do):
|
|
||||||
# - If presence API available: trust it
|
|
||||||
# - If presence shows empty AND has_had_sessions: deactivate
|
|
||||||
|
|
||||||
if presence_count == 0 and has_had_sessions:
|
|
||||||
should_deactivate = True
|
|
||||||
else:
|
|
||||||
should_deactivate = False
|
|
||||||
|
|
||||||
assert should_deactivate is True # Should deactivate despite stale DB
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_meeting_deactivation_logic_with_presence_active():
|
|
||||||
"""Test that meetings stay active when presence shows participants."""
|
|
||||||
# Simulate: DB shows no sessions (not yet updated)
|
|
||||||
has_active_db_sessions = False # DB hasn't caught up
|
|
||||||
|
|
||||||
# Simulate: Daily.co presence API shows active participant
|
|
||||||
presence_count = 1 # Real-time truth
|
|
||||||
|
|
||||||
# Decision logic: presence shows activity, keep meeting active
|
|
||||||
if presence_count > 0:
|
|
||||||
should_deactivate = False
|
|
||||||
else:
|
|
||||||
should_deactivate = True
|
|
||||||
|
|
||||||
assert should_deactivate is False # Should stay active
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_delete_room_called_on_deactivation(daily_room_and_meeting):
|
|
||||||
"""Test that Daily.co room is deleted when meeting deactivates."""
|
|
||||||
room, meeting = daily_room_and_meeting
|
|
||||||
|
|
||||||
with patch("reflector.dailyco_api.client.DailyApiClient"):
|
|
||||||
from reflector.video_platforms.models import VideoPlatformConfig
|
|
||||||
|
|
||||||
config = VideoPlatformConfig(api_key="test-key", webhook_secret="test-secret")
|
|
||||||
client = DailyClient(config)
|
|
||||||
|
|
||||||
# Mock delete_room API call
|
|
||||||
client._api_client.delete_room = AsyncMock()
|
|
||||||
|
|
||||||
# Simulate deactivation - should delete room
|
|
||||||
await client._api_client.delete_room(meeting.room_name)
|
|
||||||
|
|
||||||
# Verify delete was called
|
|
||||||
client._api_client.delete_room.assert_called_once_with(meeting.room_name)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_delete_room_idempotent_on_404():
|
|
||||||
"""Test that room deletion is idempotent (succeeds even if room doesn't exist)."""
|
|
||||||
from reflector.dailyco_api.client import DailyApiClient
|
|
||||||
|
|
||||||
# Create real client to test delete_room logic
|
|
||||||
client = DailyApiClient(api_key="test-key")
|
|
||||||
|
|
||||||
# Mock the HTTP client
|
|
||||||
mock_http_client = AsyncMock()
|
|
||||||
mock_response = AsyncMock()
|
|
||||||
mock_response.status_code = 404 # Room not found
|
|
||||||
mock_http_client.delete = AsyncMock(return_value=mock_response)
|
|
||||||
|
|
||||||
# Mock _get_client to return our mock
|
|
||||||
async def mock_get_client():
|
|
||||||
return mock_http_client
|
|
||||||
|
|
||||||
client._get_client = mock_get_client
|
|
||||||
|
|
||||||
# delete_room should succeed even on 404 (idempotent)
|
|
||||||
await client.delete_room("nonexistent-room")
|
|
||||||
|
|
||||||
# Verify delete was attempted
|
|
||||||
mock_http_client.delete.assert_called_once()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_api_failure_fallback_to_db_sessions():
|
|
||||||
"""Test that system falls back to DB sessions if Daily.co API fails."""
|
|
||||||
# Simulate: Daily.co API throws exception
|
|
||||||
api_exception = Exception("API unavailable")
|
|
||||||
|
|
||||||
# Simulate: DB shows active session
|
|
||||||
has_active_db_sessions = True
|
|
||||||
|
|
||||||
# Decision logic with fallback:
|
|
||||||
try:
|
|
||||||
presence_count = None
|
|
||||||
raise api_exception # Simulating API failure
|
|
||||||
except Exception:
|
|
||||||
# Fallback: use DB sessions (conservative - don't deactivate if unsure)
|
|
||||||
if has_active_db_sessions:
|
|
||||||
should_deactivate = False
|
|
||||||
else:
|
|
||||||
should_deactivate = True
|
|
||||||
|
|
||||||
assert should_deactivate is False # Conservative: keep active on API failure
|
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from unittest.mock import patch
|
from unittest.mock import AsyncMock, patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from httpx import ASGITransport, AsyncClient
|
from httpx import ASGITransport, AsyncClient
|
||||||
@@ -142,17 +142,17 @@ async def test_whereby_recording_uses_file_pipeline(client):
|
|||||||
"reflector.services.transcript_process.task_pipeline_file_process"
|
"reflector.services.transcript_process.task_pipeline_file_process"
|
||||||
) as mock_file_pipeline,
|
) as mock_file_pipeline,
|
||||||
patch(
|
patch(
|
||||||
"reflector.services.transcript_process.task_pipeline_multitrack_process"
|
"reflector.services.transcript_process.HatchetClientManager"
|
||||||
) as mock_multitrack_pipeline,
|
) as mock_hatchet,
|
||||||
):
|
):
|
||||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json()["status"] == "ok"
|
assert response.json()["status"] == "ok"
|
||||||
|
|
||||||
# Whereby recordings should use file pipeline
|
# Whereby recordings should use file pipeline, not Hatchet
|
||||||
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
|
mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id)
|
||||||
mock_multitrack_pipeline.delay.assert_not_called()
|
mock_hatchet.start_workflow.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
@pytest.mark.usefixtures("setup_database")
|
||||||
@@ -177,8 +177,6 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
|||||||
recording_trigger="automatic-2nd-participant",
|
recording_trigger="automatic-2nd-participant",
|
||||||
is_shared=False,
|
is_shared=False,
|
||||||
)
|
)
|
||||||
# Force Celery backend for test
|
|
||||||
await rooms_controller.update(room, {"use_celery": True})
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.add(
|
transcript = await transcripts_controller.add(
|
||||||
"",
|
"",
|
||||||
@@ -213,18 +211,23 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
|
|||||||
"reflector.services.transcript_process.task_pipeline_file_process"
|
"reflector.services.transcript_process.task_pipeline_file_process"
|
||||||
) as mock_file_pipeline,
|
) as mock_file_pipeline,
|
||||||
patch(
|
patch(
|
||||||
"reflector.services.transcript_process.task_pipeline_multitrack_process"
|
"reflector.services.transcript_process.HatchetClientManager"
|
||||||
) as mock_multitrack_pipeline,
|
) as mock_hatchet,
|
||||||
):
|
):
|
||||||
|
mock_hatchet.start_workflow = AsyncMock(return_value="test-workflow-id")
|
||||||
|
|
||||||
response = await client.post(f"/transcripts/{transcript.id}/process")
|
response = await client.post(f"/transcripts/{transcript.id}/process")
|
||||||
|
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json()["status"] == "ok"
|
assert response.json()["status"] == "ok"
|
||||||
|
|
||||||
# Daily.co multitrack recordings should use multitrack pipeline
|
# Daily.co multitrack recordings should use Hatchet workflow
|
||||||
mock_multitrack_pipeline.delay.assert_called_once_with(
|
mock_hatchet.start_workflow.assert_called_once()
|
||||||
transcript_id=transcript.id,
|
call_kwargs = mock_hatchet.start_workflow.call_args.kwargs
|
||||||
bucket_name="daily-bucket",
|
assert call_kwargs["workflow_name"] == "DiarizationPipeline"
|
||||||
track_keys=track_keys,
|
assert call_kwargs["input_data"]["transcript_id"] == transcript.id
|
||||||
)
|
assert call_kwargs["input_data"]["bucket_name"] == "daily-bucket"
|
||||||
|
assert call_kwargs["input_data"]["tracks"] == [
|
||||||
|
{"s3_key": k} for k in track_keys
|
||||||
|
]
|
||||||
mock_file_pipeline.delay.assert_not_called()
|
mock_file_pipeline.delay.assert_not_called()
|
||||||
|
|||||||
@@ -24,24 +24,15 @@ import { useAuth } from "../../lib/AuthProvider";
|
|||||||
import { useConsentDialog } from "../../lib/consent";
|
import { useConsentDialog } from "../../lib/consent";
|
||||||
import {
|
import {
|
||||||
useRoomJoinMeeting,
|
useRoomJoinMeeting,
|
||||||
useRoomJoinedMeeting,
|
|
||||||
useRoomLeaveMeeting,
|
|
||||||
useMeetingStartRecording,
|
useMeetingStartRecording,
|
||||||
leaveRoomPostUrl,
|
|
||||||
LeaveRoomBody,
|
|
||||||
} from "../../lib/apiHooks";
|
} from "../../lib/apiHooks";
|
||||||
import { omit } from "remeda";
|
import { omit } from "remeda";
|
||||||
import {
|
import {
|
||||||
assertExists,
|
assertExists,
|
||||||
assertExistsAndNonEmptyString,
|
|
||||||
NonEmptyString,
|
NonEmptyString,
|
||||||
parseNonEmptyString,
|
parseNonEmptyString,
|
||||||
} from "../../lib/utils";
|
} from "../../lib/utils";
|
||||||
import {
|
import { assertMeetingId, DailyRecordingType } from "../../lib/types";
|
||||||
assertMeetingId,
|
|
||||||
DailyRecordingType,
|
|
||||||
MeetingId,
|
|
||||||
} from "../../lib/types";
|
|
||||||
import { useUuidV5 } from "react-uuid-hook";
|
import { useUuidV5 } from "react-uuid-hook";
|
||||||
|
|
||||||
const CONSENT_BUTTON_ID = "recording-consent";
|
const CONSENT_BUTTON_ID = "recording-consent";
|
||||||
@@ -188,58 +179,6 @@ const useFrame = (
|
|||||||
] as const;
|
] as const;
|
||||||
};
|
};
|
||||||
|
|
||||||
const leaveDaily = () => {
|
|
||||||
const frame = DailyIframe.getCallInstance();
|
|
||||||
frame?.leave();
|
|
||||||
};
|
|
||||||
|
|
||||||
const useDirtyDisconnects = (
|
|
||||||
meetingId: NonEmptyString,
|
|
||||||
roomName: NonEmptyString,
|
|
||||||
) => {
|
|
||||||
useEffect(() => {
|
|
||||||
if (!meetingId || !roomName) return;
|
|
||||||
|
|
||||||
const handleBeforeUnload = () => {
|
|
||||||
leaveDaily();
|
|
||||||
navigator.sendBeacon(
|
|
||||||
leaveRoomPostUrl(
|
|
||||||
{
|
|
||||||
room_name: roomName,
|
|
||||||
meeting_id: meetingId,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
delay_seconds: 5,
|
|
||||||
},
|
|
||||||
),
|
|
||||||
undefined satisfies LeaveRoomBody,
|
|
||||||
);
|
|
||||||
};
|
|
||||||
window.addEventListener("beforeunload", handleBeforeUnload);
|
|
||||||
return () => window.removeEventListener("beforeunload", handleBeforeUnload);
|
|
||||||
}, [meetingId, roomName]);
|
|
||||||
};
|
|
||||||
|
|
||||||
const useDisconnects = (
|
|
||||||
meetingId: NonEmptyString,
|
|
||||||
roomName: NonEmptyString,
|
|
||||||
leaveMutation: ReturnType<typeof useRoomLeaveMeeting>,
|
|
||||||
) => {
|
|
||||||
useDirtyDisconnects(meetingId, roomName);
|
|
||||||
|
|
||||||
useEffect(() => {
|
|
||||||
return () => {
|
|
||||||
leaveDaily();
|
|
||||||
leaveMutation.mutate({
|
|
||||||
params: {
|
|
||||||
path: { meeting_id: meetingId, room_name: roomName },
|
|
||||||
query: { delay_seconds: 5 },
|
|
||||||
},
|
|
||||||
});
|
|
||||||
};
|
|
||||||
}, [meetingId, roomName]);
|
|
||||||
};
|
|
||||||
|
|
||||||
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
||||||
const router = useRouter();
|
const router = useRouter();
|
||||||
const params = useParams();
|
const params = useParams();
|
||||||
@@ -247,8 +186,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
|||||||
const authLastUserId = auth.lastUserId;
|
const authLastUserId = auth.lastUserId;
|
||||||
const [container, setContainer] = useState<HTMLDivElement | null>(null);
|
const [container, setContainer] = useState<HTMLDivElement | null>(null);
|
||||||
const joinMutation = useRoomJoinMeeting();
|
const joinMutation = useRoomJoinMeeting();
|
||||||
const joinedMutation = useRoomJoinedMeeting();
|
|
||||||
const leaveMutation = useRoomLeaveMeeting();
|
|
||||||
const startRecordingMutation = useMeetingStartRecording();
|
const startRecordingMutation = useMeetingStartRecording();
|
||||||
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
|
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
|
||||||
|
|
||||||
@@ -258,9 +195,7 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
|||||||
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
|
useUuidV5(meeting.id, RAW_TRACKS_NAMESPACE)[0],
|
||||||
);
|
);
|
||||||
|
|
||||||
if (typeof params.roomName === "object")
|
const roomName = params?.roomName as string;
|
||||||
throw new Error(`Invalid room name in params. array? ${params.roomName}`);
|
|
||||||
const roomName = assertExistsAndNonEmptyString(params.roomName);
|
|
||||||
|
|
||||||
const {
|
const {
|
||||||
showConsentModal,
|
showConsentModal,
|
||||||
@@ -302,8 +237,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
|||||||
router.push("/browse");
|
router.push("/browse");
|
||||||
}, [router]);
|
}, [router]);
|
||||||
|
|
||||||
useDisconnects(meeting.id as MeetingId, roomName, leaveMutation);
|
|
||||||
|
|
||||||
const handleCustomButtonClick = useCallback(
|
const handleCustomButtonClick = useCallback(
|
||||||
(ev: DailyEventObjectCustomButtonClick) => {
|
(ev: DailyEventObjectCustomButtonClick) => {
|
||||||
if (ev.button_id === CONSENT_BUTTON_ID) {
|
if (ev.button_id === CONSENT_BUTTON_ID) {
|
||||||
@@ -316,15 +249,6 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
|||||||
);
|
);
|
||||||
|
|
||||||
const handleFrameJoinMeeting = useCallback(() => {
|
const handleFrameJoinMeeting = useCallback(() => {
|
||||||
joinedMutation.mutate({
|
|
||||||
params: {
|
|
||||||
path: {
|
|
||||||
room_name: roomName,
|
|
||||||
meeting_id: meeting.id,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
if (meeting.recording_type === "cloud") {
|
if (meeting.recording_type === "cloud") {
|
||||||
console.log("Starting dual recording via REST API", {
|
console.log("Starting dual recording via REST API", {
|
||||||
cloudInstanceId,
|
cloudInstanceId,
|
||||||
@@ -384,10 +308,8 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
|||||||
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
|
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
|
||||||
}
|
}
|
||||||
}, [
|
}, [
|
||||||
joinedMutation,
|
|
||||||
roomName,
|
|
||||||
meeting.id,
|
|
||||||
meeting.recording_type,
|
meeting.recording_type,
|
||||||
|
meeting.id,
|
||||||
startRecordingMutation,
|
startRecordingMutation,
|
||||||
cloudInstanceId,
|
cloudInstanceId,
|
||||||
rawTracksInstanceId,
|
rawTracksInstanceId,
|
||||||
|
|||||||
@@ -1,13 +1,12 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { $api, API_URL } from "./apiClient";
|
import { $api } from "./apiClient";
|
||||||
import { useError } from "../(errors)/errorContext";
|
import { useError } from "../(errors)/errorContext";
|
||||||
import { QueryClient, useQueryClient } from "@tanstack/react-query";
|
import { QueryClient, useQueryClient } from "@tanstack/react-query";
|
||||||
import type { components, operations } from "../reflector-api";
|
import type { components } from "../reflector-api";
|
||||||
import { useAuth } from "./AuthProvider";
|
import { useAuth } from "./AuthProvider";
|
||||||
import { MeetingId } from "./types";
|
import { MeetingId } from "./types";
|
||||||
import { NonEmptyString } from "./utils";
|
import { NonEmptyString } from "./utils";
|
||||||
import { createFinalURL, createQuerySerializer } from "openapi-fetch";
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
|
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
|
||||||
@@ -808,44 +807,6 @@ export function useRoomJoinMeeting() {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export const LEAVE_ROOM_POST_URL_TEMPLATE =
|
|
||||||
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave" as const;
|
|
||||||
|
|
||||||
export const leaveRoomPostUrl = (
|
|
||||||
path: operations["v1_rooms_leave_meeting"]["parameters"]["path"],
|
|
||||||
query?: operations["v1_rooms_leave_meeting"]["parameters"]["query"],
|
|
||||||
): string =>
|
|
||||||
createFinalURL(LEAVE_ROOM_POST_URL_TEMPLATE, {
|
|
||||||
baseUrl: API_URL,
|
|
||||||
params: { path, query },
|
|
||||||
querySerializer: createQuerySerializer(),
|
|
||||||
});
|
|
||||||
|
|
||||||
export type LeaveRoomBody = operations["v1_rooms_leave_meeting"]["requestBody"];
|
|
||||||
|
|
||||||
export function useRoomLeaveMeeting() {
|
|
||||||
return $api.useMutation("post", LEAVE_ROOM_POST_URL_TEMPLATE);
|
|
||||||
}
|
|
||||||
|
|
||||||
export const JOINED_ROOM_POST_URL_TEMPLATE =
|
|
||||||
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined" as const;
|
|
||||||
|
|
||||||
export const joinedRoomPostUrl = (
|
|
||||||
params: operations["v1_rooms_joined_meeting"]["parameters"]["path"],
|
|
||||||
): string =>
|
|
||||||
createFinalURL(JOINED_ROOM_POST_URL_TEMPLATE, {
|
|
||||||
baseUrl: API_URL,
|
|
||||||
params: { path: params },
|
|
||||||
querySerializer: () => "",
|
|
||||||
});
|
|
||||||
|
|
||||||
export type JoinedRoomBody =
|
|
||||||
operations["v1_rooms_joined_meeting"]["requestBody"];
|
|
||||||
|
|
||||||
export function useRoomJoinedMeeting() {
|
|
||||||
return $api.useMutation("post", JOINED_ROOM_POST_URL_TEMPLATE);
|
|
||||||
}
|
|
||||||
|
|
||||||
export function useRoomIcsSync() {
|
export function useRoomIcsSync() {
|
||||||
const { setError } = useError();
|
const { setError } = useError();
|
||||||
|
|
||||||
|
|||||||
108
www/app/reflector-api.d.ts
vendored
108
www/app/reflector-api.d.ts
vendored
@@ -171,48 +171,6 @@ export interface paths {
|
|||||||
patch?: never;
|
patch?: never;
|
||||||
trace?: never;
|
trace?: never;
|
||||||
};
|
};
|
||||||
"/v1/rooms/{room_name}/meetings/{meeting_id}/joined": {
|
|
||||||
parameters: {
|
|
||||||
query?: never;
|
|
||||||
header?: never;
|
|
||||||
path?: never;
|
|
||||||
cookie?: never;
|
|
||||||
};
|
|
||||||
get?: never;
|
|
||||||
put?: never;
|
|
||||||
/**
|
|
||||||
* Rooms Joined Meeting
|
|
||||||
* @description Trigger presence poll (ideally when user actually joins meeting in Daily iframe)
|
|
||||||
*/
|
|
||||||
post: operations["v1_rooms_joined_meeting"];
|
|
||||||
delete?: never;
|
|
||||||
options?: never;
|
|
||||||
head?: never;
|
|
||||||
patch?: never;
|
|
||||||
trace?: never;
|
|
||||||
};
|
|
||||||
"/v1/rooms/{room_name}/meetings/{meeting_id}/leave": {
|
|
||||||
parameters: {
|
|
||||||
query?: never;
|
|
||||||
header?: never;
|
|
||||||
path?: never;
|
|
||||||
cookie?: never;
|
|
||||||
};
|
|
||||||
get?: never;
|
|
||||||
put?: never;
|
|
||||||
/**
|
|
||||||
* Rooms Leave Meeting
|
|
||||||
* @description Trigger presence recheck when user leaves meeting (e.g., tab close/navigation).
|
|
||||||
*
|
|
||||||
* Queues presence poll with optional delay to allow Daily.co to detect disconnect.
|
|
||||||
*/
|
|
||||||
post: operations["v1_rooms_leave_meeting"];
|
|
||||||
delete?: never;
|
|
||||||
options?: never;
|
|
||||||
head?: never;
|
|
||||||
patch?: never;
|
|
||||||
trace?: never;
|
|
||||||
};
|
|
||||||
"/v1/rooms/{room_id}/webhook/test": {
|
"/v1/rooms/{room_id}/webhook/test": {
|
||||||
parameters: {
|
parameters: {
|
||||||
query?: never;
|
query?: never;
|
||||||
@@ -2477,72 +2435,6 @@ export interface operations {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
v1_rooms_joined_meeting: {
|
|
||||||
parameters: {
|
|
||||||
query?: never;
|
|
||||||
header?: never;
|
|
||||||
path: {
|
|
||||||
room_name: string;
|
|
||||||
meeting_id: string;
|
|
||||||
};
|
|
||||||
cookie?: never;
|
|
||||||
};
|
|
||||||
requestBody?: never;
|
|
||||||
responses: {
|
|
||||||
/** @description Successful Response */
|
|
||||||
200: {
|
|
||||||
headers: {
|
|
||||||
[name: string]: unknown;
|
|
||||||
};
|
|
||||||
content: {
|
|
||||||
"application/json": unknown;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
/** @description Validation Error */
|
|
||||||
422: {
|
|
||||||
headers: {
|
|
||||||
[name: string]: unknown;
|
|
||||||
};
|
|
||||||
content: {
|
|
||||||
"application/json": components["schemas"]["HTTPValidationError"];
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
v1_rooms_leave_meeting: {
|
|
||||||
parameters: {
|
|
||||||
query?: {
|
|
||||||
delay_seconds?: number;
|
|
||||||
};
|
|
||||||
header?: never;
|
|
||||||
path: {
|
|
||||||
room_name: string;
|
|
||||||
meeting_id: string;
|
|
||||||
};
|
|
||||||
cookie?: never;
|
|
||||||
};
|
|
||||||
requestBody?: never;
|
|
||||||
responses: {
|
|
||||||
/** @description Successful Response */
|
|
||||||
200: {
|
|
||||||
headers: {
|
|
||||||
[name: string]: unknown;
|
|
||||||
};
|
|
||||||
content: {
|
|
||||||
"application/json": unknown;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
/** @description Validation Error */
|
|
||||||
422: {
|
|
||||||
headers: {
|
|
||||||
[name: string]: unknown;
|
|
||||||
};
|
|
||||||
content: {
|
|
||||||
"application/json": components["schemas"]["HTTPValidationError"];
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
v1_rooms_test_webhook: {
|
v1_rooms_test_webhook: {
|
||||||
parameters: {
|
parameters: {
|
||||||
query?: never;
|
query?: never;
|
||||||
|
|||||||
Reference in New Issue
Block a user