From e91979abbc8b8c8b2570e1bb3ce9a54ca9865128 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Wed, 17 Sep 2025 15:16:03 -0600 Subject: [PATCH] feat: use jitsi file system --- server/contrib/jitsi/README.md | 212 ++++++++++++ server/contrib/jitsi/finalize.sh | 49 +++ server/contrib/jitsi/mod_event_logger.lua | 372 ++++++++++++++++++++++ server/reflector/app.py | 2 + server/reflector/jibri_events.py | 227 +++++++++++++ server/reflector/settings.py | 3 + server/reflector/views/jibri_webhook.py | 126 ++++++++ server/reflector/worker/app.py | 5 + server/reflector/worker/jitsi_events.py | 281 ++++++++++++++++ server/run_jibri_tests.py | 49 +++ server/tests/test_jibri_events.py | 122 +++++++ server/tests/test_jibri_webhook.py | 254 +++++++++++++++ 12 files changed, 1702 insertions(+) create mode 100644 server/contrib/jitsi/README.md create mode 100755 server/contrib/jitsi/finalize.sh create mode 100644 server/contrib/jitsi/mod_event_logger.lua create mode 100644 server/reflector/jibri_events.py create mode 100644 server/reflector/views/jibri_webhook.py create mode 100644 server/reflector/worker/jitsi_events.py create mode 100644 server/run_jibri_tests.py create mode 100644 server/tests/test_jibri_events.py create mode 100644 server/tests/test_jibri_webhook.py diff --git a/server/contrib/jitsi/README.md b/server/contrib/jitsi/README.md new file mode 100644 index 00000000..e2d8e3dc --- /dev/null +++ b/server/contrib/jitsi/README.md @@ -0,0 +1,212 @@ +# Event Logger for Docker-Jitsi-Meet + +A Prosody module that logs Jitsi meeting events to JSONL files alongside recordings, enabling complete participant tracking and speaker statistics. + +## Prerequisites + +- Running docker-jitsi-meet installation +- Jibri configured for recording + +## Installation + +### Step 1: Copy the Module + +Copy the Prosody module to your custom plugins directory: + +```bash +# Create the directory if it doesn't exist +mkdir -p ~/.jitsi-meet-cfg/prosody/prosody-plugins-custom + +# Copy the module +cp mod_event_logger.lua ~/.jitsi-meet-cfg/prosody/prosody-plugins-custom/ +``` + +### Step 2: Update Your .env File + +Add or modify these variables in your `.env` file: + +```bash +# If XMPP_MUC_MODULES already exists, append event_logger +# Example: XMPP_MUC_MODULES=existing_module,event_logger +XMPP_MUC_MODULES=event_logger + +# Optional: Configure the module (these are defaults) +JIBRI_RECORDINGS_PATH=/config/recordings +JIBRI_LOG_SPEAKER_STATS=true +JIBRI_SPEAKER_STATS_INTERVAL=10 +``` + +**Important**: If you already have `XMPP_MUC_MODULES` defined, add `event_logger` to the comma-separated list: +```bash +# Existing modules + our module +XMPP_MUC_MODULES=mod_info,mod_alert,event_logger +``` + +### Step 3: Modify docker-compose.yml + +Add a shared recordings volume so Prosody can write events alongside Jibri recordings: + +```yaml +services: + prosody: + # ... existing configuration ... + volumes: + - ${CONFIG}/prosody/config:/config:Z + - ${CONFIG}/prosody/prosody-plugins-custom:/prosody-plugins-custom:Z + - ${CONFIG}/recordings:/config/recordings:Z # Add this line + environment: + # Add if not using .env file + - XMPP_MUC_MODULES=${XMPP_MUC_MODULES:-event_logger} + - JIBRI_RECORDINGS_PATH=/config/recordings + + jibri: + # ... existing configuration ... + volumes: + - ${CONFIG}/jibri:/config:Z + - ${CONFIG}/recordings:/config/recordings:Z # Add this line + environment: + # For Reflector webhook integration (optional) + - REFLECTOR_WEBHOOK_URL=${REFLECTOR_WEBHOOK_URL:-} + - JIBRI_FINALIZE_RECORDING_SCRIPT_PATH=/config/finalize.sh +``` + +### Step 4: Add Finalize Script (Optional - For Reflector Integration) + +If you want to notify Reflector when recordings complete: + +```bash +# Copy the finalize script +cp finalize.sh ~/.jitsi-meet-cfg/jibri/finalize.sh +chmod +x ~/.jitsi-meet-cfg/jibri/finalize.sh + +# Add to .env +REFLECTOR_WEBHOOK_URL=http://your-reflector-api:8000 +``` + +### Step 5: Restart Services + +```bash +docker-compose down +docker-compose up -d +``` + +## What Gets Created + +After a recording, you'll find in `~/.jitsi-meet-cfg/recordings/{session-id}/`: +- `recording.mp4` - The video recording (created by Jibri) +- `metadata.json` - Basic metadata (created by Jibri) +- `events.jsonl` - Complete participant timeline (created by this module) + +## Event Format + +Each line in `events.jsonl` is a JSON object: + +```json +{"type":"room_created","timestamp":1234567890,"room_name":"TestRoom","room_jid":"testroom@conference.meet.jitsi","meeting_url":"https://meet.jitsi/TestRoom"} +{"type":"recording_started","timestamp":1234567891,"room_name":"TestRoom","session_id":"20240115120000_TestRoom","jibri_jid":"jibri@recorder.meet.jitsi"} +{"type":"participant_joined","timestamp":1234567892,"room_name":"TestRoom","participant":{"jid":"user1@meet.jitsi/web","nick":"John Doe","id":"user1@meet.jitsi","is_moderator":false}} +{"type":"speaker_active","timestamp":1234567895,"room_name":"TestRoom","speaker_jid":"user1@meet.jitsi","speaker_nick":"John Doe","duration":10} +{"type":"participant_left","timestamp":1234567920,"room_name":"TestRoom","participant":{"jid":"user1@meet.jitsi/web","nick":"John Doe","duration_seconds":28}} +{"type":"recording_stopped","timestamp":1234567950,"room_name":"TestRoom","session_id":"20240115120000_TestRoom","meeting_url":"https://meet.jitsi/TestRoom"} +``` + +## Configuration Options + +All configuration can be done via environment variables: + +| Environment Variable | Default | Description | +|---------------------|---------|-------------| +| `JIBRI_RECORDINGS_PATH` | `/config/recordings` | Path where recordings are stored | +| `JIBRI_LOG_SPEAKER_STATS` | `true` | Enable speaker statistics logging | +| `JIBRI_SPEAKER_STATS_INTERVAL` | `10` | Seconds between speaker stats updates | + +## Verifying Installation + +Check that the module is loaded: +```bash +docker-compose logs prosody | grep "Event Logger" +# Should see: "Event Logger loaded - writing to /config/recordings" +``` + +Check for events after a recording: +```bash +ls -la ~/.jitsi-meet-cfg/recordings/*/events.jsonl +cat ~/.jitsi-meet-cfg/recordings/*/events.jsonl | jq . +``` + +## Troubleshooting + +### No events.jsonl file created + +1. **Check module is enabled**: + ```bash + docker-compose exec prosody grep -r "event_logger" /config + ``` + +2. **Verify volume permissions**: + ```bash + docker-compose exec prosody ls -la /config/recordings + ``` + +3. **Check Prosody logs for errors**: + ```bash + docker-compose logs prosody | grep -i error + ``` + +### Module not loading + +1. **Verify file exists in container**: + ```bash + docker-compose exec prosody ls -la /prosody-plugins-custom/ + ``` + +2. **Check XMPP_MUC_MODULES format** (must be comma-separated, no spaces): + - ✅ Correct: `XMPP_MUC_MODULES=mod1,mod2,event_logger` + - ❌ Wrong: `XMPP_MUC_MODULES=mod1, mod2, event_logger` + +## Common docker-compose.yml Patterns + +### Minimal Addition (if you trust defaults) +```yaml +services: + prosody: + volumes: + - ${CONFIG}/recordings:/config/recordings:Z # Just add this +``` + +### Full Configuration +```yaml +services: + prosody: + volumes: + - ${CONFIG}/prosody/config:/config:Z + - ${CONFIG}/prosody/prosody-plugins-custom:/prosody-plugins-custom:Z + - ${CONFIG}/recordings:/config/recordings:Z + environment: + - XMPP_MUC_MODULES=event_logger + - JIBRI_RECORDINGS_PATH=/config/recordings + - JIBRI_LOG_SPEAKER_STATS=true + - JIBRI_SPEAKER_STATS_INTERVAL=10 + + jibri: + volumes: + - ${CONFIG}/jibri:/config:Z + - ${CONFIG}/recordings:/config/recordings:Z + environment: + - JIBRI_RECORDING_DIR=/config/recordings + - JIBRI_FINALIZE_RECORDING_SCRIPT_PATH=/config/finalize.sh +``` + +## Integration with Reflector + +The finalize.sh script will automatically notify Reflector when a recording completes if `REFLECTOR_WEBHOOK_URL` is set. Reflector will receive: + +```json +{ + "session_id": "20240115120000_TestRoom", + "path": "20240115120000_TestRoom", + "meeting_url": "https://meet.jitsi/TestRoom" +} +``` + +Reflector then processes the recording along with the complete participant timeline from `events.jsonl`. \ No newline at end of file diff --git a/server/contrib/jitsi/finalize.sh b/server/contrib/jitsi/finalize.sh new file mode 100755 index 00000000..633bc849 --- /dev/null +++ b/server/contrib/jitsi/finalize.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# Jibri finalize script to notify Reflector when recording is complete +# This script is called by Jibri with the recording directory as argument + +RECORDING_PATH="$1" +SESSION_ID=$(basename "$RECORDING_PATH") +METADATA_FILE="$RECORDING_PATH/metadata.json" + +# Extract meeting URL from Jibri's metadata +MEETING_URL="" +if [ -f "$METADATA_FILE" ]; then + MEETING_URL=$(jq -r '.meeting_url' "$METADATA_FILE" 2>/dev/null || echo "") +fi + +echo "[$(date)] Recording finalized: $RECORDING_PATH" +echo "[$(date)] Session ID: $SESSION_ID" +echo "[$(date)] Meeting URL: $MEETING_URL" + +# Check if events.jsonl was created by our Prosody module +if [ -f "$RECORDING_PATH/events.jsonl" ]; then + EVENT_COUNT=$(wc -l < "$RECORDING_PATH/events.jsonl") + echo "[$(date)] Found events.jsonl with $EVENT_COUNT events" +else + echo "[$(date)] Warning: No events.jsonl found" +fi + +# Notify Reflector if webhook URL is configured +if [ -n "$REFLECTOR_WEBHOOK_URL" ]; then + echo "[$(date)] Notifying Reflector at: $REFLECTOR_WEBHOOK_URL" + + RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$REFLECTOR_WEBHOOK_URL/api/v1/jibri/recording-ready" \ + -H "Content-Type: application/json" \ + -d "{\"session_id\":\"$SESSION_ID\",\"path\":\"$SESSION_ID\",\"meeting_url\":\"$MEETING_URL\"}") + + HTTP_CODE=$(echo "$RESPONSE" | tail -n1) + BODY=$(echo "$RESPONSE" | sed '$d') + + if [ "$HTTP_CODE" = "200" ]; then + echo "[$(date)] Reflector notified successfully" + echo "[$(date)] Response: $BODY" + else + echo "[$(date)] Failed to notify Reflector. HTTP code: $HTTP_CODE" + echo "[$(date)] Response: $BODY" + fi +else + echo "[$(date)] No REFLECTOR_WEBHOOK_URL configured, skipping notification" +fi + +echo "[$(date)] Finalize script completed" \ No newline at end of file diff --git a/server/contrib/jitsi/mod_event_logger.lua b/server/contrib/jitsi/mod_event_logger.lua new file mode 100644 index 00000000..943ed72c --- /dev/null +++ b/server/contrib/jitsi/mod_event_logger.lua @@ -0,0 +1,372 @@ +local json = require "util.json" +local st = require "util.stanza" +local jid_bare = require "util.jid".bare + +local recordings_path = os.getenv("JIBRI_RECORDINGS_PATH") or + module:get_option_string("jibri_recordings_path", "/recordings") + +-- room_jid -> { session_id, participants = {jid -> info} } +local active_recordings = {} +-- room_jid -> { participants = {jid -> info}, created_at } +local room_states = {} + +local function get_timestamp() + return os.time() +end + +local function write_event(session_id, event) + if not session_id then + module:log("warn", "No session_id for event: %s", event.type) + return + end + + local session_dir = string.format("%s/%s", recordings_path, session_id) + local event_file = string.format("%s/events.jsonl", session_dir) + + module:log("info", "Writing event %s to %s", event.type, event_file) + + -- Create directory + local mkdir_cmd = string.format("mkdir -p '%s' 2>&1", session_dir) + local mkdir_result = os.execute(mkdir_cmd) + module:log("debug", "mkdir result: %s", tostring(mkdir_result)) + + local file, err = io.open(event_file, "a") + if file then + local json_str = json.encode(event) + file:write(json_str .. "\n") + file:close() + module:log("info", "Successfully wrote event %s", event.type) + else + module:log("error", "Failed to write event to %s: %s", event_file, err) + end +end + +local function extract_participant_info(occupant) + local info = { + jid = occupant.jid, + bare_jid = occupant.bare_jid, + nick = occupant.nick, + display_name = nil, + role = occupant.role + } + + local presence = occupant:get_presence() + if presence then + local nick_element = presence:get_child("nick", "http://jabber.org/protocol/nick") + if nick_element then + info.display_name = nick_element:get_text() + end + + local identity = presence:get_child("identity") + if identity then + local user = identity:get_child("user") + if user then + local name = user:get_child("name") + if name then + info.display_name = name:get_text() + end + + local id_element = user:get_child("id") + if id_element then + info.id = id_element:get_text() + end + end + end + + if not info.display_name and occupant.nick then + local _, _, resource = occupant.nick:match("([^@]+)@([^/]+)/(.+)") + if resource then + info.display_name = resource + end + end + end + + return info +end + +local function get_room_participant_count(room) + local count = 0 + for _ in room:each_occupant() do + count = count + 1 + end + return count +end + +local function snapshot_room_participants(room) + local participants = {} + local total = 0 + local skipped = 0 + + module:log("info", "Snapshotting room participants") + + for _, occupant in room:each_occupant() do + total = total + 1 + -- Skip recorders (Jibri) + if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or + occupant.bare_jid:match("^jibri@")) then + skipped = skipped + 1 + else + local info = extract_participant_info(occupant) + participants[occupant.jid] = info + module:log("debug", "Added participant: %s", info.display_name or info.bare_jid) + end + end + + module:log("info", "Snapshot: %d total, %d participants", total, total - skipped) + return participants +end + +-- Import utility functions if available +local util = module:require "util"; +local get_room_from_jid = util.get_room_from_jid; +local room_jid_match_rewrite = util.room_jid_match_rewrite; + +-- Main IQ handler for Jibri stanzas +module:hook("pre-iq/full", function(event) + local stanza = event.stanza + if stanza.name ~= "iq" then + return + end + + local jibri = stanza:get_child('jibri', 'http://jitsi.org/protocol/jibri') + if not jibri then + return + end + + module:log("info", "=== Jibri IQ intercepted ===") + + local action = jibri.attr.action + local session_id = jibri.attr.session_id + local room_jid = jibri.attr.room + local recording_mode = jibri.attr.recording_mode + local app_data = jibri.attr.app_data + + module:log("info", "Jibri %s - session: %s, room: %s, mode: %s", + action or "?", session_id or "?", room_jid or "?", recording_mode or "?") + + if not room_jid or not session_id then + module:log("warn", "Missing room_jid or session_id") + return + end + + -- Get the room using util function + local room = get_room_from_jid(room_jid_match_rewrite(jid_bare(stanza.attr.to))) + if not room then + -- Try with the room_jid directly + room = get_room_from_jid(room_jid) + end + + if not room then + module:log("error", "Room not found for jid: %s", room_jid) + return + end + + module:log("info", "Room found: %s", room:get_name() or room_jid) + + if action == "start" then + module:log("info", "Recording START for session %s", session_id) + + -- Count and snapshot participants + local participant_count = 0 + for _ in room:each_occupant() do + participant_count = participant_count + 1 + end + + local participants = snapshot_room_participants(room) + local participant_list = {} + for jid, info in pairs(participants) do + table.insert(participant_list, info) + end + + active_recordings[room_jid] = { + session_id = session_id, + participants = participants, + started_at = get_timestamp() + } + + write_event(session_id, { + type = "recording_started", + timestamp = get_timestamp(), + room_jid = room_jid, + room_name = room:get_name(), + session_id = session_id, + recording_mode = recording_mode, + app_data = app_data, + participant_count = participant_count, + participants_at_start = participant_list + }) + + elseif action == "stop" then + module:log("info", "Recording STOP for session %s", session_id) + + local recording = active_recordings[room_jid] + if recording and recording.session_id == session_id then + write_event(session_id, { + type = "recording_stopped", + timestamp = get_timestamp(), + room_jid = room_jid, + room_name = room:get_name(), + session_id = session_id, + duration = get_timestamp() - recording.started_at, + participant_count = get_room_participant_count(room) + }) + + active_recordings[room_jid] = nil + else + module:log("warn", "No active recording found for room %s", room_jid) + end + end +end); + +-- Room and participant event hooks +local function setup_room_hooks(host_module) + module:log("info", "Setting up room hooks on %s", host_module.host or "unknown") + + -- Room created + host_module:hook("muc-room-created", function(event) + local room = event.room + local room_jid = room.jid + + room_states[room_jid] = { + participants = {}, + created_at = get_timestamp() + } + + module:log("info", "Room created: %s", room_jid) + end) + + -- Room destroyed + host_module:hook("muc-room-destroyed", function(event) + local room = event.room + local room_jid = room.jid + + room_states[room_jid] = nil + active_recordings[room_jid] = nil + + module:log("info", "Room destroyed: %s", room_jid) + end) + + -- Occupant joined + host_module:hook("muc-occupant-joined", function(event) + local room = event.room + local occupant = event.occupant + local room_jid = room.jid + + -- Skip recorders + if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or + occupant.bare_jid:match("^jibri@")) then + return + end + + local participant_info = extract_participant_info(occupant) + + -- Update room state + if room_states[room_jid] then + room_states[room_jid].participants[occupant.jid] = participant_info + end + + -- Log to active recording if exists + local recording = active_recordings[room_jid] + if recording then + recording.participants[occupant.jid] = participant_info + + write_event(recording.session_id, { + type = "participant_joined", + timestamp = get_timestamp(), + room_jid = room_jid, + room_name = room:get_name(), + participant = participant_info, + participant_count = get_room_participant_count(room) + }) + end + + module:log("info", "Participant joined %s: %s (%d total)", + room:get_name() or room_jid, + participant_info.display_name or participant_info.bare_jid, + get_room_participant_count(room)) + end) + + -- Occupant left + host_module:hook("muc-occupant-left", function(event) + local room = event.room + local occupant = event.occupant + local room_jid = room.jid + + -- Skip recorders + if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or + occupant.bare_jid:match("^jibri@")) then + return + end + + local participant_info = extract_participant_info(occupant) + + -- Update room state + if room_states[room_jid] then + room_states[room_jid].participants[occupant.jid] = nil + end + + -- Log to active recording if exists + local recording = active_recordings[room_jid] + if recording then + if recording.participants[occupant.jid] then + recording.participants[occupant.jid] = nil + end + + write_event(recording.session_id, { + type = "participant_left", + timestamp = get_timestamp(), + room_jid = room_jid, + room_name = room:get_name(), + participant = participant_info, + participant_count = get_room_participant_count(room) + }) + end + + module:log("info", "Participant left %s: %s (%d remaining)", + room:get_name() or room_jid, + participant_info.display_name or participant_info.bare_jid, + get_room_participant_count(room)) + end) +end + +-- Module initialization +local current_host = module:get_host() +local host_type = module:get_host_type() + +module:log("info", "Event Logger loading on %s (type: %s)", current_host, host_type or "unknown") +module:log("info", "Recording path: %s", recordings_path) + +-- Setup room hooks based on host type +if host_type == "component" and current_host:match("^[^.]+%.") then + setup_room_hooks(module) +else + -- Try to find and hook to MUC component + local process_host_module = util.process_host_module + local muc_component_host = module:get_option_string("muc_component") or + module:get_option_string("main_muc") + + if not muc_component_host then + local possible_hosts = { + "muc." .. current_host, + "conference." .. current_host, + "rooms." .. current_host + } + + for _, host in ipairs(possible_hosts) do + if prosody.hosts[host] then + muc_component_host = host + module:log("info", "Auto-detected MUC component: %s", muc_component_host) + break + end + end + end + + if muc_component_host then + process_host_module(muc_component_host, function(host_module, host) + module:log("info", "Hooking to MUC events on %s", host) + setup_room_hooks(host_module) + end) + else + module:log("error", "Could not find MUC component") + end +end \ No newline at end of file diff --git a/server/reflector/app.py b/server/reflector/app.py index b07bf16b..72551cb2 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -14,6 +14,7 @@ from reflector.metrics import metrics_init from reflector.settings import settings from reflector.video_platforms.jitsi import router as jitsi_router from reflector.video_platforms.whereby import router as whereby_router +from reflector.views.jibri_webhook import router as jibri_webhook_router from reflector.views.meetings import router as meetings_router from reflector.views.rooms import router as rooms_router from reflector.views.rtc_offer import router as rtc_offer_router @@ -88,6 +89,7 @@ app.include_router(user_router, prefix="/v1") app.include_router(zulip_router, prefix="/v1") app.include_router(whereby_router, prefix="/v1") app.include_router(jitsi_router, prefix="/v1") +app.include_router(jibri_webhook_router) # No /v1 prefix, uses /api/v1/jibri add_pagination(app) # prepare celery diff --git a/server/reflector/jibri_events.py b/server/reflector/jibri_events.py new file mode 100644 index 00000000..b115f8bc --- /dev/null +++ b/server/reflector/jibri_events.py @@ -0,0 +1,227 @@ +import json +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional, Union + +from pydantic import BaseModel +from typing_extensions import TypedDict + + +class ParticipantInfo(BaseModel): + jid: str + nick: str + id: str + is_moderator: bool = False + + +class ParticipantLeftInfo(BaseModel): + jid: str + nick: Optional[str] = None + duration_seconds: Optional[int] = None + + +class RoomCreatedEvent(BaseModel): + type: Literal["room_created"] + timestamp: int + room_name: str + room_jid: str + meeting_url: str + + +class RecordingStartedEvent(BaseModel): + type: Literal["recording_started"] + timestamp: int + room_name: str + session_id: str + jibri_jid: str + + +class RecordingStoppedEvent(BaseModel): + type: Literal["recording_stopped"] + timestamp: int + room_name: str + session_id: str + meeting_url: str + + +class ParticipantJoinedEvent(BaseModel): + type: Literal["participant_joined"] + timestamp: int + room_name: str + participant: ParticipantInfo + + +class ParticipantLeftEvent(BaseModel): + type: Literal["participant_left"] + timestamp: int + room_name: str + participant: ParticipantLeftInfo + + +class SpeakerActiveEvent(BaseModel): + type: Literal["speaker_active"] + timestamp: int + room_name: str + speaker_jid: str + speaker_nick: str + duration: int + + +class DominantSpeakerChangedEvent(BaseModel): + type: Literal["dominant_speaker_changed"] + timestamp: int + room_name: str + previous: str + current: str + + +JitsiEvent = Union[ + RoomCreatedEvent, + RecordingStartedEvent, + RecordingStoppedEvent, + ParticipantJoinedEvent, + ParticipantLeftEvent, + SpeakerActiveEvent, + DominantSpeakerChangedEvent, +] + + +class RoomInfo(TypedDict): + name: str + jid: str + created_at: int + meeting_url: str + recording_stopped_at: Optional[int] + + +class ParticipantData(TypedDict): + jid: str + nick: str + id: str + is_moderator: bool + joined_at: int + left_at: Optional[int] + duration: Optional[int] + events: List[str] + + +class SpeakerStats(TypedDict): + total_time: int + nick: str + + +class ParsedMetadata(TypedDict): + room: RoomInfo + participants: List[ParticipantData] + speaker_stats: Dict[str, SpeakerStats] + event_count: int + + +class JitsiEventParser: + def parse_event(self, event_data: Dict[str, Any]) -> Optional[JitsiEvent]: + event_type = event_data.get("type") + + try: + if event_type == "room_created": + return RoomCreatedEvent(**event_data) + elif event_type == "recording_started": + return RecordingStartedEvent(**event_data) + elif event_type == "recording_stopped": + return RecordingStoppedEvent(**event_data) + elif event_type == "participant_joined": + return ParticipantJoinedEvent(**event_data) + elif event_type == "participant_left": + return ParticipantLeftEvent(**event_data) + elif event_type == "speaker_active": + return SpeakerActiveEvent(**event_data) + elif event_type == "dominant_speaker_changed": + return DominantSpeakerChangedEvent(**event_data) + else: + return None + except Exception: + return None + + def parse_events_file(self, recording_path: str) -> ParsedMetadata: + events_file = Path(recording_path) / "events.jsonl" + + room_info: RoomInfo = { + "name": "", + "jid": "", + "created_at": 0, + "meeting_url": "", + "recording_stopped_at": None, + } + + if not events_file.exists(): + return ParsedMetadata( + room=room_info, participants=[], speaker_stats={}, event_count=0 + ) + + events: List[JitsiEvent] = [] + participants: Dict[str, ParticipantData] = {} + speaker_stats: Dict[str, SpeakerStats] = {} + + with open(events_file, "r") as f: + for line in f: + if not line.strip(): + continue + + try: + event_data = json.loads(line) + event = self.parse_event(event_data) + + if event is None: + continue + + events.append(event) + + if isinstance(event, RoomCreatedEvent): + room_info = { + "name": event.room_name, + "jid": event.room_jid, + "created_at": event.timestamp, + "meeting_url": event.meeting_url, + "recording_stopped_at": None, + } + + elif isinstance(event, ParticipantJoinedEvent): + participants[event.participant.id] = { + "jid": event.participant.jid, + "nick": event.participant.nick, + "id": event.participant.id, + "is_moderator": event.participant.is_moderator, + "joined_at": event.timestamp, + "left_at": None, + "duration": None, + "events": ["joined"], + } + + elif isinstance(event, ParticipantLeftEvent): + participant_id = event.participant.jid.split("/")[0] + if participant_id in participants: + participants[participant_id]["left_at"] = event.timestamp + participants[participant_id]["duration"] = ( + event.participant.duration_seconds + ) + participants[participant_id]["events"].append("left") + + elif isinstance(event, SpeakerActiveEvent): + if event.speaker_jid not in speaker_stats: + speaker_stats[event.speaker_jid] = { + "total_time": 0, + "nick": event.speaker_nick, + } + speaker_stats[event.speaker_jid]["total_time"] += event.duration + + elif isinstance(event, RecordingStoppedEvent): + room_info["recording_stopped_at"] = event.timestamp + room_info["meeting_url"] = event.meeting_url + + except (json.JSONDecodeError, Exception): + continue + + return ParsedMetadata( + room=room_info, + participants=list(participants.values()), + speaker_stats=speaker_stats, + event_count=len(events), + ) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index fcd9f1cf..0f68f342 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -123,6 +123,9 @@ class Settings(BaseSettings): # Whereby integration WHEREBY_API_URL: str = "https://api.whereby.dev/v1" WHEREBY_API_KEY: NonEmptyString | None = None + + # Jibri integration + JIBRI_RECORDINGS_PATH: str = "/recordings" WHEREBY_WEBHOOK_SECRET: str | None = None AWS_WHEREBY_ACCESS_KEY_ID: str | None = None AWS_WHEREBY_ACCESS_KEY_SECRET: str | None = None diff --git a/server/reflector/views/jibri_webhook.py b/server/reflector/views/jibri_webhook.py new file mode 100644 index 00000000..b46f0f78 --- /dev/null +++ b/server/reflector/views/jibri_webhook.py @@ -0,0 +1,126 @@ +from pathlib import Path +from typing import Annotated, Any, Dict, Optional + +import structlog +from fastapi import APIRouter, Depends, HTTPException +from pydantic import BaseModel + +import reflector.auth as auth +from reflector.db.transcripts import SourceKind, transcripts_controller +from reflector.jibri_events import JitsiEventParser +from reflector.pipelines.main_file_pipeline import task_pipeline_file_process +from reflector.settings import settings + +logger = structlog.get_logger(__name__) + +router = APIRouter(prefix="/api/v1/jibri", tags=["jibri"]) + + +class RecordingReadyRequest(BaseModel): + session_id: str + path: str # Relative path from recordings directory + meeting_url: str + + +@router.post("/recording-ready") +async def handle_recording_ready( + request: RecordingReadyRequest, + user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], +) -> Dict[str, Any]: + user_id = user["sub"] if user else None + + recordings_base = Path(settings.JIBRI_RECORDINGS_PATH or "/recordings") + recording_path = recordings_base / request.path + + if not recording_path.exists(): + raise HTTPException( + status_code=404, detail=f"Recording path not found: {request.path}" + ) + + recording_file = recording_path / "recording.mp4" + events_file = recording_path / "events.jsonl" + + if not recording_file.exists(): + raise HTTPException(status_code=404, detail="Recording file not found") + + # Parse events if available + metadata = {} + participant_count = 0 + + if events_file.exists(): + parser = JitsiEventParser() + metadata = parser.parse_events_file(str(recording_path)) + participant_count = len(metadata.get("participants", [])) + logger.info( + "Parsed Jibri events", + session_id=request.session_id, + event_count=metadata.get("event_count", 0), + participant_count=participant_count, + ) + else: + logger.warning("No events file found", session_id=request.session_id) + metadata = { + "room": {"meeting_url": request.meeting_url, "name": request.session_id}, + "participants": [], + "speaker_stats": {}, + "event_count": 0, + } + + # Create transcript using controller + title = f"Meeting: {metadata.get('room', {}).get('name', request.session_id)}" + transcript = await transcripts_controller.add( + name=title, + source_kind=SourceKind.FILE, + source_language="en", + target_language="en", + user_id=user_id, + ) + + # Store Jitsi data in appropriate fields + update_data = {} + + # Store participants if available + if metadata.get("participants"): + update_data["participants"] = metadata["participants"] + + # Store events data (room info, speaker stats, etc.) + update_data["events"] = { + "jitsi_metadata": metadata, + "session_id": request.session_id, + "recording_path": str(recording_path), + "meeting_url": request.meeting_url, + } + + if update_data: + await transcripts_controller.update(transcript, update_data) + + # Copy recording file to transcript data path + # The pipeline expects the file to be in the transcript's data path + upload_file = transcript.data_path / "upload.mp4" + upload_file.parent.mkdir(parents=True, exist_ok=True) + + # Create symlink or copy the file + import shutil + + shutil.copy2(recording_file, upload_file) + + # Update status to uploaded + await transcripts_controller.update(transcript, {"status": "uploaded"}) + + # Trigger processing pipeline + task_pipeline_file_process.delay(transcript_id=transcript.id) + + logger.info( + "Jibri recording ready for processing", + transcript_id=transcript.id, + session_id=request.session_id, + participant_count=participant_count, + ) + + return { + "status": "accepted", + "transcript_id": transcript.id, + "session_id": request.session_id, + "events_found": events_file.exists(), + "participant_count": participant_count, + } diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index e9468bd2..ba8e29c2 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -20,6 +20,7 @@ else: "reflector.worker.healthcheck", "reflector.worker.process", "reflector.worker.cleanup", + "reflector.worker.jitsi_events", ] ) @@ -33,6 +34,10 @@ else: "task": "reflector.worker.process.process_meetings", "schedule": float(settings.SQS_POLLING_TIMEOUT_SECONDS), }, + "process_jitsi_events": { + "task": "reflector.worker.jitsi_events.process_jitsi_events", + "schedule": 5.0, # Process every 5 seconds + }, "reprocess_failed_recordings": { "task": "reflector.worker.process.reprocess_failed_recordings", "schedule": crontab(hour=5, minute=0), # Midnight EST diff --git a/server/reflector/worker/jitsi_events.py b/server/reflector/worker/jitsi_events.py new file mode 100644 index 00000000..0fa75573 --- /dev/null +++ b/server/reflector/worker/jitsi_events.py @@ -0,0 +1,281 @@ +""" +Celery tasks for consuming Jitsi events from Redis queues. +""" + +import json +from datetime import datetime +from typing import Any, Dict + +import redis +import structlog +from sqlalchemy.orm import Session + +from reflector.database import get_db_sync +from reflector.models import Meeting, Transcript +from reflector.settings import settings +from reflector.worker.app import app + +logger = structlog.get_logger(__name__) + + +class JitsiEventProcessor: + """Process Jitsi events from Redis queues.""" + + def __init__(self): + self.redis_client = redis.Redis( + host=settings.REDIS_HOST or "redis", + port=settings.REDIS_PORT or 6379, + decode_responses=True, + ) + self.participants = {} # room_name -> {jid: participant_info} + self.speaker_stats = {} # room_name -> {jid: stats} + + def process_participant_joined(self, data: Dict[str, Any], db: Session): + """Track participant joining a room.""" + room_name = data["room_name"] + participant = { + "jid": data["participant_jid"], + "nick": data["participant_nick"], + "id": data["participant_id"], + "is_moderator": data.get("is_moderator", False), + "joined_at": datetime.now(), + } + + if room_name not in self.participants: + self.participants[room_name] = {} + + self.participants[room_name][participant["jid"]] = participant + + logger.info( + "Participant joined", + room=room_name, + participant=participant["nick"], + total_participants=len(self.participants[room_name]), + ) + + # Update meeting in database if exists + meeting = ( + db.query(Meeting) + .filter( + Meeting.room_name == room_name, + Meeting.status.in_(["active", "pending"]), + ) + .first() + ) + + if meeting: + # Store participant info in meeting metadata + metadata = meeting.metadata or {} + if "participants" not in metadata: + metadata["participants"] = [] + + metadata["participants"].append( + { + "id": participant["id"], + "name": participant["nick"], + "joined_at": participant["joined_at"].isoformat(), + "is_moderator": participant["is_moderator"], + } + ) + + meeting.metadata = metadata + db.commit() + + def process_participant_left(self, data: Dict[str, Any], db: Session): + """Track participant leaving a room.""" + room_name = data["room_name"] + participant_jid = data["participant_jid"] + + if room_name in self.participants: + if participant_jid in self.participants[room_name]: + participant = self.participants[room_name][participant_jid] + participant["left_at"] = datetime.now() + + logger.info( + "Participant left", + room=room_name, + participant=participant["nick"], + duration=( + participant["left_at"] - participant["joined_at"] + ).total_seconds(), + ) + + # Update meeting in database + meeting = ( + db.query(Meeting) + .filter( + Meeting.room_name == room_name, + Meeting.status.in_(["active", "pending"]), + ) + .first() + ) + + if meeting and meeting.metadata and "participants" in meeting.metadata: + for p in meeting.metadata["participants"]: + if p["id"] == participant["id"]: + p["left_at"] = participant["left_at"].isoformat() + break + db.commit() + + def process_speaker_stats(self, data: Dict[str, Any], db: Session): + """Update speaker statistics.""" + room_name = data["room_jid"].split("@")[0] + self.speaker_stats[room_name] = data["stats"] + + logger.debug( + "Speaker stats updated", room=room_name, speakers=len(data["stats"]) + ) + + def process_recording_completed(self, data: Dict[str, Any], db: Session): + """Process completed recording with all metadata.""" + room_name = data["room_name"] + meeting_url = data["meeting_url"] + recording_path = data["recording_path"] + recording_file = data["recording_file"] + + logger.info( + "Recording completed", room=room_name, url=meeting_url, path=recording_path + ) + + # Get participant data for this room + participants = self.participants.get(room_name, {}) + speaker_stats = self.speaker_stats.get(room_name, {}) + + # Create transcript record with full metadata + transcript = Transcript( + title=f"Recording: {room_name}", + source_url=meeting_url, + metadata={ + "jitsi": { + "room_name": room_name, + "meeting_url": meeting_url, + "recording_path": recording_path, + "participants": [ + { + "id": p["id"], + "name": p["nick"], + "joined_at": p["joined_at"].isoformat(), + "left_at": p.get("left_at", datetime.now()).isoformat(), + "is_moderator": p["is_moderator"], + "speaking_time": speaker_stats.get(p["jid"], {}).get( + "total_time", 0 + ), + } + for p in participants.values() + ], + "speaker_stats": speaker_stats, + } + }, + status="pending", + ) + db.add(transcript) + db.commit() + + # Trigger processing pipeline + from reflector.pipelines.main_transcript_pipeline import TranscriptMainPipeline + + pipeline = TranscriptMainPipeline() + pipeline.create(transcript.id, recording_file) + + # Clean up room data + self.participants.pop(room_name, None) + self.speaker_stats.pop(room_name, None) + + logger.info( + "Transcript created", + transcript_id=transcript.id, + participants=len(participants), + has_speaker_stats=bool(speaker_stats), + ) + + +processor = JitsiEventProcessor() + + +@app.task(name="reflector.worker.jitsi_events.process_jitsi_events") +def process_jitsi_events(): + """ + Process Jitsi events from Redis queue. + This should be called periodically by Celery Beat. + """ + db = next(get_db_sync()) + processed = 0 + + try: + # Process up to 100 events per run + for _ in range(100): + # Pop event from queue (blocking with 1 second timeout) + event_data = processor.redis_client.brpop( + ["jitsi:events:queue", "jitsi:recordings:queue"], timeout=1 + ) + + if not event_data: + break + + queue_name, event_json = event_data + event = json.loads(event_json) + + event_type = event["type"] + data = event["data"] + + logger.debug(f"Processing event: {event_type}") + + # Route to appropriate processor + if event_type == "participant_joined": + processor.process_participant_joined(data, db) + elif event_type == "participant_left": + processor.process_participant_left(data, db) + elif event_type == "speaker_stats_update": + processor.process_speaker_stats(data, db) + elif event_type == "recording_completed": + processor.process_recording_completed(data, db) + else: + logger.warning(f"Unknown event type: {event_type}") + + processed += 1 + + if processed > 0: + logger.info(f"Processed {processed} Jitsi events") + + except Exception as e: + logger.error(f"Error processing Jitsi events: {e}") + raise + finally: + db.close() + + return processed + + +@app.task(name="reflector.worker.jitsi_events.consume_jitsi_stream") +def consume_jitsi_stream(): + """ + Alternative: Use Redis Streams for more reliable event processing. + Redis Streams provide better guarantees and consumer groups. + """ + db = next(get_db_sync()) + + try: + # Read from stream with consumer group + events = processor.redis_client.xreadgroup( + "reflector-consumers", + "worker-1", + {"jitsi:events": ">"}, + count=10, + block=1000, + ) + + for stream_name, messages in events: + for message_id, data in messages: + event = json.loads(data[b"event"]) + # Process event... + + # Acknowledge message + processor.redis_client.xack( + stream_name, "reflector-consumers", message_id + ) + + except Exception as e: + logger.error(f"Error consuming stream: {e}") + raise + finally: + db.close() diff --git a/server/run_jibri_tests.py b/server/run_jibri_tests.py new file mode 100644 index 00000000..ba215b84 --- /dev/null +++ b/server/run_jibri_tests.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python +"""Simple test runner for Jibri tests that doesn't require Docker.""" + +import os +import sys + +sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) + +# Import test functions after path is set +exec(open("tests/test_jibri_events.py").read(), globals()) + + +def run_tests(): + tests = [ + ("test_parse_room_created_event", test_parse_room_created_event), + ("test_parse_participant_joined_event", test_parse_participant_joined_event), + ( + "test_parse_unknown_event_returns_none", + test_parse_unknown_event_returns_none, + ), + ( + "test_parse_events_file_with_complete_session", + test_parse_events_file_with_complete_session, + ), + ("test_parse_events_file_missing_file", test_parse_events_file_missing_file), + ] + + passed = 0 + failed = 0 + + for name, test_func in tests: + try: + test_func() + print(f"✓ {name}") + passed += 1 + except AssertionError as e: + print(f"✗ {name}: {e}") + failed += 1 + except Exception as e: + print(f"✗ {name}: Unexpected error: {e}") + failed += 1 + + print(f"\nResults: {passed} passed, {failed} failed") + return failed == 0 + + +if __name__ == "__main__": + success = run_tests() + sys.exit(0 if success else 1) diff --git a/server/tests/test_jibri_events.py b/server/tests/test_jibri_events.py new file mode 100644 index 00000000..3903b93b --- /dev/null +++ b/server/tests/test_jibri_events.py @@ -0,0 +1,122 @@ +import json +import tempfile +from pathlib import Path + +from reflector.jibri_events import ( + JitsiEventParser, + ParticipantJoinedEvent, + RoomCreatedEvent, +) + + +def test_parse_room_created_event(): + parser = JitsiEventParser() + event_data = { + "type": "room_created", + "timestamp": 1234567890, + "room_name": "TestRoom", + "room_jid": "testroom@conference.meet.jitsi", + "meeting_url": "https://meet.jitsi/TestRoom", + } + + event = parser.parse_event(event_data) + + assert isinstance(event, RoomCreatedEvent) + assert event.room_name == "TestRoom" + assert event.meeting_url == "https://meet.jitsi/TestRoom" + + +def test_parse_participant_joined_event(): + parser = JitsiEventParser() + event_data = { + "type": "participant_joined", + "timestamp": 1234567891, + "room_name": "TestRoom", + "participant": { + "jid": "user1@meet.jitsi/resource", + "nick": "John Doe", + "id": "user1@meet.jitsi", + "is_moderator": False, + }, + } + + event = parser.parse_event(event_data) + + assert isinstance(event, ParticipantJoinedEvent) + assert event.participant.nick == "John Doe" + assert event.participant.is_moderator is False + + +def test_parse_unknown_event_returns_none(): + parser = JitsiEventParser() + event_data = {"type": "unknown_event", "timestamp": 1234567890} + + event = parser.parse_event(event_data) + assert event is None + + +def test_parse_events_file_with_complete_session(): + parser = JitsiEventParser() + + with tempfile.TemporaryDirectory() as tmpdir: + events_file = Path(tmpdir) / "events.jsonl" + + events = [ + { + "type": "room_created", + "timestamp": 1234567890, + "room_name": "TestRoom", + "room_jid": "testroom@conference.meet.jitsi", + "meeting_url": "https://meet.jitsi/TestRoom", + }, + { + "type": "participant_joined", + "timestamp": 1234567892, + "room_name": "TestRoom", + "participant": { + "jid": "user1@meet.jitsi/resource", + "nick": "John Doe", + "id": "user1@meet.jitsi", + "is_moderator": False, + }, + }, + { + "type": "speaker_active", + "timestamp": 1234567895, + "room_name": "TestRoom", + "speaker_jid": "user1@meet.jitsi", + "speaker_nick": "John Doe", + "duration": 10, + }, + { + "type": "participant_left", + "timestamp": 1234567920, + "room_name": "TestRoom", + "participant": { + "jid": "user1@meet.jitsi/resource", + "duration_seconds": 28, + }, + }, + ] + + with open(events_file, "w") as f: + for event in events: + f.write(json.dumps(event) + "\n") + + metadata = parser.parse_events_file(tmpdir) + + assert metadata["room"]["name"] == "TestRoom" + assert metadata["room"]["meeting_url"] == "https://meet.jitsi/TestRoom" + assert len(metadata["participants"]) == 1 + assert metadata["event_count"] == 4 + + +def test_parse_events_file_missing_file(): + parser = JitsiEventParser() + + with tempfile.TemporaryDirectory() as tmpdir: + metadata = parser.parse_events_file(tmpdir) + + assert metadata["room"]["name"] == "" + assert len(metadata["participants"]) == 0 + assert metadata["event_count"] == 0 diff --git a/server/tests/test_jibri_webhook.py b/server/tests/test_jibri_webhook.py new file mode 100644 index 00000000..5cfbafbe --- /dev/null +++ b/server/tests/test_jibri_webhook.py @@ -0,0 +1,254 @@ +import json +import tempfile +from pathlib import Path +from unittest.mock import Mock, patch + +import pytest +from fastapi.testclient import TestClient +from sqlalchemy.orm import Session + +from reflector.api.jibri_webhook import router +from reflector.models import Transcript + + +@pytest.fixture +def client(): + from fastapi import FastAPI + + app = FastAPI() + app.include_router(router) + return TestClient(app) + + +@pytest.fixture +def mock_db(): + db = Mock(spec=Session) + db.add = Mock() + db.commit = Mock() + db.refresh = Mock() + return db + + +@pytest.fixture +def mock_settings(): + with patch("reflector.api.jibri_webhook.settings") as mock: + mock.JIBRI_RECORDINGS_PATH = "/recordings" + yield mock + + +@pytest.fixture +def mock_pipeline(): + with patch("reflector.api.jibri_webhook.TranscriptMainPipeline") as mock: + pipeline_instance = Mock() + pipeline_instance.create = Mock() + mock.return_value = pipeline_instance + yield mock + + +class TestJibriWebhook: + def test_recording_ready_success_with_events( + self, client, mock_db, mock_settings, mock_pipeline + ): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + # Create recording directory and files + session_id = "test-session-123" + recording_dir = Path(tmpdir) / session_id + recording_dir.mkdir() + + recording_file = recording_dir / "recording.mp4" + recording_file.write_text("fake video content") + + events_file = recording_dir / "events.jsonl" + events = [ + { + "type": "room_created", + "timestamp": 1234567890, + "room_name": "TestRoom", + "room_jid": "testroom@conference.meet.jitsi", + "meeting_url": "https://meet.jitsi/TestRoom", + }, + { + "type": "participant_joined", + "timestamp": 1234567892, + "room_name": "TestRoom", + "participant": { + "jid": "user1@meet.jitsi/resource", + "nick": "John Doe", + "id": "user1@meet.jitsi", + "is_moderator": False, + }, + }, + ] + + with open(events_file, "w") as f: + for event in events: + f.write(json.dumps(event) + "\n") + + # Mock database dependency + with patch("reflector.api.jibri_webhook.get_db") as mock_get_db: + mock_get_db.return_value = mock_db + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": session_id, + "path": session_id, + "meeting_url": "https://meet.jitsi/TestRoom", + }, + ) + + assert response.status_code == 200 + data = response.json() + + assert data["status"] == "accepted" + assert data["session_id"] == session_id + assert data["events_found"] is True + assert data["participant_count"] == 1 + + # Verify transcript was created + mock_db.add.assert_called_once() + transcript_arg = mock_db.add.call_args[0][0] + assert isinstance(transcript_arg, Transcript) + assert "TestRoom" in transcript_arg.title + assert transcript_arg.metadata["jitsi"]["room"]["name"] == "TestRoom" + + # Verify pipeline was triggered + mock_pipeline.return_value.create.assert_called_once() + + def test_recording_ready_success_without_events( + self, client, mock_db, mock_settings, mock_pipeline + ): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + session_id = "test-session-456" + recording_dir = Path(tmpdir) / session_id + recording_dir.mkdir() + + recording_file = recording_dir / "recording.mp4" + recording_file.write_text("fake video content") + + with patch("reflector.api.jibri_webhook.get_db") as mock_get_db: + mock_get_db.return_value = mock_db + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": session_id, + "path": session_id, + "meeting_url": "https://meet.jitsi/NoEventsRoom", + }, + ) + + assert response.status_code == 200 + data = response.json() + + assert data["status"] == "accepted" + assert data["events_found"] is False + assert data["participant_count"] == 0 + + # Verify transcript was created with minimal metadata + mock_db.add.assert_called_once() + transcript_arg = mock_db.add.call_args[0][0] + assert transcript_arg.metadata["jitsi"]["participants"] == [] + + def test_recording_ready_path_not_found(self, client, mock_settings): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": "nonexistent", + "path": "nonexistent", + "meeting_url": "https://meet.jitsi/Test", + }, + ) + + assert response.status_code == 404 + assert "Recording path not found" in response.json()["detail"] + + def test_recording_ready_recording_file_not_found(self, client, mock_settings): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + session_id = "test-no-recording" + recording_dir = Path(tmpdir) / session_id + recording_dir.mkdir() + + # No recording.mp4 file created + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": session_id, + "path": session_id, + "meeting_url": "https://meet.jitsi/Test", + }, + ) + + assert response.status_code == 404 + assert "Recording file not found" in response.json()["detail"] + + def test_recording_ready_with_relative_path( + self, client, mock_db, mock_settings, mock_pipeline + ): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + # Create nested directory structure + session_id = "2024/01/15/test-session" + recording_dir = Path(tmpdir) / session_id + recording_dir.mkdir(parents=True) + + recording_file = recording_dir / "recording.mp4" + recording_file.write_text("fake video content") + + with patch("reflector.api.jibri_webhook.get_db") as mock_get_db: + mock_get_db.return_value = mock_db + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": "test-session", + "path": session_id, # Relative path with subdirectories + "meeting_url": "https://meet.jitsi/Test", + }, + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "accepted" + + def test_recording_ready_empty_meeting_url( + self, client, mock_db, mock_settings, mock_pipeline + ): + with tempfile.TemporaryDirectory() as tmpdir: + mock_settings.JIBRI_RECORDINGS_PATH = tmpdir + + session_id = "test-session" + recording_dir = Path(tmpdir) / session_id + recording_dir.mkdir() + + recording_file = recording_dir / "recording.mp4" + recording_file.write_text("fake video content") + + with patch("reflector.api.jibri_webhook.get_db") as mock_get_db: + mock_get_db.return_value = mock_db + + response = client.post( + "/api/v1/jibri/recording-ready", + json={ + "session_id": session_id, + "path": session_id, + "meeting_url": "", + }, + ) + + assert response.status_code == 200 + + # Verify fallback URL was used + transcript_arg = mock_db.add.call_args[0][0] + assert transcript_arg.source_url == f"jitsi://{session_id}"