From 159bd82e1c83e3854b1e6d762b3b9c91e2dc8669 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Tue, 24 Dec 2024 14:18:35 +0100 Subject: [PATCH] Create new meeting after previous has ended --- ...6ea59639f30e_add_num_clients_to_meeting.py | 31 ++++++++ server/reflector/app.py | 2 + server/reflector/db/meetings.py | 17 ++++- server/reflector/settings.py | 2 + server/reflector/views/rooms.py | 8 +- server/reflector/views/whereby.py | 74 +++++++++++++++++++ 6 files changed, 128 insertions(+), 6 deletions(-) create mode 100644 server/migrations/versions/6ea59639f30e_add_num_clients_to_meeting.py create mode 100644 server/reflector/views/whereby.py diff --git a/server/migrations/versions/6ea59639f30e_add_num_clients_to_meeting.py b/server/migrations/versions/6ea59639f30e_add_num_clients_to_meeting.py new file mode 100644 index 00000000..a0a2391c --- /dev/null +++ b/server/migrations/versions/6ea59639f30e_add_num_clients_to_meeting.py @@ -0,0 +1,31 @@ +"""add num_clients to meeting + +Revision ID: 6ea59639f30e +Revises: b469348df210 +Create Date: 2024-12-24 10:50:03.109729 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "6ea59639f30e" +down_revision: Union[str, None] = "b469348df210" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column( + "meeting", + sa.Column( + "num_clients", sa.Integer, nullable=False, server_default=sa.text("0") + ), + ) + + +def downgrade() -> None: + op.drop_column("meeting", "num_clients") diff --git a/server/reflector/app.py b/server/reflector/app.py index fc9fbdd9..079a5efe 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -24,6 +24,7 @@ from reflector.views.transcripts_upload import router as transcripts_upload_rout from reflector.views.transcripts_webrtc import router as transcripts_webrtc_router from reflector.views.transcripts_websocket import router as transcripts_websocket_router from reflector.views.user import router as user_router +from reflector.views.whereby import router as whereby_router from reflector.views.zulip import router as zulip_router try: @@ -81,6 +82,7 @@ app.include_router(transcripts_webrtc_router, prefix="/v1") app.include_router(transcripts_process_router, prefix="/v1") app.include_router(user_router, prefix="/v1") app.include_router(zulip_router, prefix="/v1") +app.include_router(whereby_router, prefix="/v1") add_pagination(app) # prepare celery diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 2ed86173..a8794db1 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,4 +1,4 @@ -from datetime import datetime, timezone +from datetime import datetime from typing import Literal import sqlalchemy @@ -34,6 +34,12 @@ meetings = sqlalchemy.Table( nullable=False, server_default="automatic-2nd-participant", ), + sqlalchemy.Column( + "num_clients", + sqlalchemy.Integer, + nullable=False, + server_default=sqlalchemy.text("0"), + ), ) @@ -52,6 +58,7 @@ class Meeting(BaseModel): recording_trigger: Literal[ "none", "prompt", "automatic", "automatic-2nd-participant" ] = "automatic-2nd-participant" + num_clients: int = 0 class MeetingController: @@ -101,7 +108,7 @@ class MeetingController: return Meeting(**result) - async def get_latest(self, room: Room) -> Meeting: + async def get_latest(self, room: Room, current_time: datetime) -> Meeting: """ Get latest meeting for a room. """ @@ -115,7 +122,7 @@ class MeetingController: meetings.c.room_mode == room.room_mode, meetings.c.recording_type == room.recording_type, meetings.c.recording_trigger == room.recording_trigger, - meetings.c.end_date > datetime.now(timezone.utc), + meetings.c.end_date > current_time, ) ) .order_by(end_date.desc()) @@ -153,5 +160,9 @@ class MeetingController: return meeting + async def update_meeting(self, meeting_id: str, **kwargs): + query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) + await database.execute(query) + meetings_controller = MeetingController() diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 4b24ba21..d6dd842f 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -153,5 +153,7 @@ class Settings(BaseSettings): UI_BASE_URL: str = "http://localhost:3000" + WHEREBY_WEBHOOK_SECRET: str | None = None + settings = Settings() diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index bd52b9a0..deb89a17 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -144,9 +144,11 @@ async def rooms_create_meeting( if not room: raise HTTPException(status_code=404, detail="Room not found") - meeting = await meetings_controller.get_latest(room=room) - if meeting is None: - start_date = datetime.now(timezone.utc) + current_time = datetime.utcnow() + meeting = await meetings_controller.get_latest(room=room, current_time=current_time) + + if meeting is None or meeting.num_clients == 0: + start_date = current_time end_date = start_date + timedelta(hours=1) meeting = await create_meeting( "", start_date=start_date, end_date=end_date, room=room diff --git a/server/reflector/views/whereby.py b/server/reflector/views/whereby.py new file mode 100644 index 00000000..c4d23e0b --- /dev/null +++ b/server/reflector/views/whereby.py @@ -0,0 +1,74 @@ +import hmac +import json +import re +import time +from datetime import datetime +from hashlib import sha256 + +from fastapi import APIRouter, HTTPException, Request +from pydantic import BaseModel +from reflector.db.meetings import meetings_controller +from reflector.settings import settings + +router = APIRouter() + + +class WherebyWebhookEvent(BaseModel): + apiVersion: str + id: str + createdAt: datetime + type: str + data: dict + + +MAX_ELAPSED_TIME = 60 * 1000 # 1 minute in milliseconds + + +def is_webhook_event_valid(body: dict, signature: str) -> bool: + """Validate Whereby webhook signature and timestamp.""" + if not signature: + return False + + matches = re.match(r"t=(.*),v1=(.*)", signature) + if not matches: + return False + + timestamp, signature = matches.groups() + + current_time = int(time.time() * 1000) + diff_time = current_time - int(timestamp) * 1000 + if diff_time >= MAX_ELAPSED_TIME: + return False + + signed_payload = f"{timestamp}.{json.dumps(body, separators=(',', ':'))}" + hmac_obj = hmac.new( + settings.WHEREBY_WEBHOOK_SECRET.encode("utf-8"), + signed_payload.encode("utf-8"), + sha256, + ) + expected_signature = hmac_obj.hexdigest() + try: + return hmac.compare_digest( + expected_signature.encode("utf-8"), signature.encode("utf-8") + ) + except Exception: + return False + + +@router.post("/whereby") +async def whereby_webhook(event: WherebyWebhookEvent, request: Request): + if not is_webhook_event_valid( + await request.json(), request.headers["whereby-signature"] + ): + raise HTTPException(status_code=401, detail="Invalid webhook signature") + + meeting = await meetings_controller.get_by_id(event.data["meetingId"]) + if not meeting: + raise HTTPException(status_code=404, detail="Meeting not found") + + if event.type in ["room.client.joined", "room.client.left"]: + await meetings_controller.update_meeting( + meeting.id, num_clients=event.data["numClients"] + ) + + return {"status": "ok"}