From dd021e9e71987115128f33e1a76b2dba085da690 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Tue, 28 Jan 2025 12:41:23 +0100 Subject: [PATCH] Deactivate meeting when session ends --- compose.yml | 10 +++ .../b0e5f7876032_add_meeting_is_active.py | 34 ++++++++++ server/reflector/db/meetings.py | 65 ++++++++++--------- server/reflector/settings.py | 2 +- server/reflector/views/rooms.py | 8 +-- server/reflector/whereby.py | 26 ++++++-- server/reflector/worker/app.py | 6 +- server/reflector/worker/process.py | 22 +++++++ 8 files changed, 130 insertions(+), 43 deletions(-) create mode 100644 server/migrations/versions/b0e5f7876032_add_meeting_is_active.py diff --git a/compose.yml b/compose.yml index 2ba58fa0..2f8a8567 100644 --- a/compose.yml +++ b/compose.yml @@ -21,6 +21,16 @@ services: environment: ENTRYPOINT: worker + beat: + build: + context: server + volumes: + - ./server/data/:/app/data/ + env_file: + - ./server/.env + environment: + ENTRYPOINT: beat + redis: image: redis:7.2 ports: diff --git a/server/migrations/versions/b0e5f7876032_add_meeting_is_active.py b/server/migrations/versions/b0e5f7876032_add_meeting_is_active.py new file mode 100644 index 00000000..a3be5df2 --- /dev/null +++ b/server/migrations/versions/b0e5f7876032_add_meeting_is_active.py @@ -0,0 +1,34 @@ +"""add meeting is_active + +Revision ID: b0e5f7876032 +Revises: 6ea59639f30e +Create Date: 2025-01-28 10:06:50.446233 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision: str = 'b0e5f7876032' +down_revision: Union[str, None] = '6ea59639f30e' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('meeting', schema=None) as batch_op: + batch_op.add_column(sa.Column('is_active', sa.Boolean(), server_default=sa.text('1'), nullable=False)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('meeting', schema=None) as batch_op: + batch_op.drop_column('is_active') + + # ### end Alembic commands ### diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index a8794db1..de19af1b 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,44 +1,43 @@ from datetime import datetime from typing import Literal -import sqlalchemy +import sqlalchemy as sa from fastapi import HTTPException from pydantic import BaseModel from reflector.db import database, metadata from reflector.db.rooms import Room -from sqlalchemy.sql import false -meetings = sqlalchemy.Table( +meetings = sa.Table( "meeting", metadata, - sqlalchemy.Column("id", sqlalchemy.String, primary_key=True), - sqlalchemy.Column("room_name", sqlalchemy.String), - sqlalchemy.Column("room_url", sqlalchemy.String), - sqlalchemy.Column("host_room_url", sqlalchemy.String), - sqlalchemy.Column("start_date", sqlalchemy.DateTime), - sqlalchemy.Column("end_date", sqlalchemy.DateTime), - sqlalchemy.Column("user_id", sqlalchemy.String), - sqlalchemy.Column("room_id", sqlalchemy.String), - sqlalchemy.Column( - "is_locked", sqlalchemy.Boolean, nullable=False, server_default=false() - ), - sqlalchemy.Column( - "room_mode", sqlalchemy.String, nullable=False, server_default="normal" - ), - sqlalchemy.Column( - "recording_type", sqlalchemy.String, nullable=False, server_default="cloud" - ), - sqlalchemy.Column( + sa.Column("id", sa.String, primary_key=True), + sa.Column("room_name", sa.String), + sa.Column("room_url", sa.String), + sa.Column("host_room_url", sa.String), + sa.Column("start_date", sa.DateTime), + sa.Column("end_date", sa.DateTime), + sa.Column("user_id", sa.String), + sa.Column("room_id", sa.String), + sa.Column("is_locked", sa.Boolean, nullable=False, server_default=sa.false()), + sa.Column("room_mode", sa.String, nullable=False, server_default="normal"), + sa.Column("recording_type", sa.String, nullable=False, server_default="cloud"), + sa.Column( "recording_trigger", - sqlalchemy.String, + sa.String, nullable=False, server_default="automatic-2nd-participant", ), - sqlalchemy.Column( + sa.Column( "num_clients", - sqlalchemy.Integer, + sa.Integer, nullable=False, - server_default=sqlalchemy.text("0"), + server_default=sa.text("0"), + ), + sa.Column( + "is_active", + sa.Boolean, + nullable=False, + server_default=sa.true(), ), ) @@ -94,6 +93,13 @@ class MeetingController: await database.execute(query) return meeting + async def get_all_active(self) -> list[Meeting]: + """ + Get active meetings. + """ + query = meetings.select().where(meetings.c.is_active == True) + return await database.fetch_all(query) + async def get_by_room_name( self, room_name: str, @@ -108,7 +114,7 @@ class MeetingController: return Meeting(**result) - async def get_latest(self, room: Room, current_time: datetime) -> Meeting: + async def get_active(self, room: Room, current_time: datetime) -> Meeting: """ Get latest meeting for a room. """ @@ -116,12 +122,9 @@ class MeetingController: query = ( meetings.select() .where( - sqlalchemy.and_( + sa.and_( meetings.c.room_id == room.id, - meetings.c.is_locked == room.is_locked, - meetings.c.room_mode == room.room_mode, - meetings.c.recording_type == room.recording_type, - meetings.c.recording_trigger == room.recording_trigger, + meetings.c.is_active == True, meetings.c.end_date > current_time, ) ) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index d6dd842f..9fd8748e 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -139,7 +139,7 @@ class Settings(BaseSettings): AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None - WHEREBY_API_URL: str = "https://api.whereby.dev/v1/meetings" + WHEREBY_API_URL: str = "https://api.whereby.dev/v1" WHEREBY_API_KEY: str | None = None diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index b02c17a9..b488c056 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -145,12 +145,10 @@ async def rooms_create_meeting( raise HTTPException(status_code=404, detail="Room not found") current_time = datetime.utcnow() - meeting = await meetings_controller.get_latest(room=room, current_time=current_time) + meeting = await meetings_controller.get_active(room=room, current_time=current_time) if meeting is None: - end_date = datetime( - current_time.year, current_time.month, current_time.day, 5 - ) + timedelta(days=1) + end_date = current_time + timedelta(hours=8) meeting = await create_meeting("", end_date=end_date, room=room) meeting = await meetings_controller.create( @@ -164,7 +162,7 @@ async def rooms_create_meeting( room=room, ) - if user_id is None: + if user_id != room.user_id: meeting.host_room_url = "" return meeting diff --git a/server/reflector/whereby.py b/server/reflector/whereby.py index 85ece6f4..2502d59d 100644 --- a/server/reflector/whereby.py +++ b/server/reflector/whereby.py @@ -4,12 +4,14 @@ import httpx from reflector.db.rooms import Room from reflector.settings import settings +HEADERS = { + "Content-Type": "application/json; charset=utf-8", + "Authorization": f"Bearer {settings.WHEREBY_API_KEY}", +} +TIMEOUT = 10 # seconds + async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room): - headers = { - "Content-Type": "application/json; charset=utf-8", - "Authorization": f"Bearer {settings.WHEREBY_API_KEY}", - } data = { "isLocked": room.is_locked, "roomNamePrefix": room_name_prefix, @@ -32,7 +34,21 @@ async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room): async with httpx.AsyncClient() as client: response = await client.post( - settings.WHEREBY_API_URL, headers=headers, json=data, timeout=10 + f"{settings.WHEREBY_API_URL}/meetings", + headers=HEADERS, + json=data, + timeout=TIMEOUT, + ) + response.raise_for_status() + return response.json() + + +async def get_room_sessions(room_name: str): + async with httpx.AsyncClient() as client: + response = await client.get( + f"{settings.WHEREBY_API_URL}/insights/room-sessions?roomName={room_name}", + headers=HEADERS, + timeout=TIMEOUT, ) response.raise_for_status() return response.json() diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 3fb65c4e..72421171 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -25,7 +25,11 @@ else: "process_messages": { "task": "reflector.worker.process.process_messages", "schedule": 60.0, - } + }, + "process_meetings": { + "task": "reflector.worker.process.process_meetings", + "schedule": 60.0, + }, } if settings.HEALTHCHECK_URL: diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 5c8b791e..8f9db2f6 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -1,5 +1,6 @@ import json import os +from datetime import datetime from urllib.parse import unquote import av @@ -12,6 +13,7 @@ from reflector.db.rooms import rooms_controller from reflector.db.transcripts import SourceKind, transcripts_controller from reflector.pipelines.main_live_pipeline import asynctask, task_pipeline_process from reflector.settings import settings +from reflector.whereby import get_room_sessions logger = structlog.wrap_logger(get_task_logger(__name__)) @@ -104,3 +106,23 @@ async def process_recording(bucket_name: str, object_key: str): await transcripts_controller.update(transcript, {"status": "uploaded"}) task_pipeline_process.delay(transcript_id=transcript.id) + + +@shared_task +@asynctask +async def process_meetings(): + logger.info("Processing meetings") + meetings = await meetings_controller.get_all_active() + for meeting in meetings: + is_active = False + if meeting.end_date > datetime.utcnow(): + response = await get_room_sessions(meeting.room_name) + room_sessions = response.get("results", []) + is_active = not room_sessions or any( + rs["endedAt"] is None for rs in room_sessions + ) + if not is_active: + await meetings_controller.update_meeting(meeting.id, is_active=False) + logger.info("Meeting %s is deactivated", meeting.id) + + logger.info("Processed meetings")