Compare commits

...

10 Commits

Author SHA1 Message Date
db3beae5cd chore(main): release 0.4.0 (#510) 2025-07-25 19:09:57 -06:00
Igor Loskutov
03b9a18c1b fix: remove faulty import Meeting (#512)
* fix: remove faulty import Meeting

* fix: remove faulty import Meeting
2025-07-25 17:48:10 -04:00
Igor Loskutov
7e3027adb6 fix: room concurrency (theoretically) (#511)
* fix: room concurrency (theoretically)

* cleanup

* cleanup
2025-07-25 17:37:51 -04:00
Igor Loskutov
27b43d85ab feat: Diarization cli (#509)
* diarisation cli

* feat: s3 upload for modal diarisation cli call

* chore: cleanup

* chore: s3 cleanup improvement

* chore: lint

* chore: cleanup

* chore: cleanup

* chore: cleanup

* chore: cleanup
2025-07-25 16:24:06 -04:00
2289a1a231 chore(main): release 0.3.2 (#506) 2025-07-22 19:15:47 -06:00
d0e130eb13 fix: match font size for the filter sidebar (#507) 2025-07-22 14:59:23 -06:00
24fabe3e86 fix: whereby consent not displaying (#505) 2025-07-22 12:20:26 -06:00
6fedbbe63f chore(main): release 0.3.1 (#503) 2025-07-21 22:52:21 -06:00
b39175cdc9 fix: remove primary color for room action menu (#504) 2025-07-21 22:45:26 -06:00
2a2af5fff2 fix: remove fief out of the source code (#502)
* fix: remove fief out of the source code

* fix: remove corresponding test about migration
2025-07-21 21:09:05 -06:00
23 changed files with 971 additions and 257 deletions

View File

@@ -1,5 +1,34 @@
# Changelog
## [0.4.0](https://github.com/Monadical-SAS/reflector/compare/v0.3.2...v0.4.0) (2025-07-25)
### Features
* Diarization cli ([#509](https://github.com/Monadical-SAS/reflector/issues/509)) ([ffc8003](https://github.com/Monadical-SAS/reflector/commit/ffc8003e6dad236930a27d0fe3e2f2adfb793890))
### Bug Fixes
* remove faulty import Meeting ([#512](https://github.com/Monadical-SAS/reflector/issues/512)) ([0e68c79](https://github.com/Monadical-SAS/reflector/commit/0e68c798434e1b481f9482cc3a4702ea00365df4))
* room concurrency (theoretically) ([#511](https://github.com/Monadical-SAS/reflector/issues/511)) ([7bb3676](https://github.com/Monadical-SAS/reflector/commit/7bb367653afeb2778cff697a0eb217abf0b81b84))
## [0.3.2](https://github.com/Monadical-SAS/reflector/compare/v0.3.1...v0.3.2) (2025-07-22)
### Bug Fixes
* match font size for the filter sidebar ([#507](https://github.com/Monadical-SAS/reflector/issues/507)) ([4b8ba5d](https://github.com/Monadical-SAS/reflector/commit/4b8ba5db1733557e27b098ad3d1cdecadf97ae52))
* whereby consent not displaying ([#505](https://github.com/Monadical-SAS/reflector/issues/505)) ([1120552](https://github.com/Monadical-SAS/reflector/commit/1120552c2c83d084d3a39272ad49b6aeda1af98f))
## [0.3.1](https://github.com/Monadical-SAS/reflector/compare/v0.3.0...v0.3.1) (2025-07-22)
### Bug Fixes
* remove fief out of the source code ([#502](https://github.com/Monadical-SAS/reflector/issues/502)) ([890dd15](https://github.com/Monadical-SAS/reflector/commit/890dd15ba5a2be10dbb841e9aeb75d377885f4af))
* remove primary color for room action menu ([#504](https://github.com/Monadical-SAS/reflector/issues/504)) ([2e33f89](https://github.com/Monadical-SAS/reflector/commit/2e33f89c0f9e5fbaafa80e8d2ae9788450ea2f31))
## [0.3.0](https://github.com/Monadical-SAS/reflector/compare/v0.2.1...v0.3.0) (2025-07-21)

View File

@@ -146,7 +146,7 @@ All endpoints prefixed `/v1/`:
- `REDIS_URL` - Redis broker for Celery
- `MODAL_TOKEN_ID`, `MODAL_TOKEN_SECRET` - Modal.com GPU processing
- `WHEREBY_API_KEY` - Video platform integration
- - `REFLECTOR_AUTH_BACKEND` - Authentication method (none, fief, jwt)
+ - `REFLECTOR_AUTH_BACKEND` - Authentication method (none, jwt)
**Frontend** (`www/.env`):
- `NEXTAUTH_URL`, `NEXTAUTH_SECRET` - Authentication configuration

View File

@@ -6,11 +6,6 @@ LLM_BACKEND=modal
LLM_URL=https://monadical-sas--reflector-llm-web.modal.run
LLM_MODAL_API_KEY=***REMOVED***
- AUTH_BACKEND=fief
- AUTH_FIEF_URL=https://auth.reflector.media/reflector-local
- AUTH_FIEF_CLIENT_ID=***REMOVED***
- AUTH_FIEF_CLIENT_SECRET=<ask in zulip>
TRANSLATE_URL=https://monadical-sas--reflector-translator-web.modal.run
ZEPHYR_LLM_URL=https://monadical-sas--reflector-llm-zephyr-web.modal.run
DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run

server/.gitignore (vendored, 1 change)
View File

@@ -180,3 +180,4 @@ reflector.sqlite3
data/
dump.rdb

View File

@@ -7,11 +7,9 @@
## User authentication
## =======================================================
- ## Using fief (fief.dev)
- AUTH_BACKEND=fief
- AUTH_FIEF_URL=https://auth.reflector.media/reflector-local
- AUTH_FIEF_CLIENT_ID=***REMOVED***
- AUTH_FIEF_CLIENT_SECRET=<ask in zulip>
## Using jwt/authentik
AUTH_BACKEND=jwt
AUTH_JWT_AUDIENCE=
## =======================================================
## Transcription backend
@@ -88,4 +86,3 @@ DIARIZATION_URL=https://monadical-sas--reflector-diarizer-web.modal.run
## Sentry DSN configuration
#SENTRY_DSN=

View File

@@ -0,0 +1,35 @@
"""add_unique_constraint_one_active_meeting_per_room
Revision ID: b7df9609542c
Revises: d7fbb74b673b
Create Date: 2025-07-25 16:27:06.959868
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = 'b7df9609542c'
down_revision: Union[str, None] = 'd7fbb74b673b'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    # Create a partial unique index that ensures only one active meeting per room
    # This works for both PostgreSQL and SQLite
    op.create_index(
        'idx_one_active_meeting_per_room',
        'meeting',
        ['room_id'],
        unique=True,
        postgresql_where=sa.text('is_active = true'),
        sqlite_where=sa.text('is_active = 1')
    )


def downgrade() -> None:
    op.drop_index('idx_one_active_meeting_per_room', table_name='meeting')
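For reference, a minimal sketch of the guarantee this index provides (not part of the change; assumes SQLAlchemy with an in-memory SQLite database, and trims the meeting table to the relevant columns). The database itself rejects a second active meeting for a room, while inactive rows stay unconstrained:

import sqlalchemy as sa
from sqlalchemy.exc import IntegrityError

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
meeting = sa.Table(
    "meeting",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("room_id", sa.String, nullable=False),
    sa.Column("is_active", sa.Boolean, nullable=False),
)
# Same shape as the migration above: unique on room_id, active rows only.
sa.Index(
    "idx_one_active_meeting_per_room",
    meeting.c.room_id,
    unique=True,
    sqlite_where=sa.text("is_active = 1"),
)
meta.create_all(engine)

def insert(room_id: str, active: bool) -> None:
    with engine.begin() as conn:
        conn.execute(meeting.insert(), {"room_id": room_id, "is_active": active})

insert("r1", True)
try:
    insert("r1", True)  # a second active meeting for the same room
except IntegrityError:
    print("rejected: r1 already has an active meeting")
insert("r1", False)  # inactive duplicates are allowed (partial index)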

View File

@@ -22,7 +22,6 @@ dependencies = [
"fastapi-pagination>=0.12.6",
"databases[aiosqlite, asyncpg]>=0.7.0",
"sqlalchemy<1.5",
"fief-client[fastapi]>=0.17.0",
"alembic>=1.11.3",
"nltk>=3.8.1",
"prometheus-fastapi-instrumentator>=6.1.0",

View File

@@ -1,25 +0,0 @@
from fastapi.security import OAuth2AuthorizationCodeBearer
from fief_client import FiefAccessTokenInfo, FiefAsync, FiefUserInfo
from fief_client.integrations.fastapi import FiefAuth
from reflector.settings import settings
fief = FiefAsync(
settings.AUTH_FIEF_URL,
settings.AUTH_FIEF_CLIENT_ID,
settings.AUTH_FIEF_CLIENT_SECRET,
)
scheme = OAuth2AuthorizationCodeBearer(
f"{settings.AUTH_FIEF_URL}/authorize",
f"{settings.AUTH_FIEF_URL}/api/token",
scopes={"openid": "openid", "offline_access": "offline_access"},
auto_error=False,
)
auth = FiefAuth(fief, scheme)
UserInfo = FiefUserInfo
AccessTokenInfo = FiefAccessTokenInfo
authenticated = auth.authenticated()
current_user = auth.current_user()
current_user_optional = auth.current_user(optional=True)

View File

@@ -1,56 +0,0 @@
from reflector.db import database
from reflector.db.meetings import meetings
from reflector.db.rooms import rooms
from reflector.db.transcripts import transcripts
users_to_migrate = [
["123@lifex.pink", "63b727f5-485d-449f-b528-563d779b11ef", None],
["ana@monadical.com", "1bae2e4d-5c04-49c2-932f-a86266a6ca13", None],
["cspencer@sprocket.org", "614ed0be-392e-488c-bd19-6a9730fd0e9e", None],
["daniel.f.lopez.j@gmail.com", "ca9561bd-c989-4a1e-8877-7081cf62ae7f", None],
["jenalee@monadical.com", "c7c1e79e-b068-4b28-a9f4-29d98b1697ed", None],
["jennifer@rootandseed.com", "f5321727-7546-4b2b-b69d-095a931ef0c4", None],
["jose@monadical.com", "221f079c-7ce0-4677-90b7-0359b6315e27", None],
["labenclayton@gmail.com", "40078cd0-543c-40e4-9c2e-5ce57a686428", None],
["mathieu@monadical.com", "c7a36151-851e-4afa-9fab-aaca834bfd30", None],
["michal.flak.96@gmail.com", "3096eb5e-b590-41fc-a0d1-d152c1895402", None],
["sara@monadical.com", "31ab0cfe-5d2c-4c7a-84de-a29494714c99", None],
["sara@monadical.com", "b871e5f0-754e-447f-9c3d-19f629f0082b", None],
["sebastian@monadical.com", "f024f9d0-15d0-480f-8529-43959fc8b639", None],
["sergey@monadical.com", "5c4798eb-b9ab-4721-a540-bd96fc434156", None],
["sergey@monadical.com", "9dd8a6b4-247e-48fe-b1fb-4c84dd3c01bc", None],
["transient.tran@gmail.com", "617ba2d3-09b6-4b1f-a435-a7f41c3ce060", None],
]
async def migrate_user(email, user_id):
# if the email match the email in the users_to_migrate list
# reassign all transcripts/rooms/meetings to the new user_id
user_ids = [user[1] for user in users_to_migrate if user[0] == email]
if not user_ids:
return
# do not migrate back
if user_id in user_ids:
return
for old_user_id in user_ids:
query = (
transcripts.update()
.where(transcripts.c.user_id == old_user_id)
.values(user_id=user_id)
)
await database.execute(query)
query = (
rooms.update().where(rooms.c.user_id == old_user_id).values(user_id=user_id)
)
await database.execute(query)
query = (
meetings.update()
.where(meetings.c.user_id == old_user_id)
.values(user_id=user_id)
)
await database.execute(query)

View File

@@ -90,14 +90,9 @@ class Settings(BaseSettings):
# Sentry
SENTRY_DSN: str | None = None
- # User authentication (none, fief)
+ # User authentication (none, jwt)
AUTH_BACKEND: str = "none"
- # User authentication using fief
- AUTH_FIEF_URL: str | None = None
- AUTH_FIEF_CLIENT_ID: str | None = None
- AUTH_FIEF_CLIENT_SECRET: str | None = None
# User authentication using JWT
AUTH_JWT_ALGORITHM: str = "RS256"
AUTH_JWT_PUBLIC_KEY: str | None = "authentik.monadical.com_public.pem"

View File

@@ -0,0 +1,314 @@
"""
@vibe-generated
Process audio file with diarization support
===========================================
Extended version of process.py that includes speaker diarization.
This tool processes audio files locally without requiring the full server infrastructure.
"""
import asyncio
import tempfile
from pathlib import Path
from typing import List
import uuid
import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
AudioFileWriterProcessor,
Pipeline,
PipelineEvent,
TranscriptFinalSummaryProcessor,
TranscriptFinalTitleProcessor,
TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor,
TranscriptTranslatorProcessor,
)
from reflector.processors.base import BroadcastProcessor, Processor
from reflector.processors.types import (
AudioDiarizationInput,
TitleSummary,
TitleSummaryWithId,
)
class TopicCollectorProcessor(Processor):
"""Collect topics for diarization"""
INPUT_TYPE = TitleSummary
OUTPUT_TYPE = TitleSummary
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.topics: List[TitleSummaryWithId] = []
self._topic_id = 0
async def _push(self, data: TitleSummary):
# Convert to TitleSummaryWithId and collect
self._topic_id += 1
topic_with_id = TitleSummaryWithId(
id=str(self._topic_id),
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
duration=data.duration,
transcript=data.transcript,
)
self.topics.append(topic_with_id)
# Pass through the original topic
await self.emit(data)
def get_topics(self) -> List[TitleSummaryWithId]:
return self.topics
async def process_audio_file_with_diarization(
filename,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
):
# Create temp file for audio if diarization is enabled
audio_temp_path = None
if enable_diarization:
audio_temp_file = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
audio_temp_path = audio_temp_file.name
audio_temp_file.close()
# Create processor for collecting topics
topic_collector = TopicCollectorProcessor()
# Build pipeline for audio processing
processors = []
# Add audio file writer at the beginning if diarization is enabled
if enable_diarization:
processors.append(AudioFileWriterProcessor(audio_temp_path))
# Add the rest of the processors
processors += [
AudioChunkerProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
]
processors += [
TranscriptLinerProcessor(),
TranscriptTranslatorProcessor.as_threaded(),
]
if not only_transcript:
processors += [
TranscriptTopicDetectorProcessor.as_threaded(),
# Collect topics for diarization
topic_collector,
BroadcastProcessor(
processors=[
TranscriptFinalTitleProcessor.as_threaded(),
TranscriptFinalSummaryProcessor.as_threaded(),
],
),
]
# Create main pipeline
pipeline = Pipeline(*processors)
pipeline.set_pref("audio:source_language", source_language)
pipeline.set_pref("audio:target_language", target_language)
pipeline.describe()
pipeline.on(event_callback)
# Start processing audio
logger.info(f"Opening {filename}")
container = av.open(filename)
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
await pipeline.flush()
# Run diarization if enabled and we have topics
if enable_diarization and not only_transcript and audio_temp_path:
topics = topic_collector.get_topics()
if topics:
logger.info(f"Starting diarization with {len(topics)} topics")
try:
# Import diarization processor
from reflector.processors import AudioDiarizationAutoProcessor
# Create diarization processor
diarization_processor = AudioDiarizationAutoProcessor(
name=diarization_backend
)
diarization_processor.on(event_callback)
# For Modal backend, we need to upload the file to S3 first
if diarization_backend == "modal":
from reflector.storage import get_transcripts_storage
from reflector.utils.s3_temp_file import S3TemporaryFile
from datetime import datetime
storage = get_transcripts_storage()
# Generate a unique filename in evaluation folder
timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
audio_filename = f"evaluation/diarization_temp/{timestamp}_{uuid.uuid4().hex}.wav"
# Use context manager for automatic cleanup
async with S3TemporaryFile(storage, audio_filename) as s3_file:
# Read and upload the audio file
with open(audio_temp_path, "rb") as f:
audio_data = f.read()
audio_url = await s3_file.upload(audio_data)
logger.info(f"Uploaded audio to S3: {audio_filename}")
# Create diarization input with S3 URL
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
# File will be automatically cleaned up when exiting the context
else:
# For local backend, use local file path
audio_url = audio_temp_path
# Create diarization input
diarization_input = AudioDiarizationInput(
audio_url=audio_url, topics=topics
)
# Run diarization
await diarization_processor.push(diarization_input)
await diarization_processor.flush()
logger.info("Diarization complete")
except ImportError as e:
logger.error(f"Failed to import diarization dependencies: {e}")
logger.error(
"Install with: uv pip install pyannote.audio torch torchaudio"
)
logger.error(
"And set HF_TOKEN environment variable for pyannote models"
)
raise SystemExit(1)
except Exception as e:
logger.error(f"Diarization failed: {e}")
raise SystemExit(1)
else:
logger.warning("Skipping diarization: no topics available")
# Clean up temp file
if audio_temp_path:
try:
Path(audio_temp_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {audio_temp_path}: {e}")
logger.info("All done!")
if __name__ == "__main__":
import argparse
import os
parser = argparse.ArgumentParser(
description="Process audio files with optional speaker diarization"
)
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
parser.add_argument(
"--only-transcript",
"-t",
action="store_true",
help="Only generate transcript without topics/summaries",
)
parser.add_argument(
"--source-language", default="en", help="Source language code (default: en)"
)
parser.add_argument(
"--target-language", default="en", help="Target language code (default: en)"
)
parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
parser.add_argument(
"--enable-diarization",
"-d",
action="store_true",
help="Enable speaker diarization",
)
parser.add_argument(
"--diarization-backend",
default="modal",
choices=["modal"],
help="Diarization backend to use (default: modal)",
)
args = parser.parse_args()
# Set REDIS_HOST to localhost if not provided
if "REDIS_HOST" not in os.environ:
os.environ["REDIS_HOST"] = "localhost"
logger.info("REDIS_HOST not set, defaulting to localhost")
output_fd = None
if args.output:
output_fd = open(args.output, "w")
async def event_callback(event: PipelineEvent):
processor = event.processor
data = event.data
# Ignore internal processors
if processor in (
"AudioChunkerProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",
"BroadcastProcessor",
):
return
# If diarization is enabled, skip the original topic events from the pipeline
# The diarization processor will emit the same topics but with speaker info
if processor == "TranscriptTopicDetectorProcessor" and args.enable_diarization:
return
# Log all events
logger.info(f"Event: {processor} - {type(data).__name__}")
# Write to output
if output_fd:
output_fd.write(event.model_dump_json())
output_fd.write("\n")
output_fd.flush()
asyncio.run(
process_audio_file_with_diarization(
args.source,
event_callback,
only_transcript=args.only_transcript,
source_language=args.source_language,
target_language=args.target_language,
enable_diarization=args.enable_diarization,
diarization_backend=args.diarization_backend,
)
)
if output_fd:
output_fd.close()
logger.info(f"Output written to {args.output}")

View File

@@ -0,0 +1,96 @@
#!/usr/bin/env python3
"""
@vibe-generated
Test script for the diarization CLI tool
=========================================
This script helps test the diarization functionality with sample audio files.
"""
import asyncio
import json
import sys
from pathlib import Path
from reflector.logger import logger
async def test_diarization(audio_file: str):
"""Test the diarization functionality"""
# Import the processing function
from process_with_diarization import process_audio_file_with_diarization
# Collect events
events = []
async def event_callback(event):
events.append({
"processor": event.processor,
"data": event.data
})
logger.info(f"Event from {event.processor}")
# Process the audio file
logger.info(f"Processing audio file: {audio_file}")
try:
await process_audio_file_with_diarization(
audio_file,
event_callback,
only_transcript=False,
source_language="en",
target_language="en",
enable_diarization=True,
diarization_backend="modal",
)
# Analyze results
logger.info(f"Processing complete. Received {len(events)} events")
# Look for diarization results
diarized_topics = []
for event in events:
if "TitleSummary" in event["processor"]:
# Check if words have speaker information
if hasattr(event["data"], "transcript") and event["data"].transcript:
words = event["data"].transcript.words
if words and hasattr(words[0], "speaker"):
speakers = set(w.speaker for w in words if hasattr(w, "speaker"))
logger.info(f"Found {len(speakers)} speakers in topic: {event['data'].title}")
diarized_topics.append(event["data"])
if diarized_topics:
logger.info(f"Successfully diarized {len(diarized_topics)} topics")
# Print sample output
sample_topic = diarized_topics[0]
logger.info("Sample diarized output:")
for i, word in enumerate(sample_topic.transcript.words[:10]):
logger.info(f" Word {i}: '{word.text}' - Speaker {word.speaker}")
else:
logger.warning("No diarization results found in output")
return events
except Exception as e:
logger.error(f"Error during processing: {e}")
raise
def main():
if len(sys.argv) < 2:
print("Usage: python test_diarization.py <audio_file>")
sys.exit(1)
audio_file = sys.argv[1]
if not Path(audio_file).exists():
print(f"Error: Audio file '{audio_file}' not found")
sys.exit(1)
# Run the test
asyncio.run(test_diarization(audio_file))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,149 @@
"""
@vibe-generated
S3 Temporary File Context Manager
Provides automatic cleanup of S3 files with retry logic and proper error handling.
"""
from typing import Optional
from reflector.storage.base import Storage
from reflector.logger import logger
from reflector.utils.retry import retry
class S3TemporaryFile:
"""
Async context manager for temporary S3 files with automatic cleanup.
Ensures that uploaded files are deleted even if exceptions occur during processing.
Uses retry logic for all S3 operations to handle transient failures.
Example:
async with S3TemporaryFile(storage, "temp/audio.wav") as s3_file:
url = await s3_file.upload(audio_data)
# Use url for processing
# File is automatically cleaned up here
"""
def __init__(self, storage: Storage, filepath: str):
"""
Initialize the temporary file context.
Args:
storage: Storage instance for S3 operations
filepath: S3 key/path for the temporary file
"""
self.storage = storage
self.filepath = filepath
self.uploaded = False
self._url: Optional[str] = None
async def __aenter__(self):
"""Enter the context manager."""
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""
Exit the context manager and clean up the file.
Cleanup is attempted even if an exception occurred during processing.
Cleanup failures are logged but don't raise exceptions.
"""
if self.uploaded:
try:
await self._delete_with_retry()
logger.info(f"Successfully cleaned up S3 file: {self.filepath}")
except Exception as e:
# Log the error but don't raise - we don't want cleanup failures
# to mask the original exception
logger.warning(
f"Failed to cleanup S3 file {self.filepath} after retries: {e}"
)
return False # Don't suppress exceptions
async def upload(self, data: bytes) -> str:
"""
Upload data to S3 and return the public URL.
Args:
data: File data to upload
Returns:
Public URL for the uploaded file
Raises:
Exception: If upload or URL generation fails after retries
"""
await self._upload_with_retry(data)
self.uploaded = True
self._url = await self._get_url_with_retry()
return self._url
@property
def url(self) -> Optional[str]:
"""Get the URL of the uploaded file, if available."""
return self._url
async def _upload_with_retry(self, data: bytes):
"""Upload file to S3 with retry logic."""
async def upload():
await self.storage.put_file(self.filepath, data)
logger.debug(f"Successfully uploaded file to S3: {self.filepath}")
return True # Return something to indicate success
await retry(upload)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
async def _get_url_with_retry(self) -> str:
"""Get public URL for the file with retry logic."""
async def get_url():
url = await self.storage.get_file_url(self.filepath)
logger.debug(f"Generated public URL for S3 file: {self.filepath}")
return url
return await retry(get_url)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
async def _delete_with_retry(self):
"""Delete file from S3 with retry logic."""
async def delete():
await self.storage.delete_file(self.filepath)
logger.debug(f"Successfully deleted S3 file: {self.filepath}")
return True # Return something to indicate success
await retry(delete)(
retry_attempts=3,
retry_timeout=30.0,
retry_backoff_interval=0.5,
retry_backoff_max=5.0,
)
# Convenience function for simpler usage
def temporary_s3_file(storage: Storage, filepath: str) -> S3TemporaryFile:
"""
Create a temporary S3 file context manager.
This is a convenience wrapper around S3TemporaryFile for simpler usage.
Args:
storage: Storage instance for S3 operations
filepath: S3 key/path for the temporary file
Example:
async with temporary_s3_file(storage, "temp/audio.wav") as s3_file:
url = await s3_file.upload(audio_data)
# Use url for processing
"""
return S3TemporaryFile(storage, filepath)

View File

@@ -1,5 +1,6 @@
from datetime import datetime, timedelta
from typing import Annotated, Optional, Literal
import logging
import reflector.auth as auth
from fastapi import APIRouter, Depends, HTTPException
@@ -11,6 +12,10 @@ from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.settings import settings
from reflector.whereby import create_meeting, upload_logo
import asyncpg.exceptions
import sqlite3
logger = logging.getLogger(__name__)
router = APIRouter()
@@ -149,19 +154,47 @@ async def rooms_create_meeting(
if meeting is None:
end_date = current_time + timedelta(hours=8)
- meeting = await create_meeting("", end_date=end_date, room=room)
- await upload_logo(meeting["roomName"], "./images/logo.png")
- meeting = await meetings_controller.create(
- id=meeting["meetingId"],
- room_name=meeting["roomName"],
- room_url=meeting["roomUrl"],
- host_room_url=meeting["hostRoomUrl"],
- start_date=datetime.fromisoformat(meeting["startDate"]),
- end_date=datetime.fromisoformat(meeting["endDate"]),
- user_id=user_id,
- room=room,
- )
+ whereby_meeting = await create_meeting("", end_date=end_date, room=room)
+ await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
+ # Now try to save to database
+ try:
+ meeting = await meetings_controller.create(
+ id=whereby_meeting["meetingId"],
+ room_name=whereby_meeting["roomName"],
+ room_url=whereby_meeting["roomUrl"],
+ host_room_url=whereby_meeting["hostRoomUrl"],
+ start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
+ end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
+ user_id=user_id,
+ room=room,
+ )
+ except (asyncpg.exceptions.UniqueViolationError, sqlite3.IntegrityError):
+ # Another request already created a meeting for this room
+ # Log this race condition occurrence
+ logger.info(
+ "Race condition detected for room %s - fetching existing meeting",
+ room.name,
+ )
+ logger.warning(
+ "Whereby meeting %s was created but not used (resource leak) for room %s",
+ whereby_meeting["meetingId"],
+ room.name,
+ )
+ # Fetch the meeting that was created by the other request
+ meeting = await meetings_controller.get_active(
+ room=room, current_time=current_time
+ )
+ if meeting is None:
+ # Edge case: meeting was created but expired/deleted between checks
+ logger.error(
+ "Meeting disappeared after race condition for room %s", room.name
+ )
+ raise HTTPException(
+ status_code=503, detail="Unable to join meeting - please try again"
+ )
if user_id != room.user_id:
meeting.host_room_url = ""

View File

@@ -8,7 +8,6 @@ from fastapi_pagination.ext.databases import paginate
from jose import jwt
from pydantic import BaseModel, Field, field_serializer
from reflector.db.meetings import meetings_controller
from reflector.db.migrate_user import migrate_user
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
@@ -114,10 +113,6 @@ async def transcripts_list(
user_id = user["sub"] if user else None
# for fief to jwt migration, migrate user if needed
if user:
await migrate_user(email=user["email"], user_id=user["sub"])
return await paginate(
database,
await transcripts_controller.get_all(

View File

@@ -0,0 +1,129 @@
"""
@vibe-generated
Tests for S3 temporary file context manager.
"""
import pytest
from unittest.mock import Mock, AsyncMock
from reflector.utils.s3_temp_file import S3TemporaryFile
@pytest.mark.asyncio
async def test_successful_upload_and_cleanup():
"""Test that file is uploaded and cleaned up on success."""
# Mock storage
mock_storage = Mock()
mock_storage.put_file = AsyncMock()
mock_storage.get_file_url = AsyncMock(return_value="https://example.com/file.wav")
mock_storage.delete_file = AsyncMock()
# Use context manager
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
url = await s3_file.upload(b"test data")
assert url == "https://example.com/file.wav"
assert s3_file.url == "https://example.com/file.wav"
# Verify operations
mock_storage.put_file.assert_called_once_with("test/file.wav", b"test data")
mock_storage.get_file_url.assert_called_once_with("test/file.wav")
mock_storage.delete_file.assert_called_once_with("test/file.wav")
@pytest.mark.asyncio
async def test_cleanup_on_exception():
"""Test that cleanup happens even when an exception occurs."""
# Mock storage
mock_storage = Mock()
mock_storage.put_file = AsyncMock()
mock_storage.get_file_url = AsyncMock(return_value="https://example.com/file.wav")
mock_storage.delete_file = AsyncMock()
# Use context manager with exception
with pytest.raises(ValueError):
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
await s3_file.upload(b"test data")
raise ValueError("Simulated error during processing")
# Verify cleanup still happened
mock_storage.delete_file.assert_called_once_with("test/file.wav")
@pytest.mark.asyncio
async def test_no_cleanup_if_not_uploaded():
"""Test that cleanup is skipped if file was never uploaded."""
# Mock storage
mock_storage = Mock()
mock_storage.delete_file = AsyncMock()
# Use context manager without uploading
async with S3TemporaryFile(mock_storage, "test/file.wav"):
pass # Don't upload anything
# Verify no cleanup attempted
mock_storage.delete_file.assert_not_called()
@pytest.mark.asyncio
async def test_cleanup_failure_is_logged_not_raised():
"""Test that cleanup failures are logged but don't raise exceptions."""
# Mock storage
mock_storage = Mock()
mock_storage.put_file = AsyncMock()
mock_storage.get_file_url = AsyncMock(return_value="https://example.com/file.wav")
mock_storage.delete_file = AsyncMock(side_effect=Exception("Delete failed"))
# Use context manager - should not raise
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
await s3_file.upload(b"test data")
# Verify delete was attempted (3 times due to retry)
assert mock_storage.delete_file.call_count == 3
@pytest.mark.asyncio
async def test_upload_retry_on_failure():
"""Test that upload is retried on failure."""
# Mock storage with failures then success
mock_storage = Mock()
mock_storage.put_file = AsyncMock(
side_effect=[Exception("Network error"), None] # Fail once, then succeed
)
mock_storage.get_file_url = AsyncMock(return_value="https://example.com/file.wav")
mock_storage.delete_file = AsyncMock()
# Use context manager
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
url = await s3_file.upload(b"test data")
assert url == "https://example.com/file.wav"
# Verify upload was retried
assert mock_storage.put_file.call_count == 2
@pytest.mark.asyncio
async def test_delete_retry_on_failure():
"""Test that delete is retried on failure."""
# Mock storage
mock_storage = Mock()
mock_storage.put_file = AsyncMock()
mock_storage.get_file_url = AsyncMock(return_value="https://example.com/file.wav")
mock_storage.delete_file = AsyncMock(
side_effect=[Exception("Network error"), None] # Fail once, then succeed
)
# Use context manager
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
await s3_file.upload(b"test data")
# Verify delete was retried
assert mock_storage.delete_file.call_count == 2
@pytest.mark.asyncio
async def test_properties_before_upload():
"""Test that properties work correctly before upload."""
mock_storage = Mock()
async with S3TemporaryFile(mock_storage, "test/file.wav") as s3_file:
assert s3_file.url is None
assert s3_file.uploaded is False

View File

@@ -1,6 +1,6 @@
- import pytest
- from unittest.mock import patch
- from contextlib import asynccontextmanager
+ import pytest
from httpx import AsyncClient
@@ -261,67 +261,3 @@ async def test_transcript_mark_reviewed():
response = await ac.get(f"/transcripts/{tid}")
assert response.status_code == 200
assert response.json()["reviewed"] is True
@asynccontextmanager
async def patch_migrate_user():
with patch(
"reflector.db.migrate_user.users_to_migrate",
[["test@mail.com", "randomuserid", None]],
):
yield
@pytest.mark.asyncio
async def test_transcripts_list_authenticated_migration():
# XXX this test is a bit fragile, as it depends on the storage which
# is shared between tests
from reflector.app import app
testx1 = "testmigration1"
testx2 = "testmigration2"
async with patch_migrate_user(), AsyncClient(
app=app, base_url="http://test/v1"
) as ac:
# first ensure client 2 does not have any transcripts related to this test
async with authenticated_client2_ctx():
response = await ac.get("/transcripts")
assert response.status_code == 200
# assert len(response.json()["items"]) == 0
names = [t["name"] for t in response.json()["items"]]
assert testx1 not in names
assert testx2 not in names
# create 2 transcripts with client 1
async with authenticated_client_ctx():
response = await ac.post("/transcripts", json={"name": testx1})
assert response.status_code == 200
assert response.json()["name"] == testx1
response = await ac.post("/transcripts", json={"name": testx2})
assert response.status_code == 200
assert response.json()["name"] == testx2
response = await ac.get("/transcripts")
assert response.status_code == 200
assert len(response.json()["items"]) >= 2
names = [t["name"] for t in response.json()["items"]]
assert testx1 in names
assert testx2 in names
# now going back to client 2, migration should happen
async with authenticated_client2_ctx():
response = await ac.get("/transcripts")
assert response.status_code == 200
names = [t["name"] for t in response.json()["items"]]
assert testx1 in names
assert testx2 in names
# and client 1 should have nothing now
async with authenticated_client_ctx():
response = await ac.get("/transcripts")
assert response.status_code == 200
names = [t["name"] for t in response.json()["items"]]
assert testx1 not in names
assert testx2 not in names

server/uv.lock (generated, 43 changes)
View File

@@ -817,25 +817,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ad/69/28359d152f9e2ec1ff4dff3da47011b6346e9a472f89b409bb13017a7d1f/faster_whisper-1.1.1-py3-none-any.whl", hash = "sha256:5808dc334fb64fb4336921450abccfe5e313a859b31ba61def0ac7f639383d90", size = 1118368, upload-time = "2025-01-01T14:47:16.131Z" },
]
[[package]]
name = "fief-client"
version = "0.20.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpx" },
{ name = "jwcrypto" },
]
sdist = { url = "https://files.pythonhosted.org/packages/75/af/f6cc3ded8bdb901097b92a3ed444c48576a1b62f01352cb2fa069b0dd166/fief_client-0.20.0.tar.gz", hash = "sha256:dbfb906d03c4a5402ceac5c843aa4708535fb6f5d5c1c4e263ec06fbbbc434d7", size = 32465, upload-time = "2024-10-13T11:54:08.793Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1c/06/d33506317b4c9b71025eb010d96c4f7a8f89fa620ca30532c2e8e4390593/fief_client-0.20.0-py3-none-any.whl", hash = "sha256:425f40cc7c45c651daec63da402e033c53d91dcaa3f9bf208873fd8692fc16dc", size = 20219, upload-time = "2024-10-13T11:54:07.342Z" },
]
[package.optional-dependencies]
fastapi = [
{ name = "fastapi" },
{ name = "makefun" },
]
[[package]]
name = "filelock"
version = "3.18.0"
@@ -1211,19 +1192,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/01/0e/b27cdbaccf30b890c40ed1da9fd4a3593a5cf94dae54fb34f8a4b74fcd3f/jsonschema_specifications-2025.4.1-py3-none-any.whl", hash = "sha256:4653bffbd6584f7de83a67e0d620ef16900b390ddc7939d56684d6c81e33f1af", size = 18437, upload-time = "2025-04-23T12:34:05.422Z" },
]
[[package]]
name = "jwcrypto"
version = "1.5.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "cryptography" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/e1/db/870e5d5fb311b0bcf049630b5ba3abca2d339fd5e13ba175b4c13b456d08/jwcrypto-1.5.6.tar.gz", hash = "sha256:771a87762a0c081ae6166958a954f80848820b2ab066937dc8b8379d65b1b039", size = 87168, upload-time = "2024-03-06T19:58:31.831Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cd/58/4a1880ea64032185e9ae9f63940c9327c6952d5584ea544a8f66972f2fda/jwcrypto-1.5.6-py3-none-any.whl", hash = "sha256:150d2b0ebbdb8f40b77f543fb44ffd2baeff48788be71f67f03566692fd55789", size = 92520, upload-time = "2024-03-06T19:58:29.765Z" },
]
[[package]]
name = "kombu"
version = "5.5.4"
@@ -1299,15 +1267,6 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/29/0348de65b8cc732daa3e33e67806420b2ae89bdce2b04af740289c5c6c8c/loguru-0.7.3-py3-none-any.whl", hash = "sha256:31a33c10c8e1e10422bfd431aeb5d351c7cf7fa671e3c4df004162264b28220c", size = 61595, upload-time = "2024-12-06T11:20:54.538Z" },
]
[[package]]
name = "makefun"
version = "1.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/7b/cf/6780ab8bc3b84a1cce3e4400aed3d64b6db7d5e227a2f75b6ded5674701a/makefun-1.16.0.tar.gz", hash = "sha256:e14601831570bff1f6d7e68828bcd30d2f5856f24bad5de0ccb22921ceebc947", size = 73565, upload-time = "2025-05-09T15:00:42.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/c0/4bc973defd1270b89ccaae04cef0d5fa3ea85b59b108ad2c08aeea9afb76/makefun-1.16.0-py2.py3-none-any.whl", hash = "sha256:43baa4c3e7ae2b17de9ceac20b669e9a67ceeadff31581007cca20a07bbe42c4", size = 22923, upload-time = "2025-05-09T15:00:41.042Z" },
]
[[package]]
name = "mako"
version = "1.3.10"
@@ -2172,7 +2131,6 @@ dependencies = [
{ name = "fastapi", extra = ["standard"] },
{ name = "fastapi-pagination" },
{ name = "faster-whisper" },
{ name = "fief-client", extra = ["fastapi"] },
{ name = "httpx" },
{ name = "jsonschema" },
{ name = "loguru" },
@@ -2234,7 +2192,6 @@ requires-dist = [
{ name = "fastapi", extras = ["standard"], specifier = ">=0.100.1" },
{ name = "fastapi-pagination", specifier = ">=0.12.6" },
{ name = "faster-whisper", specifier = ">=0.10.0" },
{ name = "fief-client", extras = ["fastapi"], specifier = ">=0.17.0" },
{ name = "httpx", specifier = ">=0.24.1" },
{ name = "jsonschema", specifier = ">=4.23.0" },
{ name = "loguru", specifier = ">=0.7.0" },

View File

@@ -20,10 +20,11 @@ export default function FilterSidebar({
const sharedRooms = rooms.filter((room) => room.is_shared);
return (
<Box w={{ base: "full", md: "300px" }} p={4} bg="gray.100" rounded="md">
<Stack gap={3}>
<Box w={{ base: "full", md: "200px" }} p={4} bg="gray.100" rounded="md">
<Stack gap={2}>
<Link
as={NextLink}
fontSize="sm"
href="#"
onClick={() => onFilterChange(null, "")}
color={selectedSourceKind === null ? "blue.500" : "gray.600"}
@@ -36,7 +37,7 @@ export default function FilterSidebar({
{myRooms.length > 0 && (
<>
<Heading size="md">My Rooms</Heading>
<Heading size="sm">My Rooms</Heading>
{myRooms.map((room) => (
<Link
@@ -54,7 +55,7 @@ export default function FilterSidebar({
? "bold"
: "normal"
}
- ml={4}
+ fontSize="sm"
>
{room.name}
</Link>
@@ -64,7 +65,7 @@ export default function FilterSidebar({
{sharedRooms.length > 0 && (
<>
<Heading size="md">Shared Rooms</Heading>
<Heading size="sm">Shared Rooms</Heading>
{sharedRooms.map((room) => (
<Link
@@ -82,7 +83,7 @@ export default function FilterSidebar({
? "bold"
: "normal"
}
- ml={4}
+ fontSize="sm"
>
{room.name}
</Link>
@@ -98,6 +99,7 @@ export default function FilterSidebar({
color={selectedSourceKind === "live" ? "blue.500" : "gray.600"}
_hover={{ color: "blue.300" }}
fontWeight={selectedSourceKind === "live" ? "bold" : "normal"}
fontSize="sm"
>
Live Transcripts
</Link>
@@ -108,6 +110,7 @@ export default function FilterSidebar({
color={selectedSourceKind === "file" ? "blue.500" : "gray.600"}
_hover={{ color: "blue.300" }}
fontWeight={selectedSourceKind === "file" ? "bold" : "normal"}
fontSize="sm"
>
Uploaded Files
</Link>

View File

@@ -18,7 +18,7 @@ export function RoomActionsMenu({
return (
<Menu.Root closeOnSelect={true} lazyMount={true}>
<Menu.Trigger asChild>
<IconButton aria-label="actions">
<IconButton aria-label="actions" variant="ghost">
<LuMenu />
</IconButton>
</Menu.Trigger>

View File

@@ -118,7 +118,7 @@ const useConsentDialog = (
return (
<Button
ref={buttonRef}
colorPalette="blue"
colorPalette="primary"
size="sm"
onClick={() => {
handleConsent(meetingId, true).then(() => {
@@ -147,9 +147,8 @@ const useConsentDialog = (
recording on our servers?
</Text>
<HStack gap={4} justifyContent="center">
- <AcceptButton />
<Button
- colorPalette="gray"
+ variant="ghost"
size="sm"
onClick={() => {
handleConsent(meetingId, false).then(() => {
@@ -160,6 +159,7 @@ const useConsentDialog = (
>
No, delete after transcription
</Button>
+ <AcceptButton />
</HStack>
</VStack>
</Box>

View File

@@ -1,7 +1,14 @@
"use client";
// Simple toaster implementation for migration
// This is a temporary solution until we properly configure Chakra UI v3 toasts
import {
createContext,
useContext,
useState,
useEffect,
useCallback,
} from "react";
import { createPortal } from "react-dom";
import { Box } from "@chakra-ui/react";
interface ToastOptions {
placement?: string;
@@ -9,41 +16,162 @@ interface ToastOptions {
render: (props: { dismiss: () => void }) => React.ReactNode;
}
interface Toast extends ToastOptions {
id: string;
}
interface ToasterContextType {
toasts: Toast[];
addToast: (options: ToastOptions) => string;
removeToast: (id: string) => void;
}
const ToasterContext = createContext<ToasterContextType | null>(null);
export const ToasterProvider = ({
children,
}: {
children: React.ReactNode;
}) => {
const [toasts, setToasts] = useState<Toast[]>([]);
const addToast = useCallback((options: ToastOptions) => {
const id = String(Date.now() + Math.random());
setToasts((prev) => [...prev, { ...options, id }]);
if (options.duration !== null) {
setTimeout(() => {
removeToast(id);
}, options.duration || 5000);
}
return id;
}, []);
const removeToast = useCallback((id: string) => {
setToasts((prev) => prev.filter((toast) => toast.id !== id));
}, []);
return (
<ToasterContext.Provider value={{ toasts, addToast, removeToast }}>
{children}
<ToastContainer />
</ToasterContext.Provider>
);
};
const ToastContainer = () => {
const context = useContext(ToasterContext);
const [mounted, setMounted] = useState(false);
useEffect(() => {
setMounted(true);
}, []);
if (!context || !mounted) return null;
return createPortal(
<Box
position="fixed"
top="20px"
left="50%"
transform="translateX(-50%)"
zIndex={9999}
pointerEvents="none"
>
{context.toasts.map((toast) => (
<Box key={toast.id} mb={3} pointerEvents="auto">
{toast.render({ dismiss: () => context.removeToast(toast.id) })}
</Box>
))}
</Box>,
document.body,
);
};
class ToasterClass {
- private toasts: Map<string, any> = new Map();
+ private listeners: ((action: { type: string; payload: any }) => void)[] = [];
private nextId = 1;
+ private toastsMap: Map<string, boolean> = new Map();
subscribe(listener: (action: { type: string; payload: any }) => void) {
this.listeners.push(listener);
return () => {
this.listeners = this.listeners.filter((l) => l !== listener);
};
}
private notify(action: { type: string; payload: any }) {
this.listeners.forEach((listener) => listener(action));
}
create(options: ToastOptions): Promise<string> {
const id = String(this.nextId++);
- this.toasts.set(id, options);
+ this.toastsMap.set(id, true);
+ this.notify({ type: "ADD_TOAST", payload: { ...options, id } });
- // For now, we'll render toasts using a portal or modal
- // This is a simplified implementation
- if (typeof window !== "undefined") {
- console.log("Toast created:", id, options);
- // Auto-dismiss after duration if specified
- if (options.duration !== null) {
- setTimeout(() => {
- this.dismiss(id);
- }, options.duration || 5000);
- }
- }
+ if (options.duration !== null) {
+ setTimeout(() => {
+ this.dismiss(id);
+ }, options.duration || 5000);
+ }
return Promise.resolve(id);
}
dismiss(id: string) {
- this.toasts.delete(id);
- console.log("Toast dismissed:", id);
+ this.toastsMap.delete(id);
+ this.notify({ type: "REMOVE_TOAST", payload: id });
}
isActive(id: string): boolean {
- return this.toasts.has(id);
+ return this.toastsMap.has(id);
}
}
export const toaster = new ToasterClass();
- // Empty Toaster component for now
- export const Toaster = () => null;
+ // Bridge component to connect the class-based API with React
+ export const Toaster = () => {
const [toasts, setToasts] = useState<Toast[]>([]);
useEffect(() => {
const unsubscribe = toaster.subscribe((action) => {
if (action.type === "ADD_TOAST") {
setToasts((prev) => [...prev, action.payload]);
} else if (action.type === "REMOVE_TOAST") {
setToasts((prev) =>
prev.filter((toast) => toast.id !== action.payload),
);
}
});
return unsubscribe;
}, []);
const [mounted, setMounted] = useState(false);
useEffect(() => {
setMounted(true);
}, []);
if (!mounted) return null;
return createPortal(
<Box
position="fixed"
top="20px"
left="50%"
transform="translateX(-50%)"
zIndex={9999}
pointerEvents="none"
>
{toasts.map((toast) => (
<Box key={toast.id} mb={3} pointerEvents="auto">
{toast.render({ dismiss: () => toaster.dismiss(toast.id) })}
</Box>
))}
</Box>,
document.body,
);
};

View File

@@ -4,11 +4,15 @@ import { ChakraProvider } from "@chakra-ui/react";
import system from "./styles/theme";
import { WherebyProvider } from "@whereby.com/browser-sdk/react";
import { Toaster } from "./components/ui/toaster";
export function Providers({ children }: { children: React.ReactNode }) {
return (
<ChakraProvider value={system}>
- <WherebyProvider>{children}</WherebyProvider>
+ <WherebyProvider>
+ {children}
+ <Toaster />
+ </WherebyProvider>
</ChakraProvider>
);
}