mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-22 05:09:05 +00:00
feat: daily QOL: participants dictionary (#721)
* daily QOL: participants dictionary * meeting deactivation fix * meeting deactivation fix --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
@@ -25,6 +25,7 @@ def get_database() -> databases.Database:
|
||||
|
||||
# import models
|
||||
import reflector.db.calendar_events # noqa
|
||||
import reflector.db.daily_participant_sessions # noqa
|
||||
import reflector.db.meetings # noqa
|
||||
import reflector.db.recordings # noqa
|
||||
import reflector.db.rooms # noqa
|
||||
|
||||
169
server/reflector/db/daily_participant_sessions.py
Normal file
169
server/reflector/db/daily_participant_sessions.py
Normal file
@@ -0,0 +1,169 @@
|
||||
"""Daily.co participant session tracking.
|
||||
|
||||
Stores webhook data for participant.joined and participant.left events to provide
|
||||
historical session information (Daily.co API only returns current participants).
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
import sqlalchemy as sa
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy.dialects.postgresql import insert
|
||||
|
||||
from reflector.db import get_database, metadata
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
daily_participant_sessions = sa.Table(
|
||||
"daily_participant_session",
|
||||
metadata,
|
||||
sa.Column("id", sa.String, primary_key=True),
|
||||
sa.Column(
|
||||
"meeting_id",
|
||||
sa.String,
|
||||
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column(
|
||||
"room_id",
|
||||
sa.String,
|
||||
sa.ForeignKey("room.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
),
|
||||
sa.Column("session_id", sa.String, nullable=False),
|
||||
sa.Column("user_id", sa.String, nullable=True),
|
||||
sa.Column("user_name", sa.String, nullable=False),
|
||||
sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False),
|
||||
sa.Column("left_at", sa.DateTime(timezone=True), nullable=True),
|
||||
sa.Index("idx_daily_session_meeting_left", "meeting_id", "left_at"),
|
||||
sa.Index("idx_daily_session_room", "room_id"),
|
||||
)
|
||||
|
||||
|
||||
class DailyParticipantSession(BaseModel):
|
||||
"""Daily.co participant session record.
|
||||
|
||||
Tracks when a participant joined and left a meeting. Populated from webhooks:
|
||||
- participant.joined: Creates record with left_at=None
|
||||
- participant.left: Updates record with left_at
|
||||
|
||||
ID format: {meeting_id}:{user_id}:{joined_at_ms}
|
||||
- Ensures idempotency (duplicate webhooks don't create duplicates)
|
||||
- Allows same user to rejoin (different joined_at = different session)
|
||||
|
||||
Duration is calculated as: left_at - joined_at (not stored)
|
||||
"""
|
||||
|
||||
id: NonEmptyString
|
||||
meeting_id: NonEmptyString
|
||||
room_id: NonEmptyString
|
||||
session_id: NonEmptyString # Daily.co's session_id (identifies room session)
|
||||
user_id: NonEmptyString | None = None
|
||||
user_name: str
|
||||
joined_at: datetime
|
||||
left_at: datetime | None = None
|
||||
|
||||
|
||||
class DailyParticipantSessionController:
|
||||
"""Controller for Daily.co participant session persistence."""
|
||||
|
||||
async def get_by_id(self, id: str) -> DailyParticipantSession | None:
|
||||
"""Get a session by its ID."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
daily_participant_sessions.c.id == id
|
||||
)
|
||||
result = await get_database().fetch_one(query)
|
||||
return DailyParticipantSession(**result) if result else None
|
||||
|
||||
async def get_open_session(
|
||||
self, meeting_id: NonEmptyString, session_id: NonEmptyString
|
||||
) -> DailyParticipantSession | None:
|
||||
"""Get the open (not left) session for a user in a meeting."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
sa.and_(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id,
|
||||
daily_participant_sessions.c.session_id == session_id,
|
||||
daily_participant_sessions.c.left_at.is_(None),
|
||||
)
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
|
||||
if len(results) > 1:
|
||||
raise ValueError(
|
||||
f"Multiple open sessions for daily session {session_id} in meeting {meeting_id}: "
|
||||
f"found {len(results)} sessions"
|
||||
)
|
||||
|
||||
return DailyParticipantSession(**results[0]) if results else None
|
||||
|
||||
async def upsert_joined(self, session: DailyParticipantSession) -> None:
|
||||
"""Insert or update when participant.joined webhook arrives.
|
||||
|
||||
Idempotent: Duplicate webhooks with same ID are safely ignored.
|
||||
Out-of-order: If left webhook arrived first, preserves left_at.
|
||||
"""
|
||||
query = insert(daily_participant_sessions).values(**session.model_dump())
|
||||
query = query.on_conflict_do_update(
|
||||
index_elements=["id"],
|
||||
set_={"user_name": session.user_name},
|
||||
)
|
||||
await get_database().execute(query)
|
||||
|
||||
async def upsert_left(self, session: DailyParticipantSession) -> None:
|
||||
"""Update session when participant.left webhook arrives.
|
||||
|
||||
Finds the open session for this user in this meeting and updates left_at.
|
||||
Works around Daily.co webhook timestamp inconsistency (joined_at differs by ~4ms between webhooks).
|
||||
|
||||
Handles three cases:
|
||||
1. Normal flow: open session exists → updates left_at
|
||||
2. Out-of-order: left arrives first → creates new record with left data
|
||||
3. Duplicate: left arrives again → idempotent (DB trigger prevents left_at modification)
|
||||
"""
|
||||
if session.left_at is None:
|
||||
raise ValueError("left_at is required for upsert_left")
|
||||
|
||||
if session.left_at <= session.joined_at:
|
||||
raise ValueError(
|
||||
f"left_at ({session.left_at}) must be after joined_at ({session.joined_at})"
|
||||
)
|
||||
|
||||
# Find existing open session (works around timestamp mismatch in webhooks)
|
||||
existing = await self.get_open_session(session.meeting_id, session.session_id)
|
||||
|
||||
if existing:
|
||||
# Update existing open session
|
||||
query = (
|
||||
daily_participant_sessions.update()
|
||||
.where(daily_participant_sessions.c.id == existing.id)
|
||||
.values(left_at=session.left_at)
|
||||
)
|
||||
await get_database().execute(query)
|
||||
else:
|
||||
# Out-of-order or first webhook: insert new record
|
||||
query = insert(daily_participant_sessions).values(**session.model_dump())
|
||||
query = query.on_conflict_do_nothing(index_elements=["id"])
|
||||
await get_database().execute(query)
|
||||
|
||||
async def get_by_meeting(self, meeting_id: str) -> list[DailyParticipantSession]:
|
||||
"""Get all participant sessions for a meeting (active and ended)."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [DailyParticipantSession(**result) for result in results]
|
||||
|
||||
async def get_active_by_meeting(
|
||||
self, meeting_id: str
|
||||
) -> list[DailyParticipantSession]:
|
||||
"""Get only active (not left) participant sessions for a meeting."""
|
||||
query = daily_participant_sessions.select().where(
|
||||
sa.and_(
|
||||
daily_participant_sessions.c.meeting_id == meeting_id,
|
||||
daily_participant_sessions.c.left_at.is_(None),
|
||||
)
|
||||
)
|
||||
results = await get_database().fetch_all(query)
|
||||
return [DailyParticipantSession(**result) for result in results]
|
||||
|
||||
|
||||
daily_participant_sessions_controller = DailyParticipantSessionController()
|
||||
@@ -1,10 +1,10 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
||||
from typing import TYPE_CHECKING, Any, Dict, Optional
|
||||
|
||||
from ..schemas.platform import Platform
|
||||
from ..utils.string import NonEmptyString
|
||||
from .models import MeetingData, VideoPlatformConfig
|
||||
from .models import MeetingData, SessionData, VideoPlatformConfig
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from reflector.db.rooms import Room
|
||||
@@ -26,7 +26,8 @@ class VideoPlatformClient(ABC):
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
async def get_room_sessions(self, room_name: str) -> List[Any] | None:
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get session history for a room."""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
|
||||
@@ -3,10 +3,13 @@ import hmac
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict, List, Optional
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.rooms import Room
|
||||
from reflector.logger import logger
|
||||
from reflector.storage import get_dailyco_storage
|
||||
@@ -15,7 +18,7 @@ from ..schemas.platform import Platform
|
||||
from ..utils.daily import DailyRoomName
|
||||
from ..utils.string import NonEmptyString
|
||||
from .base import ROOM_PREFIX_SEPARATOR, VideoPlatformClient
|
||||
from .models import MeetingData, RecordingType, VideoPlatformConfig
|
||||
from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig
|
||||
|
||||
|
||||
class DailyClient(VideoPlatformClient):
|
||||
@@ -61,16 +64,16 @@ class DailyClient(VideoPlatformClient):
|
||||
},
|
||||
}
|
||||
|
||||
# Get storage config for passing to Daily API
|
||||
daily_storage = get_dailyco_storage()
|
||||
assert daily_storage.bucket_name, "S3 bucket must be configured"
|
||||
data["properties"]["recordings_bucket"] = {
|
||||
"bucket_name": daily_storage.bucket_name,
|
||||
"bucket_region": daily_storage.region,
|
||||
"assume_role_arn": daily_storage.role_credential,
|
||||
"allow_api_access": True,
|
||||
}
|
||||
|
||||
# Only configure recordings_bucket if recording is enabled
|
||||
if room.recording_type != self.RECORDING_NONE:
|
||||
daily_storage = get_dailyco_storage()
|
||||
assert daily_storage.bucket_name, "S3 bucket must be configured"
|
||||
data["properties"]["recordings_bucket"] = {
|
||||
"bucket_name": daily_storage.bucket_name,
|
||||
"bucket_region": daily_storage.region,
|
||||
"assume_role_arn": daily_storage.role_credential,
|
||||
"allow_api_access": True,
|
||||
}
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{self.BASE_URL}/rooms",
|
||||
@@ -99,11 +102,49 @@ class DailyClient(VideoPlatformClient):
|
||||
extra_data=result,
|
||||
)
|
||||
|
||||
async def get_room_sessions(self, room_name: str) -> List[Any] | None:
|
||||
# no such api
|
||||
return None
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get room session history from database (webhook-stored sessions).
|
||||
|
||||
Daily.co doesn't provide historical session API, so we query our database
|
||||
where participant.joined/left webhooks are stored.
|
||||
"""
|
||||
from reflector.db.meetings import meetings_controller
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if not meeting:
|
||||
return []
|
||||
|
||||
sessions = await daily_participant_sessions_controller.get_by_meeting(
|
||||
meeting.id
|
||||
)
|
||||
|
||||
return [
|
||||
SessionData(
|
||||
session_id=s.id,
|
||||
started_at=s.joined_at,
|
||||
ended_at=s.left_at,
|
||||
)
|
||||
for s in sessions
|
||||
]
|
||||
|
||||
async def get_room_presence(self, room_name: str) -> Dict[str, Any]:
|
||||
"""Get room presence/session data for a Daily.co room.
|
||||
|
||||
Example response:
|
||||
{
|
||||
"total_count": 1,
|
||||
"data": [
|
||||
{
|
||||
"room": "w2pp2cf4kltgFACPKXmX",
|
||||
"id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
|
||||
"userId": "pbZ+ismP7dk=",
|
||||
"userName": "Moishe",
|
||||
"joinTime": "2023-01-01T20:53:19.000Z",
|
||||
"duration": 2312
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{self.BASE_URL}/rooms/{room_name}/presence",
|
||||
@@ -114,6 +155,28 @@ class DailyClient(VideoPlatformClient):
|
||||
return response.json()
|
||||
|
||||
async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]:
|
||||
"""Get participant data for a specific Daily.co meeting.
|
||||
|
||||
Example response:
|
||||
{
|
||||
"data": [
|
||||
{
|
||||
"user_id": "4q47OTmqa/w=",
|
||||
"participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
|
||||
"user_name": "Lindsey",
|
||||
"join_time": 1672786813,
|
||||
"duration": 150
|
||||
},
|
||||
{
|
||||
"user_id": "pbZ+ismP7dk=",
|
||||
"participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6",
|
||||
"user_name": "Moishe",
|
||||
"join_time": 1672786797,
|
||||
"duration": 165
|
||||
}
|
||||
]
|
||||
}
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(
|
||||
f"{self.BASE_URL}/meetings/{meeting_id}/participants",
|
||||
|
||||
@@ -1,18 +1,38 @@
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
|
||||
from reflector.utils.string import NonEmptyString
|
||||
|
||||
RecordingType = Literal["none", "local", "cloud"]
|
||||
|
||||
|
||||
class SessionData(BaseModel):
|
||||
"""Platform-agnostic session data.
|
||||
|
||||
Represents a participant session in a meeting room, regardless of platform.
|
||||
Used to determine if a meeting is still active or has ended.
|
||||
"""
|
||||
|
||||
session_id: NonEmptyString = Field(description="Unique session identifier")
|
||||
started_at: datetime = Field(description="When session started (UTC)")
|
||||
ended_at: datetime | None = Field(
|
||||
description="When session ended (UTC), None if still active"
|
||||
)
|
||||
|
||||
|
||||
class MeetingData(BaseModel):
|
||||
platform: Platform
|
||||
meeting_id: str = Field(description="Platform-specific meeting identifier")
|
||||
room_url: str = Field(description="URL for participants to join")
|
||||
host_room_url: str = Field(description="URL for hosts (may be same as room_url)")
|
||||
room_name: str = Field(description="Human-readable room name")
|
||||
meeting_id: NonEmptyString = Field(
|
||||
description="Platform-specific meeting identifier"
|
||||
)
|
||||
room_url: NonEmptyString = Field(description="URL for participants to join")
|
||||
host_room_url: NonEmptyString = Field(
|
||||
description="URL for hosts (may be same as room_url)"
|
||||
)
|
||||
room_name: NonEmptyString = Field(description="Human-readable room name")
|
||||
extra_data: Dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
class Config:
|
||||
|
||||
@@ -4,7 +4,7 @@ import re
|
||||
import time
|
||||
from datetime import datetime
|
||||
from hashlib import sha256
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Optional
|
||||
|
||||
import httpx
|
||||
|
||||
@@ -13,11 +13,8 @@ from reflector.storage import get_whereby_storage
|
||||
|
||||
from ..schemas.platform import WHEREBY_PLATFORM, Platform
|
||||
from ..utils.string import NonEmptyString
|
||||
from .base import (
|
||||
MeetingData,
|
||||
VideoPlatformClient,
|
||||
VideoPlatformConfig,
|
||||
)
|
||||
from .base import VideoPlatformClient
|
||||
from .models import MeetingData, SessionData, VideoPlatformConfig
|
||||
from .whereby_utils import whereby_room_name_prefix
|
||||
|
||||
|
||||
@@ -80,15 +77,50 @@ class WherebyClient(VideoPlatformClient):
|
||||
extra_data=result,
|
||||
)
|
||||
|
||||
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
|
||||
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
|
||||
"""Get room session history from Whereby API.
|
||||
|
||||
Whereby API returns: [{"sessionId": "...", "startedAt": "...", "endedAt": "..." | null}, ...]
|
||||
"""
|
||||
async with httpx.AsyncClient() as client:
|
||||
"""
|
||||
{
|
||||
"cursor": "text",
|
||||
"results": [
|
||||
{
|
||||
"roomSessionId": "e2f29530-46ec-4cee-8b27-e565cb5bb2e9",
|
||||
"roomName": "/room-prefix-793e9ec1-c686-423d-9043-9b7a10c553fd",
|
||||
"startedAt": "2025-01-01T00:00:00.000Z",
|
||||
"endedAt": "2025-01-01T01:00:00.000Z",
|
||||
"totalParticipantMinutes": 124,
|
||||
"totalRecorderMinutes": 120,
|
||||
"totalStreamerMinutes": 120,
|
||||
"totalUniqueParticipants": 4,
|
||||
"totalUniqueRecorders": 3,
|
||||
"totalUniqueStreamers": 2
|
||||
}
|
||||
]
|
||||
}"""
|
||||
response = await client.get(
|
||||
f"{self.config.api_url}/insights/room-sessions?roomName={room_name}",
|
||||
headers=self.headers,
|
||||
timeout=self.TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json().get("results", [])
|
||||
results = response.json().get("results", [])
|
||||
|
||||
return [
|
||||
SessionData(
|
||||
session_id=s["roomSessionId"],
|
||||
started_at=datetime.fromisoformat(
|
||||
s["startedAt"].replace("Z", "+00:00")
|
||||
),
|
||||
ended_at=datetime.fromisoformat(s["endedAt"].replace("Z", "+00:00"))
|
||||
if s.get("endedAt")
|
||||
else None,
|
||||
)
|
||||
for s in results
|
||||
]
|
||||
|
||||
async def delete_room(self, room_name: str) -> bool:
|
||||
return True
|
||||
|
||||
@@ -1,9 +1,15 @@
|
||||
import json
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Literal
|
||||
|
||||
from fastapi import APIRouter, HTTPException, Request
|
||||
from pydantic import BaseModel
|
||||
|
||||
from reflector.db import get_database
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
DailyParticipantSession,
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.logger import logger as _logger
|
||||
from reflector.settings import settings
|
||||
@@ -44,6 +50,24 @@ def _extract_room_name(event: DailyWebhookEvent) -> DailyRoomName | None:
|
||||
async def webhook(request: Request):
|
||||
"""Handle Daily webhook events.
|
||||
|
||||
Example webhook payload:
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "recording.ready-to-download",
|
||||
"id": "rec-rtd-c3df927c-f738-4471-a2b7-066fa7e95a6b-1692124192",
|
||||
"payload": {
|
||||
"recording_id": "08fa0b24-9220-44c5-846c-3f116cf8e738",
|
||||
"room_name": "Xcm97xRZ08b2dePKb78g",
|
||||
"start_ts": 1692124183,
|
||||
"status": "finished",
|
||||
"max_participants": 1,
|
||||
"duration": 9,
|
||||
"share_token": "ntDCL5k98Ulq", #gitleaks:allow
|
||||
"s3_key": "api-test-1j8fizhzd30c/Xcm97xRZ08b2dePKb78g/1692124183028"
|
||||
},
|
||||
"event_ts": 1692124192
|
||||
}
|
||||
|
||||
Daily.co circuit-breaker: After 3+ failed responses (4xx/5xx), webhook
|
||||
state→FAILED, stops sending events. Reset: scripts/recreate_daily_webhook.py
|
||||
"""
|
||||
@@ -103,6 +127,32 @@ async def webhook(request: Request):
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
"""
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "participant.joined",
|
||||
"id": "ptcpt-join-6497c79b-f326-4942-aef8-c36a29140ad1-1708972279961",
|
||||
"payload": {
|
||||
"room": "test",
|
||||
"user_id": "6497c79b-f326-4942-aef8-c36a29140ad1",
|
||||
"user_name": "testuser",
|
||||
"session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa",
|
||||
"joined_at": 1708972279.96,
|
||||
"will_eject_at": 1708972299.541,
|
||||
"owner": false,
|
||||
"permissions": {
|
||||
"hasPresence": true,
|
||||
"canSend": true,
|
||||
"canReceive": { "base": true },
|
||||
"canAdmin": false
|
||||
}
|
||||
},
|
||||
"event_ts": 1708972279.961
|
||||
}
|
||||
|
||||
"""
|
||||
|
||||
|
||||
async def _handle_participant_joined(event: DailyWebhookEvent):
|
||||
daily_room_name = _extract_room_name(event)
|
||||
if not daily_room_name:
|
||||
@@ -110,29 +160,111 @@ async def _handle_participant_joined(event: DailyWebhookEvent):
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(daily_room_name)
|
||||
if meeting:
|
||||
await meetings_controller.increment_num_clients(meeting.id)
|
||||
logger.info(
|
||||
"Participant joined",
|
||||
meeting_id=meeting.id,
|
||||
room_name=daily_room_name,
|
||||
recording_type=meeting.recording_type,
|
||||
recording_trigger=meeting.recording_trigger,
|
||||
)
|
||||
else:
|
||||
if not meeting:
|
||||
logger.warning(
|
||||
"participant.joined: meeting not found", room_name=daily_room_name
|
||||
)
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
logger.warning({"payload": payload})
|
||||
joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc)
|
||||
session_id = f"{meeting.id}:{payload['session_id']}"
|
||||
|
||||
session = DailyParticipantSession(
|
||||
id=session_id,
|
||||
meeting_id=meeting.id,
|
||||
room_id=meeting.room_id,
|
||||
session_id=payload["session_id"],
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload["user_name"],
|
||||
joined_at=joined_at,
|
||||
left_at=None,
|
||||
)
|
||||
|
||||
# num_clients serves as a projection/cache of active session count for Daily.co
|
||||
# Both operations must succeed or fail together to maintain consistency
|
||||
async with get_database().transaction():
|
||||
await meetings_controller.increment_num_clients(meeting.id)
|
||||
await daily_participant_sessions_controller.upsert_joined(session)
|
||||
|
||||
logger.info(
|
||||
"Participant joined",
|
||||
meeting_id=meeting.id,
|
||||
room_name=daily_room_name,
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload.get("user_name"),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
"""
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"type": "participant.left",
|
||||
"id": "ptcpt-left-16168c97-f973-4eae-9642-020fe3fda5db-1708972302986",
|
||||
"payload": {
|
||||
"room": "test",
|
||||
"user_id": "16168c97-f973-4eae-9642-020fe3fda5db",
|
||||
"user_name": "bipol",
|
||||
"session_id": "0c0d2dda-f21d-4cf9-ab56-86bf3c407ffa",
|
||||
"joined_at": 1708972291.567,
|
||||
"will_eject_at": null,
|
||||
"owner": false,
|
||||
"permissions": {
|
||||
"hasPresence": true,
|
||||
"canSend": true,
|
||||
"canReceive": { "base": true },
|
||||
"canAdmin": false
|
||||
},
|
||||
"duration": 11.419000148773193
|
||||
},
|
||||
"event_ts": 1708972302.986
|
||||
}
|
||||
"""
|
||||
|
||||
|
||||
async def _handle_participant_left(event: DailyWebhookEvent):
|
||||
room_name = _extract_room_name(event)
|
||||
if not room_name:
|
||||
logger.warning("participant.left: no room in payload", payload=event.payload)
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if meeting:
|
||||
if not meeting:
|
||||
logger.warning("participant.left: meeting not found", room_name=room_name)
|
||||
return
|
||||
|
||||
payload = event.payload
|
||||
joined_at = datetime.fromtimestamp(payload["joined_at"], tz=timezone.utc)
|
||||
left_at = datetime.fromtimestamp(event.event_ts, tz=timezone.utc)
|
||||
session_id = f"{meeting.id}:{payload['session_id']}"
|
||||
|
||||
session = DailyParticipantSession(
|
||||
id=session_id,
|
||||
meeting_id=meeting.id,
|
||||
room_id=meeting.room_id,
|
||||
session_id=payload["session_id"],
|
||||
user_id=payload.get("user_id", None),
|
||||
user_name=payload["user_name"],
|
||||
joined_at=joined_at,
|
||||
left_at=left_at,
|
||||
)
|
||||
|
||||
# num_clients serves as a projection/cache of active session count for Daily.co
|
||||
# Both operations must succeed or fail together to maintain consistency
|
||||
async with get_database().transaction():
|
||||
await meetings_controller.decrement_num_clients(meeting.id)
|
||||
await daily_participant_sessions_controller.upsert_left(session)
|
||||
|
||||
logger.info(
|
||||
"Participant left",
|
||||
meeting_id=meeting.id,
|
||||
room_name=room_name,
|
||||
user_id=payload.get("user_id", None),
|
||||
duration=payload.get("duration"),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
|
||||
async def _handle_recording_started(event: DailyWebhookEvent):
|
||||
|
||||
@@ -107,7 +107,7 @@ async def create_upcoming_meetings_for_event(event, create_window, room: Room):
|
||||
client = create_platform_client(get_platform(room.platform))
|
||||
|
||||
meeting_data = await client.create_meeting(
|
||||
"",
|
||||
room.name,
|
||||
end_date=end_date,
|
||||
room=room,
|
||||
)
|
||||
|
||||
@@ -335,15 +335,15 @@ async def process_meetings():
|
||||
Uses distributed locking to prevent race conditions when multiple workers
|
||||
process the same meeting simultaneously.
|
||||
"""
|
||||
logger.debug("Processing meetings")
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
logger.info(f"Processing {len(meetings)} meetings")
|
||||
current_time = datetime.now(timezone.utc)
|
||||
redis_client = get_redis_client()
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for meeting in meetings:
|
||||
logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
|
||||
logger_.info("Processing meeting")
|
||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=120)
|
||||
|
||||
@@ -359,21 +359,23 @@ async def process_meetings():
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
# This API call could be slow, extend lock if needed
|
||||
client = create_platform_client(meeting.platform)
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
|
||||
try:
|
||||
# Extend lock after slow operation to ensure we still hold it
|
||||
# Extend lock after operation to ensure we still hold it
|
||||
lock.extend(120, replace_ttl=True)
|
||||
except LockError:
|
||||
logger_.warning("Lost lock for meeting, skipping")
|
||||
continue
|
||||
|
||||
has_active_sessions = room_sessions and any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
s.ended_at is None for s in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
|
||||
)
|
||||
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
|
||||
Reference in New Issue
Block a user