From da700069d970b9af0550ace6f53a6305c8930348 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 2 Sep 2025 17:53:35 -0600 Subject: [PATCH] Add webhook events storage to meetings model - Add events column as JSON type to meetings table with default empty array - Add events: List[Dict[str, Any]] field to Meeting model - Create migration 2890b5104577 for events column and apply successfully - Add MeetingController helper methods for event storage: - add_event() for generic event storage with timestamps - participant_joined(), participant_left() for participant tracking - recording_started(), recording_stopped() for recording events - get_events() for event retrieval - Update Jitsi webhook endpoints to store events: - Store participant join/leave events with data and timestamps - Store recording start/stop events from Prosody webhooks - Store recording completion events from Jibri finalize script - Events stored with type, timestamp, and data for webhook history tracking - Fix linting and formatting issues Addresses PR feedback point 12: save webhook events in meetings events field --- ...577_add_events_column_to_meetings_table.py | 38 +++++++++++ server/reflector/db/meetings.py | 68 ++++++++++++++++++- server/reflector/views/jitsi.py | 35 +++++++--- 3 files changed, 131 insertions(+), 10 deletions(-) create mode 100644 server/migrations/versions/2890b5104577_add_events_column_to_meetings_table.py diff --git a/server/migrations/versions/2890b5104577_add_events_column_to_meetings_table.py b/server/migrations/versions/2890b5104577_add_events_column_to_meetings_table.py new file mode 100644 index 00000000..b1787b24 --- /dev/null +++ b/server/migrations/versions/2890b5104577_add_events_column_to_meetings_table.py @@ -0,0 +1,38 @@ +"""Add events column to meetings table + +Revision ID: 2890b5104577 +Revises: 6e6ea8e607c5 +Create Date: 2025-09-02 17:51:41.620777 + +""" + +from typing import Sequence, Union + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "2890b5104577" +down_revision: Union[str, None] = "6e6ea8e607c5" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "events", sa.JSON(), server_default=sa.text("'[]'"), nullable=False + ) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("meeting", schema=None) as batch_op: + batch_op.drop_column("events") + + # ### end Alembic commands ### diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index b32800f8..ed4f6169 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -1,5 +1,5 @@ -from datetime import datetime -from typing import Literal +from datetime import datetime, timezone +from typing import Any, Dict, List, Literal import sqlalchemy as sa from fastapi import HTTPException @@ -42,6 +42,7 @@ meetings = sa.Table( server_default=sa.true(), ), sa.Column("platform", sa.String, nullable=False, server_default="whereby"), + sa.Column("events", sa.JSON, nullable=False, server_default=sa.text("'[]'")), sa.Index("idx_meeting_room_id", "room_id"), sa.Index( "idx_one_active_meeting_per_room", @@ -92,6 +93,7 @@ class Meeting(BaseModel): ] = "automatic-2nd-participant" num_clients: int = 0 platform: VideoPlatform = VideoPlatform.WHEREBY + events: List[Dict[str, Any]] = Field(default_factory=list) class MeetingController: @@ -202,6 +204,68 @@ class MeetingController: query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs) await get_database().execute(query) + async def add_event( + self, meeting_id: str, event_type: str, event_data: Dict[str, Any] = None + ): + """Add an event to a meeting's events list.""" + if event_data is None: + event_data = {} + + event = { + "type": event_type, + "timestamp": datetime.now(tz=timezone.utc).isoformat(), + "data": event_data, + } + + # Get current events + query = meetings.select().where(meetings.c.id == meeting_id) + result = await get_database().fetch_one(query) + if not result: + return + + current_events = result["events"] or [] + current_events.append(event) + + # Update with new events list + update_query = ( + meetings.update() + .where(meetings.c.id == meeting_id) + .values(events=current_events) + ) + await get_database().execute(update_query) + + async def participant_joined( + self, meeting_id: str, participant_data: Dict[str, Any] = None + ): + """Record a participant joined event.""" + await self.add_event(meeting_id, "participant_joined", participant_data) + + async def participant_left( + self, meeting_id: str, participant_data: Dict[str, Any] = None + ): + """Record a participant left event.""" + await self.add_event(meeting_id, "participant_left", participant_data) + + async def recording_started( + self, meeting_id: str, recording_data: Dict[str, Any] = None + ): + """Record a recording started event.""" + await self.add_event(meeting_id, "recording_started", recording_data) + + async def recording_stopped( + self, meeting_id: str, recording_data: Dict[str, Any] = None + ): + """Record a recording stopped event.""" + await self.add_event(meeting_id, "recording_stopped", recording_data) + + async def get_events(self, meeting_id: str) -> List[Dict[str, Any]]: + """Get all events for a meeting.""" + query = meetings.select().where(meetings.c.id == meeting_id) + result = await get_database().fetch_one(query) + if not result: + return [] + return result["events"] or [] + class MeetingConsentController: async def get_by_meeting_id(self, meeting_id: str) -> list[MeetingConsent]: diff --git a/server/reflector/views/jitsi.py b/server/reflector/views/jitsi.py index 62013344..f7fb2205 100644 --- a/server/reflector/views/jitsi.py +++ b/server/reflector/views/jitsi.py @@ -77,25 +77,33 @@ async def jitsi_events_webhook(event: JitsiWebhookEvent, request: Request): # Handle participant events if event.event == "muc-occupant-joined": - # Get current participant count and increment + # Store event and update participant count + await meetings_controller.participant_joined( + meeting.id, {"timestamp": event.timestamp, "data": event.data} + ) current_count = getattr(meeting, "num_clients", 0) await meetings_controller.update_meeting( meeting.id, num_clients=current_count + 1 ) elif event.event == "muc-occupant-left": - # Get current participant count and decrement (minimum 0) + # Store event and update participant count + await meetings_controller.participant_left( + meeting.id, {"timestamp": event.timestamp, "data": event.data} + ) current_count = getattr(meeting, "num_clients", 0) await meetings_controller.update_meeting( meeting.id, num_clients=max(0, current_count - 1) ) elif event.event == "jibri-recording-on": - # Recording started - could update meeting status if needed - # For now, we just acknowledge the event - pass + # Store recording started event + await meetings_controller.recording_started( + meeting.id, {"timestamp": event.timestamp, "data": event.data} + ) elif event.event == "jibri-recording-off": - # Recording stopped - could trigger processing pipeline - # This would be where we initiate transcript processing - pass + # Store recording stopped event + await meetings_controller.recording_stopped( + meeting.id, {"timestamp": event.timestamp, "data": event.data} + ) return {"status": "ok", "event": event.event, "room": event.room} @@ -120,6 +128,17 @@ async def jibri_recording_complete(event: JibriRecordingEvent, request: Request) if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") + # Store recording completion event + await meetings_controller.add_event( + meeting.id, + "recording_completed", + { + "recording_file": event.recording_file, + "recording_status": event.recording_status, + "timestamp": event.timestamp, + }, + ) + # TODO: Trigger recording processing pipeline # This is where we would: # 1. Download the recording file from Jibri storage