feat: dailyco api module (#725)

* dailyco api module (no-mistakes)

* daily co library self-review

* uncurse

* self-review: daily resource leak, uniform types, enable_recording bomb, daily custom error, video_platforms/daily typing, daily timestamp dry

* dailyco docs parser

* remove generated daily docs

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
Igor Monadical
2025-11-21 10:24:04 -05:00
committed by GitHub
parent 3e47c2c057
commit 4287f8b8ae
11 changed files with 1558 additions and 303 deletions

View File

@@ -0,0 +1,6 @@
anything about Daily.co api interaction
- webhook event shapes
- REST api client
No REST api client existing found in the wild; the official lib is about working with videocall as a bot

View File

@@ -0,0 +1,96 @@
"""
Daily.co API Module
"""
# Client
from .client import DailyApiClient, DailyApiError
# Request models
from .requests import (
CreateMeetingTokenRequest,
CreateRoomRequest,
CreateWebhookRequest,
MeetingTokenProperties,
RecordingsBucketConfig,
RoomProperties,
UpdateWebhookRequest,
)
# Response models
from .responses import (
MeetingParticipant,
MeetingParticipantsResponse,
MeetingResponse,
MeetingTokenResponse,
RecordingResponse,
RecordingS3Info,
RoomPresenceParticipant,
RoomPresenceResponse,
RoomResponse,
WebhookResponse,
)
# Webhook utilities
from .webhook_utils import (
extract_room_name,
parse_participant_joined,
parse_participant_left,
parse_recording_error,
parse_recording_ready,
parse_recording_started,
parse_webhook_payload,
verify_webhook_signature,
)
# Webhook models
from .webhooks import (
DailyTrack,
DailyWebhookEvent,
ParticipantJoinedPayload,
ParticipantLeftPayload,
RecordingErrorPayload,
RecordingReadyToDownloadPayload,
RecordingStartedPayload,
)
__all__ = [
# Client
"DailyApiClient",
"DailyApiError",
# Requests
"CreateRoomRequest",
"RoomProperties",
"RecordingsBucketConfig",
"CreateMeetingTokenRequest",
"MeetingTokenProperties",
"CreateWebhookRequest",
"UpdateWebhookRequest",
# Responses
"RoomResponse",
"RoomPresenceResponse",
"RoomPresenceParticipant",
"MeetingParticipantsResponse",
"MeetingParticipant",
"MeetingResponse",
"RecordingResponse",
"RecordingS3Info",
"MeetingTokenResponse",
"WebhookResponse",
# Webhooks
"DailyWebhookEvent",
"DailyTrack",
"ParticipantJoinedPayload",
"ParticipantLeftPayload",
"RecordingStartedPayload",
"RecordingReadyToDownloadPayload",
"RecordingErrorPayload",
# Webhook utilities
"verify_webhook_signature",
"extract_room_name",
"parse_webhook_payload",
"parse_participant_joined",
"parse_participant_left",
"parse_recording_started",
"parse_recording_ready",
"parse_recording_error",
]

View File

@@ -0,0 +1,527 @@
"""
Daily.co API Client
Complete async client for Daily.co REST API with Pydantic models.
Reference: https://docs.daily.co/reference/rest-api
"""
from http import HTTPStatus
from typing import Any
import httpx
import structlog
from reflector.utils.string import NonEmptyString
from .requests import (
CreateMeetingTokenRequest,
CreateRoomRequest,
CreateWebhookRequest,
UpdateWebhookRequest,
)
from .responses import (
MeetingParticipantsResponse,
MeetingResponse,
MeetingTokenResponse,
RecordingResponse,
RoomPresenceResponse,
RoomResponse,
WebhookResponse,
)
logger = structlog.get_logger(__name__)
class DailyApiError(Exception):
"""Daily.co API error with full request/response context."""
def __init__(self, operation: str, response: httpx.Response):
self.operation = operation
self.response = response
self.status_code = response.status_code
self.response_body = response.text
self.url = str(response.url)
self.request_body = (
response.request.content.decode() if response.request.content else None
)
super().__init__(
f"Daily.co API error: {operation} failed with status {self.status_code}"
)
class DailyApiClient:
"""
Complete async client for Daily.co REST API.
Usage:
# Direct usage
client = DailyApiClient(api_key="your_api_key")
room = await client.create_room(CreateRoomRequest(name="my-room"))
await client.close() # Clean up when done
# Context manager (recommended)
async with DailyApiClient(api_key="your_api_key") as client:
room = await client.create_room(CreateRoomRequest(name="my-room"))
"""
BASE_URL = "https://api.daily.co/v1"
DEFAULT_TIMEOUT = 10.0
def __init__(
self,
api_key: NonEmptyString,
webhook_secret: NonEmptyString | None = None,
timeout: float = DEFAULT_TIMEOUT,
base_url: NonEmptyString | None = None,
):
"""
Initialize Daily.co API client.
Args:
api_key: Daily.co API key (Bearer token)
webhook_secret: Base64-encoded HMAC secret for webhook verification.
Must match the 'hmac' value provided when creating webhooks.
Generate with: base64.b64encode(os.urandom(32)).decode()
timeout: Default request timeout in seconds
base_url: Override base URL (for testing)
"""
self.api_key = api_key
self.webhook_secret = webhook_secret
self.timeout = timeout
self.base_url = base_url or self.BASE_URL
self.headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
self._client: httpx.AsyncClient | None = None
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(timeout=self.timeout)
return self._client
async def close(self):
if self._client is not None:
await self._client.aclose()
self._client = None
async def _handle_response(
self, response: httpx.Response, operation: str
) -> dict[str, Any]:
"""
Handle API response with error logging.
Args:
response: HTTP response
operation: Operation name for logging (e.g., "create_room")
Returns:
Parsed JSON response
Raises:
DailyApiError: If request failed with full context
"""
if response.status_code >= 400:
logger.error(
f"Daily.co API error: {operation}",
status_code=response.status_code,
response_body=response.text,
request_body=response.request.content.decode()
if response.request.content
else None,
url=str(response.url),
)
raise DailyApiError(operation, response)
return response.json()
# ============================================================================
# ROOMS
# ============================================================================
async def create_room(self, request: CreateRoomRequest) -> RoomResponse:
"""
Create a new Daily.co room.
Reference: https://docs.daily.co/reference/rest-api/rooms/create-room
Args:
request: Room creation request with name, privacy, and properties
Returns:
Created room data including URL and ID
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/rooms",
headers=self.headers,
json=request.model_dump(exclude_none=True),
)
data = await self._handle_response(response, "create_room")
return RoomResponse(**data)
async def get_room(self, room_name: NonEmptyString) -> RoomResponse:
"""
Get room configuration.
Args:
room_name: Daily.co room name
Returns:
Room configuration data
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.get(
f"{self.base_url}/rooms/{room_name}",
headers=self.headers,
)
data = await self._handle_response(response, "get_room")
return RoomResponse(**data)
async def get_room_presence(
self, room_name: NonEmptyString
) -> RoomPresenceResponse:
"""
Get current participants in a room (real-time presence).
Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence
Args:
room_name: Daily.co room name
Returns:
List of currently present participants with join time and duration
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.get(
f"{self.base_url}/rooms/{room_name}/presence",
headers=self.headers,
)
data = await self._handle_response(response, "get_room_presence")
return RoomPresenceResponse(**data)
async def delete_room(self, room_name: NonEmptyString) -> None:
"""
Delete a room (idempotent - succeeds even if room doesn't exist).
Reference: https://docs.daily.co/reference/rest-api/rooms/delete-room
Args:
room_name: Daily.co room name
Raises:
httpx.HTTPStatusError: If API request fails (except 404)
"""
client = await self._get_client()
response = await client.delete(
f"{self.base_url}/rooms/{room_name}",
headers=self.headers,
)
# Idempotent delete - 404 means already deleted
if response.status_code == HTTPStatus.NOT_FOUND:
logger.debug("Room not found (already deleted)", room_name=room_name)
return
await self._handle_response(response, "delete_room")
# ============================================================================
# MEETINGS
# ============================================================================
async def get_meeting(self, meeting_id: NonEmptyString) -> MeetingResponse:
"""
Get full meeting information including participants.
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-information
Args:
meeting_id: Daily.co meeting/session ID
Returns:
Meeting metadata including room, duration, participants, and status
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.get(
f"{self.base_url}/meetings/{meeting_id}",
headers=self.headers,
)
data = await self._handle_response(response, "get_meeting")
return MeetingResponse(**data)
async def get_meeting_participants(
self,
meeting_id: NonEmptyString,
limit: int | None = None,
joined_after: NonEmptyString | None = None,
joined_before: NonEmptyString | None = None,
) -> MeetingParticipantsResponse:
"""
Get historical participant data from a completed meeting (paginated).
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
Args:
meeting_id: Daily.co meeting/session ID
limit: Maximum number of participant records to return
joined_after: Return participants who joined after this participant_id
joined_before: Return participants who joined before this participant_id
Returns:
List of participants with join times and duration
Raises:
httpx.HTTPStatusError: If API request fails (404 when no more participants)
Note:
For pagination, use joined_after with the last participant_id from previous response.
Returns 404 when no more participants remain.
"""
params = {}
if limit is not None:
params["limit"] = limit
if joined_after is not None:
params["joined_after"] = joined_after
if joined_before is not None:
params["joined_before"] = joined_before
client = await self._get_client()
response = await client.get(
f"{self.base_url}/meetings/{meeting_id}/participants",
headers=self.headers,
params=params,
)
data = await self._handle_response(response, "get_meeting_participants")
return MeetingParticipantsResponse(**data)
# ============================================================================
# RECORDINGS
# ============================================================================
async def get_recording(self, recording_id: NonEmptyString) -> RecordingResponse:
"""
Get recording metadata and status.
Reference: https://docs.daily.co/reference/rest-api/recordings
Args:
recording_id: Daily.co recording ID
Returns:
Recording metadata including status, duration, and S3 info
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.get(
f"{self.base_url}/recordings/{recording_id}",
headers=self.headers,
)
data = await self._handle_response(response, "get_recording")
return RecordingResponse(**data)
# ============================================================================
# MEETING TOKENS
# ============================================================================
async def create_meeting_token(
self, request: CreateMeetingTokenRequest
) -> MeetingTokenResponse:
"""
Create a meeting token for participant authentication.
Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token
Args:
request: Token properties including room name, user_id, permissions
Returns:
JWT meeting token
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/meeting-tokens",
headers=self.headers,
json=request.model_dump(exclude_none=True),
)
data = await self._handle_response(response, "create_meeting_token")
return MeetingTokenResponse(**data)
# ============================================================================
# WEBHOOKS
# ============================================================================
async def list_webhooks(self) -> list[WebhookResponse]:
"""
List all configured webhooks for this account.
Reference: https://docs.daily.co/reference/rest-api/webhooks
Returns:
List of webhook configurations
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.get(
f"{self.base_url}/webhooks",
headers=self.headers,
)
data = await self._handle_response(response, "list_webhooks")
# Daily.co returns array directly (not paginated)
if isinstance(data, list):
return [WebhookResponse(**wh) for wh in data]
# Future-proof: handle potential pagination envelope
if isinstance(data, dict) and "data" in data:
return [WebhookResponse(**wh) for wh in data["data"]]
logger.warning("Unexpected webhook list response format", data=data)
return []
async def create_webhook(self, request: CreateWebhookRequest) -> WebhookResponse:
"""
Create a new webhook subscription.
Reference: https://docs.daily.co/reference/rest-api/webhooks
Args:
request: Webhook configuration with URL, event types, and HMAC secret
Returns:
Created webhook with UUID and state
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.post(
f"{self.base_url}/webhooks",
headers=self.headers,
json=request.model_dump(exclude_none=True),
)
data = await self._handle_response(response, "create_webhook")
return WebhookResponse(**data)
async def update_webhook(
self, webhook_uuid: NonEmptyString, request: UpdateWebhookRequest
) -> WebhookResponse:
"""
Update webhook configuration.
Note: Daily.co may not support PATCH for all fields.
Common pattern is delete + recreate.
Reference: https://docs.daily.co/reference/rest-api/webhooks
Args:
webhook_uuid: Webhook UUID to update
request: Updated webhook configuration
Returns:
Updated webhook configuration
Raises:
httpx.HTTPStatusError: If API request fails
"""
client = await self._get_client()
response = await client.patch(
f"{self.base_url}/webhooks/{webhook_uuid}",
headers=self.headers,
json=request.model_dump(exclude_none=True),
)
data = await self._handle_response(response, "update_webhook")
return WebhookResponse(**data)
async def delete_webhook(self, webhook_uuid: NonEmptyString) -> None:
"""
Delete a webhook.
Reference: https://docs.daily.co/reference/rest-api/webhooks
Args:
webhook_uuid: Webhook UUID to delete
Raises:
httpx.HTTPStatusError: If webhook not found or deletion fails
"""
client = await self._get_client()
response = await client.delete(
f"{self.base_url}/webhooks/{webhook_uuid}",
headers=self.headers,
)
await self._handle_response(response, "delete_webhook")
# ============================================================================
# HELPER METHODS
# ============================================================================
async def find_webhook_by_url(self, url: NonEmptyString) -> WebhookResponse | None:
"""
Find a webhook by its URL.
Args:
url: Webhook endpoint URL to search for
Returns:
Webhook if found, None otherwise
"""
webhooks = await self.list_webhooks()
for webhook in webhooks:
if webhook.url == url:
return webhook
return None
async def find_webhooks_by_pattern(
self, pattern: NonEmptyString
) -> list[WebhookResponse]:
"""
Find webhooks matching a URL pattern (e.g., 'ngrok').
Args:
pattern: String to match in webhook URLs
Returns:
List of matching webhooks
"""
webhooks = await self.list_webhooks()
return [wh for wh in webhooks if pattern in wh.url]

View File

@@ -0,0 +1,158 @@
"""
Daily.co API Request Models
Reference: https://docs.daily.co/reference/rest-api
"""
from typing import List, Literal
from pydantic import BaseModel, Field
from reflector.utils.string import NonEmptyString
class RecordingsBucketConfig(BaseModel):
"""
S3 bucket configuration for raw-tracks recordings.
Reference: https://docs.daily.co/reference/rest-api/rooms/create-room
"""
bucket_name: NonEmptyString = Field(description="S3 bucket name")
bucket_region: NonEmptyString = Field(description="AWS region (e.g., 'us-east-1')")
assume_role_arn: NonEmptyString = Field(
description="AWS IAM role ARN that Daily.co will assume to write recordings"
)
allow_api_access: bool = Field(
default=True,
description="Whether to allow API access to recording metadata",
)
class RoomProperties(BaseModel):
"""
Room configuration properties.
"""
enable_recording: Literal["cloud", "local", "raw-tracks"] | None = Field(
default=None,
description="Recording mode: 'cloud' for mixed, 'local' for local recording, 'raw-tracks' for multitrack, None to disable",
)
enable_chat: bool = Field(default=True, description="Enable in-meeting chat")
enable_screenshare: bool = Field(default=True, description="Enable screen sharing")
start_video_off: bool = Field(
default=False, description="Start with video off for all participants"
)
start_audio_off: bool = Field(
default=False, description="Start with audio muted for all participants"
)
exp: int | None = Field(
None, description="Room expiration timestamp (Unix epoch seconds)"
)
recordings_bucket: RecordingsBucketConfig | None = Field(
None, description="S3 bucket configuration for raw-tracks recordings"
)
class CreateRoomRequest(BaseModel):
"""
Request to create a new Daily.co room.
Reference: https://docs.daily.co/reference/rest-api/rooms/create-room
"""
name: NonEmptyString = Field(description="Room name (must be unique within domain)")
privacy: Literal["public", "private"] = Field(
default="public", description="Room privacy setting"
)
properties: RoomProperties = Field(
default_factory=RoomProperties, description="Room configuration properties"
)
class MeetingTokenProperties(BaseModel):
"""
Properties for meeting token creation.
Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token
"""
room_name: NonEmptyString = Field(description="Room name this token is valid for")
user_id: NonEmptyString | None = Field(
None, description="User identifier to associate with token"
)
is_owner: bool = Field(
default=False, description="Grant owner privileges to token holder"
)
start_cloud_recording: bool = Field(
default=False, description="Automatically start cloud recording on join"
)
enable_recording_ui: bool = Field(
default=True, description="Show recording controls in UI"
)
eject_at_token_exp: bool = Field(
default=False, description="Eject participant when token expires"
)
nbf: int | None = Field(
None, description="Not-before timestamp (Unix epoch seconds)"
)
exp: int | None = Field(
None, description="Expiration timestamp (Unix epoch seconds)"
)
class CreateMeetingTokenRequest(BaseModel):
"""
Request to create a meeting token for participant authentication.
Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token
"""
properties: MeetingTokenProperties = Field(description="Token properties")
class CreateWebhookRequest(BaseModel):
"""
Request to create a webhook subscription.
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
url: NonEmptyString = Field(description="Webhook endpoint URL (must be HTTPS)")
eventTypes: List[
Literal[
"participant.joined",
"participant.left",
"recording.started",
"recording.ready-to-download",
"recording.error",
]
] = Field(
description="Array of event types to subscribe to (only events we handle)"
)
hmac: NonEmptyString = Field(
description="Base64-encoded HMAC secret for webhook signature verification"
)
basicAuth: NonEmptyString | None = Field(
None, description="Optional basic auth credentials for webhook endpoint"
)
class UpdateWebhookRequest(BaseModel):
"""
Request to update an existing webhook.
Note: Daily.co API may not support PATCH for webhooks.
Common pattern is to delete and recreate.
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
url: NonEmptyString | None = Field(None, description="New webhook endpoint URL")
eventTypes: List[NonEmptyString] | None = Field(
None, description="New array of event types"
)
hmac: NonEmptyString | None = Field(None, description="New HMAC secret")
basicAuth: NonEmptyString | None = Field(
None, description="New basic auth credentials"
)

View File

@@ -0,0 +1,182 @@
"""
Daily.co API Response Models
"""
from typing import Any, Dict, List, Literal
from pydantic import BaseModel, Field
from reflector.utils.string import NonEmptyString
# not documented in daily; we fill it according to observations
RecordingStatus = Literal["in-progress", "finished"]
class RoomResponse(BaseModel):
"""
Response from room creation or retrieval.
Reference: https://docs.daily.co/reference/rest-api/rooms/create-room
"""
id: NonEmptyString = Field(description="Unique room identifier (UUID)")
name: NonEmptyString = Field(description="Room name used in URLs")
api_created: bool = Field(description="Whether room was created via API")
privacy: Literal["public", "private"] = Field(description="Room privacy setting")
url: NonEmptyString = Field(description="Full room URL")
created_at: NonEmptyString = Field(description="ISO 8601 creation timestamp")
config: Dict[NonEmptyString, Any] = Field(
default_factory=dict, description="Room configuration properties"
)
class RoomPresenceParticipant(BaseModel):
"""
Participant presence information in a room.
Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence
"""
room: NonEmptyString = Field(description="Room name")
id: NonEmptyString = Field(description="Participant session ID")
userId: NonEmptyString | None = Field(None, description="User ID if provided")
userName: NonEmptyString | None = Field(None, description="User display name")
joinTime: NonEmptyString = Field(description="ISO 8601 join timestamp")
duration: int = Field(description="Duration in room (seconds)")
class RoomPresenceResponse(BaseModel):
"""
Response from room presence endpoint.
Reference: https://docs.daily.co/reference/rest-api/rooms/get-room-presence
"""
total_count: int = Field(
description="Total number of participants currently in room"
)
data: List[RoomPresenceParticipant] = Field(
default_factory=list, description="Array of participant presence data"
)
class MeetingParticipant(BaseModel):
"""
Historical participant data from a meeting.
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
"""
user_id: NonEmptyString = Field(description="User identifier")
participant_id: NonEmptyString = Field(description="Participant session identifier")
user_name: NonEmptyString | None = Field(None, description="User display name")
join_time: int = Field(description="Join timestamp (Unix epoch seconds)")
duration: int = Field(description="Duration in meeting (seconds)")
class MeetingParticipantsResponse(BaseModel):
"""
Response from meeting participants endpoint.
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
"""
data: List[MeetingParticipant] = Field(
default_factory=list, description="Array of participant data"
)
class MeetingResponse(BaseModel):
"""
Response from meeting information endpoint.
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-information
"""
id: NonEmptyString = Field(description="Meeting session identifier (UUID)")
room: NonEmptyString = Field(description="Room name where meeting occurred")
start_time: int = Field(
description="Meeting start Unix timestamp (~15s granularity)"
)
duration: int = Field(description="Total meeting duration in seconds")
ongoing: bool = Field(description="Whether meeting is currently active")
max_participants: int = Field(description="Peak concurrent participant count")
participants: List[MeetingParticipant] = Field(
default_factory=list, description="Array of participant session data"
)
class RecordingS3Info(BaseModel):
"""
S3 bucket information for a recording.
Reference: https://docs.daily.co/reference/rest-api/recordings
"""
bucket_name: NonEmptyString
bucket_region: NonEmptyString
endpoint: NonEmptyString | None = None
class RecordingResponse(BaseModel):
"""
Response from recording retrieval endpoint.
Reference: https://docs.daily.co/reference/rest-api/recordings
"""
id: NonEmptyString = Field(description="Recording identifier")
room_name: NonEmptyString = Field(description="Room where recording occurred")
start_ts: int = Field(description="Recording start timestamp (Unix epoch seconds)")
status: RecordingStatus = Field(
description="Recording status ('in-progress' or 'finished')"
)
max_participants: int = Field(description="Maximum participants during recording")
duration: int = Field(description="Recording duration in seconds")
share_token: NonEmptyString | None = Field(
None, description="Token for sharing recording"
)
s3: RecordingS3Info | None = Field(None, description="S3 bucket information")
class MeetingTokenResponse(BaseModel):
"""
Response from meeting token creation.
Reference: https://docs.daily.co/reference/rest-api/meeting-tokens/create-meeting-token
"""
token: NonEmptyString = Field(
description="JWT meeting token for participant authentication"
)
class WebhookResponse(BaseModel):
"""
Response from webhook creation or retrieval.
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
uuid: NonEmptyString = Field(description="Unique webhook identifier")
url: NonEmptyString = Field(description="Webhook endpoint URL")
hmac: NonEmptyString | None = Field(
None, description="Base64-encoded HMAC secret for signature verification"
)
basicAuth: NonEmptyString | None = Field(
None, description="Basic auth credentials if configured"
)
eventTypes: List[NonEmptyString] = Field(
default_factory=list,
description="Array of event types (e.g., ['recording.started', 'participant.joined'])",
)
state: Literal["ACTIVE", "FAILED"] = Field(
description="Webhook state - FAILED after 3+ consecutive failures"
)
failedCount: int = Field(default=0, description="Number of consecutive failures")
lastMomentPushed: NonEmptyString | None = Field(
None, description="ISO 8601 timestamp of last successful push"
)
domainId: NonEmptyString = Field(description="Daily.co domain/account identifier")
createdAt: NonEmptyString = Field(description="ISO 8601 creation timestamp")
updatedAt: NonEmptyString = Field(description="ISO 8601 last update timestamp")

View File

@@ -0,0 +1,229 @@
"""
Daily.co Webhook Utilities
Utilities for verifying and parsing Daily.co webhook events.
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
import base64
import hmac
from hashlib import sha256
import structlog
from .webhooks import (
DailyWebhookEvent,
ParticipantJoinedPayload,
ParticipantLeftPayload,
RecordingErrorPayload,
RecordingReadyToDownloadPayload,
RecordingStartedPayload,
)
logger = structlog.get_logger(__name__)
def verify_webhook_signature(
body: bytes,
signature: str,
timestamp: str,
webhook_secret: str,
) -> bool:
"""
Verify Daily.co webhook signature using HMAC-SHA256.
Daily.co signature verification:
1. Base64-decode the webhook secret
2. Create signed content: timestamp + '.' + body
3. Compute HMAC-SHA256(secret, signed_content)
4. Base64-encode the result
5. Compare with provided signature using constant-time comparison
Reference: https://docs.daily.co/reference/rest-api/webhooks
Args:
body: Raw request body bytes
signature: X-Webhook-Signature header value
timestamp: X-Webhook-Timestamp header value
webhook_secret: Base64-encoded HMAC secret
Returns:
True if signature is valid, False otherwise
Example:
>>> body = b'{"version":"1.0.0","type":"participant.joined",...}'
>>> signature = "abc123..."
>>> timestamp = "1234567890"
>>> secret = "your-base64-secret"
>>> is_valid = verify_webhook_signature(body, signature, timestamp, secret)
"""
if not signature or not timestamp or not webhook_secret:
logger.warning(
"Missing required data for webhook verification",
has_signature=bool(signature),
has_timestamp=bool(timestamp),
has_secret=bool(webhook_secret),
)
return False
try:
secret_bytes = base64.b64decode(webhook_secret)
signed_content = timestamp.encode() + b"." + body
expected = hmac.new(secret_bytes, signed_content, sha256).digest()
expected_b64 = base64.b64encode(expected).decode()
# Constant-time comparison to prevent timing attacks
return hmac.compare_digest(expected_b64, signature)
except (base64.binascii.Error, ValueError, TypeError, UnicodeDecodeError) as e:
logger.error(
"Webhook signature verification failed",
error=str(e),
error_type=type(e).__name__,
)
return False
def extract_room_name(event: DailyWebhookEvent) -> str | None:
"""
Extract room name from Daily.co webhook event payload.
Args:
event: Parsed webhook event
Returns:
Room name if present and is a string, None otherwise
Example:
>>> event = DailyWebhookEvent(**webhook_payload)
>>> room_name = extract_room_name(event)
"""
room = event.payload.get("room_name")
# Ensure we return a string, not any falsy value that might be in payload
return room if isinstance(room, str) else None
def parse_participant_joined(event: DailyWebhookEvent) -> ParticipantJoinedPayload:
"""
Parse participant.joined webhook event payload.
Args:
event: Webhook event with type "participant.joined"
Returns:
Parsed participant joined payload
Raises:
pydantic.ValidationError: If payload doesn't match expected schema
"""
return ParticipantJoinedPayload(**event.payload)
def parse_participant_left(event: DailyWebhookEvent) -> ParticipantLeftPayload:
"""
Parse participant.left webhook event payload.
Args:
event: Webhook event with type "participant.left"
Returns:
Parsed participant left payload
Raises:
pydantic.ValidationError: If payload doesn't match expected schema
"""
return ParticipantLeftPayload(**event.payload)
def parse_recording_started(event: DailyWebhookEvent) -> RecordingStartedPayload:
"""
Parse recording.started webhook event payload.
Args:
event: Webhook event with type "recording.started"
Returns:
Parsed recording started payload
Raises:
pydantic.ValidationError: If payload doesn't match expected schema
"""
return RecordingStartedPayload(**event.payload)
def parse_recording_ready(
event: DailyWebhookEvent,
) -> RecordingReadyToDownloadPayload:
"""
Parse recording.ready-to-download webhook event payload.
This event is sent when raw-tracks recordings are complete and uploaded to S3.
The payload includes a 'tracks' array with individual audio/video files.
Args:
event: Webhook event with type "recording.ready-to-download"
Returns:
Parsed recording ready payload with tracks array
Raises:
pydantic.ValidationError: If payload doesn't match expected schema
Example:
>>> event = DailyWebhookEvent(**webhook_payload)
>>> if event.type == "recording.ready-to-download":
... payload = parse_recording_ready(event)
... audio_tracks = [t for t in payload.tracks if t.type == "audio"]
"""
return RecordingReadyToDownloadPayload(**event.payload)
def parse_recording_error(event: DailyWebhookEvent) -> RecordingErrorPayload:
"""
Parse recording.error webhook event payload.
Args:
event: Webhook event with type "recording.error"
Returns:
Parsed recording error payload
Raises:
pydantic.ValidationError: If payload doesn't match expected schema
"""
return RecordingErrorPayload(**event.payload)
# Webhook event type to parser mapping
WEBHOOK_PARSERS = {
"participant.joined": parse_participant_joined,
"participant.left": parse_participant_left,
"recording.started": parse_recording_started,
"recording.ready-to-download": parse_recording_ready,
"recording.error": parse_recording_error,
}
def parse_webhook_payload(event: DailyWebhookEvent):
"""
Parse webhook event payload based on event type.
Args:
event: Webhook event
Returns:
Typed payload model based on event type, or raw dict if unknown
Example:
>>> event = DailyWebhookEvent(**webhook_payload)
>>> payload = parse_webhook_payload(event)
>>> if isinstance(payload, ParticipantJoinedPayload):
... print(f"User {payload.user_name} joined")
"""
parser = WEBHOOK_PARSERS.get(event.type)
if parser:
return parser(event)
else:
logger.warning("Unknown webhook event type", event_type=event.type)
return event.payload

View File

@@ -0,0 +1,199 @@
"""
Daily.co Webhook Event Models
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
from typing import Any, Dict, Literal
from pydantic import BaseModel, Field, field_validator
from reflector.utils.string import NonEmptyString
def normalize_timestamp_to_int(v):
"""
Normalize float timestamps to int by truncating decimal part.
Daily.co sometimes sends timestamps as floats (e.g., 1708972279.96).
Pydantic expects int for fields typed as `int`.
"""
if v is None:
return v
if isinstance(v, float):
return int(v)
return v
WebhookEventType = Literal[
"participant.joined",
"participant.left",
"recording.started",
"recording.ready-to-download",
"recording.error",
]
class DailyTrack(BaseModel):
"""
Individual audio or video track from a multitrack recording.
Reference: https://docs.daily.co/reference/rest-api/recordings
"""
type: Literal["audio", "video"]
s3Key: NonEmptyString = Field(description="S3 object key for the track file")
size: int = Field(description="File size in bytes")
class DailyWebhookEvent(BaseModel):
"""
Base structure for all Daily.co webhook events.
All events share five common fields documented below.
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
version: NonEmptyString = Field(
description="Represents the version of the event. This uses semantic versioning to inform a consumer if the payload has introduced any breaking changes"
)
type: WebhookEventType = Field(
description="Represents the type of the event described in the payload"
)
id: NonEmptyString = Field(
description="An identifier representing this specific event"
)
payload: Dict[NonEmptyString, Any] = Field(
description="An object representing the event, whose fields are described in the corresponding payload class"
)
event_ts: int = Field(
description="Documenting when the webhook itself was sent. This timestamp is different than the time of the event the webhook describes. For example, a recording.started event will contain a start_ts timestamp of when the actual recording started, and a slightly later event_ts timestamp indicating when the webhook event was sent"
)
_normalize_event_ts = field_validator("event_ts", mode="before")(
normalize_timestamp_to_int
)
class ParticipantJoinedPayload(BaseModel):
"""
Payload for participant.joined webhook event.
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-joined
"""
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
session_id: NonEmptyString = Field(description="Daily.co session identifier")
user_id: NonEmptyString = Field(description="User identifier (may be encoded)")
user_name: NonEmptyString | None = Field(None, description="User display name")
joined_at: int = Field(description="Join timestamp in Unix epoch seconds")
_normalize_joined_at = field_validator("joined_at", mode="before")(
normalize_timestamp_to_int
)
class ParticipantLeftPayload(BaseModel):
"""
Payload for participant.left webhook event.
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-left
"""
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
session_id: NonEmptyString = Field(description="Daily.co session identifier")
user_id: NonEmptyString = Field(description="User identifier (may be encoded)")
user_name: NonEmptyString | None = Field(None, description="User display name")
joined_at: int = Field(description="Join timestamp in Unix epoch seconds")
duration: int | None = Field(
None, description="Duration of participation in seconds"
)
_normalize_joined_at = field_validator("joined_at", mode="before")(
normalize_timestamp_to_int
)
class RecordingStartedPayload(BaseModel):
"""
Payload for recording.started webhook event.
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-started
"""
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
recording_id: NonEmptyString = Field(description="Recording identifier")
start_ts: int | None = Field(None, description="Recording start timestamp")
_normalize_start_ts = field_validator("start_ts", mode="before")(
normalize_timestamp_to_int
)
class RecordingReadyToDownloadPayload(BaseModel):
"""
Payload for recording.ready-to-download webhook event.
This is sent when raw-tracks recordings are complete and uploaded to S3.
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-ready-to-download
"""
type: Literal["cloud", "raw-tracks"] = Field(
description="The type of recording that was generated"
)
recording_id: NonEmptyString = Field(
description="An ID identifying the recording that was generated"
)
room_name: NonEmptyString = Field(
description="The name of the room where the recording was made"
)
start_ts: int = Field(
description="The Unix epoch time in seconds representing when the recording started"
)
status: Literal["finished"] = Field(
description="The status of the given recording (always 'finished' in ready-to-download webhook, see RecordingStatus in responses.py for full API statuses)"
)
max_participants: int = Field(
description="The number of participants on the call that were recorded"
)
duration: int = Field(description="The duration in seconds of the call")
s3_key: NonEmptyString = Field(
description="The location of the recording in the provided S3 bucket"
)
share_token: NonEmptyString | None = Field(
None, description="undocumented documented secret field"
)
tracks: list[DailyTrack] | None = Field(
None,
description="If the recording is a raw-tracks recording, a tracks field will be provided. If role permissions have been removed, the tracks field may be null",
)
_normalize_start_ts = field_validator("start_ts", mode="before")(
normalize_timestamp_to_int
)
class RecordingErrorPayload(BaseModel):
"""
Payload for recording.error webhook event.
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-error
"""
action: Literal["clourd-recording-err", "cloud-recording-error"] = Field(
description="A string describing the event that was emitted (both variants are documented)"
)
error_msg: NonEmptyString = Field(description="The error message returned")
instance_id: NonEmptyString = Field(
description="The recording instance ID that was passed into the start recording command"
)
room_name: NonEmptyString = Field(
description="The name of the room where the recording was made"
)
timestamp: int = Field(
description="The Unix epoch time in seconds representing when the error was emitted"
)
_normalize_timestamp = field_validator("timestamp", mode="before")(
normalize_timestamp_to_int
)

View File

@@ -1,12 +1,17 @@
import base64
import hmac
from datetime import datetime from datetime import datetime
from hashlib import sha256
from http import HTTPStatus
from typing import Any, Dict, Optional
import httpx
from reflector.dailyco_api import (
CreateMeetingTokenRequest,
CreateRoomRequest,
DailyApiClient,
MeetingParticipantsResponse,
MeetingTokenProperties,
RecordingResponse,
RecordingsBucketConfig,
RoomPresenceResponse,
RoomProperties,
verify_webhook_signature,
)
from reflector.db.daily_participant_sessions import ( from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller, daily_participant_sessions_controller,
) )
@@ -23,18 +28,17 @@ from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig
class DailyClient(VideoPlatformClient): class DailyClient(VideoPlatformClient):
PLATFORM_NAME: Platform = "daily" PLATFORM_NAME: Platform = "daily"
TIMEOUT = 10
BASE_URL = "https://api.daily.co/v1"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S" TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"
RECORDING_NONE: RecordingType = "none" RECORDING_NONE: RecordingType = "none"
RECORDING_CLOUD: RecordingType = "cloud" RECORDING_CLOUD: RecordingType = "cloud"
def __init__(self, config: VideoPlatformConfig): def __init__(self, config: VideoPlatformConfig):
super().__init__(config) super().__init__(config)
self.headers = { self._api_client = DailyApiClient(
"Authorization": f"Bearer {config.api_key}", api_key=config.api_key,
"Content-Type": "application/json", webhook_secret=config.webhook_secret,
} timeout=10.0,
)
async def create_meeting( async def create_meeting(
self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room
@@ -49,57 +53,43 @@ class DailyClient(VideoPlatformClient):
timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT) timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT)
room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}" room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}"
data = { properties = RoomProperties(
"name": room_name, enable_recording="raw-tracks"
"privacy": "private" if room.is_locked else "public", if room.recording_type != self.RECORDING_NONE
"properties": { else False,
"enable_recording": "raw-tracks" enable_chat=True,
if room.recording_type != self.RECORDING_NONE enable_screenshare=True,
else False, start_video_off=False,
"enable_chat": True, start_audio_off=False,
"enable_screenshare": True, exp=int(end_date.timestamp()),
"start_video_off": False, )
"start_audio_off": False,
"exp": int(end_date.timestamp()),
},
}
# Only configure recordings_bucket if recording is enabled # Only configure recordings_bucket if recording is enabled
if room.recording_type != self.RECORDING_NONE: if room.recording_type != self.RECORDING_NONE:
daily_storage = get_dailyco_storage() daily_storage = get_dailyco_storage()
assert daily_storage.bucket_name, "S3 bucket must be configured" assert daily_storage.bucket_name, "S3 bucket must be configured"
data["properties"]["recordings_bucket"] = { properties.recordings_bucket = RecordingsBucketConfig(
"bucket_name": daily_storage.bucket_name, bucket_name=daily_storage.bucket_name,
"bucket_region": daily_storage.region, bucket_region=daily_storage.region,
"assume_role_arn": daily_storage.role_credential, assume_role_arn=daily_storage.role_credential,
"allow_api_access": True, allow_api_access=True,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/rooms",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
) )
if response.status_code >= 400:
logger.error(
"Daily.co API error",
status_code=response.status_code,
response_body=response.text,
request_data=data,
)
response.raise_for_status()
result = response.json()
room_url = result["url"] request = CreateRoomRequest(
name=room_name,
privacy="private" if room.is_locked else "public",
properties=properties,
)
result = await self._api_client.create_room(request)
return MeetingData( return MeetingData(
meeting_id=result["id"], meeting_id=result.id,
room_name=result["name"], room_name=result.name,
room_url=room_url, room_url=result.url,
host_room_url=room_url, host_room_url=result.url,
platform=self.PLATFORM_NAME, platform=self.PLATFORM_NAME,
extra_data=result, extra_data=result.model_dump(),
) )
async def get_room_sessions(self, room_name: str) -> list[SessionData]: async def get_room_sessions(self, room_name: str) -> list[SessionData]:
@@ -108,7 +98,7 @@ class DailyClient(VideoPlatformClient):
Daily.co doesn't provide historical session API, so we query our database Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored. where participant.joined/left webhooks are stored.
""" """
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_room_name(room_name) meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting: if not meeting:
@@ -127,135 +117,65 @@ class DailyClient(VideoPlatformClient):
for s in sessions for s in sessions
] ]
async def get_room_presence(self, room_name: str) -> Dict[str, Any]: async def get_room_presence(self, room_name: str) -> RoomPresenceResponse:
"""Get room presence/session data for a Daily.co room. """Get room presence/session data for a Daily.co room."""
return await self._api_client.get_room_presence(room_name)
Example response: async def get_meeting_participants(
{ self, meeting_id: str
"total_count": 1, ) -> MeetingParticipantsResponse:
"data": [ """Get participant data for a specific Daily.co meeting."""
{ return await self._api_client.get_meeting_participants(meeting_id)
"room": "w2pp2cf4kltgFACPKXmX",
"id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
"userId": "pbZ+ismP7dk=",
"userName": "Moishe",
"joinTime": "2023-01-01T20:53:19.000Z",
"duration": 2312
}
]
}
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/rooms/{room_name}/presence",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]: async def get_recording(self, recording_id: str) -> RecordingResponse:
"""Get participant data for a specific Daily.co meeting. return await self._api_client.get_recording(recording_id)
Example response:
{
"data": [
{
"user_id": "4q47OTmqa/w=",
"participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
"user_name": "Lindsey",
"join_time": 1672786813,
"duration": 150
},
{
"user_id": "pbZ+ismP7dk=",
"participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6",
"user_name": "Moishe",
"join_time": 1672786797,
"duration": 165
}
]
}
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/meetings/{meeting_id}/participants",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def get_recording(self, recording_id: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/recordings/{recording_id}",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def delete_room(self, room_name: str) -> bool: async def delete_room(self, room_name: str) -> bool:
async with httpx.AsyncClient() as client: """Delete a room (idempotent - succeeds even if room doesn't exist)."""
response = await client.delete( await self._api_client.delete_room(room_name)
f"{self.BASE_URL}/rooms/{room_name}", return True
headers=self.headers,
timeout=self.TIMEOUT,
)
return response.status_code in (HTTPStatus.OK, HTTPStatus.NOT_FOUND)
async def upload_logo(self, room_name: str, logo_path: str) -> bool: async def upload_logo(self, room_name: str, logo_path: str) -> bool:
return True return True
def verify_webhook_signature( def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None self, body: bytes, signature: str, timestamp: str | None = None
) -> bool: ) -> bool:
"""Verify Daily.co webhook signature. """Verify Daily.co webhook signature using dailyco_api module."""
if not self.config.webhook_secret:
Daily.co uses: logger.warning("Webhook secret not configured")
- X-Webhook-Signature header
- X-Webhook-Timestamp header
- Signature format: HMAC-SHA256(base64_decode(secret), timestamp + '.' + body)
- Result is base64 encoded
"""
if not signature or not timestamp:
return False return False
try: return verify_webhook_signature(
secret_bytes = base64.b64decode(self.config.webhook_secret) body=body,
signature=signature,
signed_content = timestamp.encode() + b"." + body timestamp=timestamp or "",
webhook_secret=self.config.webhook_secret,
expected = hmac.new(secret_bytes, signed_content, sha256).digest() )
expected_b64 = base64.b64encode(expected).decode()
return hmac.compare_digest(expected_b64, signature)
except Exception as e:
logger.error("Daily.co webhook signature verification failed", exc_info=e)
return False
async def create_meeting_token( async def create_meeting_token(
self, self,
room_name: DailyRoomName, room_name: DailyRoomName,
enable_recording: bool, enable_recording: bool,
user_id: Optional[str] = None, user_id: str | None = None,
) -> str: ) -> str:
data = {"properties": {"room_name": room_name}} properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=enable_recording,
enable_recording_ui=not enable_recording,
)
if enable_recording: request = CreateMeetingTokenRequest(properties=properties)
data["properties"]["start_cloud_recording"] = True result = await self._api_client.create_meeting_token(request)
data["properties"]["enable_recording_ui"] = False return result.token
if user_id: async def close(self):
data["properties"]["user_id"] = user_id """Clean up API client resources."""
await self._api_client.close()
async with httpx.AsyncClient() as client: async def __aenter__(self):
response = await client.post( return self
f"{self.BASE_URL}/meeting-tokens",
headers=self.headers, async def __aexit__(self, exc_type, exc_val, exc_tb):
json=data, await self.close()
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()["token"]

View File

@@ -1,10 +1,14 @@
import json import json
from datetime import datetime, timezone from datetime import datetime, timezone
from typing import Any, Dict, Literal
from fastapi import APIRouter, HTTPException, Request from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from reflector.dailyco_api import (
DailyTrack,
DailyWebhookEvent,
extract_room_name,
parse_recording_error,
)
from reflector.db import get_database from reflector.db import get_database
from reflector.db.daily_participant_sessions import ( from reflector.db.daily_participant_sessions import (
DailyParticipantSession, DailyParticipantSession,
@@ -13,7 +17,6 @@ from reflector.db.daily_participant_sessions import (
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meetings_controller
from reflector.logger import logger as _logger from reflector.logger import logger as _logger
from reflector.settings import settings from reflector.settings import settings
from reflector.utils.daily import DailyRoomName
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import process_multitrack_recording from reflector.worker.process import process_multitrack_recording
@@ -22,30 +25,6 @@ router = APIRouter()
logger = _logger.bind(platform="daily") logger = _logger.bind(platform="daily")
class DailyTrack(BaseModel):
type: Literal["audio", "video"]
s3Key: str
size: int
class DailyWebhookEvent(BaseModel):
version: str
type: str
id: str
payload: Dict[str, Any]
event_ts: float
def _extract_room_name(event: DailyWebhookEvent) -> DailyRoomName | None:
"""Extract room name from Daily event payload.
Daily.co API inconsistency:
- participant.* events use "room" field
- recording.* events use "room_name" field
"""
return event.payload.get("room_name") or event.payload.get("room")
@router.post("/webhook") @router.post("/webhook")
async def webhook(request: Request): async def webhook(request: Request):
"""Handle Daily webhook events. """Handle Daily webhook events.
@@ -77,18 +56,14 @@ async def webhook(request: Request):
client = create_platform_client("daily") client = create_platform_client("daily")
# TEMPORARY: Bypass signature check for testing if not client.verify_webhook_signature(body, signature, timestamp):
# TODO: Remove this after testing is complete logger.warning(
BYPASS_FOR_TESTING = True "Invalid webhook signature",
if not BYPASS_FOR_TESTING: signature=signature,
if not client.verify_webhook_signature(body, signature, timestamp): timestamp=timestamp,
logger.warning( has_body=bool(body),
"Invalid webhook signature", )
signature=signature, raise HTTPException(status_code=401, detail="Invalid webhook signature")
timestamp=timestamp,
has_body=bool(body),
)
raise HTTPException(status_code=401, detail="Invalid webhook signature")
try: try:
body_json = json.loads(body) body_json = json.loads(body)
@@ -99,14 +74,12 @@ async def webhook(request: Request):
logger.info("Received Daily webhook test event") logger.info("Received Daily webhook test event")
return {"status": "ok"} return {"status": "ok"}
# Parse as actual event
try: try:
event = DailyWebhookEvent(**body_json) event = DailyWebhookEvent(**body_json)
except Exception as e: except Exception as e:
logger.error("Failed to parse webhook event", error=str(e), body=body.decode()) logger.error("Failed to parse webhook event", error=str(e), body=body.decode())
raise HTTPException(status_code=422, detail="Invalid event format") raise HTTPException(status_code=422, detail="Invalid event format")
# Handle participant events
if event.type == "participant.joined": if event.type == "participant.joined":
await _handle_participant_joined(event) await _handle_participant_joined(event)
elif event.type == "participant.left": elif event.type == "participant.left":
@@ -154,7 +127,7 @@ async def webhook(request: Request):
async def _handle_participant_joined(event: DailyWebhookEvent): async def _handle_participant_joined(event: DailyWebhookEvent):
daily_room_name = _extract_room_name(event) daily_room_name = extract_room_name(event)
if not daily_room_name: if not daily_room_name:
logger.warning("participant.joined: no room in payload", payload=event.payload) logger.warning("participant.joined: no room in payload", payload=event.payload)
return return
@@ -167,7 +140,6 @@ async def _handle_participant_joined(event: DailyWebhookEvent):
return return
payload = event.payload payload = event.payload
logger.warning({"payload": payload})
joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc) joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc)
session_id = f"{meeting.id}:{payload['session_id']}" session_id = f"{meeting.id}:{payload['session_id']}"
@@ -225,7 +197,7 @@ async def _handle_participant_joined(event: DailyWebhookEvent):
async def _handle_participant_left(event: DailyWebhookEvent): async def _handle_participant_left(event: DailyWebhookEvent):
room_name = _extract_room_name(event) room_name = extract_room_name(event)
if not room_name: if not room_name:
logger.warning("participant.left: no room in payload", payload=event.payload) logger.warning("participant.left: no room in payload", payload=event.payload)
return return
@@ -268,7 +240,7 @@ async def _handle_participant_left(event: DailyWebhookEvent):
async def _handle_recording_started(event: DailyWebhookEvent): async def _handle_recording_started(event: DailyWebhookEvent):
room_name = _extract_room_name(event) room_name = extract_room_name(event)
if not room_name: if not room_name:
logger.warning( logger.warning(
"recording.started: no room_name in payload", payload=event.payload "recording.started: no room_name in payload", payload=event.payload
@@ -301,7 +273,7 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
] ]
} }
""" """
room_name = _extract_room_name(event) room_name = extract_room_name(event)
recording_id = event.payload.get("recording_id") recording_id = event.payload.get("recording_id")
tracks_raw = event.payload.get("tracks", []) tracks_raw = event.payload.get("tracks", [])
@@ -350,8 +322,8 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
async def _handle_recording_error(event: DailyWebhookEvent): async def _handle_recording_error(event: DailyWebhookEvent):
room_name = _extract_room_name(event) payload = parse_recording_error(event)
error = event.payload.get("error", "Unknown error") room_name = payload.room_name
if room_name: if room_name:
meeting = await meetings_controller.get_by_room_name(room_name) meeting = await meetings_controller.get_by_room_name(room_name)
@@ -360,6 +332,6 @@ async def _handle_recording_error(event: DailyWebhookEvent):
"Recording error", "Recording error",
meeting_id=meeting.id, meeting_id=meeting.id,
room_name=room_name, room_name=room_name,
error=error, error=payload.error_msg,
platform="daily", platform="daily",
) )

