Compare commits

...

5 Commits

Author SHA1 Message Date
Igor Loskutov
02b269ad6d zombie meetings ignore (no-mistakes) 2025-11-25 19:35:50 -05:00
Igor Monadical
689c8075cc transcription reprocess doc (#743)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-25 17:05:46 -05:00
201671368a chore(main): release 0.20.0 (#740) 2025-11-25 16:32:49 -05:00
Igor Monadical
86d5e26224 feat: transcript restart script (#742)
* transcript restart script

* fix tests?

* remove useless comment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-25 16:28:43 -05:00
9bec39808f feat: link transcript participants (#737)
* Sync authentik users

* Migrate user_id from uid to id

* Fix auth user id

* Fix ci migration test

* Fix meeting token creation

* Move user id migration to a script

* Add user on first login

* Fix migration chain

* Rename uid column to authentik_uid

* Fix broken ws test
2025-11-25 19:13:19 +01:00
21 changed files with 986 additions and 99 deletions

View File

@@ -1,5 +1,13 @@
# Changelog
## [0.20.0](https://github.com/Monadical-SAS/reflector/compare/v0.19.0...v0.20.0) (2025-11-25)
### Features
* link transcript participants ([#737](https://github.com/Monadical-SAS/reflector/issues/737)) ([9bec398](https://github.com/Monadical-SAS/reflector/commit/9bec39808fc6322612d8b87e922a6f7901fc01c1))
* transcript restart script ([#742](https://github.com/Monadical-SAS/reflector/issues/742)) ([86d5e26](https://github.com/Monadical-SAS/reflector/commit/86d5e26224bb55a0f1cc785aeda52065bb92ee6f))
## [0.19.0](https://github.com/Monadical-SAS/reflector/compare/v0.18.0...v0.19.0) (2025-11-25)

View File

@@ -168,6 +168,12 @@ You can manually process an audio file by calling the process tool:
uv run python -m reflector.tools.process path/to/audio.wav
```
## Reprocessing any transcription
```bash
uv run -m reflector.tools.process_transcript 81ec38d1-9dd7-43d2-b3f8-51f4d34a07cd --sync
```
## Build-time env variables
Next.js projects are more used to NEXT_PUBLIC_ prefixed buildtime vars. We don't have those for the reason we need to serve a ccustomizable prebuild docker container.

View File

@@ -0,0 +1,38 @@
"""add user table
Revision ID: bbafedfa510c
Revises: 5d6b9df9b045
Create Date: 2025-11-19 21:06:30.543262
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "bbafedfa510c"
down_revision: Union[str, None] = "5d6b9df9b045"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"user",
sa.Column("id", sa.String(), nullable=False),
sa.Column("email", sa.String(), nullable=False),
sa.Column("authentik_uid", sa.String(), nullable=False),
sa.Column("created_at", sa.DateTime(timezone=True), nullable=False),
sa.Column("updated_at", sa.DateTime(timezone=True), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
with op.batch_alter_table("user", schema=None) as batch_op:
batch_op.create_index("idx_user_authentik_uid", ["authentik_uid"], unique=True)
batch_op.create_index("idx_user_email", ["email"], unique=False)
def downgrade() -> None:
op.drop_table("user")

View File

@@ -6,8 +6,10 @@ from jose import JWTError, jwt
from pydantic import BaseModel
from reflector.db.user_api_keys import user_api_keys_controller
from reflector.db.users import user_controller
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils import generate_uuid4
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
@@ -74,9 +76,21 @@ async def _authenticate_user(
if jwt_token:
try:
payload = jwtauth.verify_token(jwt_token)
sub = payload["sub"]
authentik_uid = payload["sub"]
email = payload["email"]
user_infos.append(UserInfo(sub=sub, email=email))
user = await user_controller.get_by_authentik_uid(authentik_uid)
if not user:
logger.info(
f"Creating new user on first login: {authentik_uid} ({email})"
)
user = await user_controller.create_or_update(
id=generate_uuid4(),
authentik_uid=authentik_uid,
email=email,
)
user_infos.append(UserInfo(sub=user.id, email=email))
except JWTError as e:
logger.error(f"JWT error: {e}")
raise HTTPException(status_code=401, detail="Invalid authentication")

View File

@@ -195,7 +195,6 @@ def parse_recording_error(event: DailyWebhookEvent) -> RecordingErrorPayload:
return RecordingErrorPayload(**event.payload)
# Webhook event type to parser mapping
WEBHOOK_PARSERS = {
"participant.joined": parse_participant_joined,
"participant.left": parse_participant_left,

View File

@@ -31,6 +31,7 @@ import reflector.db.recordings # noqa
import reflector.db.rooms # noqa
import reflector.db.transcripts # noqa
import reflector.db.user_api_keys # noqa
import reflector.db.users # noqa
kwargs = {}
if "postgres" not in settings.DATABASE_URL:

View File

@@ -0,0 +1,92 @@
"""User table for storing Authentik user information."""
from datetime import datetime, timezone
import sqlalchemy
from pydantic import BaseModel, Field
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
from reflector.utils.string import NonEmptyString
users = sqlalchemy.Table(
"user",
metadata,
sqlalchemy.Column("id", sqlalchemy.String, primary_key=True),
sqlalchemy.Column("email", sqlalchemy.String, nullable=False),
sqlalchemy.Column("authentik_uid", sqlalchemy.String, nullable=False),
sqlalchemy.Column("created_at", sqlalchemy.DateTime(timezone=True), nullable=False),
sqlalchemy.Column("updated_at", sqlalchemy.DateTime(timezone=True), nullable=False),
sqlalchemy.Index("idx_user_authentik_uid", "authentik_uid", unique=True),
sqlalchemy.Index("idx_user_email", "email", unique=False),
)
class User(BaseModel):
id: NonEmptyString = Field(default_factory=generate_uuid4)
email: NonEmptyString
authentik_uid: NonEmptyString
created_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class UserController:
@staticmethod
async def get_by_id(user_id: NonEmptyString) -> User | None:
query = users.select().where(users.c.id == user_id)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def get_by_authentik_uid(authentik_uid: NonEmptyString) -> User | None:
query = users.select().where(users.c.authentik_uid == authentik_uid)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def get_by_email(email: NonEmptyString) -> User | None:
query = users.select().where(users.c.email == email)
result = await get_database().fetch_one(query)
return User(**result) if result else None
@staticmethod
async def create_or_update(
id: NonEmptyString, authentik_uid: NonEmptyString, email: NonEmptyString
) -> User:
existing = await UserController.get_by_authentik_uid(authentik_uid)
now = datetime.now(timezone.utc)
if existing:
query = (
users.update()
.where(users.c.authentik_uid == authentik_uid)
.values(email=email, updated_at=now)
)
await get_database().execute(query)
return User(
id=existing.id,
authentik_uid=authentik_uid,
email=email,
created_at=existing.created_at,
updated_at=now,
)
else:
user = User(
id=id,
authentik_uid=authentik_uid,
email=email,
created_at=now,
updated_at=now,
)
query = users.insert().values(**user.model_dump())
await get_database().execute(query)
return user
@staticmethod
async def list_all() -> list[User]:
query = users.select().order_by(users.c.created_at.desc())
results = await get_database().fetch_all(query)
return [User(**r) for r in results]
user_controller = UserController()

View File

@@ -0,0 +1,169 @@
"""
Transcript processing service - shared logic for HTTP endpoints and Celery tasks.
This module provides result-based error handling that works in both contexts:
- HTTP endpoint: converts errors to HTTPException
- Celery task: converts errors to Exception
"""
from dataclasses import dataclass
from typing import Literal, Union
import celery
from celery.result import AsyncResult
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import Transcript
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.match import absurd
from reflector.utils.string import NonEmptyString
@dataclass
class ProcessError:
detail: NonEmptyString
@dataclass
class FileProcessingConfig:
transcript_id: NonEmptyString
mode: Literal["file"] = "file"
@dataclass
class MultitrackProcessingConfig:
transcript_id: NonEmptyString
bucket_name: NonEmptyString
track_keys: list[str]
mode: Literal["multitrack"] = "multitrack"
ProcessingConfig = Union[FileProcessingConfig, MultitrackProcessingConfig]
PrepareResult = Union[ProcessingConfig, ProcessError]
@dataclass
class ValidationOk:
# transcript currently doesnt always have recording_id
recording_id: NonEmptyString | None
transcript_id: NonEmptyString
@dataclass
class ValidationLocked:
detail: NonEmptyString
@dataclass
class ValidationNotReady:
detail: NonEmptyString
@dataclass
class ValidationAlreadyScheduled:
detail: NonEmptyString
ValidationError = Union[
ValidationNotReady, ValidationLocked, ValidationAlreadyScheduled
]
ValidationResult = Union[ValidationOk, ValidationError]
@dataclass
class DispatchOk:
status: Literal["ok"] = "ok"
@dataclass
class DispatchAlreadyRunning:
status: Literal["already_running"] = "already_running"
DispatchResult = Union[
DispatchOk, DispatchAlreadyRunning, ProcessError, ValidationError
]
async def validate_transcript_for_processing(
transcript: Transcript,
) -> ValidationResult:
if transcript.locked:
return ValidationLocked(detail="Recording is locked")
if transcript.status == "idle":
return ValidationNotReady(detail="Recording is not ready for processing")
if task_is_scheduled_or_active(
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
transcript_id=transcript.id,
) or task_is_scheduled_or_active(
"reflector.pipelines.main_multitrack_pipeline.task_pipeline_multitrack_process",
transcript_id=transcript.id,
):
return ValidationAlreadyScheduled(detail="already running")
return ValidationOk(
recording_id=transcript.recording_id, transcript_id=transcript.id
)
async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResult:
"""
Determine processing mode from transcript/recording data.
"""
bucket_name: str | None = None
track_keys: list[str] | None = None
if validation.recording_id:
recording = await recordings_controller.get_by_id(validation.recording_id)
if recording:
bucket_name = recording.bucket_name
track_keys = recording.track_keys
if track_keys is not None and len(track_keys) == 0:
return ProcessError(
detail="No track keys found, must be either > 0 or None",
)
if track_keys is not None and not bucket_name:
return ProcessError(
detail="Bucket name must be specified",
)
if track_keys:
return MultitrackProcessingConfig(
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
)
return FileProcessingConfig(
transcript_id=validation.transcript_id,
)
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
if isinstance(config, MultitrackProcessingConfig):
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
track_keys=config.track_keys,
)
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:
absurd(config)
def task_is_scheduled_or_active(task_name: str, **kwargs):
inspect = celery.current_app.control.inspect()
for worker, tasks in (inspect.scheduled() | inspect.active()).items():
for task in tasks:
if task["name"] == task_name and task["kwargs"] == kwargs:
return True
return False

View File

@@ -0,0 +1,127 @@
"""
Process transcript by ID - auto-detects multitrack vs file pipeline.
Usage:
uv run -m reflector.tools.process_transcript <transcript_id>
# Or via docker:
docker compose exec server uv run -m reflector.tools.process_transcript <transcript_id>
"""
import argparse
import asyncio
import sys
import time
from typing import Callable
from celery.result import AsyncResult
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.services.transcript_process import (
FileProcessingConfig,
MultitrackProcessingConfig,
PrepareResult,
ProcessError,
ValidationError,
ValidationResult,
dispatch_transcript_processing,
prepare_transcript_processing,
validate_transcript_for_processing,
)
async def process_transcript_inner(
transcript: Transcript,
on_validation: Callable[[ValidationResult], None],
on_preprocess: Callable[[PrepareResult], None],
) -> AsyncResult:
validation = await validate_transcript_for_processing(transcript)
on_validation(validation)
config = await prepare_transcript_processing(validation)
on_preprocess(config)
return dispatch_transcript_processing(config)
async def process_transcript(transcript_id: str, sync: bool = False) -> None:
"""
Process a transcript by ID, auto-detecting multitrack vs file pipeline.
Args:
transcript_id: The transcript UUID
sync: If True, wait for task completion. If False, dispatch and exit.
"""
from reflector.db import get_database
database = get_database()
await database.connect()
try:
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
print(f"Error: Transcript {transcript_id} not found", file=sys.stderr)
sys.exit(1)
print(f"Found transcript: {transcript.title or transcript_id}", file=sys.stderr)
print(f" Status: {transcript.status}", file=sys.stderr)
print(f" Recording ID: {transcript.recording_id or 'None'}", file=sys.stderr)
def on_validation(validation: ValidationResult) -> None:
if isinstance(validation, ValidationError):
print(f"Error: {validation.detail}", file=sys.stderr)
sys.exit(1)
def on_preprocess(config: PrepareResult) -> None:
if isinstance(config, ProcessError):
print(f"Error: {config.detail}", file=sys.stderr)
sys.exit(1)
elif isinstance(config, MultitrackProcessingConfig):
print(f"Dispatching multitrack pipeline", file=sys.stderr)
print(f" Bucket: {config.bucket_name}", file=sys.stderr)
print(f" Tracks: {len(config.track_keys)}", file=sys.stderr)
elif isinstance(config, FileProcessingConfig):
print(f"Dispatching file pipeline", file=sys.stderr)
result = await process_transcript_inner(
transcript, on_validation=on_validation, on_preprocess=on_preprocess
)
if sync:
print("Waiting for task completion...", file=sys.stderr)
while not result.ready():
print(f" Status: {result.state}", file=sys.stderr)
time.sleep(5)
if result.successful():
print("Task completed successfully", file=sys.stderr)
else:
print(f"Task failed: {result.result}", file=sys.stderr)
sys.exit(1)
else:
print(
"Task dispatched (use --sync to wait for completion)", file=sys.stderr
)
finally:
await database.disconnect()
def main():
parser = argparse.ArgumentParser(
description="Process transcript by ID - auto-detects multitrack vs file pipeline"
)
parser.add_argument(
"transcript_id",
help="Transcript UUID to process",
)
parser.add_argument(
"--sync",
action="store_true",
help="Wait for task completion instead of just dispatching",
)
args = parser.parse_args()
asyncio.run(process_transcript(args.transcript_id, sync=args.sync))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,10 @@
from typing import NoReturn
def assert_exhaustiveness(x: NoReturn) -> NoReturn:
"""Provide an assertion at type-check time that this function is never called."""
raise AssertionError(f"Invalid value: {x!r}")
def absurd(x: NoReturn) -> NoReturn:
return assert_exhaustiveness(x)

View File

@@ -337,19 +337,7 @@ async def rooms_create_meeting(
status_code=503, detail="Meeting creation in progress, please try again"
)
if meeting.platform == "daily" and room.recording_trigger != "none":
client = create_platform_client(meeting.platform)
token = await client.create_meeting_token(
meeting.room_name,
enable_recording=True,
user_id=user_id,
)
meeting = meeting.model_copy()
meeting.room_url = add_query_param(meeting.room_url, "t", token)
if meeting.host_room_url:
meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token)
if user_id != room.user_id:
if user_id != room.user_id and meeting.platform == "whereby":
meeting.host_room_url = ""
return meeting
@@ -508,7 +496,8 @@ async def rooms_list_active_meetings(
if user_id != room.user_id:
for meeting in meetings:
meeting.host_room_url = ""
if meeting.platform == "whereby":
meeting.host_room_url = ""
return meetings
@@ -530,7 +519,7 @@ async def rooms_get_meeting(
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
if user_id != room.user_id and not room.is_shared:
if user_id != room.user_id and not room.is_shared and meeting.platform == "whereby":
meeting.host_room_url = ""
return meeting
@@ -560,7 +549,20 @@ async def rooms_join_meeting(
if meeting.end_date <= current_time:
raise HTTPException(status_code=400, detail="Meeting has ended")
if user_id != room.user_id:
if meeting.platform == "daily":
client = create_platform_client(meeting.platform)
enable_recording = room.recording_trigger != "none"
token = await client.create_meeting_token(
meeting.room_name,
enable_recording=enable_recording,
user_id=user_id,
)
meeting = meeting.model_copy()
meeting.room_url = add_query_param(meeting.room_url, "t", token)
if meeting.host_room_url:
meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token)
if user_id != room.user_id and meeting.platform == "whereby":
meeting.host_room_url = ""
return meeting

View File

@@ -1,16 +1,21 @@
from typing import Annotated, Optional
import celery
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
import reflector.auth as auth
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
from reflector.services.transcript_process import (
ProcessError,
ValidationAlreadyScheduled,
ValidationError,
ValidationLocked,
ValidationOk,
dispatch_transcript_processing,
prepare_transcript_processing,
validate_transcript_for_processing,
)
from reflector.utils.match import absurd
router = APIRouter()
@@ -23,68 +28,28 @@ class ProcessStatus(BaseModel):
async def transcript_process(
transcript_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
):
) -> ProcessStatus:
user_id = user["sub"] if user else None
transcript = await transcripts_controller.get_by_id_for_http(
transcript_id, user_id=user_id
)
if transcript.locked:
raise HTTPException(status_code=400, detail="Transcript is locked")
if transcript.status == "idle":
raise HTTPException(
status_code=400, detail="Recording is not ready for processing"
)
# avoid duplicate scheduling for either pipeline
if task_is_scheduled_or_active(
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
transcript_id=transcript_id,
) or task_is_scheduled_or_active(
"reflector.pipelines.main_multitrack_pipeline.task_pipeline_multitrack_process",
transcript_id=transcript_id,
):
return ProcessStatus(status="already running")
# Determine processing mode strictly from DB to avoid S3 scans
bucket_name = None
track_keys: list[str] = []
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording:
bucket_name = recording.bucket_name
track_keys = recording.track_keys
if track_keys is not None and len(track_keys) == 0:
raise HTTPException(
status_code=500,
detail="No track keys found, must be either > 0 or None",
)
if track_keys is not None and not bucket_name:
raise HTTPException(
status_code=500, detail="Bucket name must be specified"
)
if track_keys:
task_pipeline_multitrack_process.delay(
transcript_id=transcript_id,
bucket_name=bucket_name,
track_keys=track_keys,
)
validation = await validate_transcript_for_processing(transcript)
if isinstance(validation, ValidationLocked):
raise HTTPException(status_code=400, detail=validation.detail)
elif isinstance(validation, ValidationError):
raise HTTPException(status_code=400, detail=validation.detail)
elif isinstance(validation, ValidationAlreadyScheduled):
return ProcessStatus(status=validation.detail)
elif isinstance(validation, ValidationOk):
pass
else:
# Default single-file pipeline
task_pipeline_file_process.delay(transcript_id=transcript_id)
absurd(validation)
return ProcessStatus(status="ok")
config = await prepare_transcript_processing(validation)
def task_is_scheduled_or_active(task_name: str, **kwargs):
inspect = celery.current_app.control.inspect()
for worker, tasks in (inspect.scheduled() | inspect.active()).items():
for task in tasks:
if task["name"] == task_name and task["kwargs"] == kwargs:
return True
return False
if isinstance(config, ProcessError):
raise HTTPException(status_code=500, detail=config.detail)
else:
dispatch_transcript_processing(config)
return ProcessStatus(status="ok")

View File

@@ -3,6 +3,7 @@ from typing import Optional
from fastapi import APIRouter, WebSocket
from reflector.auth.auth_jwt import JWTAuth # type: ignore
from reflector.db.users import user_controller
from reflector.ws_manager import get_ws_manager
router = APIRouter()
@@ -29,7 +30,18 @@ async def user_events_websocket(websocket: WebSocket):
try:
payload = JWTAuth().verify_token(token)
user_id = payload.get("sub")
authentik_uid = payload.get("sub")
if authentik_uid:
user = await user_controller.get_by_authentik_uid(authentik_uid)
if user:
user_id = user.id
else:
await websocket.close(code=UNAUTHORISED)
return
else:
await websocket.close(code=UNAUTHORISED)
return
except Exception:
await websocket.close(code=UNAUTHORISED)
return

View File

@@ -240,6 +240,8 @@ async def _process_multitrack_recording_inner(
)
meeting = await meetings_controller.get_by_room_name(daily_room_name)
if not meeting:
raise Exception(f"Meeting not found: {daily_room_name}")
room_name_base = extract_base_room_name(daily_room_name)
@@ -247,9 +249,6 @@ async def _process_multitrack_recording_inner(
if not room:
raise Exception(f"Room not found: {room_name_base}")
if not meeting:
raise Exception(f"Meeting not found: {room_name_base}")
logger.info(
"Found existing Meeting for recording",
meeting_id=meeting.id,
@@ -446,6 +445,15 @@ async def poll_daily_recordings():
)
continue
meeting = await meetings_controller.get_by_room_name(recording.room_name)
if not meeting:
logger.warning(
"Skipping recording - no matching meeting",
recording_id=recording.id,
room_name=recording.room_name,
)
continue
logger.info(
"Queueing missing recording for processing",
recording_id=recording.id,

View File

@@ -0,0 +1,292 @@
#!/usr/bin/env python3
"""
Manual Migration Script: Migrate user_id from Authentik UID to internal user.id
This script should be run manually AFTER applying the database schema migrations.
Usage:
AUTHENTIK_API_URL=https://your-authentik-url \
AUTHENTIK_API_TOKEN=your-token \
DATABASE_URL=postgresql://... \
python scripts/migrate_user_ids.py
What this script does:
1. Collects all unique Authentik UIDs currently used in the database
2. Fetches only those users from Authentik API to populate the users table
3. Updates user_id in: user_api_key, transcript, room, meeting_consent
4. Uses user.authentik_uid to lookup the corresponding user.id
The script is idempotent:
- User inserts use ON CONFLICT DO NOTHING (safe if users already exist)
- Update queries only match authentik_uid->uuid pairs (no-op if already migrated)
- Safe to run multiple times without side effects
Prerequisites:
- AUTHENTIK_API_URL environment variable must be set
- AUTHENTIK_API_TOKEN environment variable must be set
- DATABASE_URL environment variable must be set
- Authentik API must be accessible
"""
import asyncio
import os
import sys
from datetime import datetime, timezone
from typing import Any
import httpx
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncConnection, create_async_engine
TABLES_WITH_USER_ID = ["user_api_key", "transcript", "room", "meeting_consent"]
NULLABLE_USER_ID_TABLES = {"transcript", "meeting_consent"}
AUTHENTIK_PAGE_SIZE = 100
HTTP_TIMEOUT = 30.0
class AuthentikClient:
def __init__(self, api_url: str, api_token: str):
self.api_url = api_url
self.api_token = api_token
def _get_headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.api_token}",
"Accept": "application/json",
}
async def fetch_all_users(self) -> list[dict[str, Any]]:
all_users = []
page = 1
try:
async with httpx.AsyncClient(timeout=HTTP_TIMEOUT) as client:
while True:
url = f"{self.api_url}/api/v3/core/users/"
params = {
"page": page,
"page_size": AUTHENTIK_PAGE_SIZE,
"include_groups": "false",
}
print(f" Fetching users from Authentik (page {page})...")
response = await client.get(
url, headers=self._get_headers(), params=params
)
response.raise_for_status()
data = response.json()
results = data.get("results", [])
if not results:
break
all_users.extend(results)
print(f" Fetched {len(results)} users from page {page}")
if not data.get("next"):
break
page += 1
print(f" Total: {len(all_users)} users fetched from Authentik")
return all_users
except httpx.HTTPError as e:
raise Exception(f"Failed to fetch users from Authentik: {e}") from e
async def collect_used_authentik_uids(connection: AsyncConnection) -> set[str]:
print("\nStep 1: Collecting Authentik UIDs from database tables...")
used_authentik_uids = set()
for table in TABLES_WITH_USER_ID:
result = await connection.execute(
text(f'SELECT DISTINCT user_id FROM "{table}" WHERE user_id IS NOT NULL')
)
authentik_uids = [row[0] for row in result.fetchall()]
used_authentik_uids.update(authentik_uids)
print(f" Found {len(authentik_uids)} unique Authentik UIDs in {table}")
print(f" Total unique user IDs found: {len(used_authentik_uids)}")
if used_authentik_uids:
sample_id = next(iter(used_authentik_uids))
if len(sample_id) == 36 and sample_id.count("-") == 4:
print(
f"\n✅ User IDs are already in UUID format (e.g., {sample_id[:20]}...)"
)
print("Migration has already been completed!")
return set()
return used_authentik_uids
def filter_users_by_authentik_uid(
authentik_users: list[dict[str, Any]], used_authentik_uids: set[str]
) -> tuple[list[dict[str, Any]], set[str]]:
used_authentik_users = [
user for user in authentik_users if user.get("uid") in used_authentik_uids
]
missing_ids = used_authentik_uids - {u.get("uid") for u in used_authentik_users}
print(
f" Found {len(used_authentik_users)} matching users in Authentik "
f"(out of {len(authentik_users)} total)"
)
if missing_ids:
print(
f" ⚠ Warning: {len(missing_ids)} Authentik UIDs in database not found in Authentik:"
)
for user_id in sorted(missing_ids):
print(f" - {user_id}")
return used_authentik_users, missing_ids
async def sync_users_to_database(
connection: AsyncConnection, authentik_users: list[dict[str, Any]]
) -> tuple[int, int]:
created = 0
skipped = 0
now = datetime.now(timezone.utc)
for authentik_user in authentik_users:
user_id = authentik_user["uuid"]
authentik_uid = authentik_user["uid"]
email = authentik_user.get("email")
if not email:
print(f" ⚠ Skipping user {authentik_uid} (no email)")
skipped += 1
continue
result = await connection.execute(
text("""
INSERT INTO "user" (id, email, authentik_uid, created_at, updated_at)
VALUES (:id, :email, :authentik_uid, :created_at, :updated_at)
ON CONFLICT (id) DO NOTHING
"""),
{
"id": user_id,
"email": email,
"authentik_uid": authentik_uid,
"created_at": now,
"updated_at": now,
},
)
if result.rowcount > 0:
created += 1
return created, skipped
async def migrate_all_user_ids(connection: AsyncConnection) -> int:
print("\nStep 3: Migrating user_id columns from Authentik UID to internal UUID...")
print("(If no rows are updated, migration may have already been completed)")
total_updated = 0
for table in TABLES_WITH_USER_ID:
null_check = (
f"AND {table}.user_id IS NOT NULL"
if table in NULLABLE_USER_ID_TABLES
else ""
)
query = f"""
UPDATE {table}
SET user_id = u.id
FROM "user" u
WHERE {table}.user_id = u.authentik_uid
{null_check}
"""
print(f" Updating {table}.user_id...")
result = await connection.execute(text(query))
rows = result.rowcount
print(f" ✓ Updated {rows} rows")
total_updated += rows
return total_updated
async def run_migration(
database_url: str, authentik_api_url: str, authentik_api_token: str
) -> None:
engine = create_async_engine(database_url)
try:
async with engine.begin() as connection:
used_authentik_uids = await collect_used_authentik_uids(connection)
if not used_authentik_uids:
print("\n⚠️ No user IDs found in database. Nothing to migrate.")
print("Migration complete (no-op)!")
return
print("\nStep 2: Fetching user data from Authentik and syncing users...")
print("(This script is idempotent - safe to run multiple times)")
print(f"Authentik API URL: {authentik_api_url}")
client = AuthentikClient(authentik_api_url, authentik_api_token)
authentik_users = await client.fetch_all_users()
if not authentik_users:
print("\nERROR: No users returned from Authentik API.")
print(
"Please verify your Authentik configuration and ensure users exist."
)
sys.exit(1)
used_authentik_users, _ = filter_users_by_authentik_uid(
authentik_users, used_authentik_uids
)
created, skipped = await sync_users_to_database(
connection, used_authentik_users
)
if created > 0:
print(f"✓ Created {created} users from Authentik")
else:
print("✓ No new users created (users may already exist)")
if skipped > 0:
print(f" ⚠ Skipped {skipped} users without email")
result = await connection.execute(text('SELECT COUNT(*) FROM "user"'))
user_count = result.scalar()
print(f"✓ Users table now has {user_count} users")
total_updated = await migrate_all_user_ids(connection)
if total_updated > 0:
print(f"\n✅ Migration complete! Updated {total_updated} total rows.")
else:
print(
"\n✅ Migration complete! (No rows updated - migration may have already been completed)"
)
except Exception as e:
print(f"\n❌ ERROR: Migration failed: {e}")
sys.exit(1)
finally:
await engine.dispose()
async def main() -> None:
database_url = os.getenv("DATABASE_URL")
authentik_api_url = os.getenv("AUTHENTIK_API_URL")
authentik_api_token = os.getenv("AUTHENTIK_API_TOKEN")
if not database_url or not authentik_api_url or not authentik_api_token:
print(
"ERROR: DATABASE_URL, AUTHENTIK_API_URL, and AUTHENTIK_API_TOKEN must be set"
)
sys.exit(1)
await run_migration(database_url, authentik_api_url, authentik_api_token)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -59,17 +59,37 @@ def mock_recording_response():
]
@pytest.fixture
def mock_meeting():
"""Mock meeting object."""
from reflector.db.meetings import Meeting
return Meeting(
id="meeting-123",
room_name="test-room-20251118120000",
room_url="https://daily.co/test-room",
host_room_url="https://daily.co/test-room",
start_date=datetime.now(timezone.utc),
end_date=datetime.now(timezone.utc) + timedelta(hours=1),
room_id="room-123",
platform="daily",
)
@pytest.mark.asyncio
@patch("reflector.worker.process.settings")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.recordings_controller.get_by_ids")
@patch("reflector.worker.process.meetings_controller.get_by_room_name")
@patch("reflector.worker.process.process_multitrack_recording.delay")
async def test_poll_daily_recordings_processes_missing_recordings(
mock_process_delay,
mock_get_meeting,
mock_get_recordings,
mock_create_client,
mock_settings,
mock_recording_response,
mock_meeting,
):
"""Test that poll_daily_recordings queues processing for recordings not in DB."""
mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "test-bucket"
@@ -85,6 +105,9 @@ async def test_poll_daily_recordings_processes_missing_recordings(
# Mock DB controller - no existing recordings
mock_get_recordings.return_value = []
# Mock meeting exists for all recordings
mock_get_meeting.return_value = mock_meeting
# Execute - call the unwrapped async function
poll_fn = _get_poll_daily_recordings_fn()
await poll_fn()
@@ -113,6 +136,48 @@ async def test_poll_daily_recordings_processes_missing_recordings(
assert calls[1].kwargs["track_keys"] == ["track1.webm"]
@pytest.mark.asyncio
@patch("reflector.worker.process.settings")
@patch("reflector.worker.process.create_platform_client")
@patch("reflector.worker.process.recordings_controller.get_by_ids")
@patch("reflector.worker.process.meetings_controller.get_by_room_name")
@patch("reflector.worker.process.process_multitrack_recording.delay")
async def test_poll_daily_recordings_skips_recordings_without_meeting(
mock_process_delay,
mock_get_meeting,
mock_get_recordings,
mock_create_client,
mock_settings,
mock_recording_response,
):
"""Test that poll_daily_recordings skips recordings without matching meeting."""
mock_settings.DAILYCO_STORAGE_AWS_BUCKET_NAME = "test-bucket"
# Mock Daily.co API client
mock_daily_client = AsyncMock()
mock_daily_client.list_recordings = AsyncMock(return_value=mock_recording_response)
mock_create_client.return_value.__aenter__ = AsyncMock(
return_value=mock_daily_client
)
mock_create_client.return_value.__aexit__ = AsyncMock()
# Mock DB controller - no existing recordings
mock_get_recordings.return_value = []
# Mock no meeting found
mock_get_meeting.return_value = None
# Execute - call the unwrapped async function
poll_fn = _get_poll_daily_recordings_fn()
await poll_fn()
# Verify Daily.co API was called
assert mock_daily_client.list_recordings.call_count == 1
# Verify NO processing was queued (no matching meetings)
assert mock_process_delay.call_count == 0
@pytest.mark.asyncio
@patch("reflector.worker.process.settings")
@patch("reflector.worker.process.create_platform_client")

View File

@@ -139,10 +139,10 @@ async def test_whereby_recording_uses_file_pipeline(client):
with (
patch(
"reflector.views.transcripts_process.task_pipeline_file_process"
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.views.transcripts_process.task_pipeline_multitrack_process"
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
response = await client.post(f"/transcripts/{transcript.id}/process")
@@ -194,10 +194,10 @@ async def test_dailyco_recording_uses_multitrack_pipeline(client):
with (
patch(
"reflector.views.transcripts_process.task_pipeline_file_process"
"reflector.services.transcript_process.task_pipeline_file_process"
) as mock_file_pipeline,
patch(
"reflector.views.transcripts_process.task_pipeline_multitrack_process"
"reflector.services.transcript_process.task_pipeline_multitrack_process"
) as mock_multitrack_pipeline,
):
response = await client.post(f"/transcripts/{transcript.id}/process")

View File

@@ -120,7 +120,15 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
host, port = appserver_ws_user
base_ws = f"http://{host}:{port}/v1/events"
token = _make_dummy_jwt("user-abc")
# Create a test user in the database
from reflector.db.users import user_controller
test_uid = "user-abc"
user = await user_controller.create_or_update(
id="test-user-id-abc", authentik_uid=test_uid, email="user-abc@example.com"
)
token = _make_dummy_jwt(test_uid)
subprotocols = ["bearer", token]
# Connect and then trigger an event via HTTP create
@@ -132,12 +140,13 @@ async def test_user_ws_accepts_valid_token_and_receives_events(appserver_ws_user
from reflector.auth import current_user, current_user_optional
# Override auth dependencies so HTTP request is performed as the same user
# Use the internal user.id (not the Authentik UID)
app.dependency_overrides[current_user] = lambda: {
"sub": "user-abc",
"sub": user.id,
"email": "user-abc@example.com",
}
app.dependency_overrides[current_user_optional] = lambda: {
"sub": "user-abc",
"sub": user.id,
"email": "user-abc@example.com",
}

View File

@@ -22,9 +22,10 @@ AUTHENTIK_CLIENT_SECRET=your-client-secret-here
# API URLs
API_URL=http://127.0.0.1:1250
SERVER_API_URL=http://server:1250
WEBSOCKET_URL=ws://127.0.0.1:1250
AUTH_CALLBACK_URL=http://localhost:3000/auth-callback
# Sentry
# SENTRY_DSN=https://your-dsn@sentry.io/project-id
# SENTRY_IGNORE_API_RESOLUTION_ERROR=1
# SENTRY_IGNORE_API_RESOLUTION_ERROR=1

View File

@@ -1,8 +1,8 @@
"use client";
import { useCallback, useEffect, useRef } from "react";
import { Box } from "@chakra-ui/react";
import { useRouter } from "next/navigation";
import { useCallback, useEffect, useRef, useState } from "react";
import { Box, Spinner, Center, Text } from "@chakra-ui/react";
import { useRouter, useParams } from "next/navigation";
import DailyIframe, { DailyCall } from "@daily-co/daily-js";
import type { components } from "../../reflector-api";
import { useAuth } from "../../lib/AuthProvider";
@@ -10,6 +10,7 @@ import {
ConsentDialogButton,
recordingTypeRequiresConsent,
} from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
type Meeting = components["schemas"]["Meeting"];
@@ -19,13 +20,41 @@ interface DailyRoomProps {
export default function DailyRoom({ meeting }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
const auth = useAuth();
const status = auth.status;
const containerRef = useRef<HTMLDivElement>(null);
const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomUrl = meeting?.host_room_url || meeting?.room_url;
const roomName = params?.roomName as string;
const isLoading = status === "loading";
// Always call /join to get a fresh token with user_id
useEffect(() => {
if (status === "loading" || !meeting?.id || !roomName) return;
const join = async () => {
try {
const result = await joinMutation.mutateAsync({
params: {
path: {
room_name: roomName,
meeting_id: meeting.id,
},
},
});
setJoinedMeeting(result);
} catch (error) {
console.error("Failed to join meeting:", error);
}
};
join();
}, [meeting?.id, roomName, status]);
const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url;
const isLoading =
status === "loading" || joinMutation.isPending || !joinedMeeting;
const handleLeave = useCallback(() => {
router.push("/browse");
@@ -87,6 +116,22 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
};
}, [roomUrl, isLoading, handleLeave]);
if (isLoading) {
return (
<Center width="100vw" height="100vh">
<Spinner size="xl" />
</Center>
);
}
if (joinMutation.isError) {
return (
<Center width="100vw" height="100vh">
<Text color="red.500">Failed to join meeting. Please try again.</Text>
</Center>
);
}
if (!roomUrl) {
return null;
}

View File

@@ -22,6 +22,27 @@ import { sequenceThrows } from "./errorUtils";
import { featureEnabled } from "./features";
import { getNextEnvVar } from "./nextBuild";
async function getUserId(accessToken: string): Promise<string | null> {
try {
const apiUrl = getNextEnvVar("SERVER_API_URL");
const response = await fetch(`${apiUrl}/v1/me`, {
headers: {
Authorization: `Bearer ${accessToken}`,
},
});
if (!response.ok) {
return null;
}
const userInfo = await response.json();
return userInfo.sub || null;
} catch (error) {
console.error("Error fetching user ID from backend:", error);
return null;
}
}
const TOKEN_CACHE_TTL = REFRESH_ACCESS_TOKEN_BEFORE;
const getAuthentikClientId = () => getNextEnvVar("AUTHENTIK_CLIENT_ID");
const getAuthentikClientSecret = () => getNextEnvVar("AUTHENTIK_CLIENT_SECRET");
@@ -122,13 +143,16 @@ export const authOptions = (): AuthOptions =>
},
async session({ session, token }) {
const extendedToken = token as JWTWithAccessToken;
const userId = await getUserId(extendedToken.accessToken);
return {
...session,
accessToken: extendedToken.accessToken,
accessTokenExpires: extendedToken.accessTokenExpires,
error: extendedToken.error,
user: {
id: assertExists(extendedToken.sub),
id: assertExistsAndNonEmptyString(userId, "User ID required"),
name: extendedToken.name,
email: extendedToken.email,
},