mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
Compare commits
9 Commits
feat/durab
...
feat/conse
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0828bb846 | ||
|
|
65916c273f | ||
|
|
15afd57ed9 | ||
|
|
3929a80665 | ||
|
|
a988c3aa92 | ||
|
|
9edc38b861 | ||
|
|
fbf319573e | ||
|
|
537f9413a5 | ||
|
|
129a19bcb5 |
15
CHANGELOG.md
15
CHANGELOG.md
@@ -1,20 +1,5 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
|
|
||||||
## [0.24.0](https://github.com/Monadical-SAS/reflector/compare/v0.23.2...v0.24.0) (2025-12-18)
|
|
||||||
|
|
||||||
|
|
||||||
### Features
|
|
||||||
|
|
||||||
* identify action items ([#790](https://github.com/Monadical-SAS/reflector/issues/790)) ([964cd78](https://github.com/Monadical-SAS/reflector/commit/964cd78bb699d83d012ae4b8c96565df25b90a5d))
|
|
||||||
|
|
||||||
|
|
||||||
### Bug Fixes
|
|
||||||
|
|
||||||
* automatically reprocess daily recordings ([#797](https://github.com/Monadical-SAS/reflector/issues/797)) ([5f458aa](https://github.com/Monadical-SAS/reflector/commit/5f458aa4a7ec3d00ca5ec49d62fcc8ad232b138e))
|
|
||||||
* daily video optimisation ([#789](https://github.com/Monadical-SAS/reflector/issues/789)) ([16284e1](https://github.com/Monadical-SAS/reflector/commit/16284e1ac3faede2b74f0d91b50c0b5612af2c35))
|
|
||||||
* main menu login ([#800](https://github.com/Monadical-SAS/reflector/issues/800)) ([0bc971b](https://github.com/Monadical-SAS/reflector/commit/0bc971ba966a52d719c8c240b47dc7b3bdea4391))
|
|
||||||
* retry on workflow timeout ([#798](https://github.com/Monadical-SAS/reflector/issues/798)) ([5f7dfad](https://github.com/Monadical-SAS/reflector/commit/5f7dfadabd3e8017406ad3720ba495a59963ee34))
|
|
||||||
|
|
||||||
## [0.23.2](https://github.com/Monadical-SAS/reflector/compare/v0.23.1...v0.23.2) (2025-12-11)
|
## [0.23.2](https://github.com/Monadical-SAS/reflector/compare/v0.23.1...v0.23.2) (2025-12-11)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -34,20 +34,6 @@ services:
|
|||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: beat
|
ENTRYPOINT: beat
|
||||||
|
|
||||||
hatchet-worker:
|
|
||||||
build:
|
|
||||||
context: server
|
|
||||||
volumes:
|
|
||||||
- ./server/:/app/
|
|
||||||
- /app/.venv
|
|
||||||
env_file:
|
|
||||||
- ./server/.env
|
|
||||||
environment:
|
|
||||||
ENTRYPOINT: hatchet-worker
|
|
||||||
depends_on:
|
|
||||||
hatchet:
|
|
||||||
condition: service_healthy
|
|
||||||
|
|
||||||
redis:
|
redis:
|
||||||
image: redis:7.2
|
image: redis:7.2
|
||||||
ports:
|
ports:
|
||||||
@@ -69,7 +55,6 @@ services:
|
|||||||
|
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:17
|
image: postgres:17
|
||||||
command: postgres -c 'max_connections=200'
|
|
||||||
ports:
|
ports:
|
||||||
- 5432:5432
|
- 5432:5432
|
||||||
environment:
|
environment:
|
||||||
@@ -78,42 +63,6 @@ services:
|
|||||||
POSTGRES_DB: reflector
|
POSTGRES_DB: reflector
|
||||||
volumes:
|
volumes:
|
||||||
- ./data/postgres:/var/lib/postgresql/data
|
- ./data/postgres:/var/lib/postgresql/data
|
||||||
- ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
|
|
||||||
interval: 10s
|
|
||||||
timeout: 10s
|
|
||||||
retries: 5
|
|
||||||
start_period: 10s
|
|
||||||
|
|
||||||
hatchet:
|
|
||||||
image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
|
|
||||||
ports:
|
|
||||||
- "8889:8888"
|
|
||||||
- "7078:7077"
|
|
||||||
depends_on:
|
|
||||||
postgres:
|
|
||||||
condition: service_healthy
|
|
||||||
environment:
|
|
||||||
DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable"
|
|
||||||
SERVER_AUTH_COOKIE_DOMAIN: localhost
|
|
||||||
SERVER_AUTH_COOKIE_INSECURE: "t"
|
|
||||||
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
|
|
||||||
SERVER_GRPC_INSECURE: "t"
|
|
||||||
SERVER_GRPC_BROADCAST_ADDRESS: hatchet:7077
|
|
||||||
SERVER_GRPC_PORT: "7077"
|
|
||||||
SERVER_URL: http://localhost:8889
|
|
||||||
SERVER_AUTH_SET_EMAIL_VERIFIED: "t"
|
|
||||||
# SERVER_DEFAULT_ENGINE_VERSION: "V1" # default
|
|
||||||
SERVER_INTERNAL_CLIENT_INTERNAL_GRPC_BROADCAST_ADDRESS: hatchet:7077
|
|
||||||
volumes:
|
|
||||||
- ./data/hatchet-config:/config
|
|
||||||
healthcheck:
|
|
||||||
test: ["CMD", "curl", "-f", "http://localhost:8888/api/live"]
|
|
||||||
interval: 30s
|
|
||||||
timeout: 10s
|
|
||||||
retries: 5
|
|
||||||
start_period: 30s
|
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -53,36 +53,6 @@ response = sqs.receive_message(QueueUrl=queue_url, ...)
|
|||||||
uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID
|
uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID
|
||||||
```
|
```
|
||||||
|
|
||||||
## Hatchet Setup (Fresh DB)
|
|
||||||
|
|
||||||
After resetting the Hatchet database:
|
|
||||||
|
|
||||||
### Option A: Automatic (CLI)
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Get default tenant ID and create token in one command
|
|
||||||
TENANT_ID=$(docker compose exec -T postgres psql -U reflector -d hatchet -t -c \
|
|
||||||
"SELECT id FROM \"Tenant\" WHERE slug = 'default';" | tr -d ' \n') && \
|
|
||||||
TOKEN=$(docker compose exec -T hatchet /hatchet-admin token create \
|
|
||||||
--config /config --tenant-id "$TENANT_ID" 2>/dev/null | tr -d '\n') && \
|
|
||||||
echo "HATCHET_CLIENT_TOKEN=$TOKEN"
|
|
||||||
```
|
|
||||||
|
|
||||||
Copy the output to `server/.env`.
|
|
||||||
|
|
||||||
### Option B: Manual (UI)
|
|
||||||
|
|
||||||
1. Create API token at http://localhost:8889 → Settings → API Tokens
|
|
||||||
2. Update `server/.env`: `HATCHET_CLIENT_TOKEN=<new-token>`
|
|
||||||
|
|
||||||
### Then restart workers
|
|
||||||
|
|
||||||
```bash
|
|
||||||
docker compose restart server hatchet-worker
|
|
||||||
```
|
|
||||||
|
|
||||||
Workflows register automatically when hatchet-worker starts.
|
|
||||||
|
|
||||||
## Pipeline Management
|
## Pipeline Management
|
||||||
|
|
||||||
### Continue stuck pipeline from final summaries (identify_participants) step:
|
### Continue stuck pipeline from final summaries (identify_participants) step:
|
||||||
|
|||||||
@@ -1,2 +0,0 @@
|
|||||||
-- Create hatchet database for Hatchet workflow engine
|
|
||||||
CREATE DATABASE hatchet;
|
|
||||||
@@ -1,28 +0,0 @@
|
|||||||
"""add workflow_run_id to transcript
|
|
||||||
|
|
||||||
Revision ID: 0f943fede0e0
|
|
||||||
Revises: 05f8688d6895
|
|
||||||
Create Date: 2025-12-16 01:54:13.855106
|
|
||||||
|
|
||||||
"""
|
|
||||||
|
|
||||||
from typing import Sequence, Union
|
|
||||||
|
|
||||||
import sqlalchemy as sa
|
|
||||||
from alembic import op
|
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
|
||||||
revision: str = "0f943fede0e0"
|
|
||||||
down_revision: Union[str, None] = "05f8688d6895"
|
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
|
||||||
|
|
||||||
|
|
||||||
def upgrade() -> None:
|
|
||||||
with op.batch_alter_table("transcript", schema=None) as batch_op:
|
|
||||||
batch_op.add_column(sa.Column("workflow_run_id", sa.String(), nullable=True))
|
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
|
||||||
with op.batch_alter_table("transcript", schema=None) as batch_op:
|
|
||||||
batch_op.drop_column("workflow_run_id")
|
|
||||||
@@ -1,8 +1,8 @@
|
|||||||
"""add use_hatchet to room
|
"""add skip_consent to room
|
||||||
|
|
||||||
Revision ID: bd3a729bb379
|
Revision ID: 20251217000000
|
||||||
Revises: 0f943fede0e0
|
Revises: 05f8688d6895
|
||||||
Create Date: 2025-12-16 16:34:03.594231
|
Create Date: 2025-12-17 00:00:00.000000
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -12,8 +12,8 @@ import sqlalchemy as sa
|
|||||||
from alembic import op
|
from alembic import op
|
||||||
|
|
||||||
# revision identifiers, used by Alembic.
|
# revision identifiers, used by Alembic.
|
||||||
revision: str = "bd3a729bb379"
|
revision: str = "20251217000000"
|
||||||
down_revision: Union[str, None] = "0f943fede0e0"
|
down_revision: Union[str, None] = "05f8688d6895"
|
||||||
branch_labels: Union[str, Sequence[str], None] = None
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
depends_on: Union[str, Sequence[str], None] = None
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
@@ -22,14 +22,14 @@ def upgrade() -> None:
|
|||||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||||
batch_op.add_column(
|
batch_op.add_column(
|
||||||
sa.Column(
|
sa.Column(
|
||||||
"use_hatchet",
|
"skip_consent",
|
||||||
sa.Boolean(),
|
sa.Boolean(),
|
||||||
server_default=sa.text("false"),
|
|
||||||
nullable=False,
|
nullable=False,
|
||||||
|
server_default=sa.text("false"),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def downgrade() -> None:
|
def downgrade() -> None:
|
||||||
with op.batch_alter_table("room", schema=None) as batch_op:
|
with op.batch_alter_table("room", schema=None) as batch_op:
|
||||||
batch_op.drop_column("use_hatchet")
|
batch_op.drop_column("skip_consent")
|
||||||
@@ -39,7 +39,6 @@ dependencies = [
|
|||||||
"pytest-env>=1.1.5",
|
"pytest-env>=1.1.5",
|
||||||
"webvtt-py>=0.5.0",
|
"webvtt-py>=0.5.0",
|
||||||
"icalendar>=6.0.0",
|
"icalendar>=6.0.0",
|
||||||
"hatchet-sdk>=0.47.0",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[dependency-groups]
|
[dependency-groups]
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ class DailyApiError(Exception):
|
|||||||
)
|
)
|
||||||
|
|
||||||
super().__init__(
|
super().__init__(
|
||||||
f"Daily.co API error: {operation} failed with status {self.status_code}: {response.text}"
|
f"Daily.co API error: {operation} failed with status {self.status_code}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -58,10 +58,10 @@ rooms = sqlalchemy.Table(
|
|||||||
nullable=False,
|
nullable=False,
|
||||||
),
|
),
|
||||||
sqlalchemy.Column(
|
sqlalchemy.Column(
|
||||||
"use_hatchet",
|
"skip_consent",
|
||||||
sqlalchemy.Boolean,
|
sqlalchemy.Boolean,
|
||||||
nullable=False,
|
nullable=False,
|
||||||
server_default=false(),
|
server_default=sqlalchemy.sql.false(),
|
||||||
),
|
),
|
||||||
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
|
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
|
||||||
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
|
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
|
||||||
@@ -91,7 +91,7 @@ class Room(BaseModel):
|
|||||||
ics_last_sync: datetime | None = None
|
ics_last_sync: datetime | None = None
|
||||||
ics_last_etag: str | None = None
|
ics_last_etag: str | None = None
|
||||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||||
use_hatchet: bool = False
|
skip_consent: bool = False
|
||||||
|
|
||||||
|
|
||||||
class RoomController:
|
class RoomController:
|
||||||
@@ -146,6 +146,7 @@ class RoomController:
|
|||||||
ics_fetch_interval: int = 300,
|
ics_fetch_interval: int = 300,
|
||||||
ics_enabled: bool = False,
|
ics_enabled: bool = False,
|
||||||
platform: Platform = settings.DEFAULT_VIDEO_PLATFORM,
|
platform: Platform = settings.DEFAULT_VIDEO_PLATFORM,
|
||||||
|
skip_consent: bool = False,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Add a new room
|
Add a new room
|
||||||
@@ -170,6 +171,7 @@ class RoomController:
|
|||||||
"ics_fetch_interval": ics_fetch_interval,
|
"ics_fetch_interval": ics_fetch_interval,
|
||||||
"ics_enabled": ics_enabled,
|
"ics_enabled": ics_enabled,
|
||||||
"platform": platform,
|
"platform": platform,
|
||||||
|
"skip_consent": skip_consent,
|
||||||
}
|
}
|
||||||
|
|
||||||
room = Room(**room_data)
|
room = Room(**room_data)
|
||||||
|
|||||||
@@ -84,8 +84,6 @@ transcripts = sqlalchemy.Table(
|
|||||||
sqlalchemy.Column("audio_deleted", sqlalchemy.Boolean),
|
sqlalchemy.Column("audio_deleted", sqlalchemy.Boolean),
|
||||||
sqlalchemy.Column("room_id", sqlalchemy.String),
|
sqlalchemy.Column("room_id", sqlalchemy.String),
|
||||||
sqlalchemy.Column("webvtt", sqlalchemy.Text),
|
sqlalchemy.Column("webvtt", sqlalchemy.Text),
|
||||||
# Hatchet workflow run ID for resumption of failed workflows
|
|
||||||
sqlalchemy.Column("workflow_run_id", sqlalchemy.String),
|
|
||||||
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
|
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
|
||||||
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
|
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
|
||||||
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
|
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
|
||||||
@@ -225,7 +223,6 @@ class Transcript(BaseModel):
|
|||||||
zulip_message_id: int | None = None
|
zulip_message_id: int | None = None
|
||||||
audio_deleted: bool | None = None
|
audio_deleted: bool | None = None
|
||||||
webvtt: str | None = None
|
webvtt: str | None = None
|
||||||
workflow_run_id: str | None = None # Hatchet workflow run ID for resumption
|
|
||||||
|
|
||||||
@field_serializer("created_at", when_used="json")
|
@field_serializer("created_at", when_used="json")
|
||||||
def serialize_datetime(self, dt: datetime) -> str:
|
def serialize_datetime(self, dt: datetime) -> str:
|
||||||
|
|||||||
@@ -1,5 +0,0 @@
|
|||||||
"""Hatchet workflow orchestration for Reflector."""
|
|
||||||
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
|
|
||||||
__all__ = ["HatchetClientManager"]
|
|
||||||
@@ -1,98 +0,0 @@
|
|||||||
"""WebSocket broadcasting helpers for Hatchet workflows.
|
|
||||||
|
|
||||||
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
|
|
||||||
|
|
||||||
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
|
|
||||||
decorator behavior. Events are broadcast to transcript rooms and user rooms.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import structlog
|
|
||||||
|
|
||||||
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
|
|
||||||
from reflector.utils.string import NonEmptyString
|
|
||||||
from reflector.ws_manager import get_ws_manager
|
|
||||||
|
|
||||||
# Events that should also be sent to user room (matches Celery behavior)
|
|
||||||
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
|
|
||||||
|
|
||||||
|
|
||||||
async def broadcast_event(
|
|
||||||
transcript_id: NonEmptyString,
|
|
||||||
event: TranscriptEvent,
|
|
||||||
logger: structlog.BoundLogger,
|
|
||||||
) -> None:
|
|
||||||
"""Broadcast a TranscriptEvent to WebSocket subscribers.
|
|
||||||
|
|
||||||
Fire-and-forget: errors are logged but don't interrupt workflow execution.
|
|
||||||
"""
|
|
||||||
logger.info(
|
|
||||||
"Broadcasting event",
|
|
||||||
transcript_id=transcript_id,
|
|
||||||
event_type=event.event,
|
|
||||||
)
|
|
||||||
try:
|
|
||||||
ws_manager = get_ws_manager()
|
|
||||||
|
|
||||||
await ws_manager.send_json(
|
|
||||||
room_id=f"ts:{transcript_id}",
|
|
||||||
message=event.model_dump(mode="json"),
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Event sent to transcript room",
|
|
||||||
transcript_id=transcript_id,
|
|
||||||
event_type=event.event,
|
|
||||||
)
|
|
||||||
|
|
||||||
if event.event in USER_ROOM_EVENTS:
|
|
||||||
transcript = await transcripts_controller.get_by_id(transcript_id)
|
|
||||||
if transcript and transcript.user_id:
|
|
||||||
await ws_manager.send_json(
|
|
||||||
room_id=f"user:{transcript.user_id}",
|
|
||||||
message={
|
|
||||||
"event": f"TRANSCRIPT_{event.event}",
|
|
||||||
"data": {"id": transcript_id, **event.data},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
"Failed to broadcast event",
|
|
||||||
error=str(e),
|
|
||||||
transcript_id=transcript_id,
|
|
||||||
event_type=event.event,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
async def set_status_and_broadcast(
|
|
||||||
transcript_id: NonEmptyString,
|
|
||||||
status: str,
|
|
||||||
logger: structlog.BoundLogger,
|
|
||||||
) -> None:
|
|
||||||
"""Set transcript status and broadcast to WebSocket.
|
|
||||||
|
|
||||||
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
|
|
||||||
"""
|
|
||||||
event = await transcripts_controller.set_status(transcript_id, status)
|
|
||||||
if event:
|
|
||||||
await broadcast_event(transcript_id, event, logger=logger)
|
|
||||||
|
|
||||||
|
|
||||||
async def append_event_and_broadcast(
|
|
||||||
transcript_id: NonEmptyString,
|
|
||||||
transcript: Transcript,
|
|
||||||
event_name: str,
|
|
||||||
data: Any,
|
|
||||||
logger: structlog.BoundLogger,
|
|
||||||
) -> TranscriptEvent:
|
|
||||||
"""Append event to transcript and broadcast to WebSocket.
|
|
||||||
|
|
||||||
Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
|
|
||||||
"""
|
|
||||||
event = await transcripts_controller.append_event(
|
|
||||||
transcript=transcript,
|
|
||||||
event=event_name,
|
|
||||||
data=data,
|
|
||||||
)
|
|
||||||
await broadcast_event(transcript_id, event, logger=logger)
|
|
||||||
return event
|
|
||||||
@@ -1,111 +0,0 @@
|
|||||||
"""Hatchet Python client wrapper.
|
|
||||||
|
|
||||||
Uses singleton pattern because:
|
|
||||||
1. Hatchet client maintains persistent gRPC connections for workflow registration
|
|
||||||
2. Creating multiple clients would cause registration conflicts and resource leaks
|
|
||||||
3. The SDK is designed for a single client instance per process
|
|
||||||
4. Tests use `HatchetClientManager.reset()` to isolate state between tests
|
|
||||||
"""
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import threading
|
|
||||||
|
|
||||||
from hatchet_sdk import ClientConfig, Hatchet
|
|
||||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
|
||||||
|
|
||||||
from reflector.logger import logger
|
|
||||||
from reflector.settings import settings
|
|
||||||
|
|
||||||
|
|
||||||
class HatchetClientManager:
|
|
||||||
"""Singleton manager for Hatchet client connections.
|
|
||||||
|
|
||||||
See module docstring for rationale. For test isolation, use `reset()`.
|
|
||||||
"""
|
|
||||||
|
|
||||||
_instance: Hatchet | None = None
|
|
||||||
_lock = threading.Lock()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def get_client(cls) -> Hatchet:
|
|
||||||
"""Get or create the Hatchet client (thread-safe singleton)."""
|
|
||||||
if cls._instance is None:
|
|
||||||
with cls._lock:
|
|
||||||
if cls._instance is None:
|
|
||||||
if not settings.HATCHET_CLIENT_TOKEN:
|
|
||||||
raise ValueError("HATCHET_CLIENT_TOKEN must be set")
|
|
||||||
|
|
||||||
# Pass root logger to Hatchet so workflow logs appear in dashboard
|
|
||||||
root_logger = logging.getLogger()
|
|
||||||
cls._instance = Hatchet(
|
|
||||||
debug=settings.HATCHET_DEBUG,
|
|
||||||
config=ClientConfig(logger=root_logger),
|
|
||||||
)
|
|
||||||
return cls._instance
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def start_workflow(
|
|
||||||
cls,
|
|
||||||
workflow_name: str,
|
|
||||||
input_data: dict,
|
|
||||||
additional_metadata: dict | None = None,
|
|
||||||
) -> str:
|
|
||||||
"""Start a workflow and return the workflow run ID.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
workflow_name: Name of the workflow to trigger.
|
|
||||||
input_data: Input data for the workflow run.
|
|
||||||
additional_metadata: Optional metadata for filtering in dashboard
|
|
||||||
(e.g., transcript_id, recording_id).
|
|
||||||
"""
|
|
||||||
client = cls.get_client()
|
|
||||||
result = await client.runs.aio_create(
|
|
||||||
workflow_name,
|
|
||||||
input_data,
|
|
||||||
additional_metadata=additional_metadata,
|
|
||||||
)
|
|
||||||
return result.run.metadata.id
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def get_workflow_run_status(cls, workflow_run_id: str) -> V1TaskStatus:
|
|
||||||
client = cls.get_client()
|
|
||||||
return await client.runs.aio_get_status(workflow_run_id)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def cancel_workflow(cls, workflow_run_id: str) -> None:
|
|
||||||
client = cls.get_client()
|
|
||||||
await client.runs.aio_cancel(workflow_run_id)
|
|
||||||
logger.info("[Hatchet] Cancelled workflow", workflow_run_id=workflow_run_id)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def replay_workflow(cls, workflow_run_id: str) -> None:
|
|
||||||
client = cls.get_client()
|
|
||||||
await client.runs.aio_replay(workflow_run_id)
|
|
||||||
logger.info("[Hatchet] Replaying workflow", workflow_run_id=workflow_run_id)
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def can_replay(cls, workflow_run_id: str) -> bool:
|
|
||||||
"""Check if workflow can be replayed (is FAILED)."""
|
|
||||||
try:
|
|
||||||
status = await cls.get_workflow_run_status(workflow_run_id)
|
|
||||||
return status == V1TaskStatus.FAILED or status == V1TaskStatus.CANCELLED
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning(
|
|
||||||
"[Hatchet] Failed to check replay status",
|
|
||||||
workflow_run_id=workflow_run_id,
|
|
||||||
error=str(e),
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
async def get_workflow_status(cls, workflow_run_id: str) -> dict:
|
|
||||||
"""Get the full workflow run details as dict."""
|
|
||||||
client = cls.get_client()
|
|
||||||
run = await client.runs.aio_get(workflow_run_id)
|
|
||||||
return run.to_dict()
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def reset(cls) -> None:
|
|
||||||
"""Reset the client instance (for testing)."""
|
|
||||||
with cls._lock:
|
|
||||||
cls._instance = None
|
|
||||||
@@ -1,63 +0,0 @@
|
|||||||
"""
|
|
||||||
Run Hatchet workers for the diarization pipeline.
|
|
||||||
Runs as a separate process, just like Celery workers.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
uv run -m reflector.hatchet.run_workers
|
|
||||||
|
|
||||||
# Or via docker:
|
|
||||||
docker compose exec server uv run -m reflector.hatchet.run_workers
|
|
||||||
"""
|
|
||||||
|
|
||||||
import signal
|
|
||||||
import sys
|
|
||||||
|
|
||||||
from reflector.logger import logger
|
|
||||||
from reflector.settings import settings
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
|
||||||
"""Start Hatchet worker polling."""
|
|
||||||
if not settings.HATCHET_ENABLED:
|
|
||||||
logger.error("HATCHET_ENABLED is False, not starting workers")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if not settings.HATCHET_CLIENT_TOKEN:
|
|
||||||
logger.error("HATCHET_CLIENT_TOKEN is not set")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
"Starting Hatchet workers",
|
|
||||||
debug=settings.HATCHET_DEBUG,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
|
|
||||||
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
|
|
||||||
# Can't use lazy init: decorators need the client object when function is defined.
|
|
||||||
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
|
|
||||||
from reflector.hatchet.workflows import ( # noqa: PLC0415
|
|
||||||
diarization_pipeline,
|
|
||||||
track_workflow,
|
|
||||||
)
|
|
||||||
|
|
||||||
hatchet = HatchetClientManager.get_client()
|
|
||||||
|
|
||||||
worker = hatchet.worker(
|
|
||||||
"reflector-diarization-worker",
|
|
||||||
workflows=[diarization_pipeline, track_workflow],
|
|
||||||
)
|
|
||||||
|
|
||||||
def shutdown_handler(signum: int, frame) -> None:
|
|
||||||
logger.info("Received shutdown signal, stopping workers...")
|
|
||||||
# Worker cleanup happens automatically on exit
|
|
||||||
sys.exit(0)
|
|
||||||
|
|
||||||
signal.signal(signal.SIGINT, shutdown_handler)
|
|
||||||
signal.signal(signal.SIGTERM, shutdown_handler)
|
|
||||||
|
|
||||||
logger.info("Starting Hatchet worker polling...")
|
|
||||||
worker.start()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
"""Hatchet workflow definitions."""
|
|
||||||
|
|
||||||
from reflector.hatchet.workflows.diarization_pipeline import (
|
|
||||||
PipelineInput,
|
|
||||||
diarization_pipeline,
|
|
||||||
)
|
|
||||||
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
|
|
||||||
|
|
||||||
__all__ = [
|
|
||||||
"diarization_pipeline",
|
|
||||||
"track_workflow",
|
|
||||||
"PipelineInput",
|
|
||||||
"TrackInput",
|
|
||||||
]
|
|
||||||
@@ -1,961 +0,0 @@
|
|||||||
"""
|
|
||||||
Hatchet main workflow: DiarizationPipeline
|
|
||||||
|
|
||||||
Multitrack diarization pipeline for Daily.co recordings.
|
|
||||||
Orchestrates the full processing flow from recording metadata to final transcript.
|
|
||||||
|
|
||||||
Note: This file uses deferred imports (inside functions/tasks) intentionally.
|
|
||||||
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
|
|
||||||
are not shared across forks, avoiding connection pooling issues.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import functools
|
|
||||||
import tempfile
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from datetime import timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
from typing import Callable
|
|
||||||
|
|
||||||
import httpx
|
|
||||||
from hatchet_sdk import Context
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from reflector.dailyco_api.client import DailyApiClient
|
|
||||||
from reflector.hatchet.broadcast import (
|
|
||||||
append_event_and_broadcast,
|
|
||||||
set_status_and_broadcast,
|
|
||||||
)
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
from reflector.hatchet.workflows.models import (
|
|
||||||
ConsentResult,
|
|
||||||
FinalizeResult,
|
|
||||||
MixdownResult,
|
|
||||||
PaddedTrackInfo,
|
|
||||||
ParticipantsResult,
|
|
||||||
ProcessTracksResult,
|
|
||||||
RecordingResult,
|
|
||||||
SummaryResult,
|
|
||||||
TitleResult,
|
|
||||||
TopicsResult,
|
|
||||||
WaveformResult,
|
|
||||||
WebhookResult,
|
|
||||||
ZulipResult,
|
|
||||||
)
|
|
||||||
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
|
|
||||||
from reflector.logger import logger
|
|
||||||
from reflector.pipelines import topic_processing
|
|
||||||
from reflector.processors import AudioFileWriterProcessor
|
|
||||||
from reflector.processors.types import (
|
|
||||||
TitleSummary,
|
|
||||||
TitleSummaryWithId,
|
|
||||||
Word,
|
|
||||||
)
|
|
||||||
from reflector.processors.types import (
|
|
||||||
Transcript as TranscriptType,
|
|
||||||
)
|
|
||||||
from reflector.settings import settings
|
|
||||||
from reflector.storage.storage_aws import AwsStorage
|
|
||||||
from reflector.utils.audio_constants import (
|
|
||||||
PRESIGNED_URL_EXPIRATION_SECONDS,
|
|
||||||
WAVEFORM_SEGMENTS,
|
|
||||||
)
|
|
||||||
from reflector.utils.audio_mixdown import (
|
|
||||||
detect_sample_rate_from_tracks,
|
|
||||||
mixdown_tracks_pyav,
|
|
||||||
)
|
|
||||||
from reflector.utils.audio_waveform import get_audio_waveform
|
|
||||||
from reflector.utils.daily import (
|
|
||||||
filter_cam_audio_tracks,
|
|
||||||
parse_daily_recording_filename,
|
|
||||||
)
|
|
||||||
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
|
|
||||||
from reflector.zulip import post_transcript_notification
|
|
||||||
|
|
||||||
|
|
||||||
class PipelineInput(BaseModel):
|
|
||||||
"""Input to trigger the diarization pipeline."""
|
|
||||||
|
|
||||||
recording_id: NonEmptyString
|
|
||||||
tracks: list[dict] # List of {"s3_key": str}
|
|
||||||
bucket_name: NonEmptyString
|
|
||||||
transcript_id: NonEmptyString
|
|
||||||
room_id: NonEmptyString | None = None
|
|
||||||
|
|
||||||
|
|
||||||
hatchet = HatchetClientManager.get_client()
|
|
||||||
|
|
||||||
diarization_pipeline = hatchet.workflow(
|
|
||||||
name="DiarizationPipeline", input_validator=PipelineInput
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def fresh_db_connection():
|
|
||||||
"""Context manager for database connections in Hatchet workers.
|
|
||||||
TECH DEBT: Made to make connection fork-aware without changing db code too much.
|
|
||||||
The real fix would be making the db module fork-aware instead of bypassing it.
|
|
||||||
Current pattern is acceptable given Hatchet's process model.
|
|
||||||
"""
|
|
||||||
import databases # noqa: PLC0415
|
|
||||||
|
|
||||||
from reflector.db import _database_context # noqa: PLC0415
|
|
||||||
|
|
||||||
_database_context.set(None)
|
|
||||||
db = databases.Database(settings.DATABASE_URL)
|
|
||||||
_database_context.set(db)
|
|
||||||
await db.connect()
|
|
||||||
try:
|
|
||||||
yield db
|
|
||||||
finally:
|
|
||||||
await db.disconnect()
|
|
||||||
_database_context.set(None)
|
|
||||||
|
|
||||||
|
|
||||||
async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
|
|
||||||
"""Set transcript status to 'error' on workflow failure.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if status was set successfully, False if failed.
|
|
||||||
Failure is logged as CRITICAL since it means transcript may be stuck.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
async with fresh_db_connection():
|
|
||||||
await set_status_and_broadcast(transcript_id, "error", logger=logger)
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(
|
|
||||||
"[Hatchet] CRITICAL: Failed to set error status - transcript may be stuck in 'processing'",
|
|
||||||
transcript_id=transcript_id,
|
|
||||||
error=str(e),
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def _spawn_storage():
|
|
||||||
"""Create fresh storage instance."""
|
|
||||||
return AwsStorage(
|
|
||||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
|
||||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
|
||||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
|
||||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
|
|
||||||
"""Decorator that handles task failures uniformly.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
step_name: Name of the step for logging and progress tracking.
|
|
||||||
set_error_status: Whether to set transcript status to 'error' on failure.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def decorator(func: Callable) -> Callable:
|
|
||||||
@functools.wraps(func)
|
|
||||||
async def wrapper(input: PipelineInput, ctx: Context):
|
|
||||||
try:
|
|
||||||
return await func(input, ctx)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(
|
|
||||||
f"[Hatchet] {step_name} failed",
|
|
||||||
transcript_id=input.transcript_id,
|
|
||||||
error=str(e),
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
if set_error_status:
|
|
||||||
await set_workflow_error_status(input.transcript_id)
|
|
||||||
raise
|
|
||||||
|
|
||||||
return wrapper
|
|
||||||
|
|
||||||
return decorator
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3)
|
|
||||||
@with_error_handling("get_recording")
|
|
||||||
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
|
|
||||||
"""Fetch recording metadata from Daily.co API."""
|
|
||||||
ctx.log(f"get_recording: recording_id={input.recording_id}")
|
|
||||||
|
|
||||||
# Set transcript status to "processing" at workflow start (broadcasts to WebSocket)
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript:
|
|
||||||
await set_status_and_broadcast(
|
|
||||||
input.transcript_id, "processing", logger=logger
|
|
||||||
)
|
|
||||||
ctx.log(f"Set transcript status to processing: {input.transcript_id}")
|
|
||||||
|
|
||||||
if not settings.DAILY_API_KEY:
|
|
||||||
raise ValueError("DAILY_API_KEY not configured")
|
|
||||||
|
|
||||||
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
|
|
||||||
recording = await client.get_recording(input.recording_id)
|
|
||||||
|
|
||||||
ctx.log(
|
|
||||||
f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
|
|
||||||
)
|
|
||||||
|
|
||||||
return RecordingResult(
|
|
||||||
id=recording.id,
|
|
||||||
mtg_session_id=recording.mtgSessionId,
|
|
||||||
duration=recording.duration,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("get_participants")
|
|
||||||
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
|
|
||||||
"""Fetch participant list from Daily.co API and update transcript in database."""
|
|
||||||
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
|
|
||||||
|
|
||||||
recording = ctx.task_output(get_recording)
|
|
||||||
mtg_session_id = recording.mtg_session_id
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptParticipant,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript:
|
|
||||||
# Note: title NOT cleared - preserves existing titles
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{
|
|
||||||
"events": [],
|
|
||||||
"topics": [],
|
|
||||||
"participants": [],
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
mtg_session_id = assert_non_none_and_non_empty(
|
|
||||||
mtg_session_id, "mtg_session_id is required"
|
|
||||||
)
|
|
||||||
daily_api_key = assert_non_none_and_non_empty(
|
|
||||||
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
|
|
||||||
)
|
|
||||||
|
|
||||||
async with DailyApiClient(api_key=daily_api_key) as client:
|
|
||||||
participants = await client.get_meeting_participants(mtg_session_id)
|
|
||||||
|
|
||||||
id_to_name = {}
|
|
||||||
id_to_user_id = {}
|
|
||||||
for p in participants.data:
|
|
||||||
if p.user_name:
|
|
||||||
id_to_name[p.participant_id] = p.user_name
|
|
||||||
if p.user_id:
|
|
||||||
id_to_user_id[p.participant_id] = p.user_id
|
|
||||||
|
|
||||||
track_keys = [t["s3_key"] for t in input.tracks]
|
|
||||||
cam_audio_keys = filter_cam_audio_tracks(track_keys)
|
|
||||||
|
|
||||||
participants_list = []
|
|
||||||
for idx, key in enumerate(cam_audio_keys):
|
|
||||||
try:
|
|
||||||
parsed = parse_daily_recording_filename(key)
|
|
||||||
participant_id = parsed.participant_id
|
|
||||||
except ValueError as e:
|
|
||||||
logger.error(
|
|
||||||
"Failed to parse Daily recording filename",
|
|
||||||
error=str(e),
|
|
||||||
key=key,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
default_name = f"Speaker {idx}"
|
|
||||||
name = id_to_name.get(participant_id, default_name)
|
|
||||||
user_id = id_to_user_id.get(participant_id)
|
|
||||||
|
|
||||||
participant = TranscriptParticipant(
|
|
||||||
id=participant_id, speaker=idx, name=name, user_id=user_id
|
|
||||||
)
|
|
||||||
await transcripts_controller.upsert_participant(transcript, participant)
|
|
||||||
participants_list.append(
|
|
||||||
{
|
|
||||||
"participant_id": participant_id,
|
|
||||||
"user_name": name,
|
|
||||||
"speaker": idx,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx.log(f"get_participants complete: {len(participants_list)} participants")
|
|
||||||
|
|
||||||
return ParticipantsResult(
|
|
||||||
participants=participants_list,
|
|
||||||
num_tracks=len(input.tracks),
|
|
||||||
source_language=transcript.source_language if transcript else "en",
|
|
||||||
target_language=transcript.target_language if transcript else "en",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("process_tracks")
|
|
||||||
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
|
|
||||||
"""Spawn child workflows for each track (dynamic fan-out)."""
|
|
||||||
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
|
|
||||||
|
|
||||||
participants_result = ctx.task_output(get_participants)
|
|
||||||
source_language = participants_result.source_language
|
|
||||||
|
|
||||||
child_coroutines = [
|
|
||||||
track_workflow.aio_run(
|
|
||||||
TrackInput(
|
|
||||||
track_index=i,
|
|
||||||
s3_key=track["s3_key"],
|
|
||||||
bucket_name=input.bucket_name,
|
|
||||||
transcript_id=input.transcript_id,
|
|
||||||
language=source_language,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
for i, track in enumerate(input.tracks)
|
|
||||||
]
|
|
||||||
|
|
||||||
results = await asyncio.gather(*child_coroutines)
|
|
||||||
|
|
||||||
target_language = participants_result.target_language
|
|
||||||
|
|
||||||
track_words = []
|
|
||||||
padded_tracks = []
|
|
||||||
created_padded_files = set()
|
|
||||||
|
|
||||||
for result in results:
|
|
||||||
transcribe_result = result.get("transcribe_track", {})
|
|
||||||
track_words.append(transcribe_result.get("words", []))
|
|
||||||
|
|
||||||
pad_result = result.get("pad_track", {})
|
|
||||||
padded_key = pad_result.get("padded_key")
|
|
||||||
bucket_name = pad_result.get("bucket_name")
|
|
||||||
|
|
||||||
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
|
|
||||||
if padded_key:
|
|
||||||
padded_tracks.append(
|
|
||||||
PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
|
|
||||||
)
|
|
||||||
|
|
||||||
track_index = pad_result.get("track_index")
|
|
||||||
if pad_result.get("size", 0) > 0 and track_index is not None:
|
|
||||||
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
|
|
||||||
created_padded_files.add(storage_path)
|
|
||||||
|
|
||||||
all_words = [word for words in track_words for word in words]
|
|
||||||
all_words.sort(key=lambda w: w.get("start", 0))
|
|
||||||
|
|
||||||
ctx.log(
|
|
||||||
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
|
|
||||||
)
|
|
||||||
|
|
||||||
return ProcessTracksResult(
|
|
||||||
all_words=all_words,
|
|
||||||
padded_tracks=padded_tracks,
|
|
||||||
word_count=len(all_words),
|
|
||||||
num_tracks=len(input.tracks),
|
|
||||||
target_language=target_language,
|
|
||||||
created_padded_files=list(created_padded_files),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("mixdown_tracks")
|
|
||||||
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
|
||||||
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
|
|
||||||
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
|
|
||||||
|
|
||||||
track_result = ctx.task_output(process_tracks)
|
|
||||||
padded_tracks = track_result.padded_tracks
|
|
||||||
|
|
||||||
# TODO think of NonEmpty type to avoid those checks, e.g. sized.NonEmpty from https://github.com/antonagestam/phantom-types/
|
|
||||||
if not padded_tracks:
|
|
||||||
raise ValueError("No padded tracks to mixdown")
|
|
||||||
|
|
||||||
storage = _spawn_storage()
|
|
||||||
|
|
||||||
# Presign URLs on demand (avoids stale URLs on workflow replay)
|
|
||||||
padded_urls = []
|
|
||||||
for track_info in padded_tracks:
|
|
||||||
if track_info.key:
|
|
||||||
url = await storage.get_file_url(
|
|
||||||
track_info.key,
|
|
||||||
operation="get_object",
|
|
||||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
|
||||||
bucket=track_info.bucket_name,
|
|
||||||
)
|
|
||||||
padded_urls.append(url)
|
|
||||||
|
|
||||||
valid_urls = [url for url in padded_urls if url]
|
|
||||||
if not valid_urls:
|
|
||||||
raise ValueError("No valid padded tracks to mixdown")
|
|
||||||
|
|
||||||
target_sample_rate = detect_sample_rate_from_tracks(valid_urls, logger=logger)
|
|
||||||
if not target_sample_rate:
|
|
||||||
logger.error("Mixdown failed - no decodable audio frames found")
|
|
||||||
raise ValueError("No decodable audio frames in any track")
|
|
||||||
|
|
||||||
output_path = tempfile.mktemp(suffix=".mp3")
|
|
||||||
duration_ms_callback_capture_container = [0.0]
|
|
||||||
|
|
||||||
async def capture_duration(d):
|
|
||||||
duration_ms_callback_capture_container[0] = d
|
|
||||||
|
|
||||||
writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration)
|
|
||||||
|
|
||||||
await mixdown_tracks_pyav(
|
|
||||||
valid_urls,
|
|
||||||
writer,
|
|
||||||
target_sample_rate,
|
|
||||||
offsets_seconds=None,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
await writer.flush()
|
|
||||||
|
|
||||||
file_size = Path(output_path).stat().st_size
|
|
||||||
storage_path = f"{input.transcript_id}/audio.mp3"
|
|
||||||
|
|
||||||
with open(output_path, "rb") as mixed_file:
|
|
||||||
await storage.put_file(storage_path, mixed_file)
|
|
||||||
|
|
||||||
Path(output_path).unlink(missing_ok=True)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript:
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"audio_location": "storage"}
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
|
|
||||||
|
|
||||||
return MixdownResult(
|
|
||||||
audio_key=storage_path,
|
|
||||||
duration=duration_ms_callback_capture_container[0],
|
|
||||||
tracks_mixed=len(valid_urls),
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("generate_waveform")
|
|
||||||
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
|
|
||||||
"""Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
|
|
||||||
ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
|
|
||||||
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptWaveform,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
mixdown_result = ctx.task_output(mixdown_tracks)
|
|
||||||
audio_key = mixdown_result.audio_key
|
|
||||||
|
|
||||||
storage = _spawn_storage()
|
|
||||||
audio_url = await storage.get_file_url(
|
|
||||||
audio_key,
|
|
||||||
operation="get_object",
|
|
||||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Download MP3 to temp file (AudioWaveformProcessor needs local file)
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
|
|
||||||
temp_path = temp_file.name
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with httpx.AsyncClient() as client:
|
|
||||||
response = await client.get(audio_url, timeout=120)
|
|
||||||
response.raise_for_status()
|
|
||||||
with open(temp_path, "wb") as f:
|
|
||||||
f.write(response.content)
|
|
||||||
|
|
||||||
waveform = get_audio_waveform(
|
|
||||||
path=Path(temp_path), segments_count=WAVEFORM_SEGMENTS
|
|
||||||
)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript:
|
|
||||||
waveform_data = TranscriptWaveform(waveform=waveform)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"WAVEFORM",
|
|
||||||
waveform_data,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
Path(temp_path).unlink(missing_ok=True)
|
|
||||||
|
|
||||||
ctx.log("generate_waveform complete")
|
|
||||||
|
|
||||||
return WaveformResult(waveform_generated=True)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("detect_topics")
|
|
||||||
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
|
||||||
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
|
|
||||||
ctx.log("detect_topics: analyzing transcript for topics")
|
|
||||||
|
|
||||||
track_result = ctx.task_output(process_tracks)
|
|
||||||
words = track_result.all_words
|
|
||||||
target_language = track_result.target_language
|
|
||||||
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptTopic,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
word_objects = [Word(**w) for w in words]
|
|
||||||
transcript_type = TranscriptType(words=word_objects)
|
|
||||||
|
|
||||||
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
|
|
||||||
async def on_topic_callback(data):
|
|
||||||
topic = TranscriptTopic(
|
|
||||||
title=data.title,
|
|
||||||
summary=data.summary,
|
|
||||||
timestamp=data.timestamp,
|
|
||||||
transcript=data.transcript.text,
|
|
||||||
words=data.transcript.words,
|
|
||||||
)
|
|
||||||
if isinstance(
|
|
||||||
data, TitleSummaryWithId
|
|
||||||
): # Celery parity: main_live_pipeline.py
|
|
||||||
topic.id = data.id
|
|
||||||
await transcripts_controller.upsert_topic(transcript, topic)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
|
||||||
)
|
|
||||||
|
|
||||||
topics = await topic_processing.detect_topics(
|
|
||||||
transcript_type,
|
|
||||||
target_language,
|
|
||||||
on_topic_callback=on_topic_callback,
|
|
||||||
empty_pipeline=empty_pipeline,
|
|
||||||
)
|
|
||||||
|
|
||||||
topics_list = [t.model_dump() for t in topics]
|
|
||||||
|
|
||||||
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
|
|
||||||
|
|
||||||
return TopicsResult(topics=topics_list)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("generate_title")
|
|
||||||
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
|
|
||||||
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
|
|
||||||
ctx.log("generate_title: generating title from topics")
|
|
||||||
|
|
||||||
topics_result = ctx.task_output(detect_topics)
|
|
||||||
topics = topics_result.topics
|
|
||||||
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptFinalTitle,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
topic_objects = [TitleSummary(**t) for t in topics]
|
|
||||||
|
|
||||||
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
|
|
||||||
title_result = None
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
|
|
||||||
async def on_title_callback(data):
|
|
||||||
nonlocal title_result
|
|
||||||
title_result = data.title
|
|
||||||
final_title = TranscriptFinalTitle(title=data.title)
|
|
||||||
if not transcript.title:
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{"title": final_title.title},
|
|
||||||
)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"FINAL_TITLE",
|
|
||||||
final_title,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
await topic_processing.generate_title(
|
|
||||||
topic_objects,
|
|
||||||
on_title_callback=on_title_callback,
|
|
||||||
empty_pipeline=empty_pipeline,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx.log(f"generate_title complete: '{title_result}'")
|
|
||||||
|
|
||||||
return TitleResult(title=title_result)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[detect_topics], execution_timeout=timedelta(seconds=300), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("generate_summary")
|
|
||||||
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
|
|
||||||
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
|
|
||||||
ctx.log("generate_summary: generating long and short summaries")
|
|
||||||
|
|
||||||
topics_result = ctx.task_output(detect_topics)
|
|
||||||
topics = topics_result.topics
|
|
||||||
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptActionItems,
|
|
||||||
TranscriptFinalLongSummary,
|
|
||||||
TranscriptFinalShortSummary,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
topic_objects = [TitleSummary(**t) for t in topics]
|
|
||||||
|
|
||||||
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
|
|
||||||
summary_result = None
|
|
||||||
short_summary_result = None
|
|
||||||
action_items_result = None
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
|
|
||||||
async def on_long_summary_callback(data):
|
|
||||||
nonlocal summary_result
|
|
||||||
summary_result = data.long_summary
|
|
||||||
final_long_summary = TranscriptFinalLongSummary(
|
|
||||||
long_summary=data.long_summary
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{"long_summary": final_long_summary.long_summary},
|
|
||||||
)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"FINAL_LONG_SUMMARY",
|
|
||||||
final_long_summary,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def on_short_summary_callback(data):
|
|
||||||
nonlocal short_summary_result
|
|
||||||
short_summary_result = data.short_summary
|
|
||||||
final_short_summary = TranscriptFinalShortSummary(
|
|
||||||
short_summary=data.short_summary
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{"short_summary": final_short_summary.short_summary},
|
|
||||||
)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"FINAL_SHORT_SUMMARY",
|
|
||||||
final_short_summary,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
async def on_action_items_callback(data):
|
|
||||||
nonlocal action_items_result
|
|
||||||
action_items_result = data.action_items
|
|
||||||
action_items = TranscriptActionItems(action_items=data.action_items)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{"action_items": action_items.action_items},
|
|
||||||
)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"ACTION_ITEMS",
|
|
||||||
action_items,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
await topic_processing.generate_summaries(
|
|
||||||
topic_objects,
|
|
||||||
transcript,
|
|
||||||
on_long_summary_callback=on_long_summary_callback,
|
|
||||||
on_short_summary_callback=on_short_summary_callback,
|
|
||||||
on_action_items_callback=on_action_items_callback,
|
|
||||||
empty_pipeline=empty_pipeline,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx.log("generate_summary complete")
|
|
||||||
|
|
||||||
return SummaryResult(
|
|
||||||
summary=summary_result,
|
|
||||||
short_summary=short_summary_result,
|
|
||||||
action_items=action_items_result,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[generate_waveform, generate_title, generate_summary],
|
|
||||||
execution_timeout=timedelta(seconds=60),
|
|
||||||
retries=3,
|
|
||||||
)
|
|
||||||
@with_error_handling("finalize")
|
|
||||||
async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
|
|
||||||
"""Finalize transcript: save words, emit TRANSCRIPT event, set status to 'ended'.
|
|
||||||
|
|
||||||
Matches Celery's on_transcript + set_status behavior.
|
|
||||||
Note: Title and summaries are already saved by their respective task callbacks.
|
|
||||||
"""
|
|
||||||
ctx.log("finalize: saving transcript and setting status to 'ended'")
|
|
||||||
|
|
||||||
mixdown_result = ctx.task_output(mixdown_tracks)
|
|
||||||
track_result = ctx.task_output(process_tracks)
|
|
||||||
|
|
||||||
duration = mixdown_result.duration
|
|
||||||
all_words = track_result.all_words
|
|
||||||
|
|
||||||
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
|
|
||||||
created_padded_files = track_result.created_padded_files
|
|
||||||
if created_padded_files:
|
|
||||||
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
|
|
||||||
storage = _spawn_storage()
|
|
||||||
cleanup_results = await asyncio.gather(
|
|
||||||
*[storage.delete_file(path) for path in created_padded_files],
|
|
||||||
return_exceptions=True,
|
|
||||||
)
|
|
||||||
for storage_path, result in zip(created_padded_files, cleanup_results):
|
|
||||||
if isinstance(result, Exception):
|
|
||||||
logger.warning(
|
|
||||||
"[Hatchet] Failed to cleanup temporary padded track",
|
|
||||||
storage_path=storage_path,
|
|
||||||
error=str(result),
|
|
||||||
)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
|
||||||
TranscriptDuration,
|
|
||||||
TranscriptText,
|
|
||||||
transcripts_controller,
|
|
||||||
)
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript is None:
|
|
||||||
raise ValueError(f"Transcript {input.transcript_id} not found in database")
|
|
||||||
|
|
||||||
word_objects = [Word(**w) for w in all_words]
|
|
||||||
merged_transcript = TranscriptType(words=word_objects, translation=None)
|
|
||||||
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id,
|
|
||||||
transcript,
|
|
||||||
"TRANSCRIPT",
|
|
||||||
TranscriptText(
|
|
||||||
text=merged_transcript.text,
|
|
||||||
translation=merged_transcript.translation,
|
|
||||||
),
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Save duration and clear workflow_run_id (workflow completed successfully)
|
|
||||||
# Note: title/long_summary/short_summary already saved by their callbacks
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript,
|
|
||||||
{
|
|
||||||
"duration": duration,
|
|
||||||
"workflow_run_id": None, # Clear on success - no need to resume
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
duration_data = TranscriptDuration(duration=duration)
|
|
||||||
await append_event_and_broadcast(
|
|
||||||
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
|
|
||||||
)
|
|
||||||
|
|
||||||
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
|
|
||||||
|
|
||||||
ctx.log(
|
|
||||||
f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
|
|
||||||
)
|
|
||||||
|
|
||||||
return FinalizeResult(status="COMPLETED")
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3
|
|
||||||
)
|
|
||||||
@with_error_handling("cleanup_consent", set_error_status=False)
|
|
||||||
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
|
|
||||||
"""Check consent and delete audio files if any participant denied."""
|
|
||||||
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.meetings import ( # noqa: PLC0415
|
|
||||||
meeting_consent_controller,
|
|
||||||
meetings_controller,
|
|
||||||
)
|
|
||||||
from reflector.db.recordings import recordings_controller # noqa: PLC0415
|
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
|
||||||
from reflector.storage import get_transcripts_storage # noqa: PLC0415
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if not transcript:
|
|
||||||
ctx.log("cleanup_consent: transcript not found")
|
|
||||||
return ConsentResult()
|
|
||||||
|
|
||||||
consent_denied = False
|
|
||||||
if transcript.meeting_id:
|
|
||||||
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
|
|
||||||
if meeting:
|
|
||||||
consent_denied = await meeting_consent_controller.has_any_denial(
|
|
||||||
meeting.id
|
|
||||||
)
|
|
||||||
|
|
||||||
if not consent_denied:
|
|
||||||
ctx.log("cleanup_consent: consent approved, keeping all files")
|
|
||||||
return ConsentResult()
|
|
||||||
|
|
||||||
ctx.log("cleanup_consent: consent denied, deleting audio files")
|
|
||||||
|
|
||||||
input_track_keys = set(t["s3_key"] for t in input.tracks)
|
|
||||||
|
|
||||||
# Detect if recording.track_keys was manually modified after workflow started
|
|
||||||
if transcript.recording_id:
|
|
||||||
recording = await recordings_controller.get_by_id(transcript.recording_id)
|
|
||||||
if recording and recording.track_keys:
|
|
||||||
db_track_keys = set(filter_cam_audio_tracks(recording.track_keys))
|
|
||||||
|
|
||||||
if input_track_keys != db_track_keys:
|
|
||||||
added = db_track_keys - input_track_keys
|
|
||||||
removed = input_track_keys - db_track_keys
|
|
||||||
logger.warning(
|
|
||||||
"[Hatchet] Track keys mismatch: DB changed since workflow start",
|
|
||||||
transcript_id=input.transcript_id,
|
|
||||||
recording_id=transcript.recording_id,
|
|
||||||
input_count=len(input_track_keys),
|
|
||||||
db_count=len(db_track_keys),
|
|
||||||
added_in_db=list(added) if added else None,
|
|
||||||
removed_from_db=list(removed) if removed else None,
|
|
||||||
)
|
|
||||||
ctx.log(
|
|
||||||
f"WARNING: track_keys mismatch - "
|
|
||||||
f"input has {len(input_track_keys)}, DB has {len(db_track_keys)}. "
|
|
||||||
f"Using input tracks for deletion."
|
|
||||||
)
|
|
||||||
|
|
||||||
deletion_errors = []
|
|
||||||
|
|
||||||
if input_track_keys and input.bucket_name:
|
|
||||||
master_storage = get_transcripts_storage()
|
|
||||||
for key in input_track_keys:
|
|
||||||
try:
|
|
||||||
await master_storage.delete_file(key, bucket=input.bucket_name)
|
|
||||||
ctx.log(f"Deleted recording file: {input.bucket_name}/{key}")
|
|
||||||
except Exception as e:
|
|
||||||
error_msg = f"Failed to delete {key}: {e}"
|
|
||||||
logger.error(error_msg, exc_info=True)
|
|
||||||
deletion_errors.append(error_msg)
|
|
||||||
|
|
||||||
if transcript.audio_location == "storage":
|
|
||||||
storage = get_transcripts_storage()
|
|
||||||
try:
|
|
||||||
await storage.delete_file(transcript.storage_audio_path)
|
|
||||||
ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
|
|
||||||
except Exception as e:
|
|
||||||
error_msg = f"Failed to delete processed audio: {e}"
|
|
||||||
logger.error(error_msg, exc_info=True)
|
|
||||||
deletion_errors.append(error_msg)
|
|
||||||
|
|
||||||
if deletion_errors:
|
|
||||||
logger.warning(
|
|
||||||
"[Hatchet] cleanup_consent completed with errors",
|
|
||||||
transcript_id=input.transcript_id,
|
|
||||||
error_count=len(deletion_errors),
|
|
||||||
errors=deletion_errors,
|
|
||||||
)
|
|
||||||
ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
|
|
||||||
else:
|
|
||||||
await transcripts_controller.update(transcript, {"audio_deleted": True})
|
|
||||||
ctx.log("cleanup_consent: all audio deleted successfully")
|
|
||||||
|
|
||||||
return ConsentResult()
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5
|
|
||||||
)
|
|
||||||
@with_error_handling("post_zulip", set_error_status=False)
|
|
||||||
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
|
|
||||||
"""Post notification to Zulip."""
|
|
||||||
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
|
|
||||||
|
|
||||||
if not settings.ZULIP_REALM:
|
|
||||||
ctx.log("post_zulip skipped (Zulip not configured)")
|
|
||||||
return ZulipResult(zulip_message_id=None, skipped=True)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
if transcript:
|
|
||||||
message_id = await post_transcript_notification(transcript)
|
|
||||||
ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
|
|
||||||
else:
|
|
||||||
message_id = None
|
|
||||||
|
|
||||||
return ZulipResult(zulip_message_id=message_id)
|
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
|
||||||
parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30
|
|
||||||
)
|
|
||||||
@with_error_handling("send_webhook", set_error_status=False)
|
|
||||||
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
|
|
||||||
"""Send completion webhook to external service."""
|
|
||||||
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
|
|
||||||
|
|
||||||
if not input.room_id:
|
|
||||||
ctx.log("send_webhook skipped (no room_id)")
|
|
||||||
return WebhookResult(webhook_sent=False, skipped=True)
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
|
||||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
|
||||||
|
|
||||||
room = await rooms_controller.get_by_id(input.room_id)
|
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
|
||||||
|
|
||||||
if room and room.webhook_url and transcript:
|
|
||||||
webhook_payload = {
|
|
||||||
"event": "transcript.completed",
|
|
||||||
"transcript_id": input.transcript_id,
|
|
||||||
"title": transcript.title,
|
|
||||||
"duration": transcript.duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
async with httpx.AsyncClient() as client:
|
|
||||||
response = await client.post(
|
|
||||||
room.webhook_url, json=webhook_payload, timeout=30
|
|
||||||
)
|
|
||||||
response.raise_for_status()
|
|
||||||
|
|
||||||
ctx.log(f"send_webhook complete: status_code={response.status_code}")
|
|
||||||
|
|
||||||
return WebhookResult(webhook_sent=True, response_code=response.status_code)
|
|
||||||
|
|
||||||
return WebhookResult(webhook_sent=False, skipped=True)
|
|
||||||
@@ -1,124 +0,0 @@
|
|||||||
"""
|
|
||||||
Pydantic models for Hatchet workflow task return types.
|
|
||||||
|
|
||||||
Provides static typing for all task outputs, enabling type checking
|
|
||||||
and better IDE support.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from reflector.utils.string import NonEmptyString
|
|
||||||
|
|
||||||
|
|
||||||
class PadTrackResult(BaseModel):
|
|
||||||
"""Result from pad_track task."""
|
|
||||||
|
|
||||||
padded_key: NonEmptyString # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
|
|
||||||
bucket_name: (
|
|
||||||
NonEmptyString | None
|
|
||||||
) # None means use default transcript storage bucket
|
|
||||||
size: int
|
|
||||||
track_index: int
|
|
||||||
|
|
||||||
|
|
||||||
class TranscribeTrackResult(BaseModel):
|
|
||||||
"""Result from transcribe_track task."""
|
|
||||||
|
|
||||||
words: list[dict[str, Any]]
|
|
||||||
track_index: int
|
|
||||||
|
|
||||||
|
|
||||||
class RecordingResult(BaseModel):
|
|
||||||
"""Result from get_recording task."""
|
|
||||||
|
|
||||||
id: NonEmptyString | None
|
|
||||||
mtg_session_id: NonEmptyString | None
|
|
||||||
duration: float
|
|
||||||
|
|
||||||
|
|
||||||
class ParticipantsResult(BaseModel):
|
|
||||||
"""Result from get_participants task."""
|
|
||||||
|
|
||||||
participants: list[dict[str, Any]]
|
|
||||||
num_tracks: int
|
|
||||||
source_language: NonEmptyString
|
|
||||||
target_language: NonEmptyString
|
|
||||||
|
|
||||||
|
|
||||||
class PaddedTrackInfo(BaseModel):
|
|
||||||
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
|
|
||||||
|
|
||||||
key: NonEmptyString
|
|
||||||
bucket_name: NonEmptyString | None # None = use default storage bucket
|
|
||||||
|
|
||||||
|
|
||||||
class ProcessTracksResult(BaseModel):
|
|
||||||
"""Result from process_tracks task."""
|
|
||||||
|
|
||||||
all_words: list[dict[str, Any]]
|
|
||||||
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
|
|
||||||
word_count: int
|
|
||||||
num_tracks: int
|
|
||||||
target_language: NonEmptyString
|
|
||||||
created_padded_files: list[NonEmptyString]
|
|
||||||
|
|
||||||
|
|
||||||
class MixdownResult(BaseModel):
|
|
||||||
"""Result from mixdown_tracks task."""
|
|
||||||
|
|
||||||
audio_key: NonEmptyString
|
|
||||||
duration: float
|
|
||||||
tracks_mixed: int
|
|
||||||
|
|
||||||
|
|
||||||
class WaveformResult(BaseModel):
|
|
||||||
"""Result from generate_waveform task."""
|
|
||||||
|
|
||||||
waveform_generated: bool
|
|
||||||
|
|
||||||
|
|
||||||
class TopicsResult(BaseModel):
|
|
||||||
"""Result from detect_topics task."""
|
|
||||||
|
|
||||||
topics: list[dict[str, Any]]
|
|
||||||
|
|
||||||
|
|
||||||
class TitleResult(BaseModel):
|
|
||||||
"""Result from generate_title task."""
|
|
||||||
|
|
||||||
title: str | None
|
|
||||||
|
|
||||||
|
|
||||||
class SummaryResult(BaseModel):
|
|
||||||
"""Result from generate_summary task."""
|
|
||||||
|
|
||||||
summary: str | None
|
|
||||||
short_summary: str | None
|
|
||||||
action_items: dict | None = None
|
|
||||||
|
|
||||||
|
|
||||||
class FinalizeResult(BaseModel):
|
|
||||||
"""Result from finalize task."""
|
|
||||||
|
|
||||||
status: NonEmptyString
|
|
||||||
|
|
||||||
|
|
||||||
class ConsentResult(BaseModel):
|
|
||||||
"""Result from cleanup_consent task."""
|
|
||||||
|
|
||||||
|
|
||||||
class ZulipResult(BaseModel):
|
|
||||||
"""Result from post_zulip task."""
|
|
||||||
|
|
||||||
zulip_message_id: int | None = None
|
|
||||||
skipped: bool = False
|
|
||||||
|
|
||||||
|
|
||||||
class WebhookResult(BaseModel):
|
|
||||||
"""Result from send_webhook task."""
|
|
||||||
|
|
||||||
webhook_sent: bool
|
|
||||||
skipped: bool = False
|
|
||||||
response_code: int | None = None
|
|
||||||
@@ -1,222 +0,0 @@
|
|||||||
"""
|
|
||||||
Hatchet child workflow: TrackProcessing
|
|
||||||
|
|
||||||
Handles individual audio track processing: padding and transcription.
|
|
||||||
Spawned dynamically by the main diarization pipeline for each track.
|
|
||||||
|
|
||||||
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
|
|
||||||
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
|
|
||||||
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
|
|
||||||
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
|
|
||||||
|
|
||||||
Note: This file uses deferred imports (inside tasks) intentionally.
|
|
||||||
Hatchet workers run in forked processes; fresh imports per task ensure
|
|
||||||
storage/DB connections are not shared across forks.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import tempfile
|
|
||||||
from datetime import timedelta
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import av
|
|
||||||
from hatchet_sdk import Context
|
|
||||||
from pydantic import BaseModel
|
|
||||||
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
|
|
||||||
from reflector.logger import logger
|
|
||||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
|
||||||
from reflector.utils.audio_padding import (
|
|
||||||
apply_audio_padding_to_file,
|
|
||||||
extract_stream_start_time_from_container,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class TrackInput(BaseModel):
|
|
||||||
"""Input for individual track processing."""
|
|
||||||
|
|
||||||
track_index: int
|
|
||||||
s3_key: str
|
|
||||||
bucket_name: str
|
|
||||||
transcript_id: str
|
|
||||||
language: str = "en"
|
|
||||||
|
|
||||||
|
|
||||||
hatchet = HatchetClientManager.get_client()
|
|
||||||
|
|
||||||
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
|
|
||||||
|
|
||||||
|
|
||||||
@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
|
|
||||||
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
|
|
||||||
"""Pad single audio track with silence for alignment.
|
|
||||||
|
|
||||||
Extracts stream.start_time from WebM container metadata and applies
|
|
||||||
silence padding using PyAV filter graph (adelay).
|
|
||||||
"""
|
|
||||||
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
|
|
||||||
logger.info(
|
|
||||||
"[Hatchet] pad_track",
|
|
||||||
track_index=input.track_index,
|
|
||||||
s3_key=input.s3_key,
|
|
||||||
transcript_id=input.transcript_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create fresh storage instance to avoid aioboto3 fork issues
|
|
||||||
from reflector.settings import settings # noqa: PLC0415
|
|
||||||
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
|
|
||||||
|
|
||||||
storage = AwsStorage(
|
|
||||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
|
||||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
|
||||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
|
||||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
|
||||||
)
|
|
||||||
|
|
||||||
source_url = await storage.get_file_url(
|
|
||||||
input.s3_key,
|
|
||||||
operation="get_object",
|
|
||||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
|
||||||
bucket=input.bucket_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
with av.open(source_url) as in_container:
|
|
||||||
start_time_seconds = extract_stream_start_time_from_container(
|
|
||||||
in_container, input.track_index, logger=logger
|
|
||||||
)
|
|
||||||
|
|
||||||
# If no padding needed, return original S3 key
|
|
||||||
if start_time_seconds <= 0:
|
|
||||||
logger.info(
|
|
||||||
f"Track {input.track_index} requires no padding",
|
|
||||||
track_index=input.track_index,
|
|
||||||
)
|
|
||||||
return PadTrackResult(
|
|
||||||
padded_key=input.s3_key,
|
|
||||||
bucket_name=input.bucket_name,
|
|
||||||
size=0,
|
|
||||||
track_index=input.track_index,
|
|
||||||
)
|
|
||||||
|
|
||||||
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
|
|
||||||
temp_path = temp_file.name
|
|
||||||
|
|
||||||
try:
|
|
||||||
apply_audio_padding_to_file(
|
|
||||||
in_container,
|
|
||||||
temp_path,
|
|
||||||
start_time_seconds,
|
|
||||||
input.track_index,
|
|
||||||
logger=logger,
|
|
||||||
)
|
|
||||||
|
|
||||||
file_size = Path(temp_path).stat().st_size
|
|
||||||
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"About to upload padded track",
|
|
||||||
key=storage_path,
|
|
||||||
size=file_size,
|
|
||||||
)
|
|
||||||
|
|
||||||
with open(temp_path, "rb") as padded_file:
|
|
||||||
await storage.put_file(storage_path, padded_file)
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
f"Uploaded padded track to S3",
|
|
||||||
key=storage_path,
|
|
||||||
size=file_size,
|
|
||||||
)
|
|
||||||
finally:
|
|
||||||
Path(temp_path).unlink(missing_ok=True)
|
|
||||||
|
|
||||||
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
|
|
||||||
logger.info(
|
|
||||||
"[Hatchet] pad_track complete",
|
|
||||||
track_index=input.track_index,
|
|
||||||
padded_key=storage_path,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Return S3 key (not presigned URL) - consumer tasks presign on demand
|
|
||||||
# This avoids stale URLs when workflow is replayed
|
|
||||||
return PadTrackResult(
|
|
||||||
padded_key=storage_path,
|
|
||||||
bucket_name=None, # None = use default transcript storage bucket
|
|
||||||
size=file_size,
|
|
||||||
track_index=input.track_index,
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
@track_workflow.task(
|
|
||||||
parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
|
|
||||||
)
|
|
||||||
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
|
|
||||||
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
|
|
||||||
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
|
|
||||||
logger.info(
|
|
||||||
"[Hatchet] transcribe_track",
|
|
||||||
track_index=input.track_index,
|
|
||||||
language=input.language,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
pad_result = ctx.task_output(pad_track)
|
|
||||||
padded_key = pad_result.padded_key
|
|
||||||
bucket_name = pad_result.bucket_name
|
|
||||||
|
|
||||||
if not padded_key:
|
|
||||||
raise ValueError("Missing padded_key from pad_track")
|
|
||||||
|
|
||||||
# Presign URL on demand (avoids stale URLs on workflow replay)
|
|
||||||
from reflector.settings import settings # noqa: PLC0415
|
|
||||||
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
|
|
||||||
|
|
||||||
storage = AwsStorage(
|
|
||||||
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
|
|
||||||
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
|
|
||||||
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
|
|
||||||
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
|
|
||||||
)
|
|
||||||
|
|
||||||
audio_url = await storage.get_file_url(
|
|
||||||
padded_key,
|
|
||||||
operation="get_object",
|
|
||||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
|
||||||
bucket=bucket_name,
|
|
||||||
)
|
|
||||||
|
|
||||||
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
|
|
||||||
transcribe_file_with_processor,
|
|
||||||
)
|
|
||||||
|
|
||||||
transcript = await transcribe_file_with_processor(audio_url, input.language)
|
|
||||||
|
|
||||||
# Tag all words with speaker index
|
|
||||||
words = []
|
|
||||||
for word in transcript.words:
|
|
||||||
word_dict = word.model_dump()
|
|
||||||
word_dict["speaker"] = input.track_index
|
|
||||||
words.append(word_dict)
|
|
||||||
|
|
||||||
ctx.log(
|
|
||||||
f"transcribe_track complete: track {input.track_index}, {len(words)} words"
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"[Hatchet] transcribe_track complete",
|
|
||||||
track_index=input.track_index,
|
|
||||||
word_count=len(words),
|
|
||||||
)
|
|
||||||
|
|
||||||
return TranscribeTrackResult(
|
|
||||||
words=words,
|
|
||||||
track_index=input.track_index,
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
|
|
||||||
raise
|
|
||||||
@@ -97,8 +97,13 @@ class PipelineMainFile(PipelineMainBase):
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Extract audio and write to transcript location
|
||||||
audio_path = await self.extract_and_write_audio(file_path, transcript)
|
audio_path = await self.extract_and_write_audio(file_path, transcript)
|
||||||
|
|
||||||
|
# Upload for processing
|
||||||
audio_url = await self.upload_audio(audio_path, transcript)
|
audio_url = await self.upload_audio(audio_path, transcript)
|
||||||
|
|
||||||
|
# Run parallel processing
|
||||||
await self.run_parallel_processing(
|
await self.run_parallel_processing(
|
||||||
audio_path,
|
audio_path,
|
||||||
audio_url,
|
audio_url,
|
||||||
@@ -192,6 +197,7 @@ class PipelineMainFile(PipelineMainBase):
|
|||||||
transcript_result = results[0]
|
transcript_result = results[0]
|
||||||
diarization_result = results[1]
|
diarization_result = results[1]
|
||||||
|
|
||||||
|
# Handle errors - raise any exception that occurred
|
||||||
self._handle_gather_exceptions(results, "parallel processing")
|
self._handle_gather_exceptions(results, "parallel processing")
|
||||||
for result in results:
|
for result in results:
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
@@ -206,6 +212,7 @@ class PipelineMainFile(PipelineMainBase):
|
|||||||
transcript=transcript_result, diarization=diarization_result or []
|
transcript=transcript_result, diarization=diarization_result or []
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Store result for retrieval
|
||||||
diarized_transcript: Transcript | None = None
|
diarized_transcript: Transcript | None = None
|
||||||
|
|
||||||
async def capture_result(transcript):
|
async def capture_result(transcript):
|
||||||
@@ -342,6 +349,7 @@ async def task_pipeline_file_process(*, transcript_id: str):
|
|||||||
try:
|
try:
|
||||||
await pipeline.set_status(transcript_id, "processing")
|
await pipeline.set_status(transcript_id, "processing")
|
||||||
|
|
||||||
|
# Find the file to process
|
||||||
audio_file = next(transcript.data_path.glob("upload.*"), None)
|
audio_file = next(transcript.data_path.glob("upload.*"), None)
|
||||||
if not audio_file:
|
if not audio_file:
|
||||||
audio_file = next(transcript.data_path.glob("audio.*"), None)
|
audio_file = next(transcript.data_path.glob("audio.*"), None)
|
||||||
|
|||||||
@@ -112,7 +112,7 @@ def get_transcript(func):
|
|||||||
transcript_id = kwargs.pop("transcript_id")
|
transcript_id = kwargs.pop("transcript_id")
|
||||||
transcript = await transcripts_controller.get_by_id(transcript_id=transcript_id)
|
transcript = await transcripts_controller.get_by_id(transcript_id=transcript_id)
|
||||||
if not transcript:
|
if not transcript:
|
||||||
raise Exception("Transcript {transcript_id} not found")
|
raise Exception(f"Transcript {transcript_id} not found")
|
||||||
|
|
||||||
# Enhanced logger with Celery task context
|
# Enhanced logger with Celery task context
|
||||||
tlogger = logger.bind(transcript_id=transcript.id)
|
tlogger = logger.bind(transcript_id=transcript.id)
|
||||||
|
|||||||
@@ -1,8 +1,11 @@
|
|||||||
import asyncio
|
import asyncio
|
||||||
|
import math
|
||||||
import tempfile
|
import tempfile
|
||||||
|
from fractions import Fraction
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
import av
|
import av
|
||||||
|
from av.audio.resampler import AudioResampler
|
||||||
from celery import chain, shared_task
|
from celery import chain, shared_task
|
||||||
|
|
||||||
from reflector.asynctask import asynctask
|
from reflector.asynctask import asynctask
|
||||||
@@ -29,15 +32,6 @@ from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
|||||||
from reflector.processors.types import TitleSummary
|
from reflector.processors.types import TitleSummary
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
from reflector.processors.types import Transcript as TranscriptType
|
||||||
from reflector.storage import Storage, get_transcripts_storage
|
from reflector.storage import Storage, get_transcripts_storage
|
||||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
|
||||||
from reflector.utils.audio_mixdown import (
|
|
||||||
detect_sample_rate_from_tracks,
|
|
||||||
mixdown_tracks_pyav,
|
|
||||||
)
|
|
||||||
from reflector.utils.audio_padding import (
|
|
||||||
apply_audio_padding_to_file,
|
|
||||||
extract_stream_start_time_from_container,
|
|
||||||
)
|
|
||||||
from reflector.utils.daily import (
|
from reflector.utils.daily import (
|
||||||
filter_cam_audio_tracks,
|
filter_cam_audio_tracks,
|
||||||
parse_daily_recording_filename,
|
parse_daily_recording_filename,
|
||||||
@@ -45,6 +39,13 @@ from reflector.utils.daily import (
|
|||||||
from reflector.utils.string import NonEmptyString
|
from reflector.utils.string import NonEmptyString
|
||||||
from reflector.video_platforms.factory import create_platform_client
|
from reflector.video_platforms.factory import create_platform_client
|
||||||
|
|
||||||
|
# Audio encoding constants
|
||||||
|
OPUS_STANDARD_SAMPLE_RATE = 48000
|
||||||
|
OPUS_DEFAULT_BIT_RATE = 128000
|
||||||
|
|
||||||
|
# Storage operation constants
|
||||||
|
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
|
||||||
|
|
||||||
|
|
||||||
class PipelineMainMultitrack(PipelineMainBase):
|
class PipelineMainMultitrack(PipelineMainBase):
|
||||||
def __init__(self, transcript_id: str):
|
def __init__(self, transcript_id: str):
|
||||||
@@ -124,8 +125,8 @@ class PipelineMainMultitrack(PipelineMainBase):
|
|||||||
try:
|
try:
|
||||||
# PyAV streams input from S3 URL efficiently (2-5MB fixed overhead for codec/filters)
|
# PyAV streams input from S3 URL efficiently (2-5MB fixed overhead for codec/filters)
|
||||||
with av.open(track_url) as in_container:
|
with av.open(track_url) as in_container:
|
||||||
start_time_seconds = extract_stream_start_time_from_container(
|
start_time_seconds = self._extract_stream_start_time_from_container(
|
||||||
in_container, track_idx, logger=self.logger
|
in_container, track_idx
|
||||||
)
|
)
|
||||||
|
|
||||||
if start_time_seconds <= 0:
|
if start_time_seconds <= 0:
|
||||||
@@ -143,12 +144,8 @@ class PipelineMainMultitrack(PipelineMainBase):
|
|||||||
temp_path = temp_file.name
|
temp_path = temp_file.name
|
||||||
|
|
||||||
try:
|
try:
|
||||||
apply_audio_padding_to_file(
|
self._apply_audio_padding_to_file(
|
||||||
in_container,
|
in_container, temp_path, start_time_seconds, track_idx
|
||||||
temp_path,
|
|
||||||
start_time_seconds,
|
|
||||||
track_idx,
|
|
||||||
logger=self.logger,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
storage_path = (
|
storage_path = (
|
||||||
@@ -159,6 +156,7 @@ class PipelineMainMultitrack(PipelineMainBase):
|
|||||||
with open(temp_path, "rb") as padded_file:
|
with open(temp_path, "rb") as padded_file:
|
||||||
await storage.put_file(storage_path, padded_file)
|
await storage.put_file(storage_path, padded_file)
|
||||||
finally:
|
finally:
|
||||||
|
# Clean up temp file
|
||||||
Path(temp_path).unlink(missing_ok=True)
|
Path(temp_path).unlink(missing_ok=True)
|
||||||
|
|
||||||
padded_url = await storage.get_file_url(
|
padded_url = await storage.get_file_url(
|
||||||
@@ -188,27 +186,316 @@ class PipelineMainMultitrack(PipelineMainBase):
|
|||||||
f"Track {track_idx} padding failed - transcript would have incorrect timestamps"
|
f"Track {track_idx} padding failed - transcript would have incorrect timestamps"
|
||||||
) from e
|
) from e
|
||||||
|
|
||||||
|
def _extract_stream_start_time_from_container(
|
||||||
|
self, container, track_idx: int
|
||||||
|
) -> float:
|
||||||
|
"""
|
||||||
|
Extract meeting-relative start time from WebM stream metadata.
|
||||||
|
Uses PyAV to read stream.start_time from WebM container.
|
||||||
|
More accurate than filename timestamps by ~209ms due to network/encoding delays.
|
||||||
|
"""
|
||||||
|
start_time_seconds = 0.0
|
||||||
|
try:
|
||||||
|
audio_streams = [s for s in container.streams if s.type == "audio"]
|
||||||
|
stream = audio_streams[0] if audio_streams else container.streams[0]
|
||||||
|
|
||||||
|
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
|
||||||
|
if stream.start_time is not None and stream.time_base is not None:
|
||||||
|
start_time_seconds = float(stream.start_time * stream.time_base)
|
||||||
|
|
||||||
|
# 2) Fallback to container-level start_time (in av.time_base units)
|
||||||
|
if (start_time_seconds <= 0) and (container.start_time is not None):
|
||||||
|
start_time_seconds = float(container.start_time * av.time_base)
|
||||||
|
|
||||||
|
# 3) Fallback to first packet DTS in stream.time_base
|
||||||
|
if start_time_seconds <= 0:
|
||||||
|
for packet in container.demux(stream):
|
||||||
|
if packet.dts is not None:
|
||||||
|
start_time_seconds = float(packet.dts * stream.time_base)
|
||||||
|
break
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning(
|
||||||
|
"PyAV metadata read failed; assuming 0 start_time",
|
||||||
|
track_idx=track_idx,
|
||||||
|
error=str(e),
|
||||||
|
)
|
||||||
|
start_time_seconds = 0.0
|
||||||
|
|
||||||
|
self.logger.info(
|
||||||
|
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
|
||||||
|
track_idx=track_idx,
|
||||||
|
)
|
||||||
|
return start_time_seconds
|
||||||
|
|
||||||
|
def _apply_audio_padding_to_file(
|
||||||
|
self,
|
||||||
|
in_container,
|
||||||
|
output_path: str,
|
||||||
|
start_time_seconds: float,
|
||||||
|
track_idx: int,
|
||||||
|
) -> None:
|
||||||
|
"""Apply silence padding to audio track using PyAV filter graph, writing to file"""
|
||||||
|
delay_ms = math.floor(start_time_seconds * 1000)
|
||||||
|
|
||||||
|
self.logger.info(
|
||||||
|
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
|
||||||
|
track_idx=track_idx,
|
||||||
|
delay_ms=delay_ms,
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
with av.open(output_path, "w", format="webm") as out_container:
|
||||||
|
in_stream = next(
|
||||||
|
(s for s in in_container.streams if s.type == "audio"), None
|
||||||
|
)
|
||||||
|
if in_stream is None:
|
||||||
|
raise Exception("No audio stream in input")
|
||||||
|
|
||||||
|
out_stream = out_container.add_stream(
|
||||||
|
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
|
||||||
|
)
|
||||||
|
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
|
||||||
|
graph = av.filter.Graph()
|
||||||
|
|
||||||
|
abuf_args = (
|
||||||
|
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
|
||||||
|
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
|
||||||
|
f"sample_fmt=s16:"
|
||||||
|
f"channel_layout=stereo"
|
||||||
|
)
|
||||||
|
src = graph.add("abuffer", args=abuf_args, name="src")
|
||||||
|
aresample_f = graph.add("aresample", args="async=1", name="ares")
|
||||||
|
# adelay requires one delay value per channel separated by '|'
|
||||||
|
delays_arg = f"{delay_ms}|{delay_ms}"
|
||||||
|
adelay_f = graph.add(
|
||||||
|
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
|
||||||
|
)
|
||||||
|
sink = graph.add("abuffersink", name="sink")
|
||||||
|
|
||||||
|
src.link_to(aresample_f)
|
||||||
|
aresample_f.link_to(adelay_f)
|
||||||
|
adelay_f.link_to(sink)
|
||||||
|
graph.configure()
|
||||||
|
|
||||||
|
resampler = AudioResampler(
|
||||||
|
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
|
||||||
|
)
|
||||||
|
# Decode -> resample -> push through graph -> encode Opus
|
||||||
|
for frame in in_container.decode(in_stream):
|
||||||
|
out_frames = resampler.resample(frame) or []
|
||||||
|
for rframe in out_frames:
|
||||||
|
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
||||||
|
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
||||||
|
src.push(rframe)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
f_out = sink.pull()
|
||||||
|
except Exception:
|
||||||
|
break
|
||||||
|
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
||||||
|
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
||||||
|
for packet in out_stream.encode(f_out):
|
||||||
|
out_container.mux(packet)
|
||||||
|
|
||||||
|
src.push(None)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
f_out = sink.pull()
|
||||||
|
except Exception:
|
||||||
|
break
|
||||||
|
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
||||||
|
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
||||||
|
for packet in out_stream.encode(f_out):
|
||||||
|
out_container.mux(packet)
|
||||||
|
|
||||||
|
for packet in out_stream.encode(None):
|
||||||
|
out_container.mux(packet)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(
|
||||||
|
"PyAV padding failed for track",
|
||||||
|
track_idx=track_idx,
|
||||||
|
delay_ms=delay_ms,
|
||||||
|
error=str(e),
|
||||||
|
exc_info=True,
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
|
||||||
async def mixdown_tracks(
|
async def mixdown_tracks(
|
||||||
self,
|
self,
|
||||||
track_urls: list[str],
|
track_urls: list[str],
|
||||||
writer: AudioFileWriterProcessor,
|
writer: AudioFileWriterProcessor,
|
||||||
offsets_seconds: list[float] | None = None,
|
offsets_seconds: list[float] | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs."""
|
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs"""
|
||||||
target_sample_rate = detect_sample_rate_from_tracks(
|
|
||||||
track_urls, logger=self.logger
|
target_sample_rate: int | None = None
|
||||||
)
|
for url in track_urls:
|
||||||
|
if not url:
|
||||||
|
continue
|
||||||
|
container = None
|
||||||
|
try:
|
||||||
|
container = av.open(url)
|
||||||
|
for frame in container.decode(audio=0):
|
||||||
|
target_sample_rate = frame.sample_rate
|
||||||
|
break
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if container is not None:
|
||||||
|
container.close()
|
||||||
|
if target_sample_rate:
|
||||||
|
break
|
||||||
|
|
||||||
if not target_sample_rate:
|
if not target_sample_rate:
|
||||||
self.logger.error("Mixdown failed - no decodable audio frames found")
|
self.logger.error("Mixdown failed - no decodable audio frames found")
|
||||||
raise Exception("Mixdown failed: No decodable audio frames in any track")
|
raise Exception("Mixdown failed: No decodable audio frames in any track")
|
||||||
|
# Build PyAV filter graph:
|
||||||
await mixdown_tracks_pyav(
|
# N abuffer (s32/stereo)
|
||||||
track_urls,
|
# -> optional adelay per input (for alignment)
|
||||||
writer,
|
# -> amix (s32)
|
||||||
target_sample_rate,
|
# -> aformat(s16)
|
||||||
offsets_seconds=offsets_seconds,
|
# -> sink
|
||||||
logger=self.logger,
|
graph = av.filter.Graph()
|
||||||
|
inputs = []
|
||||||
|
valid_track_urls = [url for url in track_urls if url]
|
||||||
|
input_offsets_seconds = None
|
||||||
|
if offsets_seconds is not None:
|
||||||
|
input_offsets_seconds = [
|
||||||
|
offsets_seconds[i] for i, url in enumerate(track_urls) if url
|
||||||
|
]
|
||||||
|
for idx, url in enumerate(valid_track_urls):
|
||||||
|
args = (
|
||||||
|
f"time_base=1/{target_sample_rate}:"
|
||||||
|
f"sample_rate={target_sample_rate}:"
|
||||||
|
f"sample_fmt=s32:"
|
||||||
|
f"channel_layout=stereo"
|
||||||
)
|
)
|
||||||
|
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
|
||||||
|
inputs.append(in_ctx)
|
||||||
|
|
||||||
|
if not inputs:
|
||||||
|
self.logger.error("Mixdown failed - no valid inputs for graph")
|
||||||
|
raise Exception("Mixdown failed: No valid inputs for filter graph")
|
||||||
|
|
||||||
|
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
|
||||||
|
|
||||||
|
fmt = graph.add(
|
||||||
|
"aformat",
|
||||||
|
args=(
|
||||||
|
f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}"
|
||||||
|
),
|
||||||
|
name="fmt",
|
||||||
|
)
|
||||||
|
|
||||||
|
sink = graph.add("abuffersink", name="out")
|
||||||
|
|
||||||
|
# Optional per-input delay before mixing
|
||||||
|
delays_ms: list[int] = []
|
||||||
|
if input_offsets_seconds is not None:
|
||||||
|
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
|
||||||
|
delays_ms = [
|
||||||
|
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
|
||||||
|
]
|
||||||
|
else:
|
||||||
|
delays_ms = [0 for _ in inputs]
|
||||||
|
|
||||||
|
for idx, in_ctx in enumerate(inputs):
|
||||||
|
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
|
||||||
|
if delay_ms > 0:
|
||||||
|
# adelay requires one value per channel; use same for stereo
|
||||||
|
adelay = graph.add(
|
||||||
|
"adelay",
|
||||||
|
args=f"delays={delay_ms}|{delay_ms}:all=1",
|
||||||
|
name=f"delay{idx}",
|
||||||
|
)
|
||||||
|
in_ctx.link_to(adelay)
|
||||||
|
adelay.link_to(mixer, 0, idx)
|
||||||
|
else:
|
||||||
|
in_ctx.link_to(mixer, 0, idx)
|
||||||
|
mixer.link_to(fmt)
|
||||||
|
fmt.link_to(sink)
|
||||||
|
graph.configure()
|
||||||
|
|
||||||
|
containers = []
|
||||||
|
try:
|
||||||
|
# Open all containers with cleanup guaranteed
|
||||||
|
for i, url in enumerate(valid_track_urls):
|
||||||
|
try:
|
||||||
|
c = av.open(
|
||||||
|
url,
|
||||||
|
options={
|
||||||
|
# it's trying to stream from s3 by default
|
||||||
|
"reconnect": "1",
|
||||||
|
"reconnect_streamed": "1",
|
||||||
|
"reconnect_delay_max": "5",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
containers.append(c)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.warning(
|
||||||
|
"Mixdown: failed to open container from URL",
|
||||||
|
input=i,
|
||||||
|
url=url,
|
||||||
|
error=str(e),
|
||||||
|
)
|
||||||
|
|
||||||
|
if not containers:
|
||||||
|
self.logger.error("Mixdown failed - no valid containers opened")
|
||||||
|
raise Exception("Mixdown failed: Could not open any track containers")
|
||||||
|
|
||||||
|
decoders = [c.decode(audio=0) for c in containers]
|
||||||
|
active = [True] * len(decoders)
|
||||||
|
resamplers = [
|
||||||
|
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
|
||||||
|
for _ in decoders
|
||||||
|
]
|
||||||
|
|
||||||
|
while any(active):
|
||||||
|
for i, (dec, is_active) in enumerate(zip(decoders, active)):
|
||||||
|
if not is_active:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
frame = next(dec)
|
||||||
|
except StopIteration:
|
||||||
|
active[i] = False
|
||||||
|
# causes stream to move on / unclogs memory
|
||||||
|
inputs[i].push(None)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if frame.sample_rate != target_sample_rate:
|
||||||
|
continue
|
||||||
|
out_frames = resamplers[i].resample(frame) or []
|
||||||
|
for rf in out_frames:
|
||||||
|
rf.sample_rate = target_sample_rate
|
||||||
|
rf.time_base = Fraction(1, target_sample_rate)
|
||||||
|
inputs[i].push(rf)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
mixed = sink.pull()
|
||||||
|
except Exception:
|
||||||
|
break
|
||||||
|
mixed.sample_rate = target_sample_rate
|
||||||
|
mixed.time_base = Fraction(1, target_sample_rate)
|
||||||
|
await writer.push(mixed)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
mixed = sink.pull()
|
||||||
|
except Exception:
|
||||||
|
break
|
||||||
|
mixed.sample_rate = target_sample_rate
|
||||||
|
mixed.time_base = Fraction(1, target_sample_rate)
|
||||||
|
await writer.push(mixed)
|
||||||
|
finally:
|
||||||
|
# Cleanup all containers, even if processing failed
|
||||||
|
for c in containers:
|
||||||
|
if c is not None:
|
||||||
|
try:
|
||||||
|
c.close()
|
||||||
|
except Exception:
|
||||||
|
pass # Best effort cleanup
|
||||||
|
|
||||||
@broadcast_to_sockets
|
@broadcast_to_sockets
|
||||||
async def set_status(self, transcript_id: str, status: TranscriptStatus):
|
async def set_status(self, transcript_id: str, status: TranscriptStatus):
|
||||||
|
|||||||
@@ -11,19 +11,13 @@ from typing import Literal, Union, assert_never
|
|||||||
|
|
||||||
import celery
|
import celery
|
||||||
from celery.result import AsyncResult
|
from celery.result import AsyncResult
|
||||||
from hatchet_sdk.clients.rest.exceptions import ApiException
|
|
||||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
|
||||||
|
|
||||||
from reflector.db.recordings import recordings_controller
|
from reflector.db.recordings import recordings_controller
|
||||||
from reflector.db.rooms import rooms_controller
|
from reflector.db.transcripts import Transcript
|
||||||
from reflector.db.transcripts import Transcript, transcripts_controller
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
from reflector.logger import logger
|
|
||||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||||
from reflector.pipelines.main_multitrack_pipeline import (
|
from reflector.pipelines.main_multitrack_pipeline import (
|
||||||
task_pipeline_multitrack_process,
|
task_pipeline_multitrack_process,
|
||||||
)
|
)
|
||||||
from reflector.settings import settings
|
|
||||||
from reflector.utils.string import NonEmptyString
|
from reflector.utils.string import NonEmptyString
|
||||||
|
|
||||||
|
|
||||||
@@ -43,8 +37,6 @@ class MultitrackProcessingConfig:
|
|||||||
transcript_id: NonEmptyString
|
transcript_id: NonEmptyString
|
||||||
bucket_name: NonEmptyString
|
bucket_name: NonEmptyString
|
||||||
track_keys: list[str]
|
track_keys: list[str]
|
||||||
recording_id: NonEmptyString | None = None
|
|
||||||
room_id: NonEmptyString | None = None
|
|
||||||
mode: Literal["multitrack"] = "multitrack"
|
mode: Literal["multitrack"] = "multitrack"
|
||||||
|
|
||||||
|
|
||||||
@@ -57,7 +49,6 @@ class ValidationOk:
|
|||||||
# transcript currently doesnt always have recording_id
|
# transcript currently doesnt always have recording_id
|
||||||
recording_id: NonEmptyString | None
|
recording_id: NonEmptyString | None
|
||||||
transcript_id: NonEmptyString
|
transcript_id: NonEmptyString
|
||||||
room_id: NonEmptyString | None = None
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
@@ -105,7 +96,6 @@ async def validate_transcript_for_processing(
|
|||||||
if transcript.status == "idle":
|
if transcript.status == "idle":
|
||||||
return ValidationNotReady(detail="Recording is not ready for processing")
|
return ValidationNotReady(detail="Recording is not ready for processing")
|
||||||
|
|
||||||
# Check Celery tasks
|
|
||||||
if task_is_scheduled_or_active(
|
if task_is_scheduled_or_active(
|
||||||
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
|
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
|
||||||
transcript_id=transcript.id,
|
transcript_id=transcript.id,
|
||||||
@@ -115,25 +105,8 @@ async def validate_transcript_for_processing(
|
|||||||
):
|
):
|
||||||
return ValidationAlreadyScheduled(detail="already running")
|
return ValidationAlreadyScheduled(detail="already running")
|
||||||
|
|
||||||
# Check Hatchet workflows (if enabled)
|
|
||||||
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
|
|
||||||
try:
|
|
||||||
status = await HatchetClientManager.get_workflow_run_status(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
# If workflow is running or queued, don't allow new processing
|
|
||||||
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
|
||||||
return ValidationAlreadyScheduled(
|
|
||||||
detail="Hatchet workflow already running"
|
|
||||||
)
|
|
||||||
except ApiException:
|
|
||||||
# Workflow might be gone (404) or API issue - allow processing
|
|
||||||
pass
|
|
||||||
|
|
||||||
return ValidationOk(
|
return ValidationOk(
|
||||||
recording_id=transcript.recording_id,
|
recording_id=transcript.recording_id, transcript_id=transcript.id
|
||||||
transcript_id=transcript.id,
|
|
||||||
room_id=transcript.room_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -143,7 +116,6 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
|
|||||||
"""
|
"""
|
||||||
bucket_name: str | None = None
|
bucket_name: str | None = None
|
||||||
track_keys: list[str] | None = None
|
track_keys: list[str] | None = None
|
||||||
recording_id: str | None = validation.recording_id
|
|
||||||
|
|
||||||
if validation.recording_id:
|
if validation.recording_id:
|
||||||
recording = await recordings_controller.get_by_id(validation.recording_id)
|
recording = await recordings_controller.get_by_id(validation.recording_id)
|
||||||
@@ -165,8 +137,6 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
|
|||||||
bucket_name=bucket_name, # type: ignore (validated above)
|
bucket_name=bucket_name, # type: ignore (validated above)
|
||||||
track_keys=track_keys,
|
track_keys=track_keys,
|
||||||
transcript_id=validation.transcript_id,
|
transcript_id=validation.transcript_id,
|
||||||
recording_id=recording_id,
|
|
||||||
room_id=validation.room_id,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
return FileProcessingConfig(
|
return FileProcessingConfig(
|
||||||
@@ -174,104 +144,8 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def dispatch_transcript_processing(
|
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
|
||||||
config: ProcessingConfig, force: bool = False
|
|
||||||
) -> AsyncResult | None:
|
|
||||||
"""Dispatch transcript processing to appropriate backend (Hatchet or Celery).
|
|
||||||
|
|
||||||
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
|
|
||||||
"""
|
|
||||||
if isinstance(config, MultitrackProcessingConfig):
|
if isinstance(config, MultitrackProcessingConfig):
|
||||||
# Check if room has use_hatchet=True (overrides env vars)
|
|
||||||
room_forces_hatchet = False
|
|
||||||
if config.room_id:
|
|
||||||
room = await rooms_controller.get_by_id(config.room_id)
|
|
||||||
room_forces_hatchet = room.use_hatchet if room else False
|
|
||||||
|
|
||||||
# Start durable workflow if enabled (Hatchet)
|
|
||||||
# or if room has use_hatchet=True
|
|
||||||
use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
|
|
||||||
|
|
||||||
if room_forces_hatchet:
|
|
||||||
logger.info(
|
|
||||||
"Room forces Hatchet workflow",
|
|
||||||
room_id=config.room_id,
|
|
||||||
transcript_id=config.transcript_id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if use_hatchet:
|
|
||||||
# First check if we can replay (outside transaction since it's read-only)
|
|
||||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
|
||||||
if transcript and transcript.workflow_run_id and not force:
|
|
||||||
can_replay = await HatchetClientManager.can_replay(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
if can_replay:
|
|
||||||
await HatchetClientManager.replay_workflow(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Replaying Hatchet workflow",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Force: cancel old workflow if exists
|
|
||||||
if force and transcript and transcript.workflow_run_id:
|
|
||||||
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
|
|
||||||
logger.info(
|
|
||||||
"Cancelled old workflow (--force)",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": None}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Re-fetch and check for concurrent dispatch (optimistic approach).
|
|
||||||
# No database lock - worst case is duplicate dispatch, but Hatchet
|
|
||||||
# workflows are idempotent so this is acceptable.
|
|
||||||
transcript = await transcripts_controller.get_by_id(config.transcript_id)
|
|
||||||
if transcript and transcript.workflow_run_id:
|
|
||||||
# Another process started a workflow between validation and now
|
|
||||||
try:
|
|
||||||
status = await HatchetClientManager.get_workflow_run_status(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
|
|
||||||
logger.info(
|
|
||||||
"Concurrent workflow detected, skipping dispatch",
|
|
||||||
workflow_id=transcript.workflow_run_id,
|
|
||||||
)
|
|
||||||
return None
|
|
||||||
except ApiException:
|
|
||||||
# Workflow might be gone (404) or API issue - proceed with new workflow
|
|
||||||
pass
|
|
||||||
|
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
|
||||||
workflow_name="DiarizationPipeline",
|
|
||||||
input_data={
|
|
||||||
"recording_id": config.recording_id,
|
|
||||||
"tracks": [{"s3_key": k} for k in config.track_keys],
|
|
||||||
"bucket_name": config.bucket_name,
|
|
||||||
"transcript_id": config.transcript_id,
|
|
||||||
"room_id": config.room_id,
|
|
||||||
},
|
|
||||||
additional_metadata={
|
|
||||||
"transcript_id": config.transcript_id,
|
|
||||||
"recording_id": config.recording_id,
|
|
||||||
"daily_recording_id": config.recording_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
if transcript:
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Celery pipeline (durable workflows disabled)
|
|
||||||
return task_pipeline_multitrack_process.delay(
|
return task_pipeline_multitrack_process.delay(
|
||||||
transcript_id=config.transcript_id,
|
transcript_id=config.transcript_id,
|
||||||
bucket_name=config.bucket_name,
|
bucket_name=config.bucket_name,
|
||||||
|
|||||||
@@ -153,19 +153,5 @@ class Settings(BaseSettings):
|
|||||||
ZULIP_API_KEY: str | None = None
|
ZULIP_API_KEY: str | None = None
|
||||||
ZULIP_BOT_EMAIL: str | None = None
|
ZULIP_BOT_EMAIL: str | None = None
|
||||||
|
|
||||||
# Durable workflow orchestration
|
|
||||||
# Provider: "hatchet" (or "none" to disable)
|
|
||||||
DURABLE_WORKFLOW_PROVIDER: str = "none"
|
|
||||||
|
|
||||||
# Hatchet workflow orchestration
|
|
||||||
HATCHET_CLIENT_TOKEN: str | None = None
|
|
||||||
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
|
|
||||||
HATCHET_DEBUG: bool = False
|
|
||||||
|
|
||||||
@property
|
|
||||||
def HATCHET_ENABLED(self) -> bool:
|
|
||||||
"""True if Hatchet is the active provider."""
|
|
||||||
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
|
|
||||||
|
|
||||||
|
|
||||||
settings = Settings()
|
settings = Settings()
|
||||||
|
|||||||
@@ -15,11 +15,8 @@ import time
|
|||||||
from typing import Callable
|
from typing import Callable
|
||||||
|
|
||||||
from celery.result import AsyncResult
|
from celery.result import AsyncResult
|
||||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
|
||||||
|
|
||||||
from reflector.db import get_database
|
|
||||||
from reflector.db.transcripts import Transcript, transcripts_controller
|
from reflector.db.transcripts import Transcript, transcripts_controller
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
from reflector.services.transcript_process import (
|
from reflector.services.transcript_process import (
|
||||||
FileProcessingConfig,
|
FileProcessingConfig,
|
||||||
MultitrackProcessingConfig,
|
MultitrackProcessingConfig,
|
||||||
@@ -37,26 +34,24 @@ async def process_transcript_inner(
|
|||||||
transcript: Transcript,
|
transcript: Transcript,
|
||||||
on_validation: Callable[[ValidationResult], None],
|
on_validation: Callable[[ValidationResult], None],
|
||||||
on_preprocess: Callable[[PrepareResult], None],
|
on_preprocess: Callable[[PrepareResult], None],
|
||||||
force: bool = False,
|
) -> AsyncResult:
|
||||||
) -> AsyncResult | None:
|
|
||||||
validation = await validate_transcript_for_processing(transcript)
|
validation = await validate_transcript_for_processing(transcript)
|
||||||
on_validation(validation)
|
on_validation(validation)
|
||||||
config = await prepare_transcript_processing(validation)
|
config = await prepare_transcript_processing(validation)
|
||||||
on_preprocess(config)
|
on_preprocess(config)
|
||||||
return await dispatch_transcript_processing(config, force=force)
|
return dispatch_transcript_processing(config)
|
||||||
|
|
||||||
|
|
||||||
async def process_transcript(
|
async def process_transcript(transcript_id: str, sync: bool = False) -> None:
|
||||||
transcript_id: str, sync: bool = False, force: bool = False
|
|
||||||
) -> None:
|
|
||||||
"""
|
"""
|
||||||
Process a transcript by ID, auto-detecting multitrack vs file pipeline.
|
Process a transcript by ID, auto-detecting multitrack vs file pipeline.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
transcript_id: The transcript UUID
|
transcript_id: The transcript UUID
|
||||||
sync: If True, wait for task completion. If False, dispatch and exit.
|
sync: If True, wait for task completion. If False, dispatch and exit.
|
||||||
force: If True, cancel old workflow and start new (latest code). If False, replay failed workflow.
|
|
||||||
"""
|
"""
|
||||||
|
from reflector.db import get_database
|
||||||
|
|
||||||
database = get_database()
|
database = get_database()
|
||||||
await database.connect()
|
await database.connect()
|
||||||
|
|
||||||
@@ -87,42 +82,10 @@ async def process_transcript(
|
|||||||
print(f"Dispatching file pipeline", file=sys.stderr)
|
print(f"Dispatching file pipeline", file=sys.stderr)
|
||||||
|
|
||||||
result = await process_transcript_inner(
|
result = await process_transcript_inner(
|
||||||
transcript,
|
transcript, on_validation=on_validation, on_preprocess=on_preprocess
|
||||||
on_validation=on_validation,
|
|
||||||
on_preprocess=on_preprocess,
|
|
||||||
force=force,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
if result is None:
|
|
||||||
# Hatchet workflow dispatched
|
|
||||||
if sync:
|
if sync:
|
||||||
# Re-fetch transcript to get workflow_run_id
|
|
||||||
transcript = await transcripts_controller.get_by_id(transcript_id)
|
|
||||||
if not transcript or not transcript.workflow_run_id:
|
|
||||||
print("Error: workflow_run_id not found", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
print("Waiting for Hatchet workflow...", file=sys.stderr)
|
|
||||||
while True:
|
|
||||||
status = await HatchetClientManager.get_workflow_run_status(
|
|
||||||
transcript.workflow_run_id
|
|
||||||
)
|
|
||||||
print(f" Status: {status.value}", file=sys.stderr)
|
|
||||||
|
|
||||||
if status == V1TaskStatus.COMPLETED:
|
|
||||||
print("Workflow completed successfully", file=sys.stderr)
|
|
||||||
break
|
|
||||||
elif status in (V1TaskStatus.FAILED, V1TaskStatus.CANCELLED):
|
|
||||||
print(f"Workflow failed: {status}", file=sys.stderr)
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
else:
|
|
||||||
print(
|
|
||||||
"Task dispatched (use --sync to wait for completion)",
|
|
||||||
file=sys.stderr,
|
|
||||||
)
|
|
||||||
elif sync:
|
|
||||||
print("Waiting for task completion...", file=sys.stderr)
|
print("Waiting for task completion...", file=sys.stderr)
|
||||||
while not result.ready():
|
while not result.ready():
|
||||||
print(f" Status: {result.state}", file=sys.stderr)
|
print(f" Status: {result.state}", file=sys.stderr)
|
||||||
@@ -155,16 +118,9 @@ def main():
|
|||||||
action="store_true",
|
action="store_true",
|
||||||
help="Wait for task completion instead of just dispatching",
|
help="Wait for task completion instead of just dispatching",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
|
||||||
"--force",
|
|
||||||
action="store_true",
|
|
||||||
help="Cancel old workflow and start new (uses latest code instead of replaying)",
|
|
||||||
)
|
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
asyncio.run(
|
asyncio.run(process_transcript(args.transcript_id, sync=args.sync))
|
||||||
process_transcript(args.transcript_id, sync=args.sync, force=args.force)
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
@@ -1,15 +0,0 @@
|
|||||||
"""
|
|
||||||
Shared audio processing constants.
|
|
||||||
|
|
||||||
Used by both Hatchet workflows and Celery pipelines for consistent audio encoding.
|
|
||||||
"""
|
|
||||||
|
|
||||||
# Opus codec settings
|
|
||||||
OPUS_STANDARD_SAMPLE_RATE = 48000
|
|
||||||
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
|
|
||||||
|
|
||||||
# S3 presigned URL expiration
|
|
||||||
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
|
|
||||||
|
|
||||||
# Waveform visualization
|
|
||||||
WAVEFORM_SEGMENTS = 255
|
|
||||||
@@ -1,227 +0,0 @@
|
|||||||
"""
|
|
||||||
Audio track mixdown utilities.
|
|
||||||
|
|
||||||
Shared PyAV-based functions for mixing multiple audio tracks into a single output.
|
|
||||||
Used by both Hatchet workflows and Celery pipelines.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from fractions import Fraction
|
|
||||||
|
|
||||||
import av
|
|
||||||
from av.audio.resampler import AudioResampler
|
|
||||||
|
|
||||||
|
|
||||||
def detect_sample_rate_from_tracks(track_urls: list[str], logger=None) -> int | None:
|
|
||||||
"""Detect sample rate from first decodable audio frame.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
track_urls: List of URLs to audio files (S3 presigned or local)
|
|
||||||
logger: Optional logger instance
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Sample rate in Hz, or None if no decodable frames found
|
|
||||||
"""
|
|
||||||
for url in track_urls:
|
|
||||||
if not url:
|
|
||||||
continue
|
|
||||||
container = None
|
|
||||||
try:
|
|
||||||
container = av.open(url)
|
|
||||||
for frame in container.decode(audio=0):
|
|
||||||
return frame.sample_rate
|
|
||||||
except Exception:
|
|
||||||
continue
|
|
||||||
finally:
|
|
||||||
if container is not None:
|
|
||||||
container.close()
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
async def mixdown_tracks_pyav(
|
|
||||||
track_urls: list[str],
|
|
||||||
writer,
|
|
||||||
target_sample_rate: int,
|
|
||||||
offsets_seconds: list[float] | None = None,
|
|
||||||
logger=None,
|
|
||||||
) -> None:
|
|
||||||
"""Multi-track mixdown using PyAV filter graph (amix).
|
|
||||||
|
|
||||||
Builds a filter graph: N abuffer -> optional adelay -> amix -> aformat -> sink
|
|
||||||
Reads from S3 presigned URLs or local files, pushes mixed frames to writer.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
track_urls: List of URLs to audio tracks (S3 presigned or local)
|
|
||||||
writer: AudioFileWriterProcessor instance with async push() method
|
|
||||||
target_sample_rate: Sample rate for output (Hz)
|
|
||||||
offsets_seconds: Optional per-track delays in seconds for alignment.
|
|
||||||
If provided, must have same length as track_urls. Delays are relative
|
|
||||||
to the minimum offset (earliest track has delay=0).
|
|
||||||
logger: Optional logger instance
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
ValueError: If offsets_seconds length doesn't match track_urls,
|
|
||||||
no valid tracks provided, or no containers can be opened
|
|
||||||
"""
|
|
||||||
if offsets_seconds is not None and len(offsets_seconds) != len(track_urls):
|
|
||||||
raise ValueError(
|
|
||||||
f"offsets_seconds length ({len(offsets_seconds)}) must match track_urls ({len(track_urls)})"
|
|
||||||
)
|
|
||||||
|
|
||||||
valid_track_urls = [url for url in track_urls if url]
|
|
||||||
if not valid_track_urls:
|
|
||||||
if logger:
|
|
||||||
logger.error("Mixdown failed - no valid track URLs provided")
|
|
||||||
raise ValueError("Mixdown failed: No valid track URLs")
|
|
||||||
|
|
||||||
# Calculate per-input delays if offsets provided
|
|
||||||
input_offsets_seconds = None
|
|
||||||
if offsets_seconds is not None:
|
|
||||||
input_offsets_seconds = [
|
|
||||||
offsets_seconds[i] for i, url in enumerate(track_urls) if url
|
|
||||||
]
|
|
||||||
|
|
||||||
# Build PyAV filter graph:
|
|
||||||
# N abuffer (s32/stereo)
|
|
||||||
# -> optional adelay per input (for alignment)
|
|
||||||
# -> amix (s32)
|
|
||||||
# -> aformat(s16)
|
|
||||||
# -> sink
|
|
||||||
graph = av.filter.Graph()
|
|
||||||
inputs = []
|
|
||||||
|
|
||||||
for idx, url in enumerate(valid_track_urls):
|
|
||||||
args = (
|
|
||||||
f"time_base=1/{target_sample_rate}:"
|
|
||||||
f"sample_rate={target_sample_rate}:"
|
|
||||||
f"sample_fmt=s32:"
|
|
||||||
f"channel_layout=stereo"
|
|
||||||
)
|
|
||||||
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
|
|
||||||
inputs.append(in_ctx)
|
|
||||||
|
|
||||||
if not inputs:
|
|
||||||
if logger:
|
|
||||||
logger.error("Mixdown failed - no valid inputs for graph")
|
|
||||||
raise ValueError("Mixdown failed: No valid inputs for filter graph")
|
|
||||||
|
|
||||||
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
|
|
||||||
|
|
||||||
fmt = graph.add(
|
|
||||||
"aformat",
|
|
||||||
args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
|
|
||||||
name="fmt",
|
|
||||||
)
|
|
||||||
|
|
||||||
sink = graph.add("abuffersink", name="out")
|
|
||||||
|
|
||||||
# Optional per-input delay before mixing
|
|
||||||
delays_ms: list[int] = []
|
|
||||||
if input_offsets_seconds is not None:
|
|
||||||
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
|
|
||||||
delays_ms = [
|
|
||||||
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
|
|
||||||
]
|
|
||||||
else:
|
|
||||||
delays_ms = [0 for _ in inputs]
|
|
||||||
|
|
||||||
for idx, in_ctx in enumerate(inputs):
|
|
||||||
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
|
|
||||||
if delay_ms > 0:
|
|
||||||
# adelay requires one value per channel; use same for stereo
|
|
||||||
adelay = graph.add(
|
|
||||||
"adelay",
|
|
||||||
args=f"delays={delay_ms}|{delay_ms}:all=1",
|
|
||||||
name=f"delay{idx}",
|
|
||||||
)
|
|
||||||
in_ctx.link_to(adelay)
|
|
||||||
adelay.link_to(mixer, 0, idx)
|
|
||||||
else:
|
|
||||||
in_ctx.link_to(mixer, 0, idx)
|
|
||||||
|
|
||||||
mixer.link_to(fmt)
|
|
||||||
fmt.link_to(sink)
|
|
||||||
graph.configure()
|
|
||||||
|
|
||||||
containers = []
|
|
||||||
try:
|
|
||||||
# Open all containers with cleanup guaranteed
|
|
||||||
for i, url in enumerate(valid_track_urls):
|
|
||||||
try:
|
|
||||||
c = av.open(
|
|
||||||
url,
|
|
||||||
options={
|
|
||||||
# S3 streaming options
|
|
||||||
"reconnect": "1",
|
|
||||||
"reconnect_streamed": "1",
|
|
||||||
"reconnect_delay_max": "5",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
containers.append(c)
|
|
||||||
except Exception as e:
|
|
||||||
if logger:
|
|
||||||
logger.warning(
|
|
||||||
"Mixdown: failed to open container from URL",
|
|
||||||
input=i,
|
|
||||||
url=url,
|
|
||||||
error=str(e),
|
|
||||||
)
|
|
||||||
|
|
||||||
if not containers:
|
|
||||||
if logger:
|
|
||||||
logger.error("Mixdown failed - no valid containers opened")
|
|
||||||
raise ValueError("Mixdown failed: Could not open any track containers")
|
|
||||||
|
|
||||||
decoders = [c.decode(audio=0) for c in containers]
|
|
||||||
active = [True] * len(decoders)
|
|
||||||
resamplers = [
|
|
||||||
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
|
|
||||||
for _ in decoders
|
|
||||||
]
|
|
||||||
|
|
||||||
while any(active):
|
|
||||||
for i, (dec, is_active) in enumerate(zip(decoders, active)):
|
|
||||||
if not is_active:
|
|
||||||
continue
|
|
||||||
try:
|
|
||||||
frame = next(dec)
|
|
||||||
except StopIteration:
|
|
||||||
active[i] = False
|
|
||||||
# Signal end of stream to filter graph
|
|
||||||
inputs[i].push(None)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if frame.sample_rate != target_sample_rate:
|
|
||||||
continue
|
|
||||||
out_frames = resamplers[i].resample(frame) or []
|
|
||||||
for rf in out_frames:
|
|
||||||
rf.sample_rate = target_sample_rate
|
|
||||||
rf.time_base = Fraction(1, target_sample_rate)
|
|
||||||
inputs[i].push(rf)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
mixed = sink.pull()
|
|
||||||
except Exception:
|
|
||||||
break
|
|
||||||
mixed.sample_rate = target_sample_rate
|
|
||||||
mixed.time_base = Fraction(1, target_sample_rate)
|
|
||||||
await writer.push(mixed)
|
|
||||||
|
|
||||||
# Flush remaining frames from filter graph
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
mixed = sink.pull()
|
|
||||||
except Exception:
|
|
||||||
break
|
|
||||||
mixed.sample_rate = target_sample_rate
|
|
||||||
mixed.time_base = Fraction(1, target_sample_rate)
|
|
||||||
await writer.push(mixed)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# Cleanup all containers, even if processing failed
|
|
||||||
for c in containers:
|
|
||||||
if c is not None:
|
|
||||||
try:
|
|
||||||
c.close()
|
|
||||||
except Exception:
|
|
||||||
pass # Best effort cleanup
|
|
||||||
@@ -1,186 +0,0 @@
|
|||||||
"""
|
|
||||||
Audio track padding utilities.
|
|
||||||
|
|
||||||
Shared PyAV-based functions for extracting stream metadata and applying
|
|
||||||
silence padding to audio tracks. Used by both Hatchet workflows and Celery pipelines.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import math
|
|
||||||
from fractions import Fraction
|
|
||||||
|
|
||||||
import av
|
|
||||||
from av.audio.resampler import AudioResampler
|
|
||||||
|
|
||||||
from reflector.utils.audio_constants import (
|
|
||||||
OPUS_DEFAULT_BIT_RATE,
|
|
||||||
OPUS_STANDARD_SAMPLE_RATE,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def extract_stream_start_time_from_container(
|
|
||||||
container,
|
|
||||||
track_idx: int,
|
|
||||||
logger=None,
|
|
||||||
) -> float:
|
|
||||||
"""Extract meeting-relative start time from WebM stream metadata.
|
|
||||||
|
|
||||||
Uses PyAV to read stream.start_time from WebM container.
|
|
||||||
More accurate than filename timestamps by ~209ms due to network/encoding delays.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
container: PyAV container opened from audio file/URL
|
|
||||||
track_idx: Track index for logging context
|
|
||||||
logger: Optional logger instance (structlog or stdlib compatible)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Start time in seconds (0.0 if extraction fails)
|
|
||||||
"""
|
|
||||||
start_time_seconds = 0.0
|
|
||||||
try:
|
|
||||||
audio_streams = [s for s in container.streams if s.type == "audio"]
|
|
||||||
stream = audio_streams[0] if audio_streams else container.streams[0]
|
|
||||||
|
|
||||||
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
|
|
||||||
if stream.start_time is not None and stream.time_base is not None:
|
|
||||||
start_time_seconds = float(stream.start_time * stream.time_base)
|
|
||||||
|
|
||||||
# 2) Fallback to container-level start_time (in av.time_base units)
|
|
||||||
if (start_time_seconds <= 0) and (container.start_time is not None):
|
|
||||||
start_time_seconds = float(container.start_time * av.time_base)
|
|
||||||
|
|
||||||
# 3) Fallback to first packet DTS in stream.time_base
|
|
||||||
if start_time_seconds <= 0:
|
|
||||||
for packet in container.demux(stream):
|
|
||||||
if packet.dts is not None:
|
|
||||||
start_time_seconds = float(packet.dts * stream.time_base)
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
if logger:
|
|
||||||
logger.warning(
|
|
||||||
"PyAV metadata read failed; assuming 0 start_time",
|
|
||||||
track_idx=track_idx,
|
|
||||||
error=str(e),
|
|
||||||
)
|
|
||||||
start_time_seconds = 0.0
|
|
||||||
|
|
||||||
if logger:
|
|
||||||
logger.info(
|
|
||||||
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
|
|
||||||
track_idx=track_idx,
|
|
||||||
)
|
|
||||||
return start_time_seconds
|
|
||||||
|
|
||||||
|
|
||||||
def apply_audio_padding_to_file(
|
|
||||||
in_container,
|
|
||||||
output_path: str,
|
|
||||||
start_time_seconds: float,
|
|
||||||
track_idx: int,
|
|
||||||
logger=None,
|
|
||||||
) -> None:
|
|
||||||
"""Apply silence padding to audio track using PyAV filter graph.
|
|
||||||
|
|
||||||
Uses adelay filter to prepend silence, aligning track to meeting start time.
|
|
||||||
Output is WebM/Opus format.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
in_container: PyAV container opened from source audio
|
|
||||||
output_path: Path for output WebM file
|
|
||||||
start_time_seconds: Amount of silence to prepend (in seconds)
|
|
||||||
track_idx: Track index for logging context
|
|
||||||
logger: Optional logger instance (structlog or stdlib compatible)
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
Exception: If no audio stream found or PyAV processing fails
|
|
||||||
"""
|
|
||||||
delay_ms = math.floor(start_time_seconds * 1000)
|
|
||||||
|
|
||||||
if logger:
|
|
||||||
logger.info(
|
|
||||||
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
|
|
||||||
track_idx=track_idx,
|
|
||||||
delay_ms=delay_ms,
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
with av.open(output_path, "w", format="webm") as out_container:
|
|
||||||
in_stream = next(
|
|
||||||
(s for s in in_container.streams if s.type == "audio"), None
|
|
||||||
)
|
|
||||||
if in_stream is None:
|
|
||||||
raise Exception("No audio stream in input")
|
|
||||||
|
|
||||||
out_stream = out_container.add_stream(
|
|
||||||
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
|
|
||||||
)
|
|
||||||
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
|
|
||||||
graph = av.filter.Graph()
|
|
||||||
|
|
||||||
abuf_args = (
|
|
||||||
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
|
|
||||||
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
|
|
||||||
f"sample_fmt=s16:"
|
|
||||||
f"channel_layout=stereo"
|
|
||||||
)
|
|
||||||
src = graph.add("abuffer", args=abuf_args, name="src")
|
|
||||||
aresample_f = graph.add("aresample", args="async=1", name="ares")
|
|
||||||
# adelay requires one delay value per channel separated by '|'
|
|
||||||
delays_arg = f"{delay_ms}|{delay_ms}"
|
|
||||||
adelay_f = graph.add(
|
|
||||||
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
|
|
||||||
)
|
|
||||||
sink = graph.add("abuffersink", name="sink")
|
|
||||||
|
|
||||||
src.link_to(aresample_f)
|
|
||||||
aresample_f.link_to(adelay_f)
|
|
||||||
adelay_f.link_to(sink)
|
|
||||||
graph.configure()
|
|
||||||
|
|
||||||
resampler = AudioResampler(
|
|
||||||
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
|
|
||||||
)
|
|
||||||
|
|
||||||
# Decode -> resample -> push through graph -> encode Opus
|
|
||||||
for frame in in_container.decode(in_stream):
|
|
||||||
out_frames = resampler.resample(frame) or []
|
|
||||||
for rframe in out_frames:
|
|
||||||
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
|
||||||
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
|
||||||
src.push(rframe)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
f_out = sink.pull()
|
|
||||||
except Exception:
|
|
||||||
break
|
|
||||||
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
|
||||||
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
|
||||||
for packet in out_stream.encode(f_out):
|
|
||||||
out_container.mux(packet)
|
|
||||||
|
|
||||||
# Flush remaining frames from filter graph
|
|
||||||
src.push(None)
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
f_out = sink.pull()
|
|
||||||
except Exception:
|
|
||||||
break
|
|
||||||
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
|
|
||||||
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
|
|
||||||
for packet in out_stream.encode(f_out):
|
|
||||||
out_container.mux(packet)
|
|
||||||
|
|
||||||
# Flush encoder
|
|
||||||
for packet in out_stream.encode(None):
|
|
||||||
out_container.mux(packet)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
if logger:
|
|
||||||
logger.error(
|
|
||||||
"PyAV padding failed for track",
|
|
||||||
track_idx=track_idx,
|
|
||||||
delay_ms=delay_ms,
|
|
||||||
error=str(e),
|
|
||||||
exc_info=True,
|
|
||||||
)
|
|
||||||
raise
|
|
||||||
@@ -1,4 +0,0 @@
|
|||||||
def assert_not_none[T](value: T | None, message: str = "Value is None") -> T:
|
|
||||||
if value is None:
|
|
||||||
raise ValueError(message)
|
|
||||||
return value
|
|
||||||
@@ -2,17 +2,6 @@ from typing import Annotated, TypeVar
|
|||||||
|
|
||||||
from pydantic import Field, TypeAdapter, constr
|
from pydantic import Field, TypeAdapter, constr
|
||||||
|
|
||||||
T_NotNone = TypeVar("T_NotNone")
|
|
||||||
|
|
||||||
|
|
||||||
def assert_not_none(
|
|
||||||
value: T_NotNone | None, message: str = "Value is None"
|
|
||||||
) -> T_NotNone:
|
|
||||||
if value is None:
|
|
||||||
raise ValueError(message)
|
|
||||||
return value
|
|
||||||
|
|
||||||
|
|
||||||
NonEmptyStringBase = constr(min_length=1, strip_whitespace=False)
|
NonEmptyStringBase = constr(min_length=1, strip_whitespace=False)
|
||||||
NonEmptyString = Annotated[
|
NonEmptyString = Annotated[
|
||||||
NonEmptyStringBase,
|
NonEmptyStringBase,
|
||||||
@@ -34,18 +23,10 @@ def try_parse_non_empty_string(s: str) -> NonEmptyString | None:
|
|||||||
return parse_non_empty_string(s)
|
return parse_non_empty_string(s)
|
||||||
|
|
||||||
|
|
||||||
T_Str = TypeVar("T_Str", bound=str)
|
T = TypeVar("T", bound=str)
|
||||||
|
|
||||||
|
|
||||||
def assert_equal(s1: T_Str, s2: T_Str) -> T_Str:
|
def assert_equal[T](s1: T, s2: T) -> T:
|
||||||
if s1 != s2:
|
if s1 != s2:
|
||||||
raise ValueError(f"assert_equal: {s1} != {s2}")
|
raise ValueError(f"assert_equal: {s1} != {s2}")
|
||||||
return s1
|
return s1
|
||||||
|
|
||||||
|
|
||||||
def assert_non_none_and_non_empty(
|
|
||||||
value: str | None, error: str | None = None
|
|
||||||
) -> NonEmptyString:
|
|
||||||
return parse_non_empty_string(
|
|
||||||
assert_not_none(value, error or "Value is None"), error
|
|
||||||
)
|
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ class Room(BaseModel):
|
|||||||
ics_last_sync: Optional[datetime] = None
|
ics_last_sync: Optional[datetime] = None
|
||||||
ics_last_etag: Optional[str] = None
|
ics_last_etag: Optional[str] = None
|
||||||
platform: Platform
|
platform: Platform
|
||||||
|
skip_consent: bool = False
|
||||||
|
|
||||||
|
|
||||||
class RoomDetails(Room):
|
class RoomDetails(Room):
|
||||||
@@ -90,6 +91,7 @@ class CreateRoom(BaseModel):
|
|||||||
ics_fetch_interval: int = 300
|
ics_fetch_interval: int = 300
|
||||||
ics_enabled: bool = False
|
ics_enabled: bool = False
|
||||||
platform: Platform
|
platform: Platform
|
||||||
|
skip_consent: bool = False
|
||||||
|
|
||||||
|
|
||||||
class UpdateRoom(BaseModel):
|
class UpdateRoom(BaseModel):
|
||||||
@@ -108,6 +110,7 @@ class UpdateRoom(BaseModel):
|
|||||||
ics_fetch_interval: Optional[int] = None
|
ics_fetch_interval: Optional[int] = None
|
||||||
ics_enabled: Optional[bool] = None
|
ics_enabled: Optional[bool] = None
|
||||||
platform: Optional[Platform] = None
|
platform: Optional[Platform] = None
|
||||||
|
skip_consent: Optional[bool] = None
|
||||||
|
|
||||||
|
|
||||||
class CreateRoomMeeting(BaseModel):
|
class CreateRoomMeeting(BaseModel):
|
||||||
@@ -249,6 +252,7 @@ async def rooms_create(
|
|||||||
ics_fetch_interval=room.ics_fetch_interval,
|
ics_fetch_interval=room.ics_fetch_interval,
|
||||||
ics_enabled=room.ics_enabled,
|
ics_enabled=room.ics_enabled,
|
||||||
platform=room.platform,
|
platform=room.platform,
|
||||||
|
skip_consent=room.skip_consent,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -567,10 +571,17 @@ async def rooms_join_meeting(
|
|||||||
|
|
||||||
if meeting.platform == "daily" and user_id is not None:
|
if meeting.platform == "daily" and user_id is not None:
|
||||||
client = create_platform_client(meeting.platform)
|
client = create_platform_client(meeting.platform)
|
||||||
|
# Show Daily's built-in recording UI when:
|
||||||
|
# - local recording (user controls when to record), OR
|
||||||
|
# - cloud recording with consent disabled (skip_consent=True)
|
||||||
|
# Hide it when cloud recording with consent enabled (we show custom consent UI)
|
||||||
|
enable_recording_ui = meeting.recording_type == "local" or (
|
||||||
|
meeting.recording_type == "cloud" and room.skip_consent
|
||||||
|
)
|
||||||
token = await client.create_meeting_token(
|
token = await client.create_meeting_token(
|
||||||
meeting.room_name,
|
meeting.room_name,
|
||||||
start_cloud_recording=meeting.recording_type == "cloud",
|
start_cloud_recording=meeting.recording_type == "cloud",
|
||||||
enable_recording_ui=meeting.recording_type == "local",
|
enable_recording_ui=enable_recording_ui,
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
is_owner=user_id == room.user_id,
|
is_owner=user_id == room.user_id,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -50,5 +50,5 @@ async def transcript_process(
|
|||||||
if isinstance(config, ProcessError):
|
if isinstance(config, ProcessError):
|
||||||
raise HTTPException(status_code=500, detail=config.detail)
|
raise HTTPException(status_code=500, detail=config.detail)
|
||||||
else:
|
else:
|
||||||
await dispatch_transcript_processing(config)
|
dispatch_transcript_processing(config)
|
||||||
return ProcessStatus(status="ok")
|
return ProcessStatus(status="ok")
|
||||||
|
|||||||
@@ -7,6 +7,12 @@ from reflector.settings import settings
|
|||||||
logger = structlog.get_logger(__name__)
|
logger = structlog.get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(name="celery.ping")
|
||||||
|
def celery_ping():
|
||||||
|
"""Compatibility task for Celery 5.x - celery.ping was removed but monitoring tools still call it."""
|
||||||
|
return "pong"
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
def healthcheck_ping():
|
def healthcheck_ping():
|
||||||
url = settings.HEALTHCHECK_URL
|
url = settings.HEALTHCHECK_URL
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ from reflector.db.transcripts import (
|
|||||||
SourceKind,
|
SourceKind,
|
||||||
transcripts_controller,
|
transcripts_controller,
|
||||||
)
|
)
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||||
from reflector.pipelines.main_live_pipeline import asynctask
|
from reflector.pipelines.main_live_pipeline import asynctask
|
||||||
from reflector.pipelines.main_multitrack_pipeline import (
|
from reflector.pipelines.main_multitrack_pipeline import (
|
||||||
@@ -287,45 +286,6 @@ async def _process_multitrack_recording_inner(
|
|||||||
room_id=room.id,
|
room_id=room.id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Start durable workflow if enabled (Hatchet) or room overrides it
|
|
||||||
durable_started = False
|
|
||||||
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
|
|
||||||
|
|
||||||
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
|
|
||||||
logger.info(
|
|
||||||
"Room forces Hatchet workflow",
|
|
||||||
room_id=room.id,
|
|
||||||
transcript_id=transcript.id,
|
|
||||||
)
|
|
||||||
|
|
||||||
if use_hatchet:
|
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
|
||||||
workflow_name="DiarizationPipeline",
|
|
||||||
input_data={
|
|
||||||
"recording_id": recording_id,
|
|
||||||
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
|
|
||||||
"bucket_name": bucket_name,
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"room_id": room.id,
|
|
||||||
},
|
|
||||||
additional_metadata={
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"recording_id": recording_id,
|
|
||||||
"daily_recording_id": recording_id,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
logger.info(
|
|
||||||
"Started Hatchet workflow",
|
|
||||||
workflow_id=workflow_id,
|
|
||||||
transcript_id=transcript.id,
|
|
||||||
)
|
|
||||||
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Celery pipeline (runs when durable workflows disabled)
|
|
||||||
task_pipeline_multitrack_process.delay(
|
task_pipeline_multitrack_process.delay(
|
||||||
transcript_id=transcript.id,
|
transcript_id=transcript.id,
|
||||||
bucket_name=bucket_name,
|
bucket_name=bucket_name,
|
||||||
@@ -610,12 +570,12 @@ async def process_meetings():
|
|||||||
client = create_platform_client(meeting.platform)
|
client = create_platform_client(meeting.platform)
|
||||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||||
|
|
||||||
has_active_sessions = room_sessions and any(
|
has_active_sessions = bool(
|
||||||
s.ended_at is None for s in room_sessions
|
room_sessions and any(s.ended_at is None for s in room_sessions)
|
||||||
)
|
)
|
||||||
has_had_sessions = bool(room_sessions)
|
has_had_sessions = bool(room_sessions)
|
||||||
logger_.info(
|
logger_.info(
|
||||||
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
|
f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
|
||||||
)
|
)
|
||||||
|
|
||||||
if has_active_sessions:
|
if has_active_sessions:
|
||||||
@@ -812,11 +772,6 @@ async def reprocess_failed_daily_recordings():
|
|||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Fetch room to check use_hatchet flag
|
|
||||||
room = None
|
|
||||||
if meeting.room_id:
|
|
||||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
|
||||||
|
|
||||||
transcript = None
|
transcript = None
|
||||||
try:
|
try:
|
||||||
transcript = await transcripts_controller.get_by_recording_id(
|
transcript = await transcripts_controller.get_by_recording_id(
|
||||||
@@ -836,49 +791,8 @@ async def reprocess_failed_daily_recordings():
|
|||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
|
|
||||||
|
|
||||||
if use_hatchet:
|
|
||||||
# Hatchet requires a transcript for workflow_run_id tracking
|
|
||||||
if not transcript:
|
|
||||||
logger.warning(
|
|
||||||
"No transcript for Hatchet reprocessing, skipping",
|
|
||||||
recording_id=recording.id,
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
|
|
||||||
workflow_id = await HatchetClientManager.start_workflow(
|
|
||||||
workflow_name="DiarizationPipeline",
|
|
||||||
input_data={
|
|
||||||
"recording_id": recording.id,
|
|
||||||
"tracks": [
|
|
||||||
{"s3_key": k}
|
|
||||||
for k in filter_cam_audio_tracks(recording.track_keys)
|
|
||||||
],
|
|
||||||
"bucket_name": bucket_name,
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"room_id": room.id if room else None,
|
|
||||||
},
|
|
||||||
additional_metadata={
|
|
||||||
"transcript_id": transcript.id,
|
|
||||||
"recording_id": recording.id,
|
|
||||||
"reprocess": True,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"workflow_run_id": workflow_id}
|
|
||||||
)
|
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
"Queued Daily recording for Hatchet reprocessing",
|
"Queueing Daily recording for reprocessing",
|
||||||
recording_id=recording.id,
|
|
||||||
workflow_id=workflow_id,
|
|
||||||
room_name=meeting.room_name,
|
|
||||||
track_count=len(recording.track_keys),
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.info(
|
|
||||||
"Queueing Daily recording for Celery reprocessing",
|
|
||||||
recording_id=recording.id,
|
recording_id=recording.id,
|
||||||
room_name=meeting.room_name,
|
room_name=meeting.room_name,
|
||||||
track_count=len(recording.track_keys),
|
track_count=len(recording.track_keys),
|
||||||
@@ -891,7 +805,6 @@ async def reprocess_failed_daily_recordings():
|
|||||||
recording_id=recording.id,
|
recording_id=recording.id,
|
||||||
track_keys=recording.track_keys,
|
track_keys=recording.track_keys,
|
||||||
)
|
)
|
||||||
|
|
||||||
reprocessed_count += 1
|
reprocessed_count += 1
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import threading
|
|||||||
import redis.asyncio as redis
|
import redis.asyncio as redis
|
||||||
from fastapi import WebSocket
|
from fastapi import WebSocket
|
||||||
|
|
||||||
from reflector.events import subscribers_shutdown
|
|
||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
|
|
||||||
|
|
||||||
@@ -110,30 +109,29 @@ class WebsocketManager:
|
|||||||
await socket.send_json(data)
|
await socket.send_json(data)
|
||||||
|
|
||||||
|
|
||||||
_ws_manager_instance: WebsocketManager | None = None
|
|
||||||
_ws_manager_lock = threading.Lock()
|
|
||||||
|
|
||||||
|
|
||||||
def get_ws_manager() -> WebsocketManager:
|
def get_ws_manager() -> WebsocketManager:
|
||||||
"""Returns the WebsocketManager singleton instance."""
|
"""
|
||||||
global _ws_manager_instance
|
Returns the WebsocketManager instance for managing websockets.
|
||||||
if _ws_manager_instance is None:
|
|
||||||
with _ws_manager_lock:
|
This function initializes and returns the WebsocketManager instance,
|
||||||
if _ws_manager_instance is None:
|
which is responsible for managing websockets and handling websocket
|
||||||
|
connections.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
WebsocketManager: The initialized WebsocketManager instance.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
ImportError: If the 'reflector.settings' module cannot be imported.
|
||||||
|
RedisConnectionError: If there is an error connecting to the Redis server.
|
||||||
|
"""
|
||||||
|
local = threading.local()
|
||||||
|
if hasattr(local, "ws_manager"):
|
||||||
|
return local.ws_manager
|
||||||
|
|
||||||
pubsub_client = RedisPubSubManager(
|
pubsub_client = RedisPubSubManager(
|
||||||
host=settings.REDIS_HOST,
|
host=settings.REDIS_HOST,
|
||||||
port=settings.REDIS_PORT,
|
port=settings.REDIS_PORT,
|
||||||
)
|
)
|
||||||
_ws_manager_instance = WebsocketManager(pubsub_client=pubsub_client)
|
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
|
||||||
return _ws_manager_instance
|
local.ws_manager = ws_manager
|
||||||
|
return ws_manager
|
||||||
|
|
||||||
async def cleanup_ws_manager(_app=None) -> None:
|
|
||||||
"""Cleanup WebsocketManager singleton on shutdown."""
|
|
||||||
global _ws_manager_instance
|
|
||||||
if _ws_manager_instance is not None:
|
|
||||||
await _ws_manager_instance.pubsub_client.disconnect()
|
|
||||||
_ws_manager_instance = None
|
|
||||||
|
|
||||||
|
|
||||||
subscribers_shutdown.append(cleanup_ws_manager)
|
|
||||||
|
|||||||
@@ -3,8 +3,7 @@ from urllib.parse import urlparse
|
|||||||
|
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
from reflector.db.rooms import rooms_controller
|
from reflector.db.transcripts import Transcript
|
||||||
from reflector.db.transcripts import Transcript, transcripts_controller
|
|
||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
|
|
||||||
|
|
||||||
@@ -114,49 +113,6 @@ def get_zulip_message(transcript: Transcript, include_topics: bool):
|
|||||||
return message
|
return message
|
||||||
|
|
||||||
|
|
||||||
async def post_transcript_notification(transcript: Transcript) -> int | None:
|
|
||||||
"""Post or update transcript notification in Zulip.
|
|
||||||
|
|
||||||
Uses transcript.room_id directly (Hatchet flow).
|
|
||||||
Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
|
|
||||||
DUPLICATION NOTE: This function will stay when we use Celery no more, and Celery one will be removed.
|
|
||||||
"""
|
|
||||||
if not transcript.room_id:
|
|
||||||
return None
|
|
||||||
|
|
||||||
room = await rooms_controller.get_by_id(transcript.room_id)
|
|
||||||
if not room or not room.zulip_stream or not room.zulip_auto_post:
|
|
||||||
return None
|
|
||||||
|
|
||||||
message = get_zulip_message(transcript=transcript, include_topics=True)
|
|
||||||
message_updated = False
|
|
||||||
|
|
||||||
if transcript.zulip_message_id:
|
|
||||||
try:
|
|
||||||
await update_zulip_message(
|
|
||||||
transcript.zulip_message_id,
|
|
||||||
room.zulip_stream,
|
|
||||||
room.zulip_topic,
|
|
||||||
message,
|
|
||||||
)
|
|
||||||
message_updated = True
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if not message_updated:
|
|
||||||
response = await send_message_to_zulip(
|
|
||||||
room.zulip_stream, room.zulip_topic, message
|
|
||||||
)
|
|
||||||
message_id = response.get("id")
|
|
||||||
if message_id:
|
|
||||||
await transcripts_controller.update(
|
|
||||||
transcript, {"zulip_message_id": message_id}
|
|
||||||
)
|
|
||||||
return message_id
|
|
||||||
|
|
||||||
return transcript.zulip_message_id
|
|
||||||
|
|
||||||
|
|
||||||
def extract_domain(url: str) -> str:
|
def extract_domain(url: str) -> str:
|
||||||
return urlparse(url).netloc
|
return urlparse(url).netloc
|
||||||
|
|
||||||
|
|||||||
@@ -7,8 +7,6 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
|
|||||||
uv run celery -A reflector.worker.app worker --loglevel=info
|
uv run celery -A reflector.worker.app worker --loglevel=info
|
||||||
elif [ "${ENTRYPOINT}" = "beat" ]; then
|
elif [ "${ENTRYPOINT}" = "beat" ]; then
|
||||||
uv run celery -A reflector.worker.app beat --loglevel=info
|
uv run celery -A reflector.worker.app beat --loglevel=info
|
||||||
elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then
|
|
||||||
uv run python -m reflector.hatchet.run_workers
|
|
||||||
else
|
else
|
||||||
echo "Unknown command"
|
echo "Unknown command"
|
||||||
fi
|
fi
|
||||||
|
|||||||
@@ -527,22 +527,6 @@ def fake_mp3_upload():
|
|||||||
yield
|
yield
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(autouse=True)
|
|
||||||
def reset_hatchet_client():
|
|
||||||
"""Reset HatchetClientManager singleton before and after each test.
|
|
||||||
|
|
||||||
This ensures test isolation - each test starts with a fresh client state.
|
|
||||||
The fixture is autouse=True so it applies to all tests automatically.
|
|
||||||
"""
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
|
|
||||||
# Reset before test
|
|
||||||
HatchetClientManager.reset()
|
|
||||||
yield
|
|
||||||
# Reset after test to clean up
|
|
||||||
HatchetClientManager.reset()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
async def fake_transcript_with_topics(tmpdir, client):
|
async def fake_transcript_with_topics(tmpdir, client):
|
||||||
import shutil
|
import shutil
|
||||||
|
|||||||
@@ -1,54 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for HatchetClientManager error handling and validation.
|
|
||||||
|
|
||||||
Only tests that catch real bugs - not mock verification tests.
|
|
||||||
|
|
||||||
Note: The `reset_hatchet_client` fixture (autouse=True in conftest.py)
|
|
||||||
automatically resets the singleton before and after each test.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_client_can_replay_handles_exception():
|
|
||||||
"""Test can_replay returns False when status check fails.
|
|
||||||
|
|
||||||
Useful: Ensures network/API errors don't crash the system and
|
|
||||||
gracefully allow reprocessing when workflow state is unknown.
|
|
||||||
"""
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
|
|
||||||
with patch("reflector.hatchet.client.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_CLIENT_TOKEN = "test-token"
|
|
||||||
mock_settings.HATCHET_DEBUG = False
|
|
||||||
|
|
||||||
with patch("reflector.hatchet.client.Hatchet") as mock_hatchet_class:
|
|
||||||
mock_client = MagicMock()
|
|
||||||
mock_hatchet_class.return_value = mock_client
|
|
||||||
|
|
||||||
mock_client.runs.aio_get_status = AsyncMock(
|
|
||||||
side_effect=Exception("Network error")
|
|
||||||
)
|
|
||||||
|
|
||||||
can_replay = await HatchetClientManager.can_replay("workflow-123")
|
|
||||||
|
|
||||||
# Should return False on error (workflow might be gone)
|
|
||||||
assert can_replay is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_hatchet_client_raises_without_token():
|
|
||||||
"""Test that get_client raises ValueError without token.
|
|
||||||
|
|
||||||
Useful: Catches if someone removes the token validation,
|
|
||||||
which would cause cryptic errors later.
|
|
||||||
"""
|
|
||||||
from reflector.hatchet.client import HatchetClientManager
|
|
||||||
|
|
||||||
with patch("reflector.hatchet.client.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_CLIENT_TOKEN = None
|
|
||||||
|
|
||||||
with pytest.raises(ValueError, match="HATCHET_CLIENT_TOKEN must be set"):
|
|
||||||
HatchetClientManager.get_client()
|
|
||||||
@@ -1,398 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for Hatchet workflow dispatch and routing logic.
|
|
||||||
|
|
||||||
These tests verify:
|
|
||||||
1. Routing to Hatchet when HATCHET_ENABLED=True
|
|
||||||
2. Replay logic for failed workflows
|
|
||||||
3. Force flag to cancel and restart
|
|
||||||
4. Validation prevents concurrent workflows
|
|
||||||
"""
|
|
||||||
|
|
||||||
from unittest.mock import AsyncMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
from hatchet_sdk.clients.rest.exceptions import ApiException
|
|
||||||
from hatchet_sdk.clients.rest.models import V1TaskStatus
|
|
||||||
|
|
||||||
from reflector.db.transcripts import Transcript
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_blocks_running_workflow():
|
|
||||||
"""Test that validation blocks reprocessing when workflow is running."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationAlreadyScheduled,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="processing",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="running-workflow-123",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock(
|
|
||||||
return_value=V1TaskStatus.RUNNING
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationAlreadyScheduled)
|
|
||||||
assert "running" in result.detail.lower()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_blocks_queued_workflow():
|
|
||||||
"""Test that validation blocks reprocessing when workflow is queued."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationAlreadyScheduled,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="processing",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="queued-workflow-123",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock(
|
|
||||||
return_value=V1TaskStatus.QUEUED
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationAlreadyScheduled)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_allows_failed_workflow():
|
|
||||||
"""Test that validation allows reprocessing when workflow has failed."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationOk,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="error",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="failed-workflow-123",
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock(
|
|
||||||
return_value=V1TaskStatus.FAILED
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationOk)
|
|
||||||
assert result.transcript_id == "test-transcript-id"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_allows_completed_workflow():
|
|
||||||
"""Test that validation allows reprocessing when workflow has completed."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationOk,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="ended",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="completed-workflow-123",
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock(
|
|
||||||
return_value=V1TaskStatus.COMPLETED
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationOk)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_allows_when_status_check_fails():
|
|
||||||
"""Test that validation allows reprocessing when status check fails (workflow might be gone)."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationOk,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="error",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="old-workflow-123",
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
# Status check fails (workflow might be deleted)
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock(
|
|
||||||
side_effect=ApiException("Workflow not found")
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
# Should allow processing when we can't get status
|
|
||||||
assert isinstance(result, ValidationOk)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_skipped_when_no_workflow_id():
|
|
||||||
"""Test that Hatchet validation is skipped when transcript has no workflow_run_id."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationOk,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="uploaded",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id=None, # No workflow yet
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = True
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.HatchetClientManager"
|
|
||||||
) as mock_hatchet:
|
|
||||||
# Should not be called
|
|
||||||
mock_hatchet.get_workflow_run_status = AsyncMock()
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
# Should not check Hatchet status
|
|
||||||
mock_hatchet.get_workflow_run_status.assert_not_called()
|
|
||||||
assert isinstance(result, ValidationOk)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_hatchet_validation_skipped_when_disabled():
|
|
||||||
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationOk,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="uploaded",
|
|
||||||
source_kind="room",
|
|
||||||
workflow_run_id="some-workflow-123",
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch("reflector.services.transcript_process.settings") as mock_settings:
|
|
||||||
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
# Should not check Hatchet at all
|
|
||||||
assert isinstance(result, ValidationOk)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_validation_locked_transcript():
|
|
||||||
"""Test that validation rejects locked transcripts."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationLocked,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="ended",
|
|
||||||
source_kind="room",
|
|
||||||
locked=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationLocked)
|
|
||||||
assert "locked" in result.detail.lower()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_validation_idle_transcript():
|
|
||||||
"""Test that validation rejects idle transcripts (not ready)."""
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
ValidationNotReady,
|
|
||||||
validate_transcript_for_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_transcript = Transcript(
|
|
||||||
id="test-transcript-id",
|
|
||||||
name="Test",
|
|
||||||
status="idle",
|
|
||||||
source_kind="room",
|
|
||||||
)
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationNotReady)
|
|
||||||
assert "not ready" in result.detail.lower()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_prepare_multitrack_config():
|
|
||||||
"""Test preparing multitrack processing config."""
|
|
||||||
from reflector.db.recordings import Recording
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
MultitrackProcessingConfig,
|
|
||||||
ValidationOk,
|
|
||||||
prepare_transcript_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
validation = ValidationOk(
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
transcript_id="test-transcript-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_recording = Recording(
|
|
||||||
id="test-recording-id",
|
|
||||||
bucket_name="test-bucket",
|
|
||||||
object_key="recordings/test",
|
|
||||||
recorded_at="2024-01-01T00:00:00Z",
|
|
||||||
track_keys=["track1.webm", "track2.webm"],
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.recordings_controller"
|
|
||||||
) as mock_rc:
|
|
||||||
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
|
|
||||||
|
|
||||||
result = await prepare_transcript_processing(validation)
|
|
||||||
|
|
||||||
assert isinstance(result, MultitrackProcessingConfig)
|
|
||||||
assert result.bucket_name == "test-bucket"
|
|
||||||
assert result.track_keys == ["track1.webm", "track2.webm"]
|
|
||||||
assert result.transcript_id == "test-transcript-id"
|
|
||||||
assert result.room_id is None # ValidationOk didn't specify room_id
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_prepare_file_config():
|
|
||||||
"""Test preparing file processing config (no track keys)."""
|
|
||||||
from reflector.db.recordings import Recording
|
|
||||||
from reflector.services.transcript_process import (
|
|
||||||
FileProcessingConfig,
|
|
||||||
ValidationOk,
|
|
||||||
prepare_transcript_processing,
|
|
||||||
)
|
|
||||||
|
|
||||||
validation = ValidationOk(
|
|
||||||
recording_id="test-recording-id",
|
|
||||||
transcript_id="test-transcript-id",
|
|
||||||
)
|
|
||||||
|
|
||||||
mock_recording = Recording(
|
|
||||||
id="test-recording-id",
|
|
||||||
bucket_name="test-bucket",
|
|
||||||
object_key="recordings/test.mp4",
|
|
||||||
recorded_at="2024-01-01T00:00:00Z",
|
|
||||||
track_keys=None, # No track keys = file pipeline
|
|
||||||
)
|
|
||||||
|
|
||||||
with patch(
|
|
||||||
"reflector.services.transcript_process.recordings_controller"
|
|
||||||
) as mock_rc:
|
|
||||||
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
|
|
||||||
|
|
||||||
result = await prepare_transcript_processing(validation)
|
|
||||||
|
|
||||||
assert isinstance(result, FileProcessingConfig)
|
|
||||||
assert result.transcript_id == "test-transcript-id"
|
|
||||||
3416
server/uv.lock
generated
3416
server/uv.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -91,6 +91,7 @@ const roomInitialState = {
|
|||||||
icsEnabled: false,
|
icsEnabled: false,
|
||||||
icsFetchInterval: 5,
|
icsFetchInterval: 5,
|
||||||
platform: "whereby",
|
platform: "whereby",
|
||||||
|
skipConsent: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
export default function RoomsList() {
|
export default function RoomsList() {
|
||||||
@@ -175,6 +176,7 @@ export default function RoomsList() {
|
|||||||
icsEnabled: detailedEditedRoom.ics_enabled || false,
|
icsEnabled: detailedEditedRoom.ics_enabled || false,
|
||||||
icsFetchInterval: detailedEditedRoom.ics_fetch_interval || 5,
|
icsFetchInterval: detailedEditedRoom.ics_fetch_interval || 5,
|
||||||
platform: detailedEditedRoom.platform,
|
platform: detailedEditedRoom.platform,
|
||||||
|
skipConsent: detailedEditedRoom.skip_consent || false,
|
||||||
}
|
}
|
||||||
: null,
|
: null,
|
||||||
[detailedEditedRoom],
|
[detailedEditedRoom],
|
||||||
@@ -326,6 +328,7 @@ export default function RoomsList() {
|
|||||||
ics_enabled: room.icsEnabled,
|
ics_enabled: room.icsEnabled,
|
||||||
ics_fetch_interval: room.icsFetchInterval,
|
ics_fetch_interval: room.icsFetchInterval,
|
||||||
platform,
|
platform,
|
||||||
|
skip_consent: room.skipConsent,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (isEditing) {
|
if (isEditing) {
|
||||||
@@ -388,6 +391,7 @@ export default function RoomsList() {
|
|||||||
icsEnabled: roomData.ics_enabled || false,
|
icsEnabled: roomData.ics_enabled || false,
|
||||||
icsFetchInterval: roomData.ics_fetch_interval || 5,
|
icsFetchInterval: roomData.ics_fetch_interval || 5,
|
||||||
platform: roomData.platform,
|
platform: roomData.platform,
|
||||||
|
skipConsent: roomData.skip_consent || false,
|
||||||
});
|
});
|
||||||
setEditRoomId(roomId);
|
setEditRoomId(roomId);
|
||||||
setIsEditing(true);
|
setIsEditing(true);
|
||||||
@@ -796,6 +800,34 @@ export default function RoomsList() {
|
|||||||
<Checkbox.Label>Shared room</Checkbox.Label>
|
<Checkbox.Label>Shared room</Checkbox.Label>
|
||||||
</Checkbox.Root>
|
</Checkbox.Root>
|
||||||
</Field.Root>
|
</Field.Root>
|
||||||
|
{room.recordingType === "cloud" && (
|
||||||
|
<Field.Root mt={4}>
|
||||||
|
<Checkbox.Root
|
||||||
|
name="skipConsent"
|
||||||
|
checked={room.skipConsent}
|
||||||
|
onCheckedChange={(e) => {
|
||||||
|
const syntheticEvent = {
|
||||||
|
target: {
|
||||||
|
name: "skipConsent",
|
||||||
|
type: "checkbox",
|
||||||
|
checked: e.checked,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
handleRoomChange(syntheticEvent);
|
||||||
|
}}
|
||||||
|
>
|
||||||
|
<Checkbox.HiddenInput />
|
||||||
|
<Checkbox.Control>
|
||||||
|
<Checkbox.Indicator />
|
||||||
|
</Checkbox.Control>
|
||||||
|
<Checkbox.Label>Skip consent dialog</Checkbox.Label>
|
||||||
|
</Checkbox.Root>
|
||||||
|
<Field.HelperText>
|
||||||
|
When enabled, participants won't be asked for
|
||||||
|
recording consent. Audio will be stored automatically.
|
||||||
|
</Field.HelperText>
|
||||||
|
</Field.Root>
|
||||||
|
)}
|
||||||
</Tabs.Content>
|
</Tabs.Content>
|
||||||
|
|
||||||
<Tabs.Content value="share" pt={6}>
|
<Tabs.Content value="share" pt={6}>
|
||||||
|
|||||||
@@ -2,19 +2,13 @@
|
|||||||
|
|
||||||
import { Spinner, Link } from "@chakra-ui/react";
|
import { Spinner, Link } from "@chakra-ui/react";
|
||||||
import { useAuth } from "../lib/AuthProvider";
|
import { useAuth } from "../lib/AuthProvider";
|
||||||
import { usePathname } from "next/navigation";
|
|
||||||
import { getLogoutRedirectUrl } from "../lib/auth";
|
|
||||||
|
|
||||||
export default function UserInfo() {
|
export default function UserInfo() {
|
||||||
const auth = useAuth();
|
const auth = useAuth();
|
||||||
const pathname = usePathname();
|
|
||||||
const status = auth.status;
|
const status = auth.status;
|
||||||
const isLoading = status === "loading";
|
const isLoading = status === "loading";
|
||||||
const isAuthenticated = status === "authenticated";
|
const isAuthenticated = status === "authenticated";
|
||||||
const isRefreshing = status === "refreshing";
|
const isRefreshing = status === "refreshing";
|
||||||
|
|
||||||
const callbackUrl = getLogoutRedirectUrl(pathname);
|
|
||||||
|
|
||||||
return isLoading ? (
|
return isLoading ? (
|
||||||
<Spinner size="xs" className="mx-3" />
|
<Spinner size="xs" className="mx-3" />
|
||||||
) : !isAuthenticated && !isRefreshing ? (
|
) : !isAuthenticated && !isRefreshing ? (
|
||||||
@@ -32,7 +26,7 @@ export default function UserInfo() {
|
|||||||
<Link
|
<Link
|
||||||
href="#"
|
href="#"
|
||||||
className="font-light px-2"
|
className="font-light px-2"
|
||||||
onClick={() => auth.signOut({ callbackUrl })}
|
onClick={() => auth.signOut({ callbackUrl: "/" })}
|
||||||
>
|
>
|
||||||
Log out
|
Log out
|
||||||
</Link>
|
</Link>
|
||||||
|
|||||||
@@ -26,6 +26,7 @@ import { useRouter } from "next/navigation";
|
|||||||
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
|
import { formatDateTime, formatStartedAgo } from "../lib/timeUtils";
|
||||||
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
|
import MeetingMinimalHeader from "../components/MeetingMinimalHeader";
|
||||||
import { NonEmptyString } from "../lib/utils";
|
import { NonEmptyString } from "../lib/utils";
|
||||||
|
import { MeetingId } from "../lib/types";
|
||||||
|
|
||||||
type Meeting = components["schemas"]["Meeting"];
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
|
|
||||||
@@ -98,7 +99,7 @@ export default function MeetingSelection({
|
|||||||
onMeetingSelect(meeting);
|
onMeetingSelect(meeting);
|
||||||
};
|
};
|
||||||
|
|
||||||
const handleEndMeeting = async (meetingId: string) => {
|
const handleEndMeeting = async (meetingId: MeetingId) => {
|
||||||
try {
|
try {
|
||||||
await deactivateMeetingMutation.mutateAsync({
|
await deactivateMeetingMutation.mutateAsync({
|
||||||
params: {
|
params: {
|
||||||
|
|||||||
@@ -1,35 +1,194 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import { useCallback, useEffect, useRef, useState } from "react";
|
import {
|
||||||
|
RefObject,
|
||||||
|
useCallback,
|
||||||
|
useEffect,
|
||||||
|
useMemo,
|
||||||
|
useRef,
|
||||||
|
useState,
|
||||||
|
} from "react";
|
||||||
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
|
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
|
||||||
import { useRouter, useParams } from "next/navigation";
|
import { useRouter, useParams } from "next/navigation";
|
||||||
import DailyIframe, { DailyCall } from "@daily-co/daily-js";
|
import DailyIframe, {
|
||||||
|
DailyCall,
|
||||||
|
DailyCallOptions,
|
||||||
|
DailyCustomTrayButton,
|
||||||
|
DailyCustomTrayButtons,
|
||||||
|
DailyEventObjectCustomButtonClick,
|
||||||
|
DailyFactoryOptions,
|
||||||
|
DailyParticipantsObject,
|
||||||
|
} from "@daily-co/daily-js";
|
||||||
import type { components } from "../../reflector-api";
|
import type { components } from "../../reflector-api";
|
||||||
import { useAuth } from "../../lib/AuthProvider";
|
import { useAuth } from "../../lib/AuthProvider";
|
||||||
import {
|
import { useConsentDialog } from "../../lib/consent";
|
||||||
ConsentDialogButton,
|
|
||||||
recordingTypeRequiresConsent,
|
|
||||||
} from "../../lib/consent";
|
|
||||||
import { useRoomJoinMeeting } from "../../lib/apiHooks";
|
import { useRoomJoinMeeting } from "../../lib/apiHooks";
|
||||||
|
import { omit } from "remeda";
|
||||||
import { assertExists } from "../../lib/utils";
|
import { assertExists } from "../../lib/utils";
|
||||||
|
import { assertMeetingId } from "../../lib/types";
|
||||||
|
|
||||||
|
const CONSENT_BUTTON_ID = "recording-consent";
|
||||||
|
const RECORDING_INDICATOR_ID = "recording-indicator";
|
||||||
|
|
||||||
type Meeting = components["schemas"]["Meeting"];
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
|
type Room = components["schemas"]["RoomDetails"];
|
||||||
|
|
||||||
interface DailyRoomProps {
|
type DailyRoomProps = {
|
||||||
meeting: Meeting;
|
meeting: Meeting;
|
||||||
}
|
room: Room;
|
||||||
|
};
|
||||||
|
|
||||||
export default function DailyRoom({ meeting }: DailyRoomProps) {
|
const useCustomTrayButtons = (
|
||||||
|
frame: {
|
||||||
|
updateCustomTrayButtons: (
|
||||||
|
customTrayButtons: DailyCustomTrayButtons,
|
||||||
|
) => void;
|
||||||
|
joined: boolean;
|
||||||
|
} | null,
|
||||||
|
) => {
|
||||||
|
const [, setCustomTrayButtons] = useState<DailyCustomTrayButtons>({});
|
||||||
|
return useCallback(
|
||||||
|
(id: string, button: DailyCustomTrayButton | null) => {
|
||||||
|
setCustomTrayButtons((prev) => {
|
||||||
|
// would blink state when frame blinks but it's ok here
|
||||||
|
const state =
|
||||||
|
button === null ? omit(prev, [id]) : { ...prev, [id]: button };
|
||||||
|
if (frame !== null && frame.joined)
|
||||||
|
frame.updateCustomTrayButtons(state);
|
||||||
|
return state;
|
||||||
|
});
|
||||||
|
},
|
||||||
|
[setCustomTrayButtons, frame],
|
||||||
|
);
|
||||||
|
};
|
||||||
|
|
||||||
|
const USE_FRAME_INIT_STATE = {
|
||||||
|
frame: null as DailyCall | null,
|
||||||
|
joined: false as boolean,
|
||||||
|
} as const;
|
||||||
|
|
||||||
|
// Daily js and not Daily react used right now because daily-js allows for prebuild interface vs. -react is customizable but has no nice defaults
|
||||||
|
const useFrame = (
|
||||||
|
container: HTMLDivElement | null,
|
||||||
|
cbs: {
|
||||||
|
onLeftMeeting: () => void;
|
||||||
|
onCustomButtonClick: (ev: DailyEventObjectCustomButtonClick) => void;
|
||||||
|
onJoinMeeting: (
|
||||||
|
startRecording: (args: { type: "raw-tracks" }) => void,
|
||||||
|
) => void;
|
||||||
|
},
|
||||||
|
) => {
|
||||||
|
const [{ frame, joined }, setState] = useState(USE_FRAME_INIT_STATE);
|
||||||
|
const setJoined = useCallback(
|
||||||
|
(joined: boolean) => setState((prev) => ({ ...prev, joined })),
|
||||||
|
[setState],
|
||||||
|
);
|
||||||
|
const setFrame = useCallback(
|
||||||
|
(frame: DailyCall | null) => setState((prev) => ({ ...prev, frame })),
|
||||||
|
[setState],
|
||||||
|
);
|
||||||
|
useEffect(() => {
|
||||||
|
if (!container) return;
|
||||||
|
const init = async () => {
|
||||||
|
const existingFrame = DailyIframe.getCallInstance();
|
||||||
|
if (existingFrame) {
|
||||||
|
console.error("existing daily frame present");
|
||||||
|
await existingFrame.destroy();
|
||||||
|
}
|
||||||
|
const frameOptions: DailyFactoryOptions = {
|
||||||
|
iframeStyle: {
|
||||||
|
width: "100vw",
|
||||||
|
height: "100vh",
|
||||||
|
border: "none",
|
||||||
|
},
|
||||||
|
showLeaveButton: true,
|
||||||
|
showFullscreenButton: true,
|
||||||
|
};
|
||||||
|
const frame = DailyIframe.createFrame(container, frameOptions);
|
||||||
|
setFrame(frame);
|
||||||
|
};
|
||||||
|
init().catch(
|
||||||
|
console.error.bind(console, "Failed to initialize daily frame:"),
|
||||||
|
);
|
||||||
|
return () => {
|
||||||
|
frame
|
||||||
|
?.destroy()
|
||||||
|
.catch(console.error.bind(console, "Failed to destroy daily frame:"));
|
||||||
|
setState(USE_FRAME_INIT_STATE);
|
||||||
|
};
|
||||||
|
}, [container]);
|
||||||
|
useEffect(() => {
|
||||||
|
if (!frame) return;
|
||||||
|
frame.on("left-meeting", cbs.onLeftMeeting);
|
||||||
|
frame.on("custom-button-click", cbs.onCustomButtonClick);
|
||||||
|
const joinCb = () => {
|
||||||
|
if (!frame) {
|
||||||
|
console.error("frame is null in joined-meeting callback");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
cbs.onJoinMeeting(frame.startRecording.bind(frame));
|
||||||
|
};
|
||||||
|
frame.on("joined-meeting", joinCb);
|
||||||
|
return () => {
|
||||||
|
frame.off("left-meeting", cbs.onLeftMeeting);
|
||||||
|
frame.off("custom-button-click", cbs.onCustomButtonClick);
|
||||||
|
frame.off("joined-meeting", joinCb);
|
||||||
|
};
|
||||||
|
}, [frame, cbs]);
|
||||||
|
const frame_ = useMemo(() => {
|
||||||
|
if (frame === null) return frame;
|
||||||
|
return {
|
||||||
|
join: async (
|
||||||
|
properties?: DailyCallOptions,
|
||||||
|
): Promise<DailyParticipantsObject | void> => {
|
||||||
|
await frame.join(properties);
|
||||||
|
setJoined(!frame.isDestroyed());
|
||||||
|
},
|
||||||
|
updateCustomTrayButtons: (
|
||||||
|
customTrayButtons: DailyCustomTrayButtons,
|
||||||
|
): DailyCall => frame.updateCustomTrayButtons(customTrayButtons),
|
||||||
|
};
|
||||||
|
}, [frame]);
|
||||||
|
const setCustomTrayButton = useCustomTrayButtons(
|
||||||
|
useMemo(() => {
|
||||||
|
if (frame_ === null) return null;
|
||||||
|
return {
|
||||||
|
updateCustomTrayButtons: frame_.updateCustomTrayButtons,
|
||||||
|
joined,
|
||||||
|
};
|
||||||
|
}, [frame_, joined]),
|
||||||
|
);
|
||||||
|
return [
|
||||||
|
frame_,
|
||||||
|
{
|
||||||
|
setCustomTrayButton,
|
||||||
|
},
|
||||||
|
] as const;
|
||||||
|
};
|
||||||
|
|
||||||
|
export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
||||||
const router = useRouter();
|
const router = useRouter();
|
||||||
const params = useParams();
|
const params = useParams();
|
||||||
const auth = useAuth();
|
const auth = useAuth();
|
||||||
const authLastUserId = auth.lastUserId;
|
const authLastUserId = auth.lastUserId;
|
||||||
const containerRef = useRef<HTMLDivElement>(null);
|
const [container, setContainer] = useState<HTMLDivElement | null>(null);
|
||||||
const joinMutation = useRoomJoinMeeting();
|
const joinMutation = useRoomJoinMeeting();
|
||||||
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
|
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
|
||||||
|
|
||||||
const roomName = params?.roomName as string;
|
const roomName = params?.roomName as string;
|
||||||
|
|
||||||
|
const {
|
||||||
|
showConsentModal,
|
||||||
|
showRecordingIndicator: showRecordingInTray,
|
||||||
|
showConsentButton,
|
||||||
|
} = useConsentDialog({
|
||||||
|
meetingId: assertMeetingId(meeting.id),
|
||||||
|
recordingType: meeting.recording_type,
|
||||||
|
skipConsent: room.skip_consent,
|
||||||
|
});
|
||||||
|
const showConsentModalRef = useRef(showConsentModal);
|
||||||
|
showConsentModalRef.current = showConsentModal;
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
|
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
|
||||||
|
|
||||||
@@ -49,7 +208,7 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
join();
|
join().catch(console.error.bind(console, "Failed to join meeting:"));
|
||||||
}, [meeting?.id, roomName, authLastUserId]);
|
}, [meeting?.id, roomName, authLastUserId]);
|
||||||
|
|
||||||
const roomUrl = joinedMeeting?.room_url;
|
const roomUrl = joinedMeeting?.room_url;
|
||||||
@@ -58,54 +217,46 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
|
|||||||
router.push("/browse");
|
router.push("/browse");
|
||||||
}, [router]);
|
}, [router]);
|
||||||
|
|
||||||
useEffect(() => {
|
const handleCustomButtonClick = useCallback(
|
||||||
if (authLastUserId === undefined || !roomUrl || !containerRef.current)
|
(ev: DailyEventObjectCustomButtonClick) => {
|
||||||
return;
|
if (ev.button_id === CONSENT_BUTTON_ID) {
|
||||||
|
showConsentModalRef.current();
|
||||||
let frame: DailyCall | null = null;
|
|
||||||
let destroyed = false;
|
|
||||||
|
|
||||||
const createAndJoin = async () => {
|
|
||||||
try {
|
|
||||||
const existingFrame = DailyIframe.getCallInstance();
|
|
||||||
if (existingFrame) {
|
|
||||||
await existingFrame.destroy();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
frame = DailyIframe.createFrame(containerRef.current!, {
|
|
||||||
iframeStyle: {
|
|
||||||
width: "100vw",
|
|
||||||
height: "100vh",
|
|
||||||
border: "none",
|
|
||||||
},
|
},
|
||||||
showLeaveButton: true,
|
[
|
||||||
showFullscreenButton: true,
|
/*keep static; iframe recreation depends on it*/
|
||||||
});
|
],
|
||||||
|
|
||||||
if (destroyed) {
|
|
||||||
await frame.destroy();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
frame.on("left-meeting", handleLeave);
|
|
||||||
|
|
||||||
frame.on("joined-meeting", async () => {
|
|
||||||
try {
|
|
||||||
const frameInstance = assertExists(
|
|
||||||
frame,
|
|
||||||
"frame object got lost somewhere after frame.on was called",
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const handleFrameJoinMeeting = useCallback(
|
||||||
|
(startRecording: (args: { type: "raw-tracks" }) => void) => {
|
||||||
|
try {
|
||||||
if (meeting.recording_type === "cloud") {
|
if (meeting.recording_type === "cloud") {
|
||||||
console.log("Starting cloud recording");
|
console.log("Starting cloud recording");
|
||||||
await frameInstance.startRecording({ type: "raw-tracks" });
|
startRecording({ type: "raw-tracks" });
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Failed to start recording:", error);
|
console.error("Failed to start recording:", error);
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
[meeting.recording_type],
|
||||||
|
);
|
||||||
|
|
||||||
|
const recordingIconUrl = useMemo(
|
||||||
|
() => new URL("/recording-icon.svg", window.location.origin),
|
||||||
|
[],
|
||||||
|
);
|
||||||
|
|
||||||
|
const [frame, { setCustomTrayButton }] = useFrame(container, {
|
||||||
|
onLeftMeeting: handleLeave,
|
||||||
|
onCustomButtonClick: handleCustomButtonClick,
|
||||||
|
onJoinMeeting: handleFrameJoinMeeting,
|
||||||
});
|
});
|
||||||
|
|
||||||
await frame.join({
|
useEffect(() => {
|
||||||
|
if (!frame || !roomUrl) return;
|
||||||
|
frame
|
||||||
|
.join({
|
||||||
url: roomUrl,
|
url: roomUrl,
|
||||||
sendSettings: {
|
sendSettings: {
|
||||||
video: {
|
video: {
|
||||||
@@ -117,25 +268,35 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
|
|||||||
},
|
},
|
||||||
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
|
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
|
||||||
},
|
},
|
||||||
});
|
})
|
||||||
} catch (error) {
|
.catch(console.error.bind(console, "Failed to join daily room:"));
|
||||||
console.error("Error creating Daily frame:", error);
|
}, [frame, roomUrl]);
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
createAndJoin().catch((error) => {
|
useEffect(() => {
|
||||||
console.error("Failed to create and join meeting:", error);
|
setCustomTrayButton(
|
||||||
});
|
RECORDING_INDICATOR_ID,
|
||||||
|
showRecordingInTray
|
||||||
return () => {
|
? {
|
||||||
destroyed = true;
|
iconPath: recordingIconUrl.href,
|
||||||
if (frame) {
|
label: "Recording",
|
||||||
frame.destroy().catch((e) => {
|
tooltip: "Recording in progress",
|
||||||
console.error("Error destroying frame:", e);
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
};
|
: null,
|
||||||
}, [roomUrl, authLastUserId, handleLeave]);
|
);
|
||||||
|
}, [showRecordingInTray, recordingIconUrl, setCustomTrayButton]);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
setCustomTrayButton(
|
||||||
|
CONSENT_BUTTON_ID,
|
||||||
|
showConsentButton
|
||||||
|
? {
|
||||||
|
iconPath: recordingIconUrl.href,
|
||||||
|
label: "Recording (click to consent)",
|
||||||
|
tooltip: "Recording (click to consent)",
|
||||||
|
}
|
||||||
|
: null,
|
||||||
|
);
|
||||||
|
}, [showConsentButton, recordingIconUrl, setCustomTrayButton]);
|
||||||
|
|
||||||
if (authLastUserId === undefined) {
|
if (authLastUserId === undefined) {
|
||||||
return (
|
return (
|
||||||
@@ -159,10 +320,7 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
|
|||||||
|
|
||||||
return (
|
return (
|
||||||
<Box position="relative" width="100vw" height="100vh">
|
<Box position="relative" width="100vw" height="100vh">
|
||||||
<div ref={containerRef} style={{ width: "100%", height: "100%" }} />
|
<div ref={setContainer} style={{ width: "100%", height: "100%" }} />
|
||||||
{meeting.recording_type &&
|
|
||||||
recordingTypeRequiresConsent(meeting.recording_type) &&
|
|
||||||
meeting.id && <ConsentDialogButton meetingId={meeting.id} />}
|
|
||||||
</Box>
|
</Box>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import { useAuth } from "../../lib/AuthProvider";
|
|||||||
import { useError } from "../../(errors)/errorContext";
|
import { useError } from "../../(errors)/errorContext";
|
||||||
import { parseNonEmptyString } from "../../lib/utils";
|
import { parseNonEmptyString } from "../../lib/utils";
|
||||||
import { printApiError } from "../../api/_error";
|
import { printApiError } from "../../api/_error";
|
||||||
|
import { assertMeetingId } from "../../lib/types";
|
||||||
|
|
||||||
type Meeting = components["schemas"]["Meeting"];
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
|
|
||||||
@@ -67,7 +68,10 @@ export default function RoomContainer(details: RoomDetails) {
|
|||||||
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
|
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
|
||||||
);
|
);
|
||||||
|
|
||||||
const explicitMeeting = useRoomGetMeeting(roomName, pageMeetingId || null);
|
const explicitMeeting = useRoomGetMeeting(
|
||||||
|
roomName,
|
||||||
|
pageMeetingId ? assertMeetingId(pageMeetingId) : null,
|
||||||
|
);
|
||||||
|
|
||||||
const meeting = explicitMeeting.data || defaultMeeting.response;
|
const meeting = explicitMeeting.data || defaultMeeting.response;
|
||||||
|
|
||||||
@@ -192,9 +196,9 @@ export default function RoomContainer(details: RoomDetails) {
|
|||||||
|
|
||||||
switch (platform) {
|
switch (platform) {
|
||||||
case "daily":
|
case "daily":
|
||||||
return <DailyRoom meeting={meeting} />;
|
return <DailyRoom meeting={meeting} room={room} />;
|
||||||
case "whereby":
|
case "whereby":
|
||||||
return <WherebyRoom meeting={meeting} />;
|
return <WherebyRoom meeting={meeting} room={room} />;
|
||||||
default: {
|
default: {
|
||||||
const _exhaustive: never = platform;
|
const _exhaustive: never = platform;
|
||||||
return (
|
return (
|
||||||
|
|||||||
@@ -5,24 +5,29 @@ import { useRouter } from "next/navigation";
|
|||||||
import type { components } from "../../reflector-api";
|
import type { components } from "../../reflector-api";
|
||||||
import { useAuth } from "../../lib/AuthProvider";
|
import { useAuth } from "../../lib/AuthProvider";
|
||||||
import { getWherebyUrl, useWhereby } from "../../lib/wherebyClient";
|
import { getWherebyUrl, useWhereby } from "../../lib/wherebyClient";
|
||||||
import { assertExistsAndNonEmptyString, NonEmptyString } from "../../lib/utils";
|
|
||||||
import {
|
import {
|
||||||
ConsentDialogButton as BaseConsentDialogButton,
|
ConsentDialogButton as BaseConsentDialogButton,
|
||||||
useConsentDialog,
|
useConsentDialog,
|
||||||
recordingTypeRequiresConsent,
|
|
||||||
} from "../../lib/consent";
|
} from "../../lib/consent";
|
||||||
|
import { assertMeetingId, MeetingId } from "../../lib/types";
|
||||||
|
|
||||||
type Meeting = components["schemas"]["Meeting"];
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
|
type Room = components["schemas"]["RoomDetails"];
|
||||||
|
|
||||||
interface WherebyRoomProps {
|
interface WherebyRoomProps {
|
||||||
meeting: Meeting;
|
meeting: Meeting;
|
||||||
|
room: Room;
|
||||||
}
|
}
|
||||||
|
|
||||||
function WherebyConsentDialogButton({
|
function WherebyConsentDialogButton({
|
||||||
meetingId,
|
meetingId,
|
||||||
|
recordingType,
|
||||||
|
skipConsent,
|
||||||
wherebyRef,
|
wherebyRef,
|
||||||
}: {
|
}: {
|
||||||
meetingId: NonEmptyString;
|
meetingId: MeetingId;
|
||||||
|
recordingType: Meeting["recording_type"];
|
||||||
|
skipConsent: boolean;
|
||||||
wherebyRef: React.RefObject<HTMLElement>;
|
wherebyRef: React.RefObject<HTMLElement>;
|
||||||
}) {
|
}) {
|
||||||
const previousFocusRef = useRef<HTMLElement | null>(null);
|
const previousFocusRef = useRef<HTMLElement | null>(null);
|
||||||
@@ -45,10 +50,16 @@ function WherebyConsentDialogButton({
|
|||||||
};
|
};
|
||||||
}, [wherebyRef]);
|
}, [wherebyRef]);
|
||||||
|
|
||||||
return <BaseConsentDialogButton meetingId={meetingId} />;
|
return (
|
||||||
|
<BaseConsentDialogButton
|
||||||
|
meetingId={meetingId}
|
||||||
|
recordingType={recordingType}
|
||||||
|
skipConsent={skipConsent}
|
||||||
|
/>
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export default function WherebyRoom({ meeting }: WherebyRoomProps) {
|
export default function WherebyRoom({ meeting, room }: WherebyRoomProps) {
|
||||||
const wherebyLoaded = useWhereby();
|
const wherebyLoaded = useWhereby();
|
||||||
const wherebyRef = useRef<HTMLElement>(null);
|
const wherebyRef = useRef<HTMLElement>(null);
|
||||||
const router = useRouter();
|
const router = useRouter();
|
||||||
@@ -57,9 +68,14 @@ export default function WherebyRoom({ meeting }: WherebyRoomProps) {
|
|||||||
const isAuthenticated = status === "authenticated";
|
const isAuthenticated = status === "authenticated";
|
||||||
|
|
||||||
const wherebyRoomUrl = getWherebyUrl(meeting);
|
const wherebyRoomUrl = getWherebyUrl(meeting);
|
||||||
const recordingType = meeting.recording_type;
|
|
||||||
const meetingId = meeting.id;
|
const meetingId = meeting.id;
|
||||||
|
|
||||||
|
const { showConsentButton } = useConsentDialog({
|
||||||
|
meetingId: assertMeetingId(meetingId),
|
||||||
|
recordingType: meeting.recording_type,
|
||||||
|
skipConsent: room.skip_consent,
|
||||||
|
});
|
||||||
|
|
||||||
const isLoading = status === "loading";
|
const isLoading = status === "loading";
|
||||||
|
|
||||||
const handleLeave = useCallback(() => {
|
const handleLeave = useCallback(() => {
|
||||||
@@ -88,11 +104,11 @@ export default function WherebyRoom({ meeting }: WherebyRoomProps) {
|
|||||||
room={wherebyRoomUrl}
|
room={wherebyRoomUrl}
|
||||||
style={{ width: "100vw", height: "100vh" }}
|
style={{ width: "100vw", height: "100vh" }}
|
||||||
/>
|
/>
|
||||||
{recordingType &&
|
{showConsentButton && (
|
||||||
recordingTypeRequiresConsent(recordingType) &&
|
|
||||||
meetingId && (
|
|
||||||
<WherebyConsentDialogButton
|
<WherebyConsentDialogButton
|
||||||
meetingId={assertExistsAndNonEmptyString(meetingId)}
|
meetingId={assertMeetingId(meetingId)}
|
||||||
|
recordingType={meeting.recording_type}
|
||||||
|
skipConsent={room.skip_consent}
|
||||||
wherebyRef={wherebyRef}
|
wherebyRef={wherebyRef}
|
||||||
/>
|
/>
|
||||||
)}
|
)}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import {
|
|||||||
useEffect,
|
useEffect,
|
||||||
useRef,
|
useRef,
|
||||||
useState,
|
useState,
|
||||||
useContext,
|
|
||||||
RefObject,
|
RefObject,
|
||||||
use,
|
use,
|
||||||
} from "react";
|
} from "react";
|
||||||
@@ -25,8 +24,6 @@ import { useRecordingConsent } from "../recordingConsentContext";
|
|||||||
import {
|
import {
|
||||||
useMeetingAudioConsent,
|
useMeetingAudioConsent,
|
||||||
useRoomGetByName,
|
useRoomGetByName,
|
||||||
useRoomActiveMeetings,
|
|
||||||
useRoomUpcomingMeetings,
|
|
||||||
useRoomsCreateMeeting,
|
useRoomsCreateMeeting,
|
||||||
useRoomGetMeeting,
|
useRoomGetMeeting,
|
||||||
} from "../lib/apiHooks";
|
} from "../lib/apiHooks";
|
||||||
@@ -39,12 +36,9 @@ import { FaBars } from "react-icons/fa6";
|
|||||||
import { useAuth } from "../lib/AuthProvider";
|
import { useAuth } from "../lib/AuthProvider";
|
||||||
import { getWherebyUrl, useWhereby } from "../lib/wherebyClient";
|
import { getWherebyUrl, useWhereby } from "../lib/wherebyClient";
|
||||||
import { useError } from "../(errors)/errorContext";
|
import { useError } from "../(errors)/errorContext";
|
||||||
import {
|
import { parseNonEmptyString } from "../lib/utils";
|
||||||
assertExistsAndNonEmptyString,
|
|
||||||
NonEmptyString,
|
|
||||||
parseNonEmptyString,
|
|
||||||
} from "../lib/utils";
|
|
||||||
import { printApiError } from "../api/_error";
|
import { printApiError } from "../api/_error";
|
||||||
|
import { assertMeetingId, MeetingId } from "../lib/types";
|
||||||
|
|
||||||
export type RoomDetails = {
|
export type RoomDetails = {
|
||||||
params: Promise<{
|
params: Promise<{
|
||||||
@@ -92,16 +86,16 @@ const useConsentWherebyFocusManagement = (
|
|||||||
};
|
};
|
||||||
|
|
||||||
const useConsentDialog = (
|
const useConsentDialog = (
|
||||||
meetingId: string,
|
meetingId: MeetingId,
|
||||||
wherebyRef: RefObject<HTMLElement> /*accessibility*/,
|
wherebyRef: RefObject<HTMLElement> /*accessibility*/,
|
||||||
) => {
|
) => {
|
||||||
const { state: consentState, touch, hasConsent } = useRecordingConsent();
|
const { state: consentState, touch, hasAnswered } = useRecordingConsent();
|
||||||
// toast would open duplicates, even with using "id=" prop
|
// toast would open duplicates, even with using "id=" prop
|
||||||
const [modalOpen, setModalOpen] = useState(false);
|
const [modalOpen, setModalOpen] = useState(false);
|
||||||
const audioConsentMutation = useMeetingAudioConsent();
|
const audioConsentMutation = useMeetingAudioConsent();
|
||||||
|
|
||||||
const handleConsent = useCallback(
|
const handleConsent = useCallback(
|
||||||
async (meetingId: string, given: boolean) => {
|
async (meetingId: MeetingId, given: boolean) => {
|
||||||
try {
|
try {
|
||||||
await audioConsentMutation.mutateAsync({
|
await audioConsentMutation.mutateAsync({
|
||||||
params: {
|
params: {
|
||||||
@@ -114,7 +108,7 @@ const useConsentDialog = (
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
touch(meetingId);
|
touch(meetingId, given);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error submitting consent:", error);
|
console.error("Error submitting consent:", error);
|
||||||
}
|
}
|
||||||
@@ -216,7 +210,7 @@ const useConsentDialog = (
|
|||||||
return {
|
return {
|
||||||
showConsentModal,
|
showConsentModal,
|
||||||
consentState,
|
consentState,
|
||||||
hasConsent,
|
hasAnswered,
|
||||||
consentLoading: audioConsentMutation.isPending,
|
consentLoading: audioConsentMutation.isPending,
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
@@ -225,13 +219,13 @@ function ConsentDialogButton({
|
|||||||
meetingId,
|
meetingId,
|
||||||
wherebyRef,
|
wherebyRef,
|
||||||
}: {
|
}: {
|
||||||
meetingId: NonEmptyString;
|
meetingId: MeetingId;
|
||||||
wherebyRef: React.RefObject<HTMLElement>;
|
wherebyRef: React.RefObject<HTMLElement>;
|
||||||
}) {
|
}) {
|
||||||
const { showConsentModal, consentState, hasConsent, consentLoading } =
|
const { showConsentModal, consentState, hasAnswered, consentLoading } =
|
||||||
useConsentDialog(meetingId, wherebyRef);
|
useConsentDialog(meetingId, wherebyRef);
|
||||||
|
|
||||||
if (!consentState.ready || hasConsent(meetingId) || consentLoading) {
|
if (!consentState.ready || hasAnswered(meetingId) || consentLoading) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,7 +278,10 @@ export default function Room(details: RoomDetails) {
|
|||||||
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
|
room && !room.ics_enabled && !pageMeetingId ? roomName : null,
|
||||||
);
|
);
|
||||||
|
|
||||||
const explicitMeeting = useRoomGetMeeting(roomName, pageMeetingId || null);
|
const explicitMeeting = useRoomGetMeeting(
|
||||||
|
roomName,
|
||||||
|
pageMeetingId ? assertMeetingId(pageMeetingId) : null,
|
||||||
|
);
|
||||||
const wherebyRoomUrl = explicitMeeting.data
|
const wherebyRoomUrl = explicitMeeting.data
|
||||||
? getWherebyUrl(explicitMeeting.data)
|
? getWherebyUrl(explicitMeeting.data)
|
||||||
: defaultMeeting.response
|
: defaultMeeting.response
|
||||||
@@ -437,7 +434,7 @@ export default function Room(details: RoomDetails) {
|
|||||||
recordingTypeRequiresConsent(recordingType) &&
|
recordingTypeRequiresConsent(recordingType) &&
|
||||||
meetingId && (
|
meetingId && (
|
||||||
<ConsentDialogButton
|
<ConsentDialogButton
|
||||||
meetingId={assertExistsAndNonEmptyString(meetingId)}
|
meetingId={assertMeetingId(meetingId)}
|
||||||
wherebyRef={wherebyRef}
|
wherebyRef={wherebyRef}
|
||||||
/>
|
/>
|
||||||
)}
|
)}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { useError } from "../(errors)/errorContext";
|
|||||||
import { QueryClient, useQueryClient } from "@tanstack/react-query";
|
import { QueryClient, useQueryClient } from "@tanstack/react-query";
|
||||||
import type { components } from "../reflector-api";
|
import type { components } from "../reflector-api";
|
||||||
import { useAuth } from "./AuthProvider";
|
import { useAuth } from "./AuthProvider";
|
||||||
|
import { MeetingId } from "./types";
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
|
* XXX error types returned from the hooks are not always correct; declared types are ValidationError but real type could be string or any other
|
||||||
@@ -718,7 +719,7 @@ export function useRoomActiveMeetings(roomName: string | null) {
|
|||||||
|
|
||||||
export function useRoomGetMeeting(
|
export function useRoomGetMeeting(
|
||||||
roomName: string | null,
|
roomName: string | null,
|
||||||
meetingId: string | null,
|
meetingId: MeetingId | null,
|
||||||
) {
|
) {
|
||||||
return $api.useQuery(
|
return $api.useQuery(
|
||||||
"get",
|
"get",
|
||||||
|
|||||||
@@ -18,8 +18,3 @@ export const LOGIN_REQUIRED_PAGES = [
|
|||||||
export const PROTECTED_PAGES = new RegExp(
|
export const PROTECTED_PAGES = new RegExp(
|
||||||
LOGIN_REQUIRED_PAGES.map((page) => `^${page}$`).join("|"),
|
LOGIN_REQUIRED_PAGES.map((page) => `^${page}$`).join("|"),
|
||||||
);
|
);
|
||||||
|
|
||||||
export function getLogoutRedirectUrl(pathname: string): string {
|
|
||||||
const transcriptPagePattern = /^\/transcripts\/[^/]+$/;
|
|
||||||
return transcriptPagePattern.test(pathname) ? pathname : "/";
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
|
import { useState, useEffect } from "react";
|
||||||
import { Box, Button, Text, VStack, HStack } from "@chakra-ui/react";
|
import { Box, Button, Text, VStack, HStack } from "@chakra-ui/react";
|
||||||
import { CONSENT_DIALOG_TEXT } from "./constants";
|
import { CONSENT_DIALOG_TEXT } from "./constants";
|
||||||
|
|
||||||
@@ -9,6 +10,15 @@ interface ConsentDialogProps {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function ConsentDialog({ onAccept, onReject }: ConsentDialogProps) {
|
export function ConsentDialog({ onAccept, onReject }: ConsentDialogProps) {
|
||||||
|
const [acceptButton, setAcceptButton] = useState<HTMLButtonElement | null>(
|
||||||
|
null,
|
||||||
|
);
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
// Auto-focus accept button so Escape key works (Daily iframe captures keyboard otherwise)
|
||||||
|
acceptButton?.focus();
|
||||||
|
}, [acceptButton]);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<Box
|
<Box
|
||||||
p={6}
|
p={6}
|
||||||
@@ -26,7 +36,12 @@ export function ConsentDialog({ onAccept, onReject }: ConsentDialogProps) {
|
|||||||
<Button variant="ghost" size="sm" onClick={onReject}>
|
<Button variant="ghost" size="sm" onClick={onReject}>
|
||||||
{CONSENT_DIALOG_TEXT.rejectButton}
|
{CONSENT_DIALOG_TEXT.rejectButton}
|
||||||
</Button>
|
</Button>
|
||||||
<Button colorPalette="primary" size="sm" onClick={onAccept}>
|
<Button
|
||||||
|
ref={setAcceptButton}
|
||||||
|
colorPalette="primary"
|
||||||
|
size="sm"
|
||||||
|
onClick={onAccept}
|
||||||
|
>
|
||||||
{CONSENT_DIALOG_TEXT.acceptButton}
|
{CONSENT_DIALOG_TEXT.acceptButton}
|
||||||
</Button>
|
</Button>
|
||||||
</HStack>
|
</HStack>
|
||||||
|
|||||||
@@ -9,16 +9,26 @@ import {
|
|||||||
CONSENT_BUTTON_Z_INDEX,
|
CONSENT_BUTTON_Z_INDEX,
|
||||||
CONSENT_DIALOG_TEXT,
|
CONSENT_DIALOG_TEXT,
|
||||||
} from "./constants";
|
} from "./constants";
|
||||||
|
import { MeetingId } from "../types";
|
||||||
|
import type { components } from "../../reflector-api";
|
||||||
|
|
||||||
interface ConsentDialogButtonProps {
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
meetingId: string;
|
|
||||||
}
|
|
||||||
|
|
||||||
export function ConsentDialogButton({ meetingId }: ConsentDialogButtonProps) {
|
type ConsentDialogButtonProps = {
|
||||||
const { showConsentModal, consentState, hasConsent, consentLoading } =
|
meetingId: MeetingId;
|
||||||
useConsentDialog(meetingId);
|
recordingType: Meeting["recording_type"];
|
||||||
|
skipConsent: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
if (!consentState.ready || hasConsent(meetingId) || consentLoading) {
|
export function ConsentDialogButton({
|
||||||
|
meetingId,
|
||||||
|
recordingType,
|
||||||
|
skipConsent,
|
||||||
|
}: ConsentDialogButtonProps) {
|
||||||
|
const { showConsentModal, consentState, showConsentButton, consentLoading } =
|
||||||
|
useConsentDialog({ meetingId, recordingType, skipConsent });
|
||||||
|
|
||||||
|
if (!consentState.ready || !showConsentButton || consentLoading) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
33
www/app/lib/consent/RecordingIndicator.tsx
Normal file
33
www/app/lib/consent/RecordingIndicator.tsx
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
"use client";
|
||||||
|
|
||||||
|
import { Box, Text } from "@chakra-ui/react";
|
||||||
|
import { FaCircle } from "react-icons/fa6";
|
||||||
|
import {
|
||||||
|
CONSENT_BUTTON_TOP_OFFSET,
|
||||||
|
CONSENT_BUTTON_LEFT_OFFSET,
|
||||||
|
CONSENT_BUTTON_Z_INDEX,
|
||||||
|
} from "./constants";
|
||||||
|
|
||||||
|
export function RecordingIndicator() {
|
||||||
|
return (
|
||||||
|
<Box
|
||||||
|
position="absolute"
|
||||||
|
top={CONSENT_BUTTON_TOP_OFFSET}
|
||||||
|
left={CONSENT_BUTTON_LEFT_OFFSET}
|
||||||
|
zIndex={CONSENT_BUTTON_Z_INDEX}
|
||||||
|
display="flex"
|
||||||
|
alignItems="center"
|
||||||
|
gap={2}
|
||||||
|
bg="red.500"
|
||||||
|
color="white"
|
||||||
|
px={3}
|
||||||
|
py={1.5}
|
||||||
|
borderRadius="md"
|
||||||
|
fontSize="sm"
|
||||||
|
fontWeight="medium"
|
||||||
|
>
|
||||||
|
<FaCircle size={8} />
|
||||||
|
<Text>Recording</Text>
|
||||||
|
</Box>
|
||||||
|
);
|
||||||
|
}
|
||||||
@@ -2,6 +2,7 @@
|
|||||||
|
|
||||||
export { ConsentDialogButton } from "./ConsentDialogButton";
|
export { ConsentDialogButton } from "./ConsentDialogButton";
|
||||||
export { ConsentDialog } from "./ConsentDialog";
|
export { ConsentDialog } from "./ConsentDialog";
|
||||||
|
export { RecordingIndicator } from "./RecordingIndicator";
|
||||||
export { useConsentDialog } from "./useConsentDialog";
|
export { useConsentDialog } from "./useConsentDialog";
|
||||||
export { recordingTypeRequiresConsent } from "./utils";
|
export { recordingTypeRequiresConsent } from "./utils";
|
||||||
export * from "./constants";
|
export * from "./constants";
|
||||||
|
|||||||
@@ -1,9 +1,14 @@
|
|||||||
export interface ConsentDialogResult {
|
import { MeetingId } from "../types";
|
||||||
|
|
||||||
|
export type ConsentDialogResult = {
|
||||||
showConsentModal: () => void;
|
showConsentModal: () => void;
|
||||||
consentState: {
|
consentState: {
|
||||||
ready: boolean;
|
ready: boolean;
|
||||||
consentAnsweredForMeetings?: Set<string>;
|
consentForMeetings?: Map<MeetingId, boolean>;
|
||||||
};
|
};
|
||||||
hasConsent: (meetingId: string) => boolean;
|
hasAnswered: (meetingId: MeetingId) => boolean;
|
||||||
|
hasAccepted: (meetingId: MeetingId) => boolean;
|
||||||
consentLoading: boolean;
|
consentLoading: boolean;
|
||||||
}
|
showRecordingIndicator: boolean;
|
||||||
|
showConsentButton: boolean;
|
||||||
|
};
|
||||||
|
|||||||
@@ -7,9 +7,29 @@ import { useMeetingAudioConsent } from "../apiHooks";
|
|||||||
import { ConsentDialog } from "./ConsentDialog";
|
import { ConsentDialog } from "./ConsentDialog";
|
||||||
import { TOAST_CHECK_INTERVAL_MS } from "./constants";
|
import { TOAST_CHECK_INTERVAL_MS } from "./constants";
|
||||||
import type { ConsentDialogResult } from "./types";
|
import type { ConsentDialogResult } from "./types";
|
||||||
|
import { MeetingId } from "../types";
|
||||||
|
import { recordingTypeRequiresConsent } from "./utils";
|
||||||
|
import type { components } from "../../reflector-api";
|
||||||
|
|
||||||
export function useConsentDialog(meetingId: string): ConsentDialogResult {
|
type Meeting = components["schemas"]["Meeting"];
|
||||||
const { state: consentState, touch, hasConsent } = useRecordingConsent();
|
|
||||||
|
type UseConsentDialogParams = {
|
||||||
|
meetingId: MeetingId;
|
||||||
|
recordingType: Meeting["recording_type"];
|
||||||
|
skipConsent: boolean;
|
||||||
|
};
|
||||||
|
|
||||||
|
export function useConsentDialog({
|
||||||
|
meetingId,
|
||||||
|
recordingType,
|
||||||
|
skipConsent,
|
||||||
|
}: UseConsentDialogParams): ConsentDialogResult {
|
||||||
|
const {
|
||||||
|
state: consentState,
|
||||||
|
touch,
|
||||||
|
hasAnswered,
|
||||||
|
hasAccepted,
|
||||||
|
} = useRecordingConsent();
|
||||||
const [modalOpen, setModalOpen] = useState(false);
|
const [modalOpen, setModalOpen] = useState(false);
|
||||||
const audioConsentMutation = useMeetingAudioConsent();
|
const audioConsentMutation = useMeetingAudioConsent();
|
||||||
const intervalRef = useRef<NodeJS.Timeout | null>(null);
|
const intervalRef = useRef<NodeJS.Timeout | null>(null);
|
||||||
@@ -42,7 +62,7 @@ export function useConsentDialog(meetingId: string): ConsentDialogResult {
|
|||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
touch(meetingId);
|
touch(meetingId, given);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Error submitting consent:", error);
|
console.error("Error submitting consent:", error);
|
||||||
}
|
}
|
||||||
@@ -100,10 +120,23 @@ export function useConsentDialog(meetingId: string): ConsentDialogResult {
|
|||||||
});
|
});
|
||||||
}, [handleConsent, modalOpen]);
|
}, [handleConsent, modalOpen]);
|
||||||
|
|
||||||
|
const requiresConsent = Boolean(
|
||||||
|
recordingType && recordingTypeRequiresConsent(recordingType),
|
||||||
|
);
|
||||||
|
|
||||||
|
const showRecordingIndicator =
|
||||||
|
requiresConsent && (skipConsent || hasAccepted(meetingId));
|
||||||
|
|
||||||
|
const showConsentButton =
|
||||||
|
requiresConsent && !skipConsent && !hasAnswered(meetingId);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
showConsentModal,
|
showConsentModal,
|
||||||
consentState,
|
consentState,
|
||||||
hasConsent,
|
hasAnswered,
|
||||||
|
hasAccepted,
|
||||||
consentLoading: audioConsentMutation.isPending,
|
consentLoading: audioConsentMutation.isPending,
|
||||||
|
showRecordingIndicator,
|
||||||
|
showConsentButton,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,10 @@
|
|||||||
import type { Session } from "next-auth";
|
import type { Session } from "next-auth";
|
||||||
import type { JWT } from "next-auth/jwt";
|
import type { JWT } from "next-auth/jwt";
|
||||||
import { parseMaybeNonEmptyString } from "./utils";
|
import {
|
||||||
|
assertExistsAndNonEmptyString,
|
||||||
|
NonEmptyString,
|
||||||
|
parseMaybeNonEmptyString,
|
||||||
|
} from "./utils";
|
||||||
|
|
||||||
export interface JWTWithAccessToken extends JWT {
|
export interface JWTWithAccessToken extends JWT {
|
||||||
accessToken: string;
|
accessToken: string;
|
||||||
@@ -78,3 +82,10 @@ export const assertCustomSession = <T extends Session>(
|
|||||||
export type Mutable<T> = {
|
export type Mutable<T> = {
|
||||||
-readonly [P in keyof T]: T[P];
|
-readonly [P in keyof T]: T[P];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
export type MeetingId = NonEmptyString & { __type: "MeetingId" };
|
||||||
|
export const assertMeetingId = (s: string): MeetingId => {
|
||||||
|
const nes = assertExistsAndNonEmptyString(s);
|
||||||
|
// just cast for now
|
||||||
|
return nes as MeetingId;
|
||||||
|
};
|
||||||
|
|||||||
@@ -1,18 +1,22 @@
|
|||||||
"use client";
|
"use client";
|
||||||
|
|
||||||
import React, { createContext, useContext, useEffect, useState } from "react";
|
import React, { createContext, useContext, useEffect, useState } from "react";
|
||||||
|
import { MeetingId } from "./lib/types";
|
||||||
|
|
||||||
|
type ConsentMap = Map<MeetingId, boolean>;
|
||||||
|
|
||||||
type ConsentContextState =
|
type ConsentContextState =
|
||||||
| { ready: false }
|
| { ready: false }
|
||||||
| {
|
| {
|
||||||
ready: true;
|
ready: true;
|
||||||
consentAnsweredForMeetings: Set<string>;
|
consentForMeetings: ConsentMap;
|
||||||
};
|
};
|
||||||
|
|
||||||
interface RecordingConsentContextValue {
|
interface RecordingConsentContextValue {
|
||||||
state: ConsentContextState;
|
state: ConsentContextState;
|
||||||
touch: (meetingId: string) => void;
|
touch: (meetingId: MeetingId, accepted: boolean) => void;
|
||||||
hasConsent: (meetingId: string) => boolean;
|
hasAnswered: (meetingId: MeetingId) => boolean;
|
||||||
|
hasAccepted: (meetingId: MeetingId) => boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
const RecordingConsentContext = createContext<
|
const RecordingConsentContext = createContext<
|
||||||
@@ -35,81 +39,116 @@ interface RecordingConsentProviderProps {
|
|||||||
|
|
||||||
const LOCAL_STORAGE_KEY = "recording_consent_meetings";
|
const LOCAL_STORAGE_KEY = "recording_consent_meetings";
|
||||||
|
|
||||||
|
const ACCEPTED = "T" as const;
|
||||||
|
type Accepted = typeof ACCEPTED;
|
||||||
|
const REJECTED = "F" as const;
|
||||||
|
type Rejected = typeof REJECTED;
|
||||||
|
type Consent = Accepted | Rejected;
|
||||||
|
const SEPARATOR = "|" as const;
|
||||||
|
type Separator = typeof SEPARATOR;
|
||||||
|
const DEFAULT_CONSENT = ACCEPTED;
|
||||||
|
type Entry = `${MeetingId}${Separator}${Consent}`;
|
||||||
|
type EntryAndDefault = Entry | MeetingId;
|
||||||
|
|
||||||
|
// Format: "meetingId|T" or "meetingId|F", legacy format "meetingId" is treated as accepted
|
||||||
|
const encodeEntry = (meetingId: MeetingId, accepted: boolean): Entry =>
|
||||||
|
`${meetingId}|${accepted ? ACCEPTED : REJECTED}`;
|
||||||
|
|
||||||
|
const decodeEntry = (
|
||||||
|
entry: EntryAndDefault,
|
||||||
|
): { meetingId: MeetingId; accepted: boolean } | null => {
|
||||||
|
const pipeIndex = entry.lastIndexOf(SEPARATOR);
|
||||||
|
if (pipeIndex === -1) {
|
||||||
|
// Legacy format: no pipe means accepted (backward compat)
|
||||||
|
return { meetingId: entry as MeetingId, accepted: true };
|
||||||
|
}
|
||||||
|
const suffix = entry.slice(pipeIndex + 1);
|
||||||
|
const meetingId = entry.slice(0, pipeIndex) as MeetingId;
|
||||||
|
// T = accepted, F = rejected, anything else = accepted (safe default)
|
||||||
|
const accepted = suffix !== REJECTED;
|
||||||
|
return { meetingId, accepted };
|
||||||
|
};
|
||||||
|
|
||||||
export const RecordingConsentProvider: React.FC<
|
export const RecordingConsentProvider: React.FC<
|
||||||
RecordingConsentProviderProps
|
RecordingConsentProviderProps
|
||||||
> = ({ children }) => {
|
> = ({ children }) => {
|
||||||
const [state, setState] = useState<ConsentContextState>({ ready: false });
|
const [state, setState] = useState<ConsentContextState>({ ready: false });
|
||||||
|
|
||||||
const safeWriteToStorage = (meetingIds: string[]): void => {
|
const safeWriteToStorage = (consentMap: ConsentMap): void => {
|
||||||
try {
|
try {
|
||||||
if (typeof window !== "undefined" && window.localStorage) {
|
if (typeof window !== "undefined" && window.localStorage) {
|
||||||
localStorage.setItem(LOCAL_STORAGE_KEY, JSON.stringify(meetingIds));
|
const entries = Array.from(consentMap.entries())
|
||||||
|
.slice(-5)
|
||||||
|
.map(([id, accepted]) => encodeEntry(id, accepted));
|
||||||
|
localStorage.setItem(LOCAL_STORAGE_KEY, JSON.stringify(entries));
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error("Failed to save consent data to localStorage:", error);
|
console.error("Failed to save consent data to localStorage:", error);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// writes to local storage and to the state of context both
|
const touch = (meetingId: MeetingId, accepted: boolean): void => {
|
||||||
const touch = (meetingId: string): void => {
|
|
||||||
if (!state.ready) {
|
if (!state.ready) {
|
||||||
console.warn("Attempted to touch consent before context is ready");
|
console.warn("Attempted to touch consent before context is ready");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// has success regardless local storage write success: we don't handle that
|
const newMap = new Map(state.consentForMeetings);
|
||||||
// and don't want to crash anything with just consent functionality
|
newMap.set(meetingId, accepted);
|
||||||
const newSet = state.consentAnsweredForMeetings.has(meetingId)
|
safeWriteToStorage(newMap);
|
||||||
? state.consentAnsweredForMeetings
|
setState({ ready: true, consentForMeetings: newMap });
|
||||||
: new Set([...state.consentAnsweredForMeetings, meetingId]);
|
|
||||||
// note: preserves the set insertion order
|
|
||||||
const array = Array.from(newSet).slice(-5); // Keep latest 5
|
|
||||||
safeWriteToStorage(array);
|
|
||||||
setState({ ready: true, consentAnsweredForMeetings: newSet });
|
|
||||||
};
|
};
|
||||||
|
|
||||||
const hasConsent = (meetingId: string): boolean => {
|
const hasAnswered = (meetingId: MeetingId): boolean => {
|
||||||
if (!state.ready) return false;
|
if (!state.ready) return false;
|
||||||
return state.consentAnsweredForMeetings.has(meetingId);
|
return state.consentForMeetings.has(meetingId);
|
||||||
|
};
|
||||||
|
|
||||||
|
const hasAccepted = (meetingId: MeetingId): boolean => {
|
||||||
|
if (!state.ready) return false;
|
||||||
|
return state.consentForMeetings.get(meetingId) === true;
|
||||||
};
|
};
|
||||||
|
|
||||||
// initialize on mount
|
// initialize on mount
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
try {
|
try {
|
||||||
if (typeof window === "undefined" || !window.localStorage) {
|
if (typeof window === "undefined" || !window.localStorage) {
|
||||||
setState({ ready: true, consentAnsweredForMeetings: new Set() });
|
setState({ ready: true, consentForMeetings: new Map() });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const stored = localStorage.getItem(LOCAL_STORAGE_KEY);
|
const stored = localStorage.getItem(LOCAL_STORAGE_KEY);
|
||||||
if (!stored) {
|
if (!stored) {
|
||||||
setState({ ready: true, consentAnsweredForMeetings: new Set() });
|
setState({ ready: true, consentForMeetings: new Map() });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const parsed = JSON.parse(stored);
|
const parsed = JSON.parse(stored);
|
||||||
if (!Array.isArray(parsed)) {
|
if (!Array.isArray(parsed)) {
|
||||||
console.warn("Invalid consent data format in localStorage, resetting");
|
console.warn("Invalid consent data format in localStorage, resetting");
|
||||||
setState({ ready: true, consentAnsweredForMeetings: new Set() });
|
setState({ ready: true, consentForMeetings: new Map() });
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre-historic way of parsing!
|
const consentForMeetings = new Map<MeetingId, boolean>();
|
||||||
const consentAnsweredForMeetings = new Set(
|
for (const entry of parsed) {
|
||||||
parsed.filter((id) => !!id && typeof id === "string"),
|
const decoded = decodeEntry(entry);
|
||||||
);
|
if (decoded) {
|
||||||
setState({ ready: true, consentAnsweredForMeetings });
|
consentForMeetings.set(decoded.meetingId, decoded.accepted);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
setState({ ready: true, consentForMeetings });
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
// we don't want to fail the page here; the component is not essential.
|
|
||||||
console.error("Failed to parse consent data from localStorage:", error);
|
console.error("Failed to parse consent data from localStorage:", error);
|
||||||
setState({ ready: true, consentAnsweredForMeetings: new Set() });
|
setState({ ready: true, consentForMeetings: new Map() });
|
||||||
}
|
}
|
||||||
}, []);
|
}, []);
|
||||||
|
|
||||||
const value: RecordingConsentContextValue = {
|
const value: RecordingConsentContextValue = {
|
||||||
state,
|
state,
|
||||||
touch,
|
touch,
|
||||||
hasConsent,
|
hasAnswered,
|
||||||
|
hasAccepted,
|
||||||
};
|
};
|
||||||
|
|
||||||
return (
|
return (
|
||||||
|
|||||||
64
www/app/reflector-api.d.ts
vendored
64
www/app/reflector-api.d.ts
vendored
@@ -893,8 +893,16 @@ export interface components {
|
|||||||
* @default false
|
* @default false
|
||||||
*/
|
*/
|
||||||
ics_enabled: boolean;
|
ics_enabled: boolean;
|
||||||
/** Platform */
|
/**
|
||||||
platform?: ("whereby" | "daily") | null;
|
* Platform
|
||||||
|
* @enum {string}
|
||||||
|
*/
|
||||||
|
platform: "whereby" | "daily";
|
||||||
|
/**
|
||||||
|
* Skip Consent
|
||||||
|
* @default false
|
||||||
|
*/
|
||||||
|
skip_consent: boolean;
|
||||||
};
|
};
|
||||||
/** CreateRoomMeeting */
|
/** CreateRoomMeeting */
|
||||||
CreateRoomMeeting: {
|
CreateRoomMeeting: {
|
||||||
@@ -1123,7 +1131,9 @@ export interface components {
|
|||||||
/** Audio Deleted */
|
/** Audio Deleted */
|
||||||
audio_deleted?: boolean | null;
|
audio_deleted?: boolean | null;
|
||||||
/** Participants */
|
/** Participants */
|
||||||
participants: components["schemas"]["TranscriptParticipant"][] | null;
|
participants:
|
||||||
|
| components["schemas"]["TranscriptParticipantWithEmail"][]
|
||||||
|
| null;
|
||||||
/**
|
/**
|
||||||
* @description discriminator enum property added by openapi-typescript
|
* @description discriminator enum property added by openapi-typescript
|
||||||
* @enum {string}
|
* @enum {string}
|
||||||
@@ -1184,7 +1194,9 @@ export interface components {
|
|||||||
/** Audio Deleted */
|
/** Audio Deleted */
|
||||||
audio_deleted?: boolean | null;
|
audio_deleted?: boolean | null;
|
||||||
/** Participants */
|
/** Participants */
|
||||||
participants: components["schemas"]["TranscriptParticipant"][] | null;
|
participants:
|
||||||
|
| components["schemas"]["TranscriptParticipantWithEmail"][]
|
||||||
|
| null;
|
||||||
};
|
};
|
||||||
/**
|
/**
|
||||||
* GetTranscriptWithText
|
* GetTranscriptWithText
|
||||||
@@ -1246,7 +1258,9 @@ export interface components {
|
|||||||
/** Audio Deleted */
|
/** Audio Deleted */
|
||||||
audio_deleted?: boolean | null;
|
audio_deleted?: boolean | null;
|
||||||
/** Participants */
|
/** Participants */
|
||||||
participants: components["schemas"]["TranscriptParticipant"][] | null;
|
participants:
|
||||||
|
| components["schemas"]["TranscriptParticipantWithEmail"][]
|
||||||
|
| null;
|
||||||
/**
|
/**
|
||||||
* @description discriminator enum property added by openapi-typescript
|
* @description discriminator enum property added by openapi-typescript
|
||||||
* @enum {string}
|
* @enum {string}
|
||||||
@@ -1315,7 +1329,9 @@ export interface components {
|
|||||||
/** Audio Deleted */
|
/** Audio Deleted */
|
||||||
audio_deleted?: boolean | null;
|
audio_deleted?: boolean | null;
|
||||||
/** Participants */
|
/** Participants */
|
||||||
participants: components["schemas"]["TranscriptParticipant"][] | null;
|
participants:
|
||||||
|
| components["schemas"]["TranscriptParticipantWithEmail"][]
|
||||||
|
| null;
|
||||||
/**
|
/**
|
||||||
* @description discriminator enum property added by openapi-typescript
|
* @description discriminator enum property added by openapi-typescript
|
||||||
* @enum {string}
|
* @enum {string}
|
||||||
@@ -1386,7 +1402,9 @@ export interface components {
|
|||||||
/** Audio Deleted */
|
/** Audio Deleted */
|
||||||
audio_deleted?: boolean | null;
|
audio_deleted?: boolean | null;
|
||||||
/** Participants */
|
/** Participants */
|
||||||
participants: components["schemas"]["TranscriptParticipant"][] | null;
|
participants:
|
||||||
|
| components["schemas"]["TranscriptParticipantWithEmail"][]
|
||||||
|
| null;
|
||||||
/**
|
/**
|
||||||
* @description discriminator enum property added by openapi-typescript
|
* @description discriminator enum property added by openapi-typescript
|
||||||
* @enum {string}
|
* @enum {string}
|
||||||
@@ -1567,6 +1585,11 @@ export interface components {
|
|||||||
/** Name */
|
/** Name */
|
||||||
name: string;
|
name: string;
|
||||||
};
|
};
|
||||||
|
/** ProcessStatus */
|
||||||
|
ProcessStatus: {
|
||||||
|
/** Status */
|
||||||
|
status: string;
|
||||||
|
};
|
||||||
/** Room */
|
/** Room */
|
||||||
Room: {
|
Room: {
|
||||||
/** Id */
|
/** Id */
|
||||||
@@ -1617,6 +1640,11 @@ export interface components {
|
|||||||
* @enum {string}
|
* @enum {string}
|
||||||
*/
|
*/
|
||||||
platform: "whereby" | "daily";
|
platform: "whereby" | "daily";
|
||||||
|
/**
|
||||||
|
* Skip Consent
|
||||||
|
* @default false
|
||||||
|
*/
|
||||||
|
skip_consent: boolean;
|
||||||
};
|
};
|
||||||
/** RoomDetails */
|
/** RoomDetails */
|
||||||
RoomDetails: {
|
RoomDetails: {
|
||||||
@@ -1668,6 +1696,11 @@ export interface components {
|
|||||||
* @enum {string}
|
* @enum {string}
|
||||||
*/
|
*/
|
||||||
platform: "whereby" | "daily";
|
platform: "whereby" | "daily";
|
||||||
|
/**
|
||||||
|
* Skip Consent
|
||||||
|
* @default false
|
||||||
|
*/
|
||||||
|
skip_consent: boolean;
|
||||||
/** Webhook Url */
|
/** Webhook Url */
|
||||||
webhook_url: string | null;
|
webhook_url: string | null;
|
||||||
/** Webhook Secret */
|
/** Webhook Secret */
|
||||||
@@ -1813,6 +1846,19 @@ export interface components {
|
|||||||
/** User Id */
|
/** User Id */
|
||||||
user_id?: string | null;
|
user_id?: string | null;
|
||||||
};
|
};
|
||||||
|
/** TranscriptParticipantWithEmail */
|
||||||
|
TranscriptParticipantWithEmail: {
|
||||||
|
/** Id */
|
||||||
|
id?: string;
|
||||||
|
/** Speaker */
|
||||||
|
speaker: number | null;
|
||||||
|
/** Name */
|
||||||
|
name: string;
|
||||||
|
/** User Id */
|
||||||
|
user_id?: string | null;
|
||||||
|
/** Email */
|
||||||
|
email?: string | null;
|
||||||
|
};
|
||||||
/**
|
/**
|
||||||
* TranscriptSegment
|
* TranscriptSegment
|
||||||
* @description A single transcript segment with speaker and timing information.
|
* @description A single transcript segment with speaker and timing information.
|
||||||
@@ -1868,6 +1914,8 @@ export interface components {
|
|||||||
ics_enabled?: boolean | null;
|
ics_enabled?: boolean | null;
|
||||||
/** Platform */
|
/** Platform */
|
||||||
platform?: ("whereby" | "daily") | null;
|
platform?: ("whereby" | "daily") | null;
|
||||||
|
/** Skip Consent */
|
||||||
|
skip_consent?: boolean | null;
|
||||||
};
|
};
|
||||||
/** UpdateTranscript */
|
/** UpdateTranscript */
|
||||||
UpdateTranscript: {
|
UpdateTranscript: {
|
||||||
@@ -3362,7 +3410,7 @@ export interface operations {
|
|||||||
[name: string]: unknown;
|
[name: string]: unknown;
|
||||||
};
|
};
|
||||||
content: {
|
content: {
|
||||||
"application/json": unknown;
|
"application/json": components["schemas"]["ProcessStatus"];
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
/** @description Validation Error */
|
/** @description Validation Error */
|
||||||
|
|||||||
3
www/public/recording-icon.svg
Normal file
3
www/public/recording-icon.svg
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24" fill="#ef4444" stroke="none">
|
||||||
|
<circle cx="12" cy="12" r="8"/>
|
||||||
|
</svg>
|
||||||
|
After Width: | Height: | Size: 131 B |
Reference in New Issue
Block a user