chore: remove refactor md (#527)

This commit is contained in:
2025-08-01 16:33:40 -06:00
parent 28ac031ff6
commit 8b644384a2
28 changed files with 3419 additions and 423 deletions

View File

@@ -0,0 +1,41 @@
"""add platform support
Revision ID: 20250801180012
Revises: b0e5f7876032
Create Date: 2025-08-01 18:00:12.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "20250801180012"
down_revision: Union[str, None] = "b0e5f7876032"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add platform column to rooms table
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column("platform", sa.String(), server_default="whereby", nullable=False)
)
# Add platform column to meeting table
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column("platform", sa.String(), server_default="whereby", nullable=False)
)
def downgrade() -> None:
# Remove platform columns
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("platform")
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("platform")

View File

@@ -12,6 +12,7 @@ from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.logger import logger
from reflector.metrics import metrics_init
from reflector.settings import settings
from reflector.views.daily import router as daily_router
from reflector.views.meetings import router as meetings_router
from reflector.views.rooms import router as rooms_router
from reflector.views.rtc_offer import router as rtc_offer_router
@@ -86,6 +87,7 @@ app.include_router(transcripts_process_router, prefix="/v1")
app.include_router(user_router, prefix="/v1")
app.include_router(zulip_router, prefix="/v1")
app.include_router(whereby_router, prefix="/v1")
app.include_router(daily_router, prefix="/v1")
add_pagination(app)
# prepare celery

View File

@@ -41,6 +41,12 @@ meetings = sa.Table(
nullable=False,
server_default=sa.true(),
),
sa.Column(
"platform",
sa.String,
nullable=False,
server_default="whereby",
),
sa.Index("idx_meeting_room_id", "room_id"),
)
@@ -79,6 +85,7 @@ class Meeting(BaseModel):
"none", "prompt", "automatic", "automatic-2nd-participant"
] = "automatic-2nd-participant"
num_clients: int = 0
platform: Literal["whereby", "daily"] = "whereby"
class MeetingController:
@@ -109,6 +116,7 @@ class MeetingController:
room_mode=room.room_mode,
recording_type=room.recording_type,
recording_trigger=room.recording_trigger,
platform=room.platform,
)
query = meetings.insert().values(**meeting.model_dump())
await database.execute(query)

View File

@@ -40,6 +40,9 @@ rooms = sqlalchemy.Table(
sqlalchemy.Column(
"is_shared", sqlalchemy.Boolean, nullable=False, server_default=false()
),
sqlalchemy.Column(
"platform", sqlalchemy.String, nullable=False, server_default="whereby"
),
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
)
@@ -59,6 +62,7 @@ class Room(BaseModel):
"none", "prompt", "automatic", "automatic-2nd-participant"
] = "automatic-2nd-participant"
is_shared: bool = False
platform: Literal["whereby", "daily"] = "whereby"
class RoomController:
@@ -107,6 +111,7 @@ class RoomController:
recording_type: str,
recording_trigger: str,
is_shared: bool,
platform: str = "whereby",
):
"""
Add a new room
@@ -122,6 +127,7 @@ class RoomController:
recording_type=recording_type,
recording_trigger=recording_trigger,
is_shared=is_shared,
platform=platform,
)
query = rooms.insert().values(**room.model_dump())
try:

View File

@@ -101,6 +101,18 @@ class Settings(BaseSettings):
AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None
SQS_POLLING_TIMEOUT_SECONDS: int = 60
# Daily.co integration
DAILY_API_KEY: str | None = None
DAILY_WEBHOOK_SECRET: str | None = None
DAILY_SUBDOMAIN: str | None = None
AWS_DAILY_S3_BUCKET: str | None = None
AWS_DAILY_ROLE_ARN: str | None = None
# Video platform migration feature flags
DAILY_MIGRATION_ENABLED: bool = False
DAILY_MIGRATION_ROOM_IDS: list[str] = []
DEFAULT_VIDEO_PLATFORM: str = "whereby"
# Zulip integration
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None

View File

@@ -0,0 +1,17 @@
# Video Platform Abstraction Layer
"""
This module provides an abstraction layer for different video conferencing platforms.
It allows seamless switching between providers (Whereby, Daily.co, etc.) without
changing the core application logic.
"""
from .base import MeetingData, VideoPlatformClient, VideoPlatformConfig
from .registry import get_platform_client, register_platform
__all__ = [
"VideoPlatformClient",
"VideoPlatformConfig",
"MeetingData",
"get_platform_client",
"register_platform",
]

View File

