mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 12:19:06 +00:00
Deactivate meeting when session ends
This commit is contained in:
10
compose.yml
10
compose.yml
@@ -21,6 +21,16 @@ services:
|
||||
environment:
|
||||
ENTRYPOINT: worker
|
||||
|
||||
beat:
|
||||
build:
|
||||
context: server
|
||||
volumes:
|
||||
- ./server/data/:/app/data/
|
||||
env_file:
|
||||
- ./server/.env
|
||||
environment:
|
||||
ENTRYPOINT: beat
|
||||
|
||||
redis:
|
||||
image: redis:7.2
|
||||
ports:
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
"""add meeting is_active
|
||||
|
||||
Revision ID: b0e5f7876032
|
||||
Revises: 6ea59639f30e
|
||||
Create Date: 2025-01-28 10:06:50.446233
|
||||
|
||||
"""
|
||||
from typing import Sequence, Union
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = 'b0e5f7876032'
|
||||
down_revision: Union[str, None] = '6ea59639f30e'
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('meeting', schema=None) as batch_op:
|
||||
batch_op.add_column(sa.Column('is_active', sa.Boolean(), server_default=sa.text('1'), nullable=False))
|
||||
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
with op.batch_alter_table('meeting', schema=None) as batch_op:
|
||||
batch_op.drop_column('is_active')
|
||||
|
||||
# ### end Alembic commands ###
|
||||
@@ -1,44 +1,43 @@
|
||||
from datetime import datetime
|
||||
from typing import Literal
|
||||
|
||||
import sqlalchemy
|
||||
import sqlalchemy as sa
|
||||
from fastapi import HTTPException
|
||||
from pydantic import BaseModel
|
||||
from reflector.db import database, metadata
|
||||
from reflector.db.rooms import Room
|
||||
from sqlalchemy.sql import false
|
||||
|
||||
meetings = sqlalchemy.Table(
|
||||
meetings = sa.Table(
|
||||
"meeting",
|
||||
metadata,
|
||||
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
|
||||
sqlalchemy.Column("room_name", sqlalchemy.String),
|
||||
sqlalchemy.Column("room_url", sqlalchemy.String),
|
||||
sqlalchemy.Column("host_room_url", sqlalchemy.String),
|
||||
sqlalchemy.Column("start_date", sqlalchemy.DateTime),
|
||||
sqlalchemy.Column("end_date", sqlalchemy.DateTime),
|
||||
sqlalchemy.Column("user_id", sqlalchemy.String),
|
||||
sqlalchemy.Column("room_id", sqlalchemy.String),
|
||||
sqlalchemy.Column(
|
||||
"is_locked", sqlalchemy.Boolean, nullable=False, server_default=false()
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"room_mode", sqlalchemy.String, nullable=False, server_default="normal"
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
"recording_type", sqlalchemy.String, nullable=False, server_default="cloud"
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
sa.Column("id", sa.String, primary_key=True),
|
||||
sa.Column("room_name", sa.String),
|
||||
sa.Column("room_url", sa.String),
|
||||
sa.Column("host_room_url", sa.String),
|
||||
sa.Column("start_date", sa.DateTime),
|
||||
sa.Column("end_date", sa.DateTime),
|
||||
sa.Column("user_id", sa.String),
|
||||
sa.Column("room_id", sa.String),
|
||||
sa.Column("is_locked", sa.Boolean, nullable=False, server_default=sa.false()),
|
||||
sa.Column("room_mode", sa.String, nullable=False, server_default="normal"),
|
||||
sa.Column("recording_type", sa.String, nullable=False, server_default="cloud"),
|
||||
sa.Column(
|
||||
"recording_trigger",
|
||||
sqlalchemy.String,
|
||||
sa.String,
|
||||
nullable=False,
|
||||
server_default="automatic-2nd-participant",
|
||||
),
|
||||
sqlalchemy.Column(
|
||||
sa.Column(
|
||||
"num_clients",
|
||||
sqlalchemy.Integer,
|
||||
sa.Integer,
|
||||
nullable=False,
|
||||
server_default=sqlalchemy.text("0"),
|
||||
server_default=sa.text("0"),
|
||||
),
|
||||
sa.Column(
|
||||
"is_active",
|
||||
sa.Boolean,
|
||||
nullable=False,
|
||||
server_default=sa.true(),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -94,6 +93,13 @@ class MeetingController:
|
||||
await database.execute(query)
|
||||
return meeting
|
||||
|
||||
async def get_all_active(self) -> list[Meeting]:
|
||||
"""
|
||||
Get active meetings.
|
||||
"""
|
||||
query = meetings.select().where(meetings.c.is_active == True)
|
||||
return await database.fetch_all(query)
|
||||
|
||||
async def get_by_room_name(
|
||||
self,
|
||||
room_name: str,
|
||||
@@ -108,7 +114,7 @@ class MeetingController:
|
||||
|
||||
return Meeting(**result)
|
||||
|
||||
async def get_latest(self, room: Room, current_time: datetime) -> Meeting:
|
||||
async def get_active(self, room: Room, current_time: datetime) -> Meeting:
|
||||
"""
|
||||
Get latest meeting for a room.
|
||||
"""
|
||||
@@ -116,12 +122,9 @@ class MeetingController:
|
||||
query = (
|
||||
meetings.select()
|
||||
.where(
|
||||
sqlalchemy.and_(
|
||||
sa.and_(
|
||||
meetings.c.room_id == room.id,
|
||||
meetings.c.is_locked == room.is_locked,
|
||||
meetings.c.room_mode == room.room_mode,
|
||||
meetings.c.recording_type == room.recording_type,
|
||||
meetings.c.recording_trigger == room.recording_trigger,
|
||||
meetings.c.is_active == True,
|
||||
meetings.c.end_date > current_time,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -139,7 +139,7 @@ class Settings(BaseSettings):
|
||||
|
||||
AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None
|
||||
|
||||
WHEREBY_API_URL: str = "https://api.whereby.dev/v1/meetings"
|
||||
WHEREBY_API_URL: str = "https://api.whereby.dev/v1"
|
||||
|
||||
WHEREBY_API_KEY: str | None = None
|
||||
|
||||
|
||||
@@ -145,12 +145,10 @@ async def rooms_create_meeting(
|
||||
raise HTTPException(status_code=404, detail="Room not found")
|
||||
|
||||
current_time = datetime.utcnow()
|
||||
meeting = await meetings_controller.get_latest(room=room, current_time=current_time)
|
||||
meeting = await meetings_controller.get_active(room=room, current_time=current_time)
|
||||
|
||||
if meeting is None:
|
||||
end_date = datetime(
|
||||
current_time.year, current_time.month, current_time.day, 5
|
||||
) + timedelta(days=1)
|
||||
end_date = current_time + timedelta(hours=8)
|
||||
meeting = await create_meeting("", end_date=end_date, room=room)
|
||||
|
||||
meeting = await meetings_controller.create(
|
||||
@@ -164,7 +162,7 @@ async def rooms_create_meeting(
|
||||
room=room,
|
||||
)
|
||||
|
||||
if user_id is None:
|
||||
if user_id != room.user_id:
|
||||
meeting.host_room_url = ""
|
||||
|
||||
return meeting
|
||||
|
||||
@@ -4,12 +4,14 @@ import httpx
|
||||
from reflector.db.rooms import Room
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room):
|
||||
headers = {
|
||||
HEADERS = {
|
||||
"Content-Type": "application/json; charset=utf-8",
|
||||
"Authorization": f"Bearer {settings.WHEREBY_API_KEY}",
|
||||
}
|
||||
}
|
||||
TIMEOUT = 10 # seconds
|
||||
|
||||
|
||||
async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room):
|
||||
data = {
|
||||
"isLocked": room.is_locked,
|
||||
"roomNamePrefix": room_name_prefix,
|
||||
@@ -32,7 +34,21 @@ async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room):
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
settings.WHEREBY_API_URL, headers=headers, json=data, timeout=10
|
||||
f"{settings.WHEREBY_API_URL}/meetings",
|
||||
headers=HEADERS,
|
||||
json=data,
|
||||
timeout=TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
|
||||
async def get_room_sessions(room_name: str):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{settings.WHEREBY_API_URL}/insights/room-sessions?roomName={room_name}",
|
||||
headers=HEADERS,
|
||||
timeout=TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
@@ -25,7 +25,11 @@ else:
|
||||
"process_messages": {
|
||||
"task": "reflector.worker.process.process_messages",
|
||||
"schedule": 60.0,
|
||||
}
|
||||
},
|
||||
"process_meetings": {
|
||||
"task": "reflector.worker.process.process_meetings",
|
||||
"schedule": 60.0,
|
||||
},
|
||||
}
|
||||
|
||||
if settings.HEALTHCHECK_URL:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
import os
|
||||
from datetime import datetime
|
||||
from urllib.parse import unquote
|
||||
|
||||
import av
|
||||
@@ -12,6 +13,7 @@ from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import SourceKind, transcripts_controller
|
||||
from reflector.pipelines.main_live_pipeline import asynctask, task_pipeline_process
|
||||
from reflector.settings import settings
|
||||
from reflector.whereby import get_room_sessions
|
||||
|
||||
logger = structlog.wrap_logger(get_task_logger(__name__))
|
||||
|
||||
@@ -104,3 +106,23 @@ async def process_recording(bucket_name: str, object_key: str):
|
||||
await transcripts_controller.update(transcript, {"status": "uploaded"})
|
||||
|
||||
task_pipeline_process.delay(transcript_id=transcript.id)
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def process_meetings():
|
||||
logger.info("Processing meetings")
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
for meeting in meetings:
|
||||
is_active = False
|
||||
if meeting.end_date > datetime.utcnow():
|
||||
response = await get_room_sessions(meeting.room_name)
|
||||
room_sessions = response.get("results", [])
|
||||
is_active = not room_sessions or any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
)
|
||||
if not is_active:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger.info("Meeting %s is deactivated", meeting.id)
|
||||
|
||||
logger.info("Processed meetings")
|
||||
|
||||
Reference in New Issue
Block a user