Compare commits

..

15 Commits

Author SHA1 Message Date
5f2f0e9317 chore(main): release 0.8.0 (#579) 2025-08-29 11:34:24 -06:00
88ed7cfa78 feat(rooms): add webhook for transcript completion (#578)
* feat(rooms): add webhook notifications for transcript completion

- Add webhook_url and webhook_secret fields to rooms table
- Create Celery task with 24-hour retry window using exponential backoff
- Send transcript metadata, diarized text, topics, and summaries via webhook
- Add HMAC signature verification for webhook security
- Add test endpoint POST /v1/rooms/{room_id}/webhook/test
- Update frontend with webhook configuration UI and test button
- Auto-generate webhook secret if not provided
- Trigger webhook after successful file pipeline processing for room recordings

* style: linting

* fix: remove unwanted files

* fix: update openapi gen

* fix: self-review

* docs: add comprehensive webhook documentation

- Document webhook configuration, events, and payloads
- Include transcript.completed and test event examples
- Add security considerations and best practices
- Provide example webhook receiver implementation
- Document retry policy and signature verification

* fix: remove audio_mp3_url from webhook payload

- Remove audio download URL generation from webhook
- Update documentation to reflect the change
- Keep only frontend_url for accessing transcripts

* docs: remove unwanted section

* fix: correct API method name and type imports for rooms

- Fix v1RoomsRetrieve to v1RoomsGet
- Update Room type to RoomDetails throughout frontend
- Fix type imports in useRoomList, RoomList, RoomTable, and RoomCards

* feat: add show/hide toggle for webhook secret field

- Add eye icon button to reveal/hide webhook secret when editing
- Show password dots when webhook secret is hidden
- Reset visibility state when opening/closing dialog
- Only show toggle button when editing existing room with secret

* fix: resolve event loop conflict in webhook test endpoint

- Extract webhook test logic into shared async function
- Call async function directly from FastAPI endpoint
- Keep Celery task wrapper for background processing
- Fixes RuntimeError: event loop already running

* refactor: remove unnecessary Celery task for webhook testing

- Webhook testing is synchronous and provides immediate feedback
- No need for background processing via Celery
- Keep only the async function called directly from API endpoint

* feat: improve webhook test error messages and display

- Show HTTP status code in error messages
- Parse JSON error responses to extract meaningful messages
- Improved UI layout for webhook test results
- Added colored background for success/error states
- Better text wrapping for long error messages

* docs: adjust doc

* fix: review

* fix: update attempts to match close 24h

* fix: add event_id

* fix: changed to uuid, to have new event_id when reprocess.

* style: linting

* fix: alembic revision
2025-08-29 10:07:49 -06:00
6f0c7c1a5e feat(cleanup): add automatic data retention for public instances (#574)
* feat(cleanup): add automatic data retention for public instances

- Add Celery task to clean up anonymous data after configurable retention period
- Delete transcripts, meetings, and orphaned recordings older than retention days
- Only runs when PUBLIC_MODE is enabled to prevent accidental data loss
- Properly removes all associated files (local and S3 storage)
- Add manual cleanup tool for testing and intervention
- Configure retention via PUBLIC_DATA_RETENTION_DAYS setting (default: 7 days)

Fixes #571

* fix: apply pre-commit formatting fixes

* fix: properly delete recording files from storage during cleanup

- Add storage deletion for orphaned recordings in both cleanup task and manual tool
- Delete from storage before removing database records
- Log warnings if storage deletion fails but continue with database cleanup

* Apply suggestion from @pr-agent-monadical[bot]

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Apply suggestion from @pr-agent-monadical[bot]

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* refactor: cleanup_old_data for better logging

* fix: linting

* test: fix meeting cleanup test to not require room controller

- Simplify test by directly inserting meetings into database
- Remove dependency on non-existent rooms_controller.create method
- Tests now pass successfully

* fix: linting

* refactor: simplify cleanup tool to use worker implementation

- Remove duplicate cleanup logic from manual tool
- Use the same _cleanup_old_public_data function from worker
- Remove dry-run feature as requested
- Prevent code duplication and ensure consistency
- Update documentation to reflect changes

* refactor: split cleanup worker into smaller functions

- Move all imports to the top of the file
- Extract cleanup logic into separate functions:
  - cleanup_old_transcripts()
  - cleanup_old_meetings()
  - cleanup_orphaned_recordings()
  - log_cleanup_results()
- Make code more maintainable and testable
- Add days parameter support to Celery task
- Update manual tool to work with refactored code

* feat: add TypedDict typing for cleanup stats

- Add CleanupStats TypedDict for better type safety
- Update all function signatures to use proper typing
- Add return type annotations to _cleanup_old_public_data
- Improves code maintainability and IDE support

* feat: add CASCADE DELETE to meeting_consent foreign key

- Add ondelete="CASCADE" to meeting_consent.meeting_id foreign key
- Generate and apply migration to update existing constraint
- Remove manual consent deletion from cleanup code
- Add unit test to verify CASCADE DELETE behavior

* style: linting

* fix: alembic migration branchpoint

* fix: correct downgrade constraint name in CASCADE DELETE migration

* fix: regenerate CASCADE DELETE migration with proper constraint names

- Delete problematic migration and regenerate with correct names
- Use explicit constraint name in both upgrade and downgrade
- Ensure migration works bidirectionally
- All tests passing including CASCADE DELETE test

* style: linting

* refactor: simplify cleanup to use transcripts as entry point

- Remove orphaned_recordings cleanup (not part of this PR scope)
- Remove separate old_meetings cleanup
- Transcripts are now the main entry point for cleanup
- Associated meetings and recordings are deleted with their transcript
- Use single database connection for all operations
- Update tests to reflect new approach

* refactor: cleanup and rename functions for clarity

- Rename _cleanup_old_public_data to cleanup_old_public_data (make public)
- Rename celery task to cleanup_old_public_data_task for clarity
- Update docstrings and improve code organization
- Remove unnecessary comments and simplify deletion logic
- Update tests to use new function names
- All tests passing

* style: linting\

* style: typing and review

* fix: add transaction on cleanup_single_transcript

* fix: naming

---------

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-08-29 08:47:14 -06:00
9dfd76996f fix: file pipeline status reporting and websocket updates (#589)
* feat: use file pipeline for upload and reprocess action

* fix: make file pipeline correctly report status events

* fix: duplication of transcripts_controller

* fix: tests

* test: fix file upload test

* test: fix reprocess

* fix: also patch from main_file_pipeline

(how patch is done is dependent of file import unfortunately)
2025-08-29 00:58:14 -06:00
55cc8637c6 ci: restrict workflow execution to main branch and add concurrency (#586)
* ci: try adding concurrency

* ci: restrict push on main branch

* ci: fix concurrency key

* ci: fix build concurrency

* refactor: apply suggestion from @pr-agent-monadical[bot]

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-08-28 16:43:17 -06:00
f5331a2107 style: more type annotations to parakeet transcriber (#581)
* feat: add comprehensive type annotations to Parakeet transcriber

- Add TypedDict for WordTiming with word, start, end fields
- Add NamedTuple for TimeSegment, AudioSegment, and TranscriptResult
- Add type hints to all generator functions (vad_segment_generator, batch_speech_segments, etc.)
- Add enforce_word_timing_constraints function to prevent word timing overlaps
- Refactor batch_segment_to_audio_segment to reuse pad_audio function

* doc: add note about space
2025-08-28 12:22:07 -06:00
Igor Loskutov
124ce03bf8 fix: Igor/evaluation (#575)
* fix: impossible import error (#563)

* evaluation cli - database events experiment

* hallucinations

* evaluation - unhallucinate

* evaluation - unhallucinate

* roll back reliability link

* self reviewio

* lint

* self review

* add file pipeline to cli

* add file pipeline to cli + sorting

* remove cli tests

* remove ai comments

* comments
2025-08-28 12:07:34 -04:00
7030e0f236 fix: optimize parakeet transcription batching algorithm (#577)
* refactor: optimize transcription batching to accumulate speech segments

- Changed VAD segment generator to return full audio array instead of segments
- Removed segment filtering step
- Modified batch_segments to accumulate maximum speech including silence
- Transcribe larger continuous chunks instead of individual speech segments

* fix: correct transcribe_batch call to use list and fix batch unpacking

* fix: simplify

* fix: remove unused variables

* fix: add typing
2025-08-27 10:32:04 -06:00
37f0110892 doc: update local model readme 2025-08-22 17:50:24 -06:00
cf2896a7f4 doc: update readme about installation instructions
Add a note about installation instructions being inaccurate.
2025-08-22 17:48:35 -06:00
aabf2c2572 chore(main): release 0.7.3 (#565) 2025-08-22 16:35:52 -06:00
6a7b08f016 doc: change readme intro 2025-08-22 16:26:25 -06:00
e2736563d9 doc: update readme with new images 2025-08-22 16:15:54 -06:00
0f54b7782d chore: ignore www/.env.[development,production] 2025-08-22 14:41:09 -06:00
359280dd34 fix: cleaned repo, and get git-leaks clean 2025-08-22 11:51:34 -06:00
45 changed files with 2407 additions and 1070 deletions

View File

@@ -2,6 +2,8 @@ name: Test Database Migrations
on:
push:
branches:
- main
paths:
- "server/migrations/**"
- "server/reflector/db/**"
@@ -17,6 +19,9 @@ on:
jobs:
test-migrations:
runs-on: ubuntu-latest
concurrency:
group: db-ubuntu-latest-${{ github.ref }}
cancel-in-progress: true
services:
postgres:
image: postgres:17

View File

@@ -5,12 +5,17 @@ on:
paths:
- "server/**"
push:
branches:
- main
paths:
- "server/**"
jobs:
pytest:
runs-on: ubuntu-latest
concurrency:
group: pytest-${{ github.ref }}
cancel-in-progress: true
services:
redis:
image: redis:6
@@ -30,6 +35,9 @@ jobs:
docker-amd64:
runs-on: linux-amd64
concurrency:
group: docker-amd64-${{ github.ref }}
cancel-in-progress: true
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx
@@ -45,6 +53,9 @@ jobs:
docker-arm64:
runs-on: linux-arm64
concurrency:
group: docker-arm64-${{ github.ref }}
cancel-in-progress: true
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx

4
.gitignore vendored
View File

@@ -14,4 +14,6 @@ data/
www/REFACTOR.md
www/reload-frontend
server/test.sqlite
CLAUDE.local.md
CLAUDE.local.md
www/.env.development
www/.env.production

1
.gitleaksignore Normal file
View File

@@ -0,0 +1 @@
b9d891d3424f371642cb032ecfd0e2564470a72c:server/tests/test_transcripts_recording_deletion.py:generic-api-key:15

View File

@@ -27,3 +27,8 @@ repos:
files: ^server/
- id: ruff-format
files: ^server/
- repo: https://github.com/gitleaks/gitleaks
rev: v8.28.0
hooks:
- id: gitleaks

View File

@@ -1,5 +1,28 @@
# Changelog
## [0.8.0](https://github.com/Monadical-SAS/reflector/compare/v0.7.3...v0.8.0) (2025-08-29)
### Features
* **cleanup:** add automatic data retention for public instances ([#574](https://github.com/Monadical-SAS/reflector/issues/574)) ([6f0c7c1](https://github.com/Monadical-SAS/reflector/commit/6f0c7c1a5e751713366886c8e764c2009e12ba72))
* **rooms:** add webhook for transcript completion ([#578](https://github.com/Monadical-SAS/reflector/issues/578)) ([88ed7cf](https://github.com/Monadical-SAS/reflector/commit/88ed7cfa7804794b9b54cad4c3facc8a98cf85fd))
### Bug Fixes
* file pipeline status reporting and websocket updates ([#589](https://github.com/Monadical-SAS/reflector/issues/589)) ([9dfd769](https://github.com/Monadical-SAS/reflector/commit/9dfd76996f851cc52be54feea078adbc0816dc57))
* Igor/evaluation ([#575](https://github.com/Monadical-SAS/reflector/issues/575)) ([124ce03](https://github.com/Monadical-SAS/reflector/commit/124ce03bf86044c18313d27228a25da4bc20c9c5))
* optimize parakeet transcription batching algorithm ([#577](https://github.com/Monadical-SAS/reflector/issues/577)) ([7030e0f](https://github.com/Monadical-SAS/reflector/commit/7030e0f23649a8cf6c1eb6d5889684a41ce849ec))
## [0.7.3](https://github.com/Monadical-SAS/reflector/compare/v0.7.2...v0.7.3) (2025-08-22)
### Bug Fixes
* cleaned repo, and get git-leaks clean ([359280d](https://github.com/Monadical-SAS/reflector/commit/359280dd340433ba4402ed69034094884c825e67))
* restore previous behavior on live pipeline + audio downscaler ([#561](https://github.com/Monadical-SAS/reflector/issues/561)) ([9265d20](https://github.com/Monadical-SAS/reflector/commit/9265d201b590d23c628c5f19251b70f473859043))
## [0.7.2](https://github.com/Monadical-SAS/reflector/compare/v0.7.1...v0.7.2) (2025-08-21)

View File

@@ -1,43 +1,60 @@
<div align="center">
<img width="100" alt="image" src="https://github.com/user-attachments/assets/66fb367b-2c89-4516-9912-f47ac59c6a7f"/>
# Reflector
Reflector Audio Management and Analysis is a cutting-edge web application under development by Monadical. It utilizes AI to record meetings, providing a permanent record with transcripts, translations, and automated summaries.
Reflector is an AI-powered audio transcription and meeting analysis platform that provides real-time transcription, speaker diarization, translation and summarization for audio content and live meetings. It works 100% with local models (whisper/parakeet, pyannote, seamless-m4t, and your local llm like phi-4).
[![Tests](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml)
[![Tests](https://github.com/monadical-sas/reflector/actions/workflows/test_server.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/reflector/actions/workflows/test_server.yml)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
</div>
## Screenshots
</div>
<table>
<tr>
<td>
<a href="https://github.com/user-attachments/assets/3a976930-56c1-47ef-8c76-55d3864309e3">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/3a976930-56c1-47ef-8c76-55d3864309e3" />
<a href="https://github.com/user-attachments/assets/21f5597c-2930-4899-a154-f7bd61a59e97">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/21f5597c-2930-4899-a154-f7bd61a59e97" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/bfe3bde3-08af-4426-a9a1-11ad5cd63b33">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/bfe3bde3-08af-4426-a9a1-11ad5cd63b33" />
<a href="https://github.com/user-attachments/assets/f6b9399a-5e51-4bae-b807-59128d0a940c">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/f6b9399a-5e51-4bae-b807-59128d0a940c" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/7b60c9d0-efe4-474f-a27b-ea13bd0fabdc">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/7b60c9d0-efe4-474f-a27b-ea13bd0fabdc" />
<a href="https://github.com/user-attachments/assets/a42ce460-c1fd-4489-a995-270516193897">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/a42ce460-c1fd-4489-a995-270516193897" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/21929f6d-c309-42fe-9c11-f1299e50fbd4">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/21929f6d-c309-42fe-9c11-f1299e50fbd4" />
</a>
</td>
</tr>
</table>
## What is Reflector?
Reflector is a web application that utilizes local models to process audio content, providing:
- **Real-time Transcription**: Convert speech to text using [Whisper](https://github.com/openai/whisper) (multi-language) or [Parakeet](https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2) (English) models
- **Speaker Diarization**: Identify and label different speakers using [Pyannote](https://github.com/pyannote/pyannote-audio) 3.1
- **Live Translation**: Translate audio content in real-time to many languages with [Facebook Seamless-M4T](https://github.com/facebookresearch/seamless_communication)
- **Topic Detection & Summarization**: Extract key topics and generate concise summaries using LLMs
- **Meeting Recording**: Create permanent records of meetings with searchable transcripts
Currently we provide [modal.com](https://modal.com/) gpu template to deploy.
## Background
The project architecture consists of three primary components:
- **Front-End**: NextJS React project hosted on Vercel, located in `www/`.
- **Back-End**: Python server that offers an API and data persistence, found in `server/`.
- **GPU implementation**: Providing services such as speech-to-text transcription, topic generation, automated summaries, and translations. Most reliable option is Modal deployment
- **Front-End**: NextJS React project hosted on Vercel, located in `www/`.
- **GPU implementation**: Providing services such as speech-to-text transcription, topic generation, automated summaries, and translations.
It also uses authentik for authentication if activated, and Vercel for deployment and configuration of the front-end.
It also uses authentik for authentication if activated.
## Contribution Guidelines
@@ -72,6 +89,8 @@ Note: We currently do not have instructions for Windows users.
## Installation
*Note: we're working toward better installation, theses instructions are not accurate for now*
### Frontend
Start with `cd www`.

View File

@@ -0,0 +1,95 @@
# Data Retention and Cleanup
## Overview
For public instances of Reflector, a data retention policy is automatically enforced to delete anonymous user data after a configurable period (default: 7 days). This ensures compliance with privacy expectations and prevents unbounded storage growth.
## Configuration
### Environment Variables
- `PUBLIC_MODE` (bool): Must be set to `true` to enable automatic cleanup
- `PUBLIC_DATA_RETENTION_DAYS` (int): Number of days to retain anonymous data (default: 7)
### What Gets Deleted
When data reaches the retention period, the following items are automatically removed:
1. **Transcripts** from anonymous users (where `user_id` is NULL):
- Database records
- Local files (audio.wav, audio.mp3, audio.json waveform)
- Storage files (cloud storage if configured)
## Automatic Cleanup
### Celery Beat Schedule
When `PUBLIC_MODE=true`, a Celery beat task runs daily at 3 AM to clean up old data:
```python
# Automatically scheduled when PUBLIC_MODE=true
"cleanup_old_public_data": {
"task": "reflector.worker.cleanup.cleanup_old_public_data",
"schedule": crontab(hour=3, minute=0), # Daily at 3 AM
}
```
### Running the Worker
Ensure both Celery worker and beat scheduler are running:
```bash
# Start Celery worker
uv run celery -A reflector.worker.app worker --loglevel=info
# Start Celery beat scheduler (in another terminal)
uv run celery -A reflector.worker.app beat
```
## Manual Cleanup
For testing or manual intervention, use the cleanup tool:
```bash
# Delete data older than 7 days (default)
uv run python -m reflector.tools.cleanup_old_data
# Delete data older than 30 days
uv run python -m reflector.tools.cleanup_old_data --days 30
```
Note: The manual tool uses the same implementation as the Celery worker task to ensure consistency.
## Important Notes
1. **User Data Deletion**: Only anonymous data (where `user_id` is NULL) is deleted. Authenticated user data is preserved.
2. **Storage Cleanup**: The system properly cleans up both local files and cloud storage when configured.
3. **Error Handling**: If individual deletions fail, the cleanup continues and logs errors. Failed deletions are reported in the task output.
4. **Public Instance Only**: The automatic cleanup task only runs when `PUBLIC_MODE=true` to prevent accidental data loss in private deployments.
## Testing
Run the cleanup tests:
```bash
uv run pytest tests/test_cleanup.py -v
```
## Monitoring
Check Celery logs for cleanup task execution:
```bash
# Look for cleanup task logs
grep "cleanup_old_public_data" celery.log
grep "Starting cleanup of old public data" celery.log
```
Task statistics are logged after each run:
- Number of transcripts deleted
- Number of meetings deleted
- Number of orphaned recordings deleted
- Any errors encountered

212
server/docs/webhook.md Normal file
View File

@@ -0,0 +1,212 @@
# Reflector Webhook Documentation
## Overview
Reflector supports webhook notifications to notify external systems when transcript processing is completed. Webhooks can be configured per room and are triggered automatically after a transcript is successfully processed.
## Configuration
Webhooks are configured at the room level with two fields:
- `webhook_url`: The HTTPS endpoint to receive webhook notifications
- `webhook_secret`: Optional secret key for HMAC signature verification (auto-generated if not provided)
## Events
### `transcript.completed`
Triggered when a transcript has been fully processed, including transcription, diarization, summarization, and topic detection.
### `test`
A test event that can be triggered manually to verify webhook configuration.
## Webhook Request Format
### Headers
All webhook requests include the following headers:
| Header | Description | Example |
|--------|-------------|---------|
| `Content-Type` | Always `application/json` | `application/json` |
| `User-Agent` | Identifies Reflector as the source | `Reflector-Webhook/1.0` |
| `X-Webhook-Event` | The event type | `transcript.completed` or `test` |
| `X-Webhook-Retry` | Current retry attempt number | `0`, `1`, `2`... |
| `X-Webhook-Signature` | HMAC signature (if secret configured) | `t=1735306800,v1=abc123...` |
### Signature Verification
If a webhook secret is configured, Reflector includes an HMAC-SHA256 signature in the `X-Webhook-Signature` header to verify the webhook authenticity.
The signature format is: `t={timestamp},v1={signature}`
To verify the signature:
1. Extract the timestamp and signature from the header
2. Create the signed payload: `{timestamp}.{request_body}`
3. Compute HMAC-SHA256 of the signed payload using your webhook secret
4. Compare the computed signature with the received signature
Example verification (Python):
```python
import hmac
import hashlib
def verify_webhook_signature(payload: bytes, signature_header: str, secret: str) -> bool:
# Parse header: "t=1735306800,v1=abc123..."
parts = dict(part.split("=") for part in signature_header.split(","))
timestamp = parts["t"]
received_signature = parts["v1"]
# Create signed payload
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
# Compute expected signature
expected_signature = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256
).hexdigest()
# Compare signatures
return hmac.compare_digest(expected_signature, received_signature)
```
## Event Payloads
### `transcript.completed` Event
This event includes a convenient URL for accessing the transcript:
- `frontend_url`: Direct link to view the transcript in the web interface
```json
{
"event": "transcript.completed",
"event_id": "transcript.completed-abc-123-def-456",
"timestamp": "2025-08-27T12:34:56.789012Z",
"transcript": {
"id": "abc-123-def-456",
"room_id": "room-789",
"created_at": "2025-08-27T12:00:00Z",
"duration": 1800.5,
"title": "Q3 Product Planning Meeting",
"short_summary": "Team discussed Q3 product roadmap, prioritizing mobile app features and API improvements.",
"long_summary": "The product team met to finalize the Q3 roadmap. Key decisions included...",
"webvtt": "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n<v Speaker 1>Welcome everyone to today's meeting...",
"topics": [
{
"title": "Introduction and Agenda",
"summary": "Meeting kickoff with agenda review",
"timestamp": 0.0,
"duration": 120.0,
"webvtt": "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n<v Speaker 1>Welcome everyone..."
},
{
"title": "Mobile App Features Discussion",
"summary": "Team reviewed proposed mobile app features for Q3",
"timestamp": 120.0,
"duration": 600.0,
"webvtt": "WEBVTT\n\n00:02:00.000 --> 00:02:10.000\n<v Speaker 2>Let's talk about the mobile app..."
}
],
"participants": [
{
"id": "participant-1",
"name": "John Doe",
"speaker": "Speaker 1"
},
{
"id": "participant-2",
"name": "Jane Smith",
"speaker": "Speaker 2"
}
],
"source_language": "en",
"target_language": "en",
"status": "completed",
"frontend_url": "https://app.reflector.com/transcripts/abc-123-def-456"
},
"room": {
"id": "room-789",
"name": "Product Team Room"
}
}
```
### `test` Event
```json
{
"event": "test",
"event_id": "test.2025-08-27T12:34:56.789012Z",
"timestamp": "2025-08-27T12:34:56.789012Z",
"message": "This is a test webhook from Reflector",
"room": {
"id": "room-789",
"name": "Product Team Room"
}
}
```
## Retry Policy
Webhooks are delivered with automatic retry logic to handle transient failures. When a webhook delivery fails due to server errors or network issues, Reflector will automatically retry the delivery multiple times over an extended period.
### Retry Mechanism
Reflector implements an exponential backoff strategy for webhook retries:
- **Initial retry delay**: 60 seconds after the first failure
- **Exponential backoff**: Each subsequent retry waits approximately twice as long as the previous one
- **Maximum retry interval**: 1 hour (backoff is capped at this duration)
- **Maximum retry attempts**: 30 attempts total
- **Total retry duration**: Retries continue for approximately 24 hours
### How Retries Work
When a webhook fails, Reflector will:
1. Wait 60 seconds, then retry (attempt #1)
2. If it fails again, wait ~2 minutes, then retry (attempt #2)
3. Continue doubling the wait time up to a maximum of 1 hour between attempts
4. Keep retrying at 1-hour intervals until successful or 30 attempts are exhausted
The `X-Webhook-Retry` header indicates the current retry attempt number (0 for the initial attempt, 1 for first retry, etc.), allowing your endpoint to track retry attempts.
### Retry Behavior by HTTP Status Code
| Status Code | Behavior |
|-------------|----------|
| 2xx (Success) | No retry, webhook marked as delivered |
| 4xx (Client Error) | No retry, request is considered permanently failed |
| 5xx (Server Error) | Automatic retry with exponential backoff |
| Network/Timeout Error | Automatic retry with exponential backoff |
**Important Notes:**
- Webhooks timeout after 30 seconds. If your endpoint takes longer to respond, it will be considered a timeout error and retried.
- During the retry period (~24 hours), you may receive the same webhook multiple times if your endpoint experiences intermittent failures.
- There is no mechanism to manually retry failed webhooks after the retry period expires.
## Testing Webhooks
You can test your webhook configuration before processing transcripts:
```http
POST /v1/rooms/{room_id}/webhook/test
```
Response:
```json
{
"success": true,
"status_code": 200,
"message": "Webhook test successful",
"response_preview": "OK"
}
```
Or in case of failure:
```json
{
"success": false,
"error": "Webhook request timed out (10 seconds)"
}
```

View File

@@ -3,7 +3,7 @@ import os
import sys
import threading
import uuid
from typing import Mapping, NewType
from typing import Generator, Mapping, NamedTuple, NewType, TypedDict
from urllib.parse import urlparse
import modal
@@ -14,10 +14,7 @@ SAMPLERATE = 16000
UPLOADS_PATH = "/uploads"
CACHE_PATH = "/cache"
VAD_CONFIG = {
"max_segment_duration": 30.0,
"batch_max_files": 10,
"batch_max_duration": 5.0,
"min_segment_duration": 0.02,
"batch_max_duration": 30.0,
"silence_padding": 0.5,
"window_size": 512,
}
@@ -25,6 +22,37 @@ VAD_CONFIG = {
ParakeetUniqFilename = NewType("ParakeetUniqFilename", str)
AudioFileExtension = NewType("AudioFileExtension", str)
class TimeSegment(NamedTuple):
"""Represents a time segment with start and end times."""
start: float
end: float
class AudioSegment(NamedTuple):
"""Represents an audio segment with timing and audio data."""
start: float
end: float
audio: any
class TranscriptResult(NamedTuple):
"""Represents a transcription result with text and word timings."""
text: str
words: list["WordTiming"]
class WordTiming(TypedDict):
"""Represents a word with its timing information."""
word: str
start: float
end: float
app = modal.App("reflector-transcriber-parakeet")
# Volume for caching model weights
@@ -170,12 +198,14 @@ class TranscriberParakeetLive:
(output,) = self.model.transcribe([padded_audio], timestamps=True)
text = output.text.strip()
words = [
{
"word": word_info["word"] + " ",
"start": round(word_info["start"], 2),
"end": round(word_info["end"], 2),
}
words: list[WordTiming] = [
WordTiming(
# XXX the space added here is to match the output of whisper
# whisper add space to each words, while parakeet don't
word=word_info["word"] + " ",
start=round(word_info["start"], 2),
end=round(word_info["end"], 2),
)
for word_info in output.timestamp["word"]
]
@@ -211,12 +241,12 @@ class TranscriberParakeetLive:
for i, (filename, output) in enumerate(zip(filenames, outputs)):
text = output.text.strip()
words = [
{
"word": word_info["word"] + " ",
"start": round(word_info["start"], 2),
"end": round(word_info["end"], 2),
}
words: list[WordTiming] = [
WordTiming(
word=word_info["word"] + " ",
start=round(word_info["start"], 2),
end=round(word_info["end"], 2),
)
for word_info in output.timestamp["word"]
]
@@ -271,7 +301,9 @@ class TranscriberParakeetFile:
audio_array, sample_rate = librosa.load(file_path, sr=SAMPLERATE, mono=True)
return audio_array
def vad_segment_generator(audio_array):
def vad_segment_generator(
audio_array,
) -> Generator[TimeSegment, None, None]:
"""Generate speech segments using VAD with start/end sample indices"""
vad_iterator = VADIterator(self.vad_model, sampling_rate=SAMPLERATE)
window_size = VAD_CONFIG["window_size"]
@@ -297,107 +329,121 @@ class TranscriberParakeetFile:
start_time = start / float(SAMPLERATE)
end_time = end / float(SAMPLERATE)
# Extract the actual audio segment
audio_segment = audio_array[start:end]
yield (start_time, end_time, audio_segment)
yield TimeSegment(start_time, end_time)
start = None
vad_iterator.reset_states()
def vad_segment_filter(segments):
"""Filter VAD segments by duration and chunk large segments"""
min_dur = VAD_CONFIG["min_segment_duration"]
max_dur = VAD_CONFIG["max_segment_duration"]
def batch_speech_segments(
segments: Generator[TimeSegment, None, None], max_duration: int
) -> Generator[TimeSegment, None, None]:
"""
Input segments:
[0-2] [3-5] [6-8] [10-11] [12-15] [17-19] [20-22]
for start_time, end_time, audio_segment in segments:
segment_duration = end_time - start_time
↓ (max_duration=10)
# Skip very small segments
if segment_duration < min_dur:
Output batches:
[0-8] [10-19] [20-22]
Note: silences are kept for better transcription, previous implementation was
passing segments separatly, but the output was less accurate.
"""
batch_start_time = None
batch_end_time = None
for segment in segments:
start_time, end_time = segment.start, segment.end
if batch_start_time is None or batch_end_time is None:
batch_start_time = start_time
batch_end_time = end_time
continue
# If segment is within max duration, yield as-is
if segment_duration <= max_dur:
yield (start_time, end_time, audio_segment)
total_duration = end_time - batch_start_time
if total_duration <= max_duration:
batch_end_time = end_time
continue
# Chunk large segments into smaller pieces
chunk_samples = int(max_dur * SAMPLERATE)
current_start = start_time
yield TimeSegment(batch_start_time, batch_end_time)
batch_start_time = start_time
batch_end_time = end_time
for chunk_offset in range(0, len(audio_segment), chunk_samples):
chunk_audio = audio_segment[
chunk_offset : chunk_offset + chunk_samples
]
if len(chunk_audio) == 0:
break
if batch_start_time is None or batch_end_time is None:
return
chunk_duration = len(chunk_audio) / float(SAMPLERATE)
chunk_end = current_start + chunk_duration
yield TimeSegment(batch_start_time, batch_end_time)
# Only yield chunks that meet minimum duration
if chunk_duration >= min_dur:
yield (current_start, chunk_end, chunk_audio)
def batch_segment_to_audio_segment(
segments: Generator[TimeSegment, None, None],
audio_array,
) -> Generator[AudioSegment, None, None]:
"""Extract audio segments and apply padding for Parakeet compatibility.
current_start = chunk_end
Uses pad_audio to ensure segments are at least 0.5s long, preventing
Parakeet crashes. This padding may cause slight timing overlaps between
segments, which are corrected by enforce_word_timing_constraints.
"""
for segment in segments:
start_time, end_time = segment.start, segment.end
start_sample = int(start_time * SAMPLERATE)
end_sample = int(end_time * SAMPLERATE)
audio_segment = audio_array[start_sample:end_sample]
def batch_segments(segments, max_files=10, max_duration=5.0):
batch = []
batch_duration = 0.0
padded_segment = pad_audio(audio_segment, SAMPLERATE)
for start_time, end_time, audio_segment in segments:
segment_duration = end_time - start_time
yield AudioSegment(start_time, end_time, padded_segment)
if segment_duration < VAD_CONFIG["silence_padding"]:
silence_samples = int(
(VAD_CONFIG["silence_padding"] - segment_duration) * SAMPLERATE
)
padding = np.zeros(silence_samples, dtype=np.float32)
audio_segment = np.concatenate([audio_segment, padding])
segment_duration = VAD_CONFIG["silence_padding"]
batch.append((start_time, end_time, audio_segment))
batch_duration += segment_duration
if len(batch) >= max_files or batch_duration >= max_duration:
yield batch
batch = []
batch_duration = 0.0
if batch:
yield batch
def transcribe_batch(model, audio_segments):
def transcribe_batch(model, audio_segments: list) -> list:
with NoStdStreams():
outputs = model.transcribe(audio_segments, timestamps=True)
return outputs
def enforce_word_timing_constraints(
words: list[WordTiming],
) -> list[WordTiming]:
"""Enforce that word end times don't exceed the start time of the next word.
Due to silence padding added in batch_segment_to_audio_segment for better
transcription accuracy, word timings from different segments may overlap.
This function ensures there are no overlaps by adjusting end times.
"""
if len(words) <= 1:
return words
enforced_words = []
for i, word in enumerate(words):
enforced_word = word.copy()
if i < len(words) - 1:
next_start = words[i + 1]["start"]
if enforced_word["end"] > next_start:
enforced_word["end"] = next_start
enforced_words.append(enforced_word)
return enforced_words
def emit_results(
results,
segments_info,
batch_index,
total_batches,
):
results: list,
segments_info: list[AudioSegment],
) -> Generator[TranscriptResult, None, None]:
"""Yield transcribed text and word timings from model output, adjusting timestamps to absolute positions."""
for i, (output, (start_time, end_time, _)) in enumerate(
zip(results, segments_info)
):
for i, (output, segment) in enumerate(zip(results, segments_info)):
start_time, end_time = segment.start, segment.end
text = output.text.strip()
words = [
{
"word": word_info["word"] + " ",
"start": round(
words: list[WordTiming] = [
WordTiming(
word=word_info["word"] + " ",
start=round(
word_info["start"] + start_time + timestamp_offset, 2
),
"end": round(
word_info["end"] + start_time + timestamp_offset, 2
),
}
end=round(word_info["end"] + start_time + timestamp_offset, 2),
)
for word_info in output.timestamp["word"]
]
yield text, words
yield TranscriptResult(text, words)
upload_volume.reload()
@@ -407,41 +453,31 @@ class TranscriberParakeetFile:
audio_array = load_and_convert_audio(file_path)
total_duration = len(audio_array) / float(SAMPLERATE)
processed_duration = 0.0
all_text_parts = []
all_words = []
all_text_parts: list[str] = []
all_words: list[WordTiming] = []
raw_segments = vad_segment_generator(audio_array)
filtered_segments = vad_segment_filter(raw_segments)
batches = batch_segments(
filtered_segments,
VAD_CONFIG["batch_max_files"],
speech_segments = batch_speech_segments(
raw_segments,
VAD_CONFIG["batch_max_duration"],
)
audio_segments = batch_segment_to_audio_segment(speech_segments, audio_array)
batch_index = 0
total_batches = max(
1, int(total_duration / VAD_CONFIG["batch_max_duration"]) + 1
)
for batch in audio_segments:
audio_segment = batch.audio
results = transcribe_batch(self.model, [audio_segment])
for batch in batches:
batch_index += 1
audio_segments = [seg[2] for seg in batch]
results = transcribe_batch(self.model, audio_segments)
for text, words in emit_results(
for result in emit_results(
results,
batch,
batch_index,
total_batches,
[batch],
):
if not text:
if not result.text:
continue
all_text_parts.append(text)
all_words.extend(words)
all_text_parts.append(result.text)
all_words.extend(result.words)
processed_duration += sum(len(seg[2]) / float(SAMPLERATE) for seg in batch)
all_words = enforce_word_timing_constraints(all_words)
combined_text = " ".join(all_text_parts)
return {"text": combined_text, "words": all_words}

View File

@@ -0,0 +1,36 @@
"""Add webhook fields to rooms
Revision ID: 0194f65cd6d3
Revises: 5a8907fd1d78
Create Date: 2025-08-27 09:03:19.610995
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "0194f65cd6d3"
down_revision: Union[str, None] = "5a8907fd1d78"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(sa.Column("webhook_url", sa.String(), nullable=True))
batch_op.add_column(sa.Column("webhook_secret", sa.String(), nullable=True))
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("webhook_secret")
batch_op.drop_column("webhook_url")
# ### end Alembic commands ###

View File

@@ -0,0 +1,50 @@
"""add cascade delete to meeting consent foreign key
Revision ID: 5a8907fd1d78
Revises: 0ab2d7ffaa16
Create Date: 2025-08-26 17:26:50.945491
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "5a8907fd1d78"
down_revision: Union[str, None] = "0ab2d7ffaa16"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting_consent", schema=None) as batch_op:
batch_op.drop_constraint(
batch_op.f("meeting_consent_meeting_id_fkey"), type_="foreignkey"
)
batch_op.create_foreign_key(
batch_op.f("meeting_consent_meeting_id_fkey"),
"meeting",
["meeting_id"],
["id"],
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("meeting_consent", schema=None) as batch_op:
batch_op.drop_constraint(
batch_op.f("meeting_consent_meeting_id_fkey"), type_="foreignkey"
)
batch_op.create_foreign_key(
batch_op.f("meeting_consent_meeting_id_fkey"),
"meeting",
["meeting_id"],
["id"],
)
# ### end Alembic commands ###

View File

@@ -0,0 +1,27 @@
import asyncio
import functools
from reflector.db import get_database
def asynctask(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
async def run_with_db():
database = get_database()
await database.connect()
try:
return await f(*args, **kwargs)
finally:
await database.disconnect()
coro = run_with_db()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
return loop.run_until_complete(coro)
return asyncio.run(coro)
return wrapper

View File

@@ -54,7 +54,12 @@ meeting_consent = sa.Table(
"meeting_consent",
metadata,
sa.Column("id", sa.String, primary_key=True),
sa.Column("meeting_id", sa.String, sa.ForeignKey("meeting.id"), nullable=False),
sa.Column(
"meeting_id",
sa.String,
sa.ForeignKey("meeting.id", ondelete="CASCADE"),
nullable=False,
),
sa.Column("user_id", sa.String),
sa.Column("consent_given", sa.Boolean, nullable=False),
sa.Column("consent_timestamp", sa.DateTime(timezone=True), nullable=False),

View File

@@ -1,3 +1,4 @@
import secrets
from datetime import datetime, timezone
from sqlite3 import IntegrityError
from typing import Literal
@@ -40,6 +41,8 @@ rooms = sqlalchemy.Table(
sqlalchemy.Column(
"is_shared", sqlalchemy.Boolean, nullable=False, server_default=false()
),
sqlalchemy.Column("webhook_url", sqlalchemy.String),
sqlalchemy.Column("webhook_secret", sqlalchemy.String),
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
)
@@ -59,6 +62,8 @@ class Room(BaseModel):
"none", "prompt", "automatic", "automatic-2nd-participant"
] = "automatic-2nd-participant"
is_shared: bool = False
webhook_url: str = ""
webhook_secret: str = ""
class RoomController:
@@ -107,10 +112,15 @@ class RoomController:
recording_type: str,
recording_trigger: str,
is_shared: bool,
webhook_url: str = "",
webhook_secret: str = "",
):
"""
Add a new room
"""
if webhook_url and not webhook_secret:
webhook_secret = secrets.token_urlsafe(32)
room = Room(
name=name,
user_id=user_id,
@@ -122,6 +132,8 @@ class RoomController:
recording_type=recording_type,
recording_trigger=recording_trigger,
is_shared=is_shared,
webhook_url=webhook_url,
webhook_secret=webhook_secret,
)
query = rooms.insert().values(**room.model_dump())
try:
@@ -134,6 +146,9 @@ class RoomController:
"""
Update a room fields with key/values in values
"""
if values.get("webhook_url") and not values.get("webhook_secret"):
values["webhook_secret"] = secrets.token_urlsafe(32)
query = rooms.update().where(rooms.c.id == room.id).values(**values)
try:
await get_database().execute(query)

View File

@@ -122,6 +122,15 @@ def generate_transcript_name() -> str:
return f"Transcript {now.strftime('%Y-%m-%d %H:%M:%S')}"
TranscriptStatus = Literal[
"idle", "uploaded", "recording", "processing", "error", "ended"
]
class StrValue(BaseModel):
value: str
class AudioWaveform(BaseModel):
data: list[float]
@@ -185,7 +194,7 @@ class Transcript(BaseModel):
id: str = Field(default_factory=generate_uuid4)
user_id: str | None = None
name: str = Field(default_factory=generate_transcript_name)
status: str = "idle"
status: TranscriptStatus = "idle"
duration: float = 0
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
title: str | None = None
@@ -732,5 +741,27 @@ class TranscriptController:
transcript.delete_participant(participant_id)
await self.update(transcript, {"participants": transcript.participants_dump()})
async def set_status(
self, transcript_id: str, status: TranscriptStatus
) -> TranscriptEvent | None:
"""
Update the status of a transcript
Will add an event STATUS + update the status field of transcript
"""
async with self.transaction():
transcript = await self.get_by_id(transcript_id)
if not transcript:
raise Exception(f"Transcript {transcript_id} not found")
if transcript.status == status:
return
resp = await self.append_event(
transcript=transcript,
event="STATUS",
data=StrValue(value=status),
)
await self.update(transcript, {"status": status})
return resp
transcripts_controller = TranscriptController()

View File

@@ -7,18 +7,26 @@ Uses parallel processing for transcription, diarization, and waveform generation
"""
import asyncio
import uuid
from pathlib import Path
import av
import structlog
from celery import shared_task
from reflector.asynctask import asynctask
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
Transcript,
TranscriptStatus,
transcripts_controller,
)
from reflector.logger import logger
from reflector.pipelines.main_live_pipeline import PipelineMainBase, asynctask
from reflector.pipelines.main_live_pipeline import (
PipelineMainBase,
broadcast_to_sockets,
)
from reflector.processors import (
AudioFileWriterProcessor,
TranscriptFinalSummaryProcessor,
@@ -43,6 +51,7 @@ from reflector.processors.types import (
)
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.worker.webhook import send_transcript_webhook
class EmptyPipeline:
@@ -83,12 +92,27 @@ class PipelineMainFile(PipelineMainBase):
exc_info=result,
)
@broadcast_to_sockets
async def set_status(self, transcript_id: str, status: TranscriptStatus):
async with self.lock_transaction():
return await transcripts_controller.set_status(transcript_id, status)
async def process(self, file_path: Path):
"""Main entry point for file processing"""
self.logger.info(f"Starting file pipeline for {file_path}")
transcript = await self.get_transcript()
# Clear transcript as we're going to regenerate everything
async with self.transaction():
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
},
)
# Extract audio and write to transcript location
audio_path = await self.extract_and_write_audio(file_path, transcript)
@@ -105,6 +129,8 @@ class PipelineMainFile(PipelineMainBase):
self.logger.info("File pipeline complete")
await transcripts_controller.set_status(transcript.id, "ended")
async def extract_and_write_audio(
self, file_path: Path, transcript: Transcript
) -> Path:
@@ -362,14 +388,34 @@ async def task_pipeline_file_process(*, transcript_id: str):
if not transcript:
raise Exception(f"Transcript {transcript_id} not found")
# Find the file to process
audio_file = next(transcript.data_path.glob("upload.*"), None)
if not audio_file:
audio_file = next(transcript.data_path.glob("audio.*"), None)
if not audio_file:
raise Exception("No audio file found to process")
# Run file pipeline
pipeline = PipelineMainFile(transcript_id=transcript_id)
await pipeline.process(audio_file)
try:
await pipeline.set_status(transcript_id, "processing")
# Find the file to process
audio_file = next(transcript.data_path.glob("upload.*"), None)
if not audio_file:
audio_file = next(transcript.data_path.glob("audio.*"), None)
if not audio_file:
raise Exception("No audio file found to process")
await pipeline.process(audio_file)
except Exception:
await pipeline.set_status(transcript_id, "error")
raise
# Trigger webhook if this is a room recording with webhook configured
if transcript.source_kind == SourceKind.ROOM and transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
if room and room.webhook_url:
logger.info(
"Dispatching webhook task",
transcript_id=transcript_id,
room_id=room.id,
webhook_url=room.webhook_url,
)
send_transcript_webhook.delay(
transcript_id, room.id, event_id=uuid.uuid4().hex
)

View File

@@ -22,7 +22,7 @@ from celery import chord, current_task, group, shared_task
from pydantic import BaseModel
from structlog import BoundLogger as Logger
from reflector.db import get_database
from reflector.asynctask import asynctask
from reflector.db.meetings import meeting_consent_controller, meetings_controller
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
@@ -32,6 +32,7 @@ from reflector.db.transcripts import (
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
TranscriptFinalTitle,
TranscriptStatus,
TranscriptText,
TranscriptTopic,
TranscriptWaveform,
@@ -69,29 +70,6 @@ from reflector.zulip import (
)
def asynctask(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
async def run_with_db():
database = get_database()
await database.connect()
try:
return await f(*args, **kwargs)
finally:
await database.disconnect()
coro = run_with_db()
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop and loop.is_running():
return loop.run_until_complete(coro)
return asyncio.run(coro)
return wrapper
def broadcast_to_sockets(func):
"""
Decorator to broadcast transcript event to websockets
@@ -188,8 +166,15 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
]
@asynccontextmanager
async def transaction(self):
async def lock_transaction(self):
# This lock is to prevent multiple processor starting adding
# into event array at the same time
async with self._lock:
yield
@asynccontextmanager
async def transaction(self):
async with self.lock_transaction():
async with transcripts_controller.transaction():
yield
@@ -198,14 +183,14 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
# if it's the first part, update the status of the transcript
# but do not set the ended status yet.
if isinstance(self, PipelineMainLive):
status_mapping = {
status_mapping: dict[str, TranscriptStatus] = {
"started": "recording",
"push": "recording",
"flush": "processing",
"error": "error",
}
elif isinstance(self, PipelineMainFinalSummaries):
status_mapping = {
status_mapping: dict[str, TranscriptStatus] = {
"push": "processing",
"flush": "processing",
"error": "error",
@@ -221,22 +206,8 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
return
# when the status of the pipeline changes, update the transcript
async with self.transaction():
transcript = await self.get_transcript()
if status == transcript.status:
return
resp = await transcripts_controller.append_event(
transcript=transcript,
event="STATUS",
data=StrValue(value=status),
)
await transcripts_controller.update(
transcript,
{
"status": status,
},
)
return resp
async with self._lock:
return await transcripts_controller.set_status(self.transcript_id, status)
@broadcast_to_sockets
async def on_transcript(self, data):
@@ -794,7 +765,7 @@ def pipeline_post(*, transcript_id: str):
chain_final_summaries,
) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
chain.delay()
return chain.delay()
@get_transcript

View File

@@ -11,10 +11,7 @@ from reflector.processors.audio_chunker_auto import AudioChunkerAutoProcessor
class AudioChunkerSileroProcessor(AudioChunkerProcessor):
"""
Assemble audio frames into chunks with VAD-based speech detection using Silero VAD.
Expects input audio to be already downscaled to 16kHz mono s16 format
(handled by AudioDownscaleProcessor in the pipeline).
Assemble audio frames into chunks with VAD-based speech detection using Silero VAD
"""
def __init__(
@@ -34,13 +31,12 @@ class AudioChunkerSileroProcessor(AudioChunkerProcessor):
self._init_vad(use_onnx)
def _init_vad(self, use_onnx=False):
"""Initialize Silero VAD model for 16kHz audio"""
"""Initialize Silero VAD model"""
try:
torch.set_num_threads(1)
self.vad_model = load_silero_vad(onnx=use_onnx)
# VAD expects 16kHz audio (guaranteed by AudioDownscaleProcessor)
self.vad_iterator = VADIterator(self.vad_model, sampling_rate=16000)
self.logger.info("Silero VAD initialized for 16kHz audio")
self.logger.info("Silero VAD initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Silero VAD: {e}")
@@ -79,7 +75,7 @@ class AudioChunkerSileroProcessor(AudioChunkerProcessor):
return None
# Processing block with current buffer size
# print(f"Processing block: {len(self.frames)} frames in buffer")
print(f"Processing block: {len(self.frames)} frames in buffer")
try:
# Convert frames to numpy array for VAD
@@ -193,29 +189,38 @@ class AudioChunkerSileroProcessor(AudioChunkerProcessor):
return None
def _frames_to_numpy(self, frames: list[av.AudioFrame]) -> Optional[np.ndarray]:
"""Convert av.AudioFrame list to numpy array for VAD processing
Input frames are already 16kHz mono s16 format from AudioDownscaleProcessor.
Only need to convert s16 to float32 for Silero VAD.
"""
"""Convert av.AudioFrame list to numpy array for VAD processing"""
if not frames:
return None
try:
# Concatenate all frame arrays
audio_arrays = [frame.to_ndarray().flatten() for frame in frames]
if not audio_arrays:
audio_data = []
for frame in frames:
frame_array = frame.to_ndarray()
if len(frame_array.shape) == 2:
frame_array = frame_array.flatten()
audio_data.append(frame_array)
if not audio_data:
return None
combined_audio = np.concatenate(audio_arrays)
combined_audio = np.concatenate(audio_data)
# Convert s16 to float32 (Silero VAD requires float32 in range [-1.0, 1.0])
# Input is guaranteed to be s16 from AudioDownscaleProcessor
return combined_audio.astype(np.float32) / 32768.0
# Ensure float32 format
if combined_audio.dtype == np.int16:
# Normalize int16 audio to float32 in range [-1.0, 1.0]
combined_audio = combined_audio.astype(np.float32) / 32768.0
elif combined_audio.dtype != np.float32:
combined_audio = combined_audio.astype(np.float32)
return combined_audio
except Exception as e:
self.logger.error(f"Error converting frames to numpy: {e}")
return None
return None
def _find_speech_segment_end(self, audio_array: np.ndarray) -> Optional[int]:
"""Find complete speech segments and return frame index at segment end"""

View File

@@ -67,6 +67,9 @@ class FileTranscriptModalProcessor(FileTranscriptProcessor):
for word_info in result.get("words", [])
]
# words come not in order
words.sort(key=lambda w: w.start)
return Transcript(words=words)

View File

@@ -1,3 +1,4 @@
from pydantic.types import PositiveInt
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -90,9 +91,8 @@ class Settings(BaseSettings):
AUTH_JWT_PUBLIC_KEY: str | None = "authentik.monadical.com_public.pem"
AUTH_JWT_AUDIENCE: str | None = None
# API public mode
# if set, all anonymous record will be public
PUBLIC_MODE: bool = False
PUBLIC_DATA_RETENTION_DAYS: PositiveInt = 7
# Min transcript length to generate topic + summary
MIN_TRANSCRIPT_LENGTH: int = 750

View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python
"""
Manual cleanup tool for old public data.
Uses the same implementation as the Celery worker task.
"""
import argparse
import asyncio
import sys
import structlog
from reflector.settings import settings
from reflector.worker.cleanup import _cleanup_old_public_data
logger = structlog.get_logger(__name__)
async def cleanup_old_data(days: int = 7):
logger.info(
"Starting manual cleanup",
retention_days=days,
public_mode=settings.PUBLIC_MODE,
)
if not settings.PUBLIC_MODE:
logger.critical(
"WARNING: PUBLIC_MODE is False. "
"This tool is intended for public instances only."
)
raise Exception("Tool intended for public instances only")
result = await _cleanup_old_public_data(days=days)
if result:
logger.info(
"Cleanup completed",
transcripts_deleted=result.get("transcripts_deleted", 0),
meetings_deleted=result.get("meetings_deleted", 0),
recordings_deleted=result.get("recordings_deleted", 0),
errors_count=len(result.get("errors", [])),
)
if result.get("errors"):
logger.warning(
"Errors encountered during cleanup:", errors=result["errors"][:10]
)
else:
logger.info("Cleanup skipped or completed without results")
def main():
parser = argparse.ArgumentParser(
description="Clean up old transcripts and meetings"
)
parser.add_argument(
"--days",
type=int,
default=7,
help="Number of days to keep data (default: 7)",
)
args = parser.parse_args()
if args.days < 1:
logger.error("Days must be at least 1")
sys.exit(1)
asyncio.run(cleanup_old_data(days=args.days))
if __name__ == "__main__":
main()

View File

@@ -1,294 +1,204 @@
"""
Process audio file with diarization support
===========================================
Extended version of process.py that includes speaker diarization.
This tool processes audio files locally without requiring the full server infrastructure.
"""
import argparse
import asyncio
import tempfile
import uuid
import json
import shutil
import sys
import time
from pathlib import Path
from typing import List
import av
from typing import Any, Dict, List, Literal
from reflector.db.transcripts import SourceKind, TranscriptTopic, transcripts_controller
from reflector.logger import logger
from reflector.processors import (
AudioChunkerAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
Pipeline,
PipelineEvent,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor,
TranscriptTranslatorAutoProcessor,
from reflector.pipelines.main_file_pipeline import (
task_pipeline_file_process as task_pipeline_file_process,
)
from reflector.processors.base import BroadcastProcessor, Processor
from reflector.processors.types import (
AudioDiarizationInput,
TitleSummary,
TitleSummaryWithId,
from reflector.pipelines.main_live_pipeline import pipeline_post as live_pipeline_post
from reflector.pipelines.main_live_pipeline import (
pipeline_process as live_pipeline_process,
)
class TopicCollectorProcessor(Processor):
"""Collect topics for diarization"""
def serialize_topics(topics: List[TranscriptTopic]) -> List[Dict[str, Any]]:
"""Convert TranscriptTopic objects to JSON-serializable dicts"""
serialized = []
for topic in topics:
topic_dict = topic.model_dump()
serialized.append(topic_dict)
return serialized
INPUT_TYPE = TitleSummary
OUTPUT_TYPE = TitleSummary
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.topics: List[TitleSummaryWithId] = []
self._topic_id = 0
def debug_print_speakers(serialized_topics: List[Dict[str, Any]]) -> None:
"""Print debug info about speakers found in topics"""
all_speakers = set()
for topic_dict in serialized_topics:
for word in topic_dict.get("words", []):
all_speakers.add(word.get("speaker", 0))
async def _push(self, data: TitleSummary):
# Convert to TitleSummaryWithId and collect
self._topic_id += 1
topic_with_id = TitleSummaryWithId(
id=str(self._topic_id),
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
duration=data.duration,
transcript=data.transcript,
print(
f"Found {len(serialized_topics)} topics with speakers: {all_speakers}",
file=sys.stderr,
)
TranscriptId = str
# common interface for every flow: it needs an Entry in db with specific ceremony (file path + status + actual file in file system)
# ideally we want to get rid of it at some point
async def prepare_entry(
source_path: str,
source_language: str,
target_language: str,
) -> TranscriptId:
file_path = Path(source_path)
transcript = await transcripts_controller.add(
file_path.name,
# note that the real file upload has SourceKind: LIVE for the reason of it's an error
source_kind=SourceKind.FILE,
source_language=source_language,
target_language=target_language,
user_id=None,
)
logger.info(
f"Created empty transcript {transcript.id} for file {file_path.name} because technically we need an empty transcript before we start transcript"
)
# pipelines expect files as upload.*
extension = file_path.suffix
upload_path = transcript.data_path / f"upload{extension}"
upload_path.parent.mkdir(parents=True, exist_ok=True)
shutil.copy2(source_path, upload_path)
logger.info(f"Copied {source_path} to {upload_path}")
# pipelines expect entity status "uploaded"
await transcripts_controller.update(transcript, {"status": "uploaded"})
return transcript.id
# same reason as prepare_entry
async def extract_result_from_entry(
transcript_id: TranscriptId, output_path: str
) -> None:
post_final_transcript = await transcripts_controller.get_by_id(transcript_id)
# assert post_final_transcript.status == "ended"
# File pipeline doesn't set status to "ended", only live pipeline does https://github.com/Monadical-SAS/reflector/issues/582
topics = post_final_transcript.topics
if not topics:
raise RuntimeError(
f"No topics found for transcript {transcript_id} after processing"
)
self.topics.append(topic_with_id)
# Pass through the original topic
await self.emit(data)
serialized_topics = serialize_topics(topics)
def get_topics(self) -> List[TitleSummaryWithId]:
return self.topics
if output_path:
# Write to JSON file
with open(output_path, "w") as f:
for topic_dict in serialized_topics:
json.dump(topic_dict, f)
f.write("\n")
print(f"Results written to {output_path}", file=sys.stderr)
else:
# Write to stdout as JSONL
for topic_dict in serialized_topics:
print(json.dumps(topic_dict))
debug_print_speakers(serialized_topics)
async def process_audio_file(
filename,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="pyannote",
async def process_live_pipeline(
transcript_id: TranscriptId,
):
# Create temp file for audio if diarization is enabled
audio_temp_path = None
if enable_diarization:
audio_temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
audio_temp_path = audio_temp_file.name
audio_temp_file.close()
"""Process transcript_id with transcription and diarization"""
# Create processor for collecting topics
topic_collector = TopicCollectorProcessor()
print(f"Processing transcript_id {transcript_id}...", file=sys.stderr)
await live_pipeline_process(transcript_id=transcript_id)
print(f"Processing complete for transcript {transcript_id}", file=sys.stderr)
# Build pipeline for audio processing
processors = []
pre_final_transcript = await transcripts_controller.get_by_id(transcript_id)
# Add audio file writer at the beginning if diarization is enabled
if enable_diarization:
processors.append(AudioFileWriterProcessor(audio_temp_path))
# assert documented behaviour: after process, the pipeline isn't ended. this is the reason of calling pipeline_post
assert pre_final_transcript.status != "ended"
# Add the rest of the processors
processors += [
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
TranscriptLinerProcessor(),
TranscriptTranslatorAutoProcessor.as_threaded(),
]
# at this point, diarization is running but we have no access to it. run diarization in parallel - one will hopefully win after polling
result = live_pipeline_post(transcript_id=transcript_id)
if not only_transcript:
processors += [
TranscriptTopicDetectorProcessor.as_threaded(),
# Collect topics for diarization
topic_collector,
BroadcastProcessor(
processors=[
TranscriptFinalTitleProcessor.as_threaded(),
TranscriptFinalSummaryProcessor.as_threaded(),
],
),
]
# Create main pipeline
pipeline = Pipeline(*processors)
pipeline.set_pref("audio:source_language", source_language)
pipeline.set_pref("audio:target_language", target_language)
pipeline.describe()
pipeline.on(event_callback)
# Start processing audio
logger.info(f"Opening {filename}")
container = av.open(filename)
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
await pipeline.flush()
# Run diarization if enabled and we have topics
if enable_diarization and not only_transcript and audio_temp_path:
topics = topic_collector.get_topics()
if topics:
logger.info(f"Starting diarization with {len(topics)} topics")
try:
from reflector.processors import AudioDiarizationAutoProcessor
diarization_processor = AudioDiarizationAutoProcessor(
name=diarization_backend
)
diarization_processor.set_pipeline(pipeline)
# For Modal backend, we need to upload the file to S3 first
if diarization_backend == "modal":
from datetime import datetime
from reflector.storage import get_transcripts_storage
from reflector.utils.s3_temp_file import S3TemporaryFile
storage = get_transcripts_storage()
# Generate a unique filename in evaluation folder
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
audio_filename = f"evaluation/diarization_temp/{timestamp}_{uuid.uuid4().hex}.wav"
# Use context manager for automatic cleanup
async with S3TemporaryFile(storage, audio_filename) as s3_file:
# Read and upload the audio file
with open(audio_temp_path, "rb") as f:
audio_data = f.read()
audio_url = await s3_file.upload(audio_data)
logger.info(f"Uploaded audio to S3: {audio_filename}")
# Create diarization input with S3 URL
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
# File will be automatically cleaned up when exiting the context
else:
# For local backend, use local file path
audio_url = audio_temp_path
# Create diarization input
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
except ImportError as e:
logger.error(f"Failed to import diarization dependencies: {e}")
logger.error(
"Install with: uv pip install pyannote.audio torch torchaudio"
)
logger.error(
"And set HF_TOKEN environment variable for pyannote models"
)
raise SystemExit(1)
except Exception as e:
logger.error(f"Diarization failed: {e}")
raise SystemExit(1)
else:
logger.warning("Skipping diarization: no topics available")
# Clean up temp file
if audio_temp_path:
try:
Path(audio_temp_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {audio_temp_path}: {e}")
logger.info("All done!")
# result.ready() blocks even without await; it mutates result also
while not result.ready():
print(f"Status: {result.state}")
time.sleep(2)
async def process_file_pipeline(
filename: str,
event_callback,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
transcript_id: TranscriptId,
):
"""Process audio/video file using the optimized file pipeline"""
# task_pipeline_file_process is a Celery task, need to use .delay() for async execution
result = task_pipeline_file_process.delay(transcript_id=transcript_id)
# Wait for the Celery task to complete
while not result.ready():
print(f"File pipeline status: {result.state}", file=sys.stderr)
time.sleep(2)
logger.info("File pipeline processing complete")
async def process(
source_path: str,
source_language: str,
target_language: str,
pipeline: Literal["live", "file"],
output_path: str = None,
):
from reflector.db import get_database
database = get_database()
# db connect is a part of ceremony
await database.connect()
try:
from reflector.db import database
from reflector.db.transcripts import SourceKind, transcripts_controller
from reflector.pipelines.main_file_pipeline import PipelineMainFile
await database.connect()
try:
# Create a temporary transcript for processing
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.FILE,
source_language=source_language,
target_language=target_language,
)
# Process the file
pipeline = PipelineMainFile(transcript_id=transcript.id)
await pipeline.process(Path(filename))
logger.info("File pipeline processing complete")
finally:
await database.disconnect()
except ImportError as e:
logger.error(f"File pipeline not available: {e}")
logger.info("Falling back to stream pipeline")
# Fall back to stream pipeline
await process_audio_file(
filename,
event_callback,
only_transcript=False,
source_language=source_language,
target_language=target_language,
enable_diarization=enable_diarization,
diarization_backend=diarization_backend,
transcript_id = await prepare_entry(
source_path,
source_language,
target_language,
)
pipeline_handlers = {
"live": process_live_pipeline,
"file": process_file_pipeline,
}
handler = pipeline_handlers.get(pipeline)
if not handler:
raise ValueError(f"Unknown pipeline type: {pipeline}")
await handler(transcript_id)
await extract_result_from_entry(transcript_id, output_path)
finally:
await database.disconnect()
if __name__ == "__main__":
import argparse
import os
parser = argparse.ArgumentParser(
description="Process audio files with optional speaker diarization"
description="Process audio files with speaker diarization"
)
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
parser.add_argument(
"--stream",
action="store_true",
help="Use streaming pipeline (original frame-based processing)",
)
parser.add_argument(
"--only-transcript",
"-t",
action="store_true",
help="Only generate transcript without topics/summaries",
"--pipeline",
required=True,
choices=["live", "file"],
help="Pipeline type to use for processing (live: streaming/incremental, file: batch/parallel)",
)
parser.add_argument(
"--source-language", default="en", help="Source language code (default: en)"
@@ -297,82 +207,14 @@ if __name__ == "__main__":
"--target-language", default="en", help="Target language code (default: en)"
)
parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
parser.add_argument(
"--enable-diarization",
"-d",
action="store_true",
help="Enable speaker diarization",
)
parser.add_argument(
"--diarization-backend",
default="pyannote",
choices=["pyannote", "modal"],
help="Diarization backend to use (default: pyannote)",
)
args = parser.parse_args()
if "REDIS_HOST" not in os.environ:
os.environ["REDIS_HOST"] = "localhost"
output_fd = None
if args.output:
output_fd = open(args.output, "w")
async def event_callback(event: PipelineEvent):
processor = event.processor
data = event.data
# Ignore internal processors
if processor in (
"AudioDownscaleProcessor",
"AudioChunkerAutoProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",
"BroadcastProcessor",
):
return
# If diarization is enabled, skip the original topic events from the pipeline
# The diarization processor will emit the same topics but with speaker info
if processor == "TranscriptTopicDetectorProcessor" and args.enable_diarization:
return
# Log all events
logger.info(f"Event: {processor} - {type(data).__name__}")
# Write to output
if output_fd:
output_fd.write(event.model_dump_json())
output_fd.write("\n")
output_fd.flush()
if args.stream:
# Use original streaming pipeline
asyncio.run(
process_audio_file(
args.source,
event_callback,
only_transcript=args.only_transcript,
source_language=args.source_language,
target_language=args.target_language,
enable_diarization=args.enable_diarization,
diarization_backend=args.diarization_backend,
)
asyncio.run(
process(
args.source,
args.source_language,
args.target_language,
args.pipeline,
args.output,
)
else:
# Use optimized file pipeline (default)
asyncio.run(
process_file_pipeline(
args.source,
event_callback,
source_language=args.source_language,
target_language=args.target_language,
enable_diarization=args.enable_diarization,
diarization_backend=args.diarization_backend,
)
)
if output_fd:
output_fd.close()
logger.info(f"Output written to {args.output}")
)

View File

@@ -1,318 +0,0 @@
"""
@vibe-generated
Process audio file with diarization support
===========================================
Extended version of process.py that includes speaker diarization.
This tool processes audio files locally without requiring the full server infrastructure.
"""
import asyncio
import tempfile
import uuid
from pathlib import Path
from typing import List
import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
Pipeline,
PipelineEvent,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor,
TranscriptTranslatorAutoProcessor,
)
from reflector.processors.base import BroadcastProcessor, Processor
from reflector.processors.types import (
AudioDiarizationInput,
TitleSummary,
TitleSummaryWithId,
)
class TopicCollectorProcessor(Processor):
"""Collect topics for diarization"""
INPUT_TYPE = TitleSummary
OUTPUT_TYPE = TitleSummary
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.topics: List[TitleSummaryWithId] = []
self._topic_id = 0
async def _push(self, data: TitleSummary):
# Convert to TitleSummaryWithId and collect
self._topic_id += 1
topic_with_id = TitleSummaryWithId(
id=str(self._topic_id),
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
duration=data.duration,
transcript=data.transcript,
)
self.topics.append(topic_with_id)
# Pass through the original topic
await self.emit(data)
def get_topics(self) -> List[TitleSummaryWithId]:
return self.topics
async def process_audio_file_with_diarization(
filename,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
):
# Create temp file for audio if diarization is enabled
audio_temp_path = None
if enable_diarization:
audio_temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
audio_temp_path = audio_temp_file.name
audio_temp_file.close()
# Create processor for collecting topics
topic_collector = TopicCollectorProcessor()
# Build pipeline for audio processing
processors = []
# Add audio file writer at the beginning if diarization is enabled
if enable_diarization:
processors.append(AudioFileWriterProcessor(audio_temp_path))
# Add the rest of the processors
processors += [
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
]
processors += [
TranscriptLinerProcessor(),
TranscriptTranslatorAutoProcessor.as_threaded(),
]
if not only_transcript:
processors += [
TranscriptTopicDetectorProcessor.as_threaded(),
# Collect topics for diarization
topic_collector,
BroadcastProcessor(
processors=[
TranscriptFinalTitleProcessor.as_threaded(),
TranscriptFinalSummaryProcessor.as_threaded(),
],
),
]
# Create main pipeline
pipeline = Pipeline(*processors)
pipeline.set_pref("audio:source_language", source_language)
pipeline.set_pref("audio:target_language", target_language)
pipeline.describe()
pipeline.on(event_callback)
# Start processing audio
logger.info(f"Opening {filename}")
container = av.open(filename)
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
await pipeline.flush()
# Run diarization if enabled and we have topics
if enable_diarization and not only_transcript and audio_temp_path:
topics = topic_collector.get_topics()
if topics:
logger.info(f"Starting diarization with {len(topics)} topics")
try:
from reflector.processors import AudioDiarizationAutoProcessor
diarization_processor = AudioDiarizationAutoProcessor(
name=diarization_backend
)
diarization_processor.set_pipeline(pipeline)
# For Modal backend, we need to upload the file to S3 first
if diarization_backend == "modal":
from datetime import datetime, timezone
from reflector.storage import get_transcripts_storage
from reflector.utils.s3_temp_file import S3TemporaryFile
storage = get_transcripts_storage()
# Generate a unique filename in evaluation folder
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
audio_filename = f"evaluation/diarization_temp/{timestamp}_{uuid.uuid4().hex}.wav"
# Use context manager for automatic cleanup
async with S3TemporaryFile(storage, audio_filename) as s3_file:
# Read and upload the audio file
with open(audio_temp_path, "rb") as f:
audio_data = f.read()
audio_url = await s3_file.upload(audio_data)
logger.info(f"Uploaded audio to S3: {audio_filename}")
# Create diarization input with S3 URL
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
# File will be automatically cleaned up when exiting the context
else:
# For local backend, use local file path
audio_url = audio_temp_path
# Create diarization input
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
except ImportError as e:
logger.error(f"Failed to import diarization dependencies: {e}")
logger.error(
"Install with: uv pip install pyannote.audio torch torchaudio"
)
logger.error(
"And set HF_TOKEN environment variable for pyannote models"
)
raise SystemExit(1)
except Exception as e:
logger.error(f"Diarization failed: {e}")
raise SystemExit(1)
else:
logger.warning("Skipping diarization: no topics available")
# Clean up temp file
if audio_temp_path:
try:
Path(audio_temp_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {audio_temp_path}: {e}")
logger.info("All done!")
if __name__ == "__main__":
import argparse
import os
parser = argparse.ArgumentParser(
description="Process audio files with optional speaker diarization"
)
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
parser.add_argument(
"--only-transcript",
"-t",
action="store_true",
help="Only generate transcript without topics/summaries",
)
parser.add_argument(
"--source-language", default="en", help="Source language code (default: en)"
)
parser.add_argument(
"--target-language", default="en", help="Target language code (default: en)"
)
parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
parser.add_argument(
"--enable-diarization",
"-d",
action="store_true",
help="Enable speaker diarization",
)
parser.add_argument(
"--diarization-backend",
default="modal",
choices=["modal"],
help="Diarization backend to use (default: modal)",
)
args = parser.parse_args()
# Set REDIS_HOST to localhost if not provided
if "REDIS_HOST" not in os.environ:
os.environ["REDIS_HOST"] = "localhost"
logger.info("REDIS_HOST not set, defaulting to localhost")
output_fd = None
if args.output:
output_fd = open(args.output, "w")
async def event_callback(event: PipelineEvent):
processor = event.processor
data = event.data
# Ignore internal processors
if processor in (
"AudioDownscaleProcessor",
"AudioChunkerAutoProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",
"BroadcastProcessor",
):
return
# If diarization is enabled, skip the original topic events from the pipeline
# The diarization processor will emit the same topics but with speaker info
if processor == "TranscriptTopicDetectorProcessor" and args.enable_diarization:
return
# Log all events
logger.info(f"Event: {processor} - {type(data).__name__}")
# Write to output
if output_fd:
output_fd.write(event.model_dump_json())
output_fd.write("\n")
output_fd.flush()
asyncio.run(
process_audio_file_with_diarization(
args.source,
event_callback,
only_transcript=args.only_transcript,
source_language=args.source_language,
target_language=args.target_language,
enable_diarization=args.enable_diarization,
diarization_backend=args.diarization_backend,
)
)
if output_fd:
output_fd.close()
logger.info(f"Output written to {args.output}")

View File

@@ -1,96 +0,0 @@
#!/usr/bin/env python3
"""
@vibe-generated
Test script for the diarization CLI tool
=========================================
This script helps test the diarization functionality with sample audio files.
"""
import asyncio
import sys
from pathlib import Path
from reflector.logger import logger
async def test_diarization(audio_file: str):
"""Test the diarization functionality"""
# Import the processing function
from process_with_diarization import process_audio_file_with_diarization
# Collect events
events = []
async def event_callback(event):
events.append({"processor": event.processor, "data": event.data})
logger.info(f"Event from {event.processor}")
# Process the audio file
logger.info(f"Processing audio file: {audio_file}")
try:
await process_audio_file_with_diarization(
audio_file,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
)
# Analyze results
logger.info(f"Processing complete. Received {len(events)} events")
# Look for diarization results
diarized_topics = []
for event in events:
if "TitleSummary" in event["processor"]:
# Check if words have speaker information
if hasattr(event["data"], "transcript") and event["data"].transcript:
words = event["data"].transcript.words
if words and hasattr(words[0], "speaker"):
speakers = set(
w.speaker for w in words if hasattr(w, "speaker")
)
logger.info(
f"Found {len(speakers)} speakers in topic: {event['data'].title}"
)
diarized_topics.append(event["data"])
if diarized_topics:
logger.info(f"Successfully diarized {len(diarized_topics)} topics")
# Print sample output
sample_topic = diarized_topics[0]
logger.info("Sample diarized output:")
for i, word in enumerate(sample_topic.transcript.words[:10]):
logger.info(f" Word {i}: '{word.text}' - Speaker {word.speaker}")
else:
logger.warning("No diarization results found in output")
return events
except Exception as e:
logger.error(f"Error during processing: {e}")
raise
def main():
if len(sys.argv) < 2:
print("Usage: python test_diarization.py <audio_file>")
sys.exit(1)
audio_file = sys.argv[1]
if not Path(audio_file).exists():
print(f"Error: Audio file '{audio_file}' not found")
sys.exit(1)
# Run the test
asyncio.run(test_diarization(audio_file))
if __name__ == "__main__":
main()

View File

@@ -15,6 +15,7 @@ from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.settings import settings
from reflector.whereby import create_meeting, upload_logo
from reflector.worker.webhook import test_webhook
logger = logging.getLogger(__name__)
@@ -44,6 +45,11 @@ class Room(BaseModel):
is_shared: bool
class RoomDetails(Room):
webhook_url: str
webhook_secret: str
class Meeting(BaseModel):
id: str
room_name: str
@@ -64,6 +70,8 @@ class CreateRoom(BaseModel):
recording_type: str
recording_trigger: str
is_shared: bool
webhook_url: str
webhook_secret: str
class UpdateRoom(BaseModel):
@@ -76,16 +84,26 @@ class UpdateRoom(BaseModel):
recording_type: str
recording_trigger: str
is_shared: bool
webhook_url: str
webhook_secret: str
class DeletionStatus(BaseModel):
status: str
@router.get("/rooms", response_model=Page[Room])
class WebhookTestResult(BaseModel):
success: bool
message: str = ""
error: str = ""
status_code: int | None = None
response_preview: str | None = None
@router.get("/rooms", response_model=Page[RoomDetails])
async def rooms_list(
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
) -> list[Room]:
) -> list[RoomDetails]:
if not user and not settings.PUBLIC_MODE:
raise HTTPException(status_code=401, detail="Not authenticated")
@@ -99,6 +117,18 @@ async def rooms_list(
)
@router.get("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_get(
room_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
user_id = user["sub"] if user else None
room = await rooms_controller.get_by_id_for_http(room_id, user_id=user_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
return room
@router.post("/rooms", response_model=Room)
async def rooms_create(
room: CreateRoom,
@@ -117,10 +147,12 @@ async def rooms_create(
recording_type=room.recording_type,
recording_trigger=room.recording_trigger,
is_shared=room.is_shared,
webhook_url=room.webhook_url,
webhook_secret=room.webhook_secret,
)
@router.patch("/rooms/{room_id}", response_model=Room)
@router.patch("/rooms/{room_id}", response_model=RoomDetails)
async def rooms_update(
room_id: str,
info: UpdateRoom,
@@ -209,3 +241,24 @@ async def rooms_create_meeting(
meeting.host_room_url = ""
return meeting
@router.post("/rooms/{room_id}/webhook/test", response_model=WebhookTestResult)
async def rooms_test_webhook(
room_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
"""Test webhook configuration by sending a sample payload."""
user_id = user["sub"] if user else None
room = await rooms_controller.get_by_id(room_id)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
if user_id and room.user_id != user_id:
raise HTTPException(
status_code=403, detail="Not authorized to test this room's webhook"
)
result = await test_webhook(room_id)
return WebhookTestResult(**result)

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import task_pipeline_process
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
router = APIRouter()
@@ -40,7 +40,7 @@ async def transcript_process(
return ProcessStatus(status="already running")
# schedule a background task process the file
task_pipeline_process.delay(transcript_id=transcript_id)
task_pipeline_file_process.delay(transcript_id=transcript_id)
return ProcessStatus(status="ok")

View File

@@ -6,7 +6,7 @@ from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import task_pipeline_process
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
router = APIRouter()
@@ -92,6 +92,6 @@ async def transcript_record_upload(
await transcripts_controller.update(transcript, {"status": "uploaded"})
# launch a background task to process the file
task_pipeline_process.delay(transcript_id=transcript_id)
task_pipeline_file_process.delay(transcript_id=transcript_id)
return UploadStatus(status="ok")

View File

@@ -19,6 +19,7 @@ else:
"reflector.pipelines.main_live_pipeline",
"reflector.worker.healthcheck",
"reflector.worker.process",
"reflector.worker.cleanup",
]
)
@@ -38,6 +39,16 @@ else:
},
}
if settings.PUBLIC_MODE:
app.conf.beat_schedule["cleanup_old_public_data"] = {
"task": "reflector.worker.cleanup.cleanup_old_public_data_task",
"schedule": crontab(hour=3, minute=0),
}
logger.info(
"Public mode cleanup enabled",
retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
)
if settings.HEALTHCHECK_URL:
app.conf.beat_schedule["healthcheck_ping"] = {
"task": "reflector.worker.healthcheck.healthcheck_ping",

View File

@@ -0,0 +1,156 @@
"""
Main task for cleanup old public data.
Deletes old anonymous transcripts and their associated meetings/recordings.
Transcripts are the main entry point - any associated data is also removed.
"""
import asyncio
from datetime import datetime, timedelta, timezone
from typing import TypedDict
import structlog
from celery import shared_task
from databases import Database
from pydantic.types import PositiveInt
from reflector.asynctask import asynctask
from reflector.db import get_database
from reflector.db.meetings import meetings
from reflector.db.recordings import recordings
from reflector.db.transcripts import transcripts, transcripts_controller
from reflector.settings import settings
from reflector.storage import get_recordings_storage
logger = structlog.get_logger(__name__)
class CleanupStats(TypedDict):
"""Statistics for cleanup operation."""
transcripts_deleted: int
meetings_deleted: int
recordings_deleted: int
errors: list[str]
async def delete_single_transcript(
db: Database, transcript_data: dict, stats: CleanupStats
):
transcript_id = transcript_data["id"]
meeting_id = transcript_data["meeting_id"]
recording_id = transcript_data["recording_id"]
try:
async with db.transaction(isolation="serializable"):
if meeting_id:
await db.execute(meetings.delete().where(meetings.c.id == meeting_id))
stats["meetings_deleted"] += 1
logger.info("Deleted associated meeting", meeting_id=meeting_id)
if recording_id:
recording = await db.fetch_one(
recordings.select().where(recordings.c.id == recording_id)
)
if recording:
try:
await get_recordings_storage().delete_file(
recording["object_key"]
)
except Exception as storage_error:
logger.warning(
"Failed to delete recording from storage",
recording_id=recording_id,
object_key=recording["object_key"],
error=str(storage_error),
)
await db.execute(
recordings.delete().where(recordings.c.id == recording_id)
)
stats["recordings_deleted"] += 1
logger.info(
"Deleted associated recording", recording_id=recording_id
)
await transcripts_controller.remove_by_id(transcript_id)
stats["transcripts_deleted"] += 1
logger.info(
"Deleted transcript",
transcript_id=transcript_id,
created_at=transcript_data["created_at"].isoformat(),
)
except Exception as e:
error_msg = f"Failed to delete transcript {transcript_id}: {str(e)}"
logger.error(error_msg, exc_info=e)
stats["errors"].append(error_msg)
async def cleanup_old_transcripts(
db: Database, cutoff_date: datetime, stats: CleanupStats
):
"""Delete old anonymous transcripts and their associated recordings/meetings."""
query = transcripts.select().where(
(transcripts.c.created_at < cutoff_date) & (transcripts.c.user_id.is_(None))
)
old_transcripts = await db.fetch_all(query)
logger.info(f"Found {len(old_transcripts)} old transcripts to delete")
for transcript_data in old_transcripts:
await delete_single_transcript(db, transcript_data, stats)
def log_cleanup_results(stats: CleanupStats):
logger.info(
"Cleanup completed",
transcripts_deleted=stats["transcripts_deleted"],
meetings_deleted=stats["meetings_deleted"],
recordings_deleted=stats["recordings_deleted"],
errors_count=len(stats["errors"]),
)
if stats["errors"]:
logger.warning(
"Cleanup completed with errors",
errors=stats["errors"][:10],
)
async def cleanup_old_public_data(
days: PositiveInt | None = None,
) -> CleanupStats | None:
if days is None:
days = settings.PUBLIC_DATA_RETENTION_DAYS
if not settings.PUBLIC_MODE:
logger.info("Skipping cleanup - not a public instance")
return None
cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
logger.info(
"Starting cleanup of old public data",
cutoff_date=cutoff_date.isoformat(),
)
stats: CleanupStats = {
"transcripts_deleted": 0,
"meetings_deleted": 0,
"recordings_deleted": 0,
"errors": [],
}
db = get_database()
await cleanup_old_transcripts(db, cutoff_date, stats)
log_cleanup_results(stats)
return stats
@shared_task(
autoretry_for=(Exception,),
retry_kwargs={"max_retries": 3, "countdown": 300},
)
@asynctask
def cleanup_old_public_data_task(days: int | None = None):
asyncio.run(cleanup_old_public_data(days=days))

View File

@@ -0,0 +1,258 @@
"""Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
import httpx
import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.settings import settings
from reflector.utils.webvtt import topics_to_webvtt
logger = structlog.wrap_logger(get_task_logger(__name__))
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
@shared_task(
bind=True,
max_retries=30,
default_retry_delay=60,
retry_backoff=True,
retry_backoff_max=3600, # Max 1 hour between retries
)
@asynctask
async def send_transcript_webhook(
self,
transcript_id: str,
room_id: str,
event_id: str,
):
log = logger.bind(
transcript_id=transcript_id,
room_id=room_id,
retry_count=self.request.retries,
)
try:
# Fetch transcript and room
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
log.error("Transcript not found, skipping webhook")
return
room = await rooms_controller.get_by_id(room_id)
if not room:
log.error("Room not found, skipping webhook")
return
if not room.webhook_url:
log.info("No webhook URL configured for room, skipping")
return
# Generate WebVTT content from topics
topics_data = []
if transcript.topics:
# Build topics data with diarized content per topic
for topic in transcript.topics:
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
)
# Build webhook payload
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
participants = [
{"id": p.id, "name": p.name, "speaker": p.speaker}
for p in (transcript.participants or [])
]
payload_data = {
"event": "transcript.completed",
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
},
"room": {
"id": room.id,
"name": room.name,
},
}
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info(
"Sending webhook",
url=room.webhook_url,
payload_size=len(payload_bytes),
)
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
response.raise_for_status()
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
except httpx.HTTPStatusError as e:
log.error(
"Webhook failed with HTTP error",
status_code=e.response.status_code,
response_text=e.response.text[:500], # First 500 chars
)
# Don't retry on client errors (4xx)
if 400 <= e.response.status_code < 500:
log.error("Client error, not retrying")
return
# Retry on server errors (5xx)
raise self.retry(exc=e)
except (httpx.ConnectError, httpx.TimeoutException) as e:
# Retry on network errors
log.error("Webhook failed with connection error", error=str(e))
raise self.retry(exc=e)
except Exception as e:
# Retry on unexpected errors
log.exception("Unexpected error in webhook task", error=str(e))
raise self.retry(exc=e)
async def test_webhook(room_id: str) -> dict:
"""
Test webhook configuration by sending a sample payload.
Returns immediately with success/failure status.
This is the shared implementation used by both the API endpoint and Celery task.
"""
try:
room = await rooms_controller.get_by_id(room_id)
if not room:
return {"success": False, "error": "Room not found"}
if not room.webhook_url:
return {"success": False, "error": "No webhook URL configured"}
now = (datetime.now(timezone.utc).isoformat(),)
payload_data = {
"event": "test",
"event_id": uuid.uuid4().hex,
"timestamp": now,
"message": "This is a test webhook from Reflector",
"room": {
"id": room.id,
"name": room.name,
},
}
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate headers with signature
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "test",
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send test webhook with short timeout
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
return {
"success": response.is_success,
"status_code": response.status_code,
"message": f"Webhook test {'successful' if response.is_success else 'failed'}",
"response_preview": response.text if response.text else None,
}
except httpx.TimeoutException:
return {
"success": False,
"error": "Webhook request timed out (10 seconds)",
}
except httpx.ConnectError as e:
return {
"success": False,
"error": f"Could not connect to webhook URL: {str(e)}",
}
except Exception as e:
return {
"success": False,
"error": f"Unexpected error: {str(e)}",
}

View File

@@ -178,6 +178,63 @@ async def dummy_diarization():
yield
@pytest.fixture
async def dummy_file_transcript():
from reflector.processors.file_transcript import FileTranscriptProcessor
from reflector.processors.types import Transcript, Word
class TestFileTranscriptProcessor(FileTranscriptProcessor):
async def _transcript(self, data):
return Transcript(
text="Hello world. How are you today?",
words=[
Word(start=0.0, end=0.5, text="Hello", speaker=0),
Word(start=0.5, end=0.6, text=" ", speaker=0),
Word(start=0.6, end=1.0, text="world", speaker=0),
Word(start=1.0, end=1.1, text=".", speaker=0),
Word(start=1.1, end=1.2, text=" ", speaker=0),
Word(start=1.2, end=1.5, text="How", speaker=0),
Word(start=1.5, end=1.6, text=" ", speaker=0),
Word(start=1.6, end=1.8, text="are", speaker=0),
Word(start=1.8, end=1.9, text=" ", speaker=0),
Word(start=1.9, end=2.1, text="you", speaker=0),
Word(start=2.1, end=2.2, text=" ", speaker=0),
Word(start=2.2, end=2.5, text="today", speaker=0),
Word(start=2.5, end=2.6, text="?", speaker=0),
],
)
with patch(
"reflector.processors.file_transcript_auto.FileTranscriptAutoProcessor.__new__"
) as mock_auto:
mock_auto.return_value = TestFileTranscriptProcessor()
yield
@pytest.fixture
async def dummy_file_diarization():
from reflector.processors.file_diarization import (
FileDiarizationOutput,
FileDiarizationProcessor,
)
from reflector.processors.types import DiarizationSegment
class TestFileDiarizationProcessor(FileDiarizationProcessor):
async def _diarize(self, data):
return FileDiarizationOutput(
diarization=[
DiarizationSegment(start=0.0, end=1.1, speaker=0),
DiarizationSegment(start=1.2, end=2.6, speaker=1),
]
)
with patch(
"reflector.processors.file_diarization_auto.FileDiarizationAutoProcessor.__new__"
) as mock_auto:
mock_auto.return_value = TestFileDiarizationProcessor()
yield
@pytest.fixture
async def dummy_transcript_translator():
from reflector.processors.transcript_translator import TranscriptTranslatorProcessor
@@ -238,9 +295,13 @@ async def dummy_storage():
with (
patch("reflector.storage.base.Storage.get_instance") as mock_storage,
patch("reflector.storage.get_transcripts_storage") as mock_get_transcripts,
patch(
"reflector.pipelines.main_file_pipeline.get_transcripts_storage"
) as mock_get_transcripts2,
):
mock_storage.return_value = dummy
mock_get_transcripts.return_value = dummy
mock_get_transcripts2.return_value = dummy
yield
@@ -260,7 +321,10 @@ def celery_config():
@pytest.fixture(scope="session")
def celery_includes():
return ["reflector.pipelines.main_live_pipeline"]
return [
"reflector.pipelines.main_live_pipeline",
"reflector.pipelines.main_file_pipeline",
]
@pytest.fixture
@@ -302,7 +366,7 @@ async def fake_transcript_with_topics(tmpdir, client):
transcript = await transcripts_controller.get_by_id(tid)
assert transcript is not None
await transcripts_controller.update(transcript, {"status": "finished"})
await transcripts_controller.update(transcript, {"status": "ended"})
# manually copy a file at the expected location
audio_filename = transcript.audio_mp3_filename

View File

@@ -0,0 +1,287 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, patch
import pytest
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.transcripts import SourceKind, transcripts_controller
from reflector.worker.cleanup import cleanup_old_public_data
@pytest.mark.asyncio
async def test_cleanup_old_public_data_skips_when_not_public():
"""Test that cleanup is skipped when PUBLIC_MODE is False."""
with patch("reflector.worker.cleanup.settings") as mock_settings:
mock_settings.PUBLIC_MODE = False
result = await cleanup_old_public_data()
# Should return early without doing anything
assert result is None
@pytest.mark.asyncio
async def test_cleanup_old_public_data_deletes_old_anonymous_transcripts():
"""Test that old anonymous transcripts are deleted."""
# Create old and new anonymous transcripts
old_date = datetime.now(timezone.utc) - timedelta(days=8)
new_date = datetime.now(timezone.utc) - timedelta(days=2)
# Create old anonymous transcript (should be deleted)
old_transcript = await transcripts_controller.add(
name="Old Anonymous Transcript",
source_kind=SourceKind.FILE,
user_id=None, # Anonymous
)
# Manually update created_at to be old
from reflector.db import get_database
from reflector.db.transcripts import transcripts
await get_database().execute(
transcripts.update()
.where(transcripts.c.id == old_transcript.id)
.values(created_at=old_date)
)
# Create new anonymous transcript (should NOT be deleted)
new_transcript = await transcripts_controller.add(
name="New Anonymous Transcript",
source_kind=SourceKind.FILE,
user_id=None, # Anonymous
)
# Create old transcript with user (should NOT be deleted)
old_user_transcript = await transcripts_controller.add(
name="Old User Transcript",
source_kind=SourceKind.FILE,
user_id="user123",
)
await get_database().execute(
transcripts.update()
.where(transcripts.c.id == old_user_transcript.id)
.values(created_at=old_date)
)
with patch("reflector.worker.cleanup.settings") as mock_settings:
mock_settings.PUBLIC_MODE = True
mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7
# Mock the storage deletion
with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage:
mock_storage.return_value.delete_file = AsyncMock()
result = await cleanup_old_public_data()
# Check results
assert result["transcripts_deleted"] == 1
assert result["errors"] == []
# Verify old anonymous transcript was deleted
assert await transcripts_controller.get_by_id(old_transcript.id) is None
# Verify new anonymous transcript still exists
assert await transcripts_controller.get_by_id(new_transcript.id) is not None
# Verify user transcript still exists
assert await transcripts_controller.get_by_id(old_user_transcript.id) is not None
@pytest.mark.asyncio
async def test_cleanup_deletes_associated_meeting_and_recording():
"""Test that meetings and recordings associated with old transcripts are deleted."""
from reflector.db import get_database
from reflector.db.meetings import meetings
from reflector.db.transcripts import transcripts
old_date = datetime.now(timezone.utc) - timedelta(days=8)
# Create a meeting
meeting_id = "test-meeting-for-transcript"
await get_database().execute(
meetings.insert().values(
id=meeting_id,
room_name="Meeting with Transcript",
room_url="https://example.com/meeting",
host_room_url="https://example.com/meeting-host",
start_date=old_date,
end_date=old_date + timedelta(hours=1),
user_id=None,
room_id=None,
)
)
# Create a recording
recording = await recordings_controller.create(
Recording(
bucket_name="test-bucket",
object_key="test-recording.mp4",
recorded_at=old_date,
)
)
# Create an old transcript with both meeting and recording
old_transcript = await transcripts_controller.add(
name="Old Transcript with Meeting and Recording",
source_kind=SourceKind.ROOM,
user_id=None,
meeting_id=meeting_id,
recording_id=recording.id,
)
# Update created_at to be old
await get_database().execute(
transcripts.update()
.where(transcripts.c.id == old_transcript.id)
.values(created_at=old_date)
)
with patch("reflector.worker.cleanup.settings") as mock_settings:
mock_settings.PUBLIC_MODE = True
mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7
# Mock storage deletion
with patch("reflector.db.transcripts.get_transcripts_storage") as mock_storage:
mock_storage.return_value.delete_file = AsyncMock()
with patch(
"reflector.worker.cleanup.get_recordings_storage"
) as mock_rec_storage:
mock_rec_storage.return_value.delete_file = AsyncMock()
result = await cleanup_old_public_data()
# Check results
assert result["transcripts_deleted"] == 1
assert result["meetings_deleted"] == 1
assert result["recordings_deleted"] == 1
assert result["errors"] == []
# Verify transcript was deleted
assert await transcripts_controller.get_by_id(old_transcript.id) is None
# Verify meeting was deleted
query = meetings.select().where(meetings.c.id == meeting_id)
meeting_result = await get_database().fetch_one(query)
assert meeting_result is None
# Verify recording was deleted
assert await recordings_controller.get_by_id(recording.id) is None
@pytest.mark.asyncio
async def test_cleanup_handles_errors_gracefully():
"""Test that cleanup continues even when individual deletions fail."""
old_date = datetime.now(timezone.utc) - timedelta(days=8)
# Create multiple old transcripts
transcript1 = await transcripts_controller.add(
name="Transcript 1",
source_kind=SourceKind.FILE,
user_id=None,
)
transcript2 = await transcripts_controller.add(
name="Transcript 2",
source_kind=SourceKind.FILE,
user_id=None,
)
# Update created_at to be old
from reflector.db import get_database
from reflector.db.transcripts import transcripts
for t_id in [transcript1.id, transcript2.id]:
await get_database().execute(
transcripts.update()
.where(transcripts.c.id == t_id)
.values(created_at=old_date)
)
with patch("reflector.worker.cleanup.settings") as mock_settings:
mock_settings.PUBLIC_MODE = True
mock_settings.PUBLIC_DATA_RETENTION_DAYS = 7
# Mock remove_by_id to fail for the first transcript
original_remove = transcripts_controller.remove_by_id
call_count = 0
async def mock_remove_by_id(transcript_id, user_id=None):
nonlocal call_count
call_count += 1
if call_count == 1:
raise Exception("Simulated deletion error")
return await original_remove(transcript_id, user_id)
with patch.object(
transcripts_controller, "remove_by_id", side_effect=mock_remove_by_id
):
result = await cleanup_old_public_data()
# Should have one successful deletion and one error
assert result["transcripts_deleted"] == 1
assert len(result["errors"]) == 1
assert "Failed to delete transcript" in result["errors"][0]
@pytest.mark.asyncio
async def test_meeting_consent_cascade_delete():
"""Test that meeting_consent records are automatically deleted when meeting is deleted."""
from reflector.db import get_database
from reflector.db.meetings import (
meeting_consent,
meeting_consent_controller,
meetings,
)
# Create a meeting
meeting_id = "test-cascade-meeting"
await get_database().execute(
meetings.insert().values(
id=meeting_id,
room_name="Test Meeting for CASCADE",
room_url="https://example.com/cascade-test",
host_room_url="https://example.com/cascade-test-host",
start_date=datetime.now(timezone.utc),
end_date=datetime.now(timezone.utc) + timedelta(hours=1),
user_id="test-user",
room_id=None,
)
)
# Create consent records for this meeting
consent1_id = "consent-1"
consent2_id = "consent-2"
await get_database().execute(
meeting_consent.insert().values(
id=consent1_id,
meeting_id=meeting_id,
user_id="user1",
consent_given=True,
consent_timestamp=datetime.now(timezone.utc),
)
)
await get_database().execute(
meeting_consent.insert().values(
id=consent2_id,
meeting_id=meeting_id,
user_id="user2",
consent_given=False,
consent_timestamp=datetime.now(timezone.utc),
)
)
# Verify consent records exist
consents = await meeting_consent_controller.get_by_meeting_id(meeting_id)
assert len(consents) == 2
# Delete the meeting
await get_database().execute(meetings.delete().where(meetings.c.id == meeting_id))
# Verify meeting is deleted
query = meetings.select().where(meetings.c.id == meeting_id)
result = await get_database().fetch_one(query)
assert result is None
# Verify consent records are automatically deleted (CASCADE DELETE)
consents_after = await meeting_consent_controller.get_by_meeting_id(meeting_id)
assert len(consents_after) == 0

View File

@@ -1,61 +0,0 @@
import pytest
@pytest.mark.asyncio
@pytest.mark.parametrize("enable_diarization", [False, True])
async def test_basic_process(
dummy_transcript,
dummy_llm,
dummy_processors,
enable_diarization,
dummy_diarization,
):
# goal is to start the server, and send rtc audio to it
# validate the events received
from pathlib import Path
from reflector.settings import settings
from reflector.tools.process import process_audio_file
# LLM_BACKEND no longer exists in settings
# settings.LLM_BACKEND = "test"
settings.TRANSCRIPT_BACKEND = "whisper"
# event callback
marks = {}
async def event_callback(event):
if event.processor not in marks:
marks[event.processor] = 0
marks[event.processor] += 1
# invoke the process and capture events
path = Path(__file__).parent / "records" / "test_mathieu_hello.wav"
if enable_diarization:
# Test with diarization - may fail if pyannote.audio is not installed
try:
await process_audio_file(
path.as_posix(), event_callback, enable_diarization=True
)
except SystemExit:
pytest.skip("pyannote.audio not installed - skipping diarization test")
else:
# Test without diarization - should always work
await process_audio_file(
path.as_posix(), event_callback, enable_diarization=False
)
print(f"Diarization: {enable_diarization}, Marks: {marks}")
# validate the events
# Each processor should be called for each audio segment processed
# The final processors (Topic, Title, Summary) should be called once at the end
assert marks["TranscriptLinerProcessor"] > 0
assert marks["TranscriptTranslatorPassthroughProcessor"] > 0
assert marks["TranscriptTopicDetectorProcessor"] == 1
assert marks["TranscriptFinalSummaryProcessor"] == 1
assert marks["TranscriptFinalTitleProcessor"] == 1
if enable_diarization:
assert marks["TestAudioDiarizationProcessor"] == 1

View File

@@ -19,7 +19,7 @@ async def fake_transcript(tmpdir, client):
transcript = await transcripts_controller.get_by_id(tid)
assert transcript is not None
await transcripts_controller.update(transcript, {"status": "finished"})
await transcripts_controller.update(transcript, {"status": "ended"})
# manually copy a file at the expected location
audio_filename = transcript.audio_mp3_filename

View File

@@ -29,10 +29,10 @@ async def client(app_lifespan):
@pytest.mark.asyncio
async def test_transcript_process(
tmpdir,
whisper_transcript,
dummy_llm,
dummy_processors,
dummy_diarization,
dummy_file_transcript,
dummy_file_diarization,
dummy_storage,
client,
):
@@ -56,8 +56,8 @@ async def test_transcript_process(
assert response.status_code == 200
assert response.json()["status"] == "ok"
# wait for processing to finish (max 10 minutes)
timeout_seconds = 600 # 10 minutes
# wait for processing to finish (max 1 minute)
timeout_seconds = 60
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout_seconds:
# fetch the transcript and check if it is ended
@@ -75,9 +75,10 @@ async def test_transcript_process(
)
assert response.status_code == 200
assert response.json()["status"] == "ok"
await asyncio.sleep(2)
# wait for processing to finish (max 10 minutes)
timeout_seconds = 600 # 10 minutes
# wait for processing to finish (max 1 minute)
timeout_seconds = 60
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout_seconds:
# fetch the transcript and check if it is ended
@@ -99,4 +100,4 @@ async def test_transcript_process(
response = await client.get(f"/transcripts/{tid}/topics")
assert response.status_code == 200
assert len(response.json()) == 1
assert "want to share" in response.json()[0]["transcript"]
assert "Hello world. How are you today?" in response.json()[0]["transcript"]

View File

@@ -12,7 +12,8 @@ async def test_transcript_upload_file(
tmpdir,
dummy_llm,
dummy_processors,
dummy_diarization,
dummy_file_transcript,
dummy_file_diarization,
dummy_storage,
client,
):
@@ -36,8 +37,8 @@ async def test_transcript_upload_file(
assert response.status_code == 200
assert response.json()["status"] == "ok"
# wait the processing to finish (max 10 minutes)
timeout_seconds = 600 # 10 minutes
# wait the processing to finish (max 1 minute)
timeout_seconds = 60
start_time = time.monotonic()
while (time.monotonic() - start_time) < timeout_seconds:
# fetch the transcript and check if it is ended
@@ -47,7 +48,7 @@ async def test_transcript_upload_file(
break
await asyncio.sleep(1)
else:
pytest.fail(f"Processing timed out after {timeout_seconds} seconds")
return pytest.fail(f"Processing timed out after {timeout_seconds} seconds")
# check the transcript is ended
transcript = resp.json()
@@ -59,4 +60,4 @@ async def test_transcript_upload_file(
response = await client.get(f"/transcripts/{tid}/topics")
assert response.status_code == 200
assert len(response.json()) == 1
assert "want to share" in response.json()[0]["transcript"]
assert "Hello world. How are you today?" in response.json()[0]["transcript"]

View File

@@ -12,11 +12,11 @@ import {
HStack,
} from "@chakra-ui/react";
import { LuLink } from "react-icons/lu";
import { Room } from "../../../api";
import { RoomDetails } from "../../../api";
import { RoomActionsMenu } from "./RoomActionsMenu";
interface RoomCardsProps {
rooms: Room[];
rooms: RoomDetails[];
linkCopied: string;
onCopyUrl: (roomName: string) => void;
onEdit: (roomId: string, roomData: any) => void;

View File

@@ -1,11 +1,11 @@
import { Box, Heading, Text, VStack } from "@chakra-ui/react";
import { Room } from "../../../api";
import { RoomDetails } from "../../../api";
import { RoomTable } from "./RoomTable";
import { RoomCards } from "./RoomCards";
interface RoomListProps {
title: string;
rooms: Room[];
rooms: RoomDetails[];
linkCopied: string;
onCopyUrl: (roomName: string) => void;
onEdit: (roomId: string, roomData: any) => void;

View File

@@ -9,11 +9,11 @@ import {
Spinner,
} from "@chakra-ui/react";
import { LuLink } from "react-icons/lu";
import { Room } from "../../../api";
import { RoomDetails } from "../../../api";
import { RoomActionsMenu } from "./RoomActionsMenu";
interface RoomTableProps {
rooms: Room[];
rooms: RoomDetails[];
linkCopied: string;
onCopyUrl: (roomName: string) => void;
onEdit: (roomId: string, roomData: any) => void;

View File

@@ -11,13 +11,15 @@ import {
Input,
Select,
Spinner,
IconButton,
createListCollection,
useDisclosure,
} from "@chakra-ui/react";
import { useEffect, useState } from "react";
import { LuEye, LuEyeOff } from "react-icons/lu";
import useApi from "../../lib/useApi";
import useRoomList from "./useRoomList";
import { ApiError, Room } from "../../api";
import { ApiError, RoomDetails } from "../../api";
import { RoomList } from "./_components/RoomList";
import { PaginationPage } from "../browse/_components/Pagination";
@@ -55,6 +57,8 @@ const roomInitialState = {
recordingType: "cloud",
recordingTrigger: "automatic-2nd-participant",
isShared: false,
webhookUrl: "",
webhookSecret: "",
};
export default function RoomsList() {
@@ -83,6 +87,11 @@ export default function RoomsList() {
const [topics, setTopics] = useState<Topic[]>([]);
const [nameError, setNameError] = useState("");
const [linkCopied, setLinkCopied] = useState("");
const [testingWebhook, setTestingWebhook] = useState(false);
const [webhookTestResult, setWebhookTestResult] = useState<string | null>(
null,
);
const [showWebhookSecret, setShowWebhookSecret] = useState(false);
interface Stream {
stream_id: number;
name: string;
@@ -155,6 +164,69 @@ export default function RoomsList() {
}, 2000);
};
const handleCloseDialog = () => {
setShowWebhookSecret(false);
setWebhookTestResult(null);
onClose();
};
const handleTestWebhook = async () => {
if (!room.webhookUrl || !editRoomId) {
setWebhookTestResult("Please enter a webhook URL first");
return;
}
setTestingWebhook(true);
setWebhookTestResult(null);
try {
const response = await api?.v1RoomsTestWebhook({
roomId: editRoomId,
});
if (response?.success) {
setWebhookTestResult(
`✅ Webhook test successful! Status: ${response.status_code}`,
);
} else {
let errorMsg = `❌ Webhook test failed`;
if (response?.status_code) {
errorMsg += ` (Status: ${response.status_code})`;
}
if (response?.error) {
errorMsg += `: ${response.error}`;
} else if (response?.response_preview) {
// Try to parse and extract meaningful error from response
// Specific to N8N at the moment, as there is no specification for that
// We could just display as is, but decided here to dig a little bit more.
try {
const preview = JSON.parse(response.response_preview);
if (preview.message) {
errorMsg += `: ${preview.message}`;
}
} catch {
// If not JSON, just show the preview text (truncated)
const previewText = response.response_preview.substring(0, 150);
errorMsg += `: ${previewText}`;
}
} else if (response?.message) {
errorMsg += `: ${response.message}`;
}
setWebhookTestResult(errorMsg);
}
} catch (error) {
console.error("Error testing webhook:", error);
setWebhookTestResult("❌ Failed to test webhook. Please check your URL.");
} finally {
setTestingWebhook(false);
}
// Clear result after 5 seconds
setTimeout(() => {
setWebhookTestResult(null);
}, 5000);
};
const handleSaveRoom = async () => {
try {
if (RESERVED_PATHS.includes(room.name)) {
@@ -172,6 +244,8 @@ export default function RoomsList() {
recording_type: room.recordingType,
recording_trigger: room.recordingTrigger,
is_shared: room.isShared,
webhook_url: room.webhookUrl,
webhook_secret: room.webhookSecret,
};
if (isEditing) {
@@ -190,7 +264,7 @@ export default function RoomsList() {
setEditRoomId("");
setNameError("");
refetch();
onClose();
handleCloseDialog();
} catch (err) {
if (
err instanceof ApiError &&
@@ -206,18 +280,46 @@ export default function RoomsList() {
}
};
const handleEditRoom = (roomId, roomData) => {
setRoom({
name: roomData.name,
zulipAutoPost: roomData.zulip_auto_post,
zulipStream: roomData.zulip_stream,
zulipTopic: roomData.zulip_topic,
isLocked: roomData.is_locked,
roomMode: roomData.room_mode,
recordingType: roomData.recording_type,
recordingTrigger: roomData.recording_trigger,
isShared: roomData.is_shared,
});
const handleEditRoom = async (roomId, roomData) => {
// Reset states
setShowWebhookSecret(false);
setWebhookTestResult(null);
// Fetch full room details to get webhook fields
try {
const detailedRoom = await api?.v1RoomsGet({ roomId });
if (detailedRoom) {
setRoom({
name: detailedRoom.name,
zulipAutoPost: detailedRoom.zulip_auto_post,
zulipStream: detailedRoom.zulip_stream,
zulipTopic: detailedRoom.zulip_topic,
isLocked: detailedRoom.is_locked,
roomMode: detailedRoom.room_mode,
recordingType: detailedRoom.recording_type,
recordingTrigger: detailedRoom.recording_trigger,
isShared: detailedRoom.is_shared,
webhookUrl: detailedRoom.webhook_url || "",
webhookSecret: detailedRoom.webhook_secret || "",
});
}
} catch (error) {
console.error("Failed to fetch room details, using list data:", error);
// Fallback to using the data from the list
setRoom({
name: roomData.name,
zulipAutoPost: roomData.zulip_auto_post,
zulipStream: roomData.zulip_stream,
zulipTopic: roomData.zulip_topic,
isLocked: roomData.is_locked,
roomMode: roomData.room_mode,
recordingType: roomData.recording_type,
recordingTrigger: roomData.recording_trigger,
isShared: roomData.is_shared,
webhookUrl: roomData.webhook_url || "",
webhookSecret: roomData.webhook_secret || "",
});
}
setEditRoomId(roomId);
setIsEditing(true);
setNameError("");
@@ -250,9 +352,9 @@ export default function RoomsList() {
});
};
const myRooms: Room[] =
const myRooms: RoomDetails[] =
response?.items.filter((roomData) => !roomData.is_shared) || [];
const sharedRooms: Room[] =
const sharedRooms: RoomDetails[] =
response?.items.filter((roomData) => roomData.is_shared) || [];
if (loading && !response)
@@ -287,6 +389,8 @@ export default function RoomsList() {
setIsEditing(false);
setRoom(roomInitialState);
setNameError("");
setShowWebhookSecret(false);
setWebhookTestResult(null);
onOpen();
}}
>
@@ -296,7 +400,7 @@ export default function RoomsList() {
<Dialog.Root
open={open}
onOpenChange={(e) => (e.open ? onOpen() : onClose())}
onOpenChange={(e) => (e.open ? onOpen() : handleCloseDialog())}
size="lg"
>
<Dialog.Backdrop />
@@ -533,6 +637,109 @@ export default function RoomsList() {
</Select.Positioner>
</Select.Root>
</Field.Root>
{/* Webhook Configuration Section */}
<Field.Root mt={8}>
<Field.Label>Webhook URL</Field.Label>
<Input
name="webhookUrl"
type="url"
placeholder="https://example.com/webhook"
value={room.webhookUrl}
onChange={handleRoomChange}
/>
<Field.HelperText>
Optional: URL to receive notifications when transcripts are
ready
</Field.HelperText>
</Field.Root>
{room.webhookUrl && (
<>
<Field.Root mt={4}>
<Field.Label>Webhook Secret</Field.Label>
<Flex gap={2}>
<Input
name="webhookSecret"
type={showWebhookSecret ? "text" : "password"}
value={room.webhookSecret}
onChange={handleRoomChange}
placeholder={
isEditing && room.webhookSecret
? "••••••••"
: "Leave empty to auto-generate"
}
flex="1"
/>
{isEditing && room.webhookSecret && (
<IconButton
size="sm"
variant="ghost"
aria-label={
showWebhookSecret ? "Hide secret" : "Show secret"
}
onClick={() =>
setShowWebhookSecret(!showWebhookSecret)
}
>
{showWebhookSecret ? <LuEyeOff /> : <LuEye />}
</IconButton>
)}
</Flex>
<Field.HelperText>
Used for HMAC signature verification (auto-generated if
left empty)
</Field.HelperText>
</Field.Root>
{isEditing && (
<>
<Flex
mt={2}
gap={2}
alignItems="flex-start"
direction="column"
>
<Button
size="sm"
variant="outline"
onClick={handleTestWebhook}
disabled={testingWebhook || !room.webhookUrl}
>
{testingWebhook ? (
<>
<Spinner size="xs" mr={2} />
Testing...
</>
) : (
"Test Webhook"
)}
</Button>
{webhookTestResult && (
<div
style={{
fontSize: "14px",
wordBreak: "break-word",
maxWidth: "100%",
padding: "8px",
borderRadius: "4px",
backgroundColor: webhookTestResult.startsWith(
"✅",
)
? "#f0fdf4"
: "#fef2f2",
border: `1px solid ${webhookTestResult.startsWith("✅") ? "#86efac" : "#fca5a5"}`,
}}
>
{webhookTestResult}
</div>
)}
</Flex>
</>
)}
</>
)}
<Field.Root mt={4}>
<Checkbox.Root
name="isShared"
@@ -557,7 +764,7 @@ export default function RoomsList() {
</Field.Root>
</Dialog.Body>
<Dialog.Footer>
<Button variant="ghost" onClick={onClose}>
<Button variant="ghost" onClick={handleCloseDialog}>
Cancel
</Button>
<Button

View File

@@ -1,11 +1,11 @@
import { useEffect, useState } from "react";
import { useError } from "../../(errors)/errorContext";
import useApi from "../../lib/useApi";
import { Page_Room_ } from "../../api";
import { Page_RoomDetails_ } from "../../api";
import { PaginationPage } from "../browse/_components/Pagination";
type RoomList = {
response: Page_Room_ | null;
response: Page_RoomDetails_ | null;
loading: boolean;
error: Error | null;
refetch: () => void;
@@ -13,7 +13,7 @@ type RoomList = {
//always protected
const useRoomList = (page: PaginationPage): RoomList => {
const [response, setResponse] = useState<Page_Room_ | null>(null);
const [response, setResponse] = useState<Page_RoomDetails_ | null>(null);
const [loading, setLoading] = useState<boolean>(true);
const [error, setErrorState] = useState<Error | null>(null);
const { setError } = useError();

View File

@@ -91,6 +91,14 @@ export const $CreateRoom = {
type: "boolean",
title: "Is Shared",
},
webhook_url: {
type: "string",
title: "Webhook Url",
},
webhook_secret: {
type: "string",
title: "Webhook Secret",
},
},
type: "object",
required: [
@@ -103,6 +111,8 @@ export const $CreateRoom = {
"recording_type",
"recording_trigger",
"is_shared",
"webhook_url",
"webhook_secret",
],
title: "CreateRoom",
} as const;
@@ -809,11 +819,11 @@ export const $Page_GetTranscriptMinimal_ = {
title: "Page[GetTranscriptMinimal]",
} as const;
export const $Page_Room_ = {
export const $Page_RoomDetails_ = {
properties: {
items: {
items: {
$ref: "#/components/schemas/Room",
$ref: "#/components/schemas/RoomDetails",
},
type: "array",
title: "Items",
@@ -869,7 +879,7 @@ export const $Page_Room_ = {
},
type: "object",
required: ["items", "page", "size"],
title: "Page[Room]",
title: "Page[RoomDetails]",
} as const;
export const $Participant = {
@@ -969,6 +979,86 @@ export const $Room = {
title: "Room",
} as const;
export const $RoomDetails = {
properties: {
id: {
type: "string",
title: "Id",
},
name: {
type: "string",
title: "Name",
},
user_id: {
type: "string",
title: "User Id",
},
created_at: {
type: "string",
format: "date-time",
title: "Created At",
},
zulip_auto_post: {
type: "boolean",
title: "Zulip Auto Post",
},
zulip_stream: {
type: "string",
title: "Zulip Stream",
},
zulip_topic: {
type: "string",
title: "Zulip Topic",
},
is_locked: {
type: "boolean",
title: "Is Locked",
},
room_mode: {
type: "string",
title: "Room Mode",
},
recording_type: {
type: "string",
title: "Recording Type",
},
recording_trigger: {
type: "string",
title: "Recording Trigger",
},
is_shared: {
type: "boolean",
title: "Is Shared",
},
webhook_url: {
type: "string",
title: "Webhook Url",
},
webhook_secret: {
type: "string",
title: "Webhook Secret",
},
},
type: "object",
required: [
"id",
"name",
"user_id",
"created_at",
"zulip_auto_post",
"zulip_stream",
"zulip_topic",
"is_locked",
"room_mode",
"recording_type",
"recording_trigger",
"is_shared",
"webhook_url",
"webhook_secret",
],
title: "RoomDetails",
} as const;
export const $RtcOffer = {
properties: {
sdp: {
@@ -1351,6 +1441,14 @@ export const $UpdateRoom = {
type: "boolean",
title: "Is Shared",
},
webhook_url: {
type: "string",
title: "Webhook Url",
},
webhook_secret: {
type: "string",
title: "Webhook Secret",
},
},
type: "object",
required: [
@@ -1363,6 +1461,8 @@ export const $UpdateRoom = {
"recording_type",
"recording_trigger",
"is_shared",
"webhook_url",
"webhook_secret",
],
title: "UpdateRoom",
} as const;
@@ -1541,6 +1641,50 @@ export const $ValidationError = {
title: "ValidationError",
} as const;
export const $WebhookTestResult = {
properties: {
success: {
type: "boolean",
title: "Success",
},
message: {
type: "string",
title: "Message",
default: "",
},
error: {
type: "string",
title: "Error",
default: "",
},
status_code: {
anyOf: [
{
type: "integer",
},
{
type: "null",
},
],
title: "Status Code",
},
response_preview: {
anyOf: [
{
type: "string",
},
{
type: "null",
},
],
title: "Response Preview",
},
},
type: "object",
required: ["success"],
title: "WebhookTestResult",
} as const;
export const $WherebyWebhookEvent = {
properties: {
apiVersion: {

View File

@@ -10,12 +10,16 @@ import type {
V1RoomsListResponse,
V1RoomsCreateData,
V1RoomsCreateResponse,
V1RoomsGetData,
V1RoomsGetResponse,
V1RoomsUpdateData,
V1RoomsUpdateResponse,
V1RoomsDeleteData,
V1RoomsDeleteResponse,
V1RoomsCreateMeetingData,
V1RoomsCreateMeetingResponse,
V1RoomsTestWebhookData,
V1RoomsTestWebhookResponse,
V1TranscriptsListData,
V1TranscriptsListResponse,
V1TranscriptsCreateData,
@@ -118,7 +122,7 @@ export class DefaultService {
* @param data The data for the request.
* @param data.page Page number
* @param data.size Page size
* @returns Page_Room_ Successful Response
* @returns Page_RoomDetails_ Successful Response
* @throws ApiError
*/
public v1RoomsList(
@@ -158,12 +162,34 @@ export class DefaultService {
});
}
/**
* Rooms Get
* @param data The data for the request.
* @param data.roomId
* @returns RoomDetails Successful Response
* @throws ApiError
*/
public v1RoomsGet(
data: V1RoomsGetData,
): CancelablePromise<V1RoomsGetResponse> {
return this.httpRequest.request({
method: "GET",
url: "/v1/rooms/{room_id}",
path: {
room_id: data.roomId,
},
errors: {
422: "Validation Error",
},
});
}
/**
* Rooms Update
* @param data The data for the request.
* @param data.roomId
* @param data.requestBody
* @returns Room Successful Response
* @returns RoomDetails Successful Response
* @throws ApiError
*/
public v1RoomsUpdate(
@@ -227,6 +253,29 @@ export class DefaultService {
});
}
/**
* Rooms Test Webhook
* Test webhook configuration by sending a sample payload.
* @param data The data for the request.
* @param data.roomId
* @returns WebhookTestResult Successful Response
* @throws ApiError
*/
public v1RoomsTestWebhook(
data: V1RoomsTestWebhookData,
): CancelablePromise<V1RoomsTestWebhookResponse> {
return this.httpRequest.request({
method: "POST",
url: "/v1/rooms/{room_id}/webhook/test",
path: {
room_id: data.roomId,
},
errors: {
422: "Validation Error",
},
});
}
/**
* Transcripts List
* @param data The data for the request.

View File

@@ -24,6 +24,8 @@ export type CreateRoom = {
recording_type: string;
recording_trigger: string;
is_shared: boolean;
webhook_url: string;
webhook_secret: string;
};
export type CreateTranscript = {
@@ -147,8 +149,8 @@ export type Page_GetTranscriptMinimal_ = {
pages?: number | null;
};
export type Page_Room_ = {
items: Array<Room>;
export type Page_RoomDetails_ = {
items: Array<RoomDetails>;
total?: number | null;
page: number | null;
size: number | null;
@@ -176,6 +178,23 @@ export type Room = {
is_shared: boolean;
};
export type RoomDetails = {
id: string;
name: string;
user_id: string;
created_at: string;
zulip_auto_post: boolean;
zulip_stream: string;
zulip_topic: string;
is_locked: boolean;
room_mode: string;
recording_type: string;
recording_trigger: string;
is_shared: boolean;
webhook_url: string;
webhook_secret: string;
};
export type RtcOffer = {
sdp: string;
type: string;
@@ -281,6 +300,8 @@ export type UpdateRoom = {
recording_type: string;
recording_trigger: string;
is_shared: boolean;
webhook_url: string;
webhook_secret: string;
};
export type UpdateTranscript = {
@@ -307,6 +328,14 @@ export type ValidationError = {
type: string;
};
export type WebhookTestResult = {
success: boolean;
message?: string;
error?: string;
status_code?: number | null;
response_preview?: string | null;
};
export type WherebyWebhookEvent = {
apiVersion: string;
id: string;
@@ -350,7 +379,7 @@ export type V1RoomsListData = {
size?: number;
};
export type V1RoomsListResponse = Page_Room_;
export type V1RoomsListResponse = Page_RoomDetails_;
export type V1RoomsCreateData = {
requestBody: CreateRoom;
@@ -358,12 +387,18 @@ export type V1RoomsCreateData = {
export type V1RoomsCreateResponse = Room;
export type V1RoomsGetData = {
roomId: string;
};
export type V1RoomsGetResponse = RoomDetails;
export type V1RoomsUpdateData = {
requestBody: UpdateRoom;
roomId: string;
};
export type V1RoomsUpdateResponse = Room;
export type V1RoomsUpdateResponse = RoomDetails;
export type V1RoomsDeleteData = {
roomId: string;
@@ -377,6 +412,12 @@ export type V1RoomsCreateMeetingData = {
export type V1RoomsCreateMeetingResponse = Meeting;
export type V1RoomsTestWebhookData = {
roomId: string;
};
export type V1RoomsTestWebhookResponse = WebhookTestResult;
export type V1TranscriptsListData = {
/**
* Page number
@@ -613,7 +654,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
200: Page_Room_;
200: Page_RoomDetails_;
/**
* Validation Error
*/
@@ -635,13 +676,26 @@ export type $OpenApiTs = {
};
};
"/v1/rooms/{room_id}": {
get: {
req: V1RoomsGetData;
res: {
/**
* Successful Response
*/
200: RoomDetails;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
patch: {
req: V1RoomsUpdateData;
res: {
/**
* Successful Response
*/
200: Room;
200: RoomDetails;
/**
* Validation Error
*/
@@ -677,6 +731,21 @@ export type $OpenApiTs = {
};
};
};
"/v1/rooms/{room_id}/webhook/test": {
post: {
req: V1RoomsTestWebhookData;
res: {
/**
* Successful Response
*/
200: WebhookTestResult;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/v1/transcripts": {
get: {
req: V1TranscriptsListData;