Files
reflector/server/reflector/video_platforms/whereby.py
Igor Monadical 8e438ca285 feat: dailyco poll (#730)
* dailyco api module (no-mistakes)

* daily co library self-review

* uncurse

* self-review: daily resource leak, uniform types, enable_recording bomb, daily custom error, video_platforms/daily typing, daily timestamp dry

* dailyco docs parser

* phase 1-2 of daily poll

* dailyco poll (no-mistakes)

* poll docs

* fix tests

* forgotten utils file

* remove generated daily docs

* pr comments

* dailyco poll pr review and self-review

* daily recording poll api fix

* daily recording poll api fix

* review

* review

* fix tests

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-24 22:24:03 -05:00

171 lines
5.9 KiB
Python

import hmac
import json
import re
import time
from datetime import datetime
from hashlib import sha256
from typing import Optional
import httpx
from reflector.db.rooms import Room
from reflector.storage import get_whereby_storage
from ..schemas.platform import WHEREBY_PLATFORM, Platform
from ..utils.string import NonEmptyString
from .base import VideoPlatformClient
from .models import MeetingData, SessionData, VideoPlatformConfig
from .whereby_utils import whereby_room_name_prefix
class WherebyClient(VideoPlatformClient):
PLATFORM_NAME: Platform = WHEREBY_PLATFORM
TIMEOUT = 10 # seconds
MAX_ELAPSED_TIME = 60 * 1000 # 1 minute in milliseconds
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
self.headers = {
"Content-Type": "application/json; charset=utf-8",
"Authorization": f"Bearer {config.api_key}",
}
async def create_meeting(
self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room
) -> MeetingData:
data = {
"isLocked": room.is_locked,
"roomNamePrefix": whereby_room_name_prefix(room_name_prefix),
"roomNamePattern": "uuid",
"roomMode": room.room_mode,
"endDate": end_date.isoformat(),
"fields": ["hostRoomUrl"],
}
if room.recording_type == "cloud":
# Get storage config for passing credentials to Whereby API
whereby_storage = get_whereby_storage()
key_id, secret = whereby_storage.key_credentials
data["recording"] = {
"type": room.recording_type,
"destination": {
"provider": "s3",
"bucket": whereby_storage.bucket_name,
"accessKeyId": key_id,
"accessKeySecret": secret,
"fileFormat": "mp4",
},
"startTrigger": room.recording_trigger,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.config.api_url}/meetings",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
response.raise_for_status()
result = response.json()
return MeetingData(
meeting_id=result["meetingId"],
room_name=result["roomName"],
room_url=result["roomUrl"],
host_room_url=result["hostRoomUrl"],
platform=self.PLATFORM_NAME,
extra_data=result,
)
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
"""Get room session history from Whereby API.
Whereby API returns: [{"sessionId": "...", "startedAt": "...", "endedAt": "..." | null}, ...]
"""
async with httpx.AsyncClient() as client:
"""
{
"cursor": "text",
"results": [
{
"roomSessionId": "e2f29530-46ec-4cee-8b27-e565cb5bb2e9",
"roomName": "/room-prefix-793e9ec1-c686-423d-9043-9b7a10c553fd",
"startedAt": "2025-01-01T00:00:00.000Z",
"endedAt": "2025-01-01T01:00:00.000Z",
"totalParticipantMinutes": 124,
"totalRecorderMinutes": 120,
"totalStreamerMinutes": 120,
"totalUniqueParticipants": 4,
"totalUniqueRecorders": 3,
"totalUniqueStreamers": 2
}
]
}"""
response = await client.get(
f"{self.config.api_url}/insights/room-sessions?roomName={room_name}",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
results = response.json().get("results", [])
return [
SessionData(
session_id=s["roomSessionId"],
started_at=datetime.fromisoformat(
s["startedAt"].replace("Z", "+00:00")
),
ended_at=datetime.fromisoformat(s["endedAt"].replace("Z", "+00:00"))
if s.get("endedAt")
else None,
)
for s in results
]
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
async with httpx.AsyncClient() as client:
with open(logo_path, "rb") as f:
response = await client.put(
f"{self.config.api_url}/rooms/{room_name}/theme/logo",
headers={
"Authorization": f"Bearer {self.config.api_key}",
},
timeout=self.TIMEOUT,
files={"image": f},
)
response.raise_for_status()
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
if not signature:
return False
matches = re.match(r"t=(.*),v1=(.*)", signature)
if not matches:
return False
ts, sig = matches.groups()
current_time = int(time.time() * 1000)
diff_time = current_time - int(ts) * 1000
if diff_time >= self.MAX_ELAPSED_TIME:
return False
body_dict = json.loads(body)
signed_payload = f"{ts}.{json.dumps(body_dict, separators=(',', ':'))}"
hmac_obj = hmac.new(
self.config.webhook_secret.encode("utf-8"),
signed_payload.encode("utf-8"),
sha256,
)
expected_signature = hmac_obj.hexdigest()
try:
return hmac.compare_digest(
expected_signature.encode("utf-8"), sig.encode("utf-8")
)
except Exception:
return False