View File

@@ -6,53 +6,19 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
import httpx from reflector.dailyco_api import DailyApiClient
from reflector.settings import settings from reflector.settings import settings
async def list_webhooks(): async def list_webhooks():
""" """List all Daily.co webhooks for this account using dailyco_api module."""
List all Daily.co webhooks for this account.
"""
if not settings.DAILY_API_KEY: if not settings.DAILY_API_KEY:
print("Error: DAILY_API_KEY not set") print("Error: DAILY_API_KEY not set")
return 1 return 1
headers = { async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
"Authorization": f"Bearer {settings.DAILY_API_KEY}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient() as client:
try: try:
""" webhooks = await client.list_webhooks()
Daily.co webhook list response format:
[
{
"uuid": "0b4e4c7c-5eaf-46fe-990b-a3752f5684f5",
"url": "{{webhook_url}}",
"hmac": "NQrSA5z0FkJ44QPrFerW7uCc5kdNLv3l2FDEKDanL1U=",
"basicAuth": null,
"eventTypes": [
"recording.started",
"recording.ready-to-download"
],
"state": "ACTVIE",
"failedCount": 0,
"lastMomentPushed": "2023-08-15T18:29:52.000Z",
"domainId": "{{domain_id}}",
"createdAt": "2023-08-15T18:28:30.000Z",
"updatedAt": "2023-08-15T18:29:52.000Z"
}
]
"""
resp = await client.get(
"https://api.daily.co/v1/webhooks",
headers=headers,
)
resp.raise_for_status()
webhooks = resp.json()
if not webhooks: if not webhooks:
print("No webhooks found") print("No webhooks found")
@@ -62,12 +28,12 @@ async def list_webhooks():
for webhook in webhooks: for webhook in webhooks:
print("=" * 80) print("=" * 80)
print(f"UUID: {webhook['uuid']}") print(f"UUID: {webhook.uuid}")
print(f"URL: {webhook['url']}") print(f"URL: {webhook.url}")
print(f"State: {webhook['state']}") print(f"State: {webhook.state}")
print(f"Event Types: {', '.join(webhook.get('eventTypes', []))}") print(f"Event Types: {', '.join(webhook.eventTypes)}")
print( print(
f"HMAC Secret: {'✓ Configured' if webhook.get('hmac') else '✗ Not set'}" f"HMAC Secret: {'✓ Configured' if webhook.hmac else '✗ Not set'}"
) )
print() print()
@@ -78,12 +44,8 @@ async def list_webhooks():
return 0 return 0
except httpx.HTTPStatusError as e:
print(f"Error fetching webhooks: {e}")
print(f"Response: {e.response.text}")
return 1
except Exception as e: except Exception as e:
print(f"Unexpected error: {e}") print(f"Error fetching webhooks: {e}")
return 1 return 1

View File

@@ -6,56 +6,60 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent)) sys.path.insert(0, str(Path(__file__).parent.parent))
import httpx from reflector.dailyco_api import (
CreateWebhookRequest,
DailyApiClient,
)
from reflector.settings import settings from reflector.settings import settings
async def setup_webhook(webhook_url: str): async def setup_webhook(webhook_url: str):
""" """
Create or update Daily.co webhook for this environment. Create or update Daily.co webhook for this environment using dailyco_api module.
Uses DAILY_WEBHOOK_UUID to identify existing webhook. Uses DAILY_WEBHOOK_UUID to identify existing webhook.
""" """
if not settings.DAILY_API_KEY: if not settings.DAILY_API_KEY:
print("Error: DAILY_API_KEY not set") print("Error: DAILY_API_KEY not set")
return 1 return 1
headers = { if not settings.DAILY_WEBHOOK_SECRET:
"Authorization": f"Bearer {settings.DAILY_API_KEY}", print("Error: DAILY_WEBHOOK_SECRET not set")
"Content-Type": "application/json", return 1
}
webhook_data = { event_types = [
"url": webhook_url, "participant.joined",
"eventTypes": [ "participant.left",
"participant.joined", "recording.started",
"participant.left", "recording.ready-to-download",
"recording.started", "recording.error",
"recording.ready-to-download", ]
"recording.error",
],
"hmac": settings.DAILY_WEBHOOK_SECRET,
}
async with httpx.AsyncClient() as client: async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
webhook_uuid = settings.DAILY_WEBHOOK_UUID webhook_uuid = settings.DAILY_WEBHOOK_UUID
if webhook_uuid: if webhook_uuid:
# Update existing webhook
print(f"Updating existing webhook {webhook_uuid}...") print(f"Updating existing webhook {webhook_uuid}...")
try: try:
resp = await client.patch( # Note: Daily.co doesn't support PATCH well, so we delete + recreate
f"https://api.daily.co/v1/webhooks/{webhook_uuid}", await client.delete_webhook(webhook_uuid)
headers=headers, print(f"Deleted old webhook {webhook_uuid}")
json=webhook_data,
request = CreateWebhookRequest(
url=webhook_url,
eventTypes=event_types,
hmac=settings.DAILY_WEBHOOK_SECRET,
) )
resp.raise_for_status() result = await client.create_webhook(request)
result = resp.json()
print(f"✓ Updated webhook {result['uuid']} (state: {result['state']})") print(
print(f" URL: {result['url']}") f"✓ Created replacement webhook {result.uuid} (state: {result.state})"
return 0 )
except httpx.HTTPStatusError as e: print(f" URL: {result.url}")
if e.response.status_code == 404:
webhook_uuid = result.uuid
except Exception as e:
if hasattr(e, "response") and e.response.status_code == 404:
print(f"Webhook {webhook_uuid} not found, creating new one...") print(f"Webhook {webhook_uuid} not found, creating new one...")
webhook_uuid = None # Fall through to creation webhook_uuid = None # Fall through to creation
else: else:
@@ -63,17 +67,17 @@ async def setup_webhook(webhook_url: str):
return 1 return 1
if not webhook_uuid: if not webhook_uuid:
# Create new webhook
print("Creating new webhook...") print("Creating new webhook...")
resp = await client.post( request = CreateWebhookRequest(
"https://api.daily.co/v1/webhooks", headers=headers, json=webhook_data url=webhook_url,
eventTypes=event_types,
hmac=settings.DAILY_WEBHOOK_SECRET,
) )
resp.raise_for_status() result = await client.create_webhook(request)
result = resp.json() webhook_uuid = result.uuid
webhook_uuid = result["uuid"]
print(f"✓ Created webhook {webhook_uuid} (state: {result['state']})") print(f"✓ Created webhook {webhook_uuid} (state: {result.state})")
print(f" URL: {result['url']}") print(f" URL: {result.url}")
print() print()
print("=" * 60) print("=" * 60)
print("IMPORTANT: Add this to your environment variables:") print("IMPORTANT: Add this to your environment variables:")
@@ -114,7 +118,7 @@ if __name__ == "__main__":
) )
print() print()
print("Behavior:") print("Behavior:")
print(" - If DAILY_WEBHOOK_UUID set: Updates existing webhook") print(" - If DAILY_WEBHOOK_UUID set: Deletes old webhook, creates new one")
print( print(
" - If DAILY_WEBHOOK_UUID empty: Creates new webhook, saves UUID to .env" " - If DAILY_WEBHOOK_UUID empty: Creates new webhook, saves UUID to .env"
) )