Compare commits

..

9 Commits

Author SHA1 Message Date
Igor Loskutov
f0828bb846 active meetings type precision 2025-12-19 17:46:31 -05:00
Igor Loskutov
65916c273f no forced whereby recording indicator 2025-12-19 16:59:40 -05:00
Igor Loskutov
15afd57ed9 consent skip feature 2025-12-19 16:56:31 -05:00
Igor Loskutov
3929a80665 consent skip feature 2025-12-19 16:14:28 -05:00
Igor Loskutov
a988c3aa92 daily backend code refactor 2025-12-19 15:31:27 -05:00
Igor Loskutov
9edc38b861 consent disable refactor 2025-12-18 23:16:24 -05:00
Igor Loskutov
fbf319573e sync migration 2025-12-18 18:37:16 -05:00
Igor Monadical
537f9413a5 Merge branch 'main' into feat/consent-disable 2025-12-18 18:18:29 -05:00
Igor Loskutov
129a19bcb5 consent disable feature (no-mistakes) 2025-12-18 11:14:02 -05:00
59 changed files with 2636 additions and 4925 deletions

View File

@@ -1,20 +1,5 @@
# Changelog
## [0.24.0](https://github.com/Monadical-SAS/reflector/compare/v0.23.2...v0.24.0) (2025-12-18)
### Features
* identify action items ([#790](https://github.com/Monadical-SAS/reflector/issues/790)) ([964cd78](https://github.com/Monadical-SAS/reflector/commit/964cd78bb699d83d012ae4b8c96565df25b90a5d))
### Bug Fixes
* automatically reprocess daily recordings ([#797](https://github.com/Monadical-SAS/reflector/issues/797)) ([5f458aa](https://github.com/Monadical-SAS/reflector/commit/5f458aa4a7ec3d00ca5ec49d62fcc8ad232b138e))
* daily video optimisation ([#789](https://github.com/Monadical-SAS/reflector/issues/789)) ([16284e1](https://github.com/Monadical-SAS/reflector/commit/16284e1ac3faede2b74f0d91b50c0b5612af2c35))
* main menu login ([#800](https://github.com/Monadical-SAS/reflector/issues/800)) ([0bc971b](https://github.com/Monadical-SAS/reflector/commit/0bc971ba966a52d719c8c240b47dc7b3bdea4391))
* retry on workflow timeout ([#798](https://github.com/Monadical-SAS/reflector/issues/798)) ([5f7dfad](https://github.com/Monadical-SAS/reflector/commit/5f7dfadabd3e8017406ad3720ba495a59963ee34))
## [0.23.2](https://github.com/Monadical-SAS/reflector/compare/v0.23.1...v0.23.2) (2025-12-11)

View File

@@ -34,20 +34,6 @@ services:
environment:
ENTRYPOINT: beat
hatchet-worker:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: hatchet-worker
depends_on:
hatchet:
condition: service_healthy
redis:
image: redis:7.2
ports:
@@ -69,7 +55,6 @@ services:
postgres:
image: postgres:17
command: postgres -c 'max_connections=200'
ports:
- 5432:5432
environment:
@@ -78,42 +63,6 @@ services:
POSTGRES_DB: reflector
volumes:
- ./data/postgres:/var/lib/postgresql/data
- ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
hatchet:
image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
ports:
- "8889:8888"
- "7078:7077"
depends_on:
postgres:
condition: service_healthy
environment:
DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable"
SERVER_AUTH_COOKIE_DOMAIN: localhost
SERVER_AUTH_COOKIE_INSECURE: "t"
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
SERVER_GRPC_INSECURE: "t"
SERVER_GRPC_BROADCAST_ADDRESS: hatchet:7077
SERVER_GRPC_PORT: "7077"
SERVER_URL: http://localhost:8889
SERVER_AUTH_SET_EMAIL_VERIFIED: "t"
# SERVER_DEFAULT_ENGINE_VERSION: "V1" # default
SERVER_INTERNAL_CLIENT_INTERNAL_GRPC_BROADCAST_ADDRESS: hatchet:7077
volumes:
- ./data/hatchet-config:/config
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8888/api/live"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
networks:
default:

View File

@@ -53,36 +53,6 @@ response = sqs.receive_message(QueueUrl=queue_url, ...)
uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID
```
## Hatchet Setup (Fresh DB)
After resetting the Hatchet database:
### Option A: Automatic (CLI)
```bash
# Get default tenant ID and create token in one command
TENANT_ID=$(docker compose exec -T postgres psql -U reflector -d hatchet -t -c \
"SELECT id FROM \"Tenant\" WHERE slug = 'default';" | tr -d ' \n') && \
TOKEN=$(docker compose exec -T hatchet /hatchet-admin token create \
--config /config --tenant-id "$TENANT_ID" 2>/dev/null | tr -d '\n') && \
echo "HATCHET_CLIENT_TOKEN=$TOKEN"
```
Copy the output to `server/.env`.
### Option B: Manual (UI)
1. Create API token at http://localhost:8889 → Settings → API Tokens
2. Update `server/.env`: `HATCHET_CLIENT_TOKEN=<new-token>`
### Then restart workers
```bash
docker compose restart server hatchet-worker
```
Workflows register automatically when hatchet-worker starts.
## Pipeline Management
### Continue stuck pipeline from final summaries (identify_participants) step:

View File

@@ -1,2 +0,0 @@
-- Create hatchet database for Hatchet workflow engine
CREATE DATABASE hatchet;

View File

@@ -1,28 +0,0 @@
"""add workflow_run_id to transcript
Revision ID: 0f943fede0e0
Revises: 05f8688d6895
Create Date: 2025-12-16 01:54:13.855106
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "0f943fede0e0"
down_revision: Union[str, None] = "05f8688d6895"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.add_column(sa.Column("workflow_run_id", sa.String(), nullable=True))
def downgrade() -> None:
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.drop_column("workflow_run_id")

View File

@@ -1,8 +1,8 @@
"""add use_hatchet to room
"""add skip_consent to room
Revision ID: bd3a729bb379
Revises: 0f943fede0e0
Create Date: 2025-12-16 16:34:03.594231
Revision ID: 20251217000000
Revises: 05f8688d6895
Create Date: 2025-12-17 00:00:00.000000
"""
@@ -12,8 +12,8 @@ import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "bd3a729bb379"
down_revision: Union[str, None] = "0f943fede0e0"
revision: str = "20251217000000"
down_revision: Union[str, None] = "05f8688d6895"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
@@ -22,14 +22,14 @@ def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
"skip_consent",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
server_default=sa.text("false"),
)
)
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_hatchet")
batch_op.drop_column("skip_consent")

View File

@@ -39,7 +39,6 @@ dependencies = [
"pytest-env>=1.1.5",
"webvtt-py>=0.5.0",
"icalendar>=6.0.0",
"hatchet-sdk>=0.47.0",
]
[dependency-groups]

View File

@@ -47,7 +47,7 @@ class DailyApiError(Exception):
)
super().__init__(
f"Daily.co API error: {operation} failed with status {self.status_code}: {response.text}"
f"Daily.co API error: {operation} failed with status {self.status_code}"
)

View File

@@ -58,10 +58,10 @@ rooms = sqlalchemy.Table(
nullable=False,
),
sqlalchemy.Column(
"use_hatchet",
"skip_consent",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
server_default=sqlalchemy.sql.false(),
),
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
@@ -91,7 +91,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_hatchet: bool = False
skip_consent: bool = False
class RoomController:
@@ -146,6 +146,7 @@ class RoomController:
ics_fetch_interval: int = 300,
ics_enabled: bool = False,
platform: Platform = settings.DEFAULT_VIDEO_PLATFORM,
skip_consent: bool = False,
):
"""
Add a new room
@@ -170,6 +171,7 @@ class RoomController:
"ics_fetch_interval": ics_fetch_interval,
"ics_enabled": ics_enabled,
"platform": platform,
"skip_consent": skip_consent,
}
room = Room(**room_data)

View File

@@ -84,8 +84,6 @@ transcripts = sqlalchemy.Table(
sqlalchemy.Column("audio_deleted", sqlalchemy.Boolean),
sqlalchemy.Column("room_id", sqlalchemy.String),
sqlalchemy.Column("webvtt", sqlalchemy.Text),
# Hatchet workflow run ID for resumption of failed workflows
sqlalchemy.Column("workflow_run_id", sqlalchemy.String),
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
@@ -225,7 +223,6 @@ class Transcript(BaseModel):
zulip_message_id: int | None = None
audio_deleted: bool | None = None
webvtt: str | None = None
workflow_run_id: str | None = None # Hatchet workflow run ID for resumption
@field_serializer("created_at", when_used="json")
def serialize_datetime(self, dt: datetime) -> str:

View File

@@ -1,5 +0,0 @@
"""Hatchet workflow orchestration for Reflector."""
from reflector.hatchet.client import HatchetClientManager
__all__ = ["HatchetClientManager"]

View File

@@ -1,98 +0,0 @@
"""WebSocket broadcasting helpers for Hatchet workflows.
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
decorator behavior. Events are broadcast to transcript rooms and user rooms.
"""
from typing import Any
import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(
transcript_id: NonEmptyString,
event: TranscriptEvent,
logger: structlog.BoundLogger,
) -> None:
"""Broadcast a TranscriptEvent to WebSocket subscribers.
Fire-and-forget: errors are logged but don't interrupt workflow execution.
"""
logger.info(
"Broadcasting event",
transcript_id=transcript_id,
event_type=event.event,
)
try:
ws_manager = get_ws_manager()
await ws_manager.send_json(
room_id=f"ts:{transcript_id}",
message=event.model_dump(mode="json"),
)
logger.info(
"Event sent to transcript room",
transcript_id=transcript_id,
event_type=event.event,
)
if event.event in USER_ROOM_EVENTS:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript and transcript.user_id:
await ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
message={
"event": f"TRANSCRIPT_{event.event}",
"data": {"id": transcript_id, **event.data},
},
)
except Exception as e:
logger.warning(
"Failed to broadcast event",
error=str(e),
transcript_id=transcript_id,
event_type=event.event,
)
async def set_status_and_broadcast(
transcript_id: NonEmptyString,
status: str,
logger: structlog.BoundLogger,
) -> None:
"""Set transcript status and broadcast to WebSocket.
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
"""
event = await transcripts_controller.set_status(transcript_id, status)
if event:
await broadcast_event(transcript_id, event, logger=logger)
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: str,
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:
"""Append event to transcript and broadcast to WebSocket.
Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
"""
event = await transcripts_controller.append_event(
transcript=transcript,
event=event_name,
data=data,
)
await broadcast_event(transcript_id, event, logger=logger)
return event

View File

@@ -1,111 +0,0 @@
"""Hatchet Python client wrapper.
Uses singleton pattern because:
1. Hatchet client maintains persistent gRPC connections for workflow registration
2. Creating multiple clients would cause registration conflicts and resource leaks
3. The SDK is designed for a single client instance per process
4. Tests use `HatchetClientManager.reset()` to isolate state between tests
"""
import logging
import threading
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.logger import logger
from reflector.settings import settings
class HatchetClientManager:
"""Singleton manager for Hatchet client connections.
See module docstring for rationale. For test isolation, use `reset()`.
"""
_instance: Hatchet | None = None
_lock = threading.Lock()
@classmethod
def get_client(cls) -> Hatchet:
"""Get or create the Hatchet client (thread-safe singleton)."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
if not settings.HATCHET_CLIENT_TOKEN:
raise ValueError("HATCHET_CLIENT_TOKEN must be set")
# Pass root logger to Hatchet so workflow logs appear in dashboard
root_logger = logging.getLogger()
cls._instance = Hatchet(
debug=settings.HATCHET_DEBUG,
config=ClientConfig(logger=root_logger),
)
return cls._instance
@classmethod
async def start_workflow(
cls,
workflow_name: str,
input_data: dict,
additional_metadata: dict | None = None,
) -> str:
"""Start a workflow and return the workflow run ID.
Args:
workflow_name: Name of the workflow to trigger.
input_data: Input data for the workflow run.
additional_metadata: Optional metadata for filtering in dashboard
(e.g., transcript_id, recording_id).
"""
client = cls.get_client()
result = await client.runs.aio_create(
workflow_name,
input_data,
additional_metadata=additional_metadata,
)
return result.run.metadata.id
@classmethod
async def get_workflow_run_status(cls, workflow_run_id: str) -> V1TaskStatus:
client = cls.get_client()
return await client.runs.aio_get_status(workflow_run_id)
@classmethod
async def cancel_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_cancel(workflow_run_id)
logger.info("[Hatchet] Cancelled workflow", workflow_run_id=workflow_run_id)
@classmethod
async def replay_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_replay(workflow_run_id)
logger.info("[Hatchet] Replaying workflow", workflow_run_id=workflow_run_id)
@classmethod
async def can_replay(cls, workflow_run_id: str) -> bool:
"""Check if workflow can be replayed (is FAILED)."""
try:
status = await cls.get_workflow_run_status(workflow_run_id)
return status == V1TaskStatus.FAILED or status == V1TaskStatus.CANCELLED
except Exception as e:
logger.warning(
"[Hatchet] Failed to check replay status",
workflow_run_id=workflow_run_id,
error=str(e),
)
return False
@classmethod
async def get_workflow_status(cls, workflow_run_id: str) -> dict:
"""Get the full workflow run details as dict."""
client = cls.get_client()
run = await client.runs.aio_get(workflow_run_id)
return run.to_dict()
@classmethod
def reset(cls) -> None:
"""Reset the client instance (for testing)."""
with cls._lock:
cls._instance = None

View File

@@ -1,63 +0,0 @@
"""
Run Hatchet workers for the diarization pipeline.
Runs as a separate process, just like Celery workers.
Usage:
uv run -m reflector.hatchet.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.hatchet.run_workers
"""
import signal
import sys
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Hatchet worker polling."""
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting workers")
sys.exit(1)
if not settings.HATCHET_CLIENT_TOKEN:
logger.error("HATCHET_CLIENT_TOKEN is not set")
sys.exit(1)
logger.info(
"Starting Hatchet workers",
debug=settings.HATCHET_DEBUG,
)
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
worker = hatchet.worker(
"reflector-diarization-worker",
workflows=[diarization_pipeline, track_workflow],
)
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
# Worker cleanup happens automatically on exit
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting Hatchet worker polling...")
worker.start()
if __name__ == "__main__":
main()

View File

@@ -1,14 +0,0 @@
"""Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import (
PipelineInput,
diarization_pipeline,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
"track_workflow",
"PipelineInput",
"TrackInput",
]

View File

@@ -1,938 +0,0 @@
"""
Hatchet main workflow: DiarizationPipeline
Multitrack diarization pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript.
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
"""
import asyncio
import functools
import tempfile
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from typing import Callable
import httpx
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.dailyco_api.client import DailyApiClient
from reflector.hatchet.broadcast import (
append_event_and_broadcast,
set_status_and_broadcast,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import (
ConsentResult,
FinalizeResult,
MixdownResult,
PaddedTrackInfo,
ParticipantsResult,
ProcessTracksResult,
RecordingResult,
SummaryResult,
TitleResult,
TopicsResult,
WaveformResult,
WebhookResult,
ZulipResult,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.types import (
TitleSummary,
TitleSummaryWithId,
Word,
)
from reflector.processors.types import (
Transcript as TranscriptType,
)
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
from reflector.utils.audio_constants import (
PRESIGNED_URL_EXPIRATION_SECONDS,
WAVEFORM_SEGMENTS,
)
from reflector.utils.audio_mixdown import (
detect_sample_rate_from_tracks,
mixdown_tracks_pyav,
)
from reflector.utils.audio_waveform import get_audio_waveform
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
"""Input to trigger the diarization pipeline."""
recording_id: NonEmptyString
tracks: list[dict] # List of {"s3_key": str}
bucket_name: NonEmptyString
transcript_id: NonEmptyString
room_id: NonEmptyString | None = None
hatchet = HatchetClientManager.get_client()
diarization_pipeline = hatchet.workflow(
name="DiarizationPipeline", input_validator=PipelineInput
)
@asynccontextmanager
async def fresh_db_connection():
"""Context manager for database connections in Hatchet workers.
TECH DEBT: Made to make connection fork-aware without changing db code too much.
The real fix would be making the db module fork-aware instead of bypassing it.
Current pattern is acceptable given Hatchet's process model.
"""
import databases # noqa: PLC0415
from reflector.db import _database_context # noqa: PLC0415
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
yield db
finally:
await db.disconnect()
_database_context.set(None)
async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
"""Set transcript status to 'error' on workflow failure.
Returns:
True if status was set successfully, False if failed.
Failure is logged as CRITICAL since it means transcript may be stuck.
"""
try:
async with fresh_db_connection():
await set_status_and_broadcast(transcript_id, "error", logger=logger)
return True
except Exception as e:
logger.critical(
"[Hatchet] CRITICAL: Failed to set error status - transcript may be stuck in 'processing'",
transcript_id=transcript_id,
error=str(e),
exc_info=True,
)
return False
def _spawn_storage():
"""Create fresh storage instance."""
return AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
"""Decorator that handles task failures uniformly.
Args:
step_name: Name of the step for logging and progress tracking.
set_error_status: Whether to set transcript status to 'error' on failure.
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context):
try:
return await func(input, ctx)
except Exception as e:
logger.error(
f"[Hatchet] {step_name} failed",
transcript_id=input.transcript_id,
error=str(e),
exc_info=True,
)
if set_error_status:
await set_workflow_error_status(input.transcript_id)
raise
return wrapper
return decorator
@diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3)
@with_error_handling("get_recording")
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: recording_id={input.recording_id}")
# Set transcript status to "processing" at workflow start (broadcasts to WebSocket)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await set_status_and_broadcast(
input.transcript_id, "processing", logger=logger
)
ctx.log(f"Set transcript status to processing: {input.transcript_id}")
if not settings.DAILY_API_KEY:
raise ValueError("DAILY_API_KEY not configured")
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
recording = await client.get_recording(input.recording_id)
ctx.log(
f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
)
return RecordingResult(
id=recording.id,
mtg_session_id=recording.mtgSessionId,
duration=recording.duration,
)
@diarization_pipeline.task(
parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3
)
@with_error_handling("get_participants")
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
recording = ctx.task_output(get_recording)
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptParticipant,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
# Note: title NOT cleared - preserves existing titles
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
},
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(api_key=daily_api_key) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
participants_list = []
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
{
"participant_id": participant_id,
"user_name": name,
"speaker": idx,
}
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
return ParticipantsResult(
participants=participants_list,
num_tracks=len(input.tracks),
source_language=transcript.source_language if transcript else "en",
target_language=transcript.target_language if transcript else "en",
)
@diarization_pipeline.task(
parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3
)
@with_error_handling("process_tracks")
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
child_coroutines = [
track_workflow.aio_run(
TrackInput(
track_index=i,
s3_key=track["s3_key"],
bucket_name=input.bucket_name,
transcript_id=input.transcript_id,
language=source_language,
)
)
for i, track in enumerate(input.tracks)
]
results = await asyncio.gather(*child_coroutines)
target_language = participants_result.target_language
track_words = []
padded_tracks = []
created_padded_files = set()
for result in results:
transcribe_result = result.get("transcribe_track", {})
track_words.append(transcribe_result.get("words", []))
pad_result = result.get("pad_track", {})
padded_key = pad_result.get("padded_key")
bucket_name = pad_result.get("bucket_name")
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
if padded_key:
padded_tracks.append(
PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
)
track_index = pad_result.get("track_index")
if pad_result.get("size", 0) > 0 and track_index is not None:
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
created_padded_files.add(storage_path)
all_words = [word for words in track_words for word in words]
all_words.sort(key=lambda w: w.get("start", 0))
ctx.log(
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
)
return ProcessTracksResult(
all_words=all_words,
padded_tracks=padded_tracks,
word_count=len(all_words),
num_tracks=len(input.tracks),
target_language=target_language,
created_padded_files=list(created_padded_files),
)
@diarization_pipeline.task(
parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("mixdown_tracks")
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
track_result = ctx.task_output(process_tracks)
padded_tracks = track_result.padded_tracks
# TODO think of NonEmpty type to avoid those checks, e.g. sized.NonEmpty from https://github.com/antonagestam/phantom-types/
if not padded_tracks:
raise ValueError("No padded tracks to mixdown")
storage = _spawn_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
padded_urls = []
for track_info in padded_tracks:
if track_info.key:
url = await storage.get_file_url(
track_info.key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=track_info.bucket_name,
)
padded_urls.append(url)
valid_urls = [url for url in padded_urls if url]
if not valid_urls:
raise ValueError("No valid padded tracks to mixdown")
target_sample_rate = detect_sample_rate_from_tracks(valid_urls, logger=logger)
if not target_sample_rate:
logger.error("Mixdown failed - no decodable audio frames found")
raise ValueError("No decodable audio frames in any track")
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
)
await writer.flush()
file_size = Path(output_path).stat().st_size
storage_path = f"{input.transcript_id}/audio.mp3"
with open(output_path, "rb") as mixed_file:
await storage.put_file(storage_path, mixed_file)
Path(output_path).unlink(missing_ok=True)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"audio_location": "storage"}
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
return MixdownResult(
audio_key=storage_path,
duration=duration_ms_callback_capture_container[0],
tracks_mixed=len(valid_urls),
)
@diarization_pipeline.task(
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_waveform")
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
"""Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptWaveform,
transcripts_controller,
)
mixdown_result = ctx.task_output(mixdown_tracks)
audio_key = mixdown_result.audio_key
storage = _spawn_storage()
audio_url = await storage.get_file_url(
audio_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
# Download MP3 to temp file (AudioWaveformProcessor needs local file)
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_path = temp_file.name
try:
async with httpx.AsyncClient() as client:
response = await client.get(audio_url, timeout=120)
response.raise_for_status()
with open(temp_path, "wb") as f:
f.write(response.content)
waveform = get_audio_waveform(
path=Path(temp_path), segments_count=WAVEFORM_SEGMENTS
)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
waveform_data = TranscriptWaveform(waveform=waveform)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"WAVEFORM",
waveform_data,
logger=logger,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log("generate_waveform complete")
return WaveformResult(waveform_generated=True)
@diarization_pipeline.task(
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("detect_topics")
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
ctx.log("detect_topics: analyzing transcript for topics")
track_result = ctx.task_output(process_tracks)
words = track_result.all_words
target_language = track_result.target_language
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptTopic,
transcripts_controller,
)
word_objects = [Word(**w) for w in words]
transcript_type = TranscriptType(words=word_objects)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_topic_callback(data):
topic = TranscriptTopic(
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
transcript=data.transcript.text,
words=data.transcript.words,
)
if isinstance(
data, TitleSummaryWithId
): # Celery parity: main_live_pipeline.py
topic.id = data.id
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
topics = await topic_processing.detect_topics(
transcript_type,
target_language,
on_topic_callback=on_topic_callback,
empty_pipeline=empty_pipeline,
)
topics_list = [t.model_dump() for t in topics]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
return TopicsResult(topics=topics_list)
@diarization_pipeline.task(
parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_title")
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
ctx.log("generate_title: generating title from topics")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalTitle,
transcripts_controller,
)
topic_objects = [TitleSummary(**t) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
title_result = None
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_title_callback(data):
nonlocal title_result
title_result = data.title
final_title = TranscriptFinalTitle(title=data.title)
if not transcript.title:
await transcripts_controller.update(
transcript,
{"title": final_title.title},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_TITLE",
final_title,
logger=logger,
)
await topic_processing.generate_title(
topic_objects,
on_title_callback=on_title_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log(f"generate_title complete: '{title_result}'")
return TitleResult(title=title_result)
@diarization_pipeline.task(
parents=[detect_topics], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("generate_summary")
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
ctx.log("generate_summary: generating long and short summaries")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
topic_objects = [TitleSummary(**t) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
summary_result = None
short_summary_result = None
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_long_summary_callback(data):
nonlocal summary_result
summary_result = data.long_summary
final_long_summary = TranscriptFinalLongSummary(
long_summary=data.long_summary
)
await transcripts_controller.update(
transcript,
{"long_summary": final_long_summary.long_summary},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_LONG_SUMMARY",
final_long_summary,
logger=logger,
)
async def on_short_summary_callback(data):
nonlocal short_summary_result
short_summary_result = data.short_summary
final_short_summary = TranscriptFinalShortSummary(
short_summary=data.short_summary
)
await transcripts_controller.update(
transcript,
{"short_summary": final_short_summary.short_summary},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_SHORT_SUMMARY",
final_short_summary,
logger=logger,
)
await topic_processing.generate_summaries(
topic_objects,
transcript, # DB transcript for context
on_long_summary_callback=on_long_summary_callback,
on_short_summary_callback=on_short_summary_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log("generate_summary complete")
return SummaryResult(summary=summary_result, short_summary=short_summary_result)
@diarization_pipeline.task(
parents=[generate_waveform, generate_title, generate_summary],
execution_timeout=timedelta(seconds=60),
retries=3,
)
@with_error_handling("finalize")
async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
"""Finalize transcript: save words, emit TRANSCRIPT event, set status to 'ended'.
Matches Celery's on_transcript + set_status behavior.
Note: Title and summaries are already saved by their respective task callbacks.
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files
if created_padded_files:
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage()
cleanup_results = await asyncio.gather(
*[storage.delete_file(path) for path in created_padded_files],
return_exceptions=True,
)
for storage_path, result in zip(created_padded_files, cleanup_results):
if isinstance(result, Exception):
logger.warning(
"[Hatchet] Failed to cleanup temporary padded track",
storage_path=storage_path,
error=str(result),
)
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database")
word_objects = [Word(**w) for w in all_words]
merged_transcript = TranscriptType(words=word_objects, translation=None)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"TRANSCRIPT",
TranscriptText(
text=merged_transcript.text,
translation=merged_transcript.translation,
),
logger=logger,
)
# Save duration and clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks
await transcripts_controller.update(
transcript,
{
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume
},
)
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log(
f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
)
return FinalizeResult(status="COMPLETED")
@diarization_pipeline.task(
parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3
)
@with_error_handling("cleanup_consent", set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
"""Check consent and delete audio files if any participant denied."""
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.meetings import ( # noqa: PLC0415
meeting_consent_controller,
meetings_controller,
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import get_transcripts_storage # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
ctx.log("cleanup_consent: transcript not found")
return ConsentResult()
consent_denied = False
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting:
consent_denied = await meeting_consent_controller.has_any_denial(
meeting.id
)
if not consent_denied:
ctx.log("cleanup_consent: consent approved, keeping all files")
return ConsentResult()
ctx.log("cleanup_consent: consent denied, deleting audio files")
input_track_keys = set(t["s3_key"] for t in input.tracks)
# Detect if recording.track_keys was manually modified after workflow started
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.track_keys:
db_track_keys = set(filter_cam_audio_tracks(recording.track_keys))
if input_track_keys != db_track_keys:
added = db_track_keys - input_track_keys
removed = input_track_keys - db_track_keys
logger.warning(
"[Hatchet] Track keys mismatch: DB changed since workflow start",
transcript_id=input.transcript_id,
recording_id=transcript.recording_id,
input_count=len(input_track_keys),
db_count=len(db_track_keys),
added_in_db=list(added) if added else None,
removed_from_db=list(removed) if removed else None,
)
ctx.log(
f"WARNING: track_keys mismatch - "
f"input has {len(input_track_keys)}, DB has {len(db_track_keys)}. "
f"Using input tracks for deletion."
)
deletion_errors = []
if input_track_keys and input.bucket_name:
master_storage = get_transcripts_storage()
for key in input_track_keys:
try:
await master_storage.delete_file(key, bucket=input.bucket_name)
ctx.log(f"Deleted recording file: {input.bucket_name}/{key}")
except Exception as e:
error_msg = f"Failed to delete {key}: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
except Exception as e:
error_msg = f"Failed to delete processed audio: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
transcript_id=input.transcript_id,
error_count=len(deletion_errors),
errors=deletion_errors,
)
ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio deleted successfully")
return ConsentResult()
@diarization_pipeline.task(
parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5
)
@with_error_handling("post_zulip", set_error_status=False)
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
"""Post notification to Zulip."""
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
if not settings.ZULIP_REALM:
ctx.log("post_zulip skipped (Zulip not configured)")
return ZulipResult(zulip_message_id=None, skipped=True)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
message_id = await post_transcript_notification(transcript)
ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
else:
message_id = None
return ZulipResult(zulip_message_id=message_id)
@diarization_pipeline.task(
parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30
)
@with_error_handling("send_webhook", set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
ctx.log("send_webhook skipped (no room_id)")
return WebhookResult(webhook_sent=False, skipped=True)
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
room = await rooms_controller.get_by_id(input.room_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if room and room.webhook_url and transcript:
webhook_payload = {
"event": "transcript.completed",
"transcript_id": input.transcript_id,
"title": transcript.title,
"duration": transcript.duration,
}
async with httpx.AsyncClient() as client:
response = await client.post(
room.webhook_url, json=webhook_payload, timeout=30
)
response.raise_for_status()
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
return WebhookResult(webhook_sent=False, skipped=True)

View File

@@ -1,123 +0,0 @@
"""
Pydantic models for Hatchet workflow task return types.
Provides static typing for all task outputs, enabling type checking
and better IDE support.
"""
from typing import Any
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
class PadTrackResult(BaseModel):
"""Result from pad_track task."""
padded_key: NonEmptyString # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
bucket_name: (
NonEmptyString | None
) # None means use default transcript storage bucket
size: int
track_index: int
class TranscribeTrackResult(BaseModel):
"""Result from transcribe_track task."""
words: list[dict[str, Any]]
track_index: int
class RecordingResult(BaseModel):
"""Result from get_recording task."""
id: NonEmptyString | None
mtg_session_id: NonEmptyString | None
duration: float
class ParticipantsResult(BaseModel):
"""Result from get_participants task."""
participants: list[dict[str, Any]]
num_tracks: int
source_language: NonEmptyString
target_language: NonEmptyString
class PaddedTrackInfo(BaseModel):
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
key: NonEmptyString
bucket_name: NonEmptyString | None # None = use default storage bucket
class ProcessTracksResult(BaseModel):
"""Result from process_tracks task."""
all_words: list[dict[str, Any]]
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
word_count: int
num_tracks: int
target_language: NonEmptyString
created_padded_files: list[NonEmptyString]
class MixdownResult(BaseModel):
"""Result from mixdown_tracks task."""
audio_key: NonEmptyString
duration: float
tracks_mixed: int
class WaveformResult(BaseModel):
"""Result from generate_waveform task."""
waveform_generated: bool
class TopicsResult(BaseModel):
"""Result from detect_topics task."""
topics: list[dict[str, Any]]
class TitleResult(BaseModel):
"""Result from generate_title task."""
title: str | None
class SummaryResult(BaseModel):
"""Result from generate_summary task."""
summary: str | None
short_summary: str | None
class FinalizeResult(BaseModel):
"""Result from finalize task."""
status: NonEmptyString
class ConsentResult(BaseModel):
"""Result from cleanup_consent task."""
class ZulipResult(BaseModel):
"""Result from post_zulip task."""
zulip_message_id: int | None = None
skipped: bool = False
class WebhookResult(BaseModel):
"""Result from send_webhook task."""
webhook_sent: bool
skipped: bool = False
response_code: int | None = None

View File

@@ -1,222 +0,0 @@
"""
Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
class TrackInput(BaseModel):
"""Input for individual track processing."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
language: str = "en"
hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
# Return S3 key (not presigned URL) - consumer tasks presign on demand
# This avoids stale URLs when workflow is replayed
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
raise
@track_workflow.task(
parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
language=input.language,
)
try:
pad_result = ctx.task_output(pad_track)
padded_key = pad_result.padded_key
bucket_name = pad_result.bucket_name
if not padded_key:
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index
words = []
for word in transcript.words:
word_dict = word.model_dump()
word_dict["speaker"] = input.track_index
words.append(word_dict)
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
word_count=len(words),
)
return TranscribeTrackResult(
words=words,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
raise

View File

@@ -97,8 +97,13 @@ class PipelineMainFile(PipelineMainBase):
},
)
# Extract audio and write to transcript location
audio_path = await self.extract_and_write_audio(file_path, transcript)
# Upload for processing
audio_url = await self.upload_audio(audio_path, transcript)
# Run parallel processing
await self.run_parallel_processing(
audio_path,
audio_url,
@@ -192,6 +197,7 @@ class PipelineMainFile(PipelineMainBase):
transcript_result = results[0]
diarization_result = results[1]
# Handle errors - raise any exception that occurred
self._handle_gather_exceptions(results, "parallel processing")
for result in results:
if isinstance(result, Exception):
@@ -206,6 +212,7 @@ class PipelineMainFile(PipelineMainBase):
transcript=transcript_result, diarization=diarization_result or []
)
# Store result for retrieval
diarized_transcript: Transcript | None = None
async def capture_result(transcript):
@@ -342,6 +349,7 @@ async def task_pipeline_file_process(*, transcript_id: str):
try:
await pipeline.set_status(transcript_id, "processing")
# Find the file to process
audio_file = next(transcript.data_path.glob("upload.*"), None)
if not audio_file:
audio_file = next(transcript.data_path.glob("audio.*"), None)

View File

@@ -112,7 +112,7 @@ def get_transcript(func):
transcript_id = kwargs.pop("transcript_id")
transcript = await transcripts_controller.get_by_id(transcript_id=transcript_id)
if not transcript:
raise Exception("Transcript {transcript_id} not found")
raise Exception(f"Transcript {transcript_id} not found")
# Enhanced logger with Celery task context
tlogger = logger.bind(transcript_id=transcript.id)

View File

@@ -1,8 +1,11 @@
import asyncio
import math
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from celery import chain, shared_task
from reflector.asynctask import asynctask
@@ -29,15 +32,6 @@ from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
from reflector.storage import Storage, get_transcripts_storage
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_mixdown import (
detect_sample_rate_from_tracks,
mixdown_tracks_pyav,
)
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
@@ -45,6 +39,13 @@ from reflector.utils.daily import (
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
# Audio encoding constants
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 128000
# Storage operation constants
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
class PipelineMainMultitrack(PipelineMainBase):
def __init__(self, transcript_id: str):
@@ -124,8 +125,8 @@ class PipelineMainMultitrack(PipelineMainBase):
try:
# PyAV streams input from S3 URL efficiently (2-5MB fixed overhead for codec/filters)
with av.open(track_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, track_idx, logger=self.logger
start_time_seconds = self._extract_stream_start_time_from_container(
in_container, track_idx
)
if start_time_seconds <= 0:
@@ -143,12 +144,8 @@ class PipelineMainMultitrack(PipelineMainBase):
temp_path = temp_file.name
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
track_idx,
logger=self.logger,
self._apply_audio_padding_to_file(
in_container, temp_path, start_time_seconds, track_idx
)
storage_path = (
@@ -159,6 +156,7 @@ class PipelineMainMultitrack(PipelineMainBase):
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
finally:
# Clean up temp file
Path(temp_path).unlink(missing_ok=True)
padded_url = await storage.get_file_url(
@@ -188,28 +186,317 @@ class PipelineMainMultitrack(PipelineMainBase):
f"Track {track_idx} padding failed - transcript would have incorrect timestamps"
) from e
def _extract_stream_start_time_from_container(
self, container, track_idx: int
) -> float:
"""
Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
self.logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
self.logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def _apply_audio_padding_to_file(
self,
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph, writing to file"""
delay_ms = math.floor(start_time_seconds * 1000)
self.logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
self.logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise
async def mixdown_tracks(
self,
track_urls: list[str],
writer: AudioFileWriterProcessor,
offsets_seconds: list[float] | None = None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs."""
target_sample_rate = detect_sample_rate_from_tracks(
track_urls, logger=self.logger
)
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs"""
target_sample_rate: int | None = None
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
target_sample_rate = frame.sample_rate
break
except Exception:
continue
finally:
if container is not None:
container.close()
if target_sample_rate:
break
if not target_sample_rate:
self.logger.error("Mixdown failed - no decodable audio frames found")
raise Exception("Mixdown failed: No decodable audio frames in any track")
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
valid_track_urls = [url for url in track_urls if url]
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
await mixdown_tracks_pyav(
track_urls,
writer,
target_sample_rate,
offsets_seconds=offsets_seconds,
logger=self.logger,
if not inputs:
self.logger.error("Mixdown failed - no valid inputs for graph")
raise Exception("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=(
f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}"
),
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(
url,
options={
# it's trying to stream from s3 by default
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
self.logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
self.logger.error("Mixdown failed - no valid containers opened")
raise Exception("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
# causes stream to move on / unclogs memory
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup
@broadcast_to_sockets
async def set_status(self, transcript_id: str, status: TranscriptStatus):
async with self.lock_transaction():

View File

@@ -11,19 +11,13 @@ from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.db.transcripts import Transcript
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -43,8 +37,6 @@ class MultitrackProcessingConfig:
transcript_id: NonEmptyString
bucket_name: NonEmptyString
track_keys: list[str]
recording_id: NonEmptyString | None = None
room_id: NonEmptyString | None = None
mode: Literal["multitrack"] = "multitrack"
@@ -57,7 +49,6 @@ class ValidationOk:
# transcript currently doesnt always have recording_id
recording_id: NonEmptyString | None
transcript_id: NonEmptyString
room_id: NonEmptyString | None = None
@dataclass
@@ -105,7 +96,6 @@ async def validate_transcript_for_processing(
if transcript.status == "idle":
return ValidationNotReady(detail="Recording is not ready for processing")
# Check Celery tasks
if task_is_scheduled_or_active(
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
transcript_id=transcript.id,
@@ -115,25 +105,8 @@ async def validate_transcript_for_processing(
):
return ValidationAlreadyScheduled(detail="already running")
# Check Hatchet workflows (if enabled)
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
# If workflow is running or queued, don't allow new processing
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
return ValidationAlreadyScheduled(
detail="Hatchet workflow already running"
)
except ApiException:
# Workflow might be gone (404) or API issue - allow processing
pass
return ValidationOk(
recording_id=transcript.recording_id,
transcript_id=transcript.id,
room_id=transcript.room_id,
recording_id=transcript.recording_id, transcript_id=transcript.id
)
@@ -143,7 +116,6 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
"""
bucket_name: str | None = None
track_keys: list[str] | None = None
recording_id: str | None = validation.recording_id
if validation.recording_id:
recording = await recordings_controller.get_by_id(validation.recording_id)
@@ -165,8 +137,6 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
recording_id=recording_id,
room_id=validation.room_id,
)
return FileProcessingConfig(
@@ -174,104 +144,8 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
)
async def dispatch_transcript_processing(
config: ProcessingConfig, force: bool = False
) -> AsyncResult | None:
"""Dispatch transcript processing to appropriate backend (Hatchet or Celery).
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
if isinstance(config, MultitrackProcessingConfig):
# Check if room has use_hatchet=True (overrides env vars)
room_forces_hatchet = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
room_forces_hatchet = room.use_hatchet if room else False
# Start durable workflow if enabled (Hatchet)
# or if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
if room_forces_hatchet:
logger.info(
"Room forces Hatchet workflow",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,

View File

@@ -153,19 +153,5 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Durable workflow orchestration
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
# Hatchet workflow orchestration
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
settings = Settings()

View File

@@ -15,11 +15,8 @@ import time
from typing import Callable
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db import get_database
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.services.transcript_process import (
FileProcessingConfig,
MultitrackProcessingConfig,
@@ -37,26 +34,24 @@ async def process_transcript_inner(
transcript: Transcript,
on_validation: Callable[[ValidationResult], None],
on_preprocess: Callable[[PrepareResult], None],
force: bool = False,
) -> AsyncResult | None:
) -> AsyncResult:
validation = await validate_transcript_for_processing(transcript)
on_validation(validation)
config = await prepare_transcript_processing(validation)
on_preprocess(config)
return await dispatch_transcript_processing(config, force=force)
return dispatch_transcript_processing(config)
async def process_transcript(
transcript_id: str, sync: bool = False, force: bool = False
) -> None:
async def process_transcript(transcript_id: str, sync: bool = False) -> None:
"""
Process a transcript by ID, auto-detecting multitrack vs file pipeline.
Args:
transcript_id: The transcript UUID
sync: If True, wait for task completion. If False, dispatch and exit.
force: If True, cancel old workflow and start new (latest code). If False, replay failed workflow.
"""
from reflector.db import get_database
database = get_database()
await database.connect()
@@ -87,42 +82,10 @@ async def process_transcript(
print(f"Dispatching file pipeline", file=sys.stderr)
result = await process_transcript_inner(
transcript,
on_validation=on_validation,
on_preprocess=on_preprocess,
force=force,
transcript, on_validation=on_validation, on_preprocess=on_preprocess
)
if result is None:
# Hatchet workflow dispatched
if sync:
# Re-fetch transcript to get workflow_run_id
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.workflow_run_id:
print("Error: workflow_run_id not found", file=sys.stderr)
sys.exit(1)
print("Waiting for Hatchet workflow...", file=sys.stderr)
while True:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
print(f" Status: {status.value}", file=sys.stderr)
if status == V1TaskStatus.COMPLETED:
print("Workflow completed successfully", file=sys.stderr)
break
elif status in (V1TaskStatus.FAILED, V1TaskStatus.CANCELLED):
print(f"Workflow failed: {status}", file=sys.stderr)
sys.exit(1)
await asyncio.sleep(5)
else:
print(
"Task dispatched (use --sync to wait for completion)",
file=sys.stderr,
)
elif sync:
if sync:
print("Waiting for task completion...", file=sys.stderr)
while not result.ready():
print(f" Status: {result.state}", file=sys.stderr)
@@ -155,16 +118,9 @@ def main():
action="store_true",
help="Wait for task completion instead of just dispatching",
)
parser.add_argument(
"--force",
action="store_true",
help="Cancel old workflow and start new (uses latest code instead of replaying)",
)
args = parser.parse_args()
asyncio.run(
process_transcript(args.transcript_id, sync=args.sync, force=args.force)
)
asyncio.run(process_transcript(args.transcript_id, sync=args.sync))
if __name__ == "__main__":

View File

@@ -1,15 +0,0 @@
"""
Shared audio processing constants.
Used by both Hatchet workflows and Celery pipelines for consistent audio encoding.
"""
# Opus codec settings
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
# Waveform visualization
WAVEFORM_SEGMENTS = 255

View File

@@ -1,227 +0,0 @@
"""
Audio track mixdown utilities.
Shared PyAV-based functions for mixing multiple audio tracks into a single output.
Used by both Hatchet workflows and Celery pipelines.
"""
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
def detect_sample_rate_from_tracks(track_urls: list[str], logger=None) -> int | None:
"""Detect sample rate from first decodable audio frame.
Args:
track_urls: List of URLs to audio files (S3 presigned or local)
logger: Optional logger instance
Returns:
Sample rate in Hz, or None if no decodable frames found
"""
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
return frame.sample_rate
except Exception:
continue
finally:
if container is not None:
container.close()
return None
async def mixdown_tracks_pyav(
track_urls: list[str],
writer,
target_sample_rate: int,
offsets_seconds: list[float] | None = None,
logger=None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix).
Builds a filter graph: N abuffer -> optional adelay -> amix -> aformat -> sink
Reads from S3 presigned URLs or local files, pushes mixed frames to writer.
Args:
track_urls: List of URLs to audio tracks (S3 presigned or local)
writer: AudioFileWriterProcessor instance with async push() method
target_sample_rate: Sample rate for output (Hz)
offsets_seconds: Optional per-track delays in seconds for alignment.
If provided, must have same length as track_urls. Delays are relative
to the minimum offset (earliest track has delay=0).
logger: Optional logger instance
Raises:
ValueError: If offsets_seconds length doesn't match track_urls,
no valid tracks provided, or no containers can be opened
"""
if offsets_seconds is not None and len(offsets_seconds) != len(track_urls):
raise ValueError(
f"offsets_seconds length ({len(offsets_seconds)}) must match track_urls ({len(track_urls)})"
)
valid_track_urls = [url for url in track_urls if url]
if not valid_track_urls:
if logger:
logger.error("Mixdown failed - no valid track URLs provided")
raise ValueError("Mixdown failed: No valid track URLs")
# Calculate per-input delays if offsets provided
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
if not inputs:
if logger:
logger.error("Mixdown failed - no valid inputs for graph")
raise ValueError("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(
url,
options={
# S3 streaming options
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
if logger:
logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
if logger:
logger.error("Mixdown failed - no valid containers opened")
raise ValueError("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
# Signal end of stream to filter graph
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
# Flush remaining frames from filter graph
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup

View File

@@ -1,186 +0,0 @@
"""
Audio track padding utilities.
Shared PyAV-based functions for extracting stream metadata and applying
silence padding to audio tracks. Used by both Hatchet workflows and Celery pipelines.
"""
import math
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
from reflector.utils.audio_constants import (
OPUS_DEFAULT_BIT_RATE,
OPUS_STANDARD_SAMPLE_RATE,
)
def extract_stream_start_time_from_container(
container,
track_idx: int,
logger=None,
) -> float:
"""Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
Args:
container: PyAV container opened from audio file/URL
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Returns:
Start time in seconds (0.0 if extraction fails)
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
if logger:
logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
if logger:
logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def apply_audio_padding_to_file(
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
logger=None,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph.
Uses adelay filter to prepend silence, aligning track to meeting start time.
Output is WebM/Opus format.
Args:
in_container: PyAV container opened from source audio
output_path: Path for output WebM file
start_time_seconds: Amount of silence to prepend (in seconds)
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Raises:
Exception: If no audio stream found or PyAV processing fails
"""
delay_ms = math.floor(start_time_seconds * 1000)
if logger:
logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush remaining frames from filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
if logger:
logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise

View File

@@ -1,4 +0,0 @@
def assert_not_none[T](value: T | None, message: str = "Value is None") -> T:
if value is None:
raise ValueError(message)
return value

View File

@@ -2,17 +2,6 @@ from typing import Annotated, TypeVar
from pydantic import Field, TypeAdapter, constr
T_NotNone = TypeVar("T_NotNone")
def assert_not_none(
value: T_NotNone | None, message: str = "Value is None"
) -> T_NotNone:
if value is None:
raise ValueError(message)
return value
NonEmptyStringBase = constr(min_length=1, strip_whitespace=False)
NonEmptyString = Annotated[
NonEmptyStringBase,
@@ -34,18 +23,10 @@ def try_parse_non_empty_string(s: str) -> NonEmptyString | None:
return parse_non_empty_string(s)
T_Str = TypeVar("T_Str", bound=str)
T = TypeVar("T", bound=str)
def assert_equal(s1: T_Str, s2: T_Str) -> T_Str:
def assert_equal[T](s1: T, s2: T) -> T:
if s1 != s2:
raise ValueError(f"assert_equal: {s1} != {s2}")
return s1
def assert_non_none_and_non_empty(
value: str | None, error: str | None = None
) -> NonEmptyString:
return parse_non_empty_string(
assert_not_none(value, error or "Value is None"), error
)

View File

@@ -44,6 +44,7 @@ class Room(BaseModel):
ics_last_sync: Optional[datetime] = None
ics_last_etag: Optional[str] = None
platform: Platform
skip_consent: bool = False
class RoomDetails(Room):
@@ -90,6 +91,7 @@ class CreateRoom(BaseModel):
ics_fetch_interval: int = 300
ics_enabled: bool = False
platform: Platform
skip_consent: bool = False
class UpdateRoom(BaseModel):
@@ -108,6 +110,7 @@ class UpdateRoom(BaseModel):
ics_fetch_interval: Optional[int] = None
ics_enabled: Optional[bool] = None
platform: Optional[Platform] = None
skip_consent: Optional[bool] = None
class CreateRoomMeeting(BaseModel):
@@ -249,6 +252,7 @@ async def rooms_create(
ics_fetch_interval=room.ics_fetch_interval,
ics_enabled=room.ics_enabled,
platform=room.platform,
skip_consent=room.skip_consent,
)
@@ -567,10 +571,17 @@ async def rooms_join_meeting(
if meeting.platform == "daily" and user_id is not None:
client = create_platform_client(meeting.platform)
# Show Daily's built-in recording UI when:
# - local recording (user controls when to record), OR
# - cloud recording with consent disabled (skip_consent=True)
# Hide it when cloud recording with consent enabled (we show custom consent UI)
enable_recording_ui = meeting.recording_type == "local" or (
meeting.recording_type == "cloud" and room.skip_consent
)
token = await client.create_meeting_token(
meeting.room_name,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=meeting.recording_type == "local",
enable_recording_ui=enable_recording_ui,
user_id=user_id,
is_owner=user_id == room.user_id,
)

View File

@@ -50,5 +50,5 @@ async def transcript_process(
if isinstance(config, ProcessError):
raise HTTPException(status_code=500, detail=config.detail)
else:
await dispatch_transcript_processing(config)
dispatch_transcript_processing(config)
return ProcessStatus(status="ok")

View File

@@ -7,6 +7,12 @@ from reflector.settings import settings
logger = structlog.get_logger(__name__)
@shared_task(name="celery.ping")
def celery_ping():
"""Compatibility task for Celery 5.x - celery.ping was removed but monitoring tools still call it."""
return "pong"
@shared_task
def healthcheck_ping():
url = settings.HEALTHCHECK_URL

View File

@@ -24,7 +24,6 @@ from reflector.db.transcripts import (
SourceKind,
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
@@ -287,45 +286,6 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Start durable workflow if enabled (Hatchet) or room overrides it
durable_started = False
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
logger.info(
"Room forces Hatchet workflow",
room_id=room.id,
transcript_id=transcript.id,
)
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
@@ -610,12 +570,12 @@ async def process_meetings():
client = create_platform_client(meeting.platform)
room_sessions = await client.get_room_sessions(meeting.room_name)
has_active_sessions = room_sessions and any(
s.ended_at is None for s in room_sessions
has_active_sessions = bool(
room_sessions and any(s.ended_at is None for s in room_sessions)
)
has_had_sessions = bool(room_sessions)
logger_.info(
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
)
if has_active_sessions:

View File

@@ -16,7 +16,6 @@ import threading
import redis.asyncio as redis
from fastapi import WebSocket
from reflector.events import subscribers_shutdown
from reflector.settings import settings
@@ -110,30 +109,29 @@ class WebsocketManager:
await socket.send_json(data)
_ws_manager_instance: WebsocketManager | None = None
_ws_manager_lock = threading.Lock()
def get_ws_manager() -> WebsocketManager:
"""Returns the WebsocketManager singleton instance."""
global _ws_manager_instance
if _ws_manager_instance is None:
with _ws_manager_lock:
if _ws_manager_instance is None:
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
_ws_manager_instance = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager_instance
"""
Returns the WebsocketManager instance for managing websockets.
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
async def cleanup_ws_manager(_app=None) -> None:
"""Cleanup WebsocketManager singleton on shutdown."""
global _ws_manager_instance
if _ws_manager_instance is not None:
await _ws_manager_instance.pubsub_client.disconnect()
_ws_manager_instance = None
Returns:
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
"""
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
subscribers_shutdown.append(cleanup_ws_manager)
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager

View File

@@ -3,8 +3,7 @@ from urllib.parse import urlparse
import httpx
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.db.transcripts import Transcript
from reflector.settings import settings
@@ -114,49 +113,6 @@ def get_zulip_message(transcript: Transcript, include_topics: bool):
return message
async def post_transcript_notification(transcript: Transcript) -> int | None:
"""Post or update transcript notification in Zulip.
Uses transcript.room_id directly (Hatchet flow).
Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
DUPLICATION NOTE: This function will stay when we use Celery no more, and Celery one will be removed.
"""
if not transcript.room_id:
return None
room = await rooms_controller.get_by_id(transcript.room_id)
if not room or not room.zulip_stream or not room.zulip_auto_post:
return None
message = get_zulip_message(transcript=transcript, include_topics=True)
message_updated = False
if transcript.zulip_message_id:
try:
await update_zulip_message(
transcript.zulip_message_id,
room.zulip_stream,
room.zulip_topic,
message,
)
message_updated = True
except Exception:
pass
if not message_updated:
response = await send_message_to_zulip(
room.zulip_stream, room.zulip_topic, message
)
message_id = response.get("id")
if message_id:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
return message_id
return transcript.zulip_message_id
def extract_domain(url: str) -> str:
return urlparse(url).netloc

View File

@@ -7,8 +7,6 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
uv run celery -A reflector.worker.app worker --loglevel=info
elif [ "${ENTRYPOINT}" = "beat" ]; then
uv run celery -A reflector.worker.app beat --loglevel=info
elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then
uv run python -m reflector.hatchet.run_workers
else
echo "Unknown command"
fi

View File

@@ -527,22 +527,6 @@ def fake_mp3_upload():
yield
@pytest.fixture(autouse=True)
def reset_hatchet_client():
"""Reset HatchetClientManager singleton before and after each test.
This ensures test isolation - each test starts with a fresh client state.
The fixture is autouse=True so it applies to all tests automatically.
"""
from reflector.hatchet.client import HatchetClientManager
# Reset before test
HatchetClientManager.reset()
yield
# Reset after test to clean up
HatchetClientManager.reset()
@pytest.fixture
async def fake_transcript_with_topics(tmpdir, client):
import shutil

View File

@@ -1,54 +0,0 @@
"""
Tests for HatchetClientManager error handling and validation.
Only tests that catch real bugs - not mock verification tests.
Note: The `reset_hatchet_client` fixture (autouse=True in conftest.py)
automatically resets the singleton before and after each test.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.mark.asyncio
async def test_hatchet_client_can_replay_handles_exception():
"""Test can_replay returns False when status check fails.
Useful: Ensures network/API errors don't crash the system and
gracefully allow reprocessing when workflow state is unknown.
"""
from reflector.hatchet.client import HatchetClientManager
with patch("reflector.hatchet.client.settings") as mock_settings:
mock_settings.HATCHET_CLIENT_TOKEN = "test-token"
mock_settings.HATCHET_DEBUG = False
with patch("reflector.hatchet.client.Hatchet") as mock_hatchet_class:
mock_client = MagicMock()
mock_hatchet_class.return_value = mock_client
mock_client.runs.aio_get_status = AsyncMock(
side_effect=Exception("Network error")
)
can_replay = await HatchetClientManager.can_replay("workflow-123")
# Should return False on error (workflow might be gone)
assert can_replay is False
def test_hatchet_client_raises_without_token():
"""Test that get_client raises ValueError without token.
Useful: Catches if someone removes the token validation,
which would cause cryptic errors later.
"""
from reflector.hatchet.client import HatchetClientManager
with patch("reflector.hatchet.client.settings") as mock_settings:
mock_settings.HATCHET_CLIENT_TOKEN = None
with pytest.raises(ValueError, match="HATCHET_CLIENT_TOKEN must be set"):
HatchetClientManager.get_client()

View File

@@ -1,398 +0,0 @@
"""
Tests for Hatchet workflow dispatch and routing logic.
These tests verify:
1. Routing to Hatchet when HATCHET_ENABLED=True
2. Replay logic for failed workflows
3. Force flag to cancel and restart
4. Validation prevents concurrent workflows
"""
from unittest.mock import AsyncMock, patch
import pytest
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.transcripts import Transcript
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_blocks_running_workflow():
"""Test that validation blocks reprocessing when workflow is running."""
from reflector.services.transcript_process import (
ValidationAlreadyScheduled,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
workflow_run_id="running-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_blocks_queued_workflow():
"""Test that validation blocks reprocessing when workflow is queued."""
from reflector.services.transcript_process import (
ValidationAlreadyScheduled,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
workflow_run_id="queued-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_failed_workflow():
"""Test that validation allows reprocessing when workflow has failed."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="error",
source_kind="room",
workflow_run_id="failed-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_completed_workflow():
"""Test that validation allows reprocessing when workflow has completed."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="ended",
source_kind="room",
workflow_run_id="completed-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_when_status_check_fails():
"""Test that validation allows reprocessing when status check fails (workflow might be gone)."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="error",
source_kind="room",
workflow_run_id="old-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_no_workflow_id():
"""Test that Hatchet validation is skipped when transcript has no workflow_run_id."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id=None, # No workflow yet
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet at all
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_validation_locked_transcript():
"""Test that validation rejects locked transcripts."""
from reflector.services.transcript_process import (
ValidationLocked,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="ended",
source_kind="room",
locked=True,
)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationLocked)
assert "locked" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_validation_idle_transcript():
"""Test that validation rejects idle transcripts (not ready)."""
from reflector.services.transcript_process import (
ValidationNotReady,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="idle",
source_kind="room",
)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationNotReady)
assert "not ready" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_prepare_multitrack_config():
"""Test preparing multitrack processing config."""
from reflector.db.recordings import Recording
from reflector.services.transcript_process import (
MultitrackProcessingConfig,
ValidationOk,
prepare_transcript_processing,
)
validation = ValidationOk(
recording_id="test-recording-id",
transcript_id="test-transcript-id",
)
mock_recording = Recording(
id="test-recording-id",
bucket_name="test-bucket",
object_key="recordings/test",
recorded_at="2024-01-01T00:00:00Z",
track_keys=["track1.webm", "track2.webm"],
)
with patch(
"reflector.services.transcript_process.recordings_controller"
) as mock_rc:
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
result = await prepare_transcript_processing(validation)
assert isinstance(result, MultitrackProcessingConfig)
assert result.bucket_name == "test-bucket"
assert result.track_keys == ["track1.webm", "track2.webm"]
assert result.transcript_id == "test-transcript-id"
assert result.room_id is None # ValidationOk didn't specify room_id
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_prepare_file_config():
"""Test preparing file processing config (no track keys)."""
from reflector.db.recordings import Recording
from reflector.services.transcript_process import (
FileProcessingConfig,
ValidationOk,
prepare_transcript_processing,
)
validation = ValidationOk(
recording_id="test-recording-id",
transcript_id="test-transcript-id",
)
mock_recording = Recording(
id="test-recording-id",
bucket_name="test-bucket",
object_key="recordings/test.mp4",
recorded_at="2024-01-01T00:00:00Z",
track_keys=None, # No track keys = file pipeline
)
with patch(
"reflector.services.transcript_process.recordings_controller"
) as mock_rc:
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
result = await prepare_transcript_processing(validation)
assert isinstance(result, FileProcessingConfig)
assert result.transcript_id == "test-transcript-id"

3416
server/uv.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -91,6 +91,7 @@ const roomInitialState = {
icsEnabled: false,
icsFetchInterval: 5,
platform: "whereby",
skipConsent: false,
};
export default function RoomsList() {
@@ -175,6 +176,7 @@ export default function RoomsList() {
icsEnabled: detailedEditedRoom.ics_enabled || false,
icsFetchInterval: detailedEditedRoom.ics_fetch_interval || 5,
platform: detailedEditedRoom.platform,
skipConsent: detailedEditedRoom.skip_consent || false,
}
: null,
[detailedEditedRoom],
@@ -326,6 +328,7 @@ export default function RoomsList() {
ics_enabled: room.icsEnabled,
ics_fetch_interval: room.icsFetchInterval,
platform,
skip_consent: room.skipConsent,
};
if (isEditing) {
@@ -388,6 +391,7 @@ export default function RoomsList() {
icsEnabled: roomData.ics_enabled || false,
icsFetchInterval: roomData.ics_fetch_interval || 5,
platform: roomData.platform,
skipConsent: roomData.skip_consent || false,
});
setEditRoomId(roomId);
setIsEditing(true);
@@ -796,6 +800,34 @@ export default function RoomsList() {
<Checkbox.Label>Shared room</Checkbox.Label>
</Checkbox.Root>
</Field.Root>
{room.recordingType === "cloud" && (
<Field.Root mt={4}>
<Checkbox.Root
name="skipConsent"
checked={room.skipConsent}
onCheckedChange={(e) => {
const syntheticEvent = {
target: {
name: "skipConsent",
type: "checkbox",
checked: e.checked,
},
};
handleRoomChange(syntheticEvent);
}}
>
<Checkbox.HiddenInput />
<Checkbox.Control>
<Checkbox.Indicator />
</Checkbox.Control>
<Checkbox.Label>Skip consent dialog</Checkbox.Label>
</Checkbox.Root>
<Field.HelperText>
When enabled, participants won't be asked for
recording consent. Audio will be stored automatically.
</Field.HelperText>
</Field.Root>
)}
</Tabs.Content>
<Tabs.Content value="share" pt={6}>

View File

@@ -2,19 +2,13 @@
import { Spinner, Link } from "@chakra-ui/react";
import { useAuth } from "../lib/AuthProvider";
import { usePathname } from "next/navigation";
import { getLogoutRedirectUrl } from "../lib/auth";
export default function UserInfo() {
const auth = useAuth();
const pathname = usePathname();
const status = auth.status;
const isLoading = status === "loading";
const isAuthenticated = status === "authenticated";
const isRefreshing = status === "refreshing";
const callbackUrl = getLogoutRedirectUrl(pathname);
return isLoading ? (
<Spinner size="xs" className="mx-3" />
) : !isAuthenticated && !isRefreshing ? (
@@ -32,7 +26,7 @@ export default function UserInfo() {
<Link
href="#"
className="font-light px-2"
onClick={() => auth.signOut({ callbackUrl })}
onClick={() => auth.signOut({ callbackUrl: "/" })}
>
Log out
</Link>

View File

@@ -26,6 +26,7 @@ import { useRouter } from "next/navigation";
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
import { NonEmptyString } from "../lib/utils";
import { MeetingId } from "../lib/types";
type Meeting = components["schemas"]["Meeting"];
@@ -98,7 +99,7 @@ export default function MeetingSelection({
onMeetingSelect(meeting);
};
const handleEndMeeting = async (meetingId: string) => {
const handleEndMeeting = async (meetingId: MeetingId) => {
try {
await deactivateMeetingMutation.mutateAsync({
params: {

View File

@@ -1,35 +1,194 @@
"use client";
import { useCallback, useEffect, useRef, useState } from "react";
import {
RefObject,
useCallback,
useEffect,
useMemo,
useRef,
useState,
} from "react";
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
import { useRouter, useParams } from "next/navigation";
import DailyIframe, { DailyCall } from "@daily-co/daily-js";
import DailyIframe, {
DailyCall,
DailyCallOptions,
DailyCustomTrayButton,
DailyCustomTrayButtons,
DailyEventObjectCustomButtonClick,
DailyFactoryOptions,
DailyParticipantsObject,
} from "@daily-co/daily-js";
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import {
ConsentDialogButton,
recordingTypeRequiresConsent,
} from "../../lib/consent";
import { useConsentDialog } from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
import { omit } from "remeda";
import { assertExists } from "../../lib/utils";
import { assertMeetingId } from "../../lib/types";
const CONSENT_BUTTON_ID = "recording-consent";
const RECORDING_INDICATOR_ID = "recording-indicator";
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
interface DailyRoomProps {
type DailyRoomProps = {
meeting: Meeting;
}
room: Room;
};
export default function DailyRoom({ meeting }: DailyRoomProps) {
const useCustomTrayButtons = (
frame: {
updateCustomTrayButtons: (
customTrayButtons: DailyCustomTrayButtons,
) => void;
joined: boolean;
} | null,
) => {
const [, setCustomTrayButtons] = useState<DailyCustomTrayButtons>({});
return useCallback(
(id: string, button: DailyCustomTrayButton | null) => {
setCustomTrayButtons((prev) => {
// would blink state when frame blinks but it's ok here
const state =
button === null ? omit(prev, [id]) : { ...prev, [id]: button };
if (frame !== null && frame.joined)
frame.updateCustomTrayButtons(state);
return state;
});
},
[setCustomTrayButtons, frame],
);
};
const USE_FRAME_INIT_STATE = {
frame: null as DailyCall | null,
joined: false as boolean,
} as const;
// Daily js and not Daily react used right now because daily-js allows for prebuild interface vs. -react is customizable but has no nice defaults
const useFrame = (
container: HTMLDivElement | null,
cbs: {
onLeftMeeting: () => void;
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
onJoinMeeting: (
startRecording: (args: { type: "raw-tracks" }) => void,
) => void;
},
) => {
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
const setJoined = useCallback(
(joined: boolean) => setState((prev) => ({ ...prev, joined })),
[setState],
);
const setFrame = useCallback(
(frame: DailyCall | null) => setState((prev) => ({ ...prev, frame })),
[setState],
);
useEffect(() => {
if (!container) return;
const init = async () => {
const existingFrame = DailyIframe.getCallInstance();
if (existingFrame) {
console.error("existing daily frame present");
await existingFrame.destroy();
}
const frameOptions: DailyFactoryOptions = {
iframeStyle: {
width: "100vw",
height: "100vh",
border: "none",
},
showLeaveButton: true,
showFullscreenButton: true,
};
const frame = DailyIframe.createFrame(container, frameOptions);
setFrame(frame);
};
init().catch(
console.error.bind(console, "Failed to initialize daily frame:"),
);
return () => {
frame
?.destroy()
.catch(console.error.bind(console, "Failed to destroy daily frame:"));
setState(USE_FRAME_INIT_STATE);
};
}, [container]);
useEffect(() => {
if (!frame) return;
frame.on("left-meeting", cbs.onLeftMeeting);
frame.on("custom-button-click", cbs.onCustomButtonClick);
const joinCb = () => {
if (!frame) {
console.error("frame is null in joined-meeting callback");
return;
}
cbs.onJoinMeeting(frame.startRecording.bind(frame));
};
frame.on("joined-meeting", joinCb);
return () => {
frame.off("left-meeting", cbs.onLeftMeeting);
frame.off("custom-button-click", cbs.onCustomButtonClick);
frame.off("joined-meeting", joinCb);
};
}, [frame, cbs]);
const frame_ = useMemo(() => {
if (frame === null) return frame;
return {
join: async (
properties?: DailyCallOptions,
): Promise<DailyParticipantsObject | void> => {
await frame.join(properties);
setJoined(!frame.isDestroyed());
},
updateCustomTrayButtons: (
customTrayButtons: DailyCustomTrayButtons,
): DailyCall => frame.updateCustomTrayButtons(customTrayButtons),
};
}, [frame]);
const setCustomTrayButton = useCustomTrayButtons(
useMemo(() => {
if (frame_ === null) return null;
return {
updateCustomTrayButtons: frame_.updateCustomTrayButtons,
joined,
};
}, [frame_, joined]),
);
return [
frame_,
{
setCustomTrayButton,
},
] as const;
};
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
const auth = useAuth();
const authLastUserId = auth.lastUserId;
const containerRef = useRef<HTMLDivElement>(null);
const [container, setContainer] = useState<HTMLDivElement | null>(null);
const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomName = params?.roomName as string;
const {
showConsentModal,
showRecordingIndicator: showRecordingInTray,
showConsentButton,
} = useConsentDialog({
meetingId: assertMeetingId(meeting.id),
recordingType: meeting.recording_type,
skipConsent: room.skip_consent,
});
const showConsentModalRef = useRef(showConsentModal);
showConsentModalRef.current = showConsentModal;
useEffect(() => {
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
@@ -49,7 +208,7 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
}
};
join();
join().catch(console.error.bind(console, "Failed to join meeting:"));
}, [meeting?.id, roomName, authLastUserId]);
const roomUrl = joinedMeeting?.room_url;
@@ -58,84 +217,86 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
router.push("/browse");
}, [router]);
useEffect(() => {
if (authLastUserId === undefined || !roomUrl || !containerRef.current)
return;
const handleCustomButtonClick = useCallback(
(ev: DailyEventObjectCustomButtonClick) => {
if (ev.button_id === CONSENT_BUTTON_ID) {
showConsentModalRef.current();
}
},
[
/*keep static; iframe recreation depends on it*/
],
);
let frame: DailyCall | null = null;
let destroyed = false;
const createAndJoin = async () => {
const handleFrameJoinMeeting = useCallback(
(startRecording: (args: { type: "raw-tracks" }) => void) => {
try {
const existingFrame = DailyIframe.getCallInstance();
if (existingFrame) {
await existingFrame.destroy();
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
startRecording({ type: "raw-tracks" });
}
frame = DailyIframe.createFrame(containerRef.current!, {
iframeStyle: {
width: "100vw",
height: "100vh",
border: "none",
},
showLeaveButton: true,
showFullscreenButton: true,
});
if (destroyed) {
await frame.destroy();
return;
}
frame.on("left-meeting", handleLeave);
frame.on("joined-meeting", async () => {
try {
const frameInstance = assertExists(
frame,
"frame object got lost somewhere after frame.on was called",
);
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
await frameInstance.startRecording({ type: "raw-tracks" });
}
} catch (error) {
console.error("Failed to start recording:", error);
}
});
await frame.join({
url: roomUrl,
sendSettings: {
video: {
// Optimize bandwidth for camera video
// allowAdaptiveLayers automatically adjusts quality based on network conditions
allowAdaptiveLayers: true,
// Use bandwidth-optimized preset as fallback for browsers without adaptive support
maxQuality: "medium",
},
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
},
});
} catch (error) {
console.error("Error creating Daily frame:", error);
console.error("Failed to start recording:", error);
}
};
},
[meeting.recording_type],
);
createAndJoin().catch((error) => {
console.error("Failed to create and join meeting:", error);
});
const recordingIconUrl = useMemo(
() => new URL("/recording-icon.svg", window.location.origin),
[],
);
return () => {
destroyed = true;
if (frame) {
frame.destroy().catch((e) => {
console.error("Error destroying frame:", e);
});
}
};
}, [roomUrl, authLastUserId, handleLeave]);
const [frame, { setCustomTrayButton }] = useFrame(container, {
onLeftMeeting: handleLeave,
onCustomButtonClick: handleCustomButtonClick,
onJoinMeeting: handleFrameJoinMeeting,
});
useEffect(() => {
if (!frame || !roomUrl) return;
frame
.join({
url: roomUrl,
sendSettings: {
video: {
// Optimize bandwidth for camera video
// allowAdaptiveLayers automatically adjusts quality based on network conditions
allowAdaptiveLayers: true,
// Use bandwidth-optimized preset as fallback for browsers without adaptive support
maxQuality: "medium",
},
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
},
})
.catch(console.error.bind(console, "Failed to join daily room:"));
}, [frame, roomUrl]);
useEffect(() => {
setCustomTrayButton(
RECORDING_INDICATOR_ID,
showRecordingInTray
? {
iconPath: recordingIconUrl.href,
label: "Recording",
tooltip: "Recording in progress",
}
: null,
);
}, [showRecordingInTray, recordingIconUrl, setCustomTrayButton]);
useEffect(() => {
setCustomTrayButton(
CONSENT_BUTTON_ID,
showConsentButton
? {
iconPath: recordingIconUrl.href,
label: "Recording (click to consent)",
tooltip: "Recording (click to consent)",
}
: null,
);
}, [showConsentButton, recordingIconUrl, setCustomTrayButton]);
if (authLastUserId === undefined) {
return (
@@ -159,10 +320,7 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
return (
<Box position="relative" width="100vw" height="100vh">
<div ref={containerRef} style={{ width: "100%", height: "100%" }} />
{meeting.recording_type &&
recordingTypeRequiresConsent(meeting.recording_type) &&
meeting.id && <ConsentDialogButton meetingId={meeting.id} />}
<div ref={setContainer} style={{ width: "100%", height: "100%" }} />
</Box>
);
}

View File

@@ -18,6 +18,7 @@ import { useAuth } from "../../lib/AuthProvider";
import { useError } from "../../(errors)/errorContext";
import { parseNonEmptyString } from "../../lib/utils";
import { printApiError } from "../../api/_error";
import { assertMeetingId } from "../../lib/types";
type Meeting = components["schemas"]["Meeting"];
@@ -67,7 +68,10 @@ export default function RoomContainer(details: RoomDetails) {
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
);
const explicitMeeting = useRoomGetMeeting(roomName, pageMeetingId || null);
const explicitMeeting = useRoomGetMeeting(
roomName,
pageMeetingId ? assertMeetingId(pageMeetingId) : null,
);
const meeting = explicitMeeting.data || defaultMeeting.response;
@@ -192,9 +196,9 @@ export default function RoomContainer(details: RoomDetails) {
switch (platform) {
case "daily":
return <DailyRoom meeting={meeting} />;
return <DailyRoom meeting={meeting} room={room} />;
case "whereby":
return <WherebyRoom meeting={meeting} />;
return <WherebyRoom meeting={meeting} room={room} />;
default: {
const _exhaustive: never = platform;
return (

View File

@@ -5,24 +5,29 @@ import { useRouter } from "next/navigation";
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
import { getWherebyUrl, useWhereby } from "../../lib/wherebyClient";
import { assertExistsAndNonEmptyString, NonEmptyString } from "../../lib/utils";
import {
ConsentDialogButton as BaseConsentDialogButton,
useConsentDialog,
recordingTypeRequiresConsent,
} from "../../lib/consent";
import { assertMeetingId, MeetingId } from "../../lib/types";
type Meeting = components["schemas"]["Meeting"];
type Room = components["schemas"]["RoomDetails"];
interface WherebyRoomProps {
meeting: Meeting;
room: Room;
}
function WherebyConsentDialogButton({
meetingId,
recordingType,
skipConsent,
wherebyRef,
}: {
meetingId: NonEmptyString;
meetingId: MeetingId;
recordingType: Meeting["recording_type"];
skipConsent: boolean;
wherebyRef: React.RefObject<HTMLElement>;
}) {
const previousFocusRef = useRef<HTMLElement | null>(null);
@@ -45,10 +50,16 @@ function WherebyConsentDialogButton({
};
}, [wherebyRef]);
return <BaseConsentDialogButton meetingId={meetingId} />;
return (
<BaseConsentDialogButton
meetingId={meetingId}
recordingType={recordingType}
skipConsent={skipConsent}
/>
);
}
export default function WherebyRoom({ meeting }: WherebyRoomProps) {
export default function WherebyRoom({ meeting, room }: WherebyRoomProps) {
const wherebyLoaded = useWhereby();
const wherebyRef = useRef<HTMLElement>(null);
const router = useRouter();
@@ -57,9 +68,14 @@ export default function WherebyRoom({ meeting }: WherebyRoomProps) {
const isAuthenticated = status === "authenticated";
const wherebyRoomUrl = getWherebyUrl(meeting);
const recordingType = meeting.recording_type;
const meetingId = meeting.id;
const { showConsentButton } = useConsentDialog({
meetingId: assertMeetingId(meetingId),
recordingType: meeting.recording_type,
skipConsent: room.skip_consent,
});
const isLoading = status === "loading";
const handleLeave = useCallback(() => {
@@ -88,14 +104,14 @@ export default function WherebyRoom({ meeting }: WherebyRoomProps) {
room={wherebyRoomUrl}
style={{ width: "100vw", height: "100vh" }}
/>
{recordingType &&
recordingTypeRequiresConsent(recordingType) &&
meetingId && (
<WherebyConsentDialogButton
meetingId={assertExistsAndNonEmptyString(meetingId)}
wherebyRef={wherebyRef}
/>
)}
{showConsentButton && (
<WherebyConsentDialogButton
meetingId={assertMeetingId(meetingId)}
recordingType={meeting.recording_type}
skipConsent={room.skip_consent}
wherebyRef={wherebyRef}
/>
)}
</>
);
}

View File

@@ -6,7 +6,6 @@ import {
useEffect,
useRef,
useState,
useContext,
RefObject,
use,
} from "react";
@@ -25,8 +24,6 @@ import { useRecordingConsent } from "../recordingConsentContext";
import {
useMeetingAudioConsent,
useRoomGetByName,
useRoomActiveMeetings,
useRoomUpcomingMeetings,
useRoomsCreateMeeting,
useRoomGetMeeting,
} from "../lib/apiHooks";
@@ -39,12 +36,9 @@ import { FaBars } from "react-icons/fa6";
import { useAuth } from "../lib/AuthProvider";
import { getWherebyUrl, useWhereby } from "../lib/wherebyClient";
import { useError } from "../(errors)/errorContext";
import {
assertExistsAndNonEmptyString,
NonEmptyString,
parseNonEmptyString,
} from "../lib/utils";
import { parseNonEmptyString } from "../lib/utils";
import { printApiError } from "../api/_error";
import { assertMeetingId, MeetingId } from "../lib/types";
export type RoomDetails = {
params: Promise<{
@@ -92,16 +86,16 @@ const useConsentWherebyFocusManagement = (
};
const useConsentDialog = (
meetingId: string,
meetingId: MeetingId,
wherebyRef: RefObject<HTMLElement> /*accessibility*/,
) => {
const { state: consentState, touch, hasConsent } = useRecordingConsent();
const { state: consentState, touch, hasAnswered } = useRecordingConsent();
// toast would open duplicates, even with using "id=" prop
const [modalOpen, setModalOpen] = useState(false);
const audioConsentMutation = useMeetingAudioConsent();
const handleConsent = useCallback(
async (meetingId: string, given: boolean) => {
async (meetingId: MeetingId, given: boolean) => {
try {
await audioConsentMutation.mutateAsync({
params: {
@@ -114,7 +108,7 @@ const useConsentDialog = (
},
});
touch(meetingId);
touch(meetingId, given);
} catch (error) {
console.error("Error submitting consent:", error);
}
@@ -216,7 +210,7 @@ const useConsentDialog = (
return {
showConsentModal,
consentState,
hasConsent,
hasAnswered,
consentLoading: audioConsentMutation.isPending,
};
};
@@ -225,13 +219,13 @@ function ConsentDialogButton({
meetingId,
wherebyRef,
}: {
meetingId: NonEmptyString;
meetingId: MeetingId;
wherebyRef: React.RefObject<HTMLElement>;
}) {
const { showConsentModal, consentState, hasConsent, consentLoading } =
const { showConsentModal, consentState, hasAnswered, consentLoading } =
useConsentDialog(meetingId, wherebyRef);
if (!consentState.ready || hasConsent(meetingId) || consentLoading) {
if (!consentState.ready || hasAnswered(meetingId) || consentLoading) {
return null;
}
@@ -284,7 +278,10 @@ export default function Room(details: RoomDetails) {
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
);
const explicitMeeting = useRoomGetMeeting(roomName, pageMeetingId || null);
const explicitMeeting = useRoomGetMeeting(
roomName,
pageMeetingId ? assertMeetingId(pageMeetingId) : null,
);
const wherebyRoomUrl = explicitMeeting.data
? getWherebyUrl(explicitMeeting.data)
: defaultMeeting.response
@@ -437,7 +434,7 @@ export default function Room(details: RoomDetails) {
recordingTypeRequiresConsent(recordingType) &&
meetingId && (
<ConsentDialogButton
meetingId={assertExistsAndNonEmptyString(meetingId)}
meetingId={assertMeetingId(meetingId)}
wherebyRef={wherebyRef}
/>
)}

View File

@@ -5,6 +5,7 @@ import { useError } from "../(errors)/errorContext";
import { QueryClient, useQueryClient } from "@tanstack/react-query";
import type { components } from "../reflector-api";
import { useAuth } from "./AuthProvider";
import { MeetingId } from "./types";
/*
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
@@ -718,7 +719,7 @@ export function useRoomActiveMeetings(roomName: string | null) {
export function useRoomGetMeeting(
roomName: string | null,
meetingId: string | null,
meetingId: MeetingId | null,
) {
return $api.useQuery(
"get",

View File

@@ -18,8 +18,3 @@ export const LOGIN_REQUIRED_PAGES = [
export const PROTECTED_PAGES = new RegExp(
LOGIN_REQUIRED_PAGES.map((page) => `^${page}$`).join("|"),
);
export function getLogoutRedirectUrl(pathname: string): string {
const transcriptPagePattern = /^\/transcripts\/[^/]+$/;
return transcriptPagePattern.test(pathname) ? pathname : "/";
}

View File

@@ -1,5 +1,6 @@
"use client";
import { useState, useEffect } from "react";
import { Box, Button, Text, VStack, HStack } from "@chakra-ui/react";
import { CONSENT_DIALOG_TEXT } from "./constants";
@@ -9,6 +10,15 @@ interface ConsentDialogProps {
}
export function ConsentDialog({ onAccept, onReject }: ConsentDialogProps) {
const [acceptButton, setAcceptButton] = useState<HTMLButtonElement | null>(
null,
);
useEffect(() => {
// Auto-focus accept button so Escape key works (Daily iframe captures keyboard otherwise)
acceptButton?.focus();
}, [acceptButton]);
return (
<Box
p={6}
@@ -26,7 +36,12 @@ export function ConsentDialog({ onAccept, onReject }: ConsentDialogProps) {
<Button variant="ghost" size="sm" onClick={onReject}>
{CONSENT_DIALOG_TEXT.rejectButton}
</Button>
<Button colorPalette="primary" size="sm" onClick={onAccept}>
<Button
ref={setAcceptButton}
colorPalette="primary"
size="sm"
onClick={onAccept}
>
{CONSENT_DIALOG_TEXT.acceptButton}
</Button>
</HStack>

View File

@@ -9,16 +9,26 @@ import {
CONSENT_BUTTON_Z_INDEX,
CONSENT_DIALOG_TEXT,
} from "./constants";
import { MeetingId } from "../types";
import type { components } from "../../reflector-api";
interface ConsentDialogButtonProps {
meetingId: string;
}
type Meeting = components["schemas"]["Meeting"];
export function ConsentDialogButton({ meetingId }: ConsentDialogButtonProps) {
const { showConsentModal, consentState, hasConsent, consentLoading } =
useConsentDialog(meetingId);
type ConsentDialogButtonProps = {
meetingId: MeetingId;
recordingType: Meeting["recording_type"];
skipConsent: boolean;
};
if (!consentState.ready || hasConsent(meetingId) || consentLoading) {
export function ConsentDialogButton({
meetingId,
recordingType,
skipConsent,
}: ConsentDialogButtonProps) {
const { showConsentModal, consentState, showConsentButton, consentLoading } =
useConsentDialog({ meetingId, recordingType, skipConsent });
if (!consentState.ready || !showConsentButton || consentLoading) {
return null;
}

View File

@@ -0,0 +1,33 @@
"use client";
import { Box, Text } from "@chakra-ui/react";
import { FaCircle } from "react-icons/fa6";
import {
CONSENT_BUTTON_TOP_OFFSET,
CONSENT_BUTTON_LEFT_OFFSET,
CONSENT_BUTTON_Z_INDEX,
} from "./constants";
export function RecordingIndicator() {
return (
<Box
position="absolute"
top={CONSENT_BUTTON_TOP_OFFSET}
left={CONSENT_BUTTON_LEFT_OFFSET}
zIndex={CONSENT_BUTTON_Z_INDEX}
display="flex"
alignItems="center"
gap={2}
bg="red.500"
color="white"
px={3}
py={1.5}
borderRadius="md"
fontSize="sm"
fontWeight="medium"
>
<FaCircle size={8} />
<Text>Recording</Text>
</Box>
);
}

View File

@@ -2,6 +2,7 @@
export { ConsentDialogButton } from "./ConsentDialogButton";
export { ConsentDialog } from "./ConsentDialog";
export { RecordingIndicator } from "./RecordingIndicator";
export { useConsentDialog } from "./useConsentDialog";
export { recordingTypeRequiresConsent } from "./utils";
export * from "./constants";

View File

@@ -1,9 +1,14 @@
export interface ConsentDialogResult {
import { MeetingId } from "../types";
export type ConsentDialogResult = {
showConsentModal: () => void;
consentState: {
ready: boolean;
consentAnsweredForMeetings?: Set<string>;
consentForMeetings?: Map<MeetingId, boolean>;
};
hasConsent: (meetingId: string) => boolean;
hasAnswered: (meetingId: MeetingId) => boolean;
hasAccepted: (meetingId: MeetingId) => boolean;
consentLoading: boolean;
}
showRecordingIndicator: boolean;
showConsentButton: boolean;
};

View File

@@ -7,9 +7,29 @@ import { useMeetingAudioConsent } from "../apiHooks";
import { ConsentDialog } from "./ConsentDialog";
import { TOAST_CHECK_INTERVAL_MS } from "./constants";
import type { ConsentDialogResult } from "./types";
import { MeetingId } from "../types";
import { recordingTypeRequiresConsent } from "./utils";
import type { components } from "../../reflector-api";
export function useConsentDialog(meetingId: string): ConsentDialogResult {
const { state: consentState, touch, hasConsent } = useRecordingConsent();
type Meeting = components["schemas"]["Meeting"];
type UseConsentDialogParams = {
meetingId: MeetingId;
recordingType: Meeting["recording_type"];
skipConsent: boolean;
};
export function useConsentDialog({
meetingId,
recordingType,
skipConsent,
}: UseConsentDialogParams): ConsentDialogResult {
const {
state: consentState,
touch,
hasAnswered,
hasAccepted,
} = useRecordingConsent();
const [modalOpen, setModalOpen] = useState(false);
const audioConsentMutation = useMeetingAudioConsent();
const intervalRef = useRef<NodeJS.Timeout | null>(null);
@@ -42,7 +62,7 @@ export function useConsentDialog(meetingId: string): ConsentDialogResult {
},
});
touch(meetingId);
touch(meetingId, given);
} catch (error) {
console.error("Error submitting consent:", error);
}
@@ -100,10 +120,23 @@ export function useConsentDialog(meetingId: string): ConsentDialogResult {
});
}, [handleConsent, modalOpen]);
const requiresConsent = Boolean(
recordingType && recordingTypeRequiresConsent(recordingType),
);
const showRecordingIndicator =
requiresConsent && (skipConsent || hasAccepted(meetingId));
const showConsentButton =
requiresConsent && !skipConsent && !hasAnswered(meetingId);
return {
showConsentModal,
consentState,
hasConsent,
hasAnswered,
hasAccepted,
consentLoading: audioConsentMutation.isPending,
showRecordingIndicator,
showConsentButton,
};
}

View File

@@ -1,6 +1,10 @@
import type { Session } from "next-auth";
import type { JWT } from "next-auth/jwt";
import { parseMaybeNonEmptyString } from "./utils";
import {
assertExistsAndNonEmptyString,
NonEmptyString,
parseMaybeNonEmptyString,
} from "./utils";
export interface JWTWithAccessToken extends JWT {
accessToken: string;
@@ -78,3 +82,10 @@ export const assertCustomSession = <T extends Session>(
export type Mutable<T> = {
-readonly [P in keyof T]: T[P];
};
export type MeetingId = NonEmptyString & { __type: "MeetingId" };
export const assertMeetingId = (s: string): MeetingId => {
const nes = assertExistsAndNonEmptyString(s);
// just cast for now
return nes as MeetingId;
};

View File

@@ -1,18 +1,22 @@
"use client";
import React, { createContext, useContext, useEffect, useState } from "react";
import { MeetingId } from "./lib/types";
type ConsentMap = Map<MeetingId, boolean>;
type ConsentContextState =
| { ready: false }
| {
ready: true;
consentAnsweredForMeetings: Set<string>;
consentForMeetings: ConsentMap;
};
interface RecordingConsentContextValue {
state: ConsentContextState;
touch: (meetingId: string) => void;
hasConsent: (meetingId: string) => boolean;
touch: (meetingId: MeetingId, accepted: boolean) => void;
hasAnswered: (meetingId: MeetingId) => boolean;
hasAccepted: (meetingId: MeetingId) => boolean;
}
const RecordingConsentContext = createContext<
@@ -35,81 +39,116 @@ interface RecordingConsentProviderProps {
const LOCAL_STORAGE_KEY = "recording_consent_meetings";
const ACCEPTED = "T" as const;
type Accepted = typeof ACCEPTED;
const REJECTED = "F" as const;
type Rejected = typeof REJECTED;
type Consent = Accepted | Rejected;
const SEPARATOR = "|" as const;
type Separator = typeof SEPARATOR;
const DEFAULT_CONSENT = ACCEPTED;
type Entry = `${MeetingId}${Separator}${Consent}`;
type EntryAndDefault = Entry | MeetingId;
// Format: "meetingId|T" or "meetingId|F", legacy format "meetingId" is treated as accepted
const encodeEntry = (meetingId: MeetingId, accepted: boolean): Entry =>
`${meetingId}|${accepted ? ACCEPTED : REJECTED}`;
const decodeEntry = (
entry: EntryAndDefault,
): { meetingId: MeetingId; accepted: boolean } | null => {
const pipeIndex = entry.lastIndexOf(SEPARATOR);
if (pipeIndex === -1) {
// Legacy format: no pipe means accepted (backward compat)
return { meetingId: entry as MeetingId, accepted: true };
}
const suffix = entry.slice(pipeIndex + 1);
const meetingId = entry.slice(0, pipeIndex) as MeetingId;
// T = accepted, F = rejected, anything else = accepted (safe default)
const accepted = suffix !== REJECTED;
return { meetingId, accepted };
};
export const RecordingConsentProvider: React.FC<
RecordingConsentProviderProps
> = ({ children }) => {
const [state, setState] = useState<ConsentContextState>({ ready: false });
const safeWriteToStorage = (meetingIds: string[]): void => {
const safeWriteToStorage = (consentMap: ConsentMap): void => {
try {
if (typeof window !== "undefined" && window.localStorage) {
localStorage.setItem(LOCAL_STORAGE_KEY, JSON.stringify(meetingIds));
const entries = Array.from(consentMap.entries())
.slice(-5)
.map(([id, accepted]) => encodeEntry(id, accepted));
localStorage.setItem(LOCAL_STORAGE_KEY, JSON.stringify(entries));
}
} catch (error) {
console.error("Failed to save consent data to localStorage:", error);
}
};
// writes to local storage and to the state of context both
const touch = (meetingId: string): void => {
const touch = (meetingId: MeetingId, accepted: boolean): void => {
if (!state.ready) {
console.warn("Attempted to touch consent before context is ready");
return;
}
// has success regardless local storage write success: we don't handle that
// and don't want to crash anything with just consent functionality
const newSet = state.consentAnsweredForMeetings.has(meetingId)
? state.consentAnsweredForMeetings
: new Set([...state.consentAnsweredForMeetings, meetingId]);
// note: preserves the set insertion order
const array = Array.from(newSet).slice(-5); // Keep latest 5
safeWriteToStorage(array);
setState({ ready: true, consentAnsweredForMeetings: newSet });
const newMap = new Map(state.consentForMeetings);
newMap.set(meetingId, accepted);
safeWriteToStorage(newMap);
setState({ ready: true, consentForMeetings: newMap });
};
const hasConsent = (meetingId: string): boolean => {
const hasAnswered = (meetingId: MeetingId): boolean => {
if (!state.ready) return false;
return state.consentAnsweredForMeetings.has(meetingId);
return state.consentForMeetings.has(meetingId);
};
const hasAccepted = (meetingId: MeetingId): boolean => {
if (!state.ready) return false;
return state.consentForMeetings.get(meetingId) === true;
};
// initialize on mount
useEffect(() => {
try {
if (typeof window === "undefined" || !window.localStorage) {
setState({ ready: true, consentAnsweredForMeetings: new Set() });
setState({ ready: true, consentForMeetings: new Map() });
return;
}
const stored = localStorage.getItem(LOCAL_STORAGE_KEY);
if (!stored) {
setState({ ready: true, consentAnsweredForMeetings: new Set() });
setState({ ready: true, consentForMeetings: new Map() });
return;
}
const parsed = JSON.parse(stored);
if (!Array.isArray(parsed)) {
console.warn("Invalid consent data format in localStorage, resetting");
setState({ ready: true, consentAnsweredForMeetings: new Set() });
setState({ ready: true, consentForMeetings: new Map() });
return;
}
// pre-historic way of parsing!
const consentAnsweredForMeetings = new Set(
parsed.filter((id) => !!id && typeof id === "string"),
);
setState({ ready: true, consentAnsweredForMeetings });
const consentForMeetings = new Map<MeetingId, boolean>();
for (const entry of parsed) {
const decoded = decodeEntry(entry);
if (decoded) {
consentForMeetings.set(decoded.meetingId, decoded.accepted);
}
}
setState({ ready: true, consentForMeetings });
} catch (error) {
// we don't want to fail the page here; the component is not essential.
console.error("Failed to parse consent data from localStorage:", error);
setState({ ready: true, consentAnsweredForMeetings: new Set() });
setState({ ready: true, consentForMeetings: new Map() });
}
}, []);
const value: RecordingConsentContextValue = {
state,
touch,
hasConsent,
hasAnswered,
hasAccepted,
};
return (

View File

@@ -893,8 +893,16 @@ export interface components {
* @default false
*/
ics_enabled: boolean;
/** Platform */
platform?: ("whereby" | "daily") | null;
/**
* Platform
* @enum {string}
*/
platform: "whereby" | "daily";
/**
* Skip Consent
* @default false
*/
skip_consent: boolean;
};
/** CreateRoomMeeting */
CreateRoomMeeting: {
@@ -1123,7 +1131,9 @@ export interface components {
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
participants:
| components["schemas"]["TranscriptParticipantWithEmail"][]
| null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
@@ -1184,7 +1194,9 @@ export interface components {
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
participants:
| components["schemas"]["TranscriptParticipantWithEmail"][]
| null;
};
/**
* GetTranscriptWithText
@@ -1246,7 +1258,9 @@ export interface components {
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
participants:
| components["schemas"]["TranscriptParticipantWithEmail"][]
| null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
@@ -1315,7 +1329,9 @@ export interface components {
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
participants:
| components["schemas"]["TranscriptParticipantWithEmail"][]
| null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
@@ -1386,7 +1402,9 @@ export interface components {
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
participants:
| components["schemas"]["TranscriptParticipantWithEmail"][]
| null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
@@ -1567,6 +1585,11 @@ export interface components {
/** Name */
name: string;
};
/** ProcessStatus */
ProcessStatus: {
/** Status */
status: string;
};
/** Room */
Room: {
/** Id */
@@ -1617,6 +1640,11 @@ export interface components {
* @enum {string}
*/
platform: "whereby" | "daily";
/**
* Skip Consent
* @default false
*/
skip_consent: boolean;
};
/** RoomDetails */
RoomDetails: {
@@ -1668,6 +1696,11 @@ export interface components {
* @enum {string}
*/
platform: "whereby" | "daily";
/**
* Skip Consent
* @default false
*/
skip_consent: boolean;
/** Webhook Url */
webhook_url: string | null;
/** Webhook Secret */
@@ -1813,6 +1846,19 @@ export interface components {
/** User Id */
user_id?: string | null;
};
/** TranscriptParticipantWithEmail */
TranscriptParticipantWithEmail: {
/** Id */
id?: string;
/** Speaker */
speaker: number | null;
/** Name */
name: string;
/** User Id */
user_id?: string | null;
/** Email */
email?: string | null;
};
/**
* TranscriptSegment
* @description A single transcript segment with speaker and timing information.
@@ -1868,6 +1914,8 @@ export interface components {
ics_enabled?: boolean | null;
/** Platform */
platform?: ("whereby" | "daily") | null;
/** Skip Consent */
skip_consent?: boolean | null;
};
/** UpdateTranscript */
UpdateTranscript: {
@@ -3362,7 +3410,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": unknown;
"application/json": components["schemas"]["ProcessStatus"];
};
};
/** @description Validation Error */

View File

@@ -0,0 +1,3 @@
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" fill="#ef4444" stroke="none">
<circle cx="12" cy="12" r="8"/>
</svg>

After

Width:  |  Height:  |  Size: 131 B