@@ -0,0 +1,82 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic import BaseModel
from reflector.db.rooms import Room
class MeetingData(BaseModel):
"""Standardized meeting data returned by all platforms."""
meeting_id: str
room_name: str
room_url: str
host_room_url: str
platform: str
extra_data: Dict[str, Any] = {} # Platform-specific data
class VideoPlatformConfig(BaseModel):
"""Configuration for a video platform."""
api_key: str
webhook_secret: str
api_url: Optional[str] = None
subdomain: Optional[str] = None
s3_bucket: Optional[str] = None
s3_region: Optional[str] = None
aws_role_arn: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_access_key_secret: Optional[str] = None
class VideoPlatformClient(ABC):
"""Abstract base class for video platform integrations."""
PLATFORM_NAME: str = ""
def __init__(self, config: VideoPlatformConfig):
self.config = config
@abstractmethod
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
) -> MeetingData:
"""Create a new meeting room."""
pass
@abstractmethod
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
"""Get session information for a room."""
pass
@abstractmethod
async def delete_room(self, room_name: str) -> bool:
"""Delete a room. Returns True if successful."""
pass
@abstractmethod
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
"""Upload a logo to the room. Returns True if successful."""
pass
@abstractmethod
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Verify webhook signature for security."""
pass
def format_recording_config(self, room: Room) -> Dict[str, Any]:
"""Format recording configuration for the platform.
Can be overridden by specific implementations."""
if room.recording_type == "cloud" and self.config.s3_bucket:
return {
"type": room.recording_type,
"bucket": self.config.s3_bucket,
"region": self.config.s3_region,
"trigger": room.recording_trigger,
}
return {"type": room.recording_type}

View File

@@ -0,0 +1,152 @@
import hmac
from datetime import datetime
from hashlib import sha256
from typing import Any, Dict, Optional
import httpx
from reflector.db.rooms import Room
from .base import MeetingData, VideoPlatformClient, VideoPlatformConfig
class DailyClient(VideoPlatformClient):
"""Daily.co video platform implementation."""
PLATFORM_NAME = "daily"
TIMEOUT = 10 # seconds
BASE_URL = "https://api.daily.co/v1"
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
self.headers = {
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/json",
}
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
) -> MeetingData:
"""Create a Daily.co room."""
# Generate unique room name
room_name = f"{room_name_prefix}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
data = {
"name": room_name,
"privacy": "private" if room.is_locked else "public",
"properties": {
"enable_recording": room.recording_type
if room.recording_type != "none"
else False,
"enable_chat": True,
"enable_screenshare": True,
"start_video_off": False,
"start_audio_off": False,
"exp": int(end_date.timestamp()),
"enable_recording_ui": False, # We handle consent ourselves
},
}
# Configure S3 bucket for cloud recordings
if room.recording_type == "cloud" and self.config.s3_bucket:
data["properties"]["recording_bucket"] = {
"bucket_name": self.config.s3_bucket,
"bucket_region": self.config.s3_region,
"assume_role_arn": self.config.aws_role_arn,
"path": f"recordings/{room_name}",
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/rooms",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
response.raise_for_status()
result = response.json()
# Format response to match our standard
room_url = result["url"]
host_room_url = f"{room_url}?t={result['config']['token']}"
return MeetingData(
meeting_id=result["id"],
room_name=result["name"],
room_url=room_url,
host_room_url=host_room_url,
platform=self.PLATFORM_NAME,
extra_data=result,
)
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
"""Get Daily.co room information."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/rooms/{room_name}",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def get_room_presence(self, room_name: str) -> Dict[str, Any]:
"""Get real-time participant data - Daily.co specific feature."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/rooms/{room_name}/presence",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def delete_room(self, room_name: str) -> bool:
"""Delete a Daily.co room."""
async with httpx.AsyncClient() as client:
response = await client.delete(
f"{self.BASE_URL}/rooms/{room_name}",
headers=self.headers,
timeout=self.TIMEOUT,
)
# Daily.co returns 200 for success, 404 if room doesn't exist
return response.status_code in (200, 404)
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
"""Daily.co doesn't support custom logos per room - this is a no-op."""
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Verify Daily.co webhook signature."""
expected = hmac.new(
self.config.webhook_secret.encode(), body, sha256
).hexdigest()
try:
return hmac.compare_digest(expected, signature)
except Exception:
return False
async def start_recording(self, room_name: str) -> Dict[str, Any]:
"""Start recording for a room - Daily.co specific method."""
data = {
"layout": {
"preset": "audio-only" # For transcription use case
},
"streaming_settings": {
"width": 1280,
"height": 720,
},
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/rooms/{room_name}/recordings",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()

View File

@@ -0,0 +1,54 @@
"""Factory for creating video platform clients based on configuration."""
from typing import Optional
from reflector.settings import settings
from .base import VideoPlatformClient, VideoPlatformConfig
from .registry import get_platform_client
def get_platform_config(platform: str) -> VideoPlatformConfig:
"""Get configuration for a specific platform."""
if platform == "whereby":
return VideoPlatformConfig(
api_key=settings.WHEREBY_API_KEY or "",
webhook_secret=settings.WHEREBY_WEBHOOK_SECRET or "",
api_url=settings.WHEREBY_API_URL,
s3_bucket=settings.AWS_WHEREBY_S3_BUCKET,
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
aws_access_key_secret=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
)
elif platform == "daily":
return VideoPlatformConfig(
api_key=settings.DAILY_API_KEY or "",
webhook_secret=settings.DAILY_WEBHOOK_SECRET or "",
subdomain=settings.DAILY_SUBDOMAIN,
s3_bucket=settings.AWS_DAILY_S3_BUCKET,
s3_region=settings.AWS_REGION
if hasattr(settings, "AWS_REGION")
else "us-west-2",
aws_role_arn=settings.AWS_DAILY_ROLE_ARN,
)
else:
raise ValueError(f"Unknown platform: {platform}")
def create_platform_client(platform: str) -> VideoPlatformClient:
"""Create a video platform client instance."""
config = get_platform_config(platform)
return get_platform_client(platform, config)
def get_platform_for_room(room_id: Optional[str] = None) -> str:
"""Determine which platform to use for a room based on feature flags."""
# If Daily migration is disabled, always use Whereby
if not settings.DAILY_MIGRATION_ENABLED:
return "whereby"
# If a specific room is in the migration list, use Daily
if room_id and room_id in settings.DAILY_MIGRATION_ROOM_IDS:
return "daily"
# Otherwise use the default platform
return settings.DEFAULT_VIDEO_PLATFORM

View File

@@ -0,0 +1,124 @@
"""Mock video platform client for testing."""
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from reflector.db.rooms import Room
from .base import MeetingData, VideoPlatformClient, VideoPlatformConfig
class MockPlatformClient(VideoPlatformClient):
"""Mock video platform implementation for testing."""
PLATFORM_NAME = "mock"
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
# Store created rooms for testing
self._rooms: Dict[str, Dict[str, Any]] = {}
self._webhook_calls: list[Dict[str, Any]] = []
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
) -> MeetingData:
"""Create a mock meeting."""
meeting_id = str(uuid.uuid4())
room_name = f"{room_name_prefix}-{meeting_id[:8]}"
room_url = f"https://mock.video/{room_name}"
host_room_url = f"{room_url}?host=true"
# Store room data for later retrieval
self._rooms[room_name] = {
"id": meeting_id,
"name": room_name,
"url": room_url,
"host_url": host_room_url,
"end_date": end_date,
"room": room,
"participants": [],
"is_active": True,
}
return MeetingData(
meeting_id=meeting_id,
room_name=room_name,
room_url=room_url,
host_room_url=host_room_url,
platform=self.PLATFORM_NAME,
extra_data={"mock": True},
)
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
"""Get mock room session information."""
if room_name not in self._rooms:
return {"error": "Room not found"}
room_data = self._rooms[room_name]
return {
"roomName": room_name,
"sessions": [
{
"sessionId": room_data["id"],
"startTime": datetime.utcnow().isoformat(),
"participants": room_data["participants"],
"isActive": room_data["is_active"],
}
],
}
async def delete_room(self, room_name: str) -> bool:
"""Delete a mock room."""
if room_name in self._rooms:
self._rooms[room_name]["is_active"] = False
return True
return False
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
"""Mock logo upload."""
if room_name in self._rooms:
self._rooms[room_name]["logo_path"] = logo_path
return True
return False
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Mock webhook signature verification."""
# For testing, accept signature == "valid"
return signature == "valid"
# Mock-specific methods for testing
def add_participant(
self, room_name: str, participant_id: str, participant_name: str
):
"""Add a participant to a mock room (for testing)."""
if room_name in self._rooms:
self._rooms[room_name]["participants"].append(
{
"id": participant_id,
"name": participant_name,
"joined_at": datetime.utcnow().isoformat(),
}
)
def trigger_webhook(self, event_type: str, data: Dict[str, Any]):
"""Trigger a mock webhook event (for testing)."""
self._webhook_calls.append(
{
"type": event_type,
"data": data,
"timestamp": datetime.utcnow().isoformat(),
}
)
def get_webhook_calls(self) -> list[Dict[str, Any]]:
"""Get all webhook calls made (for testing)."""
return self._webhook_calls.copy()
def clear_data(self):
"""Clear all mock data (for testing)."""
self._rooms.clear()
self._webhook_calls.clear()

View File

@@ -0,0 +1,42 @@
from typing import Dict, Type
from .base import VideoPlatformClient, VideoPlatformConfig
# Registry of available video platforms
_PLATFORMS: Dict[str, Type[VideoPlatformClient]] = {}
def register_platform(name: str, client_class: Type[VideoPlatformClient]):
"""Register a video platform implementation."""
_PLATFORMS[name.lower()] = client_class
def get_platform_client(
platform: str, config: VideoPlatformConfig
) -> VideoPlatformClient:
"""Get a video platform client instance."""
platform_lower = platform.lower()
if platform_lower not in _PLATFORMS:
raise ValueError(f"Unknown video platform: {platform}")
client_class = _PLATFORMS[platform_lower]
return client_class(config)
def get_available_platforms() -> list[str]:
"""Get list of available platform names."""
return list(_PLATFORMS.keys())
# Auto-register built-in platforms
def _register_builtin_platforms():
from .daily import DailyClient
from .mock import MockPlatformClient
from .whereby import WherebyClient
register_platform("whereby", WherebyClient)
register_platform("daily", DailyClient)
register_platform("mock", MockPlatformClient)
_register_builtin_platforms()

View File

@@ -0,0 +1,140 @@
import hmac
import json
import re
import time
from datetime import datetime
from hashlib import sha256
from typing import Any, Dict, Optional
import httpx
from reflector.db.rooms import Room
from .base import MeetingData, VideoPlatformClient, VideoPlatformConfig
class WherebyClient(VideoPlatformClient):
"""Whereby video platform implementation."""
PLATFORM_NAME = "whereby"
TIMEOUT = 10 # seconds
MAX_ELAPSED_TIME = 60 * 1000 # 1 minute in milliseconds
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
self.headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": f"Bearer {config.api_key}",
}
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
) -> MeetingData:
"""Create a Whereby meeting."""
data = {
"isLocked": room.is_locked,
"roomNamePrefix": room_name_prefix,
"roomNamePattern": "uuid",
"roomMode": room.room_mode,
"endDate": end_date.isoformat(),
"fields": ["hostRoomUrl"],
}
# Add recording configuration if cloud recording is enabled
if room.recording_type == "cloud":
data["recording"] = {
"type": room.recording_type,
"destination": {
"provider": "s3",
"bucket": self.config.s3_bucket,
"accessKeyId": self.config.aws_access_key_id,
"accessKeySecret": self.config.aws_access_key_secret,
"fileFormat": "mp4",
},
"startTrigger": room.recording_trigger,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.config.api_url}/meetings",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
response.raise_for_status()
result = response.json()
return MeetingData(
meeting_id=result["meetingId"],
room_name=result["roomName"],
room_url=result["roomUrl"],
host_room_url=result["hostRoomUrl"],
platform=self.PLATFORM_NAME,
extra_data=result,
)
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
"""Get Whereby room session information."""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.config.api_url}/insights/room-sessions?roomName={room_name}",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def delete_room(self, room_name: str) -> bool:
"""Whereby doesn't support room deletion - meetings expire automatically."""
return True
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
"""Upload logo to Whereby room."""
async with httpx.AsyncClient() as client:
with open(logo_path, "rb") as f:
response = await client.put(
f"{self.config.api_url}/rooms/{room_name}/theme/logo",
headers={
"Authorization": f"Bearer {self.config.api_key}",
},
timeout=self.TIMEOUT,
files={"image": f},
)
response.raise_for_status()
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Verify Whereby webhook signature."""
if not signature:
return False
matches = re.match(r"t=(.*),v1=(.*)", signature)
if not matches:
return False
ts, sig = matches.groups()
# Check timestamp to prevent replay attacks
current_time = int(time.time() * 1000)
diff_time = current_time - int(ts) * 1000
if diff_time >= self.MAX_ELAPSED_TIME:
return False
# Verify signature
body_dict = json.loads(body)
signed_payload = f"{ts}.{json.dumps(body_dict, separators=(',', ':'))}"
hmac_obj = hmac.new(
self.config.webhook_secret.encode("utf-8"),
signed_payload.encode("utf-8"),
sha256,
)
expected_signature = hmac_obj.hexdigest()
try:
return hmac.compare_digest(
expected_signature.encode("utf-8"), sig.encode("utf-8")
)
except Exception:
return False

View File

@@ -0,0 +1,142 @@
"""Daily.co webhook handler endpoint."""
import hmac
from hashlib import sha256
from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from reflector.db.meetings import meetings_controller
from reflector.settings import settings
router = APIRouter()
class DailyWebhookEvent(BaseModel):
"""Daily.co webhook event structure."""
type: str
id: str
ts: int # Unix timestamp in milliseconds
data: Dict[str, Any]
def verify_daily_webhook_signature(body: bytes, signature: str) -> bool:
"""Verify Daily.co webhook signature using HMAC-SHA256."""
if not signature or not settings.DAILY_WEBHOOK_SECRET:
return False
try:
expected = hmac.new(
settings.DAILY_WEBHOOK_SECRET.encode(), body, sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
except Exception:
return False
@router.post("/daily_webhook")
async def daily_webhook(event: DailyWebhookEvent, request: Request):
"""Handle Daily.co webhook events."""
# Verify webhook signature for security
body = await request.body()
signature = request.headers.get("X-Daily-Signature", "")
if not verify_daily_webhook_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
# Handle participant events
if event.type == "participant.joined":
await _handle_participant_joined(event)
elif event.type == "participant.left":
await _handle_participant_left(event)
elif event.type == "recording.started":
await _handle_recording_started(event)
elif event.type == "recording.ready-to-download":
await _handle_recording_ready(event)
elif event.type == "recording.error":
await _handle_recording_error(event)
return {"status": "ok"}
async def _handle_participant_joined(event: DailyWebhookEvent):
"""Handle participant joined event."""
room_name = event.data.get("room", {}).get("name")
if not room_name:
return
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
# Update participant count (same as Whereby)
current_count = getattr(meeting, "num_clients", 0)
await meetings_controller.update_meeting(
meeting.id, num_clients=current_count + 1
)
async def _handle_participant_left(event: DailyWebhookEvent):
"""Handle participant left event."""
room_name = event.data.get("room", {}).get("name")
if not room_name:
return
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
# Update participant count (same as Whereby)
current_count = getattr(meeting, "num_clients", 0)
await meetings_controller.update_meeting(
meeting.id, num_clients=max(0, current_count - 1)
)
async def _handle_recording_started(event: DailyWebhookEvent):
"""Handle recording started event."""
room_name = event.data.get("room", {}).get("name")
if not room_name:
return
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
# Log recording start for debugging
print(f"Recording started for meeting {meeting.id} in room {room_name}")
async def _handle_recording_ready(event: DailyWebhookEvent):
"""Handle recording ready for download event."""
room_name = event.data.get("room", {}).get("name")
download_link = event.data.get("download_link")
recording_id = event.data.get("recording_id")
if not room_name or not download_link:
return
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
# Queue recording processing task (same as Whereby)
try:
# Import here to avoid circular imports
from reflector.worker.tasks import process_recording
process_recording.delay(
meeting_id=meeting.id,
recording_url=download_link,
recording_id=recording_id or event.id,
)
except ImportError:
# Handle case where worker tasks aren't available
print(
f"Warning: Could not queue recording processing for meeting {meeting.id}"
)
async def _handle_recording_error(event: DailyWebhookEvent):
"""Handle recording error event."""
room_name = event.data.get("room", {}).get("name")
error = event.data.get("error", "Unknown error")
if room_name:
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
print(f"Recording error for meeting {meeting.id}: {error}")

View File

@@ -14,7 +14,10 @@ from reflector.db import database
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.settings import settings
from reflector.whereby import create_meeting, upload_logo
from reflector.video_platforms.factory import (
create_platform_client,
get_platform_for_room,
)
logger = logging.getLogger(__name__)
@@ -34,6 +37,7 @@ class Room(BaseModel):
recording_type: str
recording_trigger: str
is_shared: bool
platform: str
class Meeting(BaseModel):
@@ -44,6 +48,7 @@ class Meeting(BaseModel):
start_date: datetime
end_date: datetime
recording_type: Literal["none", "local", "cloud"] = "cloud"
platform: str
class CreateRoom(BaseModel):
@@ -98,6 +103,14 @@ async def rooms_create(
):
user_id = user["sub"] if user else None
# Determine platform for this room (will be "whereby" unless feature flag is enabled)
# Note: Since room doesn't exist yet, we can't use room_id for selection
platform = (
settings.DEFAULT_VIDEO_PLATFORM
if settings.DAILY_MIGRATION_ENABLED
else "whereby"
)
return await rooms_controller.add(
name=room.name,
user_id=user_id,
@@ -109,6 +122,7 @@ async def rooms_create(
recording_type=room.recording_type,
recording_trigger=room.recording_trigger,
is_shared=room.is_shared,
platform=platform,
)
@@ -156,18 +170,26 @@ async def rooms_create_meeting(
if meeting is None:
end_date = current_time + timedelta(hours=8)
whereby_meeting = await create_meeting("", end_date=end_date, room=room)
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
# Use the platform abstraction to create meeting
platform = get_platform_for_room(room.id)
client = create_platform_client(platform)
meeting_data = await client.create_meeting(
room_name_prefix=room.name, end_date=end_date, room=room
)
# Upload logo if supported by platform
await client.upload_logo(meeting_data.room_name, "./images/logo.png")
# Now try to save to database
try:
meeting = await meetings_controller.create(
id=whereby_meeting["meetingId"],
room_name=whereby_meeting["roomName"],
room_url=whereby_meeting["roomUrl"],
host_room_url=whereby_meeting["hostRoomUrl"],
start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
id=meeting_data.meeting_id,
room_name=meeting_data.room_name,
room_url=meeting_data.room_url,
host_room_url=meeting_data.host_room_url,
start_date=current_time,
end_date=end_date,
user_id=user_id,
room=room,
)
@@ -179,8 +201,9 @@ async def rooms_create_meeting(
room.name,
)
logger.warning(
"Whereby meeting %s was created but not used (resource leak) for room %s",
whereby_meeting["meetingId"],
"%s meeting %s was created but not used (resource leak) for room %s",
platform,
meeting_data.meeting_id,
room.name,
)

View File

@@ -0,0 +1,392 @@
"""Tests for Daily.co webhook integration."""
import hashlib
import hmac
import json
from datetime import datetime
from unittest.mock import MagicMock, patch
import pytest
from httpx import AsyncClient
from reflector.app import app
from reflector.views.daily import DailyWebhookEvent
class TestDailyWebhookIntegration:
"""Test Daily.co webhook endpoint integration."""
@pytest.fixture
def webhook_secret(self):
"""Test webhook secret."""
return "test-webhook-secret-123"
@pytest.fixture
def mock_room(self):
"""Create a mock room for testing."""
room = MagicMock()
room.id = "test-room-123"
room.name = "Test Room"
room.recording_type = "cloud"
room.platform = "daily"
return room
@pytest.fixture
def mock_meeting(self):
"""Create a mock meeting for testing."""
meeting = MagicMock()
meeting.id = "test-meeting-456"
meeting.room_id = "test-room-123"
meeting.platform = "daily"
meeting.room_name = "test-room-123-abc"
return meeting
def create_webhook_signature(self, payload: bytes, secret: str) -> str:
"""Create HMAC signature for webhook payload."""
return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()
def create_webhook_event(
self, event_type: str, room_name: str = "test-room-123-abc", **kwargs
) -> dict:
"""Create a Daily.co webhook event payload."""
base_event = {
"type": event_type,
"id": f"evt_{event_type.replace('.', '_')}_{int(datetime.utcnow().timestamp())}",
"ts": int(datetime.utcnow().timestamp() * 1000), # milliseconds
"data": {"room": {"name": room_name}, **kwargs},
}
return base_event
@pytest.mark.asyncio
async def test_webhook_participant_joined(
self, webhook_secret, mock_room, mock_meeting
):
"""Test participant joined webhook event."""
event_data = self.create_webhook_event(
"participant.joined",
participant={
"id": "participant-123",
"user_name": "John Doe",
"session_id": "session-456",
},
)
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = mock_meeting
with patch(
"reflector.db.meetings.meetings_controller.update_meeting"
) as mock_update:
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify meeting was looked up
mock_get_meeting.assert_called_once_with("test-room-123-abc")
@pytest.mark.asyncio
async def test_webhook_participant_left(
self, webhook_secret, mock_room, mock_meeting
):
"""Test participant left webhook event."""
event_data = self.create_webhook_event(
"participant.left",
participant={
"id": "participant-123",
"user_name": "John Doe",
"session_id": "session-456",
},
)
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = mock_meeting
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
@pytest.mark.asyncio
async def test_webhook_recording_started(
self, webhook_secret, mock_room, mock_meeting
):
"""Test recording started webhook event."""
event_data = self.create_webhook_event(
"recording.started",
recording={
"id": "recording-789",
"status": "recording",
"start_time": "2025-01-01T10:00:00Z",
},
)
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = mock_meeting
with patch(
"reflector.db.meetings.meetings_controller.update_meeting"
) as mock_update:
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
@pytest.mark.asyncio
async def test_webhook_recording_ready_triggers_processing(
self, webhook_secret, mock_room, mock_meeting
):
"""Test recording ready webhook triggers audio processing."""
event_data = self.create_webhook_event(
"recording.ready-to-download",
recording={
"id": "recording-789",
"status": "finished",
"download_url": "https://s3.amazonaws.com/bucket/recording.mp4",
"start_time": "2025-01-01T10:00:00Z",
"duration": 1800,
},
)
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = mock_meeting
with patch(
"reflector.db.meetings.meetings_controller.update_meeting"
) as mock_update_url:
with patch(
"reflector.worker.tasks.process_recording.delay"
) as mock_process:
async with AsyncClient(
app=app, base_url="http://test/v1"
) as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
assert response.status_code == 200
assert response.json() == {"status": "ok"}
# Verify recording URL was updated
mock_update_url.assert_called_once_with(
mock_meeting.id,
"https://s3.amazonaws.com/bucket/recording.mp4",
)
# Verify processing was triggered
mock_process.assert_called_once_with(mock_meeting.id)
@pytest.mark.asyncio
async def test_webhook_invalid_signature_rejected(self, webhook_secret):
"""Test webhook with invalid signature is rejected."""
event_data = self.create_webhook_event("participant.joined")
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": "invalid-signature"},
)
assert response.status_code == 401
assert "Invalid signature" in response.json()["detail"]
@pytest.mark.asyncio
async def test_webhook_missing_signature_rejected(self):
"""Test webhook without signature header is rejected."""
event_data = self.create_webhook_event("participant.joined")
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post("/daily_webhook", json=event_data)
assert response.status_code == 401
assert "Missing signature" in response.json()["detail"]
@pytest.mark.asyncio
async def test_webhook_meeting_not_found(self, webhook_secret):
"""Test webhook for non-existent meeting."""
event_data = self.create_webhook_event(
"participant.joined", room_name="non-existent-room"
)
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = None
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
assert response.status_code == 404
assert "Meeting not found" in response.json()["detail"]
@pytest.mark.asyncio
async def test_webhook_unknown_event_type(self, webhook_secret, mock_meeting):
"""Test webhook with unknown event type."""
event_data = self.create_webhook_event("unknown.event")
payload = json.dumps(event_data).encode()
signature = self.create_webhook_signature(payload, webhook_secret)
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
with patch(
"reflector.db.meetings.meetings_controller.get_by_room_name"
) as mock_get_meeting:
mock_get_meeting.return_value = mock_meeting
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
json=event_data,
headers={"X-Daily-Signature": signature},
)
# Should still return 200 but log the unknown event
assert response.status_code == 200
assert response.json() == {"status": "ok"}
@pytest.mark.asyncio
async def test_webhook_malformed_json(self, webhook_secret):
"""Test webhook with malformed JSON."""
with patch("reflector.views.daily.settings") as mock_settings:
mock_settings.DAILY_WEBHOOK_SECRET = webhook_secret
async with AsyncClient(app=app, base_url="http://test/v1") as ac:
response = await ac.post(
"/daily_webhook",
content="invalid json",
headers={
"Content-Type": "application/json",
"X-Daily-Signature": "test-signature",
},
)
assert response.status_code == 422 # Validation error
class TestWebhookEventValidation:
"""Test webhook event data validation."""
def test_daily_webhook_event_validation_valid(self):
"""Test valid webhook event passes validation."""
event_data = {
"type": "participant.joined",
"id": "evt_123",
"ts": 1640995200000, # milliseconds
"data": {
"room": {"name": "test-room"},
"participant": {
"id": "participant-123",
"user_name": "John Doe",
"session_id": "session-456",
},
},
}
event = DailyWebhookEvent(**event_data)
assert event.type == "participant.joined"
assert event.data["room"]["name"] == "test-room"
assert event.data["participant"]["id"] == "participant-123"
def test_daily_webhook_event_validation_minimal(self):
"""Test minimal valid webhook event."""
event_data = {
"type": "room.created",
"id": "evt_123",
"ts": 1640995200000,
"data": {"room": {"name": "test-room"}},
}
event = DailyWebhookEvent(**event_data)
assert event.type == "room.created"
assert event.data["room"]["name"] == "test-room"
def test_daily_webhook_event_validation_with_recording(self):
"""Test webhook event with recording data."""
event_data = {
"type": "recording.ready-to-download",
"id": "evt_123",
"ts": 1640995200000,
"data": {
"room": {"name": "test-room"},
"recording": {
"id": "recording-123",
"status": "finished",
"download_url": "https://example.com/recording.mp4",
"start_time": "2025-01-01T10:00:00Z",
"duration": 1800,
},
},
}
event = DailyWebhookEvent(**event_data)
assert event.type == "recording.ready-to-download"
assert event.data["recording"]["id"] == "recording-123"
assert (
event.data["recording"]["download_url"]
== "https://example.com/recording.mp4"
)

View File

@@ -0,0 +1,323 @@
"""Tests for video platform clients."""
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from reflector.db.rooms import Room
from reflector.video_platforms.base import MeetingData, VideoPlatformConfig
from reflector.video_platforms.daily import DailyClient
from reflector.video_platforms.mock import MockPlatformClient
from reflector.video_platforms.registry import get_platform_client
from reflector.video_platforms.whereby import WherebyClient
@pytest.fixture
def mock_room():
"""Create a mock room for testing."""
room = MagicMock(spec=Room)
room.id = "test-room-123"
room.name = "Test Room"
room.recording_type = "cloud"
room.platform = "whereby"
return room
@pytest.fixture
def config():
"""Create a test configuration."""
return VideoPlatformConfig(
api_key="test-api-key",
webhook_secret="test-webhook-secret",
subdomain="test-subdomain",
)
class TestPlatformFactory:
"""Test platform client factory."""
def test_create_whereby_client(self, config):
"""Test creating Whereby client."""
client = get_platform_client("whereby", config)
assert isinstance(client, WherebyClient)
def test_create_daily_client(self, config):
"""Test creating Daily.co client."""
client = get_platform_client("daily", config)
assert isinstance(client, DailyClient)
def test_create_mock_client(self, config):
"""Test creating mock client."""
client = get_platform_client("mock", config)
assert isinstance(client, MockPlatformClient)
def test_invalid_platform_raises_error(self, config):
"""Test that invalid platform raises ValueError."""
with pytest.raises(ValueError, match="Unknown platform: invalid"):
get_platform_client("invalid", config)
class TestMockPlatformClient:
"""Test mock platform client implementation."""
@pytest.fixture
def mock_client(self, config):
return MockPlatformClient(config)
@pytest.mark.asyncio
async def test_create_meeting(self, mock_client, mock_room):
"""Test creating a meeting with mock client."""
end_date = datetime.utcnow() + timedelta(hours=1)
meeting_data = await mock_client.create_meeting(
room_name_prefix="test", end_date=end_date, room=mock_room
)
assert isinstance(meeting_data, MeetingData)
assert meeting_data.room_url.startswith("https://mock.video/")
assert meeting_data.host_room_url.startswith("https://mock.video/")
assert meeting_data.room_name.startswith("test")
@pytest.mark.asyncio
async def test_get_room_sessions(self, mock_client):
"""Test getting room sessions."""
# First create a room so it exists
end_date = datetime.utcnow() + timedelta(hours=1)
mock_room = MagicMock()
mock_room.id = "test-room"
meeting = await mock_client.create_meeting("test", end_date, mock_room)
sessions = await mock_client.get_room_sessions(meeting.room_name)
assert isinstance(sessions, dict)
assert "sessions" in sessions
assert len(sessions["sessions"]) == 1
@pytest.mark.asyncio
async def test_delete_room(self, mock_client):
"""Test deleting a room."""
# First create a room so it exists
end_date = datetime.utcnow() + timedelta(hours=1)
mock_room = MagicMock()
mock_room.id = "test-room"
meeting = await mock_client.create_meeting("test", end_date, mock_room)
result = await mock_client.delete_room(meeting.room_name)
assert result is True
def test_verify_webhook_signature_valid(self, mock_client):
"""Test webhook signature verification with valid signature."""
payload = b'{"event": "test"}'
signature = "valid" # Mock accepts "valid" as valid signature
result = mock_client.verify_webhook_signature(payload, signature)
assert result is True
def test_verify_webhook_signature_invalid(self, mock_client):
"""Test webhook signature verification with invalid signature."""
payload = b'{"event": "test"}'
signature = "invalid-signature"
result = mock_client.verify_webhook_signature(payload, signature)
assert result is False
class TestDailyClient:
"""Test Daily.co platform client."""
@pytest.fixture
def daily_client(self, config):
return DailyClient(config)
@pytest.mark.asyncio
async def test_create_meeting_success(self, daily_client, mock_room):
"""Test successful meeting creation."""
end_date = datetime.utcnow() + timedelta(hours=1)
mock_response = {
"url": "https://test.daily.co/test-room-123-abc",
"name": "test-room-123-abc",
"api_created": True,
"privacy": "public",
"config": {"enable_recording": "cloud"},
}
with patch.object(
daily_client, "_make_request", new_callable=AsyncMock
) as mock_request:
mock_request.return_value = mock_response
meeting_data = await daily_client.create_meeting(
room_name_prefix="test", end_date=end_date, room=mock_room
)
assert isinstance(meeting_data, MeetingData)
assert meeting_data.room_url == "https://test.daily.co/test-room-123-abc"
assert (
meeting_data.host_room_url == "https://test.daily.co/test-room-123-abc"
)
assert meeting_data.room_name == "test-room-123-abc"
# Verify request was made with correct parameters
mock_request.assert_called_once()
call_args = mock_request.call_args
assert call_args[0][0] == "POST"
assert "/rooms" in call_args[0][1]
@pytest.mark.asyncio
async def test_get_room_sessions_success(self, daily_client):
"""Test successful room sessions retrieval."""
mock_response = {
"data": [
{
"id": "session-1",
"room_name": "test-room",
"start_time": "2025-01-01T10:00:00Z",
"participants": [],
}
]
}
with patch.object(
daily_client, "_make_request", new_callable=AsyncMock
) as mock_request:
mock_request.return_value = mock_response
sessions = await daily_client.get_room_sessions("test-room")
assert isinstance(sessions, list)
assert len(sessions) == 1
assert sessions[0]["id"] == "session-1"
@pytest.mark.asyncio
async def test_delete_room_success(self, daily_client):
"""Test successful room deletion."""
with patch.object(
daily_client, "_make_request", new_callable=AsyncMock
) as mock_request:
mock_request.return_value = {"deleted": True}
result = await daily_client.delete_room("test-room")
assert result is True
mock_request.assert_called_once_with("DELETE", "/rooms/test-room")
def test_verify_webhook_signature_valid(self, daily_client):
"""Test webhook signature verification with valid HMAC."""
import hashlib
import hmac
payload = b'{"event": "participant.joined"}'
expected_signature = hmac.new(
daily_client.webhook_secret.encode(), payload, hashlib.sha256
).hexdigest()
result = daily_client.verify_webhook_signature(payload, expected_signature)
assert result is True
def test_verify_webhook_signature_invalid(self, daily_client):
"""Test webhook signature verification with invalid HMAC."""
payload = b'{"event": "participant.joined"}'
invalid_signature = "invalid-signature"
result = daily_client.verify_webhook_signature(payload, invalid_signature)
assert result is False
class TestWherebyClient:
"""Test Whereby platform client."""
@pytest.fixture
def whereby_client(self, config):
return WherebyClient(config)
@pytest.mark.asyncio
async def test_create_meeting_delegates_to_whereby_client(
self, whereby_client, mock_room
):
"""Test that create_meeting delegates to existing Whereby client."""
end_date = datetime.utcnow() + timedelta(hours=1)
mock_whereby_response = {
"roomUrl": "https://whereby.com/test-room",
"hostRoomUrl": "https://whereby.com/test-room?host",
"meetingId": "meeting-123",
}
with patch("reflector.video_platforms.whereby.whereby_client") as mock_client:
mock_client.create_meeting.return_value = mock_whereby_response
meeting_data = await whereby_client.create_meeting(
room_name_prefix="test", end_date=end_date, room=mock_room
)
assert isinstance(meeting_data, MeetingData)
assert meeting_data.room_url == "https://whereby.com/test-room"
assert meeting_data.host_room_url == "https://whereby.com/test-room?host"
assert meeting_data.meeting_id == "meeting-123"
@pytest.mark.asyncio
async def test_get_room_sessions_delegates_to_whereby_client(self, whereby_client):
"""Test that get_room_sessions delegates to existing Whereby client."""
mock_sessions = [{"id": "session-1"}]
with patch("reflector.video_platforms.whereby.whereby_client") as mock_client:
mock_client.get_room_sessions.return_value = mock_sessions
sessions = await whereby_client.get_room_sessions("test-room")
assert sessions == mock_sessions
def test_verify_webhook_signature_delegates_to_whereby_client(self, whereby_client):
"""Test that webhook verification delegates to existing Whereby client."""
payload = b'{"event": "test"}'
signature = "test-signature"
with patch("reflector.video_platforms.whereby.whereby_client") as mock_client:
mock_client.verify_webhook_signature.return_value = True
result = whereby_client.verify_webhook_signature(payload, signature)
assert result is True
mock_client.verify_webhook_signature.assert_called_once_with(
payload, signature
)
class TestPlatformIntegration:
"""Integration tests for platform switching."""
@pytest.mark.asyncio
async def test_platform_switching_preserves_interface(self, config, mock_room):
"""Test that different platforms provide consistent interface."""
end_date = datetime.utcnow() + timedelta(hours=1)
# Test with mock platform
mock_client = get_platform_client("mock", config)
mock_meeting = await mock_client.create_meeting("test", end_date, mock_room)
# Test with Daily platform (mocked)
daily_client = get_platform_client("daily", config)
with patch.object(
daily_client, "_make_request", new_callable=AsyncMock
) as mock_request:
mock_request.return_value = {
"url": "https://test.daily.co/test-room",
"name": "test-room",
"api_created": True,
}
daily_meeting = await daily_client.create_meeting(
"test", end_date, mock_room
)
# Both should return MeetingData objects with consistent fields
assert isinstance(mock_meeting, MeetingData)
assert isinstance(daily_meeting, MeetingData)
# Both should have required fields
for meeting in [mock_meeting, daily_meeting]:
assert hasattr(meeting, "room_url")
assert hasattr(meeting, "host_room_url")
assert hasattr(meeting, "room_name")
assert meeting.room_url.startswith("https://")

View File

@@ -0,0 +1 @@
# Test utilities

View File

@@ -0,0 +1,256 @@
"""Utilities for testing video platform functionality."""
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Any, Dict, Optional
from unittest.mock import AsyncMock, MagicMock, patch
from reflector.db.rooms import Room
from reflector.video_platforms.base import MeetingData, VideoPlatformConfig
from reflector.video_platforms.factory import create_platform_client
class MockVideoPlatformTestHelper:
"""Helper class for testing with mock video platforms."""
def __init__(self, platform: str = "mock"):
self.platform = platform
self.config = VideoPlatformConfig(
api_key="test-api-key",
webhook_secret="test-webhook-secret",
subdomain="test-subdomain",
)
self.client = create_platform_client(platform, self.config)
def create_mock_room(self, room_id: str = "test-room-123", **kwargs) -> MagicMock:
"""Create a mock room for testing."""
room = MagicMock(spec=Room)
room.id = room_id
room.name = kwargs.get("name", "Test Room")
room.recording_type = kwargs.get("recording_type", "cloud")
room.platform = kwargs.get("platform", self.platform)
return room
async def create_test_meeting(
self, room: Optional[Room] = None, **kwargs
) -> MeetingData:
"""Create a test meeting with default values."""
if room is None:
room = self.create_mock_room()
end_date = kwargs.get("end_date", datetime.utcnow() + timedelta(hours=1))
room_name_prefix = kwargs.get("room_name_prefix", "test")
return await self.client.create_meeting(room_name_prefix, end_date, room)
def create_webhook_event(
self, event_type: str, room_name: str = "test-room-123-abc", **kwargs
) -> Dict[str, Any]:
"""Create a webhook event payload for testing."""
if self.platform == "daily":
return self._create_daily_webhook_event(event_type, room_name, **kwargs)
elif self.platform == "whereby":
return self._create_whereby_webhook_event(event_type, room_name, **kwargs)
else:
return {"type": event_type, "room_name": room_name, **kwargs}
def _create_daily_webhook_event(
self, event_type: str, room_name: str, **kwargs
) -> Dict[str, Any]:
"""Create Daily.co-specific webhook event."""
base_event = {
"type": event_type,
"event_ts": int(datetime.utcnow().timestamp()),
"room": {"name": room_name},
}
if event_type == "participant.joined" or event_type == "participant.left":
base_event["participant"] = kwargs.get(
"participant",
{
"id": "participant-123",
"user_name": "Test User",
"session_id": "session-456",
},
)
if event_type.startswith("recording."):
base_event["recording"] = kwargs.get(
"recording",
{
"id": "recording-789",
"status": "finished" if "ready" in event_type else "recording",
"start_time": "2025-01-01T10:00:00Z",
},
)
if "ready" in event_type:
base_event["recording"]["download_url"] = (
"https://s3.amazonaws.com/bucket/recording.mp4"
)
base_event["recording"]["duration"] = 1800
return base_event
def _create_whereby_webhook_event(
self, event_type: str, room_name: str, **kwargs
) -> Dict[str, Any]:
"""Create Whereby-specific webhook event."""
# Whereby uses different event structure
return {
"event": event_type,
"roomName": room_name,
"timestamp": datetime.utcnow().isoformat(),
**kwargs,
}
def mock_platform_responses(self, platform: str, responses: Dict[str, Any]):
"""Context manager to mock platform API responses."""
if platform == "daily":
return self._mock_daily_responses(responses)
elif platform == "whereby":
return self._mock_whereby_responses(responses)
else:
return self._mock_generic_responses(responses)
@asynccontextmanager
async def _mock_daily_responses(self, responses: Dict[str, Any]):
"""Mock Daily.co API responses."""
with patch(
"reflector.video_platforms.daily.DailyPlatformClient._make_request"
) as mock_request:
mock_request.side_effect = lambda method, url, **kwargs: responses.get(
f"{method} {url}", {}
)
yield mock_request
@asynccontextmanager
async def _mock_whereby_responses(self, responses: Dict[str, Any]):
"""Mock Whereby API responses."""
with patch("reflector.video_platforms.whereby.whereby_client") as mock_client:
for method, response in responses.items():
setattr(mock_client, method, AsyncMock(return_value=response))
yield mock_client
@asynccontextmanager
async def _mock_generic_responses(self, responses: Dict[str, Any]):
"""Mock generic platform responses."""
yield responses
class IntegrationTestScenario:
"""Helper for running integration test scenarios across platforms."""
def __init__(self, platforms: list = None):
self.platforms = platforms or ["mock", "daily", "whereby"]
self.helpers = {
platform: MockVideoPlatformTestHelper(platform)
for platform in self.platforms
}
async def test_meeting_lifecycle(self, room_config: Dict[str, Any] = None):
"""Test complete meeting lifecycle across all platforms."""
results = {}
for platform in self.platforms:
helper = self.helpers[platform]
room = helper.create_mock_room(**(room_config or {}))
# Test meeting creation
meeting = await helper.create_test_meeting(room=room)
assert isinstance(meeting, MeetingData)
assert meeting.room_url.startswith("https://")
# Test room sessions
sessions = await helper.client.get_room_sessions(meeting.room_name)
assert isinstance(sessions, list)
# Test room deletion
deleted = await helper.client.delete_room(meeting.room_name)
assert deleted is True
results[platform] = {
"meeting": meeting,
"sessions": sessions,
"deleted": deleted,
}
return results
def test_webhook_signatures(self, payload: bytes = None):
"""Test webhook signature verification across platforms."""
if payload is None:
payload = b'{"event": "test"}'
results = {}
for platform in self.platforms:
helper = self.helpers[platform]
# Test valid signature
if platform == "mock":
valid_signature = "valid-signature"
else:
import hashlib
import hmac
valid_signature = hmac.new(
helper.config.webhook_secret.encode(), payload, hashlib.sha256
).hexdigest()
valid_result = helper.client.verify_webhook_signature(
payload, valid_signature
)
invalid_result = helper.client.verify_webhook_signature(
payload, "invalid-signature"
)
results[platform] = {"valid": valid_result, "invalid": invalid_result}
return results
def create_test_meeting_data(platform: str = "mock", **overrides) -> MeetingData:
"""Create test meeting data with platform-specific URLs."""
base_data = {"room_name": "test-room-123-abc", "meeting_id": "meeting-456"}
if platform == "daily":
base_data.update(
{
"room_url": "https://test.daily.co/test-room-123-abc",
"host_room_url": "https://test.daily.co/test-room-123-abc",
}
)
elif platform == "whereby":
base_data.update(
{
"room_url": "https://whereby.com/test-room-123-abc",
"host_room_url": "https://whereby.com/test-room-123-abc?host",
}
)
else: # mock
base_data.update(
{
"room_url": "https://mock.daily.co/test-room-123-abc",
"host_room_url": "https://mock.daily.co/test-room-123-abc",
}
)
base_data.update(overrides)
return MeetingData(**base_data)
def assert_meeting_data_valid(meeting_data: MeetingData, platform: str = None):
"""Assert that meeting data is valid for the given platform."""
assert isinstance(meeting_data, MeetingData)
assert meeting_data.room_url.startswith("https://")
assert meeting_data.host_room_url.startswith("https://")
assert isinstance(meeting_data.room_name, str)
assert len(meeting_data.room_name) > 0
if platform == "daily":
assert "daily.co" in meeting_data.room_url
elif platform == "whereby":
assert "whereby.com" in meeting_data.room_url
elif platform == "mock":
assert "mock.daily.co" in meeting_data.room_url