From b20cad76e69fb6a76405af299a005f1ddcf60eae Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Fri, 14 Nov 2025 14:31:52 -0500 Subject: [PATCH] feat: daily QOL: participants dictionary (#721) * daily QOL: participants dictionary * meeting deactivation fix * meeting deactivation fix --------- Co-authored-by: Igor Loskutov --- ...aa_add_daily_participant_session_table_.py | 79 ++++++++ server/reflector/db/__init__.py | 1 + .../db/daily_participant_sessions.py | 169 ++++++++++++++++++ server/reflector/video_platforms/base.py | 7 +- server/reflector/video_platforms/daily.py | 93 ++++++++-- server/reflector/video_platforms/models.py | 28 ++- server/reflector/video_platforms/whereby.py | 48 ++++- server/reflector/views/daily.py | 154 ++++++++++++++-- server/reflector/worker/ics_sync.py | 2 +- server/reflector/worker/process.py | 12 +- server/scripts/list_daily_webhooks.py | 91 ++++++++++ server/tests/mocks/mock_platform.py | 24 ++- server/tests/test_transcripts_process.py | 111 ++++++++++++ 13 files changed, 759 insertions(+), 60 deletions(-) create mode 100644 server/migrations/versions/2b92a1b03caa_add_daily_participant_session_table_.py create mode 100644 server/reflector/db/daily_participant_sessions.py create mode 100755 server/scripts/list_daily_webhooks.py diff --git a/server/migrations/versions/2b92a1b03caa_add_daily_participant_session_table_.py b/server/migrations/versions/2b92a1b03caa_add_daily_participant_session_table_.py new file mode 100644 index 00000000..90c3e94e --- /dev/null +++ b/server/migrations/versions/2b92a1b03caa_add_daily_participant_session_table_.py @@ -0,0 +1,79 @@ +"""add daily participant session table with immutable left_at + +Revision ID: 2b92a1b03caa +Revises: f8294b31f022 +Create Date: 2025-11-13 20:29:30.486577 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "2b92a1b03caa" +down_revision: Union[str, None] = "f8294b31f022" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # Create table + op.create_table( + "daily_participant_session", + sa.Column("id", sa.String(), nullable=False), + sa.Column("meeting_id", sa.String(), nullable=False), + sa.Column("room_id", sa.String(), nullable=False), + sa.Column("session_id", sa.String(), nullable=False), + sa.Column("user_id", sa.String(), nullable=True), + sa.Column("user_name", sa.String(), nullable=False), + sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("left_at", sa.DateTime(timezone=True), nullable=True), + sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"), + sa.ForeignKeyConstraint(["room_id"], ["room.id"], ondelete="CASCADE"), + sa.PrimaryKeyConstraint("id"), + ) + with op.batch_alter_table("daily_participant_session", schema=None) as batch_op: + batch_op.create_index( + "idx_daily_session_meeting_left", ["meeting_id", "left_at"], unique=False + ) + batch_op.create_index("idx_daily_session_room", ["room_id"], unique=False) + + # Create trigger function to prevent left_at from being updated once set + op.execute(""" + CREATE OR REPLACE FUNCTION prevent_left_at_update() + RETURNS TRIGGER AS $$ + BEGIN + IF OLD.left_at IS NOT NULL THEN + RAISE EXCEPTION 'left_at is immutable once set'; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql; + """) + + # Create trigger + op.execute(""" + CREATE TRIGGER prevent_left_at_update_trigger + BEFORE UPDATE ON daily_participant_session + FOR EACH ROW + EXECUTE FUNCTION prevent_left_at_update(); + """) + + +def downgrade() -> None: + # Drop trigger + op.execute( + "DROP TRIGGER IF EXISTS prevent_left_at_update_trigger ON daily_participant_session;" + ) + + # Drop trigger function + op.execute("DROP FUNCTION IF EXISTS prevent_left_at_update();") + + # Drop indexes and table + with op.batch_alter_table("daily_participant_session", schema=None) as batch_op: + batch_op.drop_index("idx_daily_session_room") + batch_op.drop_index("idx_daily_session_meeting_left") + + op.drop_table("daily_participant_session") diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py index 8822e6b0..91ed12ee 100644 --- a/server/reflector/db/__init__.py +++ b/server/reflector/db/__init__.py @@ -25,6 +25,7 @@ def get_database() -> databases.Database: # import models import reflector.db.calendar_events # noqa +import reflector.db.daily_participant_sessions # noqa import reflector.db.meetings # noqa import reflector.db.recordings # noqa import reflector.db.rooms # noqa diff --git a/server/reflector/db/daily_participant_sessions.py b/server/reflector/db/daily_participant_sessions.py new file mode 100644 index 00000000..5fac1912 --- /dev/null +++ b/server/reflector/db/daily_participant_sessions.py @@ -0,0 +1,169 @@ +"""Daily.co participant session tracking. + +Stores webhook data for participant.joined and participant.left events to provide +historical session information (Daily.co API only returns current participants). 
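+
+A minimal read-path sketch (illustrative; assumes a known meeting id and an
+async context; the controller instance is defined at the bottom of this module):
+
+    from reflector.db.daily_participant_sessions import (
+        daily_participant_sessions_controller,
+    )
+
+    sessions = await daily_participant_sessions_controller.get_by_meeting(meeting_id)
+    durations = [s.left_at - s.joined_at for s in sessions if s.left_at is not None]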
+""" + +from datetime import datetime + +import sqlalchemy as sa +from pydantic import BaseModel +from sqlalchemy.dialects.postgresql import insert + +from reflector.db import get_database, metadata +from reflector.utils.string import NonEmptyString + +daily_participant_sessions = sa.Table( + "daily_participant_session", + metadata, + sa.Column("id", sa.String, primary_key=True), + sa.Column( + "meeting_id", + sa.String, + sa.ForeignKey("meeting.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "room_id", + sa.String, + sa.ForeignKey("room.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column("session_id", sa.String, nullable=False), + sa.Column("user_id", sa.String, nullable=True), + sa.Column("user_name", sa.String, nullable=False), + sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False), + sa.Column("left_at", sa.DateTime(timezone=True), nullable=True), + sa.Index("idx_daily_session_meeting_left", "meeting_id", "left_at"), + sa.Index("idx_daily_session_room", "room_id"), +) + + +class DailyParticipantSession(BaseModel): + """Daily.co participant session record. + + Tracks when a participant joined and left a meeting. Populated from webhooks: + - participant.joined: Creates record with left_at=None + - participant.left: Updates record with left_at + + ID format: {meeting_id}:{user_id}:{joined_at_ms} + - Ensures idempotency (duplicate webhooks don't create duplicates) + - Allows same user to rejoin (different joined_at = different session) + + Duration is calculated as: left_at - joined_at (not stored) + """ + + id: NonEmptyString + meeting_id: NonEmptyString + room_id: NonEmptyString + session_id: NonEmptyString # Daily.co's session_id (identifies room session) + user_id: NonEmptyString | None = None + user_name: str + joined_at: datetime + left_at: datetime | None = None + + +class DailyParticipantSessionController: + """Controller for Daily.co participant session persistence.""" + + async def get_by_id(self, id: str) -> DailyParticipantSession | None: + """Get a session by its ID.""" + query = daily_participant_sessions.select().where( + daily_participant_sessions.c.id == id + ) + result = await get_database().fetch_one(query) + return DailyParticipantSession(**result) if result else None + + async def get_open_session( + self, meeting_id: NonEmptyString, session_id: NonEmptyString + ) -> DailyParticipantSession | None: + """Get the open (not left) session for a user in a meeting.""" + query = daily_participant_sessions.select().where( + sa.and_( + daily_participant_sessions.c.meeting_id == meeting_id, + daily_participant_sessions.c.session_id == session_id, + daily_participant_sessions.c.left_at.is_(None), + ) + ) + results = await get_database().fetch_all(query) + + if len(results) > 1: + raise ValueError( + f"Multiple open sessions for daily session {session_id} in meeting {meeting_id}: " + f"found {len(results)} sessions" + ) + + return DailyParticipantSession(**results[0]) if results else None + + async def upsert_joined(self, session: DailyParticipantSession) -> None: + """Insert or update when participant.joined webhook arrives. + + Idempotent: Duplicate webhooks with same ID are safely ignored. + Out-of-order: If left webhook arrived first, preserves left_at. 
+ """ + query = insert(daily_participant_sessions).values(**session.model_dump()) + query = query.on_conflict_do_update( + index_elements=["id"], + set_={"user_name": session.user_name}, + ) + await get_database().execute(query) + + async def upsert_left(self, session: DailyParticipantSession) -> None: + """Update session when participant.left webhook arrives. + + Finds the open session for this user in this meeting and updates left_at. + Works around Daily.co webhook timestamp inconsistency (joined_at differs by ~4ms between webhooks). + + Handles three cases: + 1. Normal flow: open session exists → updates left_at + 2. Out-of-order: left arrives first → creates new record with left data + 3. Duplicate: left arrives again → idempotent (DB trigger prevents left_at modification) + """ + if session.left_at is None: + raise ValueError("left_at is required for upsert_left") + + if session.left_at <= session.joined_at: + raise ValueError( + f"left_at ({session.left_at}) must be after joined_at ({session.joined_at})" + ) + + # Find existing open session (works around timestamp mismatch in webhooks) + existing = await self.get_open_session(session.meeting_id, session.session_id) + + if existing: + # Update existing open session + query = ( + daily_participant_sessions.update() + .where(daily_participant_sessions.c.id == existing.id) + .values(left_at=session.left_at) + ) + await get_database().execute(query) + else: + # Out-of-order or first webhook: insert new record + query = insert(daily_participant_sessions).values(**session.model_dump()) + query = query.on_conflict_do_nothing(index_elements=["id"]) + await get_database().execute(query) + + async def get_by_meeting(self, meeting_id: str) -> list[DailyParticipantSession]: + """Get all participant sessions for a meeting (active and ended).""" + query = daily_participant_sessions.select().where( + daily_participant_sessions.c.meeting_id == meeting_id + ) + results = await get_database().fetch_all(query) + return [DailyParticipantSession(**result) for result in results] + + async def get_active_by_meeting( + self, meeting_id: str + ) -> list[DailyParticipantSession]: + """Get only active (not left) participant sessions for a meeting.""" + query = daily_participant_sessions.select().where( + sa.and_( + daily_participant_sessions.c.meeting_id == meeting_id, + daily_participant_sessions.c.left_at.is_(None), + ) + ) + results = await get_database().fetch_all(query) + return [DailyParticipantSession(**result) for result in results] + + +daily_participant_sessions_controller = DailyParticipantSessionController() diff --git a/server/reflector/video_platforms/base.py b/server/reflector/video_platforms/base.py index d208a75a..877114f7 100644 --- a/server/reflector/video_platforms/base.py +++ b/server/reflector/video_platforms/base.py @@ -1,10 +1,10 @@ from abc import ABC, abstractmethod from datetime import datetime -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional from ..schemas.platform import Platform from ..utils.string import NonEmptyString -from .models import MeetingData, VideoPlatformConfig +from .models import MeetingData, SessionData, VideoPlatformConfig if TYPE_CHECKING: from reflector.db.rooms import Room @@ -26,7 +26,8 @@ class VideoPlatformClient(ABC): pass @abstractmethod - async def get_room_sessions(self, room_name: str) -> List[Any] | None: + async def get_room_sessions(self, room_name: str) -> list[SessionData]: + """Get session history for a room.""" pass @abstractmethod 
diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py index ec45d965..7bec4864 100644 --- a/server/reflector/video_platforms/daily.py +++ b/server/reflector/video_platforms/daily.py @@ -3,10 +3,13 @@ import hmac from datetime import datetime from hashlib import sha256 from http import HTTPStatus -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional import httpx +from reflector.db.daily_participant_sessions import ( + daily_participant_sessions_controller, +) from reflector.db.rooms import Room from reflector.logger import logger from reflector.storage import get_dailyco_storage @@ -15,7 +18,7 @@ from ..schemas.platform import Platform from ..utils.daily import DailyRoomName from ..utils.string import NonEmptyString from .base import ROOM_PREFIX_SEPARATOR, VideoPlatformClient -from .models import MeetingData, RecordingType, VideoPlatformConfig +from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig class DailyClient(VideoPlatformClient): @@ -61,16 +64,16 @@ class DailyClient(VideoPlatformClient): }, } - # Get storage config for passing to Daily API - daily_storage = get_dailyco_storage() - assert daily_storage.bucket_name, "S3 bucket must be configured" - data["properties"]["recordings_bucket"] = { - "bucket_name": daily_storage.bucket_name, - "bucket_region": daily_storage.region, - "assume_role_arn": daily_storage.role_credential, - "allow_api_access": True, - } - + # Only configure recordings_bucket if recording is enabled + if room.recording_type != self.RECORDING_NONE: + daily_storage = get_dailyco_storage() + assert daily_storage.bucket_name, "S3 bucket must be configured" + data["properties"]["recordings_bucket"] = { + "bucket_name": daily_storage.bucket_name, + "bucket_region": daily_storage.region, + "assume_role_arn": daily_storage.role_credential, + "allow_api_access": True, + } async with httpx.AsyncClient() as client: response = await client.post( f"{self.BASE_URL}/rooms", @@ -99,11 +102,49 @@ class DailyClient(VideoPlatformClient): extra_data=result, ) - async def get_room_sessions(self, room_name: str) -> List[Any] | None: - # no such api - return None + async def get_room_sessions(self, room_name: str) -> list[SessionData]: + """Get room session history from database (webhook-stored sessions). + + Daily.co doesn't provide historical session API, so we query our database + where participant.joined/left webhooks are stored. + """ + from reflector.db.meetings import meetings_controller + + meeting = await meetings_controller.get_by_room_name(room_name) + if not meeting: + return [] + + sessions = await daily_participant_sessions_controller.get_by_meeting( + meeting.id + ) + + return [ + SessionData( + session_id=s.id, + started_at=s.joined_at, + ended_at=s.left_at, + ) + for s in sessions + ] async def get_room_presence(self, room_name: str) -> Dict[str, Any]: + """Get room presence/session data for a Daily.co room. 
+ + Example response: + { + "total_count": 1, + "data": [ + { + "room": "w2pp2cf4kltgFACPKXmX", + "id": "d61cd7b2-a273-42b4-89bd-be763fd562c1", + "userId": "pbZ+ismP7dk=", + "userName": "Moishe", + "joinTime": "2023-01-01T20:53:19.000Z", + "duration": 2312 + } + ] + } + """ async with httpx.AsyncClient() as client: response = await client.get( f"{self.BASE_URL}/rooms/{room_name}/presence", @@ -114,6 +155,28 @@ class DailyClient(VideoPlatformClient): return response.json() async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]: + """Get participant data for a specific Daily.co meeting. + + Example response: + { + "data": [ + { + "user_id": "4q47OTmqa/w=", + "participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1", + "user_name": "Lindsey", + "join_time": 1672786813, + "duration": 150 + }, + { + "user_id": "pbZ+ismP7dk=", + "participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6", + "user_name": "Moishe", + "join_time": 1672786797, + "duration": 165 + } + ] + } + """ async with httpx.AsyncClient() as client: response = await client.get( f"{self.BASE_URL}/meetings/{meeting_id}/participants", diff --git a/server/reflector/video_platforms/models.py b/server/reflector/video_platforms/models.py index 82876888..648da251 100644 --- a/server/reflector/video_platforms/models.py +++ b/server/reflector/video_platforms/models.py @@ -1,18 +1,38 @@ +from datetime import datetime from typing import Any, Dict, Literal, Optional from pydantic import BaseModel, Field from reflector.schemas.platform import WHEREBY_PLATFORM, Platform +from reflector.utils.string import NonEmptyString RecordingType = Literal["none", "local", "cloud"] +class SessionData(BaseModel): + """Platform-agnostic session data. + + Represents a participant session in a meeting room, regardless of platform. + Used to determine if a meeting is still active or has ended. 
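+
+    Illustrative check (a sketch with assumed values; field names match the
+    declarations below):
+
+        from datetime import datetime, timezone
+
+        session = SessionData(
+            session_id="abc123",
+            started_at=datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc),
+            ended_at=None,
+        )
+        is_active = session.ended_at is None  # True while the session is open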
+ """ + + session_id: NonEmptyString = Field(description="Unique session identifier") + started_at: datetime = Field(description="When session started (UTC)") + ended_at: datetime | None = Field( + description="When session ended (UTC), None if still active" + ) + + class MeetingData(BaseModel): platform: Platform - meeting_id: str = Field(description="Platform-specific meeting identifier") - room_url: str = Field(description="URL for participants to join") - host_room_url: str = Field(description="URL for hosts (may be same as room_url)") - room_name: str = Field(description="Human-readable room name") + meeting_id: NonEmptyString = Field( + description="Platform-specific meeting identifier" + ) + room_url: NonEmptyString = Field(description="URL for participants to join") + host_room_url: NonEmptyString = Field( + description="URL for hosts (may be same as room_url)" + ) + room_name: NonEmptyString = Field(description="Human-readable room name") extra_data: Dict[str, Any] = Field(default_factory=dict) class Config: diff --git a/server/reflector/video_platforms/whereby.py b/server/reflector/video_platforms/whereby.py index f856454a..f4775e89 100644 --- a/server/reflector/video_platforms/whereby.py +++ b/server/reflector/video_platforms/whereby.py @@ -4,7 +4,7 @@ import re import time from datetime import datetime from hashlib import sha256 -from typing import Any, Dict, Optional +from typing import Optional import httpx @@ -13,11 +13,8 @@ from reflector.storage import get_whereby_storage from ..schemas.platform import WHEREBY_PLATFORM, Platform from ..utils.string import NonEmptyString -from .base import ( - MeetingData, - VideoPlatformClient, - VideoPlatformConfig, -) +from .base import VideoPlatformClient +from .models import MeetingData, SessionData, VideoPlatformConfig from .whereby_utils import whereby_room_name_prefix @@ -80,15 +77,50 @@ class WherebyClient(VideoPlatformClient): extra_data=result, ) - async def get_room_sessions(self, room_name: str) -> Dict[str, Any]: + async def get_room_sessions(self, room_name: str) -> list[SessionData]: + """Get room session history from Whereby API. + + Whereby API returns: [{"sessionId": "...", "startedAt": "...", "endedAt": "..." | null}, ...] 
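+
+        Timestamps are ISO 8601 with a trailing "Z"; a sketch of the conversion
+        applied below before building SessionData objects:
+
+            from datetime import datetime
+
+            started_at = datetime.fromisoformat(
+                "2025-01-01T00:00:00.000Z".replace("Z", "+00:00")
+            )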
+ """ async with httpx.AsyncClient() as client: + """ + { + "cursor": "text", + "results": [ + { + "roomSessionId": "e2f29530-46ec-4cee-8b27-e565cb5bb2e9", + "roomName": "/room-prefix-793e9ec1-c686-423d-9043-9b7a10c553fd", + "startedAt": "2025-01-01T00:00:00.000Z", + "endedAt": "2025-01-01T01:00:00.000Z", + "totalParticipantMinutes": 124, + "totalRecorderMinutes": 120, + "totalStreamerMinutes": 120, + "totalUniqueParticipants": 4, + "totalUniqueRecorders": 3, + "totalUniqueStreamers": 2 + } + ] + }""" response = await client.get( f"{self.config.api_url}/insights/room-sessions?roomName={room_name}", headers=self.headers, timeout=self.TIMEOUT, ) response.raise_for_status() - return response.json().get("results", []) + results = response.json().get("results", []) + + return [ + SessionData( + session_id=s["roomSessionId"], + started_at=datetime.fromisoformat( + s["startedAt"].replace("Z", "+00:00") + ), + ended_at=datetime.fromisoformat(s["endedAt"].replace("Z", "+00:00")) + if s.get("endedAt") + else None, + ) + for s in results + ] async def delete_room(self, room_name: str) -> bool: return True diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 6f51cd1e..baad97e9 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,9 +1,15 @@ import json +from datetime import datetime, timezone from typing import Any, Dict, Literal from fastapi import APIRouter, HTTPException, Request from pydantic import BaseModel +from reflector.db import get_database +from reflector.db.daily_participant_sessions import ( + DailyParticipantSession, + daily_participant_sessions_controller, +) from reflector.db.meetings import meetings_controller from reflector.logger import logger as _logger from reflector.settings import settings @@ -44,6 +50,24 @@ def _extract_room_name(event: DailyWebhookEvent) -> DailyRoomName | None: async def webhook(request: Request): """Handle Daily webhook events. + Example webhook payload: + { + "version": "1.0.0", + "type": "recording.ready-to-download", + "id": "rec-rtd-c3df927c-f738-4471-a2b7-066fa7e95a6b-1692124192", + "payload": { + "recording_id": "08fa0b24-9220-44c5-846c-3f116cf8e738", + "room_name": "Xcm97xRZ08b2dePKb78g", + "start_ts": 1692124183, + "status": "finished", + "max_participants": 1, + "duration": 9, + "share_token": "ntDCL5k98Ulq", #gitleaks:allow + "s3_key": "api-test-1j8fizhzd30c/Xcm97xRZ08b2dePKb78g/1692124183028" + }, + "event_ts": 1692124192 + } + Daily.co circuit-breaker: After 3+ failed responses (4xx/5xx), webhook state→FAILED, stops sending events. 
Reset: scripts/recreate_daily_webhook.py """ @@ -103,6 +127,32 @@ async def webhook(request: Request): return {"status": "ok"} +""" +{ + "version": "1.0.0", + "type": "participant.joined", + "id": "ptcpt-join-6497c79b-f326-4942-aef8-c36a29140ad1-1708972279961", + "payload": { + "room": "test", + "user_id": "6497c79b-f326-4942-aef8-c36a29140ad1", + "user_name": "testuser", + "session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa", + "joined_at": 1708972279.96, + "will_eject_at": 1708972299.541, + "owner": false, + "permissions": { + "hasPresence": true, + "canSend": true, + "canReceive": { "base": true }, + "canAdmin": false + } + }, + "event_ts": 1708972279.961 +} + +""" + + async def _handle_participant_joined(event: DailyWebhookEvent): daily_room_name = _extract_room_name(event) if not daily_room_name: @@ -110,29 +160,111 @@ async def _handle_participant_joined(event: DailyWebhookEvent): return meeting = await meetings_controller.get_by_room_name(daily_room_name) - if meeting: - await meetings_controller.increment_num_clients(meeting.id) - logger.info( - "Participant joined", - meeting_id=meeting.id, - room_name=daily_room_name, - recording_type=meeting.recording_type, - recording_trigger=meeting.recording_trigger, - ) - else: + if not meeting: logger.warning( "participant.joined: meeting not found", room_name=daily_room_name ) + return + + payload = event.payload + logger.warning({"payload": payload}) + joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) + session_id = f"{meeting.id}:{payload['session_id']}" + + session = DailyParticipantSession( + id=session_id, + meeting_id=meeting.id, + room_id=meeting.room_id, + session_id=payload["session_id"], + user_id=payload.get("user_id", None), + user_name=payload["user_name"], + joined_at=joined_at, + left_at=None, + ) + + # num_clients serves as a projection/cache of active session count for Daily.co + # Both operations must succeed or fail together to maintain consistency + async with get_database().transaction(): + await meetings_controller.increment_num_clients(meeting.id) + await daily_participant_sessions_controller.upsert_joined(session) + + logger.info( + "Participant joined", + meeting_id=meeting.id, + room_name=daily_room_name, + user_id=payload.get("user_id", None), + user_name=payload.get("user_name"), + session_id=session_id, + ) + + +""" +{ + "version": "1.0.0", + "type": "participant.left", + "id": "ptcpt-left-16168c97-f973-4eae-9642-020fe3fda5db-1708972302986", + "payload": { + "room": "test", + "user_id": "16168c97-f973-4eae-9642-020fe3fda5db", + "user_name": "bipol", + "session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa", + "joined_at": 1708972291.567, + "will_eject_at": null, + "owner": false, + "permissions": { + "hasPresence": true, + "canSend": true, + "canReceive": { "base": true }, + "canAdmin": false + }, + "duration": 11.419000148773193 + }, + "event_ts": 1708972302.986 +} +""" async def _handle_participant_left(event: DailyWebhookEvent): room_name = _extract_room_name(event) if not room_name: + logger.warning("participant.left: no room in payload", payload=event.payload) return meeting = await meetings_controller.get_by_room_name(room_name) - if meeting: + if not meeting: + logger.warning("participant.left: meeting not found", room_name=room_name) + return + + payload = event.payload + joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) + left_at = datetime.fromtimestamp(event.event_ts, tz=timezone.utc) + session_id = f"{meeting.id}:{payload['session_id']}" + + session = 
DailyParticipantSession( + id=session_id, + meeting_id=meeting.id, + room_id=meeting.room_id, + session_id=payload["session_id"], + user_id=payload.get("user_id", None), + user_name=payload["user_name"], + joined_at=joined_at, + left_at=left_at, + ) + + # num_clients serves as a projection/cache of active session count for Daily.co + # Both operations must succeed or fail together to maintain consistency + async with get_database().transaction(): await meetings_controller.decrement_num_clients(meeting.id) + await daily_participant_sessions_controller.upsert_left(session) + + logger.info( + "Participant left", + meeting_id=meeting.id, + room_name=room_name, + user_id=payload.get("user_id", None), + duration=payload.get("duration"), + session_id=session_id, + ) async def _handle_recording_started(event: DailyWebhookEvent): diff --git a/server/reflector/worker/ics_sync.py b/server/reflector/worker/ics_sync.py index 4d72d4ae..6881dfa2 100644 --- a/server/reflector/worker/ics_sync.py +++ b/server/reflector/worker/ics_sync.py @@ -107,7 +107,7 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room): client = create_platform_client(get_platform(room.platform)) meeting_data = await client.create_meeting( - "", + room.name, end_date=end_date, room=room, ) diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 47cbb1cb..dd9c1059 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -335,15 +335,15 @@ async def process_meetings(): Uses distributed locking to prevent race conditions when multiple workers process the same meeting simultaneously. """ - logger.debug("Processing meetings") meetings = await meetings_controller.get_all_active() + logger.info(f"Processing {len(meetings)} meetings") current_time = datetime.now(timezone.utc) redis_client = get_redis_client() processed_count = 0 skipped_count = 0 - for meeting in meetings: logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name) + logger_.info("Processing meeting") lock_key = f"meeting_process_lock:{meeting.id}" lock = redis_client.lock(lock_key, timeout=120) @@ -359,21 +359,23 @@ async def process_meetings(): if end_date.tzinfo is None: end_date = end_date.replace(tzinfo=timezone.utc) - # This API call could be slow, extend lock if needed client = create_platform_client(meeting.platform) room_sessions = await client.get_room_sessions(meeting.room_name) try: - # Extend lock after slow operation to ensure we still hold it + # Extend lock after operation to ensure we still hold it lock.extend(120, replace_ttl=True) except LockError: logger_.warning("Lost lock for meeting, skipping") continue has_active_sessions = room_sessions and any( - rs["endedAt"] is None for rs in room_sessions + s.ended_at is None for s in room_sessions ) has_had_sessions = bool(room_sessions) + logger_.info( + f"found {has_active_sessions} active sessions, had {has_had_sessions}" + ) if has_active_sessions: logger_.debug("Meeting still has active sessions, keep it") diff --git a/server/scripts/list_daily_webhooks.py b/server/scripts/list_daily_webhooks.py new file mode 100755 index 00000000..c3c13568 --- /dev/null +++ b/server/scripts/list_daily_webhooks.py @@ -0,0 +1,91 @@ +#!/usr/bin/env python3 + +import asyncio +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +import httpx + +from reflector.settings import settings + + +async def list_webhooks(): + """ + List all Daily.co webhooks for this account. 
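+
+    Intended invocation (a sketch; assumes DAILY_API_KEY is configured and the
+    script is run from the server/ directory):
+
+        python scripts/list_daily_webhooks.py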
+ """ + if not settings.DAILY_API_KEY: + print("Error: DAILY_API_KEY not set") + return 1 + + headers = { + "Authorization": f"Bearer {settings.DAILY_API_KEY}", + "Content-Type": "application/json", + } + + async with httpx.AsyncClient() as client: + try: + """ + Daily.co webhook list response format: + [ + { + "uuid": "0b4e4c7c-5eaf-46fe-990b-a3752f5684f5", + "url": "{{webhook_url}}", + "hmac": "NQrSA5z0FkJ44QPrFerW7uCc5kdNLv3l2FDEKDanL1U=", + "basicAuth": null, + "eventTypes": [ + "recording.started", + "recording.ready-to-download" + ], + "state": "ACTVIE", + "failedCount": 0, + "lastMomentPushed": "2023-08-15T18:29:52.000Z", + "domainId": "{{domain_id}}", + "createdAt": "2023-08-15T18:28:30.000Z", + "updatedAt": "2023-08-15T18:29:52.000Z" + } + ] + """ + resp = await client.get( + "https://api.daily.co/v1/webhooks", + headers=headers, + ) + resp.raise_for_status() + webhooks = resp.json() + + if not webhooks: + print("No webhooks found") + return 0 + + print(f"Found {len(webhooks)} webhook(s):\n") + + for webhook in webhooks: + print("=" * 80) + print(f"UUID: {webhook['uuid']}") + print(f"URL: {webhook['url']}") + print(f"State: {webhook['state']}") + print(f"Event Types: {', '.join(webhook.get('eventTypes', []))}") + print( + f"HMAC Secret: {'✓ Configured' if webhook.get('hmac') else '✗ Not set'}" + ) + print() + + print("=" * 80) + print( + f"\nCurrent DAILY_WEBHOOK_UUID in settings: {settings.DAILY_WEBHOOK_UUID or '(not set)'}" + ) + + return 0 + + except httpx.HTTPStatusError as e: + print(f"Error fetching webhooks: {e}") + print(f"Response: {e.response.text}") + return 1 + except Exception as e: + print(f"Unexpected error: {e}") + return 1 + + +if __name__ == "__main__": + sys.exit(asyncio.run(list_webhooks())) diff --git a/server/tests/mocks/mock_platform.py b/server/tests/mocks/mock_platform.py index 0f84a271..b4d9ae90 100644 --- a/server/tests/mocks/mock_platform.py +++ b/server/tests/mocks/mock_platform.py @@ -3,9 +3,11 @@ from datetime import datetime from typing import Any, Dict, Literal, Optional from reflector.db.rooms import Room +from reflector.utils.string import NonEmptyString from reflector.video_platforms.base import ( ROOM_PREFIX_SEPARATOR, MeetingData, + SessionData, VideoPlatformClient, VideoPlatformConfig, ) @@ -49,22 +51,18 @@ class MockPlatformClient(VideoPlatformClient): extra_data={"mock": True}, ) - async def get_room_sessions(self, room_name: str) -> Dict[str, Any]: + async def get_room_sessions(self, room_name: NonEmptyString) -> list[SessionData]: if room_name not in self._rooms: - return {"error": "Room not found"} + return [] room_data = self._rooms[room_name] - return { - "roomName": room_name, - "sessions": [ - { - "sessionId": room_data["id"], - "startTime": datetime.utcnow().isoformat(), - "participants": room_data["participants"], - "isActive": room_data["is_active"], - } - ], - } + return [ + SessionData( + session_id=room_data["id"], + started_at=datetime.utcnow(), + ended_at=None if room_data["is_active"] else datetime.utcnow(), + ) + ] async def delete_room(self, room_name: str) -> bool: if room_name in self._rooms: diff --git a/server/tests/test_transcripts_process.py b/server/tests/test_transcripts_process.py index 5f45cf4b..3a0614c1 100644 --- a/server/tests/test_transcripts_process.py +++ b/server/tests/test_transcripts_process.py @@ -1,5 +1,6 @@ import asyncio import time +from unittest.mock import patch import pytest from httpx import ASGITransport, AsyncClient @@ -101,3 +102,113 @@ async def test_transcript_process( assert 
response.status_code == 200 assert len(response.json()) == 1 assert "Hello world. How are you today?" in response.json()[0]["transcript"] + + +@pytest.mark.usefixtures("setup_database") +@pytest.mark.asyncio +async def test_whereby_recording_uses_file_pipeline(client): + """Test that Whereby recordings (bucket_name but no track_keys) use file pipeline""" + from datetime import datetime, timezone + + from reflector.db.recordings import Recording, recordings_controller + from reflector.db.transcripts import transcripts_controller + + # Create transcript with Whereby recording (has bucket_name, no track_keys) + transcript = await transcripts_controller.add( + "", + source_kind="room", + source_language="en", + target_language="en", + user_id="test-user", + share_mode="public", + ) + + recording = await recordings_controller.create( + Recording( + bucket_name="whereby-bucket", + object_key="test-recording.mp4", # gitleaks:allow + meeting_id="test-meeting", + recorded_at=datetime.now(timezone.utc), + track_keys=None, # Whereby recordings have no track_keys + ) + ) + + await transcripts_controller.update( + transcript, {"recording_id": recording.id, "status": "uploaded"} + ) + + with ( + patch( + "reflector.views.transcripts_process.task_pipeline_file_process" + ) as mock_file_pipeline, + patch( + "reflector.views.transcripts_process.task_pipeline_multitrack_process" + ) as mock_multitrack_pipeline, + ): + response = await client.post(f"/transcripts/{transcript.id}/process") + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + # Whereby recordings should use file pipeline + mock_file_pipeline.delay.assert_called_once_with(transcript_id=transcript.id) + mock_multitrack_pipeline.delay.assert_not_called() + + +@pytest.mark.usefixtures("setup_database") +@pytest.mark.asyncio +async def test_dailyco_recording_uses_multitrack_pipeline(client): + """Test that Daily.co recordings (bucket_name + track_keys) use multitrack pipeline""" + from datetime import datetime, timezone + + from reflector.db.recordings import Recording, recordings_controller + from reflector.db.transcripts import transcripts_controller + + # Create transcript with Daily.co multitrack recording + transcript = await transcripts_controller.add( + "", + source_kind="room", + source_language="en", + target_language="en", + user_id="test-user", + share_mode="public", + ) + + track_keys = [ + "recordings/test-room/track1.webm", + "recordings/test-room/track2.webm", + ] + recording = await recordings_controller.create( + Recording( + bucket_name="daily-bucket", + object_key="recordings/test-room", + meeting_id="test-meeting", + track_keys=track_keys, + recorded_at=datetime.now(timezone.utc), + ) + ) + + await transcripts_controller.update( + transcript, {"recording_id": recording.id, "status": "uploaded"} + ) + + with ( + patch( + "reflector.views.transcripts_process.task_pipeline_file_process" + ) as mock_file_pipeline, + patch( + "reflector.views.transcripts_process.task_pipeline_multitrack_process" + ) as mock_multitrack_pipeline, + ): + response = await client.post(f"/transcripts/{transcript.id}/process") + + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + # Daily.co multitrack recordings should use multitrack pipeline + mock_multitrack_pipeline.delay.assert_called_once_with( + transcript_id=transcript.id, + bucket_name="daily-bucket", + track_keys=track_keys, + ) + mock_file_pipeline.delay.assert_not_called()