From 4287f8b8aeee60e51db7539f4dcbda5f6e696bd8 Mon Sep 17 00:00:00 2001 From: Igor Monadical Date: Fri, 21 Nov 2025 10:24:04 -0500 Subject: [PATCH] feat: dailyco api module (#725) * dailyco api module (no-mistakes) * daily co library self-review * uncurse * self-review: daily resource leak, uniform types, enable_recording bomb, daily custom error, video_platforms/daily typing, daily timestamp dry * dailyco docs parser * remove generated daily docs --------- Co-authored-by: Igor Loskutov --- server/reflector/dailyco_api/README.md | 6 + server/reflector/dailyco_api/__init__.py | 96 ++++ server/reflector/dailyco_api/client.py | 527 ++++++++++++++++++ server/reflector/dailyco_api/requests.py | 158 ++++++ server/reflector/dailyco_api/responses.py | 182 ++++++ server/reflector/dailyco_api/webhook_utils.py | 229 ++++++++ server/reflector/dailyco_api/webhooks.py | 199 +++++++ server/reflector/video_platforms/daily.py | 252 +++------ server/reflector/views/daily.py | 70 +-- server/scripts/list_daily_webhooks.py | 58 +- server/scripts/recreate_daily_webhook.py | 84 +-- 11 files changed, 1558 insertions(+), 303 deletions(-) create mode 100644 server/reflector/dailyco_api/README.md create mode 100644 server/reflector/dailyco_api/__init__.py create mode 100644 server/reflector/dailyco_api/client.py create mode 100644 server/reflector/dailyco_api/requests.py create mode 100644 server/reflector/dailyco_api/responses.py create mode 100644 server/reflector/dailyco_api/webhook_utils.py create mode 100644 server/reflector/dailyco_api/webhooks.py diff --git a/server/reflector/dailyco_api/README.md b/server/reflector/dailyco_api/README.md new file mode 100644 index 00000000..88ec2cc3 --- /dev/null +++ b/server/reflector/dailyco_api/README.md @@ -0,0 +1,6 @@ +anything about Daily.co api interaction + +- webhook event shapes +- REST api client + +No REST api client existing found in the wild; the official lib is about working with videocall as a bot \ No newline at end of file diff --git a/server/reflector/dailyco_api/__init__.py b/server/reflector/dailyco_api/__init__.py new file mode 100644 index 00000000..1a65478b --- /dev/null +++ b/server/reflector/dailyco_api/__init__.py @@ -0,0 +1,96 @@ +""" +Daily.co API Module +""" + +# Client +from .client import DailyApiClient, DailyApiError + +# Request models +from .requests import ( + CreateMeetingTokenRequest, + CreateRoomRequest, + CreateWebhookRequest, + MeetingTokenProperties, + RecordingsBucketConfig, + RoomProperties, + UpdateWebhookRequest, +) + +# Response models +from .responses import ( + MeetingParticipant, + MeetingParticipantsResponse, + MeetingResponse, + MeetingTokenResponse, + RecordingResponse, + RecordingS3Info, + RoomPresenceParticipant, + RoomPresenceResponse, + RoomResponse, + WebhookResponse, +) + +# Webhook utilities +from .webhook_utils import ( + extract_room_name, + parse_participant_joined, + parse_participant_left, + parse_recording_error, + parse_recording_ready, + parse_recording_started, + parse_webhook_payload, + verify_webhook_signature, +) + +# Webhook models +from .webhooks import ( + DailyTrack, + DailyWebhookEvent, + ParticipantJoinedPayload, + ParticipantLeftPayload, + RecordingErrorPayload, + RecordingReadyToDownloadPayload, + RecordingStartedPayload, +) + +__all__ = [ + # Client + "DailyApiClient", + "DailyApiError", + # Requests + "CreateRoomRequest", + "RoomProperties", + "RecordingsBucketConfig", + "CreateMeetingTokenRequest", + "MeetingTokenProperties", + "CreateWebhookRequest", + "UpdateWebhookRequest", + # Responses + "RoomResponse", + "RoomPresenceResponse", + "RoomPresenceParticipant", + "MeetingParticipantsResponse", + "MeetingParticipant", + "MeetingResponse", + "RecordingResponse", + "RecordingS3Info", + "MeetingTokenResponse", + "WebhookResponse", + # Webhooks + "DailyWebhookEvent", + "DailyTrack", + "ParticipantJoinedPayload", + "ParticipantLeftPayload", + "RecordingStartedPayload", + "RecordingReadyToDownloadPayload", + "RecordingErrorPayload", + # Webhook utilities + "verify_webhook_signature", + "extract_room_name", + "parse_webhook_payload", + "parse_participant_joined", + "parse_participant_left", + "parse_recording_started", + "parse_recording_ready", + "parse_recording_error", +] diff --git a/server/reflector/dailyco_api/client.py b/server/reflector/dailyco_api/client.py new file mode 100644 index 00000000..24221bb2 --- /dev/null +++ b/server/reflector/dailyco_api/client.py @@ -0,0 +1,527 @@ +""" +Daily.co API Client + +Complete async client for Daily.co REST API with Pydantic models. + +Reference: https://docs.daily.co/reference/rest-api +""" + +from http import HTTPStatus +from typing import Any + +import httpx +import structlog + +from reflector.utils.string import NonEmptyString + +from .requests import ( + CreateMeetingTokenRequest, + CreateRoomRequest, + CreateWebhookRequest, + UpdateWebhookRequest, +) +from .responses import ( + MeetingParticipantsResponse, + MeetingResponse, + MeetingTokenResponse, + RecordingResponse, + RoomPresenceResponse, + RoomResponse, + WebhookResponse, +) + +logger = structlog.get_logger(__name__) + + +class DailyApiError(Exception): + """Daily.co API error with full request/response context.""" + + def __init__(self, operation: str, response: httpx.Response): + self.operation = operation + self.response = response + self.status_code = response.status_code + self.response_body = response.text + self.url = str(response.url) + self.request_body = ( + response.request.content.decode() if response.request.content else None + ) + + super().__init__( + f"Daily.co API error: {operation} failed with status {self.status_code}" + ) + + +class DailyApiClient: + """ + Complete async client for Daily.co REST API. + + Usage: + # Direct usage + client = DailyApiClient(api_key="your_api_key") + room = await client.create_room(CreateRoomRequest(name="my-room")) + await client.close() # Clean up when done + + # Context manager (recommended) + async with DailyApiClient(api_key="your_api_key") as client: + room = await client.create_room(CreateRoomRequest(name="my-room")) + """ + + BASE_URL = "https://api.daily.co/v1" + DEFAULT_TIMEOUT = 10.0 + + def __init__( + self, + api_key: NonEmptyString, + webhook_secret: NonEmptyString | None = None, + timeout: float = DEFAULT_TIMEOUT, + base_url: NonEmptyString | None = None, + ): + """ + Initialize Daily.co API client. + + Args: + api_key: Daily.co API key (Bearer token) + webhook_secret: Base64-encoded HMAC secret for webhook verification. + Must match the 'hmac' value provided when creating webhooks. + Generate with: base64.b64encode(os.urandom(32)).decode() + timeout: Default request timeout in seconds + base_url: Override base URL (for testing) + """ + self.api_key = api_key + self.webhook_secret = webhook_secret + self.timeout = timeout + self.base_url = base_url or self.BASE_URL + + self.headers = { + "Authorization": f"Bearer {api_key}", + "Content-Type": "application/json", + } + + self._client: httpx.AsyncClient | None = None + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def _get_client(self) -> httpx.AsyncClient: + if self._client is None: + self._client = httpx.AsyncClient(timeout=self.timeout) + return self._client + + async def close(self): + if self._client is not None: + await self._client.aclose() + self._client = None + + async def _handle_response( + self, response: httpx.Response, operation: str + ) -> dict[str, Any]: + """ + Handle API response with error logging. + + Args: + response: HTTP response + operation: Operation name for logging (e.g., "create_room") + + Returns: + Parsed JSON response + + Raises: + DailyApiError: If request failed with full context + """ + if response.status_code >= 400: + logger.error( + f"Daily.co API error: {operation}", + status_code=response.status_code, + response_body=response.text, + request_body=response.request.content.decode() + if response.request.content + else None, + url=str(response.url), + ) + raise DailyApiError(operation, response) + + return response.json() + + # ============================================================================ + # ROOMS + # ============================================================================ + + async def create_room(self, request: CreateRoomRequest) -> RoomResponse: + """ + Create a new Daily.co room. + + Reference: https://docs.daily.co/reference/rest-api/rooms/create-room + + Args: + request: Room creation request with name, privacy, and properties + + Returns: + Created room data including URL and ID + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.post( + f"{self.base_url}/rooms", + headers=self.headers, + json=request.model_dump(exclude_none=True), + ) + + data = await self._handle_response(response, "create_room") + return RoomResponse(**data) + + async def get_room(self, room_name: NonEmptyString) -> RoomResponse: + """ + Get room configuration. + + Args: + room_name: Daily.co room name + + Returns: + Room configuration data + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.get( + f"{self.base_url}/rooms/{room_name}", + headers=self.headers, + ) + + data = await self._handle_response(response, "get_room") + return RoomResponse(**data) + + async def get_room_presence( + self, room_name: NonEmptyString + ) -> RoomPresenceResponse: + """ + Get current participants in a room (real-time presence). + + Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence + + Args: + room_name: Daily.co room name + + Returns: + List of currently present participants with join time and duration + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.get( + f"{self.base_url}/rooms/{room_name}/presence", + headers=self.headers, + ) + + data = await self._handle_response(response, "get_room_presence") + return RoomPresenceResponse(**data) + + async def delete_room(self, room_name: NonEmptyString) -> None: + """ + Delete a room (idempotent - succeeds even if room doesn't exist). + + Reference: https://docs.daily.co/reference/rest-api/rooms/delete-room + + Args: + room_name: Daily.co room name + + Raises: + httpx.HTTPStatusError: If API request fails (except 404) + """ + client = await self._get_client() + response = await client.delete( + f"{self.base_url}/rooms/{room_name}", + headers=self.headers, + ) + + # Idempotent delete - 404 means already deleted + if response.status_code == HTTPStatus.NOT_FOUND: + logger.debug("Room not found (already deleted)", room_name=room_name) + return + + await self._handle_response(response, "delete_room") + + # ============================================================================ + # MEETINGS + # ============================================================================ + + async def get_meeting(self, meeting_id: NonEmptyString) -> MeetingResponse: + """ + Get full meeting information including participants. + + Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-information + + Args: + meeting_id: Daily.co meeting/session ID + + Returns: + Meeting metadata including room, duration, participants, and status + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.get( + f"{self.base_url}/meetings/{meeting_id}", + headers=self.headers, + ) + + data = await self._handle_response(response, "get_meeting") + return MeetingResponse(**data) + + async def get_meeting_participants( + self, + meeting_id: NonEmptyString, + limit: int | None = None, + joined_after: NonEmptyString | None = None, + joined_before: NonEmptyString | None = None, + ) -> MeetingParticipantsResponse: + """ + Get historical participant data from a completed meeting (paginated). + + Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants + + Args: + meeting_id: Daily.co meeting/session ID + limit: Maximum number of participant records to return + joined_after: Return participants who joined after this participant_id + joined_before: Return participants who joined before this participant_id + + Returns: + List of participants with join times and duration + + Raises: + httpx.HTTPStatusError: If API request fails (404 when no more participants) + + Note: + For pagination, use joined_after with the last participant_id from previous response. + Returns 404 when no more participants remain. + """ + params = {} + if limit is not None: + params["limit"] = limit + if joined_after is not None: + params["joined_after"] = joined_after + if joined_before is not None: + params["joined_before"] = joined_before + + client = await self._get_client() + response = await client.get( + f"{self.base_url}/meetings/{meeting_id}/participants", + headers=self.headers, + params=params, + ) + + data = await self._handle_response(response, "get_meeting_participants") + return MeetingParticipantsResponse(**data) + + # ============================================================================ + # RECORDINGS + # ============================================================================ + + async def get_recording(self, recording_id: NonEmptyString) -> RecordingResponse: + """ + Get recording metadata and status. + + Reference: https://docs.daily.co/reference/rest-api/recordings + + Args: + recording_id: Daily.co recording ID + + Returns: + Recording metadata including status, duration, and S3 info + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.get( + f"{self.base_url}/recordings/{recording_id}", + headers=self.headers, + ) + + data = await self._handle_response(response, "get_recording") + return RecordingResponse(**data) + + # ============================================================================ + # MEETING TOKENS + # ============================================================================ + + async def create_meeting_token( + self, request: CreateMeetingTokenRequest + ) -> MeetingTokenResponse: + """ + Create a meeting token for participant authentication. + + Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token + + Args: + request: Token properties including room name, user_id, permissions + + Returns: + JWT meeting token + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.post( + f"{self.base_url}/meeting-tokens", + headers=self.headers, + json=request.model_dump(exclude_none=True), + ) + + data = await self._handle_response(response, "create_meeting_token") + return MeetingTokenResponse(**data) + + # ============================================================================ + # WEBHOOKS + # ============================================================================ + + async def list_webhooks(self) -> list[WebhookResponse]: + """ + List all configured webhooks for this account. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + + Returns: + List of webhook configurations + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.get( + f"{self.base_url}/webhooks", + headers=self.headers, + ) + + data = await self._handle_response(response, "list_webhooks") + + # Daily.co returns array directly (not paginated) + if isinstance(data, list): + return [WebhookResponse(**wh) for wh in data] + + # Future-proof: handle potential pagination envelope + if isinstance(data, dict) and "data" in data: + return [WebhookResponse(**wh) for wh in data["data"]] + + logger.warning("Unexpected webhook list response format", data=data) + return [] + + async def create_webhook(self, request: CreateWebhookRequest) -> WebhookResponse: + """ + Create a new webhook subscription. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + + Args: + request: Webhook configuration with URL, event types, and HMAC secret + + Returns: + Created webhook with UUID and state + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.post( + f"{self.base_url}/webhooks", + headers=self.headers, + json=request.model_dump(exclude_none=True), + ) + + data = await self._handle_response(response, "create_webhook") + return WebhookResponse(**data) + + async def update_webhook( + self, webhook_uuid: NonEmptyString, request: UpdateWebhookRequest + ) -> WebhookResponse: + """ + Update webhook configuration. + + Note: Daily.co may not support PATCH for all fields. + Common pattern is delete + recreate. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + + Args: + webhook_uuid: Webhook UUID to update + request: Updated webhook configuration + + Returns: + Updated webhook configuration + + Raises: + httpx.HTTPStatusError: If API request fails + """ + client = await self._get_client() + response = await client.patch( + f"{self.base_url}/webhooks/{webhook_uuid}", + headers=self.headers, + json=request.model_dump(exclude_none=True), + ) + + data = await self._handle_response(response, "update_webhook") + return WebhookResponse(**data) + + async def delete_webhook(self, webhook_uuid: NonEmptyString) -> None: + """ + Delete a webhook. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + + Args: + webhook_uuid: Webhook UUID to delete + + Raises: + httpx.HTTPStatusError: If webhook not found or deletion fails + """ + client = await self._get_client() + response = await client.delete( + f"{self.base_url}/webhooks/{webhook_uuid}", + headers=self.headers, + ) + + await self._handle_response(response, "delete_webhook") + + # ============================================================================ + # HELPER METHODS + # ============================================================================ + + async def find_webhook_by_url(self, url: NonEmptyString) -> WebhookResponse | None: + """ + Find a webhook by its URL. + + Args: + url: Webhook endpoint URL to search for + + Returns: + Webhook if found, None otherwise + """ + webhooks = await self.list_webhooks() + for webhook in webhooks: + if webhook.url == url: + return webhook + return None + + async def find_webhooks_by_pattern( + self, pattern: NonEmptyString + ) -> list[WebhookResponse]: + """ + Find webhooks matching a URL pattern (e.g., 'ngrok'). + + Args: + pattern: String to match in webhook URLs + + Returns: + List of matching webhooks + """ + webhooks = await self.list_webhooks() + return [wh for wh in webhooks if pattern in wh.url] diff --git a/server/reflector/dailyco_api/requests.py b/server/reflector/dailyco_api/requests.py new file mode 100644 index 00000000..e943b90f --- /dev/null +++ b/server/reflector/dailyco_api/requests.py @@ -0,0 +1,158 @@ +""" +Daily.co API Request Models + +Reference: https://docs.daily.co/reference/rest-api +""" + +from typing import List, Literal + +from pydantic import BaseModel, Field + +from reflector.utils.string import NonEmptyString + + +class RecordingsBucketConfig(BaseModel): + """ + S3 bucket configuration for raw-tracks recordings. + + Reference: https://docs.daily.co/reference/rest-api/rooms/create-room + """ + + bucket_name: NonEmptyString = Field(description="S3 bucket name") + bucket_region: NonEmptyString = Field(description="AWS region (e.g., 'us-east-1')") + assume_role_arn: NonEmptyString = Field( + description="AWS IAM role ARN that Daily.co will assume to write recordings" + ) + allow_api_access: bool = Field( + default=True, + description="Whether to allow API access to recording metadata", + ) + + +class RoomProperties(BaseModel): + """ + Room configuration properties. + """ + + enable_recording: Literal["cloud", "local", "raw-tracks"] | None = Field( + default=None, + description="Recording mode: 'cloud' for mixed, 'local' for local recording, 'raw-tracks' for multitrack, None to disable", + ) + enable_chat: bool = Field(default=True, description="Enable in-meeting chat") + enable_screenshare: bool = Field(default=True, description="Enable screen sharing") + start_video_off: bool = Field( + default=False, description="Start with video off for all participants" + ) + start_audio_off: bool = Field( + default=False, description="Start with audio muted for all participants" + ) + exp: int | None = Field( + None, description="Room expiration timestamp (Unix epoch seconds)" + ) + recordings_bucket: RecordingsBucketConfig | None = Field( + None, description="S3 bucket configuration for raw-tracks recordings" + ) + + +class CreateRoomRequest(BaseModel): + """ + Request to create a new Daily.co room. + + Reference: https://docs.daily.co/reference/rest-api/rooms/create-room + """ + + name: NonEmptyString = Field(description="Room name (must be unique within domain)") + privacy: Literal["public", "private"] = Field( + default="public", description="Room privacy setting" + ) + properties: RoomProperties = Field( + default_factory=RoomProperties, description="Room configuration properties" + ) + + +class MeetingTokenProperties(BaseModel): + """ + Properties for meeting token creation. + + Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token + """ + + room_name: NonEmptyString = Field(description="Room name this token is valid for") + user_id: NonEmptyString | None = Field( + None, description="User identifier to associate with token" + ) + is_owner: bool = Field( + default=False, description="Grant owner privileges to token holder" + ) + start_cloud_recording: bool = Field( + default=False, description="Automatically start cloud recording on join" + ) + enable_recording_ui: bool = Field( + default=True, description="Show recording controls in UI" + ) + eject_at_token_exp: bool = Field( + default=False, description="Eject participant when token expires" + ) + nbf: int | None = Field( + None, description="Not-before timestamp (Unix epoch seconds)" + ) + exp: int | None = Field( + None, description="Expiration timestamp (Unix epoch seconds)" + ) + + +class CreateMeetingTokenRequest(BaseModel): + """ + Request to create a meeting token for participant authentication. + + Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token + """ + + properties: MeetingTokenProperties = Field(description="Token properties") + + +class CreateWebhookRequest(BaseModel): + """ + Request to create a webhook subscription. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + """ + + url: NonEmptyString = Field(description="Webhook endpoint URL (must be HTTPS)") + eventTypes: List[ + Literal[ + "participant.joined", + "participant.left", + "recording.started", + "recording.ready-to-download", + "recording.error", + ] + ] = Field( + description="Array of event types to subscribe to (only events we handle)" + ) + hmac: NonEmptyString = Field( + description="Base64-encoded HMAC secret for webhook signature verification" + ) + basicAuth: NonEmptyString | None = Field( + None, description="Optional basic auth credentials for webhook endpoint" + ) + + +class UpdateWebhookRequest(BaseModel): + """ + Request to update an existing webhook. + + Note: Daily.co API may not support PATCH for webhooks. + Common pattern is to delete and recreate. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + """ + + url: NonEmptyString | None = Field(None, description="New webhook endpoint URL") + eventTypes: List[NonEmptyString] | None = Field( + None, description="New array of event types" + ) + hmac: NonEmptyString | None = Field(None, description="New HMAC secret") + basicAuth: NonEmptyString | None = Field( + None, description="New basic auth credentials" + ) diff --git a/server/reflector/dailyco_api/responses.py b/server/reflector/dailyco_api/responses.py new file mode 100644 index 00000000..4eb84245 --- /dev/null +++ b/server/reflector/dailyco_api/responses.py @@ -0,0 +1,182 @@ +""" +Daily.co API Response Models +""" + +from typing import Any, Dict, List, Literal + +from pydantic import BaseModel, Field + +from reflector.utils.string import NonEmptyString + +# not documented in daily; we fill it according to observations +RecordingStatus = Literal["in-progress", "finished"] + + +class RoomResponse(BaseModel): + """ + Response from room creation or retrieval. + + Reference: https://docs.daily.co/reference/rest-api/rooms/create-room + """ + + id: NonEmptyString = Field(description="Unique room identifier (UUID)") + name: NonEmptyString = Field(description="Room name used in URLs") + api_created: bool = Field(description="Whether room was created via API") + privacy: Literal["public", "private"] = Field(description="Room privacy setting") + url: NonEmptyString = Field(description="Full room URL") + created_at: NonEmptyString = Field(description="ISO 8601 creation timestamp") + config: Dict[NonEmptyString, Any] = Field( + default_factory=dict, description="Room configuration properties" + ) + + +class RoomPresenceParticipant(BaseModel): + """ + Participant presence information in a room. + + Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence + """ + + room: NonEmptyString = Field(description="Room name") + id: NonEmptyString = Field(description="Participant session ID") + userId: NonEmptyString | None = Field(None, description="User ID if provided") + userName: NonEmptyString | None = Field(None, description="User display name") + joinTime: NonEmptyString = Field(description="ISO 8601 join timestamp") + duration: int = Field(description="Duration in room (seconds)") + + +class RoomPresenceResponse(BaseModel): + """ + Response from room presence endpoint. + + Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence + """ + + total_count: int = Field( + description="Total number of participants currently in room" + ) + data: List[RoomPresenceParticipant] = Field( + default_factory=list, description="Array of participant presence data" + ) + + +class MeetingParticipant(BaseModel): + """ + Historical participant data from a meeting. + + Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants + """ + + user_id: NonEmptyString = Field(description="User identifier") + participant_id: NonEmptyString = Field(description="Participant session identifier") + user_name: NonEmptyString | None = Field(None, description="User display name") + join_time: int = Field(description="Join timestamp (Unix epoch seconds)") + duration: int = Field(description="Duration in meeting (seconds)") + + +class MeetingParticipantsResponse(BaseModel): + """ + Response from meeting participants endpoint. + + Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants + """ + + data: List[MeetingParticipant] = Field( + default_factory=list, description="Array of participant data" + ) + + +class MeetingResponse(BaseModel): + """ + Response from meeting information endpoint. + + Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-information + """ + + id: NonEmptyString = Field(description="Meeting session identifier (UUID)") + room: NonEmptyString = Field(description="Room name where meeting occurred") + start_time: int = Field( + description="Meeting start Unix timestamp (~15s granularity)" + ) + duration: int = Field(description="Total meeting duration in seconds") + ongoing: bool = Field(description="Whether meeting is currently active") + max_participants: int = Field(description="Peak concurrent participant count") + participants: List[MeetingParticipant] = Field( + default_factory=list, description="Array of participant session data" + ) + + +class RecordingS3Info(BaseModel): + """ + S3 bucket information for a recording. + + Reference: https://docs.daily.co/reference/rest-api/recordings + """ + + bucket_name: NonEmptyString + bucket_region: NonEmptyString + endpoint: NonEmptyString | None = None + + +class RecordingResponse(BaseModel): + """ + Response from recording retrieval endpoint. + + Reference: https://docs.daily.co/reference/rest-api/recordings + """ + + id: NonEmptyString = Field(description="Recording identifier") + room_name: NonEmptyString = Field(description="Room where recording occurred") + start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)") + status: RecordingStatus = Field( + description="Recording status ('in-progress' or 'finished')" + ) + max_participants: int = Field(description="Maximum participants during recording") + duration: int = Field(description="Recording duration in seconds") + share_token: NonEmptyString | None = Field( + None, description="Token for sharing recording" + ) + s3: RecordingS3Info | None = Field(None, description="S3 bucket information") + + +class MeetingTokenResponse(BaseModel): + """ + Response from meeting token creation. + + Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token + """ + + token: NonEmptyString = Field( + description="JWT meeting token for participant authentication" + ) + + +class WebhookResponse(BaseModel): + """ + Response from webhook creation or retrieval. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + """ + + uuid: NonEmptyString = Field(description="Unique webhook identifier") + url: NonEmptyString = Field(description="Webhook endpoint URL") + hmac: NonEmptyString | None = Field( + None, description="Base64-encoded HMAC secret for signature verification" + ) + basicAuth: NonEmptyString | None = Field( + None, description="Basic auth credentials if configured" + ) + eventTypes: List[NonEmptyString] = Field( + default_factory=list, + description="Array of event types (e.g., ['recording.started', 'participant.joined'])", + ) + state: Literal["ACTIVE", "FAILED"] = Field( + description="Webhook state - FAILED after 3+ consecutive failures" + ) + failedCount: int = Field(default=0, description="Number of consecutive failures") + lastMomentPushed: NonEmptyString | None = Field( + None, description="ISO 8601 timestamp of last successful push" + ) + domainId: NonEmptyString = Field(description="Daily.co domain/account identifier") + createdAt: NonEmptyString = Field(description="ISO 8601 creation timestamp") + updatedAt: NonEmptyString = Field(description="ISO 8601 last update timestamp") diff --git a/server/reflector/dailyco_api/webhook_utils.py b/server/reflector/dailyco_api/webhook_utils.py new file mode 100644 index 00000000..b10d4fa2 --- /dev/null +++ b/server/reflector/dailyco_api/webhook_utils.py @@ -0,0 +1,229 @@ +""" +Daily.co Webhook Utilities + +Utilities for verifying and parsing Daily.co webhook events. + +Reference: https://docs.daily.co/reference/rest-api/webhooks +""" + +import base64 +import hmac +from hashlib import sha256 + +import structlog + +from .webhooks import ( + DailyWebhookEvent, + ParticipantJoinedPayload, + ParticipantLeftPayload, + RecordingErrorPayload, + RecordingReadyToDownloadPayload, + RecordingStartedPayload, +) + +logger = structlog.get_logger(__name__) + + +def verify_webhook_signature( + body: bytes, + signature: str, + timestamp: str, + webhook_secret: str, +) -> bool: + """ + Verify Daily.co webhook signature using HMAC-SHA256. + + Daily.co signature verification: + 1. Base64-decode the webhook secret + 2. Create signed content: timestamp + '.' + body + 3. Compute HMAC-SHA256(secret, signed_content) + 4. Base64-encode the result + 5. Compare with provided signature using constant-time comparison + + Reference: https://docs.daily.co/reference/rest-api/webhooks + + Args: + body: Raw request body bytes + signature: X-Webhook-Signature header value + timestamp: X-Webhook-Timestamp header value + webhook_secret: Base64-encoded HMAC secret + + Returns: + True if signature is valid, False otherwise + + Example: + >>> body = b'{"version":"1.0.0","type":"participant.joined",...}' + >>> signature = "abc123..." + >>> timestamp = "1234567890" + >>> secret = "your-base64-secret" + >>> is_valid = verify_webhook_signature(body, signature, timestamp, secret) + """ + if not signature or not timestamp or not webhook_secret: + logger.warning( + "Missing required data for webhook verification", + has_signature=bool(signature), + has_timestamp=bool(timestamp), + has_secret=bool(webhook_secret), + ) + return False + + try: + secret_bytes = base64.b64decode(webhook_secret) + signed_content = timestamp.encode() + b"." + body + expected = hmac.new(secret_bytes, signed_content, sha256).digest() + expected_b64 = base64.b64encode(expected).decode() + + # Constant-time comparison to prevent timing attacks + return hmac.compare_digest(expected_b64, signature) + + except (base64.binascii.Error, ValueError, TypeError, UnicodeDecodeError) as e: + logger.error( + "Webhook signature verification failed", + error=str(e), + error_type=type(e).__name__, + ) + return False + + +def extract_room_name(event: DailyWebhookEvent) -> str | None: + """ + Extract room name from Daily.co webhook event payload. + + Args: + event: Parsed webhook event + + Returns: + Room name if present and is a string, None otherwise + + Example: + >>> event = DailyWebhookEvent(**webhook_payload) + >>> room_name = extract_room_name(event) + """ + room = event.payload.get("room_name") + # Ensure we return a string, not any falsy value that might be in payload + return room if isinstance(room, str) else None + + +def parse_participant_joined(event: DailyWebhookEvent) -> ParticipantJoinedPayload: + """ + Parse participant.joined webhook event payload. + + Args: + event: Webhook event with type "participant.joined" + + Returns: + Parsed participant joined payload + + Raises: + pydantic.ValidationError: If payload doesn't match expected schema + """ + return ParticipantJoinedPayload(**event.payload) + + +def parse_participant_left(event: DailyWebhookEvent) -> ParticipantLeftPayload: + """ + Parse participant.left webhook event payload. + + Args: + event: Webhook event with type "participant.left" + + Returns: + Parsed participant left payload + + Raises: + pydantic.ValidationError: If payload doesn't match expected schema + """ + return ParticipantLeftPayload(**event.payload) + + +def parse_recording_started(event: DailyWebhookEvent) -> RecordingStartedPayload: + """ + Parse recording.started webhook event payload. + + Args: + event: Webhook event with type "recording.started" + + Returns: + Parsed recording started payload + + Raises: + pydantic.ValidationError: If payload doesn't match expected schema + """ + return RecordingStartedPayload(**event.payload) + + +def parse_recording_ready( + event: DailyWebhookEvent, +) -> RecordingReadyToDownloadPayload: + """ + Parse recording.ready-to-download webhook event payload. + + This event is sent when raw-tracks recordings are complete and uploaded to S3. + The payload includes a 'tracks' array with individual audio/video files. + + Args: + event: Webhook event with type "recording.ready-to-download" + + Returns: + Parsed recording ready payload with tracks array + + Raises: + pydantic.ValidationError: If payload doesn't match expected schema + + Example: + >>> event = DailyWebhookEvent(**webhook_payload) + >>> if event.type == "recording.ready-to-download": + ... payload = parse_recording_ready(event) + ... audio_tracks = [t for t in payload.tracks if t.type == "audio"] + """ + return RecordingReadyToDownloadPayload(**event.payload) + + +def parse_recording_error(event: DailyWebhookEvent) -> RecordingErrorPayload: + """ + Parse recording.error webhook event payload. + + Args: + event: Webhook event with type "recording.error" + + Returns: + Parsed recording error payload + + Raises: + pydantic.ValidationError: If payload doesn't match expected schema + """ + return RecordingErrorPayload(**event.payload) + + +# Webhook event type to parser mapping +WEBHOOK_PARSERS = { + "participant.joined": parse_participant_joined, + "participant.left": parse_participant_left, + "recording.started": parse_recording_started, + "recording.ready-to-download": parse_recording_ready, + "recording.error": parse_recording_error, +} + + +def parse_webhook_payload(event: DailyWebhookEvent): + """ + Parse webhook event payload based on event type. + + Args: + event: Webhook event + + Returns: + Typed payload model based on event type, or raw dict if unknown + + Example: + >>> event = DailyWebhookEvent(**webhook_payload) + >>> payload = parse_webhook_payload(event) + >>> if isinstance(payload, ParticipantJoinedPayload): + ... print(f"User {payload.user_name} joined") + """ + parser = WEBHOOK_PARSERS.get(event.type) + if parser: + return parser(event) + else: + logger.warning("Unknown webhook event type", event_type=event.type) + return event.payload diff --git a/server/reflector/dailyco_api/webhooks.py b/server/reflector/dailyco_api/webhooks.py new file mode 100644 index 00000000..862f4996 --- /dev/null +++ b/server/reflector/dailyco_api/webhooks.py @@ -0,0 +1,199 @@ +""" +Daily.co Webhook Event Models + +Reference: https://docs.daily.co/reference/rest-api/webhooks +""" + +from typing import Any, Dict, Literal + +from pydantic import BaseModel, Field, field_validator + +from reflector.utils.string import NonEmptyString + + +def normalize_timestamp_to_int(v): + """ + Normalize float timestamps to int by truncating decimal part. + + Daily.co sometimes sends timestamps as floats (e.g., 1708972279.96). + Pydantic expects int for fields typed as `int`. + """ + if v is None: + return v + if isinstance(v, float): + return int(v) + return v + + +WebhookEventType = Literal[ + "participant.joined", + "participant.left", + "recording.started", + "recording.ready-to-download", + "recording.error", +] + + +class DailyTrack(BaseModel): + """ + Individual audio or video track from a multitrack recording. + + Reference: https://docs.daily.co/reference/rest-api/recordings + """ + + type: Literal["audio", "video"] + s3Key: NonEmptyString = Field(description="S3 object key for the track file") + size: int = Field(description="File size in bytes") + + +class DailyWebhookEvent(BaseModel): + """ + Base structure for all Daily.co webhook events. + All events share five common fields documented below. + + Reference: https://docs.daily.co/reference/rest-api/webhooks + """ + + version: NonEmptyString = Field( + description="Represents the version of the event. This uses semantic versioning to inform a consumer if the payload has introduced any breaking changes" + ) + type: WebhookEventType = Field( + description="Represents the type of the event described in the payload" + ) + id: NonEmptyString = Field( + description="An identifier representing this specific event" + ) + payload: Dict[NonEmptyString, Any] = Field( + description="An object representing the event, whose fields are described in the corresponding payload class" + ) + event_ts: int = Field( + description="Documenting when the webhook itself was sent. This timestamp is different than the time of the event the webhook describes. For example, a recording.started event will contain a start_ts timestamp of when the actual recording started, and a slightly later event_ts timestamp indicating when the webhook event was sent" + ) + + _normalize_event_ts = field_validator("event_ts", mode="before")( + normalize_timestamp_to_int + ) + + +class ParticipantJoinedPayload(BaseModel): + """ + Payload for participant.joined webhook event. + + Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-joined + """ + + room_name: NonEmptyString | None = Field(None, description="Daily.co room name") + session_id: NonEmptyString = Field(description="Daily.co session identifier") + user_id: NonEmptyString = Field(description="User identifier (may be encoded)") + user_name: NonEmptyString | None = Field(None, description="User display name") + joined_at: int = Field(description="Join timestamp in Unix epoch seconds") + + _normalize_joined_at = field_validator("joined_at", mode="before")( + normalize_timestamp_to_int + ) + + +class ParticipantLeftPayload(BaseModel): + """ + Payload for participant.left webhook event. + + Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-left + """ + + room_name: NonEmptyString | None = Field(None, description="Daily.co room name") + session_id: NonEmptyString = Field(description="Daily.co session identifier") + user_id: NonEmptyString = Field(description="User identifier (may be encoded)") + user_name: NonEmptyString | None = Field(None, description="User display name") + joined_at: int = Field(description="Join timestamp in Unix epoch seconds") + duration: int | None = Field( + None, description="Duration of participation in seconds" + ) + + _normalize_joined_at = field_validator("joined_at", mode="before")( + normalize_timestamp_to_int + ) + + +class RecordingStartedPayload(BaseModel): + """ + Payload for recording.started webhook event. + + Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-started + """ + + room_name: NonEmptyString | None = Field(None, description="Daily.co room name") + recording_id: NonEmptyString = Field(description="Recording identifier") + start_ts: int | None = Field(None, description="Recording start timestamp") + + _normalize_start_ts = field_validator("start_ts", mode="before")( + normalize_timestamp_to_int + ) + + +class RecordingReadyToDownloadPayload(BaseModel): + """ + Payload for recording.ready-to-download webhook event. + This is sent when raw-tracks recordings are complete and uploaded to S3. + + Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-ready-to-download + """ + + type: Literal["cloud", "raw-tracks"] = Field( + description="The type of recording that was generated" + ) + recording_id: NonEmptyString = Field( + description="An ID identifying the recording that was generated" + ) + room_name: NonEmptyString = Field( + description="The name of the room where the recording was made" + ) + start_ts: int = Field( + description="The Unix epoch time in seconds representing when the recording started" + ) + status: Literal["finished"] = Field( + description="The status of the given recording (always 'finished' in ready-to-download webhook, see RecordingStatus in responses.py for full API statuses)" + ) + max_participants: int = Field( + description="The number of participants on the call that were recorded" + ) + duration: int = Field(description="The duration in seconds of the call") + s3_key: NonEmptyString = Field( + description="The location of the recording in the provided S3 bucket" + ) + share_token: NonEmptyString | None = Field( + None, description="undocumented documented secret field" + ) + tracks: list[DailyTrack] | None = Field( + None, + description="If the recording is a raw-tracks recording, a tracks field will be provided. If role permissions have been removed, the tracks field may be null", + ) + + _normalize_start_ts = field_validator("start_ts", mode="before")( + normalize_timestamp_to_int + ) + + +class RecordingErrorPayload(BaseModel): + """ + Payload for recording.error webhook event. + + Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-error + """ + + action: Literal["clourd-recording-err", "cloud-recording-error"] = Field( + description="A string describing the event that was emitted (both variants are documented)" + ) + error_msg: NonEmptyString = Field(description="The error message returned") + instance_id: NonEmptyString = Field( + description="The recording instance ID that was passed into the start recording command" + ) + room_name: NonEmptyString = Field( + description="The name of the room where the recording was made" + ) + timestamp: int = Field( + description="The Unix epoch time in seconds representing when the error was emitted" + ) + + _normalize_timestamp = field_validator("timestamp", mode="before")( + normalize_timestamp_to_int + ) diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py index 7bec4864..7485cc95 100644 --- a/server/reflector/video_platforms/daily.py +++ b/server/reflector/video_platforms/daily.py @@ -1,12 +1,17 @@ -import base64 -import hmac from datetime import datetime -from hashlib import sha256 -from http import HTTPStatus -from typing import Any, Dict, Optional - -import httpx +from reflector.dailyco_api import ( + CreateMeetingTokenRequest, + CreateRoomRequest, + DailyApiClient, + MeetingParticipantsResponse, + MeetingTokenProperties, + RecordingResponse, + RecordingsBucketConfig, + RoomPresenceResponse, + RoomProperties, + verify_webhook_signature, +) from reflector.db.daily_participant_sessions import ( daily_participant_sessions_controller, ) @@ -23,18 +28,17 @@ from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig class DailyClient(VideoPlatformClient): PLATFORM_NAME: Platform = "daily" - TIMEOUT = 10 - BASE_URL = "https://api.daily.co/v1" TIMESTAMP_FORMAT = "%Y%m%d%H%M%S" RECORDING_NONE: RecordingType = "none" RECORDING_CLOUD: RecordingType = "cloud" def __init__(self, config: VideoPlatformConfig): super().__init__(config) - self.headers = { - "Authorization": f"Bearer {config.api_key}", - "Content-Type": "application/json", - } + self._api_client = DailyApiClient( + api_key=config.api_key, + webhook_secret=config.webhook_secret, + timeout=10.0, + ) async def create_meeting( self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room @@ -49,57 +53,43 @@ class DailyClient(VideoPlatformClient): timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT) room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}" - data = { - "name": room_name, - "privacy": "private" if room.is_locked else "public", - "properties": { - "enable_recording": "raw-tracks" - if room.recording_type != self.RECORDING_NONE - else False, - "enable_chat": True, - "enable_screenshare": True, - "start_video_off": False, - "start_audio_off": False, - "exp": int(end_date.timestamp()), - }, - } + properties = RoomProperties( + enable_recording="raw-tracks" + if room.recording_type != self.RECORDING_NONE + else False, + enable_chat=True, + enable_screenshare=True, + start_video_off=False, + start_audio_off=False, + exp=int(end_date.timestamp()), + ) # Only configure recordings_bucket if recording is enabled if room.recording_type != self.RECORDING_NONE: daily_storage = get_dailyco_storage() assert daily_storage.bucket_name, "S3 bucket must be configured" - data["properties"]["recordings_bucket"] = { - "bucket_name": daily_storage.bucket_name, - "bucket_region": daily_storage.region, - "assume_role_arn": daily_storage.role_credential, - "allow_api_access": True, - } - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.BASE_URL}/rooms", - headers=self.headers, - json=data, - timeout=self.TIMEOUT, + properties.recordings_bucket = RecordingsBucketConfig( + bucket_name=daily_storage.bucket_name, + bucket_region=daily_storage.region, + assume_role_arn=daily_storage.role_credential, + allow_api_access=True, ) - if response.status_code >= 400: - logger.error( - "Daily.co API error", - status_code=response.status_code, - response_body=response.text, - request_data=data, - ) - response.raise_for_status() - result = response.json() - room_url = result["url"] + request = CreateRoomRequest( + name=room_name, + privacy="private" if room.is_locked else "public", + properties=properties, + ) + + result = await self._api_client.create_room(request) return MeetingData( - meeting_id=result["id"], - room_name=result["name"], - room_url=room_url, - host_room_url=room_url, + meeting_id=result.id, + room_name=result.name, + room_url=result.url, + host_room_url=result.url, platform=self.PLATFORM_NAME, - extra_data=result, + extra_data=result.model_dump(), ) async def get_room_sessions(self, room_name: str) -> list[SessionData]: @@ -108,7 +98,7 @@ class DailyClient(VideoPlatformClient): Daily.co doesn't provide historical session API, so we query our database where participant.joined/left webhooks are stored. """ - from reflector.db.meetings import meetings_controller + from reflector.db.meetings import meetings_controller # noqa: PLC0415 meeting = await meetings_controller.get_by_room_name(room_name) if not meeting: @@ -127,135 +117,65 @@ class DailyClient(VideoPlatformClient): for s in sessions ] - async def get_room_presence(self, room_name: str) -> Dict[str, Any]: - """Get room presence/session data for a Daily.co room. + async def get_room_presence(self, room_name: str) -> RoomPresenceResponse: + """Get room presence/session data for a Daily.co room.""" + return await self._api_client.get_room_presence(room_name) - Example response: - { - "total_count": 1, - "data": [ - { - "room": "w2pp2cf4kltgFACPKXmX", - "id": "d61cd7b2-a273-42b4-89bd-be763fd562c1", - "userId": "pbZ+ismP7dk=", - "userName": "Moishe", - "joinTime": "2023-01-01T20:53:19.000Z", - "duration": 2312 - } - ] - } - """ - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.BASE_URL}/rooms/{room_name}/presence", - headers=self.headers, - timeout=self.TIMEOUT, - ) - response.raise_for_status() - return response.json() + async def get_meeting_participants( + self, meeting_id: str + ) -> MeetingParticipantsResponse: + """Get participant data for a specific Daily.co meeting.""" + return await self._api_client.get_meeting_participants(meeting_id) - async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]: - """Get participant data for a specific Daily.co meeting. - - Example response: - { - "data": [ - { - "user_id": "4q47OTmqa/w=", - "participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1", - "user_name": "Lindsey", - "join_time": 1672786813, - "duration": 150 - }, - { - "user_id": "pbZ+ismP7dk=", - "participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6", - "user_name": "Moishe", - "join_time": 1672786797, - "duration": 165 - } - ] - } - """ - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.BASE_URL}/meetings/{meeting_id}/participants", - headers=self.headers, - timeout=self.TIMEOUT, - ) - response.raise_for_status() - return response.json() - - async def get_recording(self, recording_id: str) -> Dict[str, Any]: - async with httpx.AsyncClient() as client: - response = await client.get( - f"{self.BASE_URL}/recordings/{recording_id}", - headers=self.headers, - timeout=self.TIMEOUT, - ) - response.raise_for_status() - return response.json() + async def get_recording(self, recording_id: str) -> RecordingResponse: + return await self._api_client.get_recording(recording_id) async def delete_room(self, room_name: str) -> bool: - async with httpx.AsyncClient() as client: - response = await client.delete( - f"{self.BASE_URL}/rooms/{room_name}", - headers=self.headers, - timeout=self.TIMEOUT, - ) - return response.status_code in (HTTPStatus.OK, HTTPStatus.NOT_FOUND) + """Delete a room (idempotent - succeeds even if room doesn't exist).""" + await self._api_client.delete_room(room_name) + return True async def upload_logo(self, room_name: str, logo_path: str) -> bool: return True def verify_webhook_signature( - self, body: bytes, signature: str, timestamp: Optional[str] = None + self, body: bytes, signature: str, timestamp: str | None = None ) -> bool: - """Verify Daily.co webhook signature. - - Daily.co uses: - - X-Webhook-Signature header - - X-Webhook-Timestamp header - - Signature format: HMAC-SHA256(base64_decode(secret), timestamp + '.' + body) - - Result is base64 encoded - """ - if not signature or not timestamp: + """Verify Daily.co webhook signature using dailyco_api module.""" + if not self.config.webhook_secret: + logger.warning("Webhook secret not configured") return False - try: - secret_bytes = base64.b64decode(self.config.webhook_secret) - - signed_content = timestamp.encode() + b"." + body - - expected = hmac.new(secret_bytes, signed_content, sha256).digest() - expected_b64 = base64.b64encode(expected).decode() - - return hmac.compare_digest(expected_b64, signature) - except Exception as e: - logger.error("Daily.co webhook signature verification failed", exc_info=e) - return False + return verify_webhook_signature( + body=body, + signature=signature, + timestamp=timestamp or "", + webhook_secret=self.config.webhook_secret, + ) async def create_meeting_token( self, room_name: DailyRoomName, enable_recording: bool, - user_id: Optional[str] = None, + user_id: str | None = None, ) -> str: - data = {"properties": {"room_name": room_name}} + properties = MeetingTokenProperties( + room_name=room_name, + user_id=user_id, + start_cloud_recording=enable_recording, + enable_recording_ui=not enable_recording, + ) - if enable_recording: - data["properties"]["start_cloud_recording"] = True - data["properties"]["enable_recording_ui"] = False + request = CreateMeetingTokenRequest(properties=properties) + result = await self._api_client.create_meeting_token(request) + return result.token - if user_id: - data["properties"]["user_id"] = user_id + async def close(self): + """Clean up API client resources.""" + await self._api_client.close() - async with httpx.AsyncClient() as client: - response = await client.post( - f"{self.BASE_URL}/meeting-tokens", - headers=self.headers, - json=data, - timeout=self.TIMEOUT, - ) - response.raise_for_status() - return response.json()["token"] + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index baad97e9..733c70a3 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -1,10 +1,14 @@ import json from datetime import datetime, timezone -from typing import Any, Dict, Literal from fastapi import APIRouter, HTTPException, Request -from pydantic import BaseModel +from reflector.dailyco_api import ( + DailyTrack, + DailyWebhookEvent, + extract_room_name, + parse_recording_error, +) from reflector.db import get_database from reflector.db.daily_participant_sessions import ( DailyParticipantSession, @@ -13,7 +17,6 @@ from reflector.db.daily_participant_sessions import ( from reflector.db.meetings import meetings_controller from reflector.logger import logger as _logger from reflector.settings import settings -from reflector.utils.daily import DailyRoomName from reflector.video_platforms.factory import create_platform_client from reflector.worker.process import process_multitrack_recording @@ -22,30 +25,6 @@ router = APIRouter() logger = _logger.bind(platform="daily") -class DailyTrack(BaseModel): - type: Literal["audio", "video"] - s3Key: str - size: int - - -class DailyWebhookEvent(BaseModel): - version: str - type: str - id: str - payload: Dict[str, Any] - event_ts: float - - -def _extract_room_name(event: DailyWebhookEvent) -> DailyRoomName | None: - """Extract room name from Daily event payload. - - Daily.co API inconsistency: - - participant.* events use "room" field - - recording.* events use "room_name" field - """ - return event.payload.get("room_name") or event.payload.get("room") - - @router.post("/webhook") async def webhook(request: Request): """Handle Daily webhook events. @@ -77,18 +56,14 @@ async def webhook(request: Request): client = create_platform_client("daily") - # TEMPORARY: Bypass signature check for testing - # TODO: Remove this after testing is complete - BYPASS_FOR_TESTING = True - if not BYPASS_FOR_TESTING: - if not client.verify_webhook_signature(body, signature, timestamp): - logger.warning( - "Invalid webhook signature", - signature=signature, - timestamp=timestamp, - has_body=bool(body), - ) - raise HTTPException(status_code=401, detail="Invalid webhook signature") + if not client.verify_webhook_signature(body, signature, timestamp): + logger.warning( + "Invalid webhook signature", + signature=signature, + timestamp=timestamp, + has_body=bool(body), + ) + raise HTTPException(status_code=401, detail="Invalid webhook signature") try: body_json = json.loads(body) @@ -99,14 +74,12 @@ async def webhook(request: Request): logger.info("Received Daily webhook test event") return {"status": "ok"} - # Parse as actual event try: event = DailyWebhookEvent(**body_json) except Exception as e: logger.error("Failed to parse webhook event", error=str(e), body=body.decode()) raise HTTPException(status_code=422, detail="Invalid event format") - # Handle participant events if event.type == "participant.joined": await _handle_participant_joined(event) elif event.type == "participant.left": @@ -154,7 +127,7 @@ async def webhook(request: Request): async def _handle_participant_joined(event: DailyWebhookEvent): - daily_room_name = _extract_room_name(event) + daily_room_name = extract_room_name(event) if not daily_room_name: logger.warning("participant.joined: no room in payload", payload=event.payload) return @@ -167,7 +140,6 @@ async def _handle_participant_joined(event: DailyWebhookEvent): return payload = event.payload - logger.warning({"payload": payload}) joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) session_id = f"{meeting.id}:{payload['session_id']}" @@ -225,7 +197,7 @@ async def _handle_participant_joined(event: DailyWebhookEvent): async def _handle_participant_left(event: DailyWebhookEvent): - room_name = _extract_room_name(event) + room_name = extract_room_name(event) if not room_name: logger.warning("participant.left: no room in payload", payload=event.payload) return @@ -268,7 +240,7 @@ async def _handle_participant_left(event: DailyWebhookEvent): async def _handle_recording_started(event: DailyWebhookEvent): - room_name = _extract_room_name(event) + room_name = extract_room_name(event) if not room_name: logger.warning( "recording.started: no room_name in payload", payload=event.payload @@ -301,7 +273,7 @@ async def _handle_recording_ready(event: DailyWebhookEvent): ] } """ - room_name = _extract_room_name(event) + room_name = extract_room_name(event) recording_id = event.payload.get("recording_id") tracks_raw = event.payload.get("tracks", []) @@ -350,8 +322,8 @@ async def _handle_recording_ready(event: DailyWebhookEvent): async def _handle_recording_error(event: DailyWebhookEvent): - room_name = _extract_room_name(event) - error = event.payload.get("error", "Unknown error") + payload = parse_recording_error(event) + room_name = payload.room_name if room_name: meeting = await meetings_controller.get_by_room_name(room_name) @@ -360,6 +332,6 @@ async def _handle_recording_error(event: DailyWebhookEvent): "Recording error", meeting_id=meeting.id, room_name=room_name, - error=error, + error=payload.error_msg, platform="daily", ) diff --git a/server/scripts/list_daily_webhooks.py b/server/scripts/list_daily_webhooks.py index c3c13568..e2e3c912 100755 --- a/server/scripts/list_daily_webhooks.py +++ b/server/scripts/list_daily_webhooks.py @@ -6,53 +6,19 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) -import httpx - +from reflector.dailyco_api import DailyApiClient from reflector.settings import settings async def list_webhooks(): - """ - List all Daily.co webhooks for this account. - """ + """List all Daily.co webhooks for this account using dailyco_api module.""" if not settings.DAILY_API_KEY: print("Error: DAILY_API_KEY not set") return 1 - headers = { - "Authorization": f"Bearer {settings.DAILY_API_KEY}", - "Content-Type": "application/json", - } - - async with httpx.AsyncClient() as client: + async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client: try: - """ - Daily.co webhook list response format: - [ - { - "uuid": "0b4e4c7c-5eaf-46fe-990b-a3752f5684f5", - "url": "{{webhook_url}}", - "hmac": "NQrSA5z0FkJ44QPrFerW7uCc5kdNLv3l2FDEKDanL1U=", - "basicAuth": null, - "eventTypes": [ - "recording.started", - "recording.ready-to-download" - ], - "state": "ACTVIE", - "failedCount": 0, - "lastMomentPushed": "2023-08-15T18:29:52.000Z", - "domainId": "{{domain_id}}", - "createdAt": "2023-08-15T18:28:30.000Z", - "updatedAt": "2023-08-15T18:29:52.000Z" - } - ] - """ - resp = await client.get( - "https://api.daily.co/v1/webhooks", - headers=headers, - ) - resp.raise_for_status() - webhooks = resp.json() + webhooks = await client.list_webhooks() if not webhooks: print("No webhooks found") @@ -62,12 +28,12 @@ async def list_webhooks(): for webhook in webhooks: print("=" * 80) - print(f"UUID: {webhook['uuid']}") - print(f"URL: {webhook['url']}") - print(f"State: {webhook['state']}") - print(f"Event Types: {', '.join(webhook.get('eventTypes', []))}") + print(f"UUID: {webhook.uuid}") + print(f"URL: {webhook.url}") + print(f"State: {webhook.state}") + print(f"Event Types: {', '.join(webhook.eventTypes)}") print( - f"HMAC Secret: {'✓ Configured' if webhook.get('hmac') else '✗ Not set'}" + f"HMAC Secret: {'✓ Configured' if webhook.hmac else '✗ Not set'}" ) print() @@ -78,12 +44,8 @@ async def list_webhooks(): return 0 - except httpx.HTTPStatusError as e: - print(f"Error fetching webhooks: {e}") - print(f"Response: {e.response.text}") - return 1 except Exception as e: - print(f"Unexpected error: {e}") + print(f"Error fetching webhooks: {e}") return 1 diff --git a/server/scripts/recreate_daily_webhook.py b/server/scripts/recreate_daily_webhook.py index a378baf2..e4ac9ce9 100644 --- a/server/scripts/recreate_daily_webhook.py +++ b/server/scripts/recreate_daily_webhook.py @@ -6,56 +6,60 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) -import httpx - +from reflector.dailyco_api import ( + CreateWebhookRequest, + DailyApiClient, +) from reflector.settings import settings async def setup_webhook(webhook_url: str): """ - Create or update Daily.co webhook for this environment. + Create or update Daily.co webhook for this environment using dailyco_api module. Uses DAILY_WEBHOOK_UUID to identify existing webhook. """ if not settings.DAILY_API_KEY: print("Error: DAILY_API_KEY not set") return 1 - headers = { - "Authorization": f"Bearer {settings.DAILY_API_KEY}", - "Content-Type": "application/json", - } + if not settings.DAILY_WEBHOOK_SECRET: + print("Error: DAILY_WEBHOOK_SECRET not set") + return 1 - webhook_data = { - "url": webhook_url, - "eventTypes": [ - "participant.joined", - "participant.left", - "recording.started", - "recording.ready-to-download", - "recording.error", - ], - "hmac": settings.DAILY_WEBHOOK_SECRET, - } + event_types = [ + "participant.joined", + "participant.left", + "recording.started", + "recording.ready-to-download", + "recording.error", + ] - async with httpx.AsyncClient() as client: + async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client: webhook_uuid = settings.DAILY_WEBHOOK_UUID if webhook_uuid: - # Update existing webhook print(f"Updating existing webhook {webhook_uuid}...") try: - resp = await client.patch( - f"https://api.daily.co/v1/webhooks/{webhook_uuid}", - headers=headers, - json=webhook_data, + # Note: Daily.co doesn't support PATCH well, so we delete + recreate + await client.delete_webhook(webhook_uuid) + print(f"Deleted old webhook {webhook_uuid}") + + request = CreateWebhookRequest( + url=webhook_url, + eventTypes=event_types, + hmac=settings.DAILY_WEBHOOK_SECRET, ) - resp.raise_for_status() - result = resp.json() - print(f"✓ Updated webhook {result['uuid']} (state: {result['state']})") - print(f" URL: {result['url']}") - return 0 - except httpx.HTTPStatusError as e: - if e.response.status_code == 404: + result = await client.create_webhook(request) + + print( + f"✓ Created replacement webhook {result.uuid} (state: {result.state})" + ) + print(f" URL: {result.url}") + + webhook_uuid = result.uuid + + except Exception as e: + if hasattr(e, "response") and e.response.status_code == 404: print(f"Webhook {webhook_uuid} not found, creating new one...") webhook_uuid = None # Fall through to creation else: @@ -63,17 +67,17 @@ async def setup_webhook(webhook_url: str): return 1 if not webhook_uuid: - # Create new webhook print("Creating new webhook...") - resp = await client.post( - "https://api.daily.co/v1/webhooks", headers=headers, json=webhook_data + request = CreateWebhookRequest( + url=webhook_url, + eventTypes=event_types, + hmac=settings.DAILY_WEBHOOK_SECRET, ) - resp.raise_for_status() - result = resp.json() - webhook_uuid = result["uuid"] + result = await client.create_webhook(request) + webhook_uuid = result.uuid - print(f"✓ Created webhook {webhook_uuid} (state: {result['state']})") - print(f" URL: {result['url']}") + print(f"✓ Created webhook {webhook_uuid} (state: {result.state})") + print(f" URL: {result.url}") print() print("=" * 60) print("IMPORTANT: Add this to your environment variables:") @@ -114,7 +118,7 @@ if __name__ == "__main__": ) print() print("Behavior:") - print(" - If DAILY_WEBHOOK_UUID set: Updates existing webhook") + print(" - If DAILY_WEBHOOK_UUID set: Deletes old webhook, creates new one") print( " - If DAILY_WEBHOOK_UUID empty: Creates new webhook, saves UUID to .env" )