diff --git a/server/reflector/video_platforms/factory.py b/server/reflector/video_platforms/factory.py index 8f58aed0..1f7500a0 100644 --- a/server/reflector/video_platforms/factory.py +++ b/server/reflector/video_platforms/factory.py @@ -16,6 +16,7 @@ def get_platform_config(platform: str) -> VideoPlatformConfig: api_key=settings.WHEREBY_API_KEY or "", webhook_secret=settings.WHEREBY_WEBHOOK_SECRET or "", api_url=settings.WHEREBY_API_URL, + s3_bucket=settings.RECORDING_STORAGE_AWS_BUCKET_NAME, aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID, aws_access_key_secret=settings.AWS_WHEREBY_ACCESS_KEY_SECRET, ) diff --git a/server/reflector/video_platforms/registry.py b/server/reflector/video_platforms/registry.py index a080b7bf..c7ea1fc7 100644 --- a/server/reflector/video_platforms/registry.py +++ b/server/reflector/video_platforms/registry.py @@ -31,8 +31,10 @@ def get_available_platforms() -> list[str]: # Auto-register built-in platforms def _register_builtin_platforms(): from .jitsi import JitsiClient + from .whereby import WherebyClient register_platform("jitsi", JitsiClient) + register_platform("whereby", WherebyClient) _register_builtin_platforms() diff --git a/server/reflector/video_platforms/whereby/__init__.py b/server/reflector/video_platforms/whereby/__init__.py new file mode 100644 index 00000000..3cd2ab86 --- /dev/null +++ b/server/reflector/video_platforms/whereby/__init__.py @@ -0,0 +1,5 @@ +"""Whereby video platform integration.""" + +from .client import WherebyClient + +__all__ = ["WherebyClient"] diff --git a/server/reflector/video_platforms/whereby/client.py b/server/reflector/video_platforms/whereby/client.py new file mode 100644 index 00000000..21a18252 --- /dev/null +++ b/server/reflector/video_platforms/whereby/client.py @@ -0,0 +1,120 @@ +import hmac +from datetime import datetime +from hashlib import sha256 +from typing import Any, Dict, Optional + +import httpx + +from reflector.db.rooms import Room, VideoPlatform +from reflector.settings import settings + +from ..base import MeetingData, VideoPlatformClient + + +class WherebyClient(VideoPlatformClient): + """Whereby video platform implementation.""" + + PLATFORM_NAME = VideoPlatform.WHEREBY + + def __init__(self, config): + super().__init__(config) + self.headers = { + "Content-Type": "application/json; charset=utf-8", + "Authorization": f"Bearer {self.config.api_key}", + } + self.timeout = 10 + + async def create_meeting( + self, room_name_prefix: str, end_date: datetime, room: Room + ) -> MeetingData: + """Create a Whereby meeting room.""" + data = { + "isLocked": room.is_locked, + "roomNamePrefix": room_name_prefix, + "roomNamePattern": "uuid", + "roomMode": room.room_mode, + "endDate": end_date.isoformat(), + "recording": { + "type": room.recording_type, + "destination": { + "provider": "s3", + "bucket": settings.RECORDING_STORAGE_AWS_BUCKET_NAME, + "accessKeyId": self.config.aws_access_key_id, + "accessKeySecret": self.config.aws_access_key_secret, + "fileFormat": "mp4", + }, + "startTrigger": room.recording_trigger, + }, + "fields": ["hostRoomUrl"], + } + + async with httpx.AsyncClient() as client: + response = await client.post( + f"{self.config.api_url}/meetings", + headers=self.headers, + json=data, + timeout=self.timeout, + ) + response.raise_for_status() + meeting_data = response.json() + + return MeetingData( + meeting_id=meeting_data["meetingId"], + room_name=meeting_data["roomName"], + room_url=meeting_data["roomUrl"], + host_room_url=meeting_data["hostRoomUrl"], + platform=self.PLATFORM_NAME, + extra_data={ + "startDate": meeting_data["startDate"], + "endDate": meeting_data["endDate"], + "recording": meeting_data.get("recording", {}), + }, + ) + + async def get_room_sessions(self, room_name: str) -> Dict[str, Any]: + """Get session information for a room.""" + async with httpx.AsyncClient() as client: + response = await client.get( + f"{self.config.api_url}/insights/room-sessions?roomName={room_name}", + headers=self.headers, + timeout=self.timeout, + ) + response.raise_for_status() + return response.json() + + async def delete_room(self, room_name: str) -> bool: + """Delete a room. Whereby rooms auto-expire, so this is a no-op.""" + return True + + async def upload_logo(self, room_name: str, logo_path: str) -> bool: + """Upload a logo to the room.""" + try: + async with httpx.AsyncClient() as client: + with open(logo_path, "rb") as f: + response = await client.put( + f"{self.config.api_url}/rooms{room_name}/theme/logo", + headers={ + "Authorization": f"Bearer {self.config.api_key}", + }, + timeout=self.timeout, + files={"image": f}, + ) + response.raise_for_status() + return True + except Exception: + return False + + def verify_webhook_signature( + self, body: bytes, signature: str, timestamp: Optional[str] = None + ) -> bool: + """Verify webhook signature for Whereby webhooks.""" + if not signature or not self.config.webhook_secret: + return False + + try: + expected = hmac.new( + self.config.webhook_secret.encode(), body, sha256 + ).hexdigest() + return hmac.compare_digest(expected, signature) + except Exception: + return False diff --git a/server/reflector/video_platforms/whereby/tasks.py b/server/reflector/video_platforms/whereby/tasks.py new file mode 100644 index 00000000..5314c68b --- /dev/null +++ b/server/reflector/video_platforms/whereby/tasks.py @@ -0,0 +1,4 @@ +"""Whereby-specific worker tasks.""" + +# Placeholder for Whereby-specific background tasks +# This can be extended with Whereby-specific processing tasks in the future diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 00126514..79eaccd8 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -17,7 +17,7 @@ from reflector.db.transcripts import SourceKind, transcripts_controller from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_live_pipeline import asynctask from reflector.settings import settings -from reflector.whereby import get_room_sessions +from reflector.video_platforms.factory import create_platform_client logger = structlog.wrap_logger(get_task_logger(__name__)) @@ -155,11 +155,18 @@ async def process_meetings(): if end_date.tzinfo is None: end_date = end_date.replace(tzinfo=timezone.utc) if end_date > datetime.now(timezone.utc): - response = await get_room_sessions(meeting.room_name) - room_sessions = response.get("results", []) - is_active = not room_sessions or any( - rs["endedAt"] is None for rs in room_sessions - ) + # Get room sessions using platform client + platform = getattr(meeting, "platform", "whereby") + client = create_platform_client(platform) + if client: + response = await client.get_room_sessions(meeting.room_name) + room_sessions = response.get("results", []) + is_active = not room_sessions or any( + rs["endedAt"] is None for rs in room_sessions + ) + else: + # Fallback: assume meeting is still active if we can't check + is_active = True if not is_active: await meetings_controller.update_meeting(meeting.id, is_active=False) logger.info("Meeting %s is deactivated", meeting.id)