Compare commits

..

40 Commits

Author SHA1 Message Date
2801ab3643 chore(main): release 0.18.0 (#722) 2025-11-14 16:10:26 -05:00
Igor Monadical
b20cad76e6 feat: daily QOL: participants dictionary (#721)
* daily QOL: participants dictionary

* meeting deactivation fix

* meeting deactivation fix

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-14 14:31:52 -05:00
28a7258e45 fix: add proccessing page to file upload and reprocessing (#650) 2025-11-14 14:28:39 +01:00
a9a4f32324 fix: copy transcript (#674)
* Copy transcript

* Fix share copy transcript

* Move copy button above transcript
2025-11-14 13:36:25 +01:00
Igor Monadical
857e035562 fix whereby reprocess logic branch (#720)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-13 11:35:29 -05:00
34a3f5618c chore(main): release 0.17.0 (#717) 2025-11-12 21:25:59 -05:00
Igor Monadical
1473fd82dc feat: daily.co support as alternative to whereby (#691)
* llm instructions

* vibe dailyco

* vibe dailyco

* doc update (vibe)

* dont show recording ui on call

* stub processor (vibe)

* stub processor (vibe) self-review

* stub processor (vibe) self-review

* chore(main): release 0.14.0 (#670)

* Add multitrack pipeline

* Mixdown audio tracks

* Mixdown with pyav filter graph

* Trigger multitrack processing for daily recordings

* apply platform from envs in priority: non-dry

* Use explicit track keys for processing

* Align tracks of a multitrack recording

* Generate waveforms for the mixed audio

* Emit multriack pipeline events

* Fix multitrack pipeline track alignment

* dailico docs

* Enable multitrack reprocessing

* modal temp files uniform names, cleanup. remove llm temporary docs

* docs cleanup

* dont proceed with raw recordings if any of the downloads fail

* dry transcription pipelines

* remove is_miltitrack

* comments

* explicit dailyco room name

* docs

* remove stub data/method

* frontend daily/whereby code self-review (no-mistake)

* frontend daily/whereby code self-review (no-mistakes)

* frontend daily/whereby code self-review (no-mistakes)

* consent cleanup for multitrack (no-mistakes)

* llm fun

* remove extra comments

* fix tests

* merge migrations

* Store participant names

* Get participants by meeting session id

* pop back main branch migration

* s3 paddington (no-mistakes)

* comment

* pr comments

* pr comments

* pr comments

* platform / meeting cleanup

* Use participant names in summary generation

* platform assignment to meeting at controller level

* pr comment

* room playform properly default none

* room playform properly default none

* restore migration lost

* streaming WIP

* extract storage / use common storage / proper env vars for storage

* fix mocks tests

* remove fall back

* streaming for multifile

* cenrtal storage abstraction (no-mistakes)

* remove dead code / vars

* Set participant user id for authenticated users

* whereby recording name parsing fix

* whereby recording name parsing fix

* more file stream

* storage dry + tests

* remove homemade boto3 streaming and use proper boto

* update migration guide

* webhook creation script - print uuid

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: Mathieu Virbel <mat@meltingrocks.com>
Co-authored-by: Sergey Mankovsky <sergey@monadical.com>
2025-11-12 21:21:16 -05:00
372202b0e1 feat: add API key management UI (#716)
* feat: add API key management UI

- Created settings page for users to create, view, and delete API keys
- Added Settings link to app navigation header
- Fixed delete operation return value handling in backend to properly handle asyncpg's None response

* feat: replace browser confirm with dialog for API key deletion

- Added Chakra UI Dialog component for better UX when confirming API key deletion
- Implemented proper focus management with cancelRef for accessibility
- Replaced native browser confirm() with controlled dialog state

* style: format API keys page with consistent line breaks

* feat: auto-select API key text for easier copying

- Added automatic text selection after API key creation to streamline the copy workflow
- Applied className to Code component for DOM targeting

* feat: improve API keys page layout and responsiveness

- Reduced max width from 1200px to 800px for better readability
- Added explicit width constraint to ensure consistent sizing across viewports

* refactor: remove redundant comments from API keys page
2025-11-10 18:25:08 -05:00
Igor Monadical
d20aac66c4 ui search pagination 2+page re-search fix (#714)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-10 14:18:41 -05:00
dc4b737daa chore(main): release 0.16.0 (#711) 2025-10-24 16:18:49 -06:00
Igor Monadical
0baff7abf7 transcript ui copy button placement (#712)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-24 16:52:02 -04:00
Igor Monadical
962c40e2b6 feat: search date filter (#710)
* search date filter

* search date filter

* search date filter

* search date filter

* pr comment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-23 20:16:43 -04:00
Igor Monadical
3c4b9f2103 chore: error reporting and naming (#708)
* chore: error reporting and naming

* chore: error reporting and naming

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-22 13:45:08 -04:00
Igor Monadical
c6c035aacf removal of email-verified from /me (#707)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-21 14:49:33 -04:00
c086b91445 chore(main): release 0.15.0 (#706) 2025-10-21 08:30:22 -06:00
Igor Monadical
9a258abc02 feat: api tokens (#705)
* feat: api tokens (vibe)

* self-review

* remove token terminology + pr comments (vibe)

* return email_verified

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-20 12:55:25 -04:00
af86c47f1d chore(main): release 0.14.0 (#670) 2025-10-08 14:57:31 -06:00
5f6910e513 feat: Add calendar event data to transcript webhook payload (#689)
* feat: add calendar event data to transcript webhook payload and implement get_by_id method

* Update server/reflector/worker/webhook.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Update server/reflector/worker/webhook.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* style: format conditional time fields with line breaks for better readability

* docs: add calendar event fields to transcript.completed webhook payload schema

---------

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-10-08 11:11:57 -05:00
9a71af145e fix: update transcript list on reprocess (#676)
* Update transcript list on reprocess

* Fix transcript create

* Fix multiple sockets issue

* Pass token in sec websocket protocol

* userEvent parse example

* transcript list invalidation non-abstraction

* Emit only relevant events to the user room

* Add ws close code const

* Refactor user websocket endpoint

* Refactor user events provider

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-07 19:11:30 +02:00
eef6dc3903 fix: upgrade nemo toolkit (#678) 2025-10-07 16:45:02 +02:00
Igor Monadical
1dee255fed parakeet endpoint doc (#679)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-10-07 10:41:01 -04:00
5d98754305 fix: security review (#656)
* Add security review doc

* Add tests to reproduce security issues

* Fix security issues

* Fix tests

* Set auth auth backend for tests

* Fix ics api tests

* Fix transcript mutate check

* Update frontent env var names

* Remove permissions doc
2025-09-29 23:07:49 +02:00
Igor Monadical
969bd84fcc feat: container build for www / github (#672)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-24 12:27:45 -04:00
Igor Monadical
36608849ec fix: restore feature boolean logic (#671)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-24 11:57:49 -04:00
Igor Monadical
5bf64b5a41 feat: docker-compose for production frontend (#664)
* docker-compose for production frontend

* fix: Remove external Redis port mapping for Coolify compatibility

Redis should only be accessible within the internal Docker network in Coolify deployments to avoid port conflicts with other applications.

* fix: Remove external port mapping for web service in Coolify

Coolify handles port exposure through its proxy (Traefik), so services should not expose ports directly in the docker-compose file.

* server side client envs

* missing vars

* nextjs experimental

* fix claude 'fix'

* remove build env vars compose

* docker

* remove ports for coolify

* review

* cleanup

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-24 11:15:27 -04:00
0aaa42528a chore(main): release 0.13.1 (#668) 2025-09-22 16:47:44 -06:00
565a62900f fix: TypeError on not all arguments converted during string formatting in logger (#667) 2025-09-22 16:45:28 -06:00
Igor Monadical
27016e6051 minimum release age for npm (#665)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-22 13:38:23 -04:00
6ddfee0b4e chore(main): release 0.13.0 (#661) 2025-09-21 20:50:47 -06:00
Igor Monadical
47716f6e5d feat: room form edit with enter (#662)
* room form edit with enter

* mobile form enter do nothing

* restore overwritten older change

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-19 15:14:40 -04:00
0abcebfc94 fix: invalid cleanup call (#660) 2025-09-18 10:02:30 -06:00
Igor Monadical
2b723da08b rooms-page-calendar-ics-room-name-fix (#659)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-17 20:02:17 -04:00
6566e04300 chore(main): release 0.12.1 (#658) 2025-09-17 17:17:22 -06:00
870e860517 fix: production blocked because having existing meeting with room_id null (#657) 2025-09-17 17:09:54 -06:00
396a95d5ce chore(main): release 0.12.0 (#654) 2025-09-17 16:44:11 -06:00
6f680b5795 feat: calendar integration (#608)
* feat: calendar integration

* feat: add ICS calendar API endpoints for room configuration and sync

* feat: add Celery background tasks for ICS sync

* feat: implement Phase 2 - Multiple active meetings per room with grace period

This commit adds support for multiple concurrent meetings per room, implementing
grace period logic and improved meeting lifecycle management for calendar integration.

## Database Changes
- Remove unique constraint preventing multiple active meetings per room
- Add last_participant_left_at field to track when meeting becomes empty
- Add grace_period_minutes field (default: 15) for configurable grace period

## Meeting Controller Enhancements
- Add get_all_active_for_room() to retrieve all active meetings for a room
- Add get_active_by_calendar_event() to find meetings by calendar event ID
- Maintain backward compatibility with existing get_active() method

## New API Endpoints
- GET /rooms/{room_name}/meetings/active - List all active meetings
- POST /rooms/{room_name}/meetings/{meeting_id}/join - Join specific meeting

## Meeting Lifecycle Improvements
- 15-minute grace period after last participant leaves
- Automatic reactivation when participant rejoins during grace period
- Force close calendar meetings 30 minutes after scheduled end time
- Update process_meetings task to handle multiple active meetings

## Whereby Integration
- Clear grace period when participants join via webhook events
- Track participant count for grace period management

## Testing
- Add comprehensive tests for multiple active meetings
- Test grace period behavior and participant rejoin scenarios
- Test calendar meeting force closure logic
- All 5 new tests passing

This enables proper calendar integration with overlapping meetings while
preventing accidental meeting closures through the grace period mechanism.

* feat: implement frontend for calendar integration (Phase 3 & 4)

- Created MeetingSelection component for choosing between multiple active meetings
- Shows both active meetings and upcoming calendar events (30 min ahead)
- Displays meeting metadata with privacy controls (owner-only details)
- Supports creation of unscheduled meetings alongside calendar meetings

- Added waiting page for users joining before scheduled start time
- Shows countdown timer until meeting begins
- Auto-transitions to meeting when calendar event becomes active
- Handles early joining with proper routing

- Created collapsible info panel showing meeting details
- Displays calendar metadata (title, description, attendees)
- Shows participant count and duration
- Privacy-aware: sensitive info only visible to room owners

- Integrated ICS settings into room configuration dialog
- Test connection functionality with immediate feedback
- Manual sync trigger with detailed results
- Shows last sync time and ETag for monitoring
- Configurable sync intervals (1 min to 1 hour)

- New /room/{roomName} route for meeting selection
- Waiting room at /room/{roomName}/wait?eventId={id}
- Classic room page at /{roomName} with meeting info
- Uses sessionStorage to pass selected meeting between pages

- Added new endpoints for active/upcoming meetings
- Regenerated TypeScript client with latest OpenAPI spec
- Proper error handling and loading states
- Auto-refresh every 30 seconds for live updates

- Color-coded badges for meeting status
- Attendee status indicators (accepted/declined/tentative)
- Responsive design with Chakra UI components
- Clear visual hierarchy between active and upcoming meetings
- Smart truncation for long attendee lists

This completes the frontend implementation for calendar integration,
enabling users to seamlessly join scheduled meetings from their
calendar applications.

* WIP: Migrate calendar integration frontend to React Query

- Migrate all calendar components from useApi to React Query hooks
- Fix Chakra UI v3 compatibility issues (Card, Progress, spacing props, leftIcon)
- Update backend Meeting model to include calendar fields
- Replace imperative API calls with declarative React Query patterns
- Remove old OpenAPI generated files that conflict with new structure

* fix: alembic migrations

* feat: add calendar migration

* feat: update ics, first version working

* feat: implement tabbed interface for room edit dialog

- Add General, Calendar, and Share tabs to organize room settings
- Move ICS settings to dedicated Calendar tab
- Move Zulip configuration to Share tab
- Keep basic room settings and webhooks in General tab
- Remove redundant migration file
- Fix Chakra UI v3 compatibility issues in calendar components

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: infinite loop

* feat: improve ICS calendar sync UX and fix room URL matching

- Replace "Test Connection" button with "Force Sync" button (Edit Room only)
- Show detailed sync results: total events downloaded vs room matches
- Remove emoticons and auto-hide timeout for cleaner UX
- Fix room URL matching to use UI_BASE_URL instead of BASE_URL
- Replace FaSync icon with LuRefreshCw for consistency
- Clear sync results when dialog closes or Force Sync pressed
- Update tests to reflect UI_BASE_URL change and exact URL matching

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: reorganize room edit dialog and fix Force Sync button

- Move WebHook configuration from General to dedicated WebHook tab
- Add WebHook tab after Share tab in room edit dialog
- Fix Force Sync button not appearing by adding missing isEditing prop
- Fix indentation issues in MeetingSelection component

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: complete calendar integration with UI improvements and code cleanup

Calendar Integration Tasks:
- Update upcoming meetings window from 30 to 120 minutes
- Include currently happening events in upcoming meetings API
- Create shared time utility functions (formatDateTime, formatCountdown, formatStartedAgo)
- Improve ongoing meetings UI logic with proper time detection
- Fix backend code organization and remove excessive documentation

UI/UX Improvements:
- Restructure room page layout using MinimalHeader pattern
- Remove borders from header and footer elements
- Change button text from "Leave Meeting" to "Leave Room"
- Remove "Back to Reflector" footer for cleaner design
- Extract WaitPageClient component for better separation

Backend Changes:
- calendar_events.py: Fix import organization and extend timing window
- rooms.py: Update API default from 30 to 120 minutes
- Enhanced test coverage for ongoing meeting scenarios

Frontend Changes:
- MinimalHeader: Add onLeave prop for custom navigation
- MeetingSelection: Complete layout restructure with shared utilities
- timeUtils: New shared utility file for consistent time formatting

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: remove wait page and simplify Join button with 5-minute disable logic

- Remove entire wait page directory and associated files
- Update handleJoinUpcoming to create unscheduled meeting directly
- Simplify Join button to single state:
  - Always shows "Join" text
  - Blue when meeting can be joined (ongoing or within 5 minutes)
  - Gray/disabled when more than 5 minutes away
- Remove confusing "Join Now", "Join Early" text variations

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: improve calendar integration and meeting UI

- Refactor ICS sync tasks to use @asynctask decorator for cleaner async handling
- Extract meeting creation logic into reusable function
- Improve meeting selection UI with distinct current/upcoming sections
- Add early join functionality for upcoming meetings within 5-minute window
- Simplify non-ICS room workflow with direct Whereby embed
- Fix import paths and component organization

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: restore original recording consent functionality

- Remove custom ConsentDialogButton and WherebyEmbed components
- Merge RoomClient logic back into main room page
- Restore original consent UI: blue button with toast modal
- Maintain calendar integration features for ICS-enabled rooms
- Add consent-handler.md documentation of original functionality
- Preserve focus management and accessibility features

* fix: redirect Join Now button to local meeting page

- Change handleJoinDirect to use onMeetingSelect instead of opening external URL
- Join Now button now navigates to /{roomName}/{meetingId} instead of whereby.com
- Maintains proper routing within the application

* feat: remove restrictive message for non-owners in private rooms

- Remove confusing message about room owner permissions
- Cleaner UI for all users regardless of ownership status
- Users will only see available meetings and join options

* feat: improve meeting selection UI for better readability

- Limit page content to max 800px width for better 4K display readability
- Remove LIVE tag badge for cleaner interface
- Remove shadow from main live meeting box
- Remove blue border and hover effects for minimal design
- Change background to neutral gray for less visual noise

* feat: add room by name endpoint for non-authenticated access

- Add GET /rooms/name/{room_name} backend endpoint
- Endpoint supports non-authenticated access for public rooms
- Returns RoomDetails with webhook fields hidden for non-owners
- Update useRoomGetByName hook to use new direct endpoint
- Remove authentication requirement from frontend hook
- Regenerate API client types

Fixes: Non-authenticated users can now access room lobbies

* feat: add friendly message when no meetings are ongoing

- Show centered message with calendar icon when no meetings are active
- Message text: 'No meetings right now' with helpful description
- Contextual text for owners/shared rooms mentioning quick meeting option
- Consistent gray styling matching the rest of the interface
- Only displays when both currentMeetings and upcomingMeetings are empty

* style: center no meetings message and remove background

- Change from Box to Flex with flex=1 for vertical centering
- Remove gray background, border radius, and padding
- Message now appears cleanly centered in available space
- Maintains horizontal and vertical centering

* feat: move Create Meeting button to header

- Remove 'Start a Quick Meeting' box from main content area
- Add showCreateButton and onCreateMeeting props to MinimalHeader
- Create Meeting button now appears in header left of Leave Room
- Only shows for room owners or shared room users
- Update no meetings message to remove reference to quick meeting below
- Cleaner, more accessible UI with actions in the header

* style: change room title and no meetings text to pure black

- Update room title in MinimalHeader from gray.700 to black
- Update 'No meetings right now' text from gray.700 to black
- Improves visual hierarchy and readability
- Consistent with other pages' styling

* style: linting

* fix: remove plan files

* fix: alembic migration with named foreign keys

* feat: add SyncStatus enum and refactor ICS sync to use rooms controller

- Add SyncStatus enum to replace string literals in ICS sync status
- Replace direct SQL queries in worker with rooms_controller.get_ics_enabled()
- Improve type safety and maintainability of ICS sync code
- Enum values: SUCCESS, UNCHANGED, ERROR, SKIPPED maintain backward compatibility

* refactor: remove unnecessary docstring from get_ics_enabled method

The function name is self-explanatory

* fix: import top level

* feat: use Literal type for ICSStatus.status field

- Changed ICSStatus.status from str to Literal['enabled', 'disabled']
- Improves type safety and API documentation

* feat: update TypeScript definitions for ICSStatus Literal type

- OpenAPI generation now properly reflects Literal['enabled', 'disabled'] type
- Improves type safety for frontend consumers of the API
- Applied automatic formatting via pre-commit hooks

* refactor: replace loguru with structlog in ics_sync service

- Replace loguru import with structlog in services/ics_sync.py
- Update logging calls to use structlog's structured format with keyword args
- Maintains consistency with other services using structlog
- Changes: logger.info(f'...') -> logger.info('...', key=value) format

* chore: remove loguru dependency and improve type annotations

- Remove loguru from dependencies in pyproject.toml (replaced with structlog)
- Update meeting controller methods to properly return Optional types
- Update dependency lock file after loguru removal

* fix: resolve pyflakes warnings in ics_sync and meetings modules

Remove unused imports and variables to clean up code quality

* Remove grace period logic and improve meeting deactivation

- Removed grace_period_minutes and last_participant_left_at fields
- Simplified deactivation logic based on actual usage patterns:
  * Active sessions: Keep meeting active regardless of scheduled time
  * Calendar meetings: Wait until scheduled end if unused, deactivate immediately once used and empty
  * On-the-fly meetings: Deactivate immediately when empty
- Created migration to drop unused database columns
- Updated tests to remove grace period test cases

* Update test to match new deactivation logic for calendar meetings

* fix: remove unwanted file

* fix: incompleted changes from EVENT_WINDOW*

* fix: update room ICS API tests to include required webhook fields and correct URL

- Add webhook_url and webhook_secret fields to room creation tests
- Fix room URL matching in ICS sync test to use UI_BASE_URL instead of BASE_URL
- Aligns test with actual API requirements and ICS sync service implementation

* fix: add Redis distributed locking to prevent race conditions in process_meetings

- Implement per-meeting locks using Redis to prevent concurrent processing
- Add lock extension after slow API calls (Whereby) to handle long-running operations
- Use redis-py's built-in lock.extend() with replace_ttl=True for simple TTL refresh
- Track and log skipped meetings when locked by other workers
- Document SSRF analysis showing it's low-risk due to async worker isolation

This prevents multiple workers from processing the same meeting simultaneously,
which could cause state corruption or duplicate deactivations.

* refactor: rename MinimalHeader to MeetingMinimalHeader for clarity

* fix: minor code quality improvements - add emoji constants, fix type safety, cleanup comments

* fix: database migration

* self-pr review

* self-pr review

* self-pr review treeshake

* fix: local fixes

* fix: creation of meeting

* fix: meeting selection create button

* compile fix

* fix: meeting selection responsive

* fix: rework process logic for meeting

* fix: meeting useEffect frontend-only dedupe (#647)

* meeting useEffect frontend-only dedupe

* format

* also get room by name backend fix

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>

* invalidate meeting list on new meeting

* test fix

* room url copy button for ics

* calendar refresh quick action icon

* remove work.md

* meeting page frontend fixes

* hide number of meeting participants

* Revert "hide number of meeting participants"

This reverts commit 38906c5d1a.

* ui bits

* ui bits

* remove log

* room name typing stricten

* feat: protect atomic operation involving external service with redlock

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Igor Monadical <igor@monadical.com>
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-09-17 16:43:20 -06:00
ab859d65a6 feat: self-hosted gpu api (#636)
* Self-hosted gpu api

* Refactor self-hosted api

* Rename model api tests

* Use lifespan instead of startup event

* Fix self hosted imports

* Add newlines

* Add response models

* Move gpu dir to the root

* Add project description

* Refactor lifespan

* Update env var names for model api tests

* Preload diarizarion service

* Refactor uploaded file paths
2025-09-17 18:52:03 +02:00
fa049e8d06 fix: ignore player hotkeys for text inputs (#646)
* Ignore player hotkeys for text inputs

* Fix event listener effect
2025-09-16 10:57:35 +02:00
2ce7479967 chore(main): release 0.11.0 (#648) 2025-09-15 22:42:53 -06:00
b42f7cfc60 feat: remove profanity filter that was there for conference (#652) 2025-09-15 18:19:19 -06:00
243 changed files with 19499 additions and 7893 deletions

View File

@@ -1,4 +1,4 @@
name: Deploy to Amazon ECS
name: Build container/push to container registry
on: [workflow_dispatch]

57
.github/workflows/docker-frontend.yml vendored Normal file
View File

@@ -0,0 +1,57 @@
name: Build and Push Frontend Docker Image
on:
push:
branches:
- main
paths:
- 'www/**'
- '.github/workflows/docker-frontend.yml'
workflow_dispatch:
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}-frontend
jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to GitHub Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=sha,prefix={{branch}}-
type=raw,value=latest,enable={{is_default_branch}}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./www
file: ./www/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64

View File

@@ -1,5 +1,109 @@
# Changelog
## [0.18.0](https://github.com/Monadical-SAS/reflector/compare/v0.17.0...v0.18.0) (2025-11-14)
### Features
* daily QOL: participants dictionary ([#721](https://github.com/Monadical-SAS/reflector/issues/721)) ([b20cad7](https://github.com/Monadical-SAS/reflector/commit/b20cad76e69fb6a76405af299a005f1ddcf60eae))
### Bug Fixes
* add proccessing page to file upload and reprocessing ([#650](https://github.com/Monadical-SAS/reflector/issues/650)) ([28a7258](https://github.com/Monadical-SAS/reflector/commit/28a7258e45317b78e60e6397be2bc503647eaace))
* copy transcript ([#674](https://github.com/Monadical-SAS/reflector/issues/674)) ([a9a4f32](https://github.com/Monadical-SAS/reflector/commit/a9a4f32324f66c838e081eee42bb9502f38c1db1))
## [0.17.0](https://github.com/Monadical-SAS/reflector/compare/v0.16.0...v0.17.0) (2025-11-13)
### Features
* add API key management UI ([#716](https://github.com/Monadical-SAS/reflector/issues/716)) ([372202b](https://github.com/Monadical-SAS/reflector/commit/372202b0e1a86823900b0aa77be1bfbc2893d8a1))
* daily.co support as alternative to whereby ([#691](https://github.com/Monadical-SAS/reflector/issues/691)) ([1473fd8](https://github.com/Monadical-SAS/reflector/commit/1473fd82dc472c394cbaa2987212ad662a74bcac))
## [0.16.0](https://github.com/Monadical-SAS/reflector/compare/v0.15.0...v0.16.0) (2025-10-24)
### Features
* search date filter ([#710](https://github.com/Monadical-SAS/reflector/issues/710)) ([962c40e](https://github.com/Monadical-SAS/reflector/commit/962c40e2b6428ac42fd10aea926782d7a6f3f902))
## [0.15.0](https://github.com/Monadical-SAS/reflector/compare/v0.14.0...v0.15.0) (2025-10-20)
### Features
* api tokens ([#705](https://github.com/Monadical-SAS/reflector/issues/705)) ([9a258ab](https://github.com/Monadical-SAS/reflector/commit/9a258abc0209b0ac3799532a507ea6a9125d703a))
## [0.14.0](https://github.com/Monadical-SAS/reflector/compare/v0.13.1...v0.14.0) (2025-10-08)
### Features
* Add calendar event data to transcript webhook payload ([#689](https://github.com/Monadical-SAS/reflector/issues/689)) ([5f6910e](https://github.com/Monadical-SAS/reflector/commit/5f6910e5131b7f28f86c9ecdcc57fed8412ee3cd))
* container build for www / github ([#672](https://github.com/Monadical-SAS/reflector/issues/672)) ([969bd84](https://github.com/Monadical-SAS/reflector/commit/969bd84fcc14851d1a101412a0ba115f1b7cde82))
* docker-compose for production frontend ([#664](https://github.com/Monadical-SAS/reflector/issues/664)) ([5bf64b5](https://github.com/Monadical-SAS/reflector/commit/5bf64b5a41f64535e22849b4bb11734d4dbb4aae))
### Bug Fixes
* restore feature boolean logic ([#671](https://github.com/Monadical-SAS/reflector/issues/671)) ([3660884](https://github.com/Monadical-SAS/reflector/commit/36608849ec64e953e3be456172502762e3c33df9))
* security review ([#656](https://github.com/Monadical-SAS/reflector/issues/656)) ([5d98754](https://github.com/Monadical-SAS/reflector/commit/5d98754305c6c540dd194dda268544f6d88bfaf8))
* update transcript list on reprocess ([#676](https://github.com/Monadical-SAS/reflector/issues/676)) ([9a71af1](https://github.com/Monadical-SAS/reflector/commit/9a71af145ee9b833078c78d0c684590ab12e9f0e))
* upgrade nemo toolkit ([#678](https://github.com/Monadical-SAS/reflector/issues/678)) ([eef6dc3](https://github.com/Monadical-SAS/reflector/commit/eef6dc39037329b65804297786d852dddb0557f9))
## [0.13.1](https://github.com/Monadical-SAS/reflector/compare/v0.13.0...v0.13.1) (2025-09-22)
### Bug Fixes
* TypeError on not all arguments converted during string formatting in logger ([#667](https://github.com/Monadical-SAS/reflector/issues/667)) ([565a629](https://github.com/Monadical-SAS/reflector/commit/565a62900f5a02fc946b68f9269a42190ed70ab6))
## [0.13.0](https://github.com/Monadical-SAS/reflector/compare/v0.12.1...v0.13.0) (2025-09-19)
### Features
* room form edit with enter ([#662](https://github.com/Monadical-SAS/reflector/issues/662)) ([47716f6](https://github.com/Monadical-SAS/reflector/commit/47716f6e5ddee952609d2fa0ffabdfa865286796))
### Bug Fixes
* invalid cleanup call ([#660](https://github.com/Monadical-SAS/reflector/issues/660)) ([0abcebf](https://github.com/Monadical-SAS/reflector/commit/0abcebfc9491f87f605f21faa3e53996fafedd9a))
## [0.12.1](https://github.com/Monadical-SAS/reflector/compare/v0.12.0...v0.12.1) (2025-09-17)
### Bug Fixes
* production blocked because having existing meeting with room_id null ([#657](https://github.com/Monadical-SAS/reflector/issues/657)) ([870e860](https://github.com/Monadical-SAS/reflector/commit/870e8605171a27155a9cbee215eeccb9a8d6c0a2))
## [0.12.0](https://github.com/Monadical-SAS/reflector/compare/v0.11.0...v0.12.0) (2025-09-17)
### Features
* calendar integration ([#608](https://github.com/Monadical-SAS/reflector/issues/608)) ([6f680b5](https://github.com/Monadical-SAS/reflector/commit/6f680b57954c688882c4ed49f40f161c52a00a24))
* self-hosted gpu api ([#636](https://github.com/Monadical-SAS/reflector/issues/636)) ([ab859d6](https://github.com/Monadical-SAS/reflector/commit/ab859d65a6bded904133a163a081a651b3938d42))
### Bug Fixes
* ignore player hotkeys for text inputs ([#646](https://github.com/Monadical-SAS/reflector/issues/646)) ([fa049e8](https://github.com/Monadical-SAS/reflector/commit/fa049e8d068190ce7ea015fd9fcccb8543f54a3f))
## [0.11.0](https://github.com/Monadical-SAS/reflector/compare/v0.10.0...v0.11.0) (2025-09-16)
### Features
* remove profanity filter that was there for conference ([#652](https://github.com/Monadical-SAS/reflector/issues/652)) ([b42f7cf](https://github.com/Monadical-SAS/reflector/commit/b42f7cfc606783afcee792590efcc78b507468ab))
### Bug Fixes
* zulip and consent handler on the file pipeline ([#645](https://github.com/Monadical-SAS/reflector/issues/645)) ([5f143fe](https://github.com/Monadical-SAS/reflector/commit/5f143fe3640875dcb56c26694254a93189281d17))
* zulip stream and topic selection in share dialog ([#644](https://github.com/Monadical-SAS/reflector/issues/644)) ([c546e69](https://github.com/Monadical-SAS/reflector/commit/c546e69739e68bb74fbc877eb62609928e5b8de6))
## [0.10.0](https://github.com/Monadical-SAS/reflector/compare/v0.9.0...v0.10.0) (2025-09-11)

View File

@@ -151,7 +151,7 @@ All endpoints prefixed `/v1/`:
**Frontend** (`www/.env`):
- `NEXTAUTH_URL`, `NEXTAUTH_SECRET` - Authentication configuration
- `NEXT_PUBLIC_REFLECTOR_API_URL` - Backend API endpoint
- `REFLECTOR_API_URL` - Backend API endpoint
- `REFLECTOR_DOMAIN_CONFIG` - Feature flags and domain settings
## Testing Strategy

View File

@@ -168,6 +168,13 @@ You can manually process an audio file by calling the process tool:
uv run python -m reflector.tools.process path/to/audio.wav
```
## Build-time env variables
Next.js projects are more used to NEXT_PUBLIC_ prefixed buildtime vars. We don't have those for the reason we need to serve a ccustomizable prebuild docker container.
Instead, all the variables are runtime. Variables needed to the frontend are served to the frontend app at initial render.
It also means there's no static prebuild and no static files to serve for js/html.
## Feature Flags
@@ -177,24 +184,24 @@ Reflector uses environment variable-based feature flags to control application f
| Feature Flag | Environment Variable |
|-------------|---------------------|
| `requireLogin` | `NEXT_PUBLIC_FEATURE_REQUIRE_LOGIN` |
| `privacy` | `NEXT_PUBLIC_FEATURE_PRIVACY` |
| `browse` | `NEXT_PUBLIC_FEATURE_BROWSE` |
| `sendToZulip` | `NEXT_PUBLIC_FEATURE_SEND_TO_ZULIP` |
| `rooms` | `NEXT_PUBLIC_FEATURE_ROOMS` |
| `requireLogin` | `FEATURE_REQUIRE_LOGIN` |
| `privacy` | `FEATURE_PRIVACY` |
| `browse` | `FEATURE_BROWSE` |
| `sendToZulip` | `FEATURE_SEND_TO_ZULIP` |
| `rooms` | `FEATURE_ROOMS` |
### Setting Feature Flags
Feature flags are controlled via environment variables using the pattern `NEXT_PUBLIC_FEATURE_{FEATURE_NAME}` where `{FEATURE_NAME}` is the SCREAMING_SNAKE_CASE version of the feature name.
Feature flags are controlled via environment variables using the pattern `FEATURE_{FEATURE_NAME}` where `{FEATURE_NAME}` is the SCREAMING_SNAKE_CASE version of the feature name.
**Examples:**
```bash
# Enable user authentication requirement
NEXT_PUBLIC_FEATURE_REQUIRE_LOGIN=true
FEATURE_REQUIRE_LOGIN=true
# Disable browse functionality
NEXT_PUBLIC_FEATURE_BROWSE=false
FEATURE_BROWSE=false
# Enable Zulip integration
NEXT_PUBLIC_FEATURE_SEND_TO_ZULIP=true
FEATURE_SEND_TO_ZULIP=true
```

39
docker-compose.prod.yml Normal file
View File

@@ -0,0 +1,39 @@
# Production Docker Compose configuration for Frontend
# Usage: docker compose -f docker-compose.prod.yml up -d
services:
web:
build:
context: ./www
dockerfile: Dockerfile
image: reflector-frontend:latest
environment:
- KV_URL=${KV_URL:-redis://redis:6379}
- SITE_URL=${SITE_URL}
- API_URL=${API_URL}
- WEBSOCKET_URL=${WEBSOCKET_URL}
- NEXTAUTH_URL=${NEXTAUTH_URL:-http://localhost:3000}
- NEXTAUTH_SECRET=${NEXTAUTH_SECRET:-changeme-in-production}
- AUTHENTIK_ISSUER=${AUTHENTIK_ISSUER}
- AUTHENTIK_CLIENT_ID=${AUTHENTIK_CLIENT_ID}
- AUTHENTIK_CLIENT_SECRET=${AUTHENTIK_CLIENT_SECRET}
- AUTHENTIK_REFRESH_TOKEN_URL=${AUTHENTIK_REFRESH_TOKEN_URL}
- SENTRY_DSN=${SENTRY_DSN}
- SENTRY_IGNORE_API_RESOLUTION_ERROR=${SENTRY_IGNORE_API_RESOLUTION_ERROR:-1}
depends_on:
- redis
restart: unless-stopped
redis:
image: redis:7.2-alpine
restart: unless-stopped
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 3s
retries: 3
volumes:
- redis_data:/data
volumes:
redis_data:

View File

@@ -39,7 +39,7 @@ services:
ports:
- 6379:6379
web:
image: node:18
image: node:22-alpine
ports:
- "3000:3000"
command: sh -c "corepack enable && pnpm install && pnpm dev"
@@ -50,6 +50,8 @@ services:
- /app/node_modules
env_file:
- ./www/.env.local
environment:
- NODE_ENV=development
postgres:
image: postgres:17

View File

@@ -1,369 +0,0 @@
# Jitsi Integration for Reflector
This document contains research and planning notes for integrating Jitsi Meet as a replacement for Whereby in Reflector.
## Overview
Jitsi Meet is an open-source video conferencing solution that can replace Whereby in Reflector, providing:
- Cost reduction (no per-minute charges)
- Direct recording access via Jibri
- Real-time event webhooks
- Full customization and control
## Current Whereby Integration Analysis
### Architecture
1. **Room Creation**: User creates a "room" template in Reflector DB with settings
2. **Meeting Creation**: `/rooms/{room_name}/meeting` endpoint calls Whereby API to create meeting
3. **Recording**: Whereby handles recording automatically to S3 bucket
4. **Webhooks**: Whereby sends events for participant tracking
### Database Structure
```python
# Room = Template/Configuration
class Room:
id, name, user_id
recording_type, recording_trigger # cloud, automatic-2nd-participant
webhook_url, webhook_secret
# Meeting = Actual Whereby Meeting Instance
class Meeting:
id # Whereby meetingId
room_name # Generated by Whereby
room_url, host_room_url # Whereby URLs
num_clients # Updated via webhooks
```
## Jitsi Components
### Core Architecture
- **Jitsi Meet**: Web frontend (Next.js + React)
- **Prosody**: XMPP server for messaging/rooms
- **Jicofo**: Conference focus (orchestration)
- **JVB**: Videobridge (media routing)
- **Jibri**: Recording service
- **Jigasi**: SIP gateway (optional, for phone dial-in)
### Exposure Requirements
- **Web service**: 443/80 (frontend)
- **JVB**: 10000/UDP (media streams) - **MUST EXPOSE**
- **Prosody**: 5280 (BOSH/WebSocket) - can proxy via web
- **Jicofo, Jibri, Jigasi**: Internal only
## Recording with Jibri
### How Jibri Works
- Each Jibri instance handles **one recording at a time**
- Records mixed audio/video to MP4 format
- Uses Chrome headless + ffmpeg for capture
- Supports finalize scripts for post-processing
### Jibri Pool for Scaling
- Multiple Jibri instances join "jibribrewery" MUC
- Jicofo distributes recording requests to available instances
- Automatic load balancing and failover
```yaml
# Multiple Jibri instances
jibri1:
environment:
- JIBRI_INSTANCE_ID=jibri1
- JIBRI_BREWERY_MUC=jibribrewery
jibri2:
environment:
- JIBRI_INSTANCE_ID=jibri2
- JIBRI_BREWERY_MUC=jibribrewery
```
### Recording Automation Options
1. **Environment Variables**: `ENABLE_RECORDING=1`, `AUTO_RECORDING=1`
2. **URL Parameters**: `?config.autoRecord=true`
3. **JWT Token**: Include recording permissions in JWT
4. **API Control**: `api.executeCommand('startRecording')`
### Post-Processing Integration
```bash
#!/bin/bash
# finalize.sh - runs after recording completion
RECORDING_FILE=$1
MEETING_METADATA=$2
ROOM_NAME=$3
# Copy to Reflector-accessible location
cp "$RECORDING_FILE" /shared/reflector-uploads/
# Trigger Reflector processing
curl -X POST "http://reflector-api:8000/v1/transcripts/process" \
-H "Content-Type: application/json" \
-d "{
\"file_path\": \"/shared/reflector-uploads/$(basename $RECORDING_FILE)\",
\"room_name\": \"$ROOM_NAME\",
\"source\": \"jitsi\"
}"
```
## React Integration
### Official React SDK
```bash
npm i @jitsi/react-sdk
```
```jsx
import { JitsiMeeting } from '@jitsi/react-sdk'
<JitsiMeeting
room="meeting-room"
serverURL="https://your-jitsi.domain"
jwt="your-jwt-token"
config={{
startWithAudioMuted: true,
fileRecordingsEnabled: true,
autoRecord: true
}}
onParticipantJoined={(participant) => {
// Track participant events
}}
onRecordingStatusChanged={(status) => {
// Handle recording events
}}
/>
```
## Authentication & Room Control
### JWT-Based Access Control
```python
def generate_jitsi_jwt(payload):
return jwt.encode({
"aud": "jitsi",
"iss": "reflector",
"sub": "reflector-user",
"room": payload["room"],
"exp": int(payload["exp"].timestamp()),
"context": {
"user": {
"name": payload["user_name"],
"moderator": payload.get("moderator", False)
},
"features": {
"recording": payload.get("recording", True)
}
}
}, JITSI_JWT_SECRET)
```
### Prevent Anonymous Room Creation
```bash
# Environment configuration
ENABLE_AUTH=1
ENABLE_GUESTS=0
AUTH_TYPE=jwt
JWT_APP_ID=reflector
JWT_APP_SECRET=your-secret-key
```
## Webhook Integration
### Real-time Events via Prosody
Custom event-sync module can send webhooks for:
- Participant join/leave
- Recording start/stop
- Room creation/destruction
- Mute/unmute events
```lua
-- mod_event_sync.lua
module:hook("muc-occupant-joined", function(event)
send_event({
type = "participant_joined",
room = event.room.jid,
participant = {
nick = event.occupant.nick,
jid = event.occupant.jid,
},
timestamp = os.time(),
});
end);
```
### Jibri Recording Webhooks
```bash
# Environment variable
JIBRI_WEBHOOK_SUBSCRIBERS=https://your-reflector.com/webhooks/jibri
```
## Proposed Reflector Integration
### Modified Database Schema
```python
class Meeting(BaseModel):
id: str # Our generated meeting ID
room_name: str # Generated: reflector-{room.name}-{timestamp}
room_url: str # https://jitsi.domain/room_name?jwt=token
host_room_url: str # Same but with moderator JWT
# Add Jitsi-specific fields
jitsi_jwt: str # JWT token
jitsi_room_id: str # Internal room identifier
recording_status: str # pending, recording, completed
recording_file_path: Optional[str]
```
### API Replacement
```python
# Replace whereby.py with jitsi.py
async def create_meeting(room_name_prefix: str, end_date: datetime, room: Room):
# Generate unique room name
jitsi_room = f"reflector-{room.name}-{int(time.time())}"
# Generate JWT tokens
user_jwt = generate_jwt(room=jitsi_room, moderator=False, exp=end_date)
host_jwt = generate_jwt(room=jitsi_room, moderator=True, exp=end_date)
return {
"meetingId": generate_uuid4(), # Our ID
"roomName": jitsi_room,
"roomUrl": f"https://jitsi.domain/{jitsi_room}?jwt={user_jwt}",
"hostRoomUrl": f"https://jitsi.domain/{jitsi_room}?jwt={host_jwt}",
"startDate": datetime.now().isoformat(),
"endDate": end_date.isoformat(),
}
```
### Webhook Endpoints
```python
# Replace whereby webhook with jitsi webhooks
@router.post("/jitsi/events")
async def jitsi_events_webhook(event_data: dict):
event_type = event_data.get("event")
room_name = event_data.get("room", "").split("@")[0]
meeting = await Meeting.get_by_room(room_name)
if event_type == "muc-occupant-joined":
# Update participant count
meeting.num_clients += 1
elif event_type == "jibri-recording-on":
meeting.recording_status = "recording"
elif event_type == "jibri-recording-off":
meeting.recording_status = "processing"
await process_meeting_recording.delay(meeting.id)
@router.post("/jibri/recording-complete")
async def recording_complete(data: dict):
# Handle finalize script webhook
room_name = data.get("room_name")
file_path = data.get("file_path")
meeting = await Meeting.get_by_room(room_name)
meeting.recording_file_path = file_path
meeting.recording_status = "completed"
# Start Reflector processing
await process_recording_for_transcription(meeting.id, file_path)
```
## Deployment with Docker
### Official docker-jitsi-meet
```bash
# Download official release
wget $(wget -q -O - https://api.github.com/repos/jitsi/docker-jitsi-meet/releases/latest | grep zip | cut -d\" -f4)
# Setup
mkdir -p ~/.jitsi-meet-cfg/{web,transcripts,prosody/config,prosody/prosody-plugins-custom,jicofo,jvb,jigasi,jibri}
./gen-passwords.sh # Generate secure passwords
docker compose up -d
```
### Coolify Integration
```yaml
services:
web:
ports: ["80:80", "443:443"]
jvb:
ports: ["10000:10000/udp"] # Must expose for media
jibri1:
environment:
- JIBRI_INSTANCE_ID=jibri1
- JIBRI_FINALIZE_RECORDING_SCRIPT_PATH=/config/finalize.sh
jibri2:
environment:
- JIBRI_INSTANCE_ID=jibri2
```
## Benefits vs Whereby
### Cost & Control
**No per-minute charges** - significant cost savings
**Full recording control** - direct file access
**Custom branding** - complete UI control
**Self-hosted** - no vendor lock-in
### Technical Advantages
**Real-time events** - immediate webhook notifications
**Rich participant metadata** - detailed tracking
**JWT security** - token-based access with expiration
**Multiple recording formats** - audio-only options
**Scalable architecture** - horizontal Jibri scaling
### Integration Benefits
**Same API surface** - minimal changes to existing code
**React SDK** - better frontend integration
**Direct processing** - no S3 download delays
**Event-driven architecture** - better real-time capabilities
## Implementation Plan
1. **Deploy Jitsi Stack** - Set up docker-jitsi-meet with multiple Jibri instances
2. **Create jitsi.py** - Replace whereby.py with Jitsi API functions
3. **Update Database** - Add Jitsi-specific fields to Meeting model
4. **Webhook Integration** - Replace Whereby webhooks with Jitsi events
5. **Frontend Updates** - Replace Whereby embed with Jitsi React SDK
6. **Testing & Migration** - Gradual rollout with fallback to Whereby
## Recording Limitations & Considerations
### Current Limitations
- **Mixed audio only** - Jibri doesn't separate participant tracks natively
- **One recording per Jibri** - requires multiple instances for concurrent recordings
- **Chrome dependency** - Jibri uses headless Chrome for recording
### Metadata Capabilities
**Participant join/leave timestamps** - via webhooks
**Speaking time tracking** - via audio level events
**Meeting duration** - precise timing
**Room-specific data** - custom metadata in JWT
### Alternative Recording Methods
- **Local recording** - browser-based, per-participant
- **Custom recording** - lib-jitsi-meet for individual streams
- **Third-party solutions** - Recall.ai, Otter.ai integrations
## Security Considerations
### JWT Configuration
- **Room-specific tokens** - limit access to specific rooms
- **Time-based expiration** - automatic cleanup
- **Feature permissions** - control recording, moderation rights
- **User identification** - embed user metadata in tokens
### Access Control
- **No anonymous rooms** - all rooms require valid JWT
- **API-only creation** - prevent direct room access
- **Webhook verification** - HMAC signature validation
## Next Steps
1. **Deploy test Jitsi instance** - validate recording pipeline
2. **Prototype jitsi.py** - create equivalent API functions
3. **Test webhook integration** - ensure event delivery works
4. **Performance testing** - validate multiple concurrent recordings
5. **Migration strategy** - plan gradual transition from Whereby
---
*This document serves as the comprehensive planning and research notes for Jitsi integration in Reflector. It should be updated as implementation progresses and new insights are discovered.*

View File

@@ -1,720 +0,0 @@
# Jitsi Meet Integration Configuration Guide
This guide explains how to configure Reflector to use your self-hosted Jitsi Meet installation for video meetings, recording, and participant tracking.
## Overview
Jitsi Meet is an open-source video conferencing platform that can be self-hosted. Reflector integrates with Jitsi Meet to:
- Create secure meeting rooms with JWT authentication
- Track participant join/leave events via Prosody webhooks
- Record meetings using Jibri recording service
- Process recordings for transcription and analysis
## Requirements
### Self-Hosted Jitsi Meet
You need a complete Jitsi Meet installation including:
1. **Jitsi Meet Web Interface** - The main meeting interface
2. **Prosody XMPP Server** - Handles room management and authentication
3. **Jicofo (JItsi COnference FOcus)** - Manages media sessions
4. **Jitsi Videobridge (JVB)** - Handles WebRTC media routing
5. **Jibri Recording Service** - Records meetings (optional but recommended)
### System Requirements
- **Domain with SSL Certificate** - Required for WebRTC functionality
- **Prosody mod_event_sync** - For webhook event handling
- **JWT Authentication** - For secure room access control
- **Storage Solution** - For recording files (local or cloud)
## Configuration Variables
Add the following environment variables to your Reflector `.env` file:
### Required Variables
```bash
# Jitsi Meet Domain (without https://)
JITSI_DOMAIN=meet.example.com
# JWT Secret for room authentication (generate with: openssl rand -hex 32)
JITSI_JWT_SECRET=your-64-character-hex-secret-here
# Webhook secret for event handling (generate with: openssl rand -hex 16)
JITSI_WEBHOOK_SECRET=your-32-character-hex-secret-here
```
### Optional Variables
```bash
# Application identifier (should match Jitsi configuration)
JITSI_APP_ID=reflector
# JWT issuer and audience (should match Jitsi configuration)
JITSI_JWT_ISSUER=reflector
JITSI_JWT_AUDIENCE=jitsi
```
## Installation Steps
### 1. Jitsi Meet Server Installation
#### Quick Installation (Ubuntu/Debian)
```bash
# Add Jitsi repository
curl -fsSL https://download.jitsi.org/jitsi-key.gpg.key | sudo gpg --dearmor -o /usr/share/keyrings/jitsi-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/jitsi-keyring.gpg] https://download.jitsi.org stable/" | sudo tee /etc/apt/sources.list.d/jitsi-stable.list
# Install Jitsi Meet
sudo apt update
sudo apt install jitsi-meet
# Configure SSL certificate
sudo /usr/share/jitsi-meet/scripts/install-letsencrypt-cert.sh
```
#### Docker Installation
```bash
# Clone Jitsi Docker repository
git clone https://github.com/jitsi/docker-jitsi-meet
cd docker-jitsi-meet
# Copy environment template
cp env.example .env
# Edit configuration
nano .env
# Start services
docker-compose up -d
```
### 2. JWT Authentication Setup
#### Update Prosody Configuration
Edit `/etc/prosody/conf.d/your-domain.cfg.lua`:
```lua
VirtualHost "meet.example.com"
authentication = "token"
app_id = "reflector"
app_secret = "your-jwt-secret-here"
-- Allow anonymous access for guests
c2s_require_encryption = false
admins = { "focusUser@auth.meet.example.com" }
modules_enabled = {
"bosh";
"pubsub";
"ping";
"roster";
"saslauth";
"tls";
"dialback";
"disco";
"carbons";
"pep";
"private";
"blocklist";
"vcard";
"version";
"uptime";
"time";
"ping";
"register";
"admin_adhoc";
"token_verification";
"event_sync"; -- Required for webhooks
}
```
#### Configure Jitsi Meet Interface
Edit `/etc/jitsi/meet/your-domain-config.js`:
```javascript
var config = {
hosts: {
domain: 'meet.example.com',
muc: 'conference.meet.example.com'
},
// Enable JWT authentication
enableUserRolesBasedOnToken: true,
// Recording configuration
fileRecordingsEnabled: true,
liveStreamingEnabled: false,
// Reflector integration settings
prejoinPageEnabled: true,
requireDisplayName: true
};
```
### 3. Webhook Event Configuration
#### Install Event Sync Module
```bash
# Download the module
cd /usr/share/jitsi-meet/prosody-plugins/
wget https://raw.githubusercontent.com/jitsi-contrib/prosody-plugins/main/mod_event_sync.lua
```
#### Configure Event Sync
Add to your Prosody configuration:
```lua
Component "conference.meet.example.com" "muc"
storage = "memory"
modules_enabled = {
"muc_meeting_id";
"muc_domain_mapper";
"polls";
"event_sync"; -- Enable event sync
}
-- Event sync webhook configuration
event_sync_url = "https://your-reflector-domain.com/v1/jitsi/events"
event_sync_secret = "your-webhook-secret-here"
-- Events to track
event_sync_events = {
"muc-occupant-joined",
"muc-occupant-left",
"jibri-recording-on",
"jibri-recording-off"
}
#### Webhook Event Payload Examples
**Participant Joined Event:**
```json
{
"event": "muc-occupant-joined",
"room": "reflector-my-room-uuid123",
"timestamp": "2025-01-15T10:30:00.000Z",
"data": {
"occupant_id": "participant-456",
"nick": "John Doe",
"role": "participant",
"affiliation": "none"
}
}
```
**Recording Started Event:**
```json
{
"event": "jibri-recording-on",
"room": "reflector-my-room-uuid123",
"timestamp": "2025-01-15T10:32:00.000Z",
"data": {
"recording_id": "rec-789",
"initiator": "moderator-123"
}
}
```
**Recording Completed Event:**
```json
{
"room_name": "reflector-my-room-uuid123",
"recording_file": "/var/recordings/rec-789.mp4",
"recording_status": "completed",
"timestamp": "2025-01-15T11:15:00.000Z"
}
```
### 4. Jibri Recording Setup (Optional)
#### Install Jibri
```bash
# Install Jibri package
sudo apt install jibri
# Create recording directory
sudo mkdir -p /var/recordings
sudo chown jibri:jibri /var/recordings
```
#### Configure Jibri
Edit `/etc/jitsi/jibri/jibri.conf`:
```hocon
jibri {
recording {
recordings-directory = "/var/recordings"
finalize-script = "/opt/jitsi/jibri/finalize.sh"
}
api {
xmpp {
environments = [{
name = "prod environment"
xmpp-server-hosts = ["meet.example.com"]
xmpp-domain = "meet.example.com"
control-muc {
domain = "internal.auth.meet.example.com"
room-name = "JibriBrewery"
nickname = "jibri-nickname"
}
control-login {
domain = "auth.meet.example.com"
username = "jibri"
password = "jibri-password"
}
}]
}
}
}
```
#### Create Finalize Script
Create `/opt/jitsi/jibri/finalize.sh`:
```bash
#!/bin/bash
# Jibri finalize script for Reflector integration
RECORDING_FILE="$1"
ROOM_NAME="$2"
REFLECTOR_API_URL="${REFLECTOR_API_URL:-http://localhost:1250}"
# Prepare webhook payload
TIMESTAMP=$(date -u +%Y-%m-%dT%H:%M:%S.%3NZ)
PAYLOAD=$(cat <<EOF
{
"room_name": "$ROOM_NAME",
"recording_file": "$RECORDING_FILE",
"recording_status": "completed",
"timestamp": "$TIMESTAMP"
}
EOF
)
# Generate signature
SIGNATURE=$(echo -n "$PAYLOAD" | openssl dgst -sha256 -hmac "$JITSI_WEBHOOK_SECRET" | cut -d' ' -f2)
# Send webhook to Reflector
curl -X POST "$REFLECTOR_API_URL/v1/jibri/recording-complete" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: $SIGNATURE" \
-d "$PAYLOAD"
echo "Recording finalization webhook sent for room: $ROOM_NAME"
```
Make executable:
```bash
sudo chmod +x /opt/jitsi/jibri/finalize.sh
```
### 5. Restart Services
After configuration changes:
```bash
sudo systemctl restart prosody
sudo systemctl restart jicofo
sudo systemctl restart jitsi-videobridge2
sudo systemctl restart jibri
sudo systemctl restart nginx
```
## Room Configuration
### Creating Jitsi Rooms
Create rooms with Jitsi platform in Reflector:
```bash
curl -X POST "https://your-reflector-domain.com/v1/rooms" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d '{
"name": "my-jitsi-room",
"platform": "jitsi",
"recording_type": "cloud",
"recording_trigger": "automatic-2nd-participant",
"is_locked": false,
"room_mode": "normal"
}'
```
### Meeting Creation
Meetings automatically use JWT authentication:
```bash
curl -X POST "https://your-reflector-domain.com/v1/rooms/my-jitsi-room/meeting" \
-H "Authorization: Bearer $AUTH_TOKEN"
```
Response includes JWT-authenticated URLs:
```json
{
"id": "meeting-uuid",
"room_name": "reflector-my-jitsi-room-123456",
"room_url": "https://meet.example.com/room?jwt=user-token",
"host_room_url": "https://meet.example.com/room?jwt=moderator-token"
}
```
## Features and Capabilities
### JWT Authentication
Reflector automatically generates JWT tokens with:
- **Room Access Control** - Secure room entry
- **User Roles** - Moderator vs participant permissions
- **Expiration** - Configurable token lifetime (default 8 hours)
- **Custom Claims** - Room-specific metadata
### Recording Options
**Recording Types:**
- `"none"` - No recording
- `"local"` - Local Jibri recording
- `"cloud"` - Cloud recording (requires external storage)
**Recording Triggers:**
- `"none"` - Manual recording only
- `"prompt"` - Prompt users to start
- `"automatic"` - Start immediately
- `"automatic-2nd-participant"` - Start when 2nd person joins
### Event Tracking and Storage
Reflector automatically stores all webhook events in the `meetings` table for comprehensive meeting analytics:
**Supported Event Types:**
- `muc-occupant-joined` - Participant joined the meeting
- `muc-occupant-left` - Participant left the meeting
- `jibri-recording-on` - Recording started
- `jibri-recording-off` - Recording stopped
- `recording_completed` - Recording file ready for processing
**Event Storage Structure:**
Each webhook event is stored as a JSON object in the `meetings.events` column:
```json
{
"type": "muc-occupant-joined",
"timestamp": "2025-01-15T10:30:00.123456Z",
"data": {
"timestamp": "2025-01-15T10:30:00Z",
"user_id": "participant-123",
"display_name": "John Doe"
}
}
```
**Querying Stored Events:**
```sql
-- Get all events for a meeting
SELECT events FROM meeting WHERE id = 'meeting-uuid';
-- Count participant joins
SELECT json_array_length(
json_extract(events, '$[*] ? (@.type == "muc-occupant-joined")')
) as total_joins FROM meeting WHERE id = 'meeting-uuid';
```
## Testing and Verification
### Health Check
Test Jitsi webhook integration:
```bash
curl "https://your-reflector-domain.com/v1/jitsi/health"
```
Expected response:
```json
{
"status": "ok",
"service": "jitsi-webhooks",
"timestamp": "2025-01-15T10:30:00.000Z",
"webhook_secret_configured": true
}
```
### JWT Token Testing
Verify JWT generation works:
```bash
# Create a test meeting
MEETING=$(curl -X POST "https://your-reflector-domain.com/v1/rooms/test-room/meeting" \
-H "Authorization: Bearer $AUTH_TOKEN" | jq -r '.room_url')
echo "Test meeting URL: $MEETING"
```
### Webhook Testing
#### Manual Webhook Event Testing
Test participant join event:
```bash
# Generate proper signature
PAYLOAD='{"event":"muc-occupant-joined","room":"reflector-test-room-uuid","timestamp":"2025-01-15T10:30:00.000Z","data":{"user_id":"test-user","display_name":"Test User"}}'
SIGNATURE=$(echo -n "$PAYLOAD" | openssl dgst -sha256 -hmac "$JITSI_WEBHOOK_SECRET" | cut -d' ' -f2)
curl -X POST "https://your-reflector-domain.com/v1/jitsi/events" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: $SIGNATURE" \
-d "$PAYLOAD"
```
Expected response:
```json
{
"status": "ok",
"event": "muc-occupant-joined",
"room": "reflector-test-room-uuid"
}
```
#### Recording Webhook Testing
Test recording completion event:
```bash
PAYLOAD='{"room_name":"reflector-test-room-uuid","recording_file":"/recordings/test.mp4","recording_status":"completed","timestamp":"2025-01-15T10:30:00.000Z"}'
SIGNATURE=$(echo -n "$PAYLOAD" | openssl dgst -sha256 -hmac "$JITSI_WEBHOOK_SECRET" | cut -d' ' -f2)
curl -X POST "https://your-reflector-domain.com/v1/jibri/recording-complete" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: $SIGNATURE" \
-d "$PAYLOAD"
```
#### Event Storage Verification
Verify events were stored:
```bash
# Check meeting events via API (requires authentication)
curl -H "Authorization: Bearer $AUTH_TOKEN" \
"https://your-reflector-domain.com/v1/meetings/{meeting-id}"
```
## Troubleshooting
### Common Issues
#### JWT Authentication Failures
**Symptoms**: Users cannot join rooms, "Authentication failed" errors
**Solutions**:
1. Verify `JITSI_JWT_SECRET` matches Prosody configuration
2. Check JWT token hasn't expired (default 8 hours)
3. Ensure system clocks are synchronized between servers
4. Validate JWT issuer/audience configuration matches
**Debug JWT tokens**:
```bash
# Decode JWT payload
echo "JWT_TOKEN_HERE" | cut -d'.' -f2 | base64 -d | jq
```
#### Webhook Events Not Received
**Symptoms**: Participant counts not updating, no recording events
**Solutions**:
1. Verify `mod_event_sync` is loaded in Prosody
2. Check webhook URL is accessible from Jitsi server
3. Validate webhook signature generation
4. Review Prosody and Reflector logs
**Debug webhook connectivity**:
```bash
# Test from Jitsi server
curl -v "https://your-reflector-domain.com/v1/jitsi/health"
# Check Prosody logs
sudo tail -f /var/log/prosody/prosody.log
```
#### Webhook Signature Verification Issues
**Symptoms**: HTTP 401 "Invalid webhook signature" errors
**Solutions**:
1. Verify webhook secret matches between Jitsi and Reflector
2. Check payload encoding (no extra whitespace)
3. Ensure proper HMAC-SHA256 signature generation
**Debug signature generation**:
```bash
# Test signature manually
PAYLOAD='{"event":"test","room":"test","timestamp":"2025-01-15T10:30:00.000Z","data":{}}'
SECRET="your-webhook-secret-here"
# Generate signature (should match X-Jitsi-Signature header)
echo -n "$PAYLOAD" | openssl dgst -sha256 -hmac "$SECRET" | cut -d' ' -f2
# Test with curl
curl -X POST "https://your-reflector-domain.com/v1/jitsi/events" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: $(echo -n "$PAYLOAD" | openssl dgst -sha256 -hmac "$SECRET" | cut -d' ' -f2)" \
-d "$PAYLOAD" -v
```
#### Event Storage Problems
**Symptoms**: Events received but not stored in database
**Solutions**:
1. Check database connectivity and permissions
2. Verify meeting exists before event processing
3. Review Reflector application logs
4. Ensure JSON column support in database
**Debug event storage**:
```bash
# Check meeting exists
curl -H "Authorization: Bearer $TOKEN" \
"https://your-reflector-domain.com/v1/meetings/{meeting-id}"
# Monitor database queries (if using PostgreSQL)
sudo -u postgres psql -c "SELECT * FROM pg_stat_activity WHERE query LIKE '%meeting%';"
# Check Reflector logs for event processing
sudo journalctl -u reflector -f | grep -E "(event|webhook|jitsi)"
```
#### Recording Issues
**Symptoms**: Recordings not starting, finalize script errors
**Solutions**:
1. Verify Jibri service status: `sudo systemctl status jibri`
2. Check recording directory permissions: `/var/recordings`
3. Validate finalize script execution permissions
4. Monitor Jibri logs: `sudo journalctl -u jibri -f`
**Test finalize script**:
```bash
sudo -u jibri /opt/jitsi/jibri/finalize.sh "/test/recording.mp4" "test-room"
```
#### Meeting Creation Failures
**Symptoms**: HTTP 500 errors when creating meetings
**Solutions**:
1. Check Reflector logs for JWT generation errors
2. Verify all required environment variables are set
3. Ensure Jitsi domain is accessible from Reflector
4. Test JWT secret configuration
### Debug Commands
```bash
# Verify Prosody configuration
sudo prosodyctl check config
# Check Jitsi services status
sudo systemctl status prosody jicofo jitsi-videobridge2
# Test JWT generation
curl -X POST "https://your-reflector-domain.com/v1/rooms/test/meeting" \
-H "Authorization: Bearer $TOKEN" -v
# Monitor webhook events
sudo tail -f /var/log/reflector/app.log | grep jitsi
# Check SSL certificates
sudo certbot certificates
```
### Performance Optimization
#### Scaling Considerations
**Single Server Limits:**
- ~50 concurrent participants per JVB instance
- ~10 concurrent Jibri recordings
- CPU and bandwidth become bottlenecks
**Multi-Server Setup:**
- Multiple JVB instances for scaling
- Dedicated Jibri recording servers
- Load balancing for high availability
#### Resource Monitoring
```bash
# Monitor JVB performance
sudo systemctl status jitsi-videobridge2
sudo journalctl -u jitsi-videobridge2 -f
# Check Prosody connections
sudo prosodyctl mod_admin_telnet
> c2s:show()
> muc:rooms()
```
## Security Best Practices
### JWT Security
- Use strong, unique secrets (32+ characters)
- Rotate JWT secrets regularly
- Implement proper token expiration
- Never log or expose JWT tokens
### Network Security
- Use HTTPS/WSS for all communications
- Implement proper firewall rules
- Consider VPN for server-to-server communication
- Monitor for unauthorized access attempts
### Recording Security
- Encrypt recordings at rest
- Implement access controls for recording files
- Regular security audits of file permissions
- Comply with data protection regulations
## Migration from Whereby
If migrating from Whereby to Jitsi:
1. **Parallel Setup** - Configure Jitsi alongside existing Whereby
2. **Room Migration** - Update room platform field to "jitsi"
3. **Test Integration** - Verify meeting creation and webhooks
4. **User Training** - Different UI and feature set
5. **Monitor Performance** - Watch for issues during transition
6. **Cleanup** - Remove Whereby configuration when stable
## Support and Resources
### Jitsi Community Resources
- **Documentation**: [jitsi.github.io/handbook](https://jitsi.github.io/handbook/)
- **Community Forum**: [community.jitsi.org](https://community.jitsi.org/)
- **GitHub Issues**: [github.com/jitsi/jitsi-meet](https://github.com/jitsi/jitsi-meet)
### Professional Support
- **8x8 Commercial Support** - Professional Jitsi hosting and support
- **Community Consulting** - Third-party Jitsi implementation services
### Monitoring and Maintenance
- Monitor system resources (CPU, memory, bandwidth)
- Regular security updates for all components
- Backup configuration files and certificates
- Test disaster recovery procedures

View File

@@ -1,276 +0,0 @@
# Whereby Integration Configuration Guide
This guide explains how to configure Reflector to use Whereby as your video meeting platform for room creation, recording, and participant tracking.
## Overview
Whereby is a browser-based video meeting platform that provides hosted meeting rooms with recording capabilities. Reflector integrates with Whereby's API to:
- Create secure meeting rooms with custom branding
- Handle participant join/leave events via webhooks
- Automatically record meetings to AWS S3 storage
- Track meeting sessions and participant counts
## Requirements
### Whereby Account Setup
1. **Whereby Account**: Sign up for a Whereby business account at [whereby.com](https://whereby.com/business)
2. **API Access**: Request API access from Whereby support (required for programmatic room creation)
3. **Webhook Configuration**: Configure webhooks in your Whereby dashboard to point to your Reflector instance
### AWS S3 Storage
Whereby requires AWS S3 for recording storage. You need:
- AWS account with S3 access
- Dedicated S3 bucket for Whereby recordings
- AWS IAM credentials with S3 write permissions
## Configuration Variables
Add the following environment variables to your Reflector `.env` file:
### Required Variables
```bash
# Whereby API Configuration
WHEREBY_API_KEY=your-whereby-jwt-api-key
WHEREBY_WEBHOOK_SECRET=your-webhook-secret-from-whereby
# AWS S3 Storage for Recordings
AWS_WHEREBY_ACCESS_KEY_ID=your-aws-access-key
AWS_WHEREBY_ACCESS_KEY_SECRET=your-aws-secret-key
RECORDING_STORAGE_AWS_BUCKET_NAME=your-s3-bucket-name
```
### Optional Variables
```bash
# Whereby API URL (defaults to production)
WHEREBY_API_URL=https://api.whereby.dev/v1
# SQS Configuration (for recording processing)
AWS_PROCESS_RECORDING_QUEUE_URL=https://sqs.region.amazonaws.com/account/queue
SQS_POLLING_TIMEOUT_SECONDS=60
```
## Configuration Steps
### 1. Whereby API Key Setup
1. **Contact Whereby Support** to request API access for your account
2. **Generate JWT Token** in your Whereby dashboard under API settings
3. **Copy the JWT token** and set it as `WHEREBY_API_KEY` in your environment
The API key is a JWT token that looks like:
```
eyJ[...truncated JWT token...]
```
### 2. Webhook Configuration
1. **Access Whereby Dashboard** and navigate to webhook settings
2. **Set Webhook URL** to your Reflector instance:
```
https://your-reflector-domain.com/v1/whereby
```
3. **Configure Events** to send the following event types:
- `room.client.joined` - When participants join
- `room.client.left` - When participants leave
4. **Generate Webhook Secret** and set it as `WHEREBY_WEBHOOK_SECRET`
5. **Save Configuration** in your Whereby dashboard
### 3. AWS S3 Storage Setup
1. **Create S3 Bucket** dedicated for Whereby recordings
2. **Create IAM User** with programmatic access
3. **Attach S3 Policy** with the following permissions:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:PutObjectAcl",
"s3:GetObject"
],
"Resource": "arn:aws:s3:::your-bucket-name/*"
}
]
}
```
4. **Configure Environment Variables** with the IAM credentials
### 4. Room Configuration
When creating rooms in Reflector, set the platform to use Whereby:
```bash
curl -X POST "https://your-reflector-domain.com/v1/rooms" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-d '{
"name": "my-whereby-room",
"platform": "whereby",
"recording_type": "cloud",
"recording_trigger": "automatic-2nd-participant",
"is_locked": false,
"room_mode": "normal"
}'
```
## Meeting Features
### Recording Options
Whereby supports three recording types:
- **`none`**: No recording
- **`local`**: Local recording (not recommended for production)
- **`cloud`**: Cloud recording to S3 (recommended)
### Recording Triggers
Control when recordings start:
- **`none`**: No automatic recording
- **`prompt`**: Prompt users to start recording
- **`automatic`**: Start immediately when meeting begins
- **`automatic-2nd-participant`**: Start when second participant joins
### Room Modes
- **`normal`**: Standard meeting room
- **`group`**: Group meeting with advanced features
## Webhook Event Handling
Reflector automatically handles these Whereby webhook events:
### Participant Tracking
```json
{
"type": "room.client.joined",
"data": {
"meetingId": "room-uuid",
"numClients": 2
}
}
```
### Recording Events
Whereby sends recording completion events that trigger Reflector's processing pipeline:
- Audio transcription
- Speaker diarization
- Summary generation
## Troubleshooting
### Common Issues
#### API Authentication Errors
**Symptoms**: 401 Unauthorized errors when creating meetings
**Solutions**:
1. Verify your `WHEREBY_API_KEY` is correct and not expired
2. Ensure you have API access enabled on your Whereby account
3. Contact Whereby support if API access is not available
#### Webhook Signature Validation Failed
**Symptoms**: Webhook events rejected with 401 errors
**Solutions**:
1. Verify `WHEREBY_WEBHOOK_SECRET` matches your Whereby dashboard configuration
2. Check webhook URL is correctly configured in Whereby dashboard
3. Ensure webhook endpoint is accessible from Whereby servers
#### Recording Upload Failures
**Symptoms**: Recordings not appearing in S3 bucket
**Solutions**:
1. Verify AWS credentials have S3 write permissions
2. Check S3 bucket name is correct and accessible
3. Ensure AWS region settings match your bucket location
4. Review AWS CloudTrail logs for permission issues
#### Participant Count Not Updating
**Symptoms**: Meeting participant counts remain at 0
**Solutions**:
1. Verify webhook events are being received at `/v1/whereby`
2. Check webhook signature validation is passing
3. Ensure meeting IDs match between Whereby and Reflector database
### Debug Commands
```bash
# Test Whereby API connectivity
curl -H "Authorization: Bearer $WHEREBY_API_KEY" \
https://api.whereby.dev/v1/meetings
# Check webhook endpoint health
curl https://your-reflector-domain.com/v1/whereby/health
# Verify S3 bucket access
aws s3 ls s3://your-bucket-name --profile whereby-user
```
## Security Considerations
### API Key Security
- Store API keys securely using environment variables
- Rotate API keys regularly
- Never commit API keys to version control
- Use separate keys for development and production
### Webhook Security
- Always validate webhook signatures using HMAC-SHA256
- Use HTTPS for all webhook endpoints
- Implement rate limiting on webhook endpoints
- Monitor webhook events for suspicious activity
### Recording Privacy
- Ensure S3 bucket access is restricted to authorized users
- Consider encryption at rest for sensitive recordings
- Implement retention policies for recorded content
- Comply with data protection regulations (GDPR, etc.)
## Performance Optimization
### Meeting Scaling
- Monitor concurrent meeting limits on your Whereby plan
- Implement meeting cleanup for expired sessions
- Use appropriate room modes for different use cases
### Recording Processing
- Configure SQS for asynchronous recording processing
- Monitor S3 storage usage and costs
- Implement automatic cleanup of processed recordings
### Webhook Reliability
- Implement webhook retry mechanisms
- Monitor webhook delivery success rates
- Log webhook events for debugging and auditing
## Migration from Other Platforms
If migrating from another video platform:
1. **Update Room Configuration**: Change existing rooms to use `"platform": "whereby"`
2. **Configure Webhooks**: Set up Whereby webhook endpoints
3. **Test Integration**: Verify meeting creation and event handling
4. **Monitor Performance**: Watch for any issues during transition
5. **Update Documentation**: Inform users of any workflow changes
## Support
For Whereby-specific issues:
- **Whereby Support**: [whereby.com/support](https://whereby.com/support)
- **API Documentation**: [whereby.dev](https://whereby.dev)
- **Status Page**: [status.whereby.com](https://status.whereby.com)
For Reflector integration issues:
- Check application logs for error details
- Verify environment variable configuration
- Test webhook connectivity and authentication
- Review AWS permissions and S3 access

View File

@@ -1,474 +0,0 @@
# Video Platforms Architecture (PR #529 Analysis)
This document analyzes the video platforms refactoring implemented in PR #529 for daily.co integration, providing a blueprint for extending support to Jitsi and other video conferencing platforms.
## Overview
The video platforms refactoring introduces a clean abstraction layer that allows Reflector to support multiple video conferencing providers (Whereby, Daily.co, etc.) without changing core application logic. This architecture enables:
- Seamless switching between video platforms
- Platform-specific feature support
- Isolated platform code organization
- Consistent API surface across platforms
- Feature flags for gradual migration
## Architecture Components
### 1. **Directory Structure**
```
server/reflector/video_platforms/
├── __init__.py # Public API exports
├── base.py # Abstract base classes
├── factory.py # Platform client factory
├── registry.py # Platform registration system
├── whereby.py # Whereby implementation
├── daily.py # Daily.co implementation
└── mock.py # Testing implementation
```
### 2. **Core Abstract Classes**
#### `VideoPlatformClient` (base.py)
Abstract base class defining the interface all platforms must implement:
```python
class VideoPlatformClient(ABC):
PLATFORM_NAME: str = ""
@abstractmethod
async def create_meeting(self, room_name_prefix: str, end_date: datetime, room: Room) -> MeetingData
@abstractmethod
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]
@abstractmethod
async def delete_room(self, room_name: str) -> bool
@abstractmethod
async def upload_logo(self, room_name: str, logo_path: str) -> bool
@abstractmethod
def verify_webhook_signature(self, body: bytes, signature: str, timestamp: Optional[str] = None) -> bool
```
#### `MeetingData` (base.py)
Standardized meeting data structure returned by all platforms:
```python
class MeetingData(BaseModel):
meeting_id: str
room_name: str
room_url: str
host_room_url: str
platform: str
extra_data: Dict[str, Any] = {} # Platform-specific data
```
#### `VideoPlatformConfig` (base.py)
Unified configuration structure for all platforms:
```python
class VideoPlatformConfig(BaseModel):
api_key: str
webhook_secret: str
api_url: Optional[str] = None
subdomain: Optional[str] = None
s3_bucket: Optional[str] = None
s3_region: Optional[str] = None
aws_role_arn: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_access_key_secret: Optional[str] = None
```
### 3. **Platform Registration System**
#### Registry Pattern (registry.py)
- Automatic registration of built-in platforms
- Runtime platform discovery
- Type-safe client instantiation
```python
# Auto-registration of platforms
_PLATFORMS: Dict[str, Type[VideoPlatformClient]] = {}
def register_platform(name: str, client_class: Type[VideoPlatformClient])
def get_platform_client(platform: str, config: VideoPlatformConfig) -> VideoPlatformClient
```
#### Factory System (factory.py)
- Configuration management per platform
- Platform selection logic
- Feature flag integration
```python
def get_platform_for_room(room_id: Optional[str] = None) -> str:
"""Determine which platform to use based on feature flags."""
if not settings.DAILY_MIGRATION_ENABLED:
return "whereby"
if room_id and room_id in settings.DAILY_MIGRATION_ROOM_IDS:
return "daily"
return settings.DEFAULT_VIDEO_PLATFORM
```
### 4. **Database Schema Changes**
#### Room Model Updates
Added `platform` field to track which video platform each room uses:
```python
# Database Schema
platform_column = sqlalchemy.Column(
"platform",
sqlalchemy.String,
nullable=False,
server_default="whereby"
)
# Pydantic Model
class Room(BaseModel):
platform: Literal["whereby", "daily"] = "whereby"
```
#### Meeting Model Updates
Added `platform` field to meetings for tracking and debugging:
```python
# Database Schema
platform_column = sqlalchemy.Column(
"platform",
sqlalchemy.String,
nullable=False,
server_default="whereby"
)
# Pydantic Model
class Meeting(BaseModel):
platform: Literal["whereby", "daily"] = "whereby"
```
**Key Decision**: No platform-specific fields were added to models. Instead, the `extra_data` field in `MeetingData` handles platform-specific information, following the user's rule of using generic `provider_data` as JSON if needed.
### 5. **Settings Configuration**
#### Feature Flags
```python
# Migration control
DAILY_MIGRATION_ENABLED: bool = True
DAILY_MIGRATION_ROOM_IDS: list[str] = []
DEFAULT_VIDEO_PLATFORM: str = "daily"
# Daily.co specific settings
DAILY_API_KEY: str | None = None
DAILY_WEBHOOK_SECRET: str | None = None
DAILY_SUBDOMAIN: str | None = None
AWS_DAILY_S3_BUCKET: str | None = None
AWS_DAILY_S3_REGION: str = "us-west-2"
AWS_DAILY_ROLE_ARN: str | None = None
```
#### Configuration Pattern
Each platform gets its own configuration namespace while sharing common patterns:
```python
def get_platform_config(platform: str) -> VideoPlatformConfig:
if platform == "whereby":
return VideoPlatformConfig(
api_key=settings.WHEREBY_API_KEY or "",
webhook_secret=settings.WHEREBY_WEBHOOK_SECRET or "",
# ... whereby-specific config
)
elif platform == "daily":
return VideoPlatformConfig(
api_key=settings.DAILY_API_KEY or "",
webhook_secret=settings.DAILY_WEBHOOK_SECRET or "",
# ... daily-specific config
)
```
### 6. **API Integration Updates**
#### Room Creation (views/rooms.py)
Updated to use platform factory instead of direct Whereby calls:
```python
@router.post("/rooms/{room_name}/meeting")
async def rooms_create_meeting(room_name: str, user: UserInfo):
# OLD: Direct Whereby integration
# whereby_meeting = await create_meeting("", end_date=end_date, room=room)
# NEW: Platform abstraction
platform = get_platform_for_room(room.id)
client = create_platform_client(platform)
meeting_data = await client.create_meeting(
room_name_prefix=room.name, end_date=end_date, room=room
)
await client.upload_logo(meeting_data.room_name, "./images/logo.png")
```
### 7. **Webhook Handling**
#### Separate Webhook Endpoints
Each platform gets its own webhook endpoint with platform-specific signature verification:
```python
# views/daily.py
@router.post("/daily_webhook")
async def daily_webhook(event: DailyWebhookEvent, request: Request):
# Verify Daily.co signature
body = await request.body()
signature = request.headers.get("X-Daily-Signature", "")
if not verify_daily_webhook_signature(body, signature):
raise HTTPException(status_code=401)
# Handle platform-specific events
if event.type == "participant.joined":
await _handle_participant_joined(event)
```
#### Consistent Event Handling
Despite different event formats, the core business logic remains the same:
```python
async def _handle_participant_joined(event):
room_name = event.data.get("room", {}).get("name") # Daily.co format
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting:
current_count = getattr(meeting, "num_clients", 0)
await meetings_controller.update_meeting(
meeting.id, num_clients=current_count + 1
)
```
### 8. **Worker Task Integration**
#### New Task for Daily.co Recording Processing
Added platform-specific recording processing while maintaining the same pipeline:
```python
@shared_task
@asynctask
async def process_recording_from_url(recording_url: str, meeting_id: str, recording_id: str):
"""Process recording from Direct URL (Daily.co webhook)."""
logger.info("Processing recording from URL for meeting: %s", meeting_id)
# Uses same processing pipeline as Whereby S3 recordings
```
**Key Decision**: Worker tasks remain in main worker module but could be moved to platform-specific folders as suggested by the user.
### 9. **Testing Infrastructure**
#### Comprehensive Test Suite
- Unit tests for each platform client
- Integration tests for platform switching
- Mock platform for testing without external dependencies
- Webhook signature verification tests
```python
class TestPlatformIntegration:
"""Integration tests for platform switching."""
async def test_platform_switching_preserves_interface(self):
"""Test that different platforms provide consistent interface."""
# Test both Mock and Daily platforms return MeetingData objects
# with consistent fields
```
## Implementation Patterns for Jitsi Integration
Based on the daily.co implementation, here's how Jitsi should be integrated:
### 1. **Jitsi Client Implementation**
```python
# video_platforms/jitsi.py
class JitsiClient(VideoPlatformClient):
PLATFORM_NAME = "jitsi"
async def create_meeting(self, room_name_prefix: str, end_date: datetime, room: Room) -> MeetingData:
# Generate unique room name
jitsi_room = f"reflector-{room.name}-{int(time.time())}"
# Generate JWT tokens
user_jwt = self._generate_jwt(room=jitsi_room, moderator=False, exp=end_date)
host_jwt = self._generate_jwt(room=jitsi_room, moderator=True, exp=end_date)
return MeetingData(
meeting_id=generate_uuid4(),
room_name=jitsi_room,
room_url=f"https://jitsi.domain/{jitsi_room}?jwt={user_jwt}",
host_room_url=f"https://jitsi.domain/{jitsi_room}?jwt={host_jwt}",
platform=self.PLATFORM_NAME,
extra_data={"user_jwt": user_jwt, "host_jwt": host_jwt}
)
```
### 2. **Settings Integration**
```python
# settings.py
JITSI_DOMAIN: str = "meet.jit.si"
JITSI_JWT_SECRET: str | None = None
JITSI_WEBHOOK_SECRET: str | None = None
JITSI_API_URL: str | None = None # If using Jitsi API
```
### 3. **Factory Registration**
```python
# registry.py
def _register_builtin_platforms():
from .jitsi import JitsiClient
register_platform("jitsi", JitsiClient)
# factory.py
def get_platform_config(platform: str) -> VideoPlatformConfig:
elif platform == "jitsi":
return VideoPlatformConfig(
api_key="", # Jitsi may not need API key
webhook_secret=settings.JITSI_WEBHOOK_SECRET or "",
api_url=settings.JITSI_API_URL,
)
```
### 4. **Webhook Integration**
```python
# views/jitsi.py
@router.post("/jitsi/events")
async def jitsi_events_webhook(event_data: dict):
# Handle Prosody event-sync webhook format
event_type = event_data.get("event")
room_name = event_data.get("room", "").split("@")[0]
if event_type == "muc-occupant-joined":
# Same participant handling logic as other platforms
```
## Key Benefits of This Architecture
### 1. **Isolation and Organization**
- Platform-specific code contained in separate modules
- No platform logic leaking into core application
- Easy to add/remove platforms without affecting others
### 2. **Consistent Interface**
- All platforms implement the same abstract methods
- Standardized `MeetingData` structure
- Uniform error handling and logging
### 3. **Gradual Migration Support**
- Feature flags for controlled rollouts
- Room-specific platform selection
- Fallback mechanisms for platform failures
### 4. **Configuration Management**
- Centralized settings per platform
- Consistent naming patterns
- Environment-based configuration
### 5. **Testing and Quality**
- Mock platform for testing
- Comprehensive test coverage
- Platform-specific test utilities
## Migration Strategy Applied
The daily.co implementation demonstrates a careful migration approach:
### 1. **Backward Compatibility**
- Default platform remains "whereby"
- Existing rooms continue using Whereby unless explicitly migrated
- Same API endpoints and response formats
### 2. **Feature Flag Control**
```python
# Gradual rollout control
DAILY_MIGRATION_ENABLED: bool = True
DAILY_MIGRATION_ROOM_IDS: list[str] = [] # Specific rooms to migrate
DEFAULT_VIDEO_PLATFORM: str = "daily" # New rooms default
```
### 3. **Data Integrity**
- Platform field tracks which service each room/meeting uses
- No data loss during migration
- Platform-specific data preserved in `extra_data`
### 4. **Monitoring and Rollback**
- Comprehensive logging of platform selection
- Easy rollback by changing feature flags
- Platform-specific error tracking
## Recommendations for Jitsi Integration
Based on this analysis and the user's requirements:
### 1. **Follow the Pattern**
- Create `video_platforms/jitsi/` directory with:
- `client.py` - Main JitsiClient implementation
- `tasks.py` - Jitsi-specific worker tasks
- `__init__.py` - Module exports
### 2. **Settings Organization**
- Use `JITSI_*` prefix for all Jitsi settings
- Follow the same configuration pattern as Daily.co
- Support both environment variables and config files
### 3. **Generic Database Fields**
- Avoid platform-specific columns in database
- Use `provider_data` JSON field if platform-specific data needed
- Keep `platform` field as simple string identifier
### 4. **Worker Task Migration**
According to user requirements, migrate platform-specific tasks:
```
video_platforms/
├── whereby/
│ ├── client.py (moved from whereby.py)
│ └── tasks.py (moved from worker/whereby_tasks.py)
├── daily/
│ ├── client.py (moved from daily.py)
│ └── tasks.py (moved from worker/daily_tasks.py)
└── jitsi/
├── client.py (new JitsiClient)
└── tasks.py (new Jitsi recording tasks)
```
### 5. **Webhook Architecture**
- Create `views/jitsi.py` for Jitsi-specific webhooks
- Follow the same signature verification pattern
- Reuse existing participant tracking logic
## Implementation Checklist for Jitsi
- [ ] Create `video_platforms/jitsi/` directory structure
- [ ] Implement `JitsiClient` following the abstract interface
- [ ] Add Jitsi settings to configuration
- [ ] Register Jitsi platform in factory/registry
- [ ] Create Jitsi webhook endpoint
- [ ] Implement JWT token generation for room access
- [ ] Add Jitsi recording processing tasks
- [ ] Create comprehensive test suite
- [ ] Update database migrations for platform field
- [ ] Document Jitsi-specific configuration
## Conclusion
The video platforms refactoring in PR #529 provides an excellent foundation for adding Jitsi support. The architecture is well-designed with clear separation of concerns, consistent interfaces, and excellent extensibility. The daily.co implementation demonstrates how to add a new platform while maintaining backward compatibility and providing gradual migration capabilities.
The pattern should be directly applicable to Jitsi integration, with the main differences being:
- JWT-based authentication instead of API keys
- Different webhook event formats
- Jibri recording pipeline integration
- Self-hosted deployment considerations
This architecture successfully achieves the user's goals of:
1. Settings-based configuration
2. Generic database fields (no provider-specific columns)
3. Platform isolation in separate directories
4. Worker task organization within platform folders

33
gpu/modal_deployments/.gitignore vendored Normal file
View File

@@ -0,0 +1,33 @@
# OS / Editor
.DS_Store
.vscode/
.idea/
# Python
__pycache__/
*.py[cod]
*$py.class
# Logs
*.log
# Env and secrets
.env
.env.*
*.env
*.secret
# Build / dist
build/
dist/
.eggs/
*.egg-info/
# Coverage / test
.pytest_cache/
.coverage*
htmlcov/
# Modal local state (if any)
modal_mounts/
.modal_cache/

View File

@@ -77,7 +77,7 @@ image = (
.pip_install(
"hf_transfer==0.1.9",
"huggingface_hub[hf-xet]==0.31.2",
"nemo_toolkit[asr]==2.3.0",
"nemo_toolkit[asr]==2.5.0",
"cuda-python==12.8.0",
"fastapi==0.115.12",
"numpy<2",

View File

@@ -0,0 +1,2 @@
REFLECTOR_GPU_APIKEY=
HF_TOKEN=

38
gpu/self_hosted/.gitignore vendored Normal file
View File

@@ -0,0 +1,38 @@
cache/
# OS / Editor
.DS_Store
.vscode/
.idea/
# Python
__pycache__/
*.py[cod]
*$py.class
# Env and secrets
.env
*.env
*.secret
HF_TOKEN
REFLECTOR_GPU_APIKEY
# Virtual env / uv
.venv/
venv/
ENV/
uv/
# Build / dist
build/
dist/
.eggs/
*.egg-info/
# Coverage / test
.pytest_cache/
.coverage*
htmlcov/
# Logs
*.log

View File

@@ -0,0 +1,46 @@
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1 \
UV_LINK_MODE=copy \
UV_NO_CACHE=1
WORKDIR /tmp
RUN apt-get update \
&& apt-get install -y \
ffmpeg \
curl \
ca-certificates \
gnupg \
wget \
&& apt-get clean
# Add NVIDIA CUDA repo for Debian 12 (bookworm) and install cuDNN 9 for CUDA 12
ADD https://developer.download.nvidia.com/compute/cuda/repos/debian12/x86_64/cuda-keyring_1.1-1_all.deb /cuda-keyring.deb
RUN dpkg -i /cuda-keyring.deb \
&& rm /cuda-keyring.deb \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
cuda-cudart-12-6 \
libcublas-12-6 \
libcudnn9-cuda-12 \
libcudnn9-dev-cuda-12 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
ADD https://astral.sh/uv/install.sh /uv-installer.sh
RUN sh /uv-installer.sh && rm /uv-installer.sh
ENV PATH="/root/.local/bin/:$PATH"
ENV LD_LIBRARY_PATH="/usr/local/cuda/lib64:/usr/lib/x86_64-linux-gnu:$LD_LIBRARY_PATH"
RUN mkdir -p /app
WORKDIR /app
COPY pyproject.toml uv.lock /app/
COPY ./app /app/app
COPY ./main.py /app/
COPY ./runserver.sh /app/
EXPOSE 8000
CMD ["sh", "/app/runserver.sh"]

73
gpu/self_hosted/README.md Normal file
View File

@@ -0,0 +1,73 @@
# Self-hosted Model API
Run transcription, translation, and diarization services compatible with Reflector's GPU Model API. Works on CPU or GPU.
Environment variables
- REFLECTOR_GPU_APIKEY: Optional Bearer token. If unset, auth is disabled.
- HF_TOKEN: Optional. Required for diarization to download pyannote pipelines
Requirements
- FFmpeg must be installed and on PATH (used for URL-based and segmented transcription)
- Python 3.12+
- NVIDIA GPU optional. If available, it will be used automatically
Local run
Set env vars in self_hosted/.env file
uv sync
uv run uvicorn main:app --host 0.0.0.0 --port 8000
Authentication
- If REFLECTOR_GPU_APIKEY is set, include header: Authorization: Bearer <key>
Endpoints
- POST /v1/audio/transcriptions
- multipart/form-data
- fields: file (single file) OR files[] (multiple files), language, batch (true/false)
- response: single { text, words, filename } or { results: [ ... ] }
- POST /v1/audio/transcriptions-from-url
- application/json
- body: { audio_file_url, language, timestamp_offset }
- response: { text, words }
- POST /translate
- text: query parameter
- body (application/json): { source_language, target_language }
- response: { text: { <src>: original, <tgt>: translated } }
- POST /diarize
- query parameters: audio_file_url, timestamp (optional)
- requires HF_TOKEN to be set (for pyannote)
- response: { diarization: [ { start, end, speaker } ] }
OpenAPI docs
- Visit /docs when the server is running
Docker
- Not yet provided in this directory. A Dockerfile will be added later. For now, use Local run above
Conformance tests
# From this directory
TRANSCRIPT_URL=http://localhost:8000 \
TRANSCRIPT_API_KEY=dev-key \
uv run -m pytest -m model_api --no-cov ../../server/tests/test_model_api_transcript.py
TRANSLATION_URL=http://localhost:8000 \
TRANSLATION_API_KEY=dev-key \
uv run -m pytest -m model_api --no-cov ../../server/tests/test_model_api_translation.py
DIARIZATION_URL=http://localhost:8000 \
DIARIZATION_API_KEY=dev-key \
uv run -m pytest -m model_api --no-cov ../../server/tests/test_model_api_diarization.py

View File

@@ -0,0 +1,19 @@
import os
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def apikey_auth(apikey: str = Depends(oauth2_scheme)):
required_key = os.environ.get("REFLECTOR_GPU_APIKEY")
if not required_key:
return
if apikey == required_key:
return
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
headers={"WWW-Authenticate": "Bearer"},
)

View File

@@ -0,0 +1,12 @@
from pathlib import Path
SUPPORTED_FILE_EXTENSIONS = ["mp3", "mp4", "mpeg", "mpga", "m4a", "wav", "webm"]
SAMPLE_RATE = 16000
VAD_CONFIG = {
"batch_max_duration": 30.0,
"silence_padding": 0.5,
"window_size": 512,
}
# App-level paths
UPLOADS_PATH = Path("/tmp/whisper-uploads")

View File

@@ -0,0 +1,30 @@
from contextlib import asynccontextmanager
from fastapi import FastAPI
from .routers.diarization import router as diarization_router
from .routers.transcription import router as transcription_router
from .routers.translation import router as translation_router
from .services.transcriber import WhisperService
from .services.diarizer import PyannoteDiarizationService
from .utils import ensure_dirs
@asynccontextmanager
async def lifespan(app: FastAPI):
ensure_dirs()
whisper_service = WhisperService()
whisper_service.load()
app.state.whisper = whisper_service
diarization_service = PyannoteDiarizationService()
diarization_service.load()
app.state.diarizer = diarization_service
yield
def create_app() -> FastAPI:
app = FastAPI(lifespan=lifespan)
app.include_router(transcription_router)
app.include_router(translation_router)
app.include_router(diarization_router)
return app

View File

@@ -0,0 +1,30 @@
from typing import List
from fastapi import APIRouter, Depends, Request
from pydantic import BaseModel
from ..auth import apikey_auth
from ..services.diarizer import PyannoteDiarizationService
from ..utils import download_audio_file
router = APIRouter(tags=["diarization"])
class DiarizationSegment(BaseModel):
start: float
end: float
speaker: int
class DiarizationResponse(BaseModel):
diarization: List[DiarizationSegment]
@router.post(
"/diarize", dependencies=[Depends(apikey_auth)], response_model=DiarizationResponse
)
def diarize(request: Request, audio_file_url: str, timestamp: float = 0.0):
with download_audio_file(audio_file_url) as (file_path, _ext):
file_path = str(file_path)
diarizer: PyannoteDiarizationService = request.app.state.diarizer
return diarizer.diarize_file(file_path, timestamp=timestamp)

View File

@@ -0,0 +1,109 @@
import uuid
from typing import Optional, Union
from fastapi import APIRouter, Body, Depends, Form, HTTPException, Request, UploadFile
from pydantic import BaseModel
from pathlib import Path
from ..auth import apikey_auth
from ..config import SUPPORTED_FILE_EXTENSIONS, UPLOADS_PATH
from ..services.transcriber import MODEL_NAME
from ..utils import cleanup_uploaded_files, download_audio_file
router = APIRouter(prefix="/v1/audio", tags=["transcription"])
class WordTiming(BaseModel):
word: str
start: float
end: float
class TranscriptResult(BaseModel):
text: str
words: list[WordTiming]
filename: Optional[str] = None
class TranscriptBatchResponse(BaseModel):
results: list[TranscriptResult]
@router.post(
"/transcriptions",
dependencies=[Depends(apikey_auth)],
response_model=Union[TranscriptResult, TranscriptBatchResponse],
)
def transcribe(
request: Request,
file: UploadFile = None,
files: list[UploadFile] | None = None,
model: str = Form(MODEL_NAME),
language: str = Form("en"),
batch: bool = Form(False),
):
service = request.app.state.whisper
if not file and not files:
raise HTTPException(
status_code=400, detail="Either 'file' or 'files' parameter is required"
)
if batch and not files:
raise HTTPException(
status_code=400, detail="Batch transcription requires 'files'"
)
upload_files = [file] if file else files
uploaded_paths: list[Path] = []
with cleanup_uploaded_files(uploaded_paths):
for upload_file in upload_files:
audio_suffix = upload_file.filename.split(".")[-1].lower()
if audio_suffix not in SUPPORTED_FILE_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=(
f"Unsupported audio format. Supported extensions: {', '.join(SUPPORTED_FILE_EXTENSIONS)}"
),
)
unique_filename = f"{uuid.uuid4()}.{audio_suffix}"
file_path = UPLOADS_PATH / unique_filename
with open(file_path, "wb") as f:
content = upload_file.file.read()
f.write(content)
uploaded_paths.append(file_path)
if batch and len(upload_files) > 1:
results = []
for path in uploaded_paths:
result = service.transcribe_file(str(path), language=language)
result["filename"] = path.name
results.append(result)
return {"results": results}
results = []
for path in uploaded_paths:
result = service.transcribe_file(str(path), language=language)
result["filename"] = path.name
results.append(result)
return {"results": results} if len(results) > 1 else results[0]
@router.post(
"/transcriptions-from-url",
dependencies=[Depends(apikey_auth)],
response_model=TranscriptResult,
)
def transcribe_from_url(
request: Request,
audio_file_url: str = Body(..., description="URL of the audio file to transcribe"),
model: str = Body(MODEL_NAME),
language: str = Body("en"),
timestamp_offset: float = Body(0.0),
):
service = request.app.state.whisper
with download_audio_file(audio_file_url) as (file_path, _ext):
file_path = str(file_path)
result = service.transcribe_vad_url_segment(
file_path=file_path, timestamp_offset=timestamp_offset, language=language
)
return result

View File

@@ -0,0 +1,28 @@
from typing import Dict
from fastapi import APIRouter, Body, Depends
from pydantic import BaseModel
from ..auth import apikey_auth
from ..services.translator import TextTranslatorService
router = APIRouter(tags=["translation"])
translator = TextTranslatorService()
class TranslationResponse(BaseModel):
text: Dict[str, str]
@router.post(
"/translate",
dependencies=[Depends(apikey_auth)],
response_model=TranslationResponse,
)
def translate(
text: str,
source_language: str = Body("en"),
target_language: str = Body("fr"),
):
return translator.translate(text, source_language, target_language)

View File

@@ -0,0 +1,42 @@
import os
import threading
import torch
import torchaudio
from pyannote.audio import Pipeline
class PyannoteDiarizationService:
def __init__(self):
self._pipeline = None
self._device = "cpu"
self._lock = threading.Lock()
def load(self):
self._device = "cuda" if torch.cuda.is_available() else "cpu"
self._pipeline = Pipeline.from_pretrained(
"pyannote/speaker-diarization-3.1",
use_auth_token=os.environ.get("HF_TOKEN"),
)
self._pipeline.to(torch.device(self._device))
def diarize_file(self, file_path: str, timestamp: float = 0.0) -> dict:
if self._pipeline is None:
self.load()
waveform, sample_rate = torchaudio.load(file_path)
with self._lock:
diarization = self._pipeline(
{"waveform": waveform, "sample_rate": sample_rate}
)
words = []
for diarization_segment, _, speaker in diarization.itertracks(yield_label=True):
words.append(
{
"start": round(timestamp + diarization_segment.start, 3),
"end": round(timestamp + diarization_segment.end, 3),
"speaker": int(speaker[-2:])
if speaker and speaker[-2:].isdigit()
else 0,
}
)
return {"diarization": words}

View File

@@ -0,0 +1,208 @@
import os
import shutil
import subprocess
import threading
from typing import Generator
import faster_whisper
import librosa
import numpy as np
import torch
from fastapi import HTTPException
from silero_vad import VADIterator, load_silero_vad
from ..config import SAMPLE_RATE, VAD_CONFIG
# Whisper configuration (service-local defaults)
MODEL_NAME = "large-v2"
# None delegates compute type to runtime: float16 on CUDA, int8 on CPU
MODEL_COMPUTE_TYPE = None
MODEL_NUM_WORKERS = 1
CACHE_PATH = os.path.join(os.path.expanduser("~"), ".cache", "reflector-whisper")
from ..utils import NoStdStreams
class WhisperService:
def __init__(self):
self.model = None
self.device = "cpu"
self.lock = threading.Lock()
def load(self):
self.device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = MODEL_COMPUTE_TYPE or (
"float16" if self.device == "cuda" else "int8"
)
self.model = faster_whisper.WhisperModel(
MODEL_NAME,
device=self.device,
compute_type=compute_type,
num_workers=MODEL_NUM_WORKERS,
download_root=CACHE_PATH,
)
def pad_audio(self, audio_array, sample_rate: int = SAMPLE_RATE):
audio_duration = len(audio_array) / sample_rate
if audio_duration < VAD_CONFIG["silence_padding"]:
silence_samples = int(sample_rate * VAD_CONFIG["silence_padding"])
silence = np.zeros(silence_samples, dtype=np.float32)
return np.concatenate([audio_array, silence])
return audio_array
def enforce_word_timing_constraints(self, words: list[dict]) -> list[dict]:
if len(words) <= 1:
return words
enforced: list[dict] = []
for i, word in enumerate(words):
current = dict(word)
if i < len(words) - 1:
next_start = words[i + 1]["start"]
if current["end"] > next_start:
current["end"] = next_start
enforced.append(current)
return enforced
def transcribe_file(self, file_path: str, language: str = "en") -> dict:
input_for_model: str | "object" = file_path
try:
audio_array, _sample_rate = librosa.load(
file_path, sr=SAMPLE_RATE, mono=True
)
if len(audio_array) / float(SAMPLE_RATE) < VAD_CONFIG["silence_padding"]:
input_for_model = self.pad_audio(audio_array, SAMPLE_RATE)
except Exception:
pass
with self.lock:
with NoStdStreams():
segments, _ = self.model.transcribe(
input_for_model,
language=language,
beam_size=5,
word_timestamps=True,
vad_filter=True,
vad_parameters={"min_silence_duration_ms": 500},
)
segments = list(segments)
text = "".join(segment.text for segment in segments).strip()
words = [
{
"word": word.word,
"start": round(float(word.start), 2),
"end": round(float(word.end), 2),
}
for segment in segments
for word in segment.words
]
words = self.enforce_word_timing_constraints(words)
return {"text": text, "words": words}
def transcribe_vad_url_segment(
self, file_path: str, timestamp_offset: float = 0.0, language: str = "en"
) -> dict:
def load_audio_via_ffmpeg(input_path: str, sample_rate: int) -> np.ndarray:
ffmpeg_bin = shutil.which("ffmpeg") or "ffmpeg"
cmd = [
ffmpeg_bin,
"-nostdin",
"-threads",
"1",
"-i",
input_path,
"-f",
"f32le",
"-acodec",
"pcm_f32le",
"-ac",
"1",
"-ar",
str(sample_rate),
"pipe:1",
]
try:
proc = subprocess.run(
cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=True
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"ffmpeg failed: {e}")
audio = np.frombuffer(proc.stdout, dtype=np.float32)
return audio
def vad_segments(
audio_array,
sample_rate: int = SAMPLE_RATE,
window_size: int = VAD_CONFIG["window_size"],
) -> Generator[tuple[float, float], None, None]:
vad_model = load_silero_vad(onnx=False)
iterator = VADIterator(vad_model, sampling_rate=sample_rate)
start = None
for i in range(0, len(audio_array), window_size):
chunk = audio_array[i : i + window_size]
if len(chunk) < window_size:
chunk = np.pad(
chunk, (0, window_size - len(chunk)), mode="constant"
)
speech = iterator(chunk)
if not speech:
continue
if "start" in speech:
start = speech["start"]
continue
if "end" in speech and start is not None:
end = speech["end"]
yield (start / float(SAMPLE_RATE), end / float(SAMPLE_RATE))
start = None
iterator.reset_states()
audio_array = load_audio_via_ffmpeg(file_path, SAMPLE_RATE)
merged_batches: list[tuple[float, float]] = []
batch_start = None
batch_end = None
max_duration = VAD_CONFIG["batch_max_duration"]
for seg_start, seg_end in vad_segments(audio_array):
if batch_start is None:
batch_start, batch_end = seg_start, seg_end
continue
if seg_end - batch_start <= max_duration:
batch_end = seg_end
else:
merged_batches.append((batch_start, batch_end))
batch_start, batch_end = seg_start, seg_end
if batch_start is not None and batch_end is not None:
merged_batches.append((batch_start, batch_end))
all_text = []
all_words = []
for start_time, end_time in merged_batches:
s_idx = int(start_time * SAMPLE_RATE)
e_idx = int(end_time * SAMPLE_RATE)
segment = audio_array[s_idx:e_idx]
segment = self.pad_audio(segment, SAMPLE_RATE)
with self.lock:
segments, _ = self.model.transcribe(
segment,
language=language,
beam_size=5,
word_timestamps=True,
vad_filter=True,
vad_parameters={"min_silence_duration_ms": 500},
)
segments = list(segments)
text = "".join(seg.text for seg in segments).strip()
words = [
{
"word": w.word,
"start": round(float(w.start) + start_time + timestamp_offset, 2),
"end": round(float(w.end) + start_time + timestamp_offset, 2),
}
for seg in segments
for w in seg.words
]
if text:
all_text.append(text)
all_words.extend(words)
all_words = self.enforce_word_timing_constraints(all_words)
return {"text": " ".join(all_text), "words": all_words}

View File

@@ -0,0 +1,44 @@
import threading
from transformers import MarianMTModel, MarianTokenizer, pipeline
class TextTranslatorService:
"""Simple text-to-text translator using HuggingFace MarianMT models.
This mirrors the modal translator API shape but uses text translation only.
"""
def __init__(self):
self._pipeline = None
self._lock = threading.Lock()
def load(self, source_language: str = "en", target_language: str = "fr"):
# Pick a default MarianMT model pair if available; fall back to Helsinki-NLP en->fr
model_name = self._resolve_model_name(source_language, target_language)
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)
self._pipeline = pipeline("translation", model=model, tokenizer=tokenizer)
def _resolve_model_name(self, src: str, tgt: str) -> str:
# Minimal mapping; extend as needed
pair = (src.lower(), tgt.lower())
mapping = {
("en", "fr"): "Helsinki-NLP/opus-mt-en-fr",
("fr", "en"): "Helsinki-NLP/opus-mt-fr-en",
("en", "es"): "Helsinki-NLP/opus-mt-en-es",
("es", "en"): "Helsinki-NLP/opus-mt-es-en",
("en", "de"): "Helsinki-NLP/opus-mt-en-de",
("de", "en"): "Helsinki-NLP/opus-mt-de-en",
}
return mapping.get(pair, "Helsinki-NLP/opus-mt-en-fr")
def translate(self, text: str, source_language: str, target_language: str) -> dict:
if self._pipeline is None:
self.load(source_language, target_language)
with self._lock:
results = self._pipeline(
text, src_lang=source_language, tgt_lang=target_language
)
translated = results[0]["translation_text"] if results else ""
return {"text": {source_language: text, target_language: translated}}

View File

@@ -0,0 +1,107 @@
import logging
import os
import sys
import uuid
from contextlib import contextmanager
from typing import Mapping
from urllib.parse import urlparse
from pathlib import Path
import requests
from fastapi import HTTPException
from .config import SUPPORTED_FILE_EXTENSIONS, UPLOADS_PATH
logger = logging.getLogger(__name__)
class NoStdStreams:
def __init__(self):
self.devnull = open(os.devnull, "w")
def __enter__(self):
self._stdout, self._stderr = sys.stdout, sys.stderr
self._stdout.flush()
self._stderr.flush()
sys.stdout, sys.stderr = self.devnull, self.devnull
def __exit__(self, exc_type, exc_value, traceback):
sys.stdout, sys.stderr = self._stdout, self._stderr
self.devnull.close()
def ensure_dirs():
UPLOADS_PATH.mkdir(parents=True, exist_ok=True)
def detect_audio_format(url: str, headers: Mapping[str, str]) -> str:
url_path = urlparse(url).path
for ext in SUPPORTED_FILE_EXTENSIONS:
if url_path.lower().endswith(f".{ext}"):
return ext
content_type = headers.get("content-type", "").lower()
if "audio/mpeg" in content_type or "audio/mp3" in content_type:
return "mp3"
if "audio/wav" in content_type:
return "wav"
if "audio/mp4" in content_type:
return "mp4"
raise HTTPException(
status_code=400,
detail=(
f"Unsupported audio format for URL. Supported extensions: {', '.join(SUPPORTED_FILE_EXTENSIONS)}"
),
)
def download_audio_to_uploads(audio_file_url: str) -> tuple[Path, str]:
response = requests.head(audio_file_url, allow_redirects=True)
if response.status_code == 404:
raise HTTPException(status_code=404, detail="Audio file not found")
response = requests.get(audio_file_url, allow_redirects=True)
response.raise_for_status()
audio_suffix = detect_audio_format(audio_file_url, response.headers)
unique_filename = f"{uuid.uuid4()}.{audio_suffix}"
file_path: Path = UPLOADS_PATH / unique_filename
with open(file_path, "wb") as f:
f.write(response.content)
return file_path, audio_suffix
@contextmanager
def download_audio_file(audio_file_url: str):
"""Download an audio file to UPLOADS_PATH and remove it after use.
Yields (file_path: Path, audio_suffix: str).
"""
file_path, audio_suffix = download_audio_to_uploads(audio_file_url)
try:
yield file_path, audio_suffix
finally:
try:
file_path.unlink(missing_ok=True)
except Exception as e:
logger.error("Error deleting temporary file %s: %s", file_path, e)
@contextmanager
def cleanup_uploaded_files(file_paths: list[Path]):
"""Ensure provided file paths are removed after use.
The provided list can be populated inside the context; all present entries
at exit will be deleted.
"""
try:
yield file_paths
finally:
for path in list(file_paths):
try:
path.unlink(missing_ok=True)
except Exception as e:
logger.error("Error deleting temporary file %s: %s", path, e)

View File

@@ -0,0 +1,10 @@
services:
reflector_gpu:
build:
context: .
ports:
- "8000:8000"
env_file:
- .env
volumes:
- ./cache:/root/.cache

3
gpu/self_hosted/main.py Normal file
View File

@@ -0,0 +1,3 @@
from app.factory import create_app
app = create_app()

View File

@@ -0,0 +1,19 @@
[project]
name = "reflector-gpu"
version = "0.1.0"
description = "Self-hosted GPU service for speech transcription, diarization, and translation via FastAPI."
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"fastapi[standard]>=0.116.1",
"uvicorn[standard]>=0.30.0",
"torch>=2.3.0",
"faster-whisper>=1.1.0",
"librosa==0.10.1",
"numpy<2",
"silero-vad==5.1.0",
"transformers>=4.35.0",
"sentencepiece",
"pyannote.audio==3.1.0",
"torchaudio>=2.3.0",
]

View File

@@ -0,0 +1,17 @@
#!/bin/sh
set -e
export PATH="/root/.local/bin:$PATH"
cd /app
# Install Python dependencies at runtime (first run or when FORCE_SYNC=1)
if [ ! -d "/app/.venv" ] || [ "$FORCE_SYNC" = "1" ]; then
echo "[startup] Installing Python dependencies with uv..."
uv sync --compile-bytecode --locked
else
echo "[startup] Using existing virtual environment at /app/.venv"
fi
exec uv run uvicorn main:app --host 0.0.0.0 --port 8000

3013
gpu/self_hosted/uv.lock generated Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,29 @@
## API Key Management
### Finding Your User ID
```bash
# Get your OAuth sub (user ID) - requires authentication
curl -H "Authorization: Bearer <your_jwt>" http://localhost:1250/v1/me
# Returns: {"sub": "your-oauth-sub-here", "email": "...", ...}
```
### Creating API Keys
```bash
curl -X POST http://localhost:1250/v1/user/api-keys \
-H "Authorization: Bearer <your_jwt>" \
-H "Content-Type: application/json" \
-d '{"name": "My API Key"}'
```
### Using API Keys
```bash
# Use X-API-Key header instead of Authorization
curl -H "X-API-Key: <your_api_key>" http://localhost:1250/v1/transcripts
```
## AWS S3/SQS usage clarification
Whereby.com uploads recordings directly to our S3 bucket when meetings end.

View File

@@ -1,212 +0,0 @@
# Event Logger for Docker-Jitsi-Meet
A Prosody module that logs Jitsi meeting events to JSONL files alongside recordings, enabling complete participant tracking and speaker statistics.
## Prerequisites
- Running docker-jitsi-meet installation
- Jibri configured for recording
## Installation
### Step 1: Copy the Module
Copy the Prosody module to your custom plugins directory:
```bash
# Create the directory if it doesn't exist
mkdir -p ~/.jitsi-meet-cfg/prosody/prosody-plugins-custom
# Copy the module
cp mod_event_logger.lua ~/.jitsi-meet-cfg/prosody/prosody-plugins-custom/
```
### Step 2: Update Your .env File
Add or modify these variables in your `.env` file:
```bash
# If XMPP_MUC_MODULES already exists, append event_logger
# Example: XMPP_MUC_MODULES=existing_module,event_logger
XMPP_MUC_MODULES=event_logger
# Optional: Configure the module (these are defaults)
JIBRI_RECORDINGS_PATH=/config/recordings
JIBRI_LOG_SPEAKER_STATS=true
JIBRI_SPEAKER_STATS_INTERVAL=10
```
**Important**: If you already have `XMPP_MUC_MODULES` defined, add `event_logger` to the comma-separated list:
```bash
# Existing modules + our module
XMPP_MUC_MODULES=mod_info,mod_alert,event_logger
```
### Step 3: Modify docker-compose.yml
Add a shared recordings volume so Prosody can write events alongside Jibri recordings:
```yaml
services:
prosody:
# ... existing configuration ...
volumes:
- ${CONFIG}/prosody/config:/config:Z
- ${CONFIG}/prosody/prosody-plugins-custom:/prosody-plugins-custom:Z
- ${CONFIG}/recordings:/config/recordings:Z # Add this line
environment:
# Add if not using .env file
- XMPP_MUC_MODULES=${XMPP_MUC_MODULES:-event_logger}
- JIBRI_RECORDINGS_PATH=/config/recordings
jibri:
# ... existing configuration ...
volumes:
- ${CONFIG}/jibri:/config:Z
- ${CONFIG}/recordings:/config/recordings:Z # Add this line
environment:
# For Reflector webhook integration (optional)
- REFLECTOR_WEBHOOK_URL=${REFLECTOR_WEBHOOK_URL:-}
- JIBRI_FINALIZE_RECORDING_SCRIPT_PATH=/config/finalize.sh
```
### Step 4: Add Finalize Script (Optional - For Reflector Integration)
If you want to notify Reflector when recordings complete:
```bash
# Copy the finalize script
cp finalize.sh ~/.jitsi-meet-cfg/jibri/finalize.sh
chmod +x ~/.jitsi-meet-cfg/jibri/finalize.sh
# Add to .env
REFLECTOR_WEBHOOK_URL=http://your-reflector-api:8000
```
### Step 5: Restart Services
```bash
docker-compose down
docker-compose up -d
```
## What Gets Created
After a recording, you'll find in `~/.jitsi-meet-cfg/recordings/{session-id}/`:
- `recording.mp4` - The video recording (created by Jibri)
- `metadata.json` - Basic metadata (created by Jibri)
- `events.jsonl` - Complete participant timeline (created by this module)
## Event Format
Each line in `events.jsonl` is a JSON object:
```json
{"type":"room_created","timestamp":1234567890,"room_name":"TestRoom","room_jid":"testroom@conference.meet.jitsi","meeting_url":"https://meet.jitsi/TestRoom"}
{"type":"recording_started","timestamp":1234567891,"room_name":"TestRoom","session_id":"20240115120000_TestRoom","jibri_jid":"jibri@recorder.meet.jitsi"}
{"type":"participant_joined","timestamp":1234567892,"room_name":"TestRoom","participant":{"jid":"user1@meet.jitsi/web","nick":"John Doe","id":"user1@meet.jitsi","is_moderator":false}}
{"type":"speaker_active","timestamp":1234567895,"room_name":"TestRoom","speaker_jid":"user1@meet.jitsi","speaker_nick":"John Doe","duration":10}
{"type":"participant_left","timestamp":1234567920,"room_name":"TestRoom","participant":{"jid":"user1@meet.jitsi/web","nick":"John Doe","duration_seconds":28}}
{"type":"recording_stopped","timestamp":1234567950,"room_name":"TestRoom","session_id":"20240115120000_TestRoom","meeting_url":"https://meet.jitsi/TestRoom"}
```
## Configuration Options
All configuration can be done via environment variables:
| Environment Variable | Default | Description |
|---------------------|---------|-------------|
| `JIBRI_RECORDINGS_PATH` | `/config/recordings` | Path where recordings are stored |
| `JIBRI_LOG_SPEAKER_STATS` | `true` | Enable speaker statistics logging |
| `JIBRI_SPEAKER_STATS_INTERVAL` | `10` | Seconds between speaker stats updates |
## Verifying Installation
Check that the module is loaded:
```bash
docker-compose logs prosody | grep "Event Logger"
# Should see: "Event Logger loaded - writing to /config/recordings"
```
Check for events after a recording:
```bash
ls -la ~/.jitsi-meet-cfg/recordings/*/events.jsonl
cat ~/.jitsi-meet-cfg/recordings/*/events.jsonl | jq .
```
## Troubleshooting
### No events.jsonl file created
1. **Check module is enabled**:
```bash
docker-compose exec prosody grep -r "event_logger" /config
```
2. **Verify volume permissions**:
```bash
docker-compose exec prosody ls -la /config/recordings
```
3. **Check Prosody logs for errors**:
```bash
docker-compose logs prosody | grep -i error
```
### Module not loading
1. **Verify file exists in container**:
```bash
docker-compose exec prosody ls -la /prosody-plugins-custom/
```
2. **Check XMPP_MUC_MODULES format** (must be comma-separated, no spaces):
- ✅ Correct: `XMPP_MUC_MODULES=mod1,mod2,event_logger`
- ❌ Wrong: `XMPP_MUC_MODULES=mod1, mod2, event_logger`
## Common docker-compose.yml Patterns
### Minimal Addition (if you trust defaults)
```yaml
services:
prosody:
volumes:
- ${CONFIG}/recordings:/config/recordings:Z # Just add this
```
### Full Configuration
```yaml
services:
prosody:
volumes:
- ${CONFIG}/prosody/config:/config:Z
- ${CONFIG}/prosody/prosody-plugins-custom:/prosody-plugins-custom:Z
- ${CONFIG}/recordings:/config/recordings:Z
environment:
- XMPP_MUC_MODULES=event_logger
- JIBRI_RECORDINGS_PATH=/config/recordings
- JIBRI_LOG_SPEAKER_STATS=true
- JIBRI_SPEAKER_STATS_INTERVAL=10
jibri:
volumes:
- ${CONFIG}/jibri:/config:Z
- ${CONFIG}/recordings:/config/recordings:Z
environment:
- JIBRI_RECORDING_DIR=/config/recordings
- JIBRI_FINALIZE_RECORDING_SCRIPT_PATH=/config/finalize.sh
```
## Integration with Reflector
The finalize.sh script will automatically notify Reflector when a recording completes if `REFLECTOR_WEBHOOK_URL` is set. Reflector will receive:
```json
{
"session_id": "20240115120000_TestRoom",
"path": "20240115120000_TestRoom",
"meeting_url": "https://meet.jitsi/TestRoom"
}
```
Reflector then processes the recording along with the complete participant timeline from `events.jsonl`.

View File

@@ -1,49 +0,0 @@
#!/bin/bash
# Jibri finalize script to notify Reflector when recording is complete
# This script is called by Jibri with the recording directory as argument
RECORDING_PATH="$1"
SESSION_ID=$(basename "$RECORDING_PATH")
METADATA_FILE="$RECORDING_PATH/metadata.json"
# Extract meeting URL from Jibri's metadata
MEETING_URL=""
if [ -f "$METADATA_FILE" ]; then
MEETING_URL=$(jq -r '.meeting_url' "$METADATA_FILE" 2>/dev/null || echo "")
fi
echo "[$(date)] Recording finalized: $RECORDING_PATH"
echo "[$(date)] Session ID: $SESSION_ID"
echo "[$(date)] Meeting URL: $MEETING_URL"
# Check if events.jsonl was created by our Prosody module
if [ -f "$RECORDING_PATH/events.jsonl" ]; then
EVENT_COUNT=$(wc -l < "$RECORDING_PATH/events.jsonl")
echo "[$(date)] Found events.jsonl with $EVENT_COUNT events"
else
echo "[$(date)] Warning: No events.jsonl found"
fi
# Notify Reflector if webhook URL is configured
if [ -n "$REFLECTOR_WEBHOOK_URL" ]; then
echo "[$(date)] Notifying Reflector at: $REFLECTOR_WEBHOOK_URL"
RESPONSE=$(curl -s -w "\n%{http_code}" -X POST "$REFLECTOR_WEBHOOK_URL/api/v1/jibri/recording-ready" \
-H "Content-Type: application/json" \
-d "{\"session_id\":\"$SESSION_ID\",\"path\":\"$SESSION_ID\",\"meeting_url\":\"$MEETING_URL\"}")
HTTP_CODE=$(echo "$RESPONSE" | tail -n1)
BODY=$(echo "$RESPONSE" | sed '$d')
if [ "$HTTP_CODE" = "200" ]; then
echo "[$(date)] Reflector notified successfully"
echo "[$(date)] Response: $BODY"
else
echo "[$(date)] Failed to notify Reflector. HTTP code: $HTTP_CODE"
echo "[$(date)] Response: $BODY"
fi
else
echo "[$(date)] No REFLECTOR_WEBHOOK_URL configured, skipping notification"
fi
echo "[$(date)] Finalize script completed"

View File

@@ -1,372 +0,0 @@
local json = require "util.json"
local st = require "util.stanza"
local jid_bare = require "util.jid".bare
local recordings_path = os.getenv("JIBRI_RECORDINGS_PATH") or
module:get_option_string("jibri_recordings_path", "/recordings")
-- room_jid -> { session_id, participants = {jid -> info} }
local active_recordings = {}
-- room_jid -> { participants = {jid -> info}, created_at }
local room_states = {}
local function get_timestamp()
return os.time()
end
local function write_event(session_id, event)
if not session_id then
module:log("warn", "No session_id for event: %s", event.type)
return
end
local session_dir = string.format("%s/%s", recordings_path, session_id)
local event_file = string.format("%s/events.jsonl", session_dir)
module:log("info", "Writing event %s to %s", event.type, event_file)
-- Create directory
local mkdir_cmd = string.format("mkdir -p '%s' 2>&1", session_dir)
local mkdir_result = os.execute(mkdir_cmd)
module:log("debug", "mkdir result: %s", tostring(mkdir_result))
local file, err = io.open(event_file, "a")
if file then
local json_str = json.encode(event)
file:write(json_str .. "\n")
file:close()
module:log("info", "Successfully wrote event %s", event.type)
else
module:log("error", "Failed to write event to %s: %s", event_file, err)
end
end
local function extract_participant_info(occupant)
local info = {
jid = occupant.jid,
bare_jid = occupant.bare_jid,
nick = occupant.nick,
display_name = nil,
role = occupant.role
}
local presence = occupant:get_presence()
if presence then
local nick_element = presence:get_child("nick", "http://jabber.org/protocol/nick")
if nick_element then
info.display_name = nick_element:get_text()
end
local identity = presence:get_child("identity")
if identity then
local user = identity:get_child("user")
if user then
local name = user:get_child("name")
if name then
info.display_name = name:get_text()
end
local id_element = user:get_child("id")
if id_element then
info.id = id_element:get_text()
end
end
end
if not info.display_name and occupant.nick then
local _, _, resource = occupant.nick:match("([^@]+)@([^/]+)/(.+)")
if resource then
info.display_name = resource
end
end
end
return info
end
local function get_room_participant_count(room)
local count = 0
for _ in room:each_occupant() do
count = count + 1
end
return count
end
local function snapshot_room_participants(room)
local participants = {}
local total = 0
local skipped = 0
module:log("info", "Snapshotting room participants")
for _, occupant in room:each_occupant() do
total = total + 1
-- Skip recorders (Jibri)
if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or
occupant.bare_jid:match("^jibri@")) then
skipped = skipped + 1
else
local info = extract_participant_info(occupant)
participants[occupant.jid] = info
module:log("debug", "Added participant: %s", info.display_name or info.bare_jid)
end
end
module:log("info", "Snapshot: %d total, %d participants", total, total - skipped)
return participants
end
-- Import utility functions if available
local util = module:require "util";
local get_room_from_jid = util.get_room_from_jid;
local room_jid_match_rewrite = util.room_jid_match_rewrite;
-- Main IQ handler for Jibri stanzas
module:hook("pre-iq/full", function(event)
local stanza = event.stanza
if stanza.name ~= "iq" then
return
end
local jibri = stanza:get_child('jibri', 'http://jitsi.org/protocol/jibri')
if not jibri then
return
end
module:log("info", "=== Jibri IQ intercepted ===")
local action = jibri.attr.action
local session_id = jibri.attr.session_id
local room_jid = jibri.attr.room
local recording_mode = jibri.attr.recording_mode
local app_data = jibri.attr.app_data
module:log("info", "Jibri %s - session: %s, room: %s, mode: %s",
action or "?", session_id or "?", room_jid or "?", recording_mode or "?")
if not room_jid or not session_id then
module:log("warn", "Missing room_jid or session_id")
return
end
-- Get the room using util function
local room = get_room_from_jid(room_jid_match_rewrite(jid_bare(stanza.attr.to)))
if not room then
-- Try with the room_jid directly
room = get_room_from_jid(room_jid)
end
if not room then
module:log("error", "Room not found for jid: %s", room_jid)
return
end
module:log("info", "Room found: %s", room:get_name() or room_jid)
if action == "start" then
module:log("info", "Recording START for session %s", session_id)
-- Count and snapshot participants
local participant_count = 0
for _ in room:each_occupant() do
participant_count = participant_count + 1
end
local participants = snapshot_room_participants(room)
local participant_list = {}
for jid, info in pairs(participants) do
table.insert(participant_list, info)
end
active_recordings[room_jid] = {
session_id = session_id,
participants = participants,
started_at = get_timestamp()
}
write_event(session_id, {
type = "recording_started",
timestamp = get_timestamp(),
room_jid = room_jid,
room_name = room:get_name(),
session_id = session_id,
recording_mode = recording_mode,
app_data = app_data,
participant_count = participant_count,
participants_at_start = participant_list
})
elseif action == "stop" then
module:log("info", "Recording STOP for session %s", session_id)
local recording = active_recordings[room_jid]
if recording and recording.session_id == session_id then
write_event(session_id, {
type = "recording_stopped",
timestamp = get_timestamp(),
room_jid = room_jid,
room_name = room:get_name(),
session_id = session_id,
duration = get_timestamp() - recording.started_at,
participant_count = get_room_participant_count(room)
})
active_recordings[room_jid] = nil
else
module:log("warn", "No active recording found for room %s", room_jid)
end
end
end);
-- Room and participant event hooks
local function setup_room_hooks(host_module)
module:log("info", "Setting up room hooks on %s", host_module.host or "unknown")
-- Room created
host_module:hook("muc-room-created", function(event)
local room = event.room
local room_jid = room.jid
room_states[room_jid] = {
participants = {},
created_at = get_timestamp()
}
module:log("info", "Room created: %s", room_jid)
end)
-- Room destroyed
host_module:hook("muc-room-destroyed", function(event)
local room = event.room
local room_jid = room.jid
room_states[room_jid] = nil
active_recordings[room_jid] = nil
module:log("info", "Room destroyed: %s", room_jid)
end)
-- Occupant joined
host_module:hook("muc-occupant-joined", function(event)
local room = event.room
local occupant = event.occupant
local room_jid = room.jid
-- Skip recorders
if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or
occupant.bare_jid:match("^jibri@")) then
return
end
local participant_info = extract_participant_info(occupant)
-- Update room state
if room_states[room_jid] then
room_states[room_jid].participants[occupant.jid] = participant_info
end
-- Log to active recording if exists
local recording = active_recordings[room_jid]
if recording then
recording.participants[occupant.jid] = participant_info
write_event(recording.session_id, {
type = "participant_joined",
timestamp = get_timestamp(),
room_jid = room_jid,
room_name = room:get_name(),
participant = participant_info,
participant_count = get_room_participant_count(room)
})
end
module:log("info", "Participant joined %s: %s (%d total)",
room:get_name() or room_jid,
participant_info.display_name or participant_info.bare_jid,
get_room_participant_count(room))
end)
-- Occupant left
host_module:hook("muc-occupant-left", function(event)
local room = event.room
local occupant = event.occupant
local room_jid = room.jid
-- Skip recorders
if occupant.bare_jid and (occupant.bare_jid:match("^recorder@") or
occupant.bare_jid:match("^jibri@")) then
return
end
local participant_info = extract_participant_info(occupant)
-- Update room state
if room_states[room_jid] then
room_states[room_jid].participants[occupant.jid] = nil
end
-- Log to active recording if exists
local recording = active_recordings[room_jid]
if recording then
if recording.participants[occupant.jid] then
recording.participants[occupant.jid] = nil
end
write_event(recording.session_id, {
type = "participant_left",
timestamp = get_timestamp(),
room_jid = room_jid,
room_name = room:get_name(),
participant = participant_info,
participant_count = get_room_participant_count(room)
})
end
module:log("info", "Participant left %s: %s (%d remaining)",
room:get_name() or room_jid,
participant_info.display_name or participant_info.bare_jid,
get_room_participant_count(room))
end)
end
-- Module initialization
local current_host = module:get_host()
local host_type = module:get_host_type()
module:log("info", "Event Logger loading on %s (type: %s)", current_host, host_type or "unknown")
module:log("info", "Recording path: %s", recordings_path)
-- Setup room hooks based on host type
if host_type == "component" and current_host:match("^[^.]+%.") then
setup_room_hooks(module)
else
-- Try to find and hook to MUC component
local process_host_module = util.process_host_module
local muc_component_host = module:get_option_string("muc_component") or
module:get_option_string("main_muc")
if not muc_component_host then
local possible_hosts = {
"muc." .. current_host,
"conference." .. current_host,
"rooms." .. current_host
}
for _, host in ipairs(possible_hosts) do
if prosody.hosts[host] then
muc_component_host = host
module:log("info", "Auto-detected MUC component: %s", muc_component_host)
break
end
end
end
if muc_component_host then
process_host_module(muc_component_host, function(host_module, host)
module:log("info", "Hooking to MUC events on %s", host)
setup_room_hooks(host_module)
end)
else
module:log("error", "Could not find MUC component")
end
end

View File

@@ -190,5 +190,5 @@ Use the pytest-based conformance tests to validate any new implementation (inclu
```
TRANSCRIPT_URL=https://<your-deployment-base> \
TRANSCRIPT_MODAL_API_KEY=your-api-key \
uv run -m pytest -m gpu_modal --no-cov server/tests/test_gpu_modal_transcript.py
uv run -m pytest -m model_api --no-cov server/tests/test_model_api_transcript.py
```

View File

@@ -1,493 +0,0 @@
# Jitsi Integration Configuration Guide
This guide provides step-by-step instructions for configuring Reflector to work with a self-hosted Jitsi Meet installation for video meetings and recording.
## Prerequisites
Before configuring Jitsi integration, ensure you have:
- **Self-hosted Jitsi Meet installation** (version 2.0.8922 or later recommended)
- **Jibri recording service** configured and running
- **Prosody XMPP server** with mod_event_sync module installed
- **Docker or system deployment** of Reflector with access to environment variables
- **SSL certificates** for secure communication between services
## Environment Configuration
Add the following environment variables to your Reflector deployment:
### Required Settings
```bash
# Jitsi Meet domain (without https://)
JITSI_DOMAIN=meet.example.com
# JWT secret for room authentication (generate with: openssl rand -hex 32)
JITSI_JWT_SECRET=your-64-character-hex-secret-here
# Webhook secret for secure event handling (generate with: openssl rand -hex 16)
JITSI_WEBHOOK_SECRET=your-32-character-hex-secret-here
# Application identifier (should match Jitsi configuration)
JITSI_APP_ID=reflector
# JWT issuer and audience (should match Jitsi configuration)
JITSI_JWT_ISSUER=reflector
JITSI_JWT_AUDIENCE=jitsi
```
### Example .env Configuration
```bash
# Add to your server/.env file
JITSI_DOMAIN=meet.mycompany.com
JITSI_JWT_SECRET=$(openssl rand -hex 32)
JITSI_WEBHOOK_SECRET=$(openssl rand -hex 16)
JITSI_APP_ID=reflector
JITSI_JWT_ISSUER=reflector
JITSI_JWT_AUDIENCE=jitsi
```
## Jitsi Meet Server Configuration
### 1. JWT Authentication Setup
Edit `/etc/prosody/conf.d/[YOUR_DOMAIN].cfg.lua`:
```lua
VirtualHost "meet.example.com"
authentication = "token"
app_id = "reflector"
app_secret = "your-jwt-secret-here"
-- Allow anonymous access for non-authenticated users
c2s_require_encryption = false
admins = { "focusUser@auth.meet.example.com" }
modules_enabled = {
"bosh";
"pubsub";
"ping";
"roster";
"saslauth";
"tls";
"dialback";
"disco";
"carbons";
"pep";
"private";
"blocklist";
"vcard";
"version";
"uptime";
"time";
"ping";
"register";
"admin_adhoc";
"token_verification";
"event_sync"; -- Required for webhook events
}
```
### 2. Room Access Control
Edit `/etc/jitsi/meet/meet.example.com-config.js`:
```javascript
var config = {
hosts: {
domain: 'meet.example.com',
muc: 'conference.meet.example.com'
},
// Enable JWT authentication
enableUserRolesBasedOnToken: true,
// Recording configuration
fileRecordingsEnabled: true,
liveStreamingEnabled: false,
// Reflector-specific settings
prejoinPageEnabled: true,
requireDisplayName: true,
};
```
### 3. Interface Configuration
Edit `/usr/share/jitsi-meet/interface_config.js`:
```javascript
var interfaceConfig = {
// Customize for Reflector branding
APP_NAME: 'Reflector Meeting',
DEFAULT_WELCOME_PAGE_LOGO_URL: 'https://your-domain.com/logo.png',
// Hide unnecessary buttons
TOOLBAR_BUTTONS: [
'microphone', 'camera', 'closedcaptions', 'desktop',
'fullscreen', 'fodeviceselection', 'hangup',
'chat', 'recording', 'livestreaming', 'etherpad',
'sharedvideo', 'settings', 'raisehand', 'videoquality',
'filmstrip', 'invite', 'feedback', 'stats', 'shortcuts',
'tileview', 'videobackgroundblur', 'download', 'help',
'mute-everyone'
]
};
```
## Jibri Configuration
### 1. Recording Service Setup
Edit `/etc/jitsi/jibri/jibri.conf`:
```hocon
jibri {
recording {
recordings-directory = "/var/recordings"
finalize-script = "/opt/jitsi/jibri/finalize.sh"
}
api {
xmpp {
environments = [{
name = "prod environment"
xmpp-server-hosts = ["meet.example.com"]
xmpp-domain = "meet.example.com"
control-muc {
domain = "internal.auth.meet.example.com"
room-name = "JibriBrewery"
nickname = "jibri-nickname"
}
control-login {
domain = "auth.meet.example.com"
username = "jibri"
password = "jibri-password"
}
}]
}
}
}
```
### 2. Finalize Script Setup
Create `/opt/jitsi/jibri/finalize.sh`:
```bash
#!/bin/bash
# Jibri finalize script for Reflector integration
RECORDING_FILE="$1"
ROOM_NAME="$2"
REFLECTOR_API_URL="${REFLECTOR_API_URL:-http://localhost:1250}"
WEBHOOK_SECRET="${JITSI_WEBHOOK_SECRET}"
# Generate webhook signature
generate_signature() {
local payload="$1"
echo -n "$payload" | openssl dgst -sha256 -hmac "$WEBHOOK_SECRET" | cut -d' ' -f2
}
# Prepare webhook payload
TIMESTAMP=$(date -u +%Y-%m-%dT%H:%M:%S.%3NZ)
PAYLOAD=$(cat <<EOF
{
"room_name": "$ROOM_NAME",
"recording_file": "$RECORDING_FILE",
"recording_status": "completed",
"timestamp": "$TIMESTAMP"
}
EOF
)
# Generate signature
SIGNATURE=$(generate_signature "$PAYLOAD")
# Send webhook to Reflector
curl -X POST "$REFLECTOR_API_URL/v1/jibri/recording-complete" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: $SIGNATURE" \
-d "$PAYLOAD" \
--max-time 30
echo "Recording finalization webhook sent for room: $ROOM_NAME"
```
Make the script executable:
```bash
chmod +x /opt/jitsi/jibri/finalize.sh
```
## Prosody Event Configuration
### 1. Event-Sync Module Installation
Install the mod_event_sync module:
```bash
# Download the module
cd /usr/share/jitsi-meet/prosody-plugins/
wget https://raw.githubusercontent.com/jitsi-contrib/prosody-plugins/main/mod_event_sync.lua
# Or if using git
git clone https://github.com/jitsi-contrib/prosody-plugins.git
cp prosody-plugins/mod_event_sync.lua /usr/share/jitsi-meet/prosody-plugins/
```
### 2. Webhook Configuration
Add to `/etc/prosody/conf.d/[YOUR_DOMAIN].cfg.lua`:
```lua
Component "conference.meet.example.com" "muc"
storage = "memory"
modules_enabled = {
"muc_meeting_id";
"muc_domain_mapper";
"polls";
"event_sync"; -- Enable event sync
}
-- Event sync webhook configuration
event_sync_url = "https://your-reflector-domain.com/v1/jitsi/events"
event_sync_secret = "your-webhook-secret-here"
-- Events to track
event_sync_events = {
"muc-occupant-joined",
"muc-occupant-left",
"jibri-recording-on",
"jibri-recording-off"
}
```
### 3. Restart Services
After configuration changes, restart all services:
```bash
systemctl restart prosody
systemctl restart jicofo
systemctl restart jitsi-videobridge2
systemctl restart jibri
systemctl restart nginx
```
## Reflector Room Configuration
### 1. Create Jitsi Room
When creating rooms in Reflector, set the platform field:
```bash
curl -X POST "https://your-reflector-domain.com/v1/rooms" \
-H "Authorization: Bearer $AUTH_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"name": "my-jitsi-room",
"platform": "jitsi",
"recording_type": "cloud",
"recording_trigger": "automatic-2nd-participant",
"is_locked": false,
"room_mode": "normal"
}'
```
### 2. Meeting Creation
Meetings will automatically use Jitsi when the room platform is set to "jitsi":
```bash
curl -X POST "https://your-reflector-domain.com/v1/rooms/my-jitsi-room/meeting" \
-H "Authorization: Bearer $AUTH_TOKEN"
```
## Testing the Integration
### 1. Health Check
Verify Jitsi webhook configuration:
```bash
curl "https://your-reflector-domain.com/v1/jitsi/health"
```
Expected response:
```json
{
"status": "ok",
"service": "jitsi-webhooks",
"timestamp": "2025-01-15T10:30:00.000Z",
"webhook_secret_configured": true
}
```
### 2. Room Creation Test
1. Create a Jitsi room via Reflector API
2. Start a meeting - should generate Jitsi Meet URL with JWT token
3. Join with multiple participants - should trigger participant events
4. Start recording - should trigger Jibri recording workflow
### 3. Webhook Event Test
Monitor Reflector logs for incoming webhook events:
```bash
# Check for participant events
curl -X POST "https://your-reflector-domain.com/v1/jitsi/events" \
-H "Content-Type: application/json" \
-H "X-Jitsi-Signature: test-signature" \
-d '{
"event": "muc-occupant-joined",
"room": "test-room-name",
"timestamp": "2025-01-15T10:30:00.000Z",
"data": {}
}'
```
## Troubleshooting
### Common Issues
#### JWT Authentication Failures
**Symptoms:** Users can't join rooms, "Authentication failed" errors
**Solutions:**
1. Verify JWT secret matches between Jitsi and Reflector
2. Check JWT token expiration (default 8 hours)
3. Ensure system clocks are synchronized
4. Validate JWT issuer/audience configuration
```bash
# Debug JWT tokens
echo "JWT_TOKEN_HERE" | cut -d'.' -f2 | base64 -d | jq
```
#### Webhook Events Not Received
**Symptoms:** Participant counts not updating, recording events missing
**Solutions:**
1. Verify event_sync module is loaded in Prosody
2. Check webhook URL accessibility from Jitsi server
3. Validate webhook signature generation
4. Review Prosody and Reflector logs
```bash
# Test webhook connectivity
curl -v "https://your-reflector-domain.com/v1/jitsi/health"
# Check Prosody logs
tail -f /var/log/prosody/prosody.log
# Check Reflector logs
docker logs your-reflector-container
```
#### Recording Issues
**Symptoms:** Recordings not starting, finalize script errors
**Solutions:**
1. Verify Jibri service status and configuration
2. Check recording directory permissions
3. Validate finalize script execution permissions
4. Monitor Jibri logs for errors
```bash
# Check Jibri status
systemctl status jibri
# Test finalize script
sudo -u jibri /opt/jitsi/jibri/finalize.sh "/test/recording.mp4" "test-room"
# Check Jibri logs
journalctl -u jibri -f
```
### Debug Commands
```bash
# Verify Jitsi configuration
prosodyctl check config
# Test JWT generation
curl -X POST "https://your-reflector-domain.com/v1/rooms/test/meeting" \
-H "Authorization: Bearer $TOKEN" -v
# Monitor webhook events
tail -f /var/log/reflector/app.log | grep jitsi
# Check room participant counts
curl "https://your-reflector-domain.com/v1/rooms" \
-H "Authorization: Bearer $TOKEN" | jq '.data[].num_clients'
```
### Performance Optimization
#### For High-Concurrent Usage
1. **Jitsi Videobridge Tuning:**
```bash
# /etc/jitsi/videobridge/sip-communicator.properties
org.jitsi.videobridge.STATISTICS_INTERVAL=5000
org.jitsi.videobridge.load.INITIAL_STREAM_LIMIT=50
```
2. **Database Connection Pooling:**
```python
# In your Reflector settings
DATABASE_POOL_SIZE=20
DATABASE_MAX_OVERFLOW=30
```
3. **Redis Configuration:**
```bash
# For webhook event caching
REDIS_URL=redis://localhost:6379/1
WEBHOOK_EVENT_TTL=3600
```
## Security Considerations
### Network Security
- Use HTTPS/WSS for all communications
- Implement proper firewall rules
- Consider VPN for server-to-server communication
### Authentication Security
- Rotate JWT secrets regularly
- Use strong webhook secrets (32+ characters)
- Implement rate limiting on webhook endpoints
### Recording Security
- Encrypt recordings at rest
- Implement access controls for recording files
- Regular security audits of file permissions
## Support
For additional support:
1. **Reflector Issues:** Check GitHub issues or create new ones
2. **Jitsi Community:** [Community Forum](https://community.jitsi.org/)
3. **Documentation:** [Jitsi Developer Guide](https://jitsi.github.io/handbook/)
## Migration from Whereby
If migrating from Whereby integration:
1. Update existing rooms to use "jitsi" platform
2. Verify webhook configurations are updated
3. Test recording workflows thoroughly
4. Monitor participant event accuracy
5. Update any custom integrations using meeting APIs
The platform abstraction layer ensures smooth migration with minimal API changes.

View File

@@ -0,0 +1,234 @@
# Reflector Architecture: Whereby + Daily.co Recording Storage
## System Overview
```mermaid
graph TB
subgraph "Actors"
APP[Our App<br/>Reflector]
WHEREBY[Whereby Service<br/>External]
DAILY[Daily.co Service<br/>External]
end
subgraph "AWS S3 Buckets"
TRANSCRIPT_BUCKET[Transcript Bucket<br/>reflector-transcripts<br/>Output: Processed MP3s]
WHEREBY_BUCKET[Whereby Bucket<br/>reflector-whereby-recordings<br/>Input: Raw MP4s]
DAILY_BUCKET[Daily.co Bucket<br/>reflector-dailyco-recordings<br/>Input: Raw WebM tracks]
end
subgraph "AWS Infrastructure"
SQS[SQS Queue<br/>Whereby notifications]
end
subgraph "Database"
DB[(PostgreSQL<br/>Recordings, Transcripts, Meetings)]
end
APP -->|Write processed| TRANSCRIPT_BUCKET
APP -->|Read/Delete| WHEREBY_BUCKET
APP -->|Read/Delete| DAILY_BUCKET
APP -->|Poll| SQS
APP -->|Store metadata| DB
WHEREBY -->|Write recordings| WHEREBY_BUCKET
WHEREBY_BUCKET -->|S3 Event| SQS
WHEREBY -->|Participant webhooks<br/>room.client.joined/left| APP
DAILY -->|Write recordings| DAILY_BUCKET
DAILY -->|Recording webhook<br/>recording.ready-to-download| APP
```
**Note on Webhook vs S3 Event for Recording Processing:**
- **Whereby**: Uses S3 Events → SQS for recording availability (S3 as source of truth, no race conditions)
- **Daily.co**: Uses webhooks for recording availability (more immediate, built-in reliability)
- **Both**: Use webhooks for participant tracking (real-time updates)
## Credentials & Permissions
```mermaid
graph LR
subgraph "Master Credentials"
MASTER[TRANSCRIPT_STORAGE_AWS_*<br/>Access Key ID + Secret]
end
subgraph "Whereby Upload Credentials"
WHEREBY_CREDS[AWS_WHEREBY_ACCESS_KEY_*<br/>Access Key ID + Secret]
end
subgraph "Daily.co Upload Role"
DAILY_ROLE[DAILY_STORAGE_AWS_ROLE_ARN<br/>IAM Role ARN]
end
subgraph "Our App Uses"
MASTER -->|Read/Write/Delete| TRANSCRIPT_BUCKET[Transcript Bucket]
MASTER -->|Read/Delete| WHEREBY_BUCKET[Whereby Bucket]
MASTER -->|Read/Delete| DAILY_BUCKET[Daily.co Bucket]
MASTER -->|Poll/Delete| SQS[SQS Queue]
end
subgraph "We Give To Services"
WHEREBY_CREDS -->|Passed in API call| WHEREBY_SERVICE[Whereby Service]
WHEREBY_SERVICE -->|Write Only| WHEREBY_BUCKET
DAILY_ROLE -->|Passed in API call| DAILY_SERVICE[Daily.co Service]
DAILY_SERVICE -->|Assume Role| DAILY_ROLE
DAILY_SERVICE -->|Write Only| DAILY_BUCKET
end
```
# Video Platform Recording Integration
This document explains how Reflector receives and identifies multitrack audio recordings from different video platforms.
## Platform Comparison
| Platform | Delivery Method | Track Identification |
|----------|----------------|---------------------|
| **Daily.co** | Webhook | Explicit track list in payload |
| **Whereby** | SQS (S3 notifications) | Single file per notification |
---
## Daily.co (Webhook-based)
Daily.co uses **webhooks** to notify Reflector when recordings are ready.
### How It Works
1. **Daily.co sends webhook** when recording is ready
- Event type: `recording.ready-to-download`
- Endpoint: `/v1/daily/webhook` (`reflector/views/daily.py:46-102`)
2. **Webhook payload explicitly includes track list**:
```json
{
"recording_id": "7443ee0a-dab1-40eb-b316-33d6c0d5ff88",
"room_name": "daily-20251020193458",
"tracks": [
{
"type": "audio",
"s3Key": "monadical/daily-20251020193458/1760988935484-52f7f48b-fbab-431f-9a50-87b9abfc8255-cam-audio-1760988935922",
"size": 831843
},
{
"type": "audio",
"s3Key": "monadical/daily-20251020193458/1760988935484-a37c35e3-6f8e-4274-a482-e9d0f102a732-cam-audio-1760988943823",
"size": 408438
},
{
"type": "video",
"s3Key": "monadical/daily-20251020193458/...-video.webm",
"size": 30000000
}
]
}
```
3. **System extracts audio tracks** (`daily.py:211`):
```python
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
```
4. **Triggers multitrack processing** (`daily.py:213-218`):
```python
process_multitrack_recording.delay(
bucket_name=bucket_name, # reflector-dailyco-local
room_name=room_name, # daily-20251020193458
recording_id=recording_id, # 7443ee0a-dab1-40eb-b316-33d6c0d5ff88
track_keys=track_keys # Only audio s3Keys
)
```
### Key Advantage: No Ambiguity
Even though multiple meetings may share the same S3 bucket/folder (`monadical/`), **there's no ambiguity** because:
- Each webhook payload contains the exact `s3Key` list for that specific `recording_id`
- No need to scan folders or guess which files belong together
- Each track's s3Key includes the room timestamp subfolder (e.g., `daily-20251020193458/`)
The room name includes timestamp (`daily-20251020193458`) to keep recordings organized, but **the webhook's explicit track list is what prevents mixing files from different meetings**.
### Track Timeline Extraction
Daily.co provides timing information in two places:
**1. PyAV WebM Metadata (current approach)**:
```python
# Read from WebM container stream metadata
stream.start_time = 8.130s # Meeting-relative timing
```
**2. Filename Timestamps (alternative approach, commit 3bae9076)**:
```
Filename format: {recording_start_ts}-{uuid}-cam-audio-{track_start_ts}.webm
Example: 1760988935484-52f7f48b-fbab-431f-9a50-87b9abfc8255-cam-audio-1760988935922.webm
Parse timestamps:
- recording_start_ts: 1760988935484 (Unix ms)
- track_start_ts: 1760988935922 (Unix ms)
- offset: (1760988935922 - 1760988935484) / 1000 = 0.438s
```
**Time Difference (PyAV vs Filename)**:
```
Track 0:
Filename offset: 438ms
PyAV metadata: 229ms
Difference: 209ms
Track 1:
Filename offset: 8339ms
PyAV metadata: 8130ms
Difference: 209ms
```
**Consistent 209ms delta** suggests network/encoding delay between file upload initiation (filename) and actual audio stream start (metadata).
**Current implementation uses PyAV metadata** because:
- More accurate (represents when audio actually started)
- Padding BEFORE transcription produces correct Whisper timestamps automatically
- No manual offset adjustment needed during transcript merge
### Why Re-encoding During Padding
Padding coincidentally involves re-encoding, which is important for Daily.co + Whisper:
**Problem:** Daily.co skips frames in recordings when microphone is muted or paused
- WebM containers have gaps where audio frames should be
- Whisper doesn't understand these gaps and produces incorrect timestamps
- Example: 5s of audio with 2s muted → file has frames only for 3s, Whisper thinks duration is 3s
**Solution:** Re-encoding via PyAV filter graph (`adelay` + `aresample`)
- Restores missing frames as silence
- Produces continuous audio stream without gaps
- Whisper now sees correct duration and produces accurate timestamps
**Why combined with padding:**
- Already re-encoding for padding (adding initial silence)
- More performant to do both operations in single PyAV pipeline
- Padded values needed for mixdown anyway (creating final MP3)
Implementation: `main_multitrack_pipeline.py:_apply_audio_padding_streaming()`
---
## Whereby (SQS-based)
Whereby uses **AWS SQS** (via S3 notifications) to notify Reflector when files are uploaded.
### How It Works
1. **Whereby uploads recording** to S3
2. **S3 sends notification** to SQS queue (one notification per file)
3. **Reflector polls SQS queue** (`worker/process.py:process_messages()`)
4. **System processes single file** (`worker/process.py:process_recording()`)
### Key Difference from Daily.co
**Whereby (SQS):** System receives S3 notification "file X was created" - only knows about one file at a time, would need to scan folder to find related files
**Daily.co (Webhook):** Daily explicitly tells system which files belong together in the webhook payload
---

View File

@@ -14,7 +14,7 @@ Webhooks are configured at the room level with two fields:
### `transcript.completed`
Triggered when a transcript has been fully processed, including transcription, diarization, summarization, and topic detection.
Triggered when a transcript has been fully processed, including transcription, diarization, summarization, topic detection and calendar event integration.
### `test`
@@ -128,6 +128,27 @@ This event includes a convenient URL for accessing the transcript:
"room": {
"id": "room-789",
"name": "Product Team Room"
},
"calendar_event": {
"id": "calendar-event-123",
"ics_uid": "event-123",
"title": "Q3 Product Planning Meeting",
"start_time": "2025-08-27T12:00:00Z",
"end_time": "2025-08-27T12:30:00Z",
"description": "Team discussed Q3 product roadmap, prioritizing mobile app features and API improvements.",
"location": "Conference Room 1",
"attendees": [
{
"id": "participant-1",
"name": "John Doe",
"speaker": "Speaker 1"
},
{
"id": "participant-2",
"name": "Jane Smith",
"speaker": "Speaker 2"
}
]
}
}
```

View File

@@ -27,7 +27,7 @@ AUTH_JWT_AUDIENCE=
#TRANSCRIPT_MODAL_API_KEY=xxxxx
TRANSCRIPT_BACKEND=modal
TRANSCRIPT_URL=https://monadical-sas--reflector-transcriber-web.modal.run
TRANSCRIPT_URL=https://monadical-sas--reflector-transcriber-parakeet-web.modal.run
TRANSCRIPT_MODAL_API_KEY=
## =======================================================
@@ -71,3 +71,30 @@ DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run
## Sentry DSN configuration
#SENTRY_DSN=
## =======================================================
## Video Platform Configuration
## =======================================================
## Whereby
#WHEREBY_API_KEY=your-whereby-api-key
#WHEREBY_WEBHOOK_SECRET=your-whereby-webhook-secret
#WHEREBY_STORAGE_AWS_ACCESS_KEY_ID=your-aws-key
#WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY=your-aws-secret
#AWS_PROCESS_RECORDING_QUEUE_URL=https://sqs.us-west-2.amazonaws.com/...
## Daily.co
#DAILY_API_KEY=your-daily-api-key
#DAILY_WEBHOOK_SECRET=your-daily-webhook-secret
#DAILY_SUBDOMAIN=your-subdomain
#DAILY_WEBHOOK_UUID= # Auto-populated by recreate_daily_webhook.py script
#DAILYCO_STORAGE_AWS_ROLE_ARN=... # IAM role ARN for Daily.co S3 access
#DAILYCO_STORAGE_AWS_BUCKET_NAME=reflector-dailyco
#DAILYCO_STORAGE_AWS_REGION=us-west-2
## Whereby (optional separate bucket)
#WHEREBY_STORAGE_AWS_BUCKET_NAME=reflector-whereby
#WHEREBY_STORAGE_AWS_REGION=us-east-1
## Platform Configuration
#DEFAULT_VIDEO_PLATFORM=whereby # Default platform for new rooms

View File

@@ -1,8 +1,8 @@
"""Add VideoPlatform enum for rooms and meetings
"""add_platform_support
Revision ID: 6e6ea8e607c5
Revises: 61882a919591
Create Date: 2025-09-02 17:33:21.022214
Revision ID: 1e49625677e4
Revises: 9e3f7b2a4c8e
Create Date: 2025-10-08 13:17:29.943612
"""
@@ -12,33 +12,39 @@ import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "6e6ea8e607c5"
down_revision: Union[str, None] = "61882a919591"
revision: str = "1e49625677e4"
down_revision: Union[str, None] = "9e3f7b2a4c8e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column("platform", sa.String(), server_default="whereby", nullable=False)
)
"""Add platform field with default 'whereby' for backward compatibility."""
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column("platform", sa.String(), server_default="whereby", nullable=False)
sa.Column(
"platform",
sa.String(),
nullable=True,
server_default=None,
)
)
# ### end Alembic commands ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"platform",
sa.String(),
nullable=False,
server_default="whereby",
)
)
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("platform")
"""Remove platform field."""
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("platform")
# ### end Alembic commands ###
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("platform")

View File

@@ -1,38 +0,0 @@
"""Add events column to meetings table
Revision ID: 2890b5104577
Revises: 6e6ea8e607c5
Create Date: 2025-09-02 17:51:41.620777
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "2890b5104577"
down_revision: Union[str, None] = "6e6ea8e607c5"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"events", sa.JSON(), server_default=sa.text("'[]'"), nullable=False
)
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_column("events")
# ### end Alembic commands ###

View File

@@ -0,0 +1,79 @@
"""add daily participant session table with immutable left_at
Revision ID: 2b92a1b03caa
Revises: f8294b31f022
Create Date: 2025-11-13 20:29:30.486577
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "2b92a1b03caa"
down_revision: Union[str, None] = "f8294b31f022"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Create table
op.create_table(
"daily_participant_session",
sa.Column("id", sa.String(), nullable=False),
sa.Column("meeting_id", sa.String(), nullable=False),
sa.Column("room_id", sa.String(), nullable=False),
sa.Column("session_id", sa.String(), nullable=False),
sa.Column("user_id", sa.String(), nullable=True),
sa.Column("user_name", sa.String(), nullable=False),
sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("left_at", sa.DateTime(timezone=True), nullable=True),
sa.ForeignKeyConstraint(["meeting_id"], ["meeting.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["room_id"], ["room.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("daily_participant_session", schema=None) as batch_op:
batch_op.create_index(
"idx_daily_session_meeting_left", ["meeting_id", "left_at"], unique=False
)
batch_op.create_index("idx_daily_session_room", ["room_id"], unique=False)
# Create trigger function to prevent left_at from being updated once set
op.execute("""
CREATE OR REPLACE FUNCTION prevent_left_at_update()
RETURNS TRIGGER AS $$
BEGIN
IF OLD.left_at IS NOT NULL THEN
RAISE EXCEPTION 'left_at is immutable once set';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
""")
# Create trigger
op.execute("""
CREATE TRIGGER prevent_left_at_update_trigger
BEFORE UPDATE ON daily_participant_session
FOR EACH ROW
EXECUTE FUNCTION prevent_left_at_update();
""")
def downgrade() -> None:
# Drop trigger
op.execute(
"DROP TRIGGER IF EXISTS prevent_left_at_update_trigger ON daily_participant_session;"
)
# Drop trigger function
op.execute("DROP FUNCTION IF EXISTS prevent_left_at_update();")
# Drop indexes and table
with op.batch_alter_table("daily_participant_session", schema=None) as batch_op:
batch_op.drop_index("idx_daily_session_room")
batch_op.drop_index("idx_daily_session_meeting_left")
op.drop_table("daily_participant_session")

View File

@@ -0,0 +1,53 @@
"""remove_one_active_meeting_per_room_constraint
Revision ID: 6025e9b2bef2
Revises: 2ae3db106d4e
Create Date: 2025-08-18 18:45:44.418392
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "6025e9b2bef2"
down_revision: Union[str, None] = "2ae3db106d4e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Remove the unique constraint that prevents multiple active meetings per room
# This is needed to support calendar integration with overlapping meetings
# Check if index exists before trying to drop it
from alembic import context
if context.get_context().dialect.name == "postgresql":
conn = op.get_bind()
result = conn.execute(
sa.text(
"SELECT 1 FROM pg_indexes WHERE indexname = 'idx_one_active_meeting_per_room'"
)
)
if result.fetchone():
op.drop_index("idx_one_active_meeting_per_room", table_name="meeting")
else:
# For SQLite, just try to drop it
try:
op.drop_index("idx_one_active_meeting_per_room", table_name="meeting")
except:
pass
def downgrade() -> None:
# Restore the unique constraint
op.create_index(
"idx_one_active_meeting_per_room",
"meeting",
["room_id"],
unique=True,
postgresql_where=sa.text("is_active = true"),
sqlite_where=sa.text("is_active = 1"),
)

View File

@@ -8,7 +8,6 @@ Create Date: 2025-09-10 10:47:06.006819
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
@@ -21,7 +20,6 @@ depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.alter_column("room_id", existing_type=sa.VARCHAR(), nullable=False)
batch_op.create_foreign_key(
None, "room", ["room_id"], ["id"], ondelete="CASCADE"
)
@@ -33,6 +31,5 @@ def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_constraint("meeting_room_id_fkey", type_="foreignkey")
batch_op.alter_column("room_id", existing_type=sa.VARCHAR(), nullable=True)
# ### end Alembic commands ###

View File

@@ -0,0 +1,38 @@
"""add user api keys
Revision ID: 9e3f7b2a4c8e
Revises: dc035ff72fd5
Create Date: 2025-10-17 00:00:00.000000
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "9e3f7b2a4c8e"
down_revision: Union[str, None] = "dc035ff72fd5"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"user_api_key",
sa.Column("id", sa.String(), nullable=False),
sa.Column("user_id", sa.String(), nullable=False),
sa.Column("key_hash", sa.String(), nullable=False),
sa.Column("name", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("user_api_key", schema=None) as batch_op:
batch_op.create_index("idx_user_api_key_hash", ["key_hash"], unique=True)
batch_op.create_index("idx_user_api_key_user_id", ["user_id"], unique=False)
def downgrade() -> None:
op.drop_table("user_api_key")

View File

@@ -0,0 +1,34 @@
"""add_grace_period_fields_to_meeting
Revision ID: d4a1c446458c
Revises: 6025e9b2bef2
Create Date: 2025-08-18 18:50:37.768052
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "d4a1c446458c"
down_revision: Union[str, None] = "6025e9b2bef2"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Add fields to track when participants left for grace period logic
op.add_column(
"meeting", sa.Column("last_participant_left_at", sa.DateTime(timezone=True))
)
op.add_column(
"meeting",
sa.Column("grace_period_minutes", sa.Integer, server_default=sa.text("15")),
)
def downgrade() -> None:
op.drop_column("meeting", "grace_period_minutes")
op.drop_column("meeting", "last_participant_left_at")

View File

@@ -0,0 +1,129 @@
"""add calendar
Revision ID: d8e204bbf615
Revises: d4a1c446458c
Create Date: 2025-09-10 19:56:22.295756
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "d8e204bbf615"
down_revision: Union[str, None] = "d4a1c446458c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"calendar_event",
sa.Column("id", sa.String(), nullable=False),
sa.Column("room_id", sa.String(), nullable=False),
sa.Column("ics_uid", sa.Text(), nullable=False),
sa.Column("title", sa.Text(), nullable=True),
sa.Column("description", sa.Text(), nullable=True),
sa.Column("start_time", sa.DateTime(timezone=True), nullable=False),
sa.Column("end_time", sa.DateTime(timezone=True), nullable=False),
sa.Column("attendees", postgresql.JSONB(astext_type=sa.Text()), nullable=True),
sa.Column("location", sa.Text(), nullable=True),
sa.Column("ics_raw_data", sa.Text(), nullable=True),
sa.Column("last_synced", sa.DateTime(timezone=True), nullable=False),
sa.Column(
"is_deleted", sa.Boolean(), server_default=sa.text("false"), nullable=False
),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.ForeignKeyConstraint(
["room_id"],
["room.id"],
name="fk_calendar_event_room_id",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("room_id", "ics_uid", name="uq_room_calendar_event"),
)
with op.batch_alter_table("calendar_event", schema=None) as batch_op:
batch_op.create_index(
"idx_calendar_event_deleted",
["is_deleted"],
unique=False,
postgresql_where=sa.text("NOT is_deleted"),
)
batch_op.create_index(
"idx_calendar_event_room_start", ["room_id", "start_time"], unique=False
)
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.add_column(sa.Column("calendar_event_id", sa.String(), nullable=True))
batch_op.add_column(
sa.Column(
"calendar_metadata",
postgresql.JSONB(astext_type=sa.Text()),
nullable=True,
)
)
batch_op.create_index(
"idx_meeting_calendar_event", ["calendar_event_id"], unique=False
)
batch_op.create_foreign_key(
"fk_meeting_calendar_event_id",
"calendar_event",
["calendar_event_id"],
["id"],
ondelete="SET NULL",
)
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(sa.Column("ics_url", sa.Text(), nullable=True))
batch_op.add_column(
sa.Column(
"ics_fetch_interval", sa.Integer(), server_default="300", nullable=True
)
)
batch_op.add_column(
sa.Column(
"ics_enabled",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
batch_op.add_column(
sa.Column("ics_last_sync", sa.DateTime(timezone=True), nullable=True)
)
batch_op.add_column(sa.Column("ics_last_etag", sa.Text(), nullable=True))
batch_op.create_index("idx_room_ics_enabled", ["ics_enabled"], unique=False)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_index("idx_room_ics_enabled")
batch_op.drop_column("ics_last_etag")
batch_op.drop_column("ics_last_sync")
batch_op.drop_column("ics_enabled")
batch_op.drop_column("ics_fetch_interval")
batch_op.drop_column("ics_url")
with op.batch_alter_table("meeting", schema=None) as batch_op:
batch_op.drop_constraint("fk_meeting_calendar_event_id", type_="foreignkey")
batch_op.drop_index("idx_meeting_calendar_event")
batch_op.drop_column("calendar_metadata")
batch_op.drop_column("calendar_event_id")
with op.batch_alter_table("calendar_event", schema=None) as batch_op:
batch_op.drop_index("idx_calendar_event_room_start")
batch_op.drop_index(
"idx_calendar_event_deleted", postgresql_where=sa.text("NOT is_deleted")
)
op.drop_table("calendar_event")
# ### end Alembic commands ###

View File

@@ -0,0 +1,43 @@
"""remove_grace_period_fields
Revision ID: dc035ff72fd5
Revises: d8e204bbf615
Create Date: 2025-09-11 10:36:45.197588
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "dc035ff72fd5"
down_revision: Union[str, None] = "d8e204bbf615"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# Remove grace period columns from meeting table
op.drop_column("meeting", "last_participant_left_at")
op.drop_column("meeting", "grace_period_minutes")
def downgrade() -> None:
# Add back grace period columns to meeting table
op.add_column(
"meeting",
sa.Column(
"last_participant_left_at", sa.DateTime(timezone=True), nullable=True
),
)
op.add_column(
"meeting",
sa.Column(
"grace_period_minutes",
sa.Integer(),
server_default=sa.text("15"),
nullable=True,
),
)

View File

@@ -0,0 +1,28 @@
"""add_track_keys
Revision ID: f8294b31f022
Revises: 1e49625677e4
Create Date: 2025-10-27 18:52:17.589167
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f8294b31f022"
down_revision: Union[str, None] = "1e49625677e4"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.add_column(sa.Column("track_keys", sa.JSON(), nullable=True))
def downgrade() -> None:
with op.batch_alter_table("recording", schema=None) as batch_op:
batch_op.drop_column("track_keys")

View File

@@ -12,7 +12,6 @@ dependencies = [
"requests>=2.31.0",
"aiortc>=1.5.0",
"sortedcontainers>=2.4.0",
"loguru>=0.7.0",
"pydantic-settings>=2.0.2",
"structlog>=23.1.0",
"uvicorn[standard]>=0.23.1",
@@ -27,7 +26,6 @@ dependencies = [
"prometheus-fastapi-instrumentator>=6.1.0",
"sentencepiece>=0.1.99",
"protobuf>=4.24.3",
"profanityfilter>=2.0.6",
"celery>=5.3.4",
"redis>=5.0.1",
"python-jose[cryptography]>=3.3.0",
@@ -40,7 +38,7 @@ dependencies = [
"llama-index-llms-openai-like>=0.4.0",
"pytest-env>=1.1.5",
"webvtt-py>=0.5.0",
"PyJWT>=2.8.0",
"icalendar>=6.0.0",
]
[dependency-groups]
@@ -114,13 +112,14 @@ source = ["reflector"]
[tool.pytest_env]
ENVIRONMENT = "pytest"
DATABASE_URL = "postgresql://test_user:test_password@localhost:15432/reflector_test"
AUTH_BACKEND = "jwt"
[tool.pytest.ini_options]
addopts = "-ra -q --disable-pytest-warnings --cov --cov-report html -v"
testpaths = ["tests"]
asyncio_mode = "auto"
markers = [
"gpu_modal: mark test to run only with GPU Modal endpoints (deselect with '-m \"not gpu_modal\"')",
"model_api: tests for the unified model-serving HTTP API (backend- and hardware-agnostic)",
]
[tool.ruff.lint]
@@ -132,7 +131,7 @@ select = [
[tool.ruff.lint.per-file-ignores]
"reflector/processors/summary/summary_builder.py" = ["E501"]
"gpu/**.py" = ["PLC0415"]
"gpu/modal_deployments/**.py" = ["PLC0415"]
"reflector/tools/**.py" = ["PLC0415"]
"migrations/versions/**.py" = ["PLC0415"]
"tests/**.py" = ["PLC0415"]

View File

@@ -12,9 +12,7 @@ from reflector.events import subscribers_shutdown, subscribers_startup
from reflector.logger import logger
from reflector.metrics import metrics_init
from reflector.settings import settings
from reflector.video_platforms.jitsi import router as jitsi_router
from reflector.video_platforms.whereby import router as whereby_router
from reflector.views.jibri_webhook import router as jibri_webhook_router
from reflector.views.daily import router as daily_router
from reflector.views.meetings import router as meetings_router
from reflector.views.rooms import router as rooms_router
from reflector.views.rtc_offer import router as rtc_offer_router
@@ -29,6 +27,9 @@ from reflector.views.transcripts_upload import router as transcripts_upload_rout
from reflector.views.transcripts_webrtc import router as transcripts_webrtc_router
from reflector.views.transcripts_websocket import router as transcripts_websocket_router
from reflector.views.user import router as user_router
from reflector.views.user_api_keys import router as user_api_keys_router
from reflector.views.user_websocket import router as user_ws_router
from reflector.views.whereby import router as whereby_router
from reflector.views.zulip import router as zulip_router
try:
@@ -67,6 +68,12 @@ app.add_middleware(
allow_headers=["*"],
)
@app.get("/health")
async def health():
return {"status": "healthy"}
# metrics
instrumentator = Instrumentator(
excluded_handlers=["/docs", "/metrics"],
@@ -86,10 +93,11 @@ app.include_router(transcripts_websocket_router, prefix="/v1")
app.include_router(transcripts_webrtc_router, prefix="/v1")
app.include_router(transcripts_process_router, prefix="/v1")
app.include_router(user_router, prefix="/v1")
app.include_router(user_api_keys_router, prefix="/v1")
app.include_router(user_ws_router, prefix="/v1")
app.include_router(zulip_router, prefix="/v1")
app.include_router(whereby_router, prefix="/v1")
app.include_router(jitsi_router, prefix="/v1")
app.include_router(jibri_webhook_router) # No /v1 prefix, uses /api/v1/jibri
app.include_router(daily_router, prefix="/v1/daily")
add_pagination(app)
# prepare celery

View File

@@ -1,14 +1,16 @@
from typing import Annotated, Optional
from typing import Annotated, List, Optional
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
from fastapi.security import APIKeyHeader, OAuth2PasswordBearer
from jose import JWTError, jwt
from pydantic import BaseModel
from reflector.db.user_api_keys import user_api_keys_controller
from reflector.logger import logger
from reflector.settings import settings
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
jwt_public_key = open(f"reflector/auth/jwt/keys/{settings.AUTH_JWT_PUBLIC_KEY}").read()
jwt_algorithm = settings.AUTH_JWT_ALGORITHM
@@ -26,7 +28,7 @@ class JWTException(Exception):
class UserInfo(BaseModel):
sub: str
email: str
email: Optional[str] = None
def __getitem__(self, key):
return getattr(self, key)
@@ -58,33 +60,53 @@ def authenticated(token: Annotated[str, Depends(oauth2_scheme)]):
return None
def current_user(
token: Annotated[Optional[str], Depends(oauth2_scheme)],
jwtauth: JWTAuth = Depends(),
):
if token is None:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
payload = jwtauth.verify_token(token)
sub = payload["sub"]
return UserInfo(sub=sub)
except JWTError as e:
logger.error(f"JWT error: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication")
async def _authenticate_user(
jwt_token: Optional[str],
api_key: Optional[str],
jwtauth: JWTAuth,
) -> UserInfo | None:
user_infos: List[UserInfo] = []
if api_key:
user_api_key = await user_api_keys_controller.verify_key(api_key)
if user_api_key:
user_infos.append(UserInfo(sub=user_api_key.user_id, email=None))
if jwt_token:
try:
payload = jwtauth.verify_token(jwt_token)
sub = payload["sub"]
email = payload["email"]
user_infos.append(UserInfo(sub=sub, email=email))
except JWTError as e:
logger.error(f"JWT error: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication")
def current_user_optional(
token: Annotated[Optional[str], Depends(oauth2_scheme)],
jwtauth: JWTAuth = Depends(),
):
# we accept no token, but if one is provided, it must be a valid one.
if token is None:
if len(user_infos) == 0:
return None
try:
payload = jwtauth.verify_token(token)
sub = payload["sub"]
email = payload["email"]
return UserInfo(sub=sub, email=email)
except JWTError as e:
logger.error(f"JWT error: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication")
if len(set([x.sub for x in user_infos])) > 1:
raise JWTException(
status_code=401,
detail="Invalid authentication: more than one user provided",
)
return user_infos[0]
async def current_user(
jwt_token: Annotated[Optional[str], Depends(oauth2_scheme)],
api_key: Annotated[Optional[str], Depends(api_key_header)],
jwtauth: JWTAuth = Depends(),
):
user = await _authenticate_user(jwt_token, api_key, jwtauth)
if user is None:
raise HTTPException(status_code=401, detail="Not authenticated")
return user
async def current_user_optional(
jwt_token: Annotated[Optional[str], Depends(oauth2_scheme)],
api_key: Annotated[Optional[str], Depends(api_key_header)],
jwtauth: JWTAuth = Depends(),
):
return await _authenticate_user(jwt_token, api_key, jwtauth)

View File

@@ -24,10 +24,13 @@ def get_database() -> databases.Database:
# import models
import reflector.db.calendar_events # noqa
import reflector.db.daily_participant_sessions # noqa
import reflector.db.meetings # noqa
import reflector.db.recordings # noqa
import reflector.db.rooms # noqa
import reflector.db.transcripts # noqa
import reflector.db.user_api_keys # noqa
kwargs = {}
if "postgres" not in settings.DATABASE_URL:

View File

@@ -0,0 +1,187 @@
from datetime import datetime, timedelta, timezone
from typing import Any
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy.dialects.postgresql import JSONB
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
calendar_events = sa.Table(
"calendar_event",
metadata,
sa.Column("id", sa.String, primary_key=True),
sa.Column(
"room_id",
sa.String,
sa.ForeignKey("room.id", ondelete="CASCADE", name="fk_calendar_event_room_id"),
nullable=False,
),
sa.Column("ics_uid", sa.Text, nullable=False),
sa.Column("title", sa.Text),
sa.Column("description", sa.Text),
sa.Column("start_time", sa.DateTime(timezone=True), nullable=False),
sa.Column("end_time", sa.DateTime(timezone=True), nullable=False),
sa.Column("attendees", JSONB),
sa.Column("location", sa.Text),
sa.Column("ics_raw_data", sa.Text),
sa.Column("last_synced", sa.DateTime(timezone=True), nullable=False),
sa.Column("is_deleted", sa.Boolean, nullable=False, server_default=sa.false()),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.UniqueConstraint("room_id", "ics_uid", name="uq_room_calendar_event"),
sa.Index("idx_calendar_event_room_start", "room_id", "start_time"),
sa.Index(
"idx_calendar_event_deleted",
"is_deleted",
postgresql_where=sa.text("NOT is_deleted"),
),
)
class CalendarEvent(BaseModel):
id: str = Field(default_factory=generate_uuid4)
room_id: str
ics_uid: str
title: str | None = None
description: str | None = None
start_time: datetime
end_time: datetime
attendees: list[dict[str, Any]] | None = None
location: str | None = None
ics_raw_data: str | None = None
last_synced: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
is_deleted: bool = False
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class CalendarEventController:
async def get_by_room(
self,
room_id: str,
include_deleted: bool = False,
start_after: datetime | None = None,
end_before: datetime | None = None,
) -> list[CalendarEvent]:
query = calendar_events.select().where(calendar_events.c.room_id == room_id)
if not include_deleted:
query = query.where(calendar_events.c.is_deleted == False)
if start_after:
query = query.where(calendar_events.c.start_time >= start_after)
if end_before:
query = query.where(calendar_events.c.end_time <= end_before)
query = query.order_by(calendar_events.c.start_time.asc())
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_upcoming(
self, room_id: str, minutes_ahead: int = 120
) -> list[CalendarEvent]:
"""Get upcoming events for a room within the specified minutes, including currently happening events."""
now = datetime.now(timezone.utc)
future_time = now + timedelta(minutes=minutes_ahead)
query = (
calendar_events.select()
.where(
sa.and_(
calendar_events.c.room_id == room_id,
calendar_events.c.is_deleted == False,
calendar_events.c.start_time <= future_time,
calendar_events.c.end_time >= now,
)
)
.order_by(calendar_events.c.start_time.asc())
)
results = await get_database().fetch_all(query)
return [CalendarEvent(**result) for result in results]
async def get_by_id(self, event_id: str) -> CalendarEvent | None:
query = calendar_events.select().where(calendar_events.c.id == event_id)
result = await get_database().fetch_one(query)
return CalendarEvent(**result) if result else None
async def get_by_ics_uid(self, room_id: str, ics_uid: str) -> CalendarEvent | None:
query = calendar_events.select().where(
sa.and_(
calendar_events.c.room_id == room_id,
calendar_events.c.ics_uid == ics_uid,
)
)
result = await get_database().fetch_one(query)
return CalendarEvent(**result) if result else None
async def upsert(self, event: CalendarEvent) -> CalendarEvent:
existing = await self.get_by_ics_uid(event.room_id, event.ics_uid)
if existing:
event.id = existing.id
event.created_at = existing.created_at
event.updated_at = datetime.now(timezone.utc)
query = (
calendar_events.update()
.where(calendar_events.c.id == existing.id)
.values(**event.model_dump())
)
else:
query = calendar_events.insert().values(**event.model_dump())
await get_database().execute(query)
return event
async def soft_delete_missing(
self, room_id: str, current_ics_uids: list[str]
) -> int:
"""Soft delete future events that are no longer in the calendar."""
now = datetime.now(timezone.utc)
select_query = calendar_events.select().where(
sa.and_(
calendar_events.c.room_id == room_id,
calendar_events.c.start_time > now,
calendar_events.c.is_deleted == False,
calendar_events.c.ics_uid.notin_(current_ics_uids)
if current_ics_uids
else True,
)
)
to_delete = await get_database().fetch_all(select_query)
delete_count = len(to_delete)
if delete_count > 0:
update_query = (
calendar_events.update()
.where(
sa.and_(
calendar_events.c.room_id == room_id,
calendar_events.c.start_time > now,
calendar_events.c.is_deleted == False,
calendar_events.c.ics_uid.notin_(current_ics_uids)
if current_ics_uids
else True,
)
)
.values(is_deleted=True, updated_at=now)
)
await get_database().execute(update_query)
return delete_count
async def delete_by_room(self, room_id: str) -> int:
query = calendar_events.delete().where(calendar_events.c.room_id == room_id)
result = await get_database().execute(query)
return result.rowcount
calendar_events_controller = CalendarEventController()

View File

@@ -0,0 +1,169 @@
"""Daily.co participant session tracking.
Stores webhook data for participant.joined and participant.left events to provide
historical session information (Daily.co API only returns current participants).
"""
from datetime import datetime
import sqlalchemy as sa
from pydantic import BaseModel
from sqlalchemy.dialects.postgresql import insert
from reflector.db import get_database, metadata
from reflector.utils.string import NonEmptyString
daily_participant_sessions = sa.Table(
"daily_participant_session",
metadata,
sa.Column("id", sa.String, primary_key=True),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column(
"room_id",
sa.String,
sa.ForeignKey("room.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("session_id", sa.String, nullable=False),
sa.Column("user_id", sa.String, nullable=True),
sa.Column("user_name", sa.String, nullable=False),
sa.Column("joined_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("left_at", sa.DateTime(timezone=True), nullable=True),
sa.Index("idx_daily_session_meeting_left", "meeting_id", "left_at"),
sa.Index("idx_daily_session_room", "room_id"),
)
class DailyParticipantSession(BaseModel):
"""Daily.co participant session record.
Tracks when a participant joined and left a meeting. Populated from webhooks:
- participant.joined: Creates record with left_at=None
- participant.left: Updates record with left_at
ID format: {meeting_id}:{user_id}:{joined_at_ms}
- Ensures idempotency (duplicate webhooks don't create duplicates)
- Allows same user to rejoin (different joined_at = different session)
Duration is calculated as: left_at - joined_at (not stored)
"""
id: NonEmptyString
meeting_id: NonEmptyString
room_id: NonEmptyString
session_id: NonEmptyString # Daily.co's session_id (identifies room session)
user_id: NonEmptyString | None = None
user_name: str
joined_at: datetime
left_at: datetime | None = None
class DailyParticipantSessionController:
"""Controller for Daily.co participant session persistence."""
async def get_by_id(self, id: str) -> DailyParticipantSession | None:
"""Get a session by its ID."""
query = daily_participant_sessions.select().where(
daily_participant_sessions.c.id == id
)
result = await get_database().fetch_one(query)
return DailyParticipantSession(**result) if result else None
async def get_open_session(
self, meeting_id: NonEmptyString, session_id: NonEmptyString
) -> DailyParticipantSession | None:
"""Get the open (not left) session for a user in a meeting."""
query = daily_participant_sessions.select().where(
sa.and_(
daily_participant_sessions.c.meeting_id == meeting_id,
daily_participant_sessions.c.session_id == session_id,
daily_participant_sessions.c.left_at.is_(None),
)
)
results = await get_database().fetch_all(query)
if len(results) > 1:
raise ValueError(
f"Multiple open sessions for daily session {session_id} in meeting {meeting_id}: "
f"found {len(results)} sessions"
)
return DailyParticipantSession(**results[0]) if results else None
async def upsert_joined(self, session: DailyParticipantSession) -> None:
"""Insert or update when participant.joined webhook arrives.
Idempotent: Duplicate webhooks with same ID are safely ignored.
Out-of-order: If left webhook arrived first, preserves left_at.
"""
query = insert(daily_participant_sessions).values(**session.model_dump())
query = query.on_conflict_do_update(
index_elements=["id"],
set_={"user_name": session.user_name},
)
await get_database().execute(query)
async def upsert_left(self, session: DailyParticipantSession) -> None:
"""Update session when participant.left webhook arrives.
Finds the open session for this user in this meeting and updates left_at.
Works around Daily.co webhook timestamp inconsistency (joined_at differs by ~4ms between webhooks).
Handles three cases:
1. Normal flow: open session exists → updates left_at
2. Out-of-order: left arrives first → creates new record with left data
3. Duplicate: left arrives again → idempotent (DB trigger prevents left_at modification)
"""
if session.left_at is None:
raise ValueError("left_at is required for upsert_left")
if session.left_at <= session.joined_at:
raise ValueError(
f"left_at ({session.left_at}) must be after joined_at ({session.joined_at})"
)
# Find existing open session (works around timestamp mismatch in webhooks)
existing = await self.get_open_session(session.meeting_id, session.session_id)
if existing:
# Update existing open session
query = (
daily_participant_sessions.update()
.where(daily_participant_sessions.c.id == existing.id)
.values(left_at=session.left_at)
)
await get_database().execute(query)
else:
# Out-of-order or first webhook: insert new record
query = insert(daily_participant_sessions).values(**session.model_dump())
query = query.on_conflict_do_nothing(index_elements=["id"])
await get_database().execute(query)
async def get_by_meeting(self, meeting_id: str) -> list[DailyParticipantSession]:
"""Get all participant sessions for a meeting (active and ended)."""
query = daily_participant_sessions.select().where(
daily_participant_sessions.c.meeting_id == meeting_id
)
results = await get_database().fetch_all(query)
return [DailyParticipantSession(**result) for result in results]
async def get_active_by_meeting(
self, meeting_id: str
) -> list[DailyParticipantSession]:
"""Get only active (not left) participant sessions for a meeting."""
query = daily_participant_sessions.select().where(
sa.and_(
daily_participant_sessions.c.meeting_id == meeting_id,
daily_participant_sessions.c.left_at.is_(None),
)
)
results = await get_database().fetch_all(query)
return [DailyParticipantSession(**result) for result in results]
daily_participant_sessions_controller = DailyParticipantSessionController()

View File

@@ -1,12 +1,16 @@
from datetime import datetime, timezone
from typing import Any, Dict, List, Literal
from datetime import datetime
from typing import Any, Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy.dialects.postgresql import JSONB
from reflector.db import get_database, metadata
from reflector.db.rooms import Room, VideoPlatform
from reflector.db.rooms import Room
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils import generate_uuid4
from reflector.utils.string import assert_equal
from reflector.video_platforms.factory import get_platform
meetings = sa.Table(
"meeting",
@@ -44,15 +48,24 @@ meetings = sa.Table(
nullable=False,
server_default=sa.true(),
),
sa.Column("platform", sa.String, nullable=False, server_default="whereby"),
sa.Column("events", sa.JSON, nullable=False, server_default=sa.text("'[]'")),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index(
"idx_one_active_meeting_per_room",
"room_id",
unique=True,
postgresql_where=sa.text("is_active = true"),
sa.Column(
"calendar_event_id",
sa.String,
sa.ForeignKey(
"calendar_event.id",
ondelete="SET NULL",
name="fk_meeting_calendar_event_id",
),
),
sa.Column("calendar_metadata", JSONB),
sa.Column(
"platform",
sa.String,
nullable=False,
server_default=assert_equal(WHEREBY_PLATFORM, "whereby"),
),
sa.Index("idx_meeting_room_id", "room_id"),
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
)
meeting_consent = sa.Table(
@@ -90,12 +103,14 @@ class Meeting(BaseModel):
is_locked: bool = False
room_mode: Literal["normal", "group"] = "normal"
recording_type: Literal["none", "local", "cloud"] = "cloud"
recording_trigger: Literal[
recording_trigger: Literal[ # whereby-specific
"none", "prompt", "automatic", "automatic-2nd-participant"
] = "automatic-2nd-participant"
num_clients: int = 0
platform: VideoPlatform = VideoPlatform.WHEREBY
events: List[Dict[str, Any]] = Field(default_factory=list)
is_active: bool = True
calendar_event_id: str | None = None
calendar_metadata: dict[str, Any] | None = None
platform: Platform = WHEREBY_PLATFORM
class MeetingController:
@@ -108,6 +123,8 @@ class MeetingController:
start_date: datetime,
end_date: datetime,
room: Room,
calendar_event_id: str | None = None,
calendar_metadata: dict[str, Any] | None = None,
):
meeting = Meeting(
id=id,
@@ -121,7 +138,9 @@ class MeetingController:
room_mode=room.room_mode,
recording_type=room.recording_type,
recording_trigger=room.recording_trigger,
platform=room.platform,
calendar_event_id=calendar_event_id,
calendar_metadata=calendar_metadata,
platform=get_platform(room.platform),
)
query = meetings.insert().values(**meeting.model_dump())
await get_database().execute(query)
@@ -129,20 +148,32 @@ class MeetingController:
async def get_all_active(self) -> list[Meeting]:
query = meetings.select().where(meetings.c.is_active)
return await get_database().fetch_all(query)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_by_room_name(
self,
room_name: str,
) -> Meeting | None:
query = meetings.select().where(meetings.c.room_name == room_name)
"""
Get a meeting by room name.
For backward compatibility, returns the most recent meeting.
"""
query = (
meetings.select()
.where(meetings.c.room_name == room_name)
.order_by(meetings.c.end_date.desc())
)
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def get_active(self, room: Room, current_time: datetime) -> Meeting | None:
"""
Get latest active meeting for a room.
For backward compatibility, returns the most recent active meeting.
"""
end_date = getattr(meetings.c, "end_date")
query = (
meetings.select()
@@ -158,11 +189,66 @@ class MeetingController:
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def get_by_id(self, meeting_id: str, **kwargs) -> Meeting | None:
async def get_all_active_for_room(
self, room: Room, current_time: datetime
) -> list[Meeting]:
end_date = getattr(meetings.c, "end_date")
query = (
meetings.select()
.where(
sa.and_(
meetings.c.room_id == room.id,
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
.order_by(end_date.desc())
)
results = await get_database().fetch_all(query)
return [Meeting(**result) for result in results]
async def get_active_by_calendar_event(
self, room: Room, calendar_event_id: str, current_time: datetime
) -> Meeting | None:
"""
Get active meeting for a specific calendar event.
"""
query = meetings.select().where(
sa.and_(
meetings.c.room_id == room.id,
meetings.c.calendar_event_id == calendar_event_id,
meetings.c.end_date > current_time,
meetings.c.is_active,
)
)
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def get_by_id(
self, meeting_id: str, room: Room | None = None
) -> Meeting | None:
query = meetings.select().where(meetings.c.id == meeting_id)
if room:
query = query.where(meetings.c.room_id == room.id)
result = await get_database().fetch_one(query)
if not result:
return None
return Meeting(**result)
async def get_by_calendar_event(
self, calendar_event_id: str, room: Room
) -> Meeting | None:
query = meetings.select().where(
meetings.c.calendar_event_id == calendar_event_id
)
if room:
query = query.where(meetings.c.room_id == room.id)
result = await get_database().fetch_one(query)
if not result:
return None
@@ -172,67 +258,27 @@ class MeetingController:
query = meetings.update().where(meetings.c.id == meeting_id).values(**kwargs)
await get_database().execute(query)
async def add_event(
self, meeting_id: str, event_type: str, event_data: Dict[str, Any] = None
):
"""Add an event to a meeting's events list."""
if event_data is None:
event_data = {}
event = {
"type": event_type,
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
"data": event_data,
}
# Get current events
query = meetings.select().where(meetings.c.id == meeting_id)
result = await get_database().fetch_one(query)
if not result:
return
current_events = result["events"] or []
current_events.append(event)
# Update with new events list
update_query = (
async def increment_num_clients(self, meeting_id: str) -> None:
"""Atomically increment participant count."""
query = (
meetings.update()
.where(meetings.c.id == meeting_id)
.values(events=current_events)
.values(num_clients=meetings.c.num_clients + 1)
)
await get_database().execute(update_query)
await get_database().execute(query)
async def participant_joined(
self, meeting_id: str, participant_data: Dict[str, Any] = None
):
"""Record a participant joined event."""
await self.add_event(meeting_id, "participant_joined", participant_data)
async def participant_left(
self, meeting_id: str, participant_data: Dict[str, Any] = None
):
"""Record a participant left event."""
await self.add_event(meeting_id, "participant_left", participant_data)
async def recording_started(
self, meeting_id: str, recording_data: Dict[str, Any] = None
):
"""Record a recording started event."""
await self.add_event(meeting_id, "recording_started", recording_data)
async def recording_stopped(
self, meeting_id: str, recording_data: Dict[str, Any] = None
):
"""Record a recording stopped event."""
await self.add_event(meeting_id, "recording_stopped", recording_data)
async def get_events(self, meeting_id: str) -> List[Dict[str, Any]]:
"""Get all events for a meeting."""
query = meetings.select().where(meetings.c.id == meeting_id)
result = await get_database().fetch_one(query)
if not result:
return []
return result["events"] or []
async def decrement_num_clients(self, meeting_id: str) -> None:
"""Atomically decrement participant count (min 0)."""
query = (
meetings.update()
.where(meetings.c.id == meeting_id)
.values(
num_clients=sa.case(
(meetings.c.num_clients > 0, meetings.c.num_clients - 1), else_=0
)
)
)
await get_database().execute(query)
class MeetingConsentController:
@@ -257,7 +303,6 @@ class MeetingConsentController:
return MeetingConsent(**result)
async def upsert(self, consent: MeetingConsent) -> MeetingConsent:
"""Create new consent or update existing one for authenticated users"""
if consent.user_id:
# For authenticated users, check if consent already exists
# not transactional but we're ok with that; the consents ain't deleted anyways

View File

@@ -21,6 +21,7 @@ recordings = sa.Table(
server_default="pending",
),
sa.Column("meeting_id", sa.String),
sa.Column("track_keys", sa.JSON, nullable=True),
sa.Index("idx_recording_meeting_id", "meeting_id"),
)
@@ -28,10 +29,13 @@ recordings = sa.Table(
class Recording(BaseModel):
id: str = Field(default_factory=generate_uuid4)
bucket_name: str
# for single-track
object_key: str
recorded_at: datetime
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# for multitrack reprocessing
track_keys: list[str] | None = None
class RecordingController:

View File

@@ -1,6 +1,5 @@
import secrets
from datetime import datetime, timezone
from enum import StrEnum
from sqlite3 import IntegrityError
from typing import Literal
@@ -10,14 +9,9 @@ from pydantic import BaseModel, Field
from sqlalchemy.sql import false, or_
from reflector.db import get_database, metadata
from reflector.schemas.platform import Platform
from reflector.utils import generate_uuid4
class VideoPlatform(StrEnum):
WHEREBY = "whereby"
JITSI = "jitsi"
rooms = sqlalchemy.Table(
"room",
metadata,
@@ -50,10 +44,21 @@ rooms = sqlalchemy.Table(
),
sqlalchemy.Column("webhook_url", sqlalchemy.String, nullable=True),
sqlalchemy.Column("webhook_secret", sqlalchemy.String, nullable=True),
sqlalchemy.Column("ics_url", sqlalchemy.Text),
sqlalchemy.Column("ics_fetch_interval", sqlalchemy.Integer, server_default="300"),
sqlalchemy.Column(
"platform", sqlalchemy.String, nullable=False, server_default="whereby"
"ics_enabled", sqlalchemy.Boolean, nullable=False, server_default=false()
),
sqlalchemy.Column("ics_last_sync", sqlalchemy.DateTime(timezone=True)),
sqlalchemy.Column("ics_last_etag", sqlalchemy.Text),
sqlalchemy.Column(
"platform",
sqlalchemy.String,
nullable=True,
server_default=None,
),
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
)
@@ -68,13 +73,18 @@ class Room(BaseModel):
is_locked: bool = False
room_mode: Literal["normal", "group"] = "normal"
recording_type: Literal["none", "local", "cloud"] = "cloud"
recording_trigger: Literal[
recording_trigger: Literal[ # whereby-specific
"none", "prompt", "automatic", "automatic-2nd-participant"
] = "automatic-2nd-participant"
is_shared: bool = False
webhook_url: str | None = None
webhook_secret: str | None = None
platform: VideoPlatform = VideoPlatform.WHEREBY
ics_url: str | None = None
ics_fetch_interval: int = 300
ics_enabled: bool = False
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform | None = None
class RoomController:
@@ -125,7 +135,10 @@ class RoomController:
is_shared: bool,
webhook_url: str = "",
webhook_secret: str = "",
platform: str = "whereby",
ics_url: str | None = None,
ics_fetch_interval: int = 300,
ics_enabled: bool = False,
platform: Platform | None = None,
):
"""
Add a new room
@@ -146,6 +159,9 @@ class RoomController:
is_shared=is_shared,
webhook_url=webhook_url,
webhook_secret=webhook_secret,
ics_url=ics_url,
ics_fetch_interval=ics_fetch_interval,
ics_enabled=ics_enabled,
platform=platform,
)
query = rooms.insert().values(**room.model_dump())
@@ -211,6 +227,13 @@ class RoomController:
return room
async def get_ics_enabled(self) -> list[Room]:
query = rooms.select().where(
rooms.c.ics_enabled == True, rooms.c.ics_url != None
)
results = await get_database().fetch_all(query)
return [Room(**result) for result in results]
async def remove_by_id(
self,
room_id: str,

View File

@@ -135,6 +135,8 @@ class SearchParameters(BaseModel):
user_id: str | None = None
room_id: str | None = None
source_kind: SourceKind | None = None
from_datetime: datetime | None = None
to_datetime: datetime | None = None
class SearchResultDB(BaseModel):
@@ -402,6 +404,14 @@ class SearchController:
base_query = base_query.where(
transcripts.c.source_kind == params.source_kind
)
if params.from_datetime:
base_query = base_query.where(
transcripts.c.created_at >= params.from_datetime
)
if params.to_datetime:
base_query = base_query.where(
transcripts.c.created_at <= params.to_datetime
)
if params.query_text is not None:
order_by = sqlalchemy.desc(sqlalchemy.text("rank"))

View File

@@ -21,7 +21,7 @@ from reflector.db.utils import is_postgresql
from reflector.logger import logger
from reflector.processors.types import Word as ProcessorWord
from reflector.settings import settings
from reflector.storage import get_recordings_storage, get_transcripts_storage
from reflector.storage import get_transcripts_storage
from reflector.utils import generate_uuid4
from reflector.utils.webvtt import topics_to_webvtt
@@ -186,6 +186,7 @@ class TranscriptParticipant(BaseModel):
id: str = Field(default_factory=generate_uuid4)
speaker: int | None
name: str
user_id: str | None = None
class Transcript(BaseModel):
@@ -623,7 +624,9 @@ class TranscriptController:
)
if recording:
try:
await get_recordings_storage().delete_file(recording.object_key)
await get_transcripts_storage().delete_file(
recording.object_key, bucket=recording.bucket_name
)
except Exception as e:
logger.warning(
"Failed to delete recording object from S3",
@@ -647,6 +650,19 @@ class TranscriptController:
query = transcripts.delete().where(transcripts.c.recording_id == recording_id)
await get_database().execute(query)
@staticmethod
def user_can_mutate(transcript: Transcript, user_id: str | None) -> bool:
"""
Returns True if the given user is allowed to modify the transcript.
Policy:
- Anonymous transcripts (user_id is None) cannot be modified via API
- Only the owner (matching user_id) can modify their transcript
"""
if transcript.user_id is None:
return False
return user_id and transcript.user_id == user_id
@asynccontextmanager
async def transaction(self):
"""
@@ -712,11 +728,13 @@ class TranscriptController:
"""
Download audio from storage
"""
transcript.audio_mp3_filename.write_bytes(
await get_transcripts_storage().get_file(
transcript.storage_audio_path,
)
)
storage = get_transcripts_storage()
try:
with open(transcript.audio_mp3_filename, "wb") as f:
await storage.stream_to_fileobj(transcript.storage_audio_path, f)
except Exception:
transcript.audio_mp3_filename.unlink(missing_ok=True)
raise
async def upsert_participant(
self,

View File

@@ -0,0 +1,91 @@
import hmac
import secrets
from datetime import datetime, timezone
from hashlib import sha256
import sqlalchemy
from pydantic import BaseModel, Field
from reflector.db import get_database, metadata
from reflector.settings import settings
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
user_api_keys = sqlalchemy.Table(
"user_api_key",
metadata,
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("user_id", sqlalchemy.String, nullable=False),
sqlalchemy.Column("key_hash", sqlalchemy.String, nullable=False),
sqlalchemy.Column("name", sqlalchemy.String, nullable=True),
sqlalchemy.Column("created_at", sqlalchemy.DateTime(timezone=True), nullable=False),
sqlalchemy.Index("idx_user_api_key_hash", "key_hash", unique=True),
sqlalchemy.Index("idx_user_api_key_user_id", "user_id"),
)
class UserApiKey(BaseModel):
id: NonEmptyString = Field(default_factory=generate_uuid4)
user_id: NonEmptyString
key_hash: NonEmptyString
name: NonEmptyString | None = None
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class UserApiKeyController:
@staticmethod
def generate_key() -> NonEmptyString:
return secrets.token_urlsafe(48)
@staticmethod
def hash_key(key: NonEmptyString) -> str:
return hmac.new(
settings.SECRET_KEY.encode(), key.encode(), digestmod=sha256
).hexdigest()
@classmethod
async def create_key(
cls,
user_id: NonEmptyString,
name: NonEmptyString | None = None,
) -> tuple[UserApiKey, NonEmptyString]:
plaintext = cls.generate_key()
api_key = UserApiKey(
user_id=user_id,
key_hash=cls.hash_key(plaintext),
name=name,
)
query = user_api_keys.insert().values(**api_key.model_dump())
await get_database().execute(query)
return api_key, plaintext
@classmethod
async def verify_key(cls, plaintext_key: NonEmptyString) -> UserApiKey | None:
key_hash = cls.hash_key(plaintext_key)
query = user_api_keys.select().where(
user_api_keys.c.key_hash == key_hash,
)
result = await get_database().fetch_one(query)
return UserApiKey(**result) if result else None
@staticmethod
async def list_by_user_id(user_id: NonEmptyString) -> list[UserApiKey]:
query = (
user_api_keys.select()
.where(user_api_keys.c.user_id == user_id)
.order_by(user_api_keys.c.created_at.desc())
)
results = await get_database().fetch_all(query)
return [UserApiKey(**r) for r in results]
@staticmethod
async def delete_key(key_id: NonEmptyString, user_id: NonEmptyString) -> bool:
query = user_api_keys.delete().where(
(user_api_keys.c.id == key_id) & (user_api_keys.c.user_id == user_id)
)
result = await get_database().execute(query)
# asyncpg returns None for DELETE, consider it success if no exception
return result is None or result > 0
user_api_keys_controller = UserApiKeyController()

View File

@@ -1,227 +0,0 @@
import json
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from pydantic import BaseModel
from typing_extensions import TypedDict
class ParticipantInfo(BaseModel):
jid: str
nick: str
id: str
is_moderator: bool = False
class ParticipantLeftInfo(BaseModel):
jid: str
nick: Optional[str] = None
duration_seconds: Optional[int] = None
class RoomCreatedEvent(BaseModel):
type: Literal["room_created"]
timestamp: int
room_name: str
room_jid: str
meeting_url: str
class RecordingStartedEvent(BaseModel):
type: Literal["recording_started"]
timestamp: int
room_name: str
session_id: str
jibri_jid: str
class RecordingStoppedEvent(BaseModel):
type: Literal["recording_stopped"]
timestamp: int
room_name: str
session_id: str
meeting_url: str
class ParticipantJoinedEvent(BaseModel):
type: Literal["participant_joined"]
timestamp: int
room_name: str
participant: ParticipantInfo
class ParticipantLeftEvent(BaseModel):
type: Literal["participant_left"]
timestamp: int
room_name: str
participant: ParticipantLeftInfo
class SpeakerActiveEvent(BaseModel):
type: Literal["speaker_active"]
timestamp: int
room_name: str
speaker_jid: str
speaker_nick: str
duration: int
class DominantSpeakerChangedEvent(BaseModel):
type: Literal["dominant_speaker_changed"]
timestamp: int
room_name: str
previous: str
current: str
JitsiEvent = Union[
RoomCreatedEvent,
RecordingStartedEvent,
RecordingStoppedEvent,
ParticipantJoinedEvent,
ParticipantLeftEvent,
SpeakerActiveEvent,
DominantSpeakerChangedEvent,
]
class RoomInfo(TypedDict):
name: str
jid: str
created_at: int
meeting_url: str
recording_stopped_at: Optional[int]
class ParticipantData(TypedDict):
jid: str
nick: str
id: str
is_moderator: bool
joined_at: int
left_at: Optional[int]
duration: Optional[int]
events: List[str]
class SpeakerStats(TypedDict):
total_time: int
nick: str
class ParsedMetadata(TypedDict):
room: RoomInfo
participants: List[ParticipantData]
speaker_stats: Dict[str, SpeakerStats]
event_count: int
class JitsiEventParser:
def parse_event(self, event_data: Dict[str, Any]) -> Optional[JitsiEvent]:
event_type = event_data.get("type")
try:
if event_type == "room_created":
return RoomCreatedEvent(**event_data)
elif event_type == "recording_started":
return RecordingStartedEvent(**event_data)
elif event_type == "recording_stopped":
return RecordingStoppedEvent(**event_data)
elif event_type == "participant_joined":
return ParticipantJoinedEvent(**event_data)
elif event_type == "participant_left":
return ParticipantLeftEvent(**event_data)
elif event_type == "speaker_active":
return SpeakerActiveEvent(**event_data)
elif event_type == "dominant_speaker_changed":
return DominantSpeakerChangedEvent(**event_data)
else:
return None
except Exception:
return None
def parse_events_file(self, recording_path: str) -> ParsedMetadata:
events_file = Path(recording_path) / "events.jsonl"
room_info: RoomInfo = {
"name": "",
"jid": "",
"created_at": 0,
"meeting_url": "",
"recording_stopped_at": None,
}
if not events_file.exists():
return ParsedMetadata(
room=room_info, participants=[], speaker_stats={}, event_count=0
)
events: List[JitsiEvent] = []
participants: Dict[str, ParticipantData] = {}
speaker_stats: Dict[str, SpeakerStats] = {}
with open(events_file, "r") as f:
for line in f:
if not line.strip():
continue
try:
event_data = json.loads(line)
event = self.parse_event(event_data)
if event is None:
continue
events.append(event)
if isinstance(event, RoomCreatedEvent):
room_info = {
"name": event.room_name,
"jid": event.room_jid,
"created_at": event.timestamp,
"meeting_url": event.meeting_url,
"recording_stopped_at": None,
}
elif isinstance(event, ParticipantJoinedEvent):
participants[event.participant.id] = {
"jid": event.participant.jid,
"nick": event.participant.nick,
"id": event.participant.id,
"is_moderator": event.participant.is_moderator,
"joined_at": event.timestamp,
"left_at": None,
"duration": None,
"events": ["joined"],
}
elif isinstance(event, ParticipantLeftEvent):
participant_id = event.participant.jid.split("/")[0]
if participant_id in participants:
participants[participant_id]["left_at"] = event.timestamp
participants[participant_id]["duration"] = (
event.participant.duration_seconds
)
participants[participant_id]["events"].append("left")
elif isinstance(event, SpeakerActiveEvent):
if event.speaker_jid not in speaker_stats:
speaker_stats[event.speaker_jid] = {
"total_time": 0,
"nick": event.speaker_nick,
}
speaker_stats[event.speaker_jid]["total_time"] += event.duration
elif isinstance(event, RecordingStoppedEvent):
room_info["recording_stopped_at"] = event.timestamp
room_info["meeting_url"] = event.meeting_url
except (json.JSONDecodeError, Exception):
continue
return ParsedMetadata(
room=room_info,
participants=list(participants.values()),
speaker_stats=speaker_stats,
event_count=len(events),
)

View File

@@ -0,0 +1 @@
"""Pipeline modules for audio processing."""

View File

@@ -23,23 +23,18 @@ from reflector.db.transcripts import (
transcripts_controller,
)
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.pipelines.main_live_pipeline import (
PipelineMainBase,
broadcast_to_sockets,
task_cleanup_consent,
task_pipeline_post_to_zulip,
)
from reflector.processors import (
AudioFileWriterProcessor,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptTopicDetectorProcessor,
)
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.file_diarization import FileDiarizationInput
from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor
from reflector.processors.file_transcript import FileTranscriptInput
from reflector.processors.file_transcript_auto import FileTranscriptAutoProcessor
from reflector.processors.transcript_diarization_assembler import (
TranscriptDiarizationAssemblerInput,
TranscriptDiarizationAssemblerProcessor,
@@ -56,19 +51,6 @@ from reflector.storage import get_transcripts_storage
from reflector.worker.webhook import send_transcript_webhook
class EmptyPipeline:
"""Empty pipeline for processors that need a pipeline reference"""
def __init__(self, logger: structlog.BoundLogger):
self.logger = logger
def get_pref(self, k, d=None):
return d
async def emit(self, event):
pass
class PipelineMainFile(PipelineMainBase):
"""
Optimized file processing pipeline.
@@ -81,7 +63,7 @@ class PipelineMainFile(PipelineMainBase):
def __init__(self, transcript_id: str):
super().__init__(transcript_id=transcript_id)
self.logger = logger.bind(transcript_id=self.transcript_id)
self.empty_pipeline = EmptyPipeline(logger=self.logger)
self.empty_pipeline = topic_processing.EmptyPipeline(logger=self.logger)
def _handle_gather_exceptions(self, results: list, operation: str) -> None:
"""Handle exceptions from asyncio.gather with return_exceptions=True"""
@@ -131,7 +113,7 @@ class PipelineMainFile(PipelineMainBase):
self.logger.info("File pipeline complete")
await transcripts_controller.set_status(transcript.id, "ended")
await self.set_status(transcript.id, "ended")
async def extract_and_write_audio(
self, file_path: Path, transcript: Transcript
@@ -262,24 +244,7 @@ class PipelineMainFile(PipelineMainBase):
async def transcribe_file(self, audio_url: str, language: str) -> TranscriptType:
"""Transcribe complete file"""
processor = FileTranscriptAutoProcessor()
input_data = FileTranscriptInput(audio_url=audio_url, language=language)
# Store result for retrieval
result: TranscriptType | None = None
async def capture_result(transcript):
nonlocal result
result = transcript
processor.on(capture_result)
await processor.push(input_data)
await processor.flush()
if not result:
raise ValueError("No transcript captured")
return result
return await transcribe_file_with_processor(audio_url, language)
async def diarize_file(self, audio_url: str) -> list[DiarizationSegment] | None:
"""Get diarization for file"""
@@ -322,63 +287,31 @@ class PipelineMainFile(PipelineMainBase):
async def detect_topics(
self, transcript: TranscriptType, target_language: str
) -> list[TitleSummary]:
"""Detect topics from complete transcript"""
chunk_size = 300
topics: list[TitleSummary] = []
async def on_topic(topic: TitleSummary):
topics.append(topic)
return await self.on_topic(topic)
topic_detector = TranscriptTopicDetectorProcessor(callback=on_topic)
topic_detector.set_pipeline(self.empty_pipeline)
for i in range(0, len(transcript.words), chunk_size):
chunk_words = transcript.words[i : i + chunk_size]
if not chunk_words:
continue
chunk_transcript = TranscriptType(
words=chunk_words, translation=transcript.translation
)
await topic_detector.push(chunk_transcript)
await topic_detector.flush()
return topics
return await topic_processing.detect_topics(
transcript,
target_language,
on_topic_callback=self.on_topic,
empty_pipeline=self.empty_pipeline,
)
async def generate_title(self, topics: list[TitleSummary]):
"""Generate title from topics"""
if not topics:
self.logger.warning("No topics for title generation")
return
processor = TranscriptFinalTitleProcessor(callback=self.on_title)
processor.set_pipeline(self.empty_pipeline)
for topic in topics:
await processor.push(topic)
await processor.flush()
return await topic_processing.generate_title(
topics,
on_title_callback=self.on_title,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)
async def generate_summaries(self, topics: list[TitleSummary]):
"""Generate long and short summaries from topics"""
if not topics:
self.logger.warning("No topics for summary generation")
return
transcript = await self.get_transcript()
processor = TranscriptFinalSummaryProcessor(
transcript=transcript,
callback=self.on_long_summary,
on_short_summary=self.on_short_summary,
return await topic_processing.generate_summaries(
topics,
transcript,
on_long_summary_callback=self.on_long_summary,
on_short_summary_callback=self.on_short_summary,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)
processor.set_pipeline(self.empty_pipeline)
for topic in topics:
await processor.push(topic)
await processor.flush()
@shared_task
@@ -426,7 +359,12 @@ async def task_pipeline_file_process(*, transcript_id: str):
await pipeline.process(audio_file)
except Exception:
except Exception as e:
logger.error(
f"File pipeline failed for transcript {transcript_id}: {type(e).__name__}: {str(e)}",
exc_info=True,
transcript_id=transcript_id,
)
await pipeline.set_status(transcript_id, "error")
raise

View File

@@ -17,7 +17,6 @@ from contextlib import asynccontextmanager
from typing import Generic
import av
import boto3
from celery import chord, current_task, group, shared_task
from pydantic import BaseModel
from structlog import BoundLogger as Logger
@@ -85,6 +84,20 @@ def broadcast_to_sockets(func):
message=resp.model_dump(mode="json"),
)
transcript = await transcripts_controller.get_by_id(self.transcript_id)
if transcript and transcript.user_id:
# Emit only relevant events to the user room to avoid noisy updates.
# Allowed: STATUS, FINAL_TITLE, DURATION. All are prefixed with TRANSCRIPT_
allowed_user_events = {"STATUS", "FINAL_TITLE", "DURATION"}
if resp.event in allowed_user_events:
await self.ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
message={
"event": f"TRANSCRIPT_{resp.event}",
"data": {"id": self.transcript_id, **resp.data},
},
)
return wrapper
@@ -570,6 +583,7 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
consent_denied = False
recording = None
meeting = None
try:
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
@@ -580,8 +594,8 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
meeting.id
)
except Exception as e:
logger.error(f"Failed to get fetch consent: {e}", exc_info=e)
consent_denied = True
logger.error(f"Failed to fetch consent: {e}", exc_info=e)
raise
if not consent_denied:
logger.info("Consent approved, keeping all files")
@@ -589,25 +603,24 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
logger.info("Consent denied, cleaning up all related audio files")
if recording and recording.bucket_name and recording.object_key:
s3_whereby = boto3.client(
"s3",
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
)
try:
s3_whereby.delete_object(
Bucket=recording.bucket_name, Key=recording.object_key
)
logger.info(
f"Deleted original Whereby recording: {recording.bucket_name}/{recording.object_key}"
)
except Exception as e:
logger.error(f"Failed to delete Whereby recording: {e}", exc_info=e)
deletion_errors = []
if recording and recording.bucket_name:
keys_to_delete = []
if recording.track_keys:
keys_to_delete = recording.track_keys
elif recording.object_key:
keys_to_delete = [recording.object_key]
master_storage = get_transcripts_storage()
for key in keys_to_delete:
try:
await master_storage.delete_file(key, bucket=recording.bucket_name)
logger.info(f"Deleted recording file: {recording.bucket_name}/{key}")
except Exception as e:
error_msg = f"Failed to delete {key}: {e}"
logger.error(error_msg, exc_info=e)
deletion_errors.append(error_msg)
# non-transactional, files marked for deletion not actually deleted is possible
await transcripts_controller.update(transcript, {"audio_deleted": True})
# 2. Delete processed audio from transcript storage S3 bucket
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
@@ -616,18 +629,28 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
f"Deleted processed audio from storage: {transcript.storage_audio_path}"
)
except Exception as e:
logger.error(f"Failed to delete processed audio: {e}", exc_info=e)
error_msg = f"Failed to delete processed audio: {e}"
logger.error(error_msg, exc_info=e)
deletion_errors.append(error_msg)
# 3. Delete local audio files
try:
if hasattr(transcript, "audio_mp3_filename") and transcript.audio_mp3_filename:
transcript.audio_mp3_filename.unlink(missing_ok=True)
if hasattr(transcript, "audio_wav_filename") and transcript.audio_wav_filename:
transcript.audio_wav_filename.unlink(missing_ok=True)
except Exception as e:
logger.error(f"Failed to delete local audio files: {e}", exc_info=e)
error_msg = f"Failed to delete local audio files: {e}"
logger.error(error_msg, exc_info=e)
deletion_errors.append(error_msg)
logger.info("Consent cleanup done")
if deletion_errors:
logger.warning(
f"Consent cleanup completed with {len(deletion_errors)} errors",
errors=deletion_errors,
)
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
logger.info("Consent cleanup done - all audio deleted")
@get_transcript

View File

@@ -0,0 +1,694 @@
import asyncio
import math
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from celery import chain, shared_task
from reflector.asynctask import asynctask
from reflector.db.transcripts import (
TranscriptStatus,
TranscriptWaveform,
transcripts_controller,
)
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.pipelines.main_file_pipeline import task_send_webhook_if_needed
from reflector.pipelines.main_live_pipeline import (
PipelineMainBase,
broadcast_to_sockets,
task_cleanup_consent,
task_pipeline_post_to_zulip,
)
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
from reflector.storage import Storage, get_transcripts_storage
from reflector.utils.string import NonEmptyString
# Audio encoding constants
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 128000
# Storage operation constants
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
class PipelineMainMultitrack(PipelineMainBase):
def __init__(self, transcript_id: str):
super().__init__(transcript_id=transcript_id)
self.logger = logger.bind(transcript_id=self.transcript_id)
self.empty_pipeline = topic_processing.EmptyPipeline(logger=self.logger)
async def pad_track_for_transcription(
self,
track_url: NonEmptyString,
track_idx: int,
storage: Storage,
) -> NonEmptyString:
"""
Pad a single track with silence based on stream metadata start_time.
Downloads from S3 presigned URL, processes via PyAV using tempfile, uploads to S3.
Returns presigned URL of padded track (or original URL if no padding needed).
Memory usage:
- Pattern: fixed_overhead(2-5MB) for PyAV codec/filters
- PyAV streams input efficiently (no full download, verified)
- Output written to tempfile (disk-based, not memory)
- Upload streams from file handle (boto3 chunks, typically 5-10MB)
Daily.co raw-tracks timing - Two approaches:
CURRENT APPROACH (PyAV metadata):
The WebM stream.start_time field encodes MEETING-RELATIVE timing:
- t=0: When Daily.co recording started (first participant joined)
- start_time=8.13s: This participant's track began 8.13s after recording started
- Purpose: Enables track alignment without external manifest files
This is NOT:
- Stream-internal offset (first packet timestamp relative to stream start)
- Absolute/wall-clock time
- Recording duration
ALTERNATIVE APPROACH (filename parsing):
Daily.co filenames contain Unix timestamps (milliseconds):
Format: {recording_start_ts}-{participant_id}-cam-audio-{track_start_ts}.webm
Example: 1760988935484-52f7f48b-fbab-431f-9a50-87b9abfc8255-cam-audio-1760988935922.webm
Can calculate offset: (track_start_ts - recording_start_ts) / 1000
- Track 0: (1760988935922 - 1760988935484) / 1000 = 0.438s
- Track 1: (1760988943823 - 1760988935484) / 1000 = 8.339s
TIME DIFFERENCE: PyAV metadata vs filename timestamps differ by ~209ms:
- Track 0: filename=438ms, metadata=229ms (diff: 209ms)
- Track 1: filename=8339ms, metadata=8130ms (diff: 209ms)
Consistent delta suggests network/encoding delay. PyAV metadata is ground truth
(represents when audio stream actually started vs when file upload initiated).
Example with 2 participants:
Track A: start_time=0.2s → Joined 200ms after recording began
Track B: start_time=8.1s → Joined 8.1 seconds later
After padding:
Track A: [0.2s silence] + [speech...]
Track B: [8.1s silence] + [speech...]
Whisper transcription timestamps are now synchronized:
Track A word at 5.0s → happened at meeting t=5.0s
Track B word at 10.0s → happened at meeting t=10.0s
Merging just sorts by timestamp - no offset calculation needed.
Padding coincidentally involves re-encoding. It's important when we work with Daily.co + Whisper.
This is because Daily.co returns recordings with skipped frames e.g. when microphone muted.
Daily.co doesn't understand those frames and ignores them, causing timestamp issues in transcription.
Re-encoding restores those frames. We do padding and re-encoding together just because it's convenient and more performant:
we need padded values for mix mp3 anyways
"""
transcript = await self.get_transcript()
try:
# PyAV streams input from S3 URL efficiently (2-5MB fixed overhead for codec/filters)
with av.open(track_url) as in_container:
start_time_seconds = self._extract_stream_start_time_from_container(
in_container, track_idx
)
if start_time_seconds <= 0:
self.logger.info(
f"Track {track_idx} requires no padding (start_time={start_time_seconds}s)",
track_idx=track_idx,
)
return track_url
# Use tempfile instead of BytesIO for better memory efficiency
# Reduces peak memory usage during encoding/upload
with tempfile.NamedTemporaryFile(
suffix=".webm", delete=False
) as temp_file:
temp_path = temp_file.name
try:
self._apply_audio_padding_to_file(
in_container, temp_path, start_time_seconds, track_idx
)
storage_path = (
f"file_pipeline/{transcript.id}/tracks/padded_{track_idx}.webm"
)
# Upload using file handle for streaming
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
finally:
# Clean up temp file
Path(temp_path).unlink(missing_ok=True)
padded_url = await storage.get_file_url(
storage_path,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
self.logger.info(
f"Successfully padded track {track_idx}",
track_idx=track_idx,
start_time_seconds=start_time_seconds,
padded_url=padded_url,
)
return padded_url
except Exception as e:
self.logger.error(
f"Failed to process track {track_idx}",
track_idx=track_idx,
url=track_url,
error=str(e),
exc_info=True,
)
raise Exception(
f"Track {track_idx} padding failed - transcript would have incorrect timestamps"
) from e
def _extract_stream_start_time_from_container(
self, container, track_idx: int
) -> float:
"""
Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
self.logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
self.logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def _apply_audio_padding_to_file(
self,
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph, writing to file"""
delay_ms = math.floor(start_time_seconds * 1000)
self.logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
self.logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise
async def mixdown_tracks(
self,
track_urls: list[str],
writer: AudioFileWriterProcessor,
offsets_seconds: list[float] | None = None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs"""
target_sample_rate: int | None = None
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
target_sample_rate = frame.sample_rate
break
except Exception:
continue
finally:
if container is not None:
container.close()
if target_sample_rate:
break
if not target_sample_rate:
self.logger.error("Mixdown failed - no decodable audio frames found")
raise Exception("Mixdown failed: No decodable audio frames in any track")
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
valid_track_urls = [url for url in track_urls if url]
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
if not inputs:
self.logger.error("Mixdown failed - no valid inputs for graph")
raise Exception("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=(
f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}"
),
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(url)
containers.append(c)
except Exception as e:
self.logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
self.logger.error("Mixdown failed - no valid containers opened")
raise Exception("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
for in_ctx in inputs:
in_ctx.push(None)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup
@broadcast_to_sockets
async def set_status(self, transcript_id: str, status: TranscriptStatus):
async with self.lock_transaction():
return await transcripts_controller.set_status(transcript_id, status)
async def on_waveform(self, data):
async with self.transaction():
waveform = TranscriptWaveform(waveform=data)
transcript = await self.get_transcript()
return await transcripts_controller.append_event(
transcript=transcript, event="WAVEFORM", data=waveform
)
async def process(self, bucket_name: str, track_keys: list[str]):
transcript = await self.get_transcript()
async with self.transaction():
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
},
)
source_storage = get_transcripts_storage()
transcript_storage = source_storage
track_urls: list[str] = []
for key in track_keys:
url = await source_storage.get_file_url(
key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
track_urls.append(url)
self.logger.info(
f"Generated presigned URL for track from {bucket_name}",
key=key,
)
created_padded_files = set()
padded_track_urls: list[str] = []
for idx, url in enumerate(track_urls):
padded_url = await self.pad_track_for_transcription(
url, idx, transcript_storage
)
padded_track_urls.append(padded_url)
if padded_url != url:
storage_path = f"file_pipeline/{transcript.id}/tracks/padded_{idx}.webm"
created_padded_files.add(storage_path)
self.logger.info(f"Track {idx} processed, padded URL: {padded_url}")
transcript.data_path.mkdir(parents=True, exist_ok=True)
mp3_writer = AudioFileWriterProcessor(
path=str(transcript.audio_mp3_filename),
on_duration=self.on_duration,
)
await self.mixdown_tracks(padded_track_urls, mp3_writer, offsets_seconds=None)
await mp3_writer.flush()
if not transcript.audio_mp3_filename.exists():
raise Exception(
"Mixdown failed - no MP3 file generated. Cannot proceed without playable audio."
)
storage_path = f"{transcript.id}/audio.mp3"
# Use file handle streaming to avoid loading entire MP3 into memory
mp3_size = transcript.audio_mp3_filename.stat().st_size
with open(transcript.audio_mp3_filename, "rb") as mp3_file:
await transcript_storage.put_file(storage_path, mp3_file)
mp3_url = await transcript_storage.get_file_url(storage_path)
await transcripts_controller.update(transcript, {"audio_location": "storage"})
self.logger.info(
f"Uploaded mixed audio to storage",
storage_path=storage_path,
size=mp3_size,
url=mp3_url,
)
self.logger.info("Generating waveform from mixed audio")
waveform_processor = AudioWaveformProcessor(
audio_path=transcript.audio_mp3_filename,
waveform_path=transcript.audio_waveform_filename,
on_waveform=self.on_waveform,
)
waveform_processor.set_pipeline(self.empty_pipeline)
await waveform_processor.flush()
self.logger.info("Waveform generated successfully")
speaker_transcripts: list[TranscriptType] = []
for idx, padded_url in enumerate(padded_track_urls):
if not padded_url:
continue
t = await self.transcribe_file(padded_url, transcript.source_language)
if not t.words:
continue
for w in t.words:
w.speaker = idx
speaker_transcripts.append(t)
self.logger.info(
f"Track {idx} transcribed successfully with {len(t.words)} words",
track_idx=idx,
)
valid_track_count = len([url for url in padded_track_urls if url])
if valid_track_count > 0 and len(speaker_transcripts) != valid_track_count:
raise Exception(
f"Only {len(speaker_transcripts)}/{valid_track_count} tracks transcribed successfully. "
f"All tracks must succeed to avoid incomplete transcripts."
)
if not speaker_transcripts:
raise Exception("No valid track transcriptions")
self.logger.info(f"Cleaning up {len(created_padded_files)} temporary S3 files")
cleanup_tasks = []
for storage_path in created_padded_files:
cleanup_tasks.append(transcript_storage.delete_file(storage_path))
if cleanup_tasks:
cleanup_results = await asyncio.gather(
*cleanup_tasks, return_exceptions=True
)
for storage_path, result in zip(created_padded_files, cleanup_results):
if isinstance(result, Exception):
self.logger.warning(
"Failed to cleanup temporary padded track",
storage_path=storage_path,
error=str(result),
)
merged_words = []
for t in speaker_transcripts:
merged_words.extend(t.words)
merged_words.sort(
key=lambda w: w.start if hasattr(w, "start") and w.start is not None else 0
)
merged_transcript = TranscriptType(words=merged_words, translation=None)
await self.on_transcript(merged_transcript)
topics = await self.detect_topics(merged_transcript, transcript.target_language)
await asyncio.gather(
self.generate_title(topics),
self.generate_summaries(topics),
return_exceptions=False,
)
await self.set_status(transcript.id, "ended")
async def transcribe_file(self, audio_url: str, language: str) -> TranscriptType:
return await transcribe_file_with_processor(audio_url, language)
async def detect_topics(
self, transcript: TranscriptType, target_language: str
) -> list[TitleSummary]:
return await topic_processing.detect_topics(
transcript,
target_language,
on_topic_callback=self.on_topic,
empty_pipeline=self.empty_pipeline,
)
async def generate_title(self, topics: list[TitleSummary]):
return await topic_processing.generate_title(
topics,
on_title_callback=self.on_title,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)
async def generate_summaries(self, topics: list[TitleSummary]):
transcript = await self.get_transcript()
return await topic_processing.generate_summaries(
topics,
transcript,
on_long_summary_callback=self.on_long_summary,
on_short_summary_callback=self.on_short_summary,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)
@shared_task
@asynctask
async def task_pipeline_multitrack_process(
*, transcript_id: str, bucket_name: str, track_keys: list[str]
):
pipeline = PipelineMainMultitrack(transcript_id=transcript_id)
try:
await pipeline.set_status(transcript_id, "processing")
await pipeline.process(bucket_name, track_keys)
except Exception:
await pipeline.set_status(transcript_id, "error")
raise
post_chain = chain(
task_cleanup_consent.si(transcript_id=transcript_id),
task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
task_send_webhook_if_needed.si(transcript_id=transcript_id),
)
post_chain.delay()

View File

@@ -0,0 +1,109 @@
"""
Topic processing utilities
==========================
Shared topic detection, title generation, and summarization logic
used across file and multitrack pipelines.
"""
from typing import Callable
import structlog
from reflector.db.transcripts import Transcript
from reflector.processors import (
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptTopicDetectorProcessor,
)
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
class EmptyPipeline:
def __init__(self, logger: structlog.BoundLogger):
self.logger = logger
def get_pref(self, k, d=None):
return d
async def emit(self, event):
pass
async def detect_topics(
transcript: TranscriptType,
target_language: str,
*,
on_topic_callback: Callable,
empty_pipeline: EmptyPipeline,
) -> list[TitleSummary]:
chunk_size = 300
topics: list[TitleSummary] = []
async def on_topic(topic: TitleSummary):
topics.append(topic)
return await on_topic_callback(topic)
topic_detector = TranscriptTopicDetectorProcessor(callback=on_topic)
topic_detector.set_pipeline(empty_pipeline)
for i in range(0, len(transcript.words), chunk_size):
chunk_words = transcript.words[i : i + chunk_size]
if not chunk_words:
continue
chunk_transcript = TranscriptType(
words=chunk_words, translation=transcript.translation
)
await topic_detector.push(chunk_transcript)
await topic_detector.flush()
return topics
async def generate_title(
topics: list[TitleSummary],
*,
on_title_callback: Callable,
empty_pipeline: EmptyPipeline,
logger: structlog.BoundLogger,
):
if not topics:
logger.warning("No topics for title generation")
return
processor = TranscriptFinalTitleProcessor(callback=on_title_callback)
processor.set_pipeline(empty_pipeline)
for topic in topics:
await processor.push(topic)
await processor.flush()
async def generate_summaries(
topics: list[TitleSummary],
transcript: Transcript,
*,
on_long_summary_callback: Callable,
on_short_summary_callback: Callable,
empty_pipeline: EmptyPipeline,
logger: structlog.BoundLogger,
):
if not topics:
logger.warning("No topics for summary generation")
return
processor = TranscriptFinalSummaryProcessor(
transcript=transcript,
callback=on_long_summary_callback,
on_short_summary=on_short_summary_callback,
)
processor.set_pipeline(empty_pipeline)
for topic in topics:
await processor.push(topic)
await processor.flush()

View File

@@ -0,0 +1,34 @@
from reflector.processors.file_transcript import FileTranscriptInput
from reflector.processors.file_transcript_auto import FileTranscriptAutoProcessor
from reflector.processors.types import Transcript as TranscriptType
async def transcribe_file_with_processor(
audio_url: str,
language: str,
processor_name: str | None = None,
) -> TranscriptType:
processor = (
FileTranscriptAutoProcessor(name=processor_name)
if processor_name
else FileTranscriptAutoProcessor()
)
input_data = FileTranscriptInput(audio_url=audio_url, language=language)
result: TranscriptType | None = None
async def capture_result(transcript):
nonlocal result
result = transcript
processor.on(capture_result)
await processor.push(input_data)
await processor.flush()
if not result:
processor_label = processor_name or "default"
raise ValueError(
f"No transcript captured from {processor_label} processor for audio: {audio_url}"
)
return result

View File

@@ -56,6 +56,16 @@ class FileTranscriptModalProcessor(FileTranscriptProcessor):
},
follow_redirects=True,
)
if response.status_code != 200:
error_body = response.text
self.logger.error(
"Modal API error",
audio_url=data.audio_url,
status_code=response.status_code,
error_body=error_body,
)
response.raise_for_status()
result = response.json()

View File

@@ -165,6 +165,7 @@ class SummaryBuilder:
self.llm: LLM = llm
self.model_name: str = llm.model_name
self.logger = logger or structlog.get_logger()
self.participant_instructions: str | None = None
if filename:
self.read_transcript_from_file(filename)
@@ -191,14 +192,61 @@ class SummaryBuilder:
self, prompt: str, output_cls: Type[T], tone_name: str | None = None
) -> T:
"""Generic function to get structured output from LLM for non-function-calling models."""
# Add participant instructions to the prompt if available
enhanced_prompt = self._enhance_prompt_with_participants(prompt)
return await self.llm.get_structured_response(
prompt, [self.transcript], output_cls, tone_name=tone_name
enhanced_prompt, [self.transcript], output_cls, tone_name=tone_name
)
async def _get_response(
self, prompt: str, texts: list[str], tone_name: str | None = None
) -> str:
"""Get text response with automatic participant instructions injection."""
enhanced_prompt = self._enhance_prompt_with_participants(prompt)
return await self.llm.get_response(enhanced_prompt, texts, tone_name=tone_name)
def _enhance_prompt_with_participants(self, prompt: str) -> str:
"""Add participant instructions to any prompt if participants are known."""
if self.participant_instructions:
self.logger.debug("Adding participant instructions to prompt")
return f"{prompt}\n\n{self.participant_instructions}"
return prompt
# ----------------------------------------------------------------------------
# Participants
# ----------------------------------------------------------------------------
def set_known_participants(self, participants: list[str]) -> None:
"""
Set known participants directly without LLM identification.
This is used when participants are already identified and stored.
They are appended at the end of the transcript, providing more context for the assistant.
"""
if not participants:
self.logger.warning("No participants provided")
return
self.logger.info(
"Using known participants",
participants=participants,
)
participants_md = self.format_list_md(participants)
self.transcript += f"\n\n# Participants\n\n{participants_md}"
# Set instructions that will be automatically added to all prompts
participants_list = ", ".join(participants)
self.participant_instructions = dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
async def identify_participants(self) -> None:
"""
From a transcript, try to identify the participants using TreeSummarize with structured output.
@@ -232,6 +280,19 @@ class SummaryBuilder:
if unique_participants:
participants_md = self.format_list_md(unique_participants)
self.transcript += f"\n\n# Participants\n\n{participants_md}"
# Set instructions that will be automatically added to all prompts
participants_list = ", ".join(unique_participants)
self.participant_instructions = dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
else:
self.logger.warning("No participants identified in the transcript")
@@ -318,13 +379,13 @@ class SummaryBuilder:
for subject in self.subjects:
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=subject)
detailed_response = await self.llm.get_response(
detailed_response = await self._get_response(
detailed_prompt, [self.transcript], tone_name="Topic assistant"
)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
paragraph_response = await self.llm.get_response(
paragraph_response = await self._get_response(
paragraph_prompt, [str(detailed_response)], tone_name="Topic summarizer"
)
@@ -345,7 +406,7 @@ class SummaryBuilder:
recap_prompt = RECAP_PROMPT
recap_response = await self.llm.get_response(
recap_response = await self._get_response(
recap_prompt, [summaries_text], tone_name="Recap summarizer"
)

View File

@@ -26,7 +26,25 @@ class TranscriptFinalSummaryProcessor(Processor):
async def get_summary_builder(self, text) -> SummaryBuilder:
builder = SummaryBuilder(self.llm, logger=self.logger)
builder.set_transcript(text)
await builder.identify_participants()
# Use known participants if available, otherwise identify them
if self.transcript and self.transcript.participants:
# Extract participant names from the stored participants
participant_names = [p.name for p in self.transcript.participants if p.name]
if participant_names:
self.logger.info(
f"Using {len(participant_names)} known participants from transcript"
)
builder.set_known_participants(participant_names)
else:
self.logger.info(
"Participants field exists but is empty, identifying participants"
)
await builder.identify_participants()
else:
self.logger.info("No participants stored, identifying participants")
await builder.identify_participants()
await builder.generate_summary()
return builder
@@ -49,18 +67,30 @@ class TranscriptFinalSummaryProcessor(Processor):
speakermap = {}
if self.transcript:
speakermap = {
participant["speaker"]: participant["name"]
for participant in self.transcript.participants
p.speaker: p.name
for p in (self.transcript.participants or [])
if p.speaker is not None and p.name
}
self.logger.info(
f"Built speaker map with {len(speakermap)} participants",
speakermap=speakermap,
)
# build the transcript as a single string
# XXX: unsure if the participants name as replaced directly in speaker ?
# Replace speaker IDs with actual participant names if available
text_transcript = []
unique_speakers = set()
for topic in self.chunks:
for segment in topic.transcript.as_segments():
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
unique_speakers.add((segment.speaker, name))
text_transcript.append(f"{name}: {segment.text}")
self.logger.info(
f"Built transcript with {len(unique_speakers)} unique speakers",
speakers=list(unique_speakers),
)
text_transcript = "\n".join(text_transcript)
last_chunk = self.chunks[-1]

View File

@@ -1,6 +1,6 @@
from textwrap import dedent
from pydantic import BaseModel, Field
from pydantic import AliasChoices, BaseModel, Field
from reflector.llm import LLM
from reflector.processors.base import Processor
@@ -34,8 +34,14 @@ TOPIC_PROMPT = dedent(
class TopicResponse(BaseModel):
"""Structured response for topic detection"""
title: str = Field(description="A descriptive title for the topic being discussed")
summary: str = Field(description="A concise 1-2 sentence summary of the discussion")
title: str = Field(
description="A descriptive title for the topic being discussed",
validation_alias=AliasChoices("title", "Title"),
)
summary: str = Field(
description="A concise 1-2 sentence summary of the discussion",
validation_alias=AliasChoices("summary", "Summary"),
)
class TranscriptTopicDetectorProcessor(Processor):

View File

@@ -4,11 +4,8 @@ import tempfile
from pathlib import Path
from typing import Annotated, TypedDict
from profanityfilter import ProfanityFilter
from pydantic import BaseModel, Field, PrivateAttr
from reflector.redis_cache import redis_cache
class DiarizationSegment(TypedDict):
"""Type definition for diarization segment containing speaker information"""
@@ -20,9 +17,6 @@ class DiarizationSegment(TypedDict):
PUNC_RE = re.compile(r"[.;:?!…]")
profanity_filter = ProfanityFilter()
profanity_filter.set_censor("*")
class AudioFile(BaseModel):
name: str
@@ -124,21 +118,11 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
class Transcript(BaseModel):
translation: str | None = None
words: list[Word] = None
@property
def raw_text(self):
# Uncensored text
return "".join([word.text for word in self.words])
@redis_cache(prefix="profanity", duration=3600 * 24 * 7)
def _get_censored_text(self, text: str):
return profanity_filter.censor(text).strip()
words: list[Word] = []
@property
def text(self):
# Censored text
return self._get_censored_text(self.raw_text)
return "".join([word.text for word in self.words])
@property
def human_timestamp(self):
@@ -170,12 +154,6 @@ class Transcript(BaseModel):
word.start += offset
word.end += offset
def clone(self):
words = [
Word(text=word.text, start=word.start, end=word.end) for word in self.words
]
return Transcript(text=self.text, translation=self.translation, words=words)
def as_segments(self) -> list[TranscriptSegment]:
return words_to_segments(self.words)

View File

@@ -1,10 +1,17 @@
import asyncio
import functools
import json
from typing import Optional
import redis
import redis.asyncio as redis_async
import structlog
from redis.exceptions import LockError
from reflector.settings import settings
logger = structlog.get_logger(__name__)
redis_clients = {}
@@ -21,6 +28,12 @@ def get_redis_client(db=0):
return redis_clients[db]
async def get_async_redis_client(db: int = 0):
return await redis_async.from_url(
f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{db}"
)
def redis_cache(prefix="cache", duration=3600, db=settings.REDIS_CACHE_DB, argidx=1):
"""
Cache the result of a function in Redis.
@@ -49,3 +62,87 @@ def redis_cache(prefix="cache", duration=3600, db=settings.REDIS_CACHE_DB, argid
return wrapper
return decorator
class RedisAsyncLock:
def __init__(
self,
key: str,
timeout: int = 120,
extend_interval: int = 30,
skip_if_locked: bool = False,
blocking: bool = True,
blocking_timeout: Optional[float] = None,
):
self.key = f"async_lock:{key}"
self.timeout = timeout
self.extend_interval = extend_interval
self.skip_if_locked = skip_if_locked
self.blocking = blocking
self.blocking_timeout = blocking_timeout
self._lock = None
self._redis = None
self._extend_task = None
self._acquired = False
async def _extend_lock_periodically(self):
while True:
try:
await asyncio.sleep(self.extend_interval)
if self._lock:
await self._lock.extend(self.timeout, replace_ttl=True)
logger.debug("Extended lock", key=self.key)
except LockError:
logger.warning("Failed to extend lock", key=self.key)
break
except asyncio.CancelledError:
break
except Exception as e:
logger.error("Error extending lock", key=self.key, error=str(e))
break
async def __aenter__(self):
self._redis = await get_async_redis_client()
self._lock = self._redis.lock(
self.key,
timeout=self.timeout,
blocking=self.blocking,
blocking_timeout=self.blocking_timeout,
)
self._acquired = await self._lock.acquire()
if not self._acquired:
if self.skip_if_locked:
logger.warning(
"Lock already acquired by another process, skipping", key=self.key
)
return self
else:
raise LockError(f"Failed to acquire lock: {self.key}")
self._extend_task = asyncio.create_task(self._extend_lock_periodically())
logger.info("Acquired lock", key=self.key)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._extend_task:
self._extend_task.cancel()
try:
await self._extend_task
except asyncio.CancelledError:
pass
if self._acquired and self._lock:
try:
await self._lock.release()
logger.info("Released lock", key=self.key)
except LockError:
logger.debug("Lock already released or expired", key=self.key)
if self._redis:
await self._redis.aclose()
@property
def acquired(self) -> bool:
return self._acquired

View File

@@ -0,0 +1,5 @@
from typing import Literal
Platform = Literal["whereby", "daily"]
WHEREBY_PLATFORM: Platform = "whereby"
DAILY_PLATFORM: Platform = "daily"

View File

@@ -0,0 +1,408 @@
"""
ICS Calendar Synchronization Service
This module provides services for fetching, parsing, and synchronizing ICS (iCalendar)
calendar feeds with room booking data in the database.
Key Components:
- ICSFetchService: Handles HTTP fetching and parsing of ICS calendar data
- ICSSyncService: Manages the synchronization process between ICS feeds and database
Example Usage:
# Sync a room's calendar
room = Room(id="room1", name="conference-room", ics_url="https://cal.example.com/room.ics")
result = await ics_sync_service.sync_room_calendar(room)
# Result structure:
{
"status": "success", # success|unchanged|error|skipped
"hash": "abc123...", # MD5 hash of ICS content
"events_found": 5, # Events matching this room
"total_events": 12, # Total events in calendar within time window
"events_created": 2, # New events added to database
"events_updated": 3, # Existing events modified
"events_deleted": 1 # Events soft-deleted (no longer in calendar)
}
Event Matching:
Events are matched to rooms by checking if the room's full URL appears in the
event's LOCATION or DESCRIPTION fields. Only events within a 25-hour window
(1 hour ago to 24 hours from now) are processed.
Input: ICS calendar URL (e.g., "https://calendar.google.com/calendar/ical/...")
Output: EventData objects with structured calendar information:
{
"ics_uid": "event123@google.com",
"title": "Team Meeting",
"description": "Weekly sync meeting",
"location": "https://meet.company.com/conference-room",
"start_time": datetime(2024, 1, 15, 14, 0, tzinfo=UTC),
"end_time": datetime(2024, 1, 15, 15, 0, tzinfo=UTC),
"attendees": [
{"email": "user@company.com", "name": "John Doe", "role": "ORGANIZER"},
{"email": "attendee@company.com", "name": "Jane Smith", "status": "ACCEPTED"}
],
"ics_raw_data": "BEGIN:VEVENT\nUID:event123@google.com\n..."
}
"""
import hashlib
from datetime import date, datetime, timedelta, timezone
from enum import Enum
from typing import TypedDict
import httpx
import pytz
import structlog
from icalendar import Calendar, Event
from reflector.db.calendar_events import CalendarEvent, calendar_events_controller
from reflector.db.rooms import Room, rooms_controller
from reflector.redis_cache import RedisAsyncLock
from reflector.settings import settings
logger = structlog.get_logger()
EVENT_WINDOW_DELTA_START = timedelta(hours=-1)
EVENT_WINDOW_DELTA_END = timedelta(hours=24)
class SyncStatus(str, Enum):
SUCCESS = "success"
UNCHANGED = "unchanged"
ERROR = "error"
SKIPPED = "skipped"
class AttendeeData(TypedDict, total=False):
email: str | None
name: str | None
status: str | None
role: str | None
class EventData(TypedDict):
ics_uid: str
title: str | None
description: str | None
location: str | None
start_time: datetime
end_time: datetime
attendees: list[AttendeeData]
ics_raw_data: str
class SyncStats(TypedDict):
events_created: int
events_updated: int
events_deleted: int
class SyncResultBase(TypedDict):
status: SyncStatus
class SyncResult(SyncResultBase, total=False):
hash: str | None
events_found: int
total_events: int
events_created: int
events_updated: int
events_deleted: int
error: str | None
reason: str | None
class ICSFetchService:
def __init__(self):
self.client = httpx.AsyncClient(
timeout=30.0, headers={"User-Agent": "Reflector/1.0"}
)
async def fetch_ics(self, url: str) -> str:
response = await self.client.get(url)
response.raise_for_status()
return response.text
def parse_ics(self, ics_content: str) -> Calendar:
return Calendar.from_ical(ics_content)
def extract_room_events(
self, calendar: Calendar, room_name: str, room_url: str
) -> tuple[list[EventData], int]:
events = []
total_events = 0
now = datetime.now(timezone.utc)
window_start = now + EVENT_WINDOW_DELTA_START
window_end = now + EVENT_WINDOW_DELTA_END
for component in calendar.walk():
if component.name != "VEVENT":
continue
status = component.get("STATUS", "").upper()
if status == "CANCELLED":
continue
# Count total non-cancelled events in the time window
event_data = self._parse_event(component)
if event_data and window_start <= event_data["start_time"] <= window_end:
total_events += 1
# Check if event matches this room
if self._event_matches_room(component, room_name, room_url):
events.append(event_data)
return events, total_events
def _event_matches_room(self, event: Event, room_name: str, room_url: str) -> bool:
location = str(event.get("LOCATION", ""))
description = str(event.get("DESCRIPTION", ""))
# Only match full room URL
# XXX leaved here as a patterns, to later be extended with tinyurl or such too
patterns = [
room_url,
]
# Check location and description for patterns
text_to_check = f"{location} {description}".lower()
for pattern in patterns:
if pattern.lower() in text_to_check:
return True
return False
def _parse_event(self, event: Event) -> EventData | None:
uid = str(event.get("UID", ""))
summary = str(event.get("SUMMARY", ""))
description = str(event.get("DESCRIPTION", ""))
location = str(event.get("LOCATION", ""))
dtstart = event.get("DTSTART")
dtend = event.get("DTEND")
if not dtstart:
return None
# Convert fields
start_time = self._normalize_datetime(
dtstart.dt if hasattr(dtstart, "dt") else dtstart
)
end_time = (
self._normalize_datetime(dtend.dt if hasattr(dtend, "dt") else dtend)
if dtend
else start_time + timedelta(hours=1)
)
attendees = self._parse_attendees(event)
# Get raw event data for storage
raw_data = event.to_ical().decode("utf-8")
return {
"ics_uid": uid,
"title": summary,
"description": description,
"location": location,
"start_time": start_time,
"end_time": end_time,
"attendees": attendees,
"ics_raw_data": raw_data,
}
def _normalize_datetime(self, dt) -> datetime:
# Ensure datetime is with timezone, if not, assume UTC
if isinstance(dt, date) and not isinstance(dt, datetime):
dt = datetime.combine(dt, datetime.min.time())
dt = pytz.UTC.localize(dt)
elif isinstance(dt, datetime):
if dt.tzinfo is None:
dt = pytz.UTC.localize(dt)
else:
dt = dt.astimezone(pytz.UTC)
return dt
def _parse_attendees(self, event: Event) -> list[AttendeeData]:
# Extracts attendee information from both ATTENDEE and ORGANIZER properties.
# Handles malformed comma-separated email addresses in single ATTENDEE fields
# by splitting them into separate attendee entries. Returns a list of attendee
# data including email, name, status, and role information.
final_attendees = []
attendees = event.get("ATTENDEE", [])
if not isinstance(attendees, list):
attendees = [attendees]
for att in attendees:
email_str = str(att).replace("mailto:", "") if att else None
# Handle malformed comma-separated email addresses in a single ATTENDEE field
if email_str and "," in email_str:
# Split comma-separated emails and create separate attendee entries
email_parts = [email.strip() for email in email_str.split(",")]
for email in email_parts:
if email and "@" in email:
clean_email = email.replace("MAILTO:", "").replace(
"mailto:", ""
)
att_data: AttendeeData = {
"email": clean_email,
"name": att.params.get("CN")
if hasattr(att, "params") and email == email_parts[0]
else None,
"status": att.params.get("PARTSTAT")
if hasattr(att, "params") and email == email_parts[0]
else None,
"role": att.params.get("ROLE")
if hasattr(att, "params") and email == email_parts[0]
else None,
}
final_attendees.append(att_data)
else:
# Normal single attendee
att_data: AttendeeData = {
"email": email_str,
"name": att.params.get("CN") if hasattr(att, "params") else None,
"status": att.params.get("PARTSTAT")
if hasattr(att, "params")
else None,
"role": att.params.get("ROLE") if hasattr(att, "params") else None,
}
final_attendees.append(att_data)
# Add organizer
organizer = event.get("ORGANIZER")
if organizer:
org_email = (
str(organizer).replace("mailto:", "").replace("MAILTO:", "")
if organizer
else None
)
org_data: AttendeeData = {
"email": org_email,
"name": organizer.params.get("CN")
if hasattr(organizer, "params")
else None,
"role": "ORGANIZER",
}
final_attendees.append(org_data)
return final_attendees
class ICSSyncService:
def __init__(self):
self.fetch_service = ICSFetchService()
async def sync_room_calendar(self, room: Room) -> SyncResult:
async with RedisAsyncLock(
f"ics_sync_room:{room.id}", skip_if_locked=True
) as lock:
if not lock.acquired:
logger.warning("ICS sync already in progress for room", room_id=room.id)
return {
"status": SyncStatus.SKIPPED,
"reason": "Sync already in progress",
}
return await self._sync_room_calendar(room)
async def _sync_room_calendar(self, room: Room) -> SyncResult:
if not room.ics_enabled or not room.ics_url:
return {"status": SyncStatus.SKIPPED, "reason": "ICS not configured"}
try:
if not self._should_sync(room):
return {"status": SyncStatus.SKIPPED, "reason": "Not time to sync yet"}
ics_content = await self.fetch_service.fetch_ics(room.ics_url)
calendar = self.fetch_service.parse_ics(ics_content)
content_hash = hashlib.md5(ics_content.encode()).hexdigest()
if room.ics_last_etag == content_hash:
logger.info("No changes in ICS for room", room_id=room.id)
room_url = f"{settings.UI_BASE_URL}/{room.name}"
events, total_events = self.fetch_service.extract_room_events(
calendar, room.name, room_url
)
return {
"status": SyncStatus.UNCHANGED,
"hash": content_hash,
"events_found": len(events),
"total_events": total_events,
"events_created": 0,
"events_updated": 0,
"events_deleted": 0,
}
# Extract matching events
room_url = f"{settings.UI_BASE_URL}/{room.name}"
events, total_events = self.fetch_service.extract_room_events(
calendar, room.name, room_url
)
sync_result = await self._sync_events_to_database(room.id, events)
# Update room sync metadata
await rooms_controller.update(
room,
{
"ics_last_sync": datetime.now(timezone.utc),
"ics_last_etag": content_hash,
},
mutate=False,
)
return {
"status": SyncStatus.SUCCESS,
"hash": content_hash,
"events_found": len(events),
"total_events": total_events,
**sync_result,
}
except Exception as e:
logger.error("Failed to sync ICS for room", room_id=room.id, error=str(e))
return {"status": SyncStatus.ERROR, "error": str(e)}
def _should_sync(self, room: Room) -> bool:
if not room.ics_last_sync:
return True
time_since_sync = datetime.now(timezone.utc) - room.ics_last_sync
return time_since_sync.total_seconds() >= room.ics_fetch_interval
async def _sync_events_to_database(
self, room_id: str, events: list[EventData]
) -> SyncStats:
created = 0
updated = 0
current_ics_uids = []
for event_data in events:
calendar_event = CalendarEvent(room_id=room_id, **event_data)
existing = await calendar_events_controller.get_by_ics_uid(
room_id, event_data["ics_uid"]
)
if existing:
updated += 1
else:
created += 1
await calendar_events_controller.upsert(calendar_event)
current_ics_uids.append(event_data["ics_uid"])
# Soft delete events that are no longer in calendar
deleted = await calendar_events_controller.soft_delete_missing(
room_id, current_ics_uids
)
return {
"events_created": created,
"events_updated": updated,
"events_deleted": deleted,
}
ics_sync_service = ICSSyncService()

View File

@@ -1,6 +1,7 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
@@ -47,14 +48,17 @@ class Settings(BaseSettings):
TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
# Recording storage
RECORDING_STORAGE_BACKEND: str | None = None
# Platform-specific recording storage (follows {PREFIX}_STORAGE_AWS_{CREDENTIAL} pattern)
# Whereby storage configuration
WHEREBY_STORAGE_AWS_BUCKET_NAME: str | None = None
WHEREBY_STORAGE_AWS_REGION: str | None = None
WHEREBY_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
# Recording storage configuration for AWS
RECORDING_STORAGE_AWS_BUCKET_NAME: str = "recording-bucket"
RECORDING_STORAGE_AWS_REGION: str = "us-east-1"
RECORDING_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
# Daily.co storage configuration
DAILYCO_STORAGE_AWS_BUCKET_NAME: str | None = None
DAILYCO_STORAGE_AWS_REGION: str | None = None
DAILYCO_STORAGE_AWS_ROLE_ARN: str | None = None
# Translate into the target language
TRANSLATION_BACKEND: str = "passthrough"
@@ -123,22 +127,20 @@ class Settings(BaseSettings):
# Whereby integration
WHEREBY_API_URL: str = "https://api.whereby.dev/v1"
WHEREBY_API_KEY: NonEmptyString | None = None
# Jibri integration
JIBRI_RECORDINGS_PATH: str = "/recordings"
WHEREBY_WEBHOOK_SECRET: str | None = None
AWS_WHEREBY_ACCESS_KEY_ID: str | None = None
AWS_WHEREBY_ACCESS_KEY_SECRET: str | None = None
AWS_PROCESS_RECORDING_QUEUE_URL: str | None = None
SQS_POLLING_TIMEOUT_SECONDS: int = 60
# Jitsi Meet
JITSI_DOMAIN: str = "meet.jit.si"
JITSI_JWT_SECRET: str | None = None
JITSI_WEBHOOK_SECRET: str | None = None
JITSI_APP_ID: str = "reflector"
JITSI_JWT_ISSUER: str = "reflector"
JITSI_JWT_AUDIENCE: str = "jitsi"
# Daily.co integration
DAILY_API_KEY: str | None = None
DAILY_WEBHOOK_SECRET: str | None = None
DAILY_SUBDOMAIN: str | None = None
DAILY_WEBHOOK_UUID: str | None = (
None # Webhook UUID for this environment. Not used by production code
)
# Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM
# Zulip integration
ZULIP_REALM: str | None = None

View File

@@ -3,6 +3,13 @@ from reflector.settings import settings
def get_transcripts_storage() -> Storage:
"""
Get storage for processed transcript files (master credentials).
Also use this for ALL our file operations with bucket override:
master = get_transcripts_storage()
master.delete_file(key, bucket=recording.bucket_name)
"""
assert settings.TRANSCRIPT_STORAGE_BACKEND
return Storage.get_instance(
name=settings.TRANSCRIPT_STORAGE_BACKEND,
@@ -10,8 +17,53 @@ def get_transcripts_storage() -> Storage:
)
def get_recordings_storage() -> Storage:
def get_whereby_storage() -> Storage:
"""
Get storage config for Whereby (for passing to Whereby API).
Usage:
whereby_storage = get_whereby_storage()
key_id, secret = whereby_storage.key_credentials
whereby_api.create_meeting(
bucket=whereby_storage.bucket_name,
access_key_id=key_id,
secret=secret,
)
Do NOT use for our file operations - use get_transcripts_storage() instead.
"""
if not settings.WHEREBY_STORAGE_AWS_BUCKET_NAME:
raise ValueError(
"WHEREBY_STORAGE_AWS_BUCKET_NAME required for Whereby with AWS storage"
)
return Storage.get_instance(
name=settings.RECORDING_STORAGE_BACKEND,
settings_prefix="RECORDING_STORAGE_",
name="aws",
settings_prefix="WHEREBY_STORAGE_",
)
def get_dailyco_storage() -> Storage:
"""
Get storage config for Daily.co (for passing to Daily API).
Usage:
daily_storage = get_dailyco_storage()
daily_api.create_meeting(
bucket=daily_storage.bucket_name,
region=daily_storage.region,
role_arn=daily_storage.role_credential,
)
Do NOT use for our file operations - use get_transcripts_storage() instead.
"""
# Fail fast if platform-specific config missing
if not settings.DAILYCO_STORAGE_AWS_BUCKET_NAME:
raise ValueError(
"DAILYCO_STORAGE_AWS_BUCKET_NAME required for Daily.co with AWS storage"
)
return Storage.get_instance(
name="aws",
settings_prefix="DAILYCO_STORAGE_",
)

View File

@@ -1,10 +1,23 @@
import importlib
from typing import BinaryIO, Union
from pydantic import BaseModel
from reflector.settings import settings
class StorageError(Exception):
"""Base exception for storage operations."""
pass
class StoragePermissionError(StorageError):
"""Exception raised when storage operation fails due to permission issues."""
pass
class FileResult(BaseModel):
filename: str
url: str
@@ -36,26 +49,113 @@ class Storage:
return cls._registry[name](**config)
async def put_file(self, filename: str, data: bytes) -> FileResult:
return await self._put_file(filename, data)
async def _put_file(self, filename: str, data: bytes) -> FileResult:
# Credential properties for API passthrough
@property
def bucket_name(self) -> str:
"""Default bucket name for this storage instance."""
raise NotImplementedError
async def delete_file(self, filename: str):
return await self._delete_file(filename)
async def _delete_file(self, filename: str):
@property
def region(self) -> str:
"""AWS region for this storage instance."""
raise NotImplementedError
async def get_file_url(self, filename: str) -> str:
return await self._get_file_url(filename)
@property
def access_key_id(self) -> str | None:
"""AWS access key ID (None for role-based auth). Prefer key_credentials property."""
return None
async def _get_file_url(self, filename: str) -> str:
@property
def secret_access_key(self) -> str | None:
"""AWS secret access key (None for role-based auth). Prefer key_credentials property."""
return None
@property
def role_arn(self) -> str | None:
"""AWS IAM role ARN for role-based auth (None for key-based auth). Prefer role_credential property."""
return None
@property
def key_credentials(self) -> tuple[str, str]:
"""
Get (access_key_id, secret_access_key) for key-based auth.
Raises ValueError if storage uses IAM role instead.
"""
raise NotImplementedError
async def get_file(self, filename: str):
return await self._get_file(filename)
async def _get_file(self, filename: str):
@property
def role_credential(self) -> str:
"""
Get IAM role ARN for role-based auth.
Raises ValueError if storage uses access keys instead.
"""
raise NotImplementedError
async def put_file(
self, filename: str, data: Union[bytes, BinaryIO], *, bucket: str | None = None
) -> FileResult:
"""Upload data. bucket: override instance default if provided."""
return await self._put_file(filename, data, bucket=bucket)
async def _put_file(
self, filename: str, data: Union[bytes, BinaryIO], *, bucket: str | None = None
) -> FileResult:
raise NotImplementedError
async def delete_file(self, filename: str, *, bucket: str | None = None):
"""Delete file. bucket: override instance default if provided."""
return await self._delete_file(filename, bucket=bucket)
async def _delete_file(self, filename: str, *, bucket: str | None = None):
raise NotImplementedError
async def get_file_url(
self,
filename: str,
operation: str = "get_object",
expires_in: int = 3600,
*,
bucket: str | None = None,
) -> str:
"""Generate presigned URL. bucket: override instance default if provided."""
return await self._get_file_url(filename, operation, expires_in, bucket=bucket)
async def _get_file_url(
self,
filename: str,
operation: str = "get_object",
expires_in: int = 3600,
*,
bucket: str | None = None,
) -> str:
raise NotImplementedError
async def get_file(self, filename: str, *, bucket: str | None = None):
"""Download file. bucket: override instance default if provided."""
return await self._get_file(filename, bucket=bucket)
async def _get_file(self, filename: str, *, bucket: str | None = None):
raise NotImplementedError
async def list_objects(
self, prefix: str = "", *, bucket: str | None = None
) -> list[str]:
"""List object keys. bucket: override instance default if provided."""
return await self._list_objects(prefix, bucket=bucket)
async def _list_objects(
self, prefix: str = "", *, bucket: str | None = None
) -> list[str]:
raise NotImplementedError
async def stream_to_fileobj(
self, filename: str, fileobj: BinaryIO, *, bucket: str | None = None
):
"""Stream file directly to file object without loading into memory.
bucket: override instance default if provided."""
return await self._stream_to_fileobj(filename, fileobj, bucket=bucket)
async def _stream_to_fileobj(
self, filename: str, fileobj: BinaryIO, *, bucket: str | None = None
):
raise NotImplementedError

View File

@@ -1,79 +1,236 @@
from functools import wraps
from typing import BinaryIO, Union
import aioboto3
from botocore.config import Config
from botocore.exceptions import ClientError
from reflector.logger import logger
from reflector.storage.base import FileResult, Storage
from reflector.storage.base import FileResult, Storage, StoragePermissionError
def handle_s3_client_errors(operation_name: str):
"""Decorator to handle S3 ClientError with bucket-aware messaging.
Args:
operation_name: Human-readable operation name for error messages (e.g., "upload", "delete")
"""
def decorator(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
bucket = kwargs.get("bucket")
try:
return await func(self, *args, **kwargs)
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
if error_code in ("AccessDenied", "NoSuchBucket"):
actual_bucket = bucket or self._bucket_name
bucket_context = (
f"overridden bucket '{actual_bucket}'"
if bucket
else f"default bucket '{actual_bucket}'"
)
raise StoragePermissionError(
f"S3 {operation_name} failed for {bucket_context}: {error_code}. "
f"Check TRANSCRIPT_STORAGE_AWS_* credentials have permission."
) from e
raise
return wrapper
return decorator
class AwsStorage(Storage):
"""AWS S3 storage with bucket override for multi-platform recording architecture.
Master credentials access all buckets via optional bucket parameter in operations."""
def __init__(
self,
aws_access_key_id: str,
aws_secret_access_key: str,
aws_bucket_name: str,
aws_region: str,
aws_access_key_id: str | None = None,
aws_secret_access_key: str | None = None,
aws_role_arn: str | None = None,
):
if not aws_access_key_id:
raise ValueError("Storage `aws_storage` require `aws_access_key_id`")
if not aws_secret_access_key:
raise ValueError("Storage `aws_storage` require `aws_secret_access_key`")
if not aws_bucket_name:
raise ValueError("Storage `aws_storage` require `aws_bucket_name`")
if not aws_region:
raise ValueError("Storage `aws_storage` require `aws_region`")
if not aws_access_key_id and not aws_role_arn:
raise ValueError(
"Storage `aws_storage` require either `aws_access_key_id` or `aws_role_arn`"
)
if aws_role_arn and (aws_access_key_id or aws_secret_access_key):
raise ValueError(
"Storage `aws_storage` cannot use both `aws_role_arn` and access keys"
)
super().__init__()
self.aws_bucket_name = aws_bucket_name
self._bucket_name = aws_bucket_name
self._region = aws_region
self._access_key_id = aws_access_key_id
self._secret_access_key = aws_secret_access_key
self._role_arn = aws_role_arn
self.aws_folder = ""
if "/" in aws_bucket_name:
self.aws_bucket_name, self.aws_folder = aws_bucket_name.split("/", 1)
self._bucket_name, self.aws_folder = aws_bucket_name.split("/", 1)
self.boto_config = Config(retries={"max_attempts": 3, "mode": "adaptive"})
self.session = aioboto3.Session(
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=aws_region,
)
self.base_url = f"https://{aws_bucket_name}.s3.amazonaws.com/"
self.base_url = f"https://{self._bucket_name}.s3.amazonaws.com/"
async def _put_file(self, filename: str, data: bytes) -> FileResult:
bucket = self.aws_bucket_name
folder = self.aws_folder
logger.info(f"Uploading {filename} to S3 {bucket}/{folder}")
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3") as client:
await client.put_object(
Bucket=bucket,
Key=s3filename,
Body=data,
# Implement credential properties
@property
def bucket_name(self) -> str:
return self._bucket_name
@property
def region(self) -> str:
return self._region
@property
def access_key_id(self) -> str | None:
return self._access_key_id
@property
def secret_access_key(self) -> str | None:
return self._secret_access_key
@property
def role_arn(self) -> str | None:
return self._role_arn
@property
def key_credentials(self) -> tuple[str, str]:
"""Get (access_key_id, secret_access_key) for key-based auth."""
if self._role_arn:
raise ValueError(
"Storage uses IAM role authentication. "
"Use role_credential property instead of key_credentials."
)
if not self._access_key_id or not self._secret_access_key:
raise ValueError("Storage access key credentials not configured")
return (self._access_key_id, self._secret_access_key)
async def _get_file_url(self, filename: str) -> FileResult:
bucket = self.aws_bucket_name
@property
def role_credential(self) -> str:
"""Get IAM role ARN for role-based auth."""
if self._access_key_id or self._secret_access_key:
raise ValueError(
"Storage uses access key authentication. "
"Use key_credentials property instead of role_credential."
)
if not self._role_arn:
raise ValueError("Storage IAM role ARN not configured")
return self._role_arn
@handle_s3_client_errors("upload")
async def _put_file(
self, filename: str, data: Union[bytes, BinaryIO], *, bucket: str | None = None
) -> FileResult:
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3") as client:
logger.info(f"Uploading {filename} to S3 {actual_bucket}/{folder}")
async with self.session.client("s3", config=self.boto_config) as client:
if isinstance(data, bytes):
await client.put_object(Bucket=actual_bucket, Key=s3filename, Body=data)
else:
# boto3 reads file-like object in chunks
# avoids creating extra memory copy vs bytes.getvalue() approach
await client.upload_fileobj(data, Bucket=actual_bucket, Key=s3filename)
url = await self._get_file_url(filename, bucket=bucket)
return FileResult(filename=filename, url=url)
@handle_s3_client_errors("presign")
async def _get_file_url(
self,
filename: str,
operation: str = "get_object",
expires_in: int = 3600,
*,
bucket: str | None = None,
) -> str:
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3", config=self.boto_config) as client:
presigned_url = await client.generate_presigned_url(
"get_object",
Params={"Bucket": bucket, "Key": s3filename},
ExpiresIn=3600,
operation,
Params={"Bucket": actual_bucket, "Key": s3filename},
ExpiresIn=expires_in,
)
return presigned_url
async def _delete_file(self, filename: str):
bucket = self.aws_bucket_name
@handle_s3_client_errors("delete")
async def _delete_file(self, filename: str, *, bucket: str | None = None):
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
logger.info(f"Deleting {filename} from S3 {bucket}/{folder}")
logger.info(f"Deleting {filename} from S3 {actual_bucket}/{folder}")
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3") as client:
await client.delete_object(Bucket=bucket, Key=s3filename)
async with self.session.client("s3", config=self.boto_config) as client:
await client.delete_object(Bucket=actual_bucket, Key=s3filename)
async def _get_file(self, filename: str):
bucket = self.aws_bucket_name
@handle_s3_client_errors("download")
async def _get_file(self, filename: str, *, bucket: str | None = None):
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
logger.info(f"Downloading {filename} from S3 {bucket}/{folder}")
logger.info(f"Downloading {filename} from S3 {actual_bucket}/{folder}")
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3") as client:
response = await client.get_object(Bucket=bucket, Key=s3filename)
async with self.session.client("s3", config=self.boto_config) as client:
response = await client.get_object(Bucket=actual_bucket, Key=s3filename)
return await response["Body"].read()
@handle_s3_client_errors("list_objects")
async def _list_objects(
self, prefix: str = "", *, bucket: str | None = None
) -> list[str]:
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
# Combine folder and prefix
s3prefix = f"{folder}/{prefix}" if folder else prefix
logger.info(f"Listing objects from S3 {actual_bucket} with prefix '{s3prefix}'")
keys = []
async with self.session.client("s3", config=self.boto_config) as client:
paginator = client.get_paginator("list_objects_v2")
async for page in paginator.paginate(Bucket=actual_bucket, Prefix=s3prefix):
if "Contents" in page:
for obj in page["Contents"]:
# Strip folder prefix from keys if present
key = obj["Key"]
if folder:
if key.startswith(f"{folder}/"):
key = key[len(folder) + 1 :]
elif key == folder:
# Skip folder marker itself
continue
keys.append(key)
return keys
@handle_s3_client_errors("stream")
async def _stream_to_fileobj(
self, filename: str, fileobj: BinaryIO, *, bucket: str | None = None
):
"""Stream file from S3 directly to file object without loading into memory."""
actual_bucket = bucket or self._bucket_name
folder = self.aws_folder
logger.info(f"Streaming {filename} from S3 {actual_bucket}/{folder}")
s3filename = f"{folder}/{filename}" if folder else filename
async with self.session.client("s3", config=self.boto_config) as client:
await client.download_fileobj(
Bucket=actual_bucket, Key=s3filename, Fileobj=fileobj
)
Storage.register("aws", AwsStorage)

View File

@@ -0,0 +1,26 @@
from reflector.utils.string import NonEmptyString
DailyRoomName = str
def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString:
"""
Extract base room name from Daily.co timestamped room name.
Daily.co creates rooms with timestamp suffix: {base_name}-YYYYMMDDHHMMSS
This function removes the timestamp to get the original room name.
Examples:
"daily-20251020193458""daily"
"daily-2-20251020193458""daily-2"
"my-room-name-20251020193458""my-room-name"
Args:
daily_room_name: Full Daily.co room name with optional timestamp
Returns:
Base room name without timestamp suffix
"""
base_name = daily_room_name.rsplit("-", 1)[0]
assert base_name, f"Extracted base name is empty from: {daily_room_name}"
return base_name

View File

@@ -0,0 +1,9 @@
from datetime import datetime, timezone
def parse_datetime_with_timezone(iso_string: str) -> datetime:
"""Parse ISO datetime string and ensure timezone awareness (defaults to UTC if naive)."""
dt = datetime.fromisoformat(iso_string)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt

View File

@@ -1,4 +1,4 @@
from typing import Annotated
from typing import Annotated, TypeVar
from pydantic import Field, TypeAdapter, constr
@@ -21,3 +21,12 @@ def try_parse_non_empty_string(s: str) -> NonEmptyString | None:
if not s:
return None
return parse_non_empty_string(s)
T = TypeVar("T", bound=str)
def assert_equal[T](s1: T, s2: T) -> T:
if s1 != s2:
raise ValueError(f"assert_equal: {s1} != {s2}")
return s1

View File

@@ -0,0 +1,37 @@
"""URL manipulation utilities."""
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
def add_query_param(url: str, key: str, value: str) -> str:
"""
Add or update a query parameter in a URL.
Properly handles URLs with or without existing query parameters,
preserving fragments and encoding special characters.
Args:
url: The URL to modify
key: The query parameter name
value: The query parameter value
Returns:
The URL with the query parameter added or updated
Examples:
>>> add_query_param("https://example.com/room", "t", "token123")
'https://example.com/room?t=token123'
>>> add_query_param("https://example.com/room?existing=param", "t", "token123")
'https://example.com/room?existing=param&t=token123'
"""
parsed = urlparse(url)
query_params = parse_qs(parsed.query, keep_blank_values=True)
query_params[key] = [value]
new_query = urlencode(query_params, doseq=True)
new_parsed = parsed._replace(query=new_query)
return urlunparse(new_parsed)

View File

@@ -1,11 +1,5 @@
# Video Platform Abstraction Layer
"""
This module provides an abstraction layer for different video conferencing platforms.
It allows seamless switching between providers (Whereby, Daily.co, etc.) without
changing the core application logic.
"""
from .base import MeetingData, VideoPlatformClient, VideoPlatformConfig
from .base import VideoPlatformClient
from .models import MeetingData, VideoPlatformConfig
from .registry import get_platform_client, register_platform
__all__ = [

View File

@@ -1,77 +1,50 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Dict, Optional
from typing import TYPE_CHECKING, Any, Dict, Optional
from pydantic import BaseModel
from ..schemas.platform import Platform
from ..utils.string import NonEmptyString
from .models import MeetingData, SessionData, VideoPlatformConfig
from reflector.db.rooms import Room
if TYPE_CHECKING:
from reflector.db.rooms import Room
class MeetingData(BaseModel):
"""Standardized meeting data returned by all platforms."""
meeting_id: str
room_name: str
room_url: str
host_room_url: str
platform: str
extra_data: Dict[str, Any] = {} # Platform-specific data
class VideoPlatformConfig(BaseModel):
"""Configuration for a video platform."""
api_key: str
webhook_secret: str
api_url: Optional[str] = None
subdomain: Optional[str] = None
s3_bucket: Optional[str] = None
s3_region: Optional[str] = None
aws_role_arn: Optional[str] = None
aws_access_key_id: Optional[str] = None
aws_access_key_secret: Optional[str] = None
# separator doesn't guarantee there's no more "ROOM_PREFIX_SEPARATOR" strings in room name
ROOM_PREFIX_SEPARATOR = "-"
class VideoPlatformClient(ABC):
"""Abstract base class for video platform integrations."""
PLATFORM_NAME: str = ""
PLATFORM_NAME: Platform
def __init__(self, config: VideoPlatformConfig):
self.config = config
@abstractmethod
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
self, room_name_prefix: NonEmptyString, end_date: datetime, room: "Room"
) -> MeetingData:
"""Create a new meeting room."""
pass
@abstractmethod
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
"""Get session information for a room."""
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
"""Get session history for a room."""
pass
@abstractmethod
async def delete_room(self, room_name: str) -> bool:
"""Delete a room. Returns True if successful."""
pass
@abstractmethod
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
"""Upload a logo to the room. Returns True if successful."""
pass
@abstractmethod
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Verify webhook signature for security."""
pass
def format_recording_config(self, room: Room) -> Dict[str, Any]:
"""Format recording configuration for the platform.
Can be overridden by specific implementations."""
def format_recording_config(self, room: "Room") -> Dict[str, Any]:
if room.recording_type == "cloud" and self.config.s3_bucket:
return {
"type": room.recording_type,

View File

@@ -0,0 +1,261 @@
import base64
import hmac
from datetime import datetime
from hashlib import sha256
from http import HTTPStatus
from typing import Any, Dict, Optional
import httpx
from reflector.db.daily_participant_sessions import (
daily_participant_sessions_controller,
)
from reflector.db.rooms import Room
from reflector.logger import logger
from reflector.storage import get_dailyco_storage
from ..schemas.platform import Platform
from ..utils.daily import DailyRoomName
from ..utils.string import NonEmptyString
from .base import ROOM_PREFIX_SEPARATOR, VideoPlatformClient
from .models import MeetingData, RecordingType, SessionData, VideoPlatformConfig
class DailyClient(VideoPlatformClient):
PLATFORM_NAME: Platform = "daily"
TIMEOUT = 10
BASE_URL = "https://api.daily.co/v1"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"
RECORDING_NONE: RecordingType = "none"
RECORDING_CLOUD: RecordingType = "cloud"
def __init__(self, config: VideoPlatformConfig):
super().__init__(config)
self.headers = {
"Authorization": f"Bearer {config.api_key}",
"Content-Type": "application/json",
}
async def create_meeting(
self, room_name_prefix: NonEmptyString, end_date: datetime, room: Room
) -> MeetingData:
"""
Daily.co rooms vs meetings:
- We create a NEW Daily.co room for each Reflector meeting
- Daily.co meeting/session starts automatically when first participant joins
- Room auto-deletes after exp time
- Meeting.room_name stores the timestamped Daily.co room name
"""
timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT)
room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}"
data = {
"name": room_name,
"privacy": "private" if room.is_locked else "public",
"properties": {
"enable_recording": "raw-tracks"
if room.recording_type != self.RECORDING_NONE
else False,
"enable_chat": True,
"enable_screenshare": True,
"start_video_off": False,
"start_audio_off": False,
"exp": int(end_date.timestamp()),
},
}
# Only configure recordings_bucket if recording is enabled
if room.recording_type != self.RECORDING_NONE:
daily_storage = get_dailyco_storage()
assert daily_storage.bucket_name, "S3 bucket must be configured"
data["properties"]["recordings_bucket"] = {
"bucket_name": daily_storage.bucket_name,
"bucket_region": daily_storage.region,
"assume_role_arn": daily_storage.role_credential,
"allow_api_access": True,
}
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/rooms",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
if response.status_code >= 400:
logger.error(
"Daily.co API error",
status_code=response.status_code,
response_body=response.text,
request_data=data,
)
response.raise_for_status()
result = response.json()
room_url = result["url"]
return MeetingData(
meeting_id=result["id"],
room_name=result["name"],
room_url=room_url,
host_room_url=room_url,
platform=self.PLATFORM_NAME,
extra_data=result,
)
async def get_room_sessions(self, room_name: str) -> list[SessionData]:
"""Get room session history from database (webhook-stored sessions).
Daily.co doesn't provide historical session API, so we query our database
where participant.joined/left webhooks are stored.
"""
from reflector.db.meetings import meetings_controller
meeting = await meetings_controller.get_by_room_name(room_name)
if not meeting:
return []
sessions = await daily_participant_sessions_controller.get_by_meeting(
meeting.id
)
return [
SessionData(
session_id=s.id,
started_at=s.joined_at,
ended_at=s.left_at,
)
for s in sessions
]
async def get_room_presence(self, room_name: str) -> Dict[str, Any]:
"""Get room presence/session data for a Daily.co room.
Example response:
{
"total_count": 1,
"data": [
{
"room": "w2pp2cf4kltgFACPKXmX",
"id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
"userId": "pbZ+ismP7dk=",
"userName": "Moishe",
"joinTime": "2023-01-01T20:53:19.000Z",
"duration": 2312
}
]
}
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/rooms/{room_name}/presence",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def get_meeting_participants(self, meeting_id: str) -> Dict[str, Any]:
"""Get participant data for a specific Daily.co meeting.
Example response:
{
"data": [
{
"user_id": "4q47OTmqa/w=",
"participant_id": "d61cd7b2-a273-42b4-89bd-be763fd562c1",
"user_name": "Lindsey",
"join_time": 1672786813,
"duration": 150
},
{
"user_id": "pbZ+ismP7dk=",
"participant_id": "b3d56359-14d7-46af-ac8b-18f8c991f5f6",
"user_name": "Moishe",
"join_time": 1672786797,
"duration": 165
}
]
}
"""
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/meetings/{meeting_id}/participants",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def get_recording(self, recording_id: str) -> Dict[str, Any]:
async with httpx.AsyncClient() as client:
response = await client.get(
f"{self.BASE_URL}/recordings/{recording_id}",
headers=self.headers,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()
async def delete_room(self, room_name: str) -> bool:
async with httpx.AsyncClient() as client:
response = await client.delete(
f"{self.BASE_URL}/rooms/{room_name}",
headers=self.headers,
timeout=self.TIMEOUT,
)
return response.status_code in (HTTPStatus.OK, HTTPStatus.NOT_FOUND)
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
"""Verify Daily.co webhook signature.
Daily.co uses:
- X-Webhook-Signature header
- X-Webhook-Timestamp header
- Signature format: HMAC-SHA256(base64_decode(secret), timestamp + '.' + body)
- Result is base64 encoded
"""
if not signature or not timestamp:
return False
try:
secret_bytes = base64.b64decode(self.config.webhook_secret)
signed_content = timestamp.encode() + b"." + body
expected = hmac.new(secret_bytes, signed_content, sha256).digest()
expected_b64 = base64.b64encode(expected).decode()
return hmac.compare_digest(expected_b64, signature)
except Exception as e:
logger.error("Daily.co webhook signature verification failed", exc_info=e)
return False
async def create_meeting_token(
self,
room_name: DailyRoomName,
enable_recording: bool,
user_id: Optional[str] = None,
) -> str:
data = {"properties": {"room_name": room_name}}
if enable_recording:
data["properties"]["start_cloud_recording"] = True
data["properties"]["enable_recording_ui"] = False
if user_id:
data["properties"]["user_id"] = user_id
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.BASE_URL}/meeting-tokens",
headers=self.headers,
json=data,
timeout=self.TIMEOUT,
)
response.raise_for_status()
return response.json()["token"]

View File

@@ -1,54 +1,62 @@
"""Factory for creating video platform clients based on configuration."""
from typing import Optional
from typing import TYPE_CHECKING, Literal, Optional, overload
from reflector.db.rooms import VideoPlatform
from reflector.settings import settings
from reflector.storage import get_dailyco_storage, get_whereby_storage
from ..schemas.platform import WHEREBY_PLATFORM, Platform
from .base import VideoPlatformClient, VideoPlatformConfig
from .registry import get_platform_client
if TYPE_CHECKING:
from .jitsi import JitsiClient
from .whereby import WherebyClient
def get_platform_config(platform: str) -> VideoPlatformConfig:
"""Get configuration for a specific platform."""
if platform == VideoPlatform.WHEREBY:
def get_platform_config(platform: Platform) -> VideoPlatformConfig:
if platform == WHEREBY_PLATFORM:
if not settings.WHEREBY_API_KEY:
raise ValueError(
"WHEREBY_API_KEY is required when platform='whereby'. "
"Set WHEREBY_API_KEY environment variable."
)
whereby_storage = get_whereby_storage()
key_id, secret = whereby_storage.key_credentials
return VideoPlatformConfig(
api_key=settings.WHEREBY_API_KEY or "",
api_key=settings.WHEREBY_API_KEY,
webhook_secret=settings.WHEREBY_WEBHOOK_SECRET or "",
api_url=settings.WHEREBY_API_URL,
s3_bucket=settings.RECORDING_STORAGE_AWS_BUCKET_NAME,
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
aws_access_key_secret=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
s3_bucket=whereby_storage.bucket_name,
s3_region=whereby_storage.region,
aws_access_key_id=key_id,
aws_access_key_secret=secret,
)
elif platform == VideoPlatform.JITSI:
elif platform == "daily":
if not settings.DAILY_API_KEY:
raise ValueError(
"DAILY_API_KEY is required when platform='daily'. "
"Set DAILY_API_KEY environment variable."
)
if not settings.DAILY_SUBDOMAIN:
raise ValueError(
"DAILY_SUBDOMAIN is required when platform='daily'. "
"Set DAILY_SUBDOMAIN environment variable."
)
daily_storage = get_dailyco_storage()
return VideoPlatformConfig(
api_key="", # Jitsi uses JWT, no API key
webhook_secret=settings.JITSI_WEBHOOK_SECRET or "",
api_url=f"https://{settings.JITSI_DOMAIN}",
api_key=settings.DAILY_API_KEY,
webhook_secret=settings.DAILY_WEBHOOK_SECRET or "",
subdomain=settings.DAILY_SUBDOMAIN,
s3_bucket=daily_storage.bucket_name,
s3_region=daily_storage.region,
aws_role_arn=daily_storage.role_credential,
)
else:
raise ValueError(f"Unknown platform: {platform}")
@overload
def create_platform_client(platform: Literal["jitsi"]) -> "JitsiClient": ...
@overload
def create_platform_client(platform: Literal["whereby"]) -> "WherebyClient": ...
def create_platform_client(platform: str) -> VideoPlatformClient:
"""Create a video platform client instance."""
def create_platform_client(platform: Platform) -> VideoPlatformClient:
config = get_platform_config(platform)
return get_platform_client(platform, config)
def get_platform_for_room(room_id: Optional[str] = None) -> str:
"""Determine which platform to use for a room based on feature flags."""
# For now, default to whereby since we don't have feature flags yet
return VideoPlatform.WHEREBY
def get_platform(room_platform: Optional[Platform] = None) -> Platform:
if room_platform:
return room_platform
return settings.DEFAULT_VIDEO_PLATFORM

View File

@@ -1,4 +0,0 @@
from .client import JitsiClient, JitsiMeetingData
from .router import router
__all__ = ["JitsiClient", "JitsiMeetingData", "router"]

View File

@@ -1,111 +0,0 @@
import hmac
from datetime import datetime, timezone
from hashlib import sha256
from typing import Any, Dict, Optional
import jwt
from reflector.db.rooms import Room, VideoPlatform
from reflector.settings import settings
from reflector.utils import generate_uuid4
from ..base import MeetingData, VideoPlatformClient
class JitsiMeetingData(MeetingData):
@property
def user_jwt(self) -> str:
return self.extra_data.get("user_jwt", "")
@property
def host_jwt(self) -> str:
return self.extra_data.get("host_jwt", "")
@property
def domain(self) -> str:
return self.extra_data.get("domain", "")
class JitsiClient(VideoPlatformClient):
PLATFORM_NAME = VideoPlatform.JITSI
def _generate_jwt(self, room: str, moderator: bool, exp: datetime) -> str:
if not settings.JITSI_JWT_SECRET:
raise ValueError("JITSI_JWT_SECRET is required for JWT generation")
payload = {
"aud": settings.JITSI_JWT_AUDIENCE,
"iss": settings.JITSI_JWT_ISSUER,
"sub": settings.JITSI_DOMAIN,
"room": room,
"exp": int(exp.timestamp()),
"context": {
"user": {
"name": "Reflector User",
"moderator": moderator,
},
"features": {
"recording": True,
"livestreaming": False,
},
},
}
return jwt.encode(payload, settings.JITSI_JWT_SECRET, algorithm="HS256")
async def create_meeting(
self, room_name_prefix: str, end_date: datetime, room: Room
) -> JitsiMeetingData:
jitsi_room = f"reflector-{room.name}-{generate_uuid4()}"
user_jwt = self._generate_jwt(room=jitsi_room, moderator=False, exp=end_date)
host_jwt = self._generate_jwt(room=jitsi_room, moderator=True, exp=end_date)
room_url = f"https://{settings.JITSI_DOMAIN}/{jitsi_room}?jwt={user_jwt}"
host_room_url = f"https://{settings.JITSI_DOMAIN}/{jitsi_room}?jwt={host_jwt}"
return JitsiMeetingData(
meeting_id=generate_uuid4(),
room_name=jitsi_room,
room_url=room_url,
host_room_url=host_room_url,
platform=self.PLATFORM_NAME,
extra_data={
"user_jwt": user_jwt,
"host_jwt": host_jwt,
"domain": settings.JITSI_DOMAIN,
},
)
async def get_room_sessions(self, room_name: str) -> Dict[str, Any]:
return {
"roomName": room_name,
"sessions": [
{
"sessionId": generate_uuid4(),
"startTime": datetime.now(tz=timezone.utc).isoformat(),
"participants": [],
"isActive": True,
}
],
}
async def delete_room(self, room_name: str) -> bool:
return True
async def upload_logo(self, room_name: str, logo_path: str) -> bool:
return True
def verify_webhook_signature(
self, body: bytes, signature: str, timestamp: Optional[str] = None
) -> bool:
if not signature or not self.config.webhook_secret:
return False
try:
expected = hmac.new(
self.config.webhook_secret.encode(), body, sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
except Exception:
return False

View File

@@ -1,165 +0,0 @@
import hmac
from datetime import datetime
from hashlib import sha256
from typing import Any, Dict
from fastapi import APIRouter, HTTPException, Request
from pydantic import BaseModel
from reflector.db.meetings import meetings_controller
from reflector.settings import settings
try:
from reflector.video_platforms import create_platform_client
except ImportError:
# PyJWT not yet installed, will be added in final task
def create_platform_client(platform: str):
return None
router = APIRouter()
class JitsiWebhookEvent(BaseModel):
event: str
room: str
timestamp: datetime
data: Dict[str, Any] = {}
class JibriRecordingEvent(BaseModel):
room_name: str
recording_file: str
recording_status: str
timestamp: datetime
def verify_jitsi_webhook_signature(body: bytes, signature: str) -> bool:
"""Verify Jitsi webhook signature using HMAC-SHA256."""
if not signature or not settings.JITSI_WEBHOOK_SECRET:
return False
try:
client = create_platform_client("jitsi")
if client is None:
# Fallback verification when platform client not available
expected = hmac.new(
settings.JITSI_WEBHOOK_SECRET.encode(), body, sha256
).hexdigest()
return hmac.compare_digest(expected, signature)
return client.verify_webhook_signature(body, signature)
except Exception:
return False
@router.post("/jitsi/events")
async def jitsi_events_webhook(event: JitsiWebhookEvent, request: Request):
"""
Handle Prosody event-sync webhooks from Jitsi Meet.
Expected event types:
- muc-occupant-joined: participant joined the room
- muc-occupant-left: participant left the room
- jibri-recording-on: recording started
- jibri-recording-off: recording stopped
"""
# Verify webhook signature
body = await request.body()
signature = request.headers.get("x-jitsi-signature", "")
if not verify_jitsi_webhook_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
# Find meeting by room name
meeting = await meetings_controller.get_by_room_name(event.room)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Handle participant events
if event.event == "muc-occupant-joined":
# Store event and update participant count
await meetings_controller.participant_joined(
meeting.id, {"timestamp": event.timestamp, "data": event.data}
)
current_count = getattr(meeting, "num_clients", 0)
await meetings_controller.update_meeting(
meeting.id, num_clients=current_count + 1
)
elif event.event == "muc-occupant-left":
# Store event and update participant count
await meetings_controller.participant_left(
meeting.id, {"timestamp": event.timestamp, "data": event.data}
)
current_count = getattr(meeting, "num_clients", 0)
await meetings_controller.update_meeting(
meeting.id, num_clients=max(0, current_count - 1)
)
elif event.event == "jibri-recording-on":
# Store recording started event
await meetings_controller.recording_started(
meeting.id, {"timestamp": event.timestamp, "data": event.data}
)
elif event.event == "jibri-recording-off":
# Store recording stopped event
await meetings_controller.recording_stopped(
meeting.id, {"timestamp": event.timestamp, "data": event.data}
)
return {"status": "ok", "event": event.event, "room": event.room}
@router.post("/jibri/recording-complete")
async def jibri_recording_complete(event: JibriRecordingEvent, request: Request):
"""
Handle Jibri recording completion webhook.
This endpoint is called by the Jibri finalize script when a recording
is completed and uploaded to storage.
"""
# Verify webhook signature
body = await request.body()
signature = request.headers.get("x-jitsi-signature", "")
if not verify_jitsi_webhook_signature(body, signature):
raise HTTPException(status_code=401, detail="Invalid webhook signature")
# Find meeting by room name
meeting = await meetings_controller.get_by_room_name(event.room_name)
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
# Store recording completion event
await meetings_controller.add_event(
meeting.id,
"recording_completed",
{
"recording_file": event.recording_file,
"recording_status": event.recording_status,
"timestamp": event.timestamp,
},
)
# TODO: Trigger recording processing pipeline
# This is where we would:
# 1. Download the recording file from Jibri storage
# 2. Create a transcript record in the database
# 3. Queue the audio processing tasks (chunking, transcription, etc.)
# 4. Update meeting status to indicate recording is being processed
return {
"status": "ok",
"room_name": event.room_name,
"recording_file": event.recording_file,
"message": "Recording processing queued",
}
@router.get("/jitsi/health")
async def jitsi_health_check():
"""Simple health check endpoint for Jitsi webhook configuration."""
return {
"status": "ok",
"service": "jitsi-webhooks",
"timestamp": datetime.now(tz=timezone.utc).isoformat(),
"webhook_secret_configured": bool(settings.JITSI_WEBHOOK_SECRET),
}

View File

@@ -1,3 +0,0 @@
"""Jitsi-specific worker tasks."""
# Placeholder for Jitsi recording tasks

View File

@@ -0,0 +1,60 @@
from datetime import datetime
from typing import Any, Dict, Literal, Optional
from pydantic import BaseModel, Field
from reflector.schemas.platform import WHEREBY_PLATFORM, Platform
from reflector.utils.string import NonEmptyString
RecordingType = Literal["none", "local", "cloud"]
class SessionData(BaseModel):
"""Platform-agnostic session data.
Represents a participant session in a meeting room, regardless of platform.
Used to determine if a meeting is still active or has ended.
"""
session_id: NonEmptyString = Field(description="Unique session identifier")
started_at: datetime = Field(description="When session started (UTC)")
ended_at: datetime | None = Field(
description="When session ended (UTC), None if still active"
)
class MeetingData(BaseModel):
platform: Platform
meeting_id: NonEmptyString = Field(
description="Platform-specific meeting identifier"
)
room_url: NonEmptyString = Field(description="URL for participants to join")
host_room_url: NonEmptyString = Field(
description="URL for hosts (may be same as room_url)"
)
room_name: NonEmptyString = Field(description="Human-readable room name")
extra_data: Dict[str, Any] = Field(default_factory=dict)
class Config:
json_schema_extra = {
"example": {
"platform": WHEREBY_PLATFORM,
"meeting_id": "12345678",
"room_url": "https://subdomain.whereby.com/room-20251008120000",
"host_room_url": "https://subdomain.whereby.com/room-20251008120000?roomKey=abc123",
"room_name": "room-20251008120000",
}
}
class VideoPlatformConfig(BaseModel):
api_key: str
webhook_secret: str
api_url: Optional[str] = None
subdomain: Optional[str] = None # Whereby/Daily subdomain
s3_bucket: Optional[str] = None
s3_region: Optional[str] = None
# Whereby uses access keys, Daily uses IAM role
aws_access_key_id: Optional[str] = None
aws_access_key_secret: Optional[str] = None
aws_role_arn: Optional[str] = None

View File

@@ -1,56 +1,35 @@
from typing import TYPE_CHECKING, Dict, Literal, Type, overload
from typing import Dict, Type
from ..schemas.platform import DAILY_PLATFORM, WHEREBY_PLATFORM, Platform
from .base import VideoPlatformClient, VideoPlatformConfig
if TYPE_CHECKING:
from .jitsi import JitsiClient
from .whereby import WherebyClient
# Registry of available video platforms
_PLATFORMS: Dict[str, Type[VideoPlatformClient]] = {}
_PLATFORMS: Dict[Platform, Type[VideoPlatformClient]] = {}
def register_platform(name: str, client_class: Type[VideoPlatformClient]):
"""Register a video platform implementation."""
_PLATFORMS[name.lower()] = client_class
@overload
def get_platform_client(
platform: Literal["jitsi"], config: VideoPlatformConfig
) -> "JitsiClient": ...
@overload
def get_platform_client(
platform: Literal["whereby"], config: VideoPlatformConfig
) -> "WherebyClient": ...
def register_platform(name: Platform, client_class: Type[VideoPlatformClient]):
_PLATFORMS[name] = client_class
def get_platform_client(
platform: str, config: VideoPlatformConfig
platform: Platform, config: VideoPlatformConfig
) -> VideoPlatformClient:
"""Get a video platform client instance."""
platform_lower = platform.lower()
if platform_lower not in _PLATFORMS:
if platform not in _PLATFORMS:
raise ValueError(f"Unknown video platform: {platform}")
client_class = _PLATFORMS[platform_lower]
client_class = _PLATFORMS[platform]
return client_class(config)
def get_available_platforms() -> list[str]:
"""Get list of available platform names."""
def get_available_platforms() -> list[Platform]:
return list(_PLATFORMS.keys())
# Auto-register built-in platforms
def _register_builtin_platforms():
from .jitsi import JitsiClient
from .whereby import WherebyClient
from .daily import DailyClient # noqa: PLC0415
from .whereby import WherebyClient # noqa: PLC0415
register_platform("jitsi", JitsiClient)
register_platform("whereby", WherebyClient)
register_platform(WHEREBY_PLATFORM, WherebyClient)
register_platform(DAILY_PLATFORM, DailyClient)
_register_builtin_platforms()

Some files were not shown because too many files have changed in this diff Show More