Compare commits

...

19 Commits

Author SHA1 Message Date
32a049c134 chore(main): release 0.23.0 (#770) 2025-12-10 13:42:28 +01:00
91650ec65f fix: deploy frontend to coolify (#779)
* Ignore act secrets

* Deploy frontend container to ECR

* Use published image

* Remove ecr workflows

* Trigger coolify deployment

* Deploy on release please pr merge

* Upgrade nextjs

* Update secrets example
2025-12-10 13:35:53 +01:00
Igor Monadical
61f0e29d4c feat: llm retries (#739)
* llm retries no-mistakes

* self-review (no-mistakes)

* self-review (no-mistakes)

* bigger retry intervals by default

* tests and dry

* restore to main state

* parse retries

* json retries (no-mistakes)

* json retries (no-mistakes)

* json retries (no-mistakes)

* json retries (no-mistakes) self-review

* additional network retry test

* more lindt

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-05 12:08:21 -05:00
Igor Monadical
ec17ed7b58 fix: celery inspect bug sidestep in restart script (#766)
* celery bug sidestep

* Update server/reflector/services/transcript_process.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-12-04 09:22:51 -05:00
Igor Monadical
00549f153a feat: dockerhub ci (#772)
* dockerhub ci

* ci test

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-03 13:26:08 -05:00
3ad78be762 fix: hide rooms settings instead of disabling (#763)
* Hide rooms settings instead of disabling

* Reset recording trigger
2025-12-03 16:49:17 +01:00
d3a5cd12d2 fix: return participant emails from transcript endpoint (#769)
* Return participant emails from transcript endpoint

* Fix broken test
2025-12-03 16:47:56 +01:00
af921ce927 chore(main): release 0.22.4 (#765) 2025-12-02 17:11:48 -05:00
Igor Monadical
bd5df1ce2e fix: Multitrack mixdown optimisation 2 (#764)
* Revert "fix: Skip mixdown for multitrack (#760)"

This reverts commit b51b7aa917.

* multitrack mixdown optimisation

* return the "good" ui part of "skip mixdown"

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-02 17:10:06 -05:00
c8024484b3 chore(main): release 0.22.3 (#761) 2025-12-02 09:08:22 +01:00
28f87c09dc fix: align daily room settings (#759)
* Switch platform ui

* Update room settings based on platform

* Add local and none recording options to daily

* Don't create tokens for unauthentikated users

* Enable knocking for private rooms

* Create new meeting on room settings change

* Always use 2-200 option for daily

* Show recording start trigger for daily

* Fix broken test
2025-12-02 09:06:36 +01:00
dabf7251db chore(main): release 0.22.2 (#756) 2025-12-01 23:39:32 -05:00
Igor Monadical
b51b7aa917 fix: Skip mixdown for multitrack (#760)
* multitrack mixdown optimisation

* skip mixdown for multitrack

* skip mixdown for multitrack

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-01 23:35:12 -05:00
Igor Monadical
a8983b4e7e daily auth hotfix (#757)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-28 14:52:59 -05:00
Igor Monadical
fe47c46489 fix: daily auto refresh fix (#755)
* daily auto refresh fix

* Update www/app/lib/AuthProvider.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Update www/app/[roomName]/components/DailyRoom.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* fix bot lint

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-11-27 18:31:03 -05:00
a2bb6a27d6 chore(main): release 0.22.1 (#750) 2025-11-27 16:55:08 +01:00
7f0b728991 fix: participants update from daily (#749)
* Fix participants update from daily

* Use track keys from params
2025-11-27 16:53:26 +01:00
692895c859 chore(main): release 0.22.0 (#748) 2025-11-26 16:53:27 -05:00
Igor Monadical
d63040e2fd feat: Multitrack segmentation (#747)
* segmentation multitrack (no-mistakes)

* segmentation multitrack (no-mistakes)

* self review

* self review

* recording poll daily doc

* filter cam_audio tracks to remove screensharing from daily processing

* pr review

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-26 16:21:32 -05:00
36 changed files with 1734 additions and 489 deletions

View File

@@ -1,90 +0,0 @@
name: Build container/push to container registry
on: [workflow_dispatch]
env:
# 950402358378.dkr.ecr.us-east-1.amazonaws.com/reflector
AWS_REGION: us-east-1
ECR_REPOSITORY: reflector
jobs:
build:
strategy:
matrix:
include:
- platform: linux/amd64
runner: linux-amd64
arch: amd64
- platform: linux/arm64
runner: linux-arm64
arch: arm64
runs-on: ${{ matrix.runner }}
permissions:
contents: read
outputs:
registry: ${{ steps.login-ecr.outputs.registry }}
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push ${{ matrix.arch }}
uses: docker/build-push-action@v5
with:
context: server
platforms: ${{ matrix.platform }}
push: true
tags: ${{ steps.login-ecr.outputs.registry }}/${{ env.ECR_REPOSITORY }}:latest-${{ matrix.arch }}
cache-from: type=gha,scope=${{ matrix.arch }}
cache-to: type=gha,mode=max,scope=${{ matrix.arch }}
github-token: ${{ secrets.GHA_CACHE_TOKEN }}
provenance: false
create-manifest:
runs-on: ubuntu-latest
needs: [build]
permissions:
deployments: write
contents: read
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
uses: aws-actions/amazon-ecr-login@v2
- name: Create and push multi-arch manifest
run: |
# Get the registry URL (since we can't easily access job outputs in matrix)
ECR_REGISTRY=$(aws ecr describe-registry --query 'registryId' --output text).dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com
docker manifest create \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest-amd64 \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest-arm64
docker manifest push $ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest
echo "✅ Multi-arch manifest pushed: $ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest"

View File

@@ -1,35 +1,39 @@
name: Build and Push Frontend Docker Image
name: Build and Push Backend Docker Image (Docker Hub)
on:
push:
branches:
- main
pull_request:
types:
- closed
paths:
- 'www/**'
- '.github/workflows/docker-frontend.yml'
- "server/**"
- ".github/workflows/dockerhub-backend.yml"
workflow_dispatch:
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}-frontend
REGISTRY: docker.io
IMAGE_NAME: monadicalsas/reflector-backend
jobs:
build-and-push:
runs-on: ubuntu-latest
if: |
github.event_name == 'workflow_dispatch' ||
(github.event.pull_request.merged == true &&
startsWith(github.event.pull_request.head.ref, 'release-please--branches--'))
permissions:
contents: read
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to GitHub Container Registry
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
username: monadicalsas
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Extract metadata
id: meta
@@ -47,11 +51,11 @@ jobs:
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./www
file: ./www/Dockerfile
context: ./server
file: ./server/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64
platforms: linux/amd64,linux/arm64

View File

@@ -0,0 +1,70 @@
name: Build and Push Frontend Docker Image
on:
pull_request:
types:
- closed
paths:
- "www/**"
- ".github/workflows/dockerhub-frontend.yml"
workflow_dispatch:
env:
REGISTRY: docker.io
IMAGE_NAME: monadicalsas/reflector-frontend
jobs:
build-and-push:
runs-on: ubuntu-latest
if: |
github.event_name == 'workflow_dispatch' ||
(github.event.pull_request.merged == true &&
startsWith(github.event.pull_request.head.ref, 'release-please--branches--'))
permissions:
contents: read
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: monadicalsas
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=sha,prefix={{branch}}-
type=raw,value=latest,enable={{is_default_branch}}
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./www
file: ./www/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64
- name: Trigger Coolify deployment
if: success()
run: |
curl -X POST "${{ secrets.COOLIFY_WEBHOOK_URL }}" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${{ secrets.COOLIFY_WEBHOOK_TOKEN }}" \
-f || (echo "Failed to trigger Coolify deployment" && exit 1)

1
.gitignore vendored
View File

@@ -18,3 +18,4 @@ CLAUDE.local.md
www/.env.development
www/.env.production
.playwright-mcp
.secrets

22
.secrets.example Normal file
View File

@@ -0,0 +1,22 @@
# Example secrets file for GitHub Actions workflows
# Copy this to .secrets and fill in your values
# These secrets should be configured in GitHub repository settings:
# Settings > Secrets and variables > Actions
# DockerHub Configuration (required for frontend and backend deployment)
# Create a Docker Hub access token at https://hub.docker.com/settings/security
# Username: monadicalsas
DOCKERHUB_TOKEN=your-dockerhub-access-token
# GitHub Token (required for frontend and backend deployment)
# Used by docker/metadata-action for extracting image metadata
# Can use the default GITHUB_TOKEN or create a personal access token
GITHUB_TOKEN=your-github-token-or-use-default-GITHUB_TOKEN
# Coolify Deployment Webhook (required for frontend deployment)
# Used to trigger automatic deployment after image push
COOLIFY_WEBHOOK_URL=https://app.monadical.io/api/v1/deploy?uuid=your-uuid&force=false
COOLIFY_WEBHOOK_TOKEN=your-coolify-webhook-token
# Optional: GitHub Actions Cache Token (for local testing with act)
GHA_CACHE_TOKEN=your-github-token-or-empty

View File

@@ -1,5 +1,57 @@
# Changelog
## [0.23.0](https://github.com/Monadical-SAS/reflector/compare/v0.22.4...v0.23.0) (2025-12-10)
### Features
* dockerhub ci ([#772](https://github.com/Monadical-SAS/reflector/issues/772)) ([00549f1](https://github.com/Monadical-SAS/reflector/commit/00549f153ade922cf4cb6c5358a7d11a39c426d2))
* llm retries ([#739](https://github.com/Monadical-SAS/reflector/issues/739)) ([61f0e29](https://github.com/Monadical-SAS/reflector/commit/61f0e29d4c51eab54ee67af92141fbb171e8ccaa))
### Bug Fixes
* celery inspect bug sidestep in restart script ([#766](https://github.com/Monadical-SAS/reflector/issues/766)) ([ec17ed7](https://github.com/Monadical-SAS/reflector/commit/ec17ed7b587cf6ee143646baaee67a7c017044d4))
* deploy frontend to coolify ([#779](https://github.com/Monadical-SAS/reflector/issues/779)) ([91650ec](https://github.com/Monadical-SAS/reflector/commit/91650ec65f65713faa7ee0dcfb75af427b7c4ba0))
* hide rooms settings instead of disabling ([#763](https://github.com/Monadical-SAS/reflector/issues/763)) ([3ad78be](https://github.com/Monadical-SAS/reflector/commit/3ad78be7628c0d029296b301a0e87236c76b7598))
* return participant emails from transcript endpoint ([#769](https://github.com/Monadical-SAS/reflector/issues/769)) ([d3a5cd1](https://github.com/Monadical-SAS/reflector/commit/d3a5cd12d2d0d9c32af2d5bd9322e030ef69b85d))
## [0.22.4](https://github.com/Monadical-SAS/reflector/compare/v0.22.3...v0.22.4) (2025-12-02)
### Bug Fixes
* Multitrack mixdown optimisation 2 ([#764](https://github.com/Monadical-SAS/reflector/issues/764)) ([bd5df1c](https://github.com/Monadical-SAS/reflector/commit/bd5df1ce2ebf35d7f3413b295e56937a9a28ef7b))
## [0.22.3](https://github.com/Monadical-SAS/reflector/compare/v0.22.2...v0.22.3) (2025-12-02)
### Bug Fixes
* align daily room settings ([#759](https://github.com/Monadical-SAS/reflector/issues/759)) ([28f87c0](https://github.com/Monadical-SAS/reflector/commit/28f87c09dc459846873d0dde65b03e3d7b2b9399))
## [0.22.2](https://github.com/Monadical-SAS/reflector/compare/v0.22.1...v0.22.2) (2025-12-02)
### Bug Fixes
* daily auto refresh fix ([#755](https://github.com/Monadical-SAS/reflector/issues/755)) ([fe47c46](https://github.com/Monadical-SAS/reflector/commit/fe47c46489c5aa0cc538109f7559cc9accb35c01))
* Skip mixdown for multitrack ([#760](https://github.com/Monadical-SAS/reflector/issues/760)) ([b51b7aa](https://github.com/Monadical-SAS/reflector/commit/b51b7aa9176c1a53ba57ad99f5e976c804a1e80c))
## [0.22.1](https://github.com/Monadical-SAS/reflector/compare/v0.22.0...v0.22.1) (2025-11-27)
### Bug Fixes
* participants update from daily ([#749](https://github.com/Monadical-SAS/reflector/issues/749)) ([7f0b728](https://github.com/Monadical-SAS/reflector/commit/7f0b728991c1b9f9aae702c96297eae63b561ef5))
## [0.22.0](https://github.com/Monadical-SAS/reflector/compare/v0.21.0...v0.22.0) (2025-11-26)
### Features
* Multitrack segmentation ([#747](https://github.com/Monadical-SAS/reflector/issues/747)) ([d63040e](https://github.com/Monadical-SAS/reflector/commit/d63040e2fdc07e7b272e85a39eb2411cd6a14798))
## [0.21.0](https://github.com/Monadical-SAS/reflector/compare/v0.20.0...v0.21.0) (2025-11-26)

View File

@@ -3,10 +3,7 @@
services:
web:
build:
context: ./www
dockerfile: Dockerfile
image: reflector-frontend:latest
image: monadicalsas/reflector-frontend:latest
environment:
- KV_URL=${KV_URL:-redis://redis:6379}
- SITE_URL=${SITE_URL}
@@ -36,4 +33,4 @@ services:
- redis_data:/data
volumes:
redis_data:
redis_data:

View File

@@ -126,6 +126,7 @@ markers = [
select = [
"I", # isort - import sorting
"F401", # unused imports
"E402", # module level import not at top of file
"PLC0415", # import-outside-top-level - detect inline imports
]

View File

@@ -1,13 +1,19 @@
import asyncio
import functools
from uuid import uuid4
from celery import current_task
from reflector.db import get_database
from reflector.llm import llm_session_id
def asynctask(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
async def run_with_db():
task_id = current_task.request.id if current_task else None
llm_session_id.set(task_id or f"random-{uuid4().hex}")
database = get_database()
await database.connect()
try:

View File

@@ -40,6 +40,10 @@ class RoomProperties(BaseModel):
)
enable_chat: bool = Field(default=True, description="Enable in-meeting chat")
enable_screenshare: bool = Field(default=True, description="Enable screen sharing")
enable_knocking: bool = Field(
default=False,
description="Enable knocking for private rooms (allows participants to request access)",
)
start_video_off: bool = Field(
default=False, description="Start with video off for all participants"
)

View File

@@ -68,7 +68,7 @@ class MeetingParticipant(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
"""
user_id: NonEmptyString = Field(description="User identifier")
user_id: NonEmptyString | None = Field(None, description="User identifier")
participant_id: NonEmptyString = Field(description="Participant session identifier")
user_name: NonEmptyString | None = Field(None, description="User display name")
join_time: int = Field(description="Join timestamp (Unix epoch seconds)")

View File

@@ -35,8 +35,15 @@ class Recording(BaseModel):
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
@property
def is_multitrack(self) -> bool:
"""True if recording has separate audio tracks (1+ tracks counts as multitrack)."""
return self.track_keys is not None and len(self.track_keys) > 0
class RecordingController:
async def create(self, recording: Recording):

View File

@@ -88,5 +88,11 @@ class UserController:
results = await get_database().fetch_all(query)
return [User(**r) for r in results]
@staticmethod
async def get_by_ids(user_ids: list[NonEmptyString]) -> dict[str, User]:
query = users.select().where(users.c.id.in_(user_ids))
results = await get_database().fetch_all(query)
return {user.id: User(**user) for user in results}
user_controller = UserController()

View File

@@ -1,14 +1,29 @@
import logging
from typing import Type, TypeVar
from contextvars import ContextVar
from typing import Generic, Type, TypeVar
from uuid import uuid4
from llama_index.core import Settings
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai_like import OpenAILike
from pydantic import BaseModel, ValidationError
T = TypeVar("T", bound=BaseModel)
OutputT = TypeVar("OutputT", bound=BaseModel)
# Session ID for LiteLLM request grouping - set per processing run
llm_session_id: ContextVar[str | None] = ContextVar("llm_session_id", default=None)
logger = logging.getLogger(__name__)
STRUCTURED_RESPONSE_PROMPT_TEMPLATE = """
Based on the following analysis, provide the information in the requested JSON format:
@@ -20,6 +35,158 @@ Analysis:
"""
class LLMParseError(Exception):
"""Raised when LLM output cannot be parsed after retries."""
def __init__(self, output_cls: Type[BaseModel], error_msg: str, attempts: int):
self.output_cls = output_cls
self.error_msg = error_msg
self.attempts = attempts
super().__init__(
f"Failed to parse {output_cls.__name__} after {attempts} attempts: {error_msg}"
)
class ExtractionDone(Event):
"""Event emitted when LLM JSON formatting completes."""
output: str
class ValidationErrorEvent(Event):
"""Event emitted when validation fails."""
error: str
wrong_output: str
class StructuredOutputWorkflow(Workflow, Generic[OutputT]):
"""Workflow for structured output extraction with validation retry.
This workflow handles parse/validation retries only. Network error retries
are handled internally by Settings.llm (OpenAILike max_retries=3).
The caller should NOT wrap this workflow in additional retry logic.
"""
def __init__(
self,
output_cls: Type[OutputT],
max_retries: int = 3,
**kwargs,
):
super().__init__(**kwargs)
self.output_cls: Type[OutputT] = output_cls
self.max_retries = max_retries
self.output_parser = PydanticOutputParser(output_cls)
@step
async def extract(
self, ctx: Context, ev: StartEvent | ValidationErrorEvent
) -> StopEvent | ExtractionDone:
"""Extract structured data from text using two-step LLM process.
Step 1 (first call only): TreeSummarize generates text analysis
Step 2 (every call): Settings.llm.acomplete formats analysis as JSON
"""
current_retries = await ctx.store.get("retries", default=0)
await ctx.store.set("retries", current_retries + 1)
if current_retries >= self.max_retries:
last_error = await ctx.store.get("last_error", default=None)
logger.error(
f"Max retries ({self.max_retries}) reached for {self.output_cls.__name__}"
)
return StopEvent(result={"error": last_error, "attempts": current_retries})
if isinstance(ev, StartEvent):
# First call: run TreeSummarize to get analysis, store in context
prompt = ev.get("prompt")
texts = ev.get("texts")
tone_name = ev.get("tone_name")
if not prompt or not isinstance(texts, list):
raise ValueError(
"StartEvent must contain 'prompt' (str) and 'texts' (list)"
)
summarizer = TreeSummarize(verbose=False)
analysis = await summarizer.aget_response(
prompt, texts, tone_name=tone_name
)
await ctx.store.set("analysis", str(analysis))
reflection = ""
else:
# Retry: reuse analysis from context
analysis = await ctx.store.get("analysis")
if not analysis:
raise RuntimeError("Internal error: analysis not found in context")
wrong_output = ev.wrong_output
if len(wrong_output) > 2000:
wrong_output = wrong_output[:2000] + "... [truncated]"
reflection = (
f"\n\nYour previous response could not be parsed:\n{wrong_output}\n\n"
f"Error:\n{ev.error}\n\n"
"Please try again. Return ONLY valid JSON matching the schema above, "
"with no markdown formatting or extra text."
)
# Step 2: Format analysis as JSON using LLM completion
format_instructions = self.output_parser.format(
"Please structure the above information in the following JSON format:"
)
json_prompt = STRUCTURED_RESPONSE_PROMPT_TEMPLATE.format(
analysis=analysis,
format_instructions=format_instructions + reflection,
)
# Network retries handled by OpenAILike (max_retries=3)
response = await Settings.llm.acomplete(json_prompt)
return ExtractionDone(output=response.text)
@step
async def validate(
self, ctx: Context, ev: ExtractionDone
) -> StopEvent | ValidationErrorEvent:
"""Validate extracted output against Pydantic schema."""
raw_output = ev.output
retries = await ctx.store.get("retries", default=0)
try:
parsed = self.output_parser.parse(raw_output)
if retries > 1:
logger.info(
f"LLM parse succeeded on attempt {retries}/{self.max_retries} "
f"for {self.output_cls.__name__}"
)
return StopEvent(result={"success": parsed})
except (ValidationError, ValueError) as e:
error_msg = self._format_error(e, raw_output)
await ctx.store.set("last_error", error_msg)
logger.error(
f"LLM parse error (attempt {retries}/{self.max_retries}): "
f"{type(e).__name__}: {e}\nRaw response: {raw_output[:500]}"
)
return ValidationErrorEvent(
error=error_msg,
wrong_output=raw_output,
)
def _format_error(self, error: Exception, raw_output: str) -> str:
"""Format error for LLM feedback."""
if isinstance(error, ValidationError):
error_messages = []
for err in error.errors():
field = ".".join(str(loc) for loc in err["loc"])
error_messages.append(f"- {err['msg']} in field '{field}'")
return "Schema validation errors:\n" + "\n".join(error_messages)
else:
return f"Parse error: {str(error)}"
class LLM:
def __init__(self, settings, temperature: float = 0.4, max_tokens: int = 2048):
self.settings_obj = settings
@@ -30,11 +197,12 @@ class LLM:
self.temperature = temperature
self.max_tokens = max_tokens
# Configure llamaindex Settings
self._configure_llamaindex()
def _configure_llamaindex(self):
"""Configure llamaindex Settings with OpenAILike LLM"""
session_id = llm_session_id.get() or f"fallback-{uuid4().hex}"
Settings.llm = OpenAILike(
model=self.model_name,
api_base=self.url,
@@ -44,6 +212,7 @@ class LLM:
is_function_calling_model=False,
temperature=self.temperature,
max_tokens=self.max_tokens,
additional_kwargs={"extra_body": {"litellm_session_id": session_id}},
)
async def get_response(
@@ -61,43 +230,25 @@ class LLM:
output_cls: Type[T],
tone_name: str | None = None,
) -> T:
"""Get structured output from LLM for non-function-calling models"""
logger = logging.getLogger(__name__)
summarizer = TreeSummarize(verbose=True)
response = await summarizer.aget_response(prompt, texts, tone_name=tone_name)
output_parser = PydanticOutputParser(output_cls)
program = LLMTextCompletionProgram.from_defaults(
output_parser=output_parser,
prompt_template_str=STRUCTURED_RESPONSE_PROMPT_TEMPLATE,
verbose=False,
"""Get structured output from LLM with validation retry via Workflow."""
workflow = StructuredOutputWorkflow(
output_cls=output_cls,
max_retries=self.settings_obj.LLM_PARSE_MAX_RETRIES + 1,
timeout=120,
)
format_instructions = output_parser.format(
"Please structure the above information in the following JSON format:"
result = await workflow.run(
prompt=prompt,
texts=texts,
tone_name=tone_name,
)
try:
output = await program.acall(
analysis=str(response), format_instructions=format_instructions
if "error" in result:
error_msg = result["error"] or "Max retries exceeded"
raise LLMParseError(
output_cls=output_cls,
error_msg=error_msg,
attempts=result.get("attempts", 0),
)
except ValidationError as e:
# Extract the raw JSON from the error details
errors = e.errors()
if errors and "input" in errors[0]:
raw_json = errors[0]["input"]
logger.error(
f"JSON validation failed for {output_cls.__name__}. "
f"Full raw JSON output:\n{raw_json}\n"
f"Validation errors: {errors}"
)
else:
logger.error(
f"JSON validation failed for {output_cls.__name__}. "
f"Validation errors: {errors}"
)
raise
return output
return result["success"]

View File

@@ -340,7 +340,6 @@ async def task_send_webhook_if_needed(*, transcript_id: str):
@asynctask
async def task_pipeline_file_process(*, transcript_id: str):
"""Celery task for file pipeline processing"""
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
raise Exception(f"Transcript {transcript_id} not found")

View File

@@ -9,7 +9,10 @@ from av.audio.resampler import AudioResampler
from celery import chain, shared_task
from reflector.asynctask import asynctask
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.db.transcripts import (
Transcript,
TranscriptParticipant,
TranscriptStatus,
TranscriptWaveform,
transcripts_controller,
@@ -29,7 +32,12 @@ from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
from reflector.storage import Storage, get_transcripts_storage
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
# Audio encoding constants
OPUS_STANDARD_SAMPLE_RATE = 48000
@@ -414,7 +422,15 @@ class PipelineMainMultitrack(PipelineMainBase):
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(url)
c = av.open(
url,
options={
# it's trying to stream from s3 by default
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
self.logger.warning(
@@ -443,6 +459,8 @@ class PipelineMainMultitrack(PipelineMainBase):
frame = next(dec)
except StopIteration:
active[i] = False
# causes stream to move on / unclogs memory
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
@@ -462,8 +480,6 @@ class PipelineMainMultitrack(PipelineMainBase):
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
for in_ctx in inputs:
in_ctx.push(None)
while True:
try:
mixed = sink.pull()
@@ -494,6 +510,90 @@ class PipelineMainMultitrack(PipelineMainBase):
transcript=transcript, event="WAVEFORM", data=waveform
)
async def update_participants_from_daily(
self, transcript: Transcript, track_keys: list[str]
) -> None:
"""Update transcript participants with user_id and names from Daily.co API."""
if not transcript.recording_id:
return
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(
transcript.recording_id
)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(
mtg_session_id
)
)
for p in payload.data:
pid = p.participant_id
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
self.logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
self.logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=transcript.recording_id,
)
except Exception as e:
self.logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=transcript.recording_id,
exc_info=True,
)
return
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
self.logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(
transcript, participant
)
except Exception as e:
self.logger.warning(
"Failed to map participant names", error=str(e), exc_info=True
)
async def process(self, bucket_name: str, track_keys: list[str]):
transcript = await self.get_transcript()
async with self.transaction():
@@ -502,9 +602,12 @@ class PipelineMainMultitrack(PipelineMainBase):
{
"events": [],
"topics": [],
"participants": [],
},
)
await self.update_participants_from_daily(transcript, track_keys)
source_storage = get_transcripts_storage()
transcript_storage = source_storage

View File

@@ -1,6 +1,7 @@
import io
import re
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Annotated, TypedDict
@@ -16,6 +17,17 @@ class DiarizationSegment(TypedDict):
PUNC_RE = re.compile(r"[.;:?!…]")
SENTENCE_END_RE = re.compile(r"[.?!…]$")
# Max segment length for words_to_segments() - breaks on any punctuation (. ; : ? ! …)
# when segment exceeds this limit. Used for non-multitrack recordings.
MAX_SEGMENT_CHARS = 120
# Max segment length for words_to_segments_by_sentence() - only breaks on sentence-ending
# punctuation (. ? ! …) when segment exceeds this limit. Higher threshold allows complete
# sentences in multitrack recordings where speakers overlap.
# similar number to server/reflector/processors/transcript_liner.py
MAX_SENTENCE_SEGMENT_CHARS = 1000
class AudioFile(BaseModel):
@@ -76,7 +88,6 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
# but separate if the speaker changes, or if the punctuation is a . , ; : ? !
segments = []
current_segment = None
MAX_SEGMENT_LENGTH = 120
for word in words:
if current_segment is None:
@@ -106,7 +117,7 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
current_segment.end = word.end
have_punc = PUNC_RE.search(word.text)
if have_punc and (len(current_segment.text) > MAX_SEGMENT_LENGTH):
if have_punc and (len(current_segment.text) > MAX_SEGMENT_CHARS):
segments.append(current_segment)
current_segment = None
@@ -116,6 +127,70 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
return segments
def words_to_segments_by_sentence(words: list[Word]) -> list[TranscriptSegment]:
"""Group words by speaker, then split into sentences.
For multitrack recordings where words from different speakers are interleaved
by timestamp, this function first groups all words by speaker, then creates
segments based on sentence boundaries within each speaker's words.
This produces cleaner output than words_to_segments() which breaks on every
speaker change, resulting in many tiny segments when speakers overlap.
"""
if not words:
return []
# Group words by speaker, preserving order within each speaker
by_speaker: dict[int, list[Word]] = defaultdict(list)
for w in words:
by_speaker[w.speaker].append(w)
segments: list[TranscriptSegment] = []
for speaker, speaker_words in by_speaker.items():
current_text = ""
current_start: float | None = None
current_end: float = 0.0
for word in speaker_words:
if current_start is None:
current_start = word.start
current_text += word.text
current_end = word.end
# Check for sentence end or max length
is_sentence_end = SENTENCE_END_RE.search(word.text.strip())
is_too_long = len(current_text) >= MAX_SENTENCE_SEGMENT_CHARS
if is_sentence_end or is_too_long:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
current_text = ""
current_start = None
# Flush remaining words for this speaker
if current_text and current_start is not None:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
# Sort segments by start time
segments.sort(key=lambda s: s.start)
return segments
class Transcript(BaseModel):
translation: str | None = None
words: list[Word] = []
@@ -154,7 +229,9 @@ class Transcript(BaseModel):
word.start += offset
word.end += offset
def as_segments(self) -> list[TranscriptSegment]:
def as_segments(self, is_multitrack: bool = False) -> list[TranscriptSegment]:
if is_multitrack:
return words_to_segments_by_sentence(self.words)
return words_to_segments(self.words)

View File

@@ -160,7 +160,10 @@ def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
def task_is_scheduled_or_active(task_name: str, **kwargs):
inspect = celery.current_app.control.inspect()
for worker, tasks in (inspect.scheduled() | inspect.active()).items():
scheduled = inspect.scheduled() or {}
active = inspect.active() or {}
all = scheduled | active
for worker, tasks in all.items():
for task in tasks:
if task["name"] == task_name and task["kwargs"] == kwargs:
return True

View File

@@ -74,6 +74,10 @@ class Settings(BaseSettings):
LLM_API_KEY: str | None = None
LLM_CONTEXT_WINDOW: int = 16000
LLM_PARSE_MAX_RETRIES: int = (
3 # Max retries for JSON/validation errors (total attempts = retries + 1)
)
# Diarization
DIARIZATION_ENABLED: bool = True
DIARIZATION_BACKEND: str = "modal"

View File

@@ -64,6 +64,11 @@ def recording_lock_key(recording_id: NonEmptyString) -> NonEmptyString:
return f"recording:{recording_id}"
def filter_cam_audio_tracks(track_keys: list[str]) -> list[str]:
"""Filter track keys to cam-audio tracks only (skip screen-audio, etc.)."""
return [k for k in track_keys if "cam-audio" in k]
def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString:
"""
Extract base room name from Daily.co timestamped room name.

View File

@@ -6,9 +6,6 @@ from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic
from reflector.processors.types import (
Transcript as ProcessorTranscript,
)
from reflector.processors.types import (
words_to_segments,
)
from reflector.schemas.transcript_formats import TranscriptSegment
from reflector.utils.webvtt import seconds_to_timestamp
@@ -32,7 +29,9 @@ def format_timestamp_mmss(seconds: float | int) -> str:
def transcript_to_text(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to plain text with speaker names."""
lines = []
@@ -41,7 +40,7 @@ def transcript_to_text(
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments()
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -52,7 +51,9 @@ def transcript_to_text(
def transcript_to_text_timestamped(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to timestamped text with speaker names."""
lines = []
@@ -61,7 +62,7 @@ def transcript_to_text_timestamped(
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments()
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -73,7 +74,9 @@ def transcript_to_text_timestamped(
def topics_to_webvtt_named(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to WebVTT format with participant names."""
vtt = webvtt.WebVTT()
@@ -82,7 +85,8 @@ def topics_to_webvtt_named(
if not topic.words:
continue
segments = words_to_segments(topic.words)
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
@@ -100,19 +104,23 @@ def topics_to_webvtt_named(
def transcript_to_json_segments(
topics: list[TranscriptTopic], participants: list[TranscriptParticipant] | None
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> list[TranscriptSegment]:
"""Convert transcript topics to a flat list of JSON segments."""
segments = []
result = []
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
for segment in transcript.as_segments():
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
segments.append(
result.append(
TranscriptSegment(
speaker=segment.speaker,
speaker_name=speaker_name,
@@ -122,4 +130,4 @@ def transcript_to_json_segments(
)
)
return segments
return result

View File

@@ -31,6 +31,7 @@ class DailyClient(VideoPlatformClient):
PLATFORM_NAME: Platform = "daily"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"
RECORDING_NONE: RecordingType = "none"
RECORDING_LOCAL: RecordingType = "local"
RECORDING_CLOUD: RecordingType = "cloud"
def __init__(self, config: VideoPlatformConfig):
@@ -54,19 +55,23 @@ class DailyClient(VideoPlatformClient):
timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT)
room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}"
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif room.recording_type == self.RECORDING_CLOUD:
enable_recording = "raw-tracks"
properties = RoomProperties(
enable_recording="raw-tracks"
if room.recording_type != self.RECORDING_NONE
else False,
enable_recording=enable_recording,
enable_chat=True,
enable_screenshare=True,
enable_knocking=room.is_locked,
start_video_off=False,
start_audio_off=False,
exp=int(end_date.timestamp()),
)
# Only configure recordings_bucket if recording is enabled
if room.recording_type != self.RECORDING_NONE:
if room.recording_type == self.RECORDING_CLOUD:
daily_storage = get_dailyco_storage()
assert daily_storage.bucket_name, "S3 bucket must be configured"
properties.recordings_bucket = RecordingsBucketConfig(
@@ -172,15 +177,16 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
enable_recording: bool,
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
) -> NonEmptyString:
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=enable_recording,
enable_recording_ui=False,
start_cloud_recording=start_cloud_recording,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
request = CreateMeetingTokenRequest(properties=properties)

View File

@@ -89,7 +89,7 @@ class CreateRoom(BaseModel):
ics_url: Optional[str] = None
ics_fetch_interval: int = 300
ics_enabled: bool = False
platform: Optional[Platform] = None
platform: Platform
class UpdateRoom(BaseModel):
@@ -248,7 +248,7 @@ async def rooms_create(
ics_url=room.ics_url,
ics_fetch_interval=room.ics_fetch_interval,
ics_enabled=room.ics_enabled,
platform=room.platform or settings.DEFAULT_VIDEO_PLATFORM,
platform=room.platform,
)
@@ -310,6 +310,22 @@ async def rooms_create_meeting(
room=room, current_time=current_time
)
if meeting is not None:
settings_match = (
meeting.is_locked == room.is_locked
and meeting.room_mode == room.room_mode
and meeting.recording_type == room.recording_type
and meeting.recording_trigger == room.recording_trigger
and meeting.platform == room.platform
)
if not settings_match:
logger.info(
f"Room settings changed for {room_name}, creating new meeting",
room_id=room.id,
old_meeting_id=meeting.id,
)
meeting = None
if meeting is None:
end_date = current_time + timedelta(hours=8)
@@ -549,21 +565,16 @@ async def rooms_join_meeting(
if meeting.end_date <= current_time:
raise HTTPException(status_code=400, detail="Meeting has ended")
if meeting.platform == "daily":
if meeting.platform == "daily" and user_id is not None:
client = create_platform_client(meeting.platform)
enable_recording = room.recording_trigger != "none"
token = await client.create_meeting_token(
meeting.room_name,
enable_recording=enable_recording,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=meeting.recording_type == "local",
user_id=user_id,
is_owner=user_id == room.user_id,
)
meeting = meeting.model_copy()
meeting.room_url = add_query_param(meeting.room_url, "t", token)
if meeting.host_room_url:
meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token)
if user_id != room.user_id and meeting.platform == "whereby":
meeting.host_room_url = ""
return meeting

View File

@@ -16,6 +16,7 @@ from pydantic import (
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.recordings import recordings_controller
from reflector.db.search import (
DEFAULT_SEARCH_LIMIT,
SearchLimit,
@@ -36,6 +37,7 @@ from reflector.db.transcripts import (
TranscriptTopic,
transcripts_controller,
)
from reflector.db.users import user_controller
from reflector.processors.types import Transcript as ProcessorTranscript
from reflector.processors.types import Word
from reflector.schemas.transcript_formats import TranscriptFormat, TranscriptSegment
@@ -60,6 +62,14 @@ ALGORITHM = "HS256"
DOWNLOAD_EXPIRE_MINUTES = 60
async def _get_is_multitrack(transcript) -> bool:
"""Detect if transcript is from multitrack recording."""
if not transcript.recording_id:
return False
recording = await recordings_controller.get_by_id(transcript.recording_id)
return recording is not None and recording.is_multitrack
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
@@ -102,8 +112,12 @@ class GetTranscriptMinimal(BaseModel):
audio_deleted: bool | None = None
class TranscriptParticipantWithEmail(TranscriptParticipant):
email: str | None = None
class GetTranscriptWithParticipants(GetTranscriptMinimal):
participants: list[TranscriptParticipant] | None
participants: list[TranscriptParticipantWithEmail] | None
class GetTranscriptWithText(GetTranscriptWithParticipants):
@@ -360,7 +374,7 @@ class GetTranscriptTopic(BaseModel):
segments: list[GetTranscriptSegmentTopic] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
if not topic.words:
# In previous version, words were missing
# Just output a segment with speaker 0
@@ -384,7 +398,7 @@ class GetTranscriptTopic(BaseModel):
start=segment.start,
speaker=segment.speaker,
)
for segment in transcript.as_segments()
for segment in transcript.as_segments(is_multitrack)
]
return cls(
id=topic.id,
@@ -401,8 +415,8 @@ class GetTranscriptTopicWithWords(GetTranscriptTopic):
words: list[Word] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
instance.words = topic.words
return instance
@@ -417,8 +431,8 @@ class GetTranscriptTopicWithWordsPerSpeaker(GetTranscriptTopic):
words_per_speaker: list[SpeakerWords] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
words_per_speakers = []
# group words by speaker
@@ -457,6 +471,20 @@ async def transcript_get(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
participants = []
if transcript.participants:
user_ids = [p.user_id for p in transcript.participants if p.user_id is not None]
users_dict = await user_controller.get_by_ids(user_ids) if user_ids else {}
for p in transcript.participants:
user = users_dict.get(p.user_id) if p.user_id else None
participants.append(
TranscriptParticipantWithEmail(
**p.model_dump(), email=user.email if user else None
)
)
base_data = {
"id": transcript.id,
"user_id": transcript.user_id,
@@ -476,21 +504,23 @@ async def transcript_get(
"source_kind": transcript.source_kind,
"room_id": transcript.room_id,
"audio_deleted": transcript.audio_deleted,
"participants": transcript.participants,
"participants": participants,
}
if transcript_format == "text":
return GetTranscriptWithText(
**base_data,
transcript_format="text",
transcript=transcript_to_text(transcript.topics, transcript.participants),
transcript=transcript_to_text(
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "text-timestamped":
return GetTranscriptWithTextTimestamped(
**base_data,
transcript_format="text-timestamped",
transcript=transcript_to_text_timestamped(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "webvtt-named":
@@ -498,7 +528,7 @@ async def transcript_get(
**base_data,
transcript_format="webvtt-named",
transcript=topics_to_webvtt_named(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "json":
@@ -506,7 +536,7 @@ async def transcript_get(
**base_data,
transcript_format="json",
transcript=transcript_to_json_segments(
transcript.topics, transcript.participants
transcript.topics, transcript.participants, is_multitrack
),
)
else:
@@ -565,9 +595,12 @@ async def transcript_get_topics(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopic
return [
GetTranscriptTopic.from_transcript_topic(topic) for topic in transcript.topics
GetTranscriptTopic.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -584,9 +617,11 @@ async def transcript_get_topics_with_words(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopicWithWords
return [
GetTranscriptTopicWithWords.from_transcript_topic(topic)
GetTranscriptTopicWithWords.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -605,13 +640,17 @@ async def transcript_get_topics_with_words_per_speaker(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# get the topic from the transcript
topic = next((t for t in transcript.topics if t.id == topic_id), None)
if not topic:
raise HTTPException(status_code=404, detail="Topic not found")
# convert to GetTranscriptTopicWithWordsPerSpeaker
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(topic)
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(
topic, is_multitrack
)
@router.post("/transcripts/{transcript_id}/zulip")

View File

@@ -2,6 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List
from urllib.parse import unquote
import av
@@ -11,7 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.dailyco_api import RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
@@ -21,7 +22,6 @@ from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
TranscriptParticipant,
transcripts_controller,
)
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
@@ -38,7 +38,7 @@ from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
parse_daily_recording_filename,
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.video_platforms.factory import create_platform_client
@@ -273,15 +273,7 @@ async def _process_multitrack_recording_inner(
# else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript:
await transcripts_controller.update(
transcript,
{
"topics": [],
"participants": [],
},
)
else:
if not transcript:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
@@ -294,79 +286,10 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(recording_id)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(mtg_session_id)
)
for p in payload.data:
pid = p.participant_id
assert (
pid is not None
), "panic! participant id cannot be None"
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=recording_id,
)
except Exception as e:
logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=recording_id,
exc_info=True,
)
for idx, key in enumerate(track_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
except Exception as e:
logger.warning("Failed to map participant names", error=str(e), exc_info=True)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=track_keys,
track_keys=filter_cam_audio_tracks(track_keys),
)
@@ -391,7 +314,7 @@ async def poll_daily_recordings():
async with create_platform_client("daily") as daily_client:
# latest 100. TODO cursor-based state
api_recordings = await daily_client.list_recordings()
api_recordings: List[RecordingResponse] = await daily_client.list_recordings()
if not api_recordings:
logger.debug(
@@ -422,17 +345,19 @@ async def poll_daily_recordings():
for recording in missing_recordings:
if not recording.tracks:
assert recording.status != "finished", (
f"Recording {recording.id} has status='finished' but no tracks. "
f"Daily.co API guarantees finished recordings have tracks available. "
f"room_name={recording.room_name}"
)
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
room_name=recording.room_name,
status=recording.status,
)
continue
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]

View File

@@ -318,6 +318,14 @@ async def dummy_storage():
yield
@pytest.fixture
def test_settings():
"""Provide isolated settings for tests to avoid modifying global settings"""
from reflector.settings import Settings
return Settings()
@pytest.fixture(scope="session")
def celery_enable_logging():
return True

View File

@@ -0,0 +1,357 @@
"""Tests for LLM parse error recovery using llama-index Workflow"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pydantic import BaseModel, Field
from workflows.errors import WorkflowRuntimeError
from reflector.llm import LLM, LLMParseError, StructuredOutputWorkflow
class TestResponse(BaseModel):
"""Test response model for structured output"""
title: str = Field(description="A title")
summary: str = Field(description="A summary")
confidence: float = Field(description="Confidence score", ge=0, le=1)
def make_completion_response(text: str):
"""Create a mock CompletionResponse with .text attribute"""
response = MagicMock()
response.text = text
return response
class TestLLMParseErrorRecovery:
"""Test parse error recovery with Workflow feedback loop"""
@pytest.mark.asyncio
async def test_parse_error_recovery_with_feedback(self, test_settings):
"""Test that parse errors trigger retry with error feedback"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
# TreeSummarize returns plain text analysis (step 1)
mock_summarizer.aget_response = AsyncMock(
return_value="The analysis shows a test with summary and high confidence."
)
call_count = {"count": 0}
async def acomplete_handler(prompt, *args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
# First JSON formatting call returns invalid JSON
return make_completion_response('{"title": "Test"}')
else:
# Second call should have error feedback in prompt
assert "Your previous response could not be parsed:" in prompt
assert '{"title": "Test"}' in prompt
assert "Error:" in prompt
assert "Please try again" in prompt
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert result.summary == "Summary"
assert result.confidence == 0.95
# TreeSummarize called once, Settings.llm.acomplete called twice
assert mock_summarizer.aget_response.call_count == 1
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_max_parse_retry_attempts(self, test_settings):
"""Test that parse error retry stops after max attempts"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Always return invalid JSON from acomplete
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"invalid": "missing required fields"}'
)
)
with pytest.raises(LLMParseError, match="Failed to parse"):
await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
expected_attempts = test_settings.LLM_PARSE_MAX_RETRIES + 1
# TreeSummarize called once, acomplete called max_retries times
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == expected_attempts
@pytest.mark.asyncio
async def test_raw_response_logging_on_parse_error(self, test_settings, caplog):
"""Test that raw response is logged when parse error occurs"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
caplog.at_level("ERROR"),
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(*args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
return make_completion_response('{"title": "Test"}') # Invalid
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
error_logs = [r for r in caplog.records if r.levelname == "ERROR"]
raw_response_logged = any("Raw response:" in r.message for r in error_logs)
assert raw_response_logged, "Raw response should be logged on parse error"
@pytest.mark.asyncio
async def test_multiple_validation_errors_in_feedback(self, test_settings):
"""Test that validation errors are included in feedback"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(prompt, *args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
# Missing title and summary
return make_completion_response('{"confidence": 0.5}')
else:
# Should have schema validation errors in prompt
assert (
"Schema validation errors" in prompt
or "error" in prompt.lower()
)
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_success_on_first_attempt(self, test_settings):
"""Test that no retry happens when first attempt succeeds"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert result.summary == "Summary"
assert result.confidence == 0.95
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == 1
class TestStructuredOutputWorkflow:
"""Direct tests for the StructuredOutputWorkflow"""
@pytest.mark.asyncio
async def test_workflow_retries_on_validation_error(self):
"""Test workflow retries when validation fails"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=3,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(*args, **kwargs):
call_count["count"] += 1
if call_count["count"] < 2:
return make_completion_response('{"title": "Only title"}')
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.9}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
assert "success" in result
assert result["success"].title == "Test"
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_workflow_returns_error_after_max_retries(self):
"""Test workflow returns error after exhausting retries"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=2,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Always return invalid JSON
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response('{"invalid": true}')
)
result = await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
assert "error" in result
# TreeSummarize called once, acomplete called max_retries times
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == 2
class TestNetworkErrorRetries:
"""Test that network error retries are handled by OpenAILike, not Workflow"""
@pytest.mark.asyncio
async def test_network_error_propagates_after_openai_retries(self, test_settings):
"""Test that network errors are retried by OpenAILike and then propagate.
Network retries are handled by OpenAILike (max_retries=3), not by our
StructuredOutputWorkflow. This test verifies that network errors propagate
up after OpenAILike exhausts its retries.
"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Simulate network error from acomplete (after OpenAILike retries exhausted)
network_error = ConnectionError("Connection refused")
mock_settings.llm.acomplete = AsyncMock(side_effect=network_error)
# Network error wrapped in WorkflowRuntimeError
with pytest.raises(WorkflowRuntimeError, match="Connection refused"):
await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
# acomplete called only once - network error propagates, not retried by Workflow
assert mock_settings.llm.acomplete.call_count == 1
@pytest.mark.asyncio
async def test_network_error_not_retried_by_workflow(self, test_settings):
"""Test that Workflow does NOT retry network errors (OpenAILike handles those).
This verifies the separation of concerns:
- StructuredOutputWorkflow: retries parse/validation errors
- OpenAILike: retries network errors (internally, max_retries=3)
"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=3,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Network error should propagate immediately, not trigger Workflow retry
mock_settings.llm.acomplete = AsyncMock(
side_effect=TimeoutError("Request timed out")
)
# Network error wrapped in WorkflowRuntimeError
with pytest.raises(WorkflowRuntimeError, match="Request timed out"):
await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
# Only called once - Workflow doesn't retry network errors
assert mock_settings.llm.acomplete.call_count == 1

View File

@@ -159,3 +159,78 @@ def test_processor_transcript_segment():
assert segments[3].start == 30.72
assert segments[4].start == 31.56
assert segments[5].start == 32.38
def test_processor_transcript_segment_multitrack_interleaved():
"""Test as_segments(is_multitrack=True) with interleaved speakers.
Multitrack recordings have words from different speakers sorted by start time,
causing frequent speaker alternation. The multitrack mode should group by
speaker first, then split into sentences.
"""
from reflector.processors.types import Transcript, Word
# Simulate real multitrack data: words sorted by start time, speakers interleave
# Speaker 0 says: "Hello there."
# Speaker 1 says: "I'm good."
# When sorted by time, words interleave
transcript = Transcript(
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
]
)
# Default behavior (is_multitrack=False): breaks on every speaker change = 4 segments
segments_default = transcript.as_segments(is_multitrack=False)
assert len(segments_default) == 4
# Multitrack behavior: groups by speaker, then sentences = 2 segments
segments_multitrack = transcript.as_segments(is_multitrack=True)
assert len(segments_multitrack) == 2
# Check content - sorted by start time
assert segments_multitrack[0].speaker == 0
assert segments_multitrack[0].text == "Hello there."
assert segments_multitrack[0].start == 0.0
assert segments_multitrack[0].end == 1.0
assert segments_multitrack[1].speaker == 1
assert segments_multitrack[1].text == "I'm good."
assert segments_multitrack[1].start == 0.5
assert segments_multitrack[1].end == 1.5
def test_processor_transcript_segment_multitrack_overlapping_timestamps():
"""Test multitrack with exactly overlapping timestamps (real Daily.co data pattern)."""
from reflector.processors.types import Transcript, Word
# Real pattern from transcript 38d84d57: words with identical timestamps
transcript = Transcript(
words=[
Word(text="speaking ", start=6.71, end=7.11, speaker=0),
Word(text="Speaking ", start=6.71, end=7.11, speaker=1),
Word(text="at ", start=7.11, end=7.27, speaker=0),
Word(text="at ", start=7.11, end=7.27, speaker=1),
Word(text="the ", start=7.27, end=7.43, speaker=0),
Word(text="the ", start=7.27, end=7.43, speaker=1),
Word(text="same ", start=7.43, end=7.59, speaker=0),
Word(text="same ", start=7.43, end=7.59, speaker=1),
Word(text="time.", start=7.59, end=8.0, speaker=0),
Word(text="time.", start=7.59, end=8.0, speaker=1),
]
)
# Default: 10 segments (one per speaker change)
segments_default = transcript.as_segments(is_multitrack=False)
assert len(segments_default) == 10
# Multitrack: 2 segments (one per speaker sentence)
segments_multitrack = transcript.as_segments(is_multitrack=True)
assert len(segments_multitrack) == 2
# Both should have complete sentences
assert "speaking at the same time." in segments_multitrack[0].text
assert "Speaking at the same time." in segments_multitrack[1].text

View File

@@ -273,8 +273,17 @@ async def test_transcript_formats_with_multiple_speakers():
@pytest.mark.asyncio
async def test_transcript_formats_with_overlapping_speakers():
"""Test format conversion when multiple speakers speak at the same time (overlapping timestamps)."""
async def test_transcript_formats_with_overlapping_speakers_multitrack():
"""Test format conversion for multitrack recordings with truly interleaved words.
Multitrack recordings have words from different speakers sorted by start time,
causing frequent speaker alternation. This tests the sentence-based segmentation
that groups each speaker's words into complete sentences.
"""
# Real multitrack data: words sorted by start time, speakers interleave
# Alice says: "Hello there." (0.0-1.0)
# Bob says: "I'm good." (0.5-1.5)
# When sorted by time, words interleave: Hello, I'm, there., good.
topics = [
TranscriptTopic(
id="1",
@@ -282,11 +291,10 @@ async def test_transcript_formats_with_overlapping_speakers():
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=0.5, speaker=0),
Word(text=" there.", start=0.5, end=1.0, speaker=0),
# Speaker 1 overlaps with speaker 0 at 0.5-1.0
Word(text="I'm", start=0.5, end=1.0, speaker=1),
Word(text=" good.", start=1.0, end=1.5, speaker=1),
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
]
@@ -296,20 +304,9 @@ async def test_transcript_formats_with_overlapping_speakers():
TranscriptParticipant(id="2", speaker=1, name="Bob"),
]
text_result = transcript_to_text(topics, participants)
lines = text_result.split("\n")
assert len(lines) >= 2
assert any("Alice:" in line for line in lines)
assert any("Bob:" in line for line in lines)
timestamped_result = transcript_to_text_timestamped(topics, participants)
timestamped_lines = timestamped_result.split("\n")
assert len(timestamped_lines) >= 2
assert any("Alice:" in line for line in timestamped_lines)
assert any("Bob:" in line for line in timestamped_lines)
assert any("[00:00]" in line for line in timestamped_lines)
webvtt_result = topics_to_webvtt_named(topics, participants)
# With is_multitrack=True, should produce 2 segments (one per speaker sentence)
# not 4 segments (one per speaker change)
webvtt_result = topics_to_webvtt_named(topics, participants, is_multitrack=True)
expected_webvtt = """WEBVTT
00:00:00.000 --> 00:00:01.000
@@ -320,23 +317,26 @@ async def test_transcript_formats_with_overlapping_speakers():
"""
assert webvtt_result == expected_webvtt
segments = transcript_to_json_segments(topics, participants)
assert len(segments) >= 2
speakers = {seg.speaker for seg in segments}
assert 0 in speakers and 1 in speakers
text_result = transcript_to_text(topics, participants, is_multitrack=True)
lines = text_result.split("\n")
assert len(lines) == 2
assert "Alice: Hello there." in lines[0]
assert "Bob: I'm good." in lines[1]
alice_seg = next(seg for seg in segments if seg.speaker == 0)
bob_seg = next(seg for seg in segments if seg.speaker == 1)
timestamped_result = transcript_to_text_timestamped(
topics, participants, is_multitrack=True
)
timestamped_lines = timestamped_result.split("\n")
assert len(timestamped_lines) == 2
assert "[00:00] Alice: Hello there." in timestamped_lines[0]
assert "[00:00] Bob: I'm good." in timestamped_lines[1]
# Verify timestamps overlap: Alice (0.0-1.0) and Bob (0.5-1.5) overlap at 0.5-1.0
assert alice_seg.start < bob_seg.end, "Alice segment should start before Bob ends"
assert bob_seg.start < alice_seg.end, "Bob segment should start before Alice ends"
overlap_start = max(alice_seg.start, bob_seg.start)
overlap_end = min(alice_seg.end, bob_seg.end)
assert (
overlap_start < overlap_end
), f"Segments should overlap between {overlap_start} and {overlap_end}"
segments = transcript_to_json_segments(topics, participants, is_multitrack=True)
assert len(segments) == 2
assert segments[0].speaker_name == "Alice"
assert segments[0].text == "Hello there."
assert segments[1].speaker_name == "Bob"
assert segments[1].text == "I'm good."
@pytest.mark.asyncio
@@ -573,3 +573,207 @@ async def test_api_transcript_format_default_is_text(client):
assert data["transcript_format"] == "text"
assert "transcript" in data
@pytest.mark.asyncio
async def test_api_topics_endpoint_multitrack_segmentation(client):
"""Test GET /transcripts/{id}/topics uses sentence-based segmentation for multitrack.
This tests the fix for TASKS2.md - ensuring /topics endpoints correctly detect
multitrack recordings and use sentence-based segmentation instead of fragmenting
on every speaker change.
"""
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create a multitrack recording (has track_keys)
recording = Recording(
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"], # This makes it multitrack
)
await recordings_controller.create(recording)
# Create transcript linked to the recording
transcript = await transcripts_controller.add(
name="Multitrack Test",
source_kind="file",
recording_id=recording.id,
)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
# Add interleaved words (as they appear in real multitrack data)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
# Test /topics endpoint
response = await client.get(f"/transcripts/{transcript.id}/topics")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Key assertion: multitrack should produce 2 segments (one per speaker sentence)
# Not 4 segments (one per speaker change)
assert len(topic["segments"]) == 2
# Check content
segment_texts = [s["text"] for s in topic["segments"]]
assert "Hello there." in segment_texts
assert "I'm good." in segment_texts
@pytest.mark.asyncio
async def test_api_topics_endpoint_non_multitrack_segmentation(client):
"""Test GET /transcripts/{id}/topics uses default segmentation for non-multitrack.
Ensures backward compatibility - transcripts without multitrack recordings
should continue using the default speaker-change-based segmentation.
"""
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create transcript WITHOUT recording (defaulted as not multitrack) TODO better heuristic
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
# Add interleaved words
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
# Test /topics endpoint
response = await client.get(f"/transcripts/{tid}/topics")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Non-multitrack: should produce 4 segments (one per speaker change)
assert len(topic["segments"]) == 4
@pytest.mark.asyncio
async def test_api_topics_with_words_endpoint_multitrack(client):
"""Test GET /transcripts/{id}/topics/with-words uses multitrack segmentation."""
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create multitrack recording
recording = Recording(
bucket_name="test-bucket",
object_key="test-key-2",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
)
await recordings_controller.create(recording)
transcript = await transcripts_controller.add(
name="Multitrack Test 2",
source_kind="file",
recording_id=recording.id,
)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
response = await client.get(f"/transcripts/{transcript.id}/topics/with-words")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Should have 2 segments (multitrack sentence-based)
assert len(topic["segments"]) == 2
# Should also have words field
assert "words" in topic
assert len(topic["words"]) == 4

View File

@@ -15,9 +15,12 @@ import {
createListCollection,
useDisclosure,
Tabs,
Popover,
Text,
HStack,
} from "@chakra-ui/react";
import { useEffect, useMemo, useState } from "react";
import { LuEye, LuEyeOff } from "react-icons/lu";
import { LuEye, LuEyeOff, LuInfo } from "react-icons/lu";
import useRoomList from "./useRoomList";
import type { components } from "../../reflector-api";
import {
@@ -67,6 +70,11 @@ const recordingTypeOptions: SelectOption[] = [
{ label: "Cloud", value: "cloud" },
];
const platformOptions: SelectOption[] = [
{ label: "Whereby", value: "whereby" },
{ label: "Daily", value: "daily" },
];
const roomInitialState = {
name: "",
zulipAutoPost: false,
@@ -82,6 +90,7 @@ const roomInitialState = {
icsUrl: "",
icsEnabled: false,
icsFetchInterval: 5,
platform: "whereby",
};
export default function RoomsList() {
@@ -99,6 +108,11 @@ export default function RoomsList() {
const recordingTypeCollection = createListCollection({
items: recordingTypeOptions,
});
const platformCollection = createListCollection({
items: platformOptions,
});
const [roomInput, setRoomInput] = useState<null | typeof roomInitialState>(
null,
);
@@ -143,15 +157,24 @@ export default function RoomsList() {
zulipStream: detailedEditedRoom.zulip_stream,
zulipTopic: detailedEditedRoom.zulip_topic,
isLocked: detailedEditedRoom.is_locked,
roomMode: detailedEditedRoom.room_mode,
roomMode:
detailedEditedRoom.platform === "daily"
? "group"
: detailedEditedRoom.room_mode,
recordingType: detailedEditedRoom.recording_type,
recordingTrigger: detailedEditedRoom.recording_trigger,
recordingTrigger:
detailedEditedRoom.platform === "daily"
? detailedEditedRoom.recording_type === "cloud"
? "automatic-2nd-participant"
: "none"
: detailedEditedRoom.recording_trigger,
isShared: detailedEditedRoom.is_shared,
webhookUrl: detailedEditedRoom.webhook_url || "",
webhookSecret: detailedEditedRoom.webhook_secret || "",
icsUrl: detailedEditedRoom.ics_url || "",
icsEnabled: detailedEditedRoom.ics_enabled || false,
icsFetchInterval: detailedEditedRoom.ics_fetch_interval || 5,
platform: detailedEditedRoom.platform,
}
: null,
[detailedEditedRoom],
@@ -277,21 +300,32 @@ export default function RoomsList() {
return;
}
const platform: "whereby" | "daily" | null =
room.platform === "whereby" || room.platform === "daily"
? room.platform
: null;
const roomData = {
name: room.name,
zulip_auto_post: room.zulipAutoPost,
zulip_stream: room.zulipStream,
zulip_topic: room.zulipTopic,
is_locked: room.isLocked,
room_mode: room.roomMode,
room_mode: platform === "daily" ? "group" : room.roomMode,
recording_type: room.recordingType,
recording_trigger: room.recordingTrigger,
recording_trigger:
platform === "daily"
? room.recordingType === "cloud"
? "automatic-2nd-participant"
: "none"
: room.recordingTrigger,
is_shared: room.isShared,
webhook_url: room.webhookUrl,
webhook_secret: room.webhookSecret,
ics_url: room.icsUrl,
ics_enabled: room.icsEnabled,
ics_fetch_interval: room.icsFetchInterval,
platform,
};
if (isEditing) {
@@ -339,15 +373,21 @@ export default function RoomsList() {
zulipStream: roomData.zulip_stream,
zulipTopic: roomData.zulip_topic,
isLocked: roomData.is_locked,
roomMode: roomData.room_mode,
roomMode: roomData.platform === "daily" ? "group" : roomData.room_mode, // Daily always uses 2-200
recordingType: roomData.recording_type,
recordingTrigger: roomData.recording_trigger,
recordingTrigger:
roomData.platform === "daily"
? roomData.recording_type === "cloud"
? "automatic-2nd-participant"
: "none"
: roomData.recording_trigger,
isShared: roomData.is_shared,
webhookUrl: roomData.webhook_url || "",
webhookSecret: roomData.webhook_secret || "",
icsUrl: roomData.ics_url || "",
icsEnabled: roomData.ics_enabled || false,
icsFetchInterval: roomData.ics_fetch_interval || 5,
platform: roomData.platform,
});
setEditRoomId(roomId);
setIsEditing(true);
@@ -482,6 +522,52 @@ export default function RoomsList() {
)}
</Field.Root>
<Field.Root mt={4}>
<Field.Label>Platform</Field.Label>
<Select.Root
value={[room.platform]}
onValueChange={(e) => {
const newPlatform = e.value[0] as "whereby" | "daily";
const updates: Partial<typeof room> = {
platform: newPlatform,
};
if (newPlatform === "daily") {
updates.roomMode = "group";
updates.recordingTrigger =
room.recordingType === "cloud"
? "automatic-2nd-participant"
: "none";
} else {
if (room.recordingType !== "cloud") {
updates.recordingTrigger = "none";
}
}
setRoomInput({ ...room, ...updates });
}}
collection={platformCollection}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select platform" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{platformOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
<Field.Root mt={4}>
<Checkbox.Root
name="isLocked"
@@ -504,50 +590,95 @@ export default function RoomsList() {
<Checkbox.Label>Locked room</Checkbox.Label>
</Checkbox.Root>
</Field.Root>
{room.platform !== "daily" && (
<Field.Root mt={4}>
<Field.Label>Room size</Field.Label>
<Select.Root
value={[room.roomMode]}
onValueChange={(e) =>
setRoomInput({ ...room, roomMode: e.value[0] })
}
collection={roomModeCollection}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select room size" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{roomModeOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
)}
<Field.Root mt={4}>
<Field.Label>Room size</Field.Label>
<Select.Root
value={[room.roomMode]}
onValueChange={(e) =>
setRoomInput({ ...room, roomMode: e.value[0] })
}
collection={roomModeCollection}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select room size" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{roomModeOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
<Field.Root mt={4}>
<Field.Label>Recording type</Field.Label>
<HStack gap={2} alignItems="center">
<Field.Label>Recording type</Field.Label>
<Popover.Root>
<Popover.Trigger asChild>
<IconButton
aria-label="Recording type help"
variant="ghost"
size="xs"
colorPalette="gray"
>
<LuInfo />
</IconButton>
</Popover.Trigger>
<Popover.Positioner>
<Popover.Content>
<Popover.Arrow />
<Popover.Body>
<Text fontSize="sm" lineHeight="1.6">
<strong>None:</strong> No recording will be
created.
<br />
<br />
<strong>Local:</strong> Recording happens on
each participant's device. Files are saved
locally.
<br />
<br />
<strong>Cloud:</strong> Recording happens on
the platform's servers and is available after
the meeting ends.
</Text>
</Popover.Body>
</Popover.Content>
</Popover.Positioner>
</Popover.Root>
</HStack>
<Select.Root
value={[room.recordingType]}
onValueChange={(e) =>
setRoomInput({
...room,
recordingType: e.value[0],
recordingTrigger:
e.value[0] !== "cloud"
onValueChange={(e) => {
const newRecordingType = e.value[0];
const updates: Partial<typeof room> = {
recordingType: newRecordingType,
};
if (room.platform === "daily") {
updates.recordingTrigger =
newRecordingType === "cloud"
? "automatic-2nd-participant"
: "none";
} else {
updates.recordingTrigger =
newRecordingType !== "cloud"
? "none"
: room.recordingTrigger,
})
}
: room.recordingTrigger;
}
setRoomInput({ ...room, ...updates });
}}
collection={recordingTypeCollection}
>
<Select.HiddenSelect />
@@ -571,40 +702,77 @@ export default function RoomsList() {
</Select.Positioner>
</Select.Root>
</Field.Root>
<Field.Root mt={4}>
<Field.Label>Cloud recording start trigger</Field.Label>
<Select.Root
value={[room.recordingTrigger]}
onValueChange={(e) =>
setRoomInput({
...room,
recordingTrigger: e.value[0],
})
}
collection={recordingTriggerCollection}
disabled={room.recordingType !== "cloud"}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select trigger" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{recordingTriggerOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
{room.recordingType === "cloud" &&
room.platform !== "daily" && (
<Field.Root mt={4}>
<HStack gap={2} alignItems="center">
<Field.Label>Recording start trigger</Field.Label>
<Popover.Root>
<Popover.Trigger asChild>
<IconButton
aria-label="Recording start trigger help"
variant="ghost"
size="xs"
colorPalette="gray"
>
<LuInfo />
</IconButton>
</Popover.Trigger>
<Popover.Positioner>
<Popover.Content>
<Popover.Arrow />
<Popover.Body>
<Text fontSize="sm" lineHeight="1.6">
<strong>None:</strong> Recording must be
started manually by a participant.
<br />
<br />
<strong>Prompt:</strong> Participants will
be prompted to start recording when they
join.
<br />
<br />
<strong>Automatic:</strong> Recording
starts automatically when a second
participant joins.
</Text>
</Popover.Body>
</Popover.Content>
</Popover.Positioner>
</Popover.Root>
</HStack>
<Select.Root
value={[room.recordingTrigger]}
onValueChange={(e) =>
setRoomInput({
...room,
recordingTrigger: e.value[0],
})
}
collection={recordingTriggerCollection}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select trigger" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{recordingTriggerOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
)}
<Field.Root mt={4}>
<Checkbox.Root

View File

@@ -117,15 +117,6 @@ export default function TranscriptDetails(details: TranscriptDetails) {
return <Modal title="Loading" text={"Loading transcript..."} />;
}
if (mp3.error) {
return (
<Modal
title="Transcription error"
text={`There was an error loading the recording. Error: ${mp3.error}`}
/>
);
}
return (
<>
<Grid
@@ -147,7 +138,12 @@ export default function TranscriptDetails(details: TranscriptDetails) {
/>
) : !mp3.loading && (waveform.error || mp3.error) ? (
<Box p={4} bg="red.100" borderRadius="md">
<Text>Error loading this recording</Text>
<Text>
Error loading{" "}
{[waveform.error && "waveform", mp3.error && "mp3"]
.filter(Boolean)
.join(" and ")}
</Text>
</Box>
) : (
<Skeleton h={14} />

View File

@@ -11,6 +11,7 @@ import {
recordingTypeRequiresConsent,
} from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
import { assertExists } from "../../lib/utils";
type Meeting = components["schemas"]["Meeting"];
@@ -22,16 +23,15 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
const auth = useAuth();
const status = auth.status;
const authLastUserId = auth.lastUserId;
const containerRef = useRef<HTMLDivElement>(null);
const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomName = params?.roomName as string;
// Always call /join to get a fresh token with user_id
useEffect(() => {
if (status === "loading" || !meeting?.id || !roomName) return;
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
const join = async () => {
try {
@@ -50,18 +50,17 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
};
join();
}, [meeting?.id, roomName, status]);
}, [meeting?.id, roomName, authLastUserId]);
const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url;
const isLoading =
status === "loading" || joinMutation.isPending || !joinedMeeting;
const roomUrl = joinedMeeting?.room_url;
const handleLeave = useCallback(() => {
router.push("/browse");
}, [router]);
useEffect(() => {
if (isLoading || !roomUrl || !containerRef.current) return;
if (authLastUserId === undefined || !roomUrl || !containerRef.current)
return;
let frame: DailyCall | null = null;
let destroyed = false;
@@ -92,7 +91,15 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
frame.on("joined-meeting", async () => {
try {
await frame.startRecording({ type: "raw-tracks" });
const frameInstance = assertExists(
frame,
"frame object got lost somewhere after frame.on was called",
);
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
await frameInstance.startRecording({ type: "raw-tracks" });
}
} catch (error) {
console.error("Failed to start recording:", error);
}
@@ -104,7 +111,9 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
}
};
createAndJoin();
createAndJoin().catch((error) => {
console.error("Failed to create and join meeting:", error);
});
return () => {
destroyed = true;
@@ -114,9 +123,9 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
});
}
};
}, [roomUrl, isLoading, handleLeave]);
}, [roomUrl, authLastUserId, handleLeave]);
if (isLoading) {
if (authLastUserId === undefined) {
return (
<Center width="100vw" height="100vh">
<Spinner size="xl" />

View File

@@ -1,6 +1,6 @@
"use client";
import { createContext, useContext } from "react";
import { createContext, useContext, useRef } from "react";
import { useSession as useNextAuthSession } from "next-auth/react";
import { signOut, signIn } from "next-auth/react";
import { configureApiAuth } from "./apiClient";
@@ -25,6 +25,9 @@ type AuthContextType = (
update: () => Promise<Session | null>;
signIn: typeof signIn;
signOut: typeof signOut;
// TODO probably rename isLoading to isReloading and make THIS field "isLoading"
// undefined is "not known", null is "is certainly logged out"
lastUserId: CustomSession["user"]["id"] | null | undefined;
};
const AuthContext = createContext<AuthContextType | undefined>(undefined);
@@ -41,10 +44,15 @@ const noopAuthContext: AuthContextType = {
signOut: async () => {
throw new Error("signOut not supposed to be called");
},
lastUserId: undefined,
};
export function AuthProvider({ children }: { children: React.ReactNode }) {
const { data: session, status, update } = useNextAuthSession();
// referential comparison done in component, must be primitive /or cached
const lastUserId = useRef<CustomSession["user"]["id"] | null | undefined>(
null,
);
const contextValue: AuthContextType = isAuthEnabled
? {
@@ -73,11 +81,16 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
case "authenticated": {
const customSession = assertCustomSession(session);
if (customSession?.error === REFRESH_ACCESS_TOKEN_ERROR) {
// warning: call order-dependent
lastUserId.current = null;
// token had expired but next auth still returns "authenticated" so show user unauthenticated state
return {
status: "unauthenticated" as const,
};
} else if (customSession?.accessToken) {
// updates anyways with updated properties below
// warning! execution order conscience, must be ran before reading lastUserId.current below
lastUserId.current = customSession.user.id;
return {
status,
accessToken: customSession.accessToken,
@@ -92,6 +105,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
}
}
case "unauthenticated": {
// warning: call order-dependent
lastUserId.current = null;
return { status: "unauthenticated" as const };
}
default: {
@@ -103,6 +118,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
update,
signIn,
signOut,
// for optimistic cases when we assume "loading" doesn't immediately invalidate the user
lastUserId: lastUserId.current,
}
: noopAuthContext;

View File

@@ -148,7 +148,7 @@ export const authOptions = (): AuthOptions =>
},
async session({ session, token }) {
const extendedToken = token as JWTWithAccessToken;
console.log("extendedToken", extendedToken);
const userId = await getUserId(extendedToken.accessToken);
return {

View File

@@ -31,7 +31,7 @@
"ioredis": "^5.7.0",
"jest-worker": "^29.6.2",
"lucide-react": "^0.525.0",
"next": "^15.5.3",
"next": "^15.5.7",
"next-auth": "^4.24.7",
"next-themes": "^0.4.6",
"nuqs": "^2.4.3",

100
www/pnpm-lock.yaml generated
View File

@@ -27,7 +27,7 @@ importers:
version: 0.2.3(@fortawesome/fontawesome-svg-core@6.7.2)(react@18.3.1)
"@sentry/nextjs":
specifier: ^10.11.0
version: 10.11.0(@opentelemetry/context-async-hooks@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/core@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.1.0(@opentelemetry/api@1.9.0))(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)(webpack@5.101.3)
version: 10.11.0(@opentelemetry/context-async-hooks@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/core@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.1.0(@opentelemetry/api@1.9.0))(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)(webpack@5.101.3)
"@tanstack/react-query":
specifier: ^5.85.9
version: 5.85.9(react@18.3.1)
@@ -62,17 +62,17 @@ importers:
specifier: ^0.525.0
version: 0.525.0(react@18.3.1)
next:
specifier: ^15.5.3
version: 15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
specifier: ^15.5.7
version: 15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
next-auth:
specifier: ^4.24.7
version: 4.24.11(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
version: 4.24.11(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
next-themes:
specifier: ^0.4.6
version: 0.4.6(react-dom@18.3.1(react@18.3.1))(react@18.3.1)
nuqs:
specifier: ^2.4.3
version: 2.4.3(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)
version: 2.4.3(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)
openapi-fetch:
specifier: ^0.14.0
version: 0.14.0
@@ -1184,10 +1184,10 @@ packages:
integrity: sha512-ZVWUcfwY4E/yPitQJl481FjFo3K22D6qF0DuFH6Y/nbnE11GY5uguDxZMGXPQ8WQ0128MXQD7TnfHyK4oWoIJQ==,
}
"@next/env@15.5.3":
"@next/env@15.5.7":
resolution:
{
integrity: sha512-RSEDTRqyihYXygx/OJXwvVupfr9m04+0vH8vyy0HfZ7keRto6VX9BbEk0J2PUk0VGy6YhklJUSrgForov5F9pw==,
integrity: sha512-4h6Y2NyEkIEN7Z8YxkA27pq6zTkS09bUSYC0xjd0NpwFxjnIKeZEeH591o5WECSmjpUhLn3H2QLJcDye3Uzcvg==,
}
"@next/eslint-plugin-next@15.5.3":
@@ -1196,73 +1196,73 @@ packages:
integrity: sha512-SdhaKdko6dpsSr0DldkESItVrnPYB1NS2NpShCSX5lc7SSQmLZt5Mug6t2xbiuVWEVDLZSuIAoQyYVBYp0dR5g==,
}
"@next/swc-darwin-arm64@15.5.3":
"@next/swc-darwin-arm64@15.5.7":
resolution:
{
integrity: sha512-nzbHQo69+au9wJkGKTU9lP7PXv0d1J5ljFpvb+LnEomLtSbJkbZyEs6sbF3plQmiOB2l9OBtN2tNSvCH1nQ9Jg==,
integrity: sha512-IZwtxCEpI91HVU/rAUOOobWSZv4P2DeTtNaCdHqLcTJU4wdNXgAySvKa/qJCgR5m6KI8UsKDXtO2B31jcaw1Yw==,
}
engines: { node: ">= 10" }
cpu: [arm64]
os: [darwin]
"@next/swc-darwin-x64@15.5.3":
"@next/swc-darwin-x64@15.5.7":
resolution:
{
integrity: sha512-w83w4SkOOhekJOcA5HBvHyGzgV1W/XvOfpkrxIse4uPWhYTTRwtGEM4v/jiXwNSJvfRvah0H8/uTLBKRXlef8g==,
integrity: sha512-UP6CaDBcqaCBuiq/gfCEJw7sPEoX1aIjZHnBWN9v9qYHQdMKvCKcAVs4OX1vIjeE+tC5EIuwDTVIoXpUes29lg==,
}
engines: { node: ">= 10" }
cpu: [x64]
os: [darwin]
"@next/swc-linux-arm64-gnu@15.5.3":
"@next/swc-linux-arm64-gnu@15.5.7":
resolution:
{
integrity: sha512-+m7pfIs0/yvgVu26ieaKrifV8C8yiLe7jVp9SpcIzg7XmyyNE7toC1fy5IOQozmr6kWl/JONC51osih2RyoXRw==,
integrity: sha512-NCslw3GrNIw7OgmRBxHtdWFQYhexoUCq+0oS2ccjyYLtcn1SzGzeM54jpTFonIMUjNbHmpKpziXnpxhSWLcmBA==,
}
engines: { node: ">= 10" }
cpu: [arm64]
os: [linux]
"@next/swc-linux-arm64-musl@15.5.3":
"@next/swc-linux-arm64-musl@15.5.7":
resolution:
{
integrity: sha512-u3PEIzuguSenoZviZJahNLgCexGFhso5mxWCrrIMdvpZn6lkME5vc/ADZG8UUk5K1uWRy4hqSFECrON6UKQBbQ==,
integrity: sha512-nfymt+SE5cvtTrG9u1wdoxBr9bVB7mtKTcj0ltRn6gkP/2Nu1zM5ei8rwP9qKQP0Y//umK+TtkKgNtfboBxRrw==,
}
engines: { node: ">= 10" }
cpu: [arm64]
os: [linux]
"@next/swc-linux-x64-gnu@15.5.3":
"@next/swc-linux-x64-gnu@15.5.7":
resolution:
{
integrity: sha512-lDtOOScYDZxI2BENN9m0pfVPJDSuUkAD1YXSvlJF0DKwZt0WlA7T7o3wrcEr4Q+iHYGzEaVuZcsIbCps4K27sA==,
integrity: sha512-hvXcZvCaaEbCZcVzcY7E1uXN9xWZfFvkNHwbe/n4OkRhFWrs1J1QV+4U1BN06tXLdaS4DazEGXwgqnu/VMcmqw==,
}
engines: { node: ">= 10" }
cpu: [x64]
os: [linux]
"@next/swc-linux-x64-musl@15.5.3":
"@next/swc-linux-x64-musl@15.5.7":
resolution:
{
integrity: sha512-9vWVUnsx9PrY2NwdVRJ4dUURAQ8Su0sLRPqcCCxtX5zIQUBES12eRVHq6b70bbfaVaxIDGJN2afHui0eDm+cLg==,
integrity: sha512-4IUO539b8FmF0odY6/SqANJdgwn1xs1GkPO5doZugwZ3ETF6JUdckk7RGmsfSf7ws8Qb2YB5It33mvNL/0acqA==,
}
engines: { node: ">= 10" }
cpu: [x64]
os: [linux]
"@next/swc-win32-arm64-msvc@15.5.3":
"@next/swc-win32-arm64-msvc@15.5.7":
resolution:
{
integrity: sha512-1CU20FZzY9LFQigRi6jM45oJMU3KziA5/sSG+dXeVaTm661snQP6xu3ykGxxwU5sLG3sh14teO/IOEPVsQMRfA==,
integrity: sha512-CpJVTkYI3ZajQkC5vajM7/ApKJUOlm6uP4BknM3XKvJ7VXAvCqSjSLmM0LKdYzn6nBJVSjdclx8nYJSa3xlTgQ==,
}
engines: { node: ">= 10" }
cpu: [arm64]
os: [win32]
"@next/swc-win32-x64-msvc@15.5.3":
"@next/swc-win32-x64-msvc@15.5.7":
resolution:
{
integrity: sha512-JMoLAq3n3y5tKXPQwCK5c+6tmwkuFDa2XAxz8Wm4+IVthdBZdZGh+lmiLUHg9f9IDwIQpUjp+ysd6OkYTyZRZw==,
integrity: sha512-gMzgBX164I6DN+9/PGA+9dQiwmTkE4TloBNx8Kv9UiGARsr9Nba7IpcBRA1iTV9vwlYnrE3Uy6I7Aj6qLjQuqw==,
}
engines: { node: ">= 10" }
cpu: [x64]
@@ -6863,10 +6863,10 @@ packages:
react: ^16.8 || ^17 || ^18 || ^19 || ^19.0.0-rc
react-dom: ^16.8 || ^17 || ^18 || ^19 || ^19.0.0-rc
next@15.5.3:
next@15.5.7:
resolution:
{
integrity: sha512-r/liNAx16SQj4D+XH/oI1dlpv9tdKJ6cONYPwwcCC46f2NjpaRWY+EKCzULfgQYV6YKXjHBchff2IZBSlZmJNw==,
integrity: sha512-+t2/0jIJ48kUpGKkdlhgkv+zPTEOoXyr60qXe68eB/pl3CMJaLeIGjzp5D6Oqt25hCBiBTt8wEeeAzfJvUKnPQ==,
}
engines: { node: ^18.18.0 || ^19.8.0 || >= 20.0.0 }
hasBin: true
@@ -9877,34 +9877,34 @@ snapshots:
"@tybys/wasm-util": 0.10.0
optional: true
"@next/env@15.5.3": {}
"@next/env@15.5.7": {}
"@next/eslint-plugin-next@15.5.3":
dependencies:
fast-glob: 3.3.1
"@next/swc-darwin-arm64@15.5.3":
"@next/swc-darwin-arm64@15.5.7":
optional: true
"@next/swc-darwin-x64@15.5.3":
"@next/swc-darwin-x64@15.5.7":
optional: true
"@next/swc-linux-arm64-gnu@15.5.3":
"@next/swc-linux-arm64-gnu@15.5.7":
optional: true
"@next/swc-linux-arm64-musl@15.5.3":
"@next/swc-linux-arm64-musl@15.5.7":
optional: true
"@next/swc-linux-x64-gnu@15.5.3":
"@next/swc-linux-x64-gnu@15.5.7":
optional: true
"@next/swc-linux-x64-musl@15.5.3":
"@next/swc-linux-x64-musl@15.5.7":
optional: true
"@next/swc-win32-arm64-msvc@15.5.3":
"@next/swc-win32-arm64-msvc@15.5.7":
optional: true
"@next/swc-win32-x64-msvc@15.5.3":
"@next/swc-win32-x64-msvc@15.5.7":
optional: true
"@nodelib/fs.scandir@2.1.5":
@@ -10684,7 +10684,7 @@ snapshots:
"@sentry/core@8.55.0": {}
"@sentry/nextjs@10.11.0(@opentelemetry/context-async-hooks@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/core@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.1.0(@opentelemetry/api@1.9.0))(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)(webpack@5.101.3)":
"@sentry/nextjs@10.11.0(@opentelemetry/context-async-hooks@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/core@2.1.0(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@2.1.0(@opentelemetry/api@1.9.0))(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1)(webpack@5.101.3)":
dependencies:
"@opentelemetry/api": 1.9.0
"@opentelemetry/semantic-conventions": 1.37.0
@@ -10698,7 +10698,7 @@ snapshots:
"@sentry/vercel-edge": 10.11.0
"@sentry/webpack-plugin": 4.3.0(webpack@5.101.3)
chalk: 3.0.0
next: 15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
next: 15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
resolve: 1.22.8
rollup: 4.50.1
stacktrace-parser: 0.1.11
@@ -14093,13 +14093,13 @@ snapshots:
neo-async@2.6.2: {}
next-auth@4.24.11(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react-dom@18.3.1(react@18.3.1))(react@18.3.1):
next-auth@4.24.11(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react-dom@18.3.1(react@18.3.1))(react@18.3.1):
dependencies:
"@babel/runtime": 7.28.2
"@panva/hkdf": 1.2.1
cookie: 0.7.2
jose: 4.15.9
next: 15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
next: 15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
oauth: 0.9.15
openid-client: 5.7.1
preact: 10.27.0
@@ -14113,9 +14113,9 @@ snapshots:
react: 18.3.1
react-dom: 18.3.1(react@18.3.1)
next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0):
next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0):
dependencies:
"@next/env": 15.5.3
"@next/env": 15.5.7
"@swc/helpers": 0.5.15
caniuse-lite: 1.0.30001734
postcss: 8.4.31
@@ -14123,14 +14123,14 @@ snapshots:
react-dom: 18.3.1(react@18.3.1)
styled-jsx: 5.1.6(@babel/core@7.28.3)(babel-plugin-macros@3.1.0)(react@18.3.1)
optionalDependencies:
"@next/swc-darwin-arm64": 15.5.3
"@next/swc-darwin-x64": 15.5.3
"@next/swc-linux-arm64-gnu": 15.5.3
"@next/swc-linux-arm64-musl": 15.5.3
"@next/swc-linux-x64-gnu": 15.5.3
"@next/swc-linux-x64-musl": 15.5.3
"@next/swc-win32-arm64-msvc": 15.5.3
"@next/swc-win32-x64-msvc": 15.5.3
"@next/swc-darwin-arm64": 15.5.7
"@next/swc-darwin-x64": 15.5.7
"@next/swc-linux-arm64-gnu": 15.5.7
"@next/swc-linux-arm64-musl": 15.5.7
"@next/swc-linux-x64-gnu": 15.5.7
"@next/swc-linux-x64-musl": 15.5.7
"@next/swc-win32-arm64-msvc": 15.5.7
"@next/swc-win32-x64-msvc": 15.5.7
"@opentelemetry/api": 1.9.0
sass: 1.90.0
sharp: 0.34.3
@@ -14159,12 +14159,12 @@ snapshots:
dependencies:
path-key: 3.1.1
nuqs@2.4.3(next@15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1):
nuqs@2.4.3(next@15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0))(react@18.3.1):
dependencies:
mitt: 3.0.1
react: 18.3.1
optionalDependencies:
next: 15.5.3(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
next: 15.5.7(@babel/core@7.28.3)(@opentelemetry/api@1.9.0)(babel-plugin-macros@3.1.0)(react-dom@18.3.1(react@18.3.1))(react@18.3.1)(sass@1.90.0)
oauth@0.9.15: {}