mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
Add webhook events storage to meetings model
- Add events column as JSON type to meetings table with default empty array - Add events: List[Dict[str, Any]] field to Meeting model - Create migration 2890b5104577 for events column and apply successfully - Add MeetingController helper methods for event storage: - add_event() for generic event storage with timestamps - participant_joined(), participant_left() for participant tracking - recording_started(), recording_stopped() for recording events - get_events() for event retrieval - Update Jitsi webhook endpoints to store events: - Store participant join/leave events with data and timestamps - Store recording start/stop events from Prosody webhooks - Store recording completion events from Jibri finalize script - Events stored with type, timestamp, and data for webhook history tracking - Fix linting and formatting issues Addresses PR feedback point 12: save webhook events in meetings events field
This commit is contained in:
@@ -0,0 +1,38 @@
|
|||||||
|
"""Add events column to meetings table
|
||||||
|
|
||||||
|
Revision ID: 2890b5104577
|
||||||
|
Revises: 6e6ea8e607c5
|
||||||
|
Create Date: 2025-09-02 17:51:41.620777
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
import sqlalchemy as sa
|
||||||
|
from alembic import op
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = "2890b5104577"
|
||||||
|
down_revision: Union[str, None] = "6e6ea8e607c5"
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table("meeting", schema=None) as batch_op:
|
||||||
|
batch_op.add_column(
|
||||||
|
sa.Column(
|
||||||
|
"events", sa.JSON(), server_default=sa.text("'[]'"), nullable=False
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
with op.batch_alter_table("meeting", schema=None) as batch_op:
|
||||||
|
batch_op.drop_column("events")
|
||||||
|
|
||||||
|
# ### end Alembic commands ###
|
||||||
@@ -1,5 +1,5 @@
|
|||||||
from datetime import datetime
|
from datetime import datetime, timezone
|
||||||
from typing import Literal
|
from typing import Any, Dict, List, Literal
|
||||||
|
|
||||||
import sqlalchemy as sa
|
import sqlalchemy as sa
|
||||||
from fastapi import HTTPException
|
from fastapi import HTTPException
|
||||||
@@ -42,6 +42,7 @@ meetings = sa.Table(
|
|||||||
server_default=sa.true(),
|
server_default=sa.true(),
|
||||||
),
|
),
|
||||||
sa.Column("platform", sa.String, nullable=False, server_default="whereby"),
|
sa.Column("platform", sa.String, nullable=False, server_default="whereby"),
|
||||||
|
sa.Column("events", sa.JSON, nullable=False, server_default=sa.text("'[]'")),
|
||||||
sa.Index("idx_meeting_room_id", "room_id"),
|
sa.Index("idx_meeting_room_id", "room_id"),
|
||||||
sa.Index(
|
sa.Index(
|
||||||
"idx_one_active_meeting_per_room",
|
"idx_one_active_meeting_per_room",
|
||||||
@@ -92,6 +93,7 @@ class Meeting(BaseModel):
|
|||||||
] = "automatic-2nd-participant"
|
] = "automatic-2nd-participant"
|
||||||
num_clients: int = 0
|
num_clients: int = 0
|
||||||
platform: VideoPlatform = VideoPlatform.WHEREBY
|
platform: VideoPlatform = VideoPlatform.WHEREBY
|
||||||
|
events: List[Dict[str, Any]] = Field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
class MeetingController:
|
class MeetingController:
|
||||||
@@ -202,6 +204,68 @@ class MeetingController:
|
|||||||
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
|
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
|
||||||
await get_database().execute(query)
|
await get_database().execute(query)
|
||||||
|
|
||||||
|
async def add_event(
|
||||||
|
self, meeting_id: str, event_type: str, event_data: Dict[str, Any] = None
|
||||||
|
):
|
||||||
|
"""Add an event to a meeting's events list."""
|
||||||
|
if event_data is None:
|
||||||
|
event_data = {}
|
||||||
|
|
||||||
|
event = {
|
||||||
|
"type": event_type,
|
||||||
|
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
|
||||||
|
"data": event_data,
|
||||||
|
}
|
||||||
|
|
||||||
|
# Get current events
|
||||||
|
query = meetings.select().where(meetings.c.id == meeting_id)
|
||||||
|
result = await get_database().fetch_one(query)
|
||||||
|
if not result:
|
||||||
|
return
|
||||||
|
|
||||||
|
current_events = result["events"] or []
|
||||||
|
current_events.append(event)
|
||||||
|
|
||||||
|
# Update with new events list
|
||||||
|
update_query = (
|
||||||
|
meetings.update()
|
||||||
|
.where(meetings.c.id == meeting_id)
|
||||||
|
.values(events=current_events)
|
||||||
|
)
|
||||||
|
await get_database().execute(update_query)
|
||||||
|
|
||||||
|
async def participant_joined(
|
||||||
|
self, meeting_id: str, participant_data: Dict[str, Any] = None
|
||||||
|
):
|
||||||
|
"""Record a participant joined event."""
|
||||||
|
await self.add_event(meeting_id, "participant_joined", participant_data)
|
||||||
|
|
||||||
|
async def participant_left(
|
||||||
|
self, meeting_id: str, participant_data: Dict[str, Any] = None
|
||||||
|
):
|
||||||
|
"""Record a participant left event."""
|
||||||
|
await self.add_event(meeting_id, "participant_left", participant_data)
|
||||||
|
|
||||||
|
async def recording_started(
|
||||||
|
self, meeting_id: str, recording_data: Dict[str, Any] = None
|
||||||
|
):
|
||||||
|
"""Record a recording started event."""
|
||||||
|
await self.add_event(meeting_id, "recording_started", recording_data)
|
||||||
|
|
||||||
|
async def recording_stopped(
|
||||||
|
self, meeting_id: str, recording_data: Dict[str, Any] = None
|
||||||
|
):
|
||||||
|
"""Record a recording stopped event."""
|
||||||
|
await self.add_event(meeting_id, "recording_stopped", recording_data)
|
||||||
|
|
||||||
|
async def get_events(self, meeting_id: str) -> List[Dict[str, Any]]:
|
||||||
|
"""Get all events for a meeting."""
|
||||||
|
query = meetings.select().where(meetings.c.id == meeting_id)
|
||||||
|
result = await get_database().fetch_one(query)
|
||||||
|
if not result:
|
||||||
|
return []
|
||||||
|
return result["events"] or []
|
||||||
|
|
||||||
|
|
||||||
class MeetingConsentController:
|
class MeetingConsentController:
|
||||||
async def get_by_meeting_id(self, meeting_id: str) -> list[MeetingConsent]:
|
async def get_by_meeting_id(self, meeting_id: str) -> list[MeetingConsent]:
|
||||||
|
|||||||
@@ -77,25 +77,33 @@ async def jitsi_events_webhook(event: JitsiWebhookEvent, request: Request):
|
|||||||
|
|
||||||
# Handle participant events
|
# Handle participant events
|
||||||
if event.event == "muc-occupant-joined":
|
if event.event == "muc-occupant-joined":
|
||||||
# Get current participant count and increment
|
# Store event and update participant count
|
||||||
|
await meetings_controller.participant_joined(
|
||||||
|
meeting.id, {"timestamp": event.timestamp, "data": event.data}
|
||||||
|
)
|
||||||
current_count = getattr(meeting, "num_clients", 0)
|
current_count = getattr(meeting, "num_clients", 0)
|
||||||
await meetings_controller.update_meeting(
|
await meetings_controller.update_meeting(
|
||||||
meeting.id, num_clients=current_count + 1
|
meeting.id, num_clients=current_count + 1
|
||||||
)
|
)
|
||||||
elif event.event == "muc-occupant-left":
|
elif event.event == "muc-occupant-left":
|
||||||
# Get current participant count and decrement (minimum 0)
|
# Store event and update participant count
|
||||||
|
await meetings_controller.participant_left(
|
||||||
|
meeting.id, {"timestamp": event.timestamp, "data": event.data}
|
||||||
|
)
|
||||||
current_count = getattr(meeting, "num_clients", 0)
|
current_count = getattr(meeting, "num_clients", 0)
|
||||||
await meetings_controller.update_meeting(
|
await meetings_controller.update_meeting(
|
||||||
meeting.id, num_clients=max(0, current_count - 1)
|
meeting.id, num_clients=max(0, current_count - 1)
|
||||||
)
|
)
|
||||||
elif event.event == "jibri-recording-on":
|
elif event.event == "jibri-recording-on":
|
||||||
# Recording started - could update meeting status if needed
|
# Store recording started event
|
||||||
# For now, we just acknowledge the event
|
await meetings_controller.recording_started(
|
||||||
pass
|
meeting.id, {"timestamp": event.timestamp, "data": event.data}
|
||||||
|
)
|
||||||
elif event.event == "jibri-recording-off":
|
elif event.event == "jibri-recording-off":
|
||||||
# Recording stopped - could trigger processing pipeline
|
# Store recording stopped event
|
||||||
# This would be where we initiate transcript processing
|
await meetings_controller.recording_stopped(
|
||||||
pass
|
meeting.id, {"timestamp": event.timestamp, "data": event.data}
|
||||||
|
)
|
||||||
|
|
||||||
return {"status": "ok", "event": event.event, "room": event.room}
|
return {"status": "ok", "event": event.event, "room": event.room}
|
||||||
|
|
||||||
@@ -120,6 +128,17 @@ async def jibri_recording_complete(event: JibriRecordingEvent, request: Request)
|
|||||||
if not meeting:
|
if not meeting:
|
||||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||||
|
|
||||||
|
# Store recording completion event
|
||||||
|
await meetings_controller.add_event(
|
||||||
|
meeting.id,
|
||||||
|
"recording_completed",
|
||||||
|
{
|
||||||
|
"recording_file": event.recording_file,
|
||||||
|
"recording_status": event.recording_status,
|
||||||
|
"timestamp": event.timestamp,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# TODO: Trigger recording processing pipeline
|
# TODO: Trigger recording processing pipeline
|
||||||
# This is where we would:
|
# This is where we would:
|
||||||
# 1. Download the recording file from Jibri storage
|
# 1. Download the recording file from Jibri storage
|
||||||
|
|||||||
Reference in New Issue
Block a user