Compare commits

...

73 Commits

Author SHA1 Message Date
Igor Loskutov
341884085d migration sync 2025-12-20 11:12:29 -05:00
Igor Loskutov
c9325ea4a4 Merge branch 'main' into feat/durable 2025-12-20 11:07:04 -05:00
Igor Loskutov
f163111b4a durable_started return 2025-12-19 21:07:12 -05:00
f0ee7b531a fix: logout redirect (#802) 2025-12-19 17:19:09 +01:00
37a454f283 chore(main): release 0.24.0 (#793) 2025-12-19 15:00:43 +01:00
964cd78bb6 feat: identify action items (#790)
* Identify action items

* Add action items to mock summary

* Add action items validator

* Remove final prefix from action items

* Make on action items callback required

* Don't mutation action items response

* Assign action items to none on error

* Use timeout constant

* Exclude action items from transcript list
2025-12-18 21:13:47 +01:00
5f458aa4a7 fix: automatically reprocess daily recordings (#797)
* Automatically reprocess recordings

* Restore the comments

* Remove redundant check

* Fix indent

* Add comment about cyclic import
2025-12-18 21:10:04 +01:00
5f7dfadabd fix: retry on workflow timeout (#798) 2025-12-18 20:49:06 +01:00
0bc971ba96 fix: main menu login (#800) 2025-12-18 20:48:39 +01:00
Igor Loskutov
84c1a57c83 better log webhook events 2025-12-18 14:22:03 -05:00
Igor Loskutov
af425e6dfd pr autoreviewer fixes 2025-12-18 13:53:39 -05:00
Igor Loskutov
28007e846f add forgotten file 2025-12-18 13:46:35 -05:00
Igor Loskutov
17a93b7393 can_replay cancelled 2025-12-18 13:44:57 -05:00
Igor Loskutov
0ce38dfeb3 self-review round 2025-12-18 13:39:04 -05:00
Igor Loskutov
8272c79856 self-review round 2025-12-18 13:15:18 -05:00
Igor Loskutov
acad80df50 self-review round 2025-12-18 12:46:05 -05:00
Igor Loskutov
61e2b3211e self-review wip 2025-12-18 11:42:32 -05:00
Igor Loskutov
bf90bd076b Merge branch 'feat/durable' of github-monadical:Monadical-SAS/reflector into feat/durable 2025-12-17 15:47:07 -05:00
Igor Loskutov
557073850e more NES instead of str 2025-12-17 15:46:59 -05:00
Igor Monadical
ce6b185bf7 Merge branch 'main' into feat/durable 2025-12-17 15:42:03 -05:00
Igor Loskutov
cb41e9e779 self-review round 2025-12-17 15:25:29 -05:00
Igor Loskutov
f7f2957fc9 dry hatched with celery - 2 2025-12-17 15:11:33 -05:00
Igor Loskutov
d683a83906 dry hatchet with celery 2025-12-17 14:48:23 -05:00
Igor Loskutov
e77f38a12a self-review round 2025-12-17 13:51:50 -05:00
Igor Loskutov
6ae621eadd self-review round 2025-12-17 13:29:17 -05:00
Igor Loskutov
6ae8f1d870 self-review round 2025-12-17 13:05:08 -05:00
Igor Loskutov
7a29c742c5 hatchet: restore zullip report 2025-12-17 11:06:27 -05:00
Igor Monadical
c62e3c0753 incorporate daily api undocumented feature (#796)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-17 09:51:55 -05:00
Igor Loskutov
298abe8656 self-review (no-mistakes) 2025-12-16 22:59:56 -05:00
Igor Loskutov
67420d2ec4 self-review (no-mistakes) 2025-12-16 22:47:09 -05:00
Igor Loskutov
4b00dda0ca hatchet init db 2025-12-16 17:24:16 -05:00
Igor Loskutov
7591387e52 cleanup 2025-12-16 16:49:42 -05:00
Igor Loskutov
447bf97854 . 2025-12-16 16:44:15 -05:00
Igor Loskutov
c280e8dc1d and add hatchet processor setting to room 2025-12-16 16:40:18 -05:00
Igor Loskutov
9b8f76929e remove shadow mode for hatchet 2025-12-16 16:39:52 -05:00
Igor Loskutov
409c257889 hatched logs 2025-12-16 16:31:29 -05:00
Igor Loskutov
fce0945564 self-review (no-mistakes) 2025-12-16 16:04:52 -05:00
Igor Loskutov
e81e0cb5c3 remove conductor and add hatchet tests (no-mistakes) 2025-12-16 13:24:05 -05:00
Igor Loskutov
1f49deb5b5 hatchet no-mistake, better logging 2025-12-16 12:26:59 -05:00
Igor Loskutov
0f266eabdf hatchet no-mistake 2025-12-16 12:09:02 -05:00
Igor Loskutov
c5498d26bf hatchet no-mistake 2025-12-16 00:48:58 -05:00
Igor Monadical
16284e1ac3 fix: daily video optimisation (#789)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-15 15:00:53 -05:00
Igor Monadical
443982617d coolify pull policy (#792)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-15 14:54:05 -05:00
Igor Monadical
23023b3cdb update nextjs (#791)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-15 13:58:34 -05:00
Igor Loskutov
243ff2177c durable (no-mistakes) 2025-12-15 12:18:47 -05:00
90c3ecc9c3 chore(main): release 0.23.2 (#786) 2025-12-11 13:37:41 +01:00
d7f140b7d1 fix: build on push tags (#785) 2025-12-11 13:30:36 +01:00
a47a5f5781 chore(main): release 0.23.1 (#784) 2025-12-11 12:43:25 +01:00
0eba147018 fix: populate room_name in transcript GET endpoint (#783)
Fixes monadical/internalai#14
2025-12-11 12:37:59 +01:00
18a27f7b45 Fix image tags (#781) 2025-12-10 13:57:13 -05:00
32a049c134 chore(main): release 0.23.0 (#770) 2025-12-10 13:42:28 +01:00
91650ec65f fix: deploy frontend to coolify (#779)
* Ignore act secrets

* Deploy frontend container to ECR

* Use published image

* Remove ecr workflows

* Trigger coolify deployment

* Deploy on release please pr merge

* Upgrade nextjs

* Update secrets example
2025-12-10 13:35:53 +01:00
Igor Monadical
61f0e29d4c feat: llm retries (#739)
* llm retries no-mistakes

* self-review (no-mistakes)

* self-review (no-mistakes)

* bigger retry intervals by default

* tests and dry

* restore to main state

* parse retries

* json retries (no-mistakes)

* json retries (no-mistakes)

* json retries (no-mistakes)

* json retries (no-mistakes) self-review

* additional network retry test

* more lindt

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-05 12:08:21 -05:00
Igor Monadical
ec17ed7b58 fix: celery inspect bug sidestep in restart script (#766)
* celery bug sidestep

* Update server/reflector/services/transcript_process.py

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-12-04 09:22:51 -05:00
Igor Monadical
00549f153a feat: dockerhub ci (#772)
* dockerhub ci

* ci test

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-03 13:26:08 -05:00
3ad78be762 fix: hide rooms settings instead of disabling (#763)
* Hide rooms settings instead of disabling

* Reset recording trigger
2025-12-03 16:49:17 +01:00
d3a5cd12d2 fix: return participant emails from transcript endpoint (#769)
* Return participant emails from transcript endpoint

* Fix broken test
2025-12-03 16:47:56 +01:00
af921ce927 chore(main): release 0.22.4 (#765) 2025-12-02 17:11:48 -05:00
Igor Monadical
bd5df1ce2e fix: Multitrack mixdown optimisation 2 (#764)
* Revert "fix: Skip mixdown for multitrack (#760)"

This reverts commit b51b7aa917.

* multitrack mixdown optimisation

* return the "good" ui part of "skip mixdown"

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-02 17:10:06 -05:00
c8024484b3 chore(main): release 0.22.3 (#761) 2025-12-02 09:08:22 +01:00
28f87c09dc fix: align daily room settings (#759)
* Switch platform ui

* Update room settings based on platform

* Add local and none recording options to daily

* Don't create tokens for unauthentikated users

* Enable knocking for private rooms

* Create new meeting on room settings change

* Always use 2-200 option for daily

* Show recording start trigger for daily

* Fix broken test
2025-12-02 09:06:36 +01:00
dabf7251db chore(main): release 0.22.2 (#756) 2025-12-01 23:39:32 -05:00
Igor Monadical
b51b7aa917 fix: Skip mixdown for multitrack (#760)
* multitrack mixdown optimisation

* skip mixdown for multitrack

* skip mixdown for multitrack

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-01 23:35:12 -05:00
Igor Monadical
a8983b4e7e daily auth hotfix (#757)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-28 14:52:59 -05:00
Igor Monadical
fe47c46489 fix: daily auto refresh fix (#755)
* daily auto refresh fix

* Update www/app/lib/AuthProvider.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Update www/app/[roomName]/components/DailyRoom.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* fix bot lint

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-11-27 18:31:03 -05:00
a2bb6a27d6 chore(main): release 0.22.1 (#750) 2025-11-27 16:55:08 +01:00
7f0b728991 fix: participants update from daily (#749)
* Fix participants update from daily

* Use track keys from params
2025-11-27 16:53:26 +01:00
692895c859 chore(main): release 0.22.0 (#748) 2025-11-26 16:53:27 -05:00
Igor Monadical
d63040e2fd feat: Multitrack segmentation (#747)
* segmentation multitrack (no-mistakes)

* segmentation multitrack (no-mistakes)

* self review

* self review

* recording poll daily doc

* filter cam_audio tracks to remove screensharing from daily processing

* pr review

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-26 16:21:32 -05:00
8d696aa775 chore(main): release 0.21.0 (#746) 2025-11-26 19:12:02 +01:00
f6ca07505f feat: add transcript format parameter to GET endpoint (#709)
* feat: add transcript format parameter to GET endpoint

Add transcript_format query parameter to /v1/transcripts/{id} endpoint
with support for multiple output formats using discriminated unions.

Formats supported:
- text: Plain speaker dialogue (default)
- text-timestamped: Dialogue with [MM:SS] timestamps
- webvtt-named: WebVTT subtitles with participant names
- json: Structured segments with full metadata

Response models use Pydantic discriminated unions with transcript_format
as discriminator field. POST/PATCH endpoints return GetTranscriptWithParticipants
for minimal responses. GET endpoint returns format-specific models.

* Copy transcript format

* Regenerate types

* Fix transcript formats

* Don't throw inside try

* Remove any type

* Toast share copy errors

* transcript_format exhaustiveness and python idiomatic assert_never

* format_timestamp_mmss clear type definition

* Rename seconds_to_timestamp

* Test transcript format with overlapping speakers

* exact match for vtt multispeaker test

---------

Co-authored-by: Sergey Mankovsky <sergey@monadical.com>
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-26 18:51:14 +01:00
Igor Monadical
3aef926203 room creatio hotfix (#744)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-25 22:42:09 -05:00
Igor Monadical
0b2c82227d is_owner pass for dailyco (#745)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-25 22:41:54 -05:00
84 changed files with 9016 additions and 2801 deletions

View File

@@ -1,90 +0,0 @@
name: Build container/push to container registry
on: [workflow_dispatch]
env:
# 950402358378.dkr.ecr.us-east-1.amazonaws.com/reflector
AWS_REGION: us-east-1
ECR_REPOSITORY: reflector
jobs:
build:
strategy:
matrix:
include:
- platform: linux/amd64
runner: linux-amd64
arch: amd64
- platform: linux/arm64
runner: linux-arm64
arch: arm64
runs-on: ${{ matrix.runner }}
permissions:
contents: read
outputs:
registry: ${{ steps.login-ecr.outputs.registry }}
steps:
- uses: actions/checkout@v4
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push ${{ matrix.arch }}
uses: docker/build-push-action@v5
with:
context: server
platforms: ${{ matrix.platform }}
push: true
tags: ${{ steps.login-ecr.outputs.registry }}/${{ env.ECR_REPOSITORY }}:latest-${{ matrix.arch }}
cache-from: type=gha,scope=${{ matrix.arch }}
cache-to: type=gha,mode=max,scope=${{ matrix.arch }}
github-token: ${{ secrets.GHA_CACHE_TOKEN }}
provenance: false
create-manifest:
runs-on: ubuntu-latest
needs: [build]
permissions:
deployments: write
contents: read
steps:
- name: Configure AWS credentials
uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: ${{ env.AWS_REGION }}
- name: Login to Amazon ECR
uses: aws-actions/amazon-ecr-login@v2
- name: Create and push multi-arch manifest
run: |
# Get the registry URL (since we can't easily access job outputs in matrix)
ECR_REGISTRY=$(aws ecr describe-registry --query 'registryId' --output text).dkr.ecr.${{ env.AWS_REGION }}.amazonaws.com
docker manifest create \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest-amd64 \
$ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest-arm64
docker manifest push $ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest
echo "✅ Multi-arch manifest pushed: $ECR_REGISTRY/${{ env.ECR_REPOSITORY }}:latest"

View File

@@ -1,35 +1,31 @@
name: Build and Push Frontend Docker Image
name: Build and Push Backend Docker Image (Docker Hub)
on:
push:
branches:
- main
paths:
- 'www/**'
- '.github/workflows/docker-frontend.yml'
tags:
- "v*"
workflow_dispatch:
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}-frontend
REGISTRY: docker.io
IMAGE_NAME: monadicalsas/reflector-backend
jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to GitHub Container Registry
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
username: monadicalsas
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Extract metadata
id: meta
@@ -38,7 +34,7 @@ jobs:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=sha,prefix={{branch}}-
type=ref,event=tag
type=raw,value=latest,enable={{is_default_branch}}
- name: Set up Docker Buildx
@@ -47,8 +43,8 @@ jobs:
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./www
file: ./www/Dockerfile
context: ./server
file: ./server/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

View File

@@ -0,0 +1,70 @@
name: Build and Push Frontend Docker Image
on:
push:
tags:
- "v*"
workflow_dispatch:
env:
REGISTRY: docker.io
IMAGE_NAME: monadicalsas/reflector-frontend
jobs:
build-and-push:
runs-on: ubuntu-latest
permissions:
contents: read
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Log in to Docker Hub
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: monadicalsas
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Extract metadata
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=ref,event=tag
type=raw,value=latest,enable={{is_default_branch}}
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build and push Docker image
uses: docker/build-push-action@v5
with:
context: ./www
file: ./www/Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64,linux/arm64
deploy:
needs: build-and-push
runs-on: ubuntu-latest
if: success()
strategy:
matrix:
environment: [reflector-monadical, reflector-media]
environment: ${{ matrix.environment }}
steps:
- name: Trigger Coolify deployment
run: |
curl -X POST "${{ secrets.COOLIFY_WEBHOOK_URL }}" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer ${{ secrets.COOLIFY_WEBHOOK_TOKEN }}" \
-f || (echo "Failed to trigger Coolify deployment for ${{ matrix.environment }}" && exit 1)

1
.gitignore vendored
View File

@@ -18,3 +18,4 @@ CLAUDE.local.md
www/.env.development
www/.env.production
.playwright-mcp
.secrets

24
.secrets.example Normal file
View File

@@ -0,0 +1,24 @@
# Example secrets file for GitHub Actions workflows
# Copy this to .secrets and fill in your values
# These secrets should be configured in GitHub repository settings:
# Settings > Secrets and variables > Actions
# DockerHub Configuration (required for frontend and backend deployment)
# Create a Docker Hub access token at https://hub.docker.com/settings/security
# Username: monadicalsas
DOCKERHUB_TOKEN=your-dockerhub-access-token
# GitHub Token (required for frontend and backend deployment)
# Used by docker/metadata-action for extracting image metadata
# Can use the default GITHUB_TOKEN or create a personal access token
GITHUB_TOKEN=your-github-token-or-use-default-GITHUB_TOKEN
# Coolify Deployment Webhook (required for frontend deployment)
# Used to trigger automatic deployment after image push
# Configure these secrets in GitHub Environments:
# Each environment should have:
# - COOLIFY_WEBHOOK_URL: The webhook URL for that specific deployment
# - COOLIFY_WEBHOOK_TOKEN: The webhook token (can be the same for both if using same token)
# Optional: GitHub Actions Cache Token (for local testing with act)
GHA_CACHE_TOKEN=your-github-token-or-empty

View File

@@ -1,5 +1,93 @@
# Changelog
## [0.24.0](https://github.com/Monadical-SAS/reflector/compare/v0.23.2...v0.24.0) (2025-12-18)
### Features
* identify action items ([#790](https://github.com/Monadical-SAS/reflector/issues/790)) ([964cd78](https://github.com/Monadical-SAS/reflector/commit/964cd78bb699d83d012ae4b8c96565df25b90a5d))
### Bug Fixes
* automatically reprocess daily recordings ([#797](https://github.com/Monadical-SAS/reflector/issues/797)) ([5f458aa](https://github.com/Monadical-SAS/reflector/commit/5f458aa4a7ec3d00ca5ec49d62fcc8ad232b138e))
* daily video optimisation ([#789](https://github.com/Monadical-SAS/reflector/issues/789)) ([16284e1](https://github.com/Monadical-SAS/reflector/commit/16284e1ac3faede2b74f0d91b50c0b5612af2c35))
* main menu login ([#800](https://github.com/Monadical-SAS/reflector/issues/800)) ([0bc971b](https://github.com/Monadical-SAS/reflector/commit/0bc971ba966a52d719c8c240b47dc7b3bdea4391))
* retry on workflow timeout ([#798](https://github.com/Monadical-SAS/reflector/issues/798)) ([5f7dfad](https://github.com/Monadical-SAS/reflector/commit/5f7dfadabd3e8017406ad3720ba495a59963ee34))
## [0.23.2](https://github.com/Monadical-SAS/reflector/compare/v0.23.1...v0.23.2) (2025-12-11)
### Bug Fixes
* build on push tags ([#785](https://github.com/Monadical-SAS/reflector/issues/785)) ([d7f140b](https://github.com/Monadical-SAS/reflector/commit/d7f140b7d1f4660d5da7a0da1357f68869e0b5cd))
## [0.23.1](https://github.com/Monadical-SAS/reflector/compare/v0.23.0...v0.23.1) (2025-12-11)
### Bug Fixes
* populate room_name in transcript GET endpoint ([#783](https://github.com/Monadical-SAS/reflector/issues/783)) ([0eba147](https://github.com/Monadical-SAS/reflector/commit/0eba1470181c7b9e0a79964a1ef28c09bcbdd9d7))
## [0.23.0](https://github.com/Monadical-SAS/reflector/compare/v0.22.4...v0.23.0) (2025-12-10)
### Features
* dockerhub ci ([#772](https://github.com/Monadical-SAS/reflector/issues/772)) ([00549f1](https://github.com/Monadical-SAS/reflector/commit/00549f153ade922cf4cb6c5358a7d11a39c426d2))
* llm retries ([#739](https://github.com/Monadical-SAS/reflector/issues/739)) ([61f0e29](https://github.com/Monadical-SAS/reflector/commit/61f0e29d4c51eab54ee67af92141fbb171e8ccaa))
### Bug Fixes
* celery inspect bug sidestep in restart script ([#766](https://github.com/Monadical-SAS/reflector/issues/766)) ([ec17ed7](https://github.com/Monadical-SAS/reflector/commit/ec17ed7b587cf6ee143646baaee67a7c017044d4))
* deploy frontend to coolify ([#779](https://github.com/Monadical-SAS/reflector/issues/779)) ([91650ec](https://github.com/Monadical-SAS/reflector/commit/91650ec65f65713faa7ee0dcfb75af427b7c4ba0))
* hide rooms settings instead of disabling ([#763](https://github.com/Monadical-SAS/reflector/issues/763)) ([3ad78be](https://github.com/Monadical-SAS/reflector/commit/3ad78be7628c0d029296b301a0e87236c76b7598))
* return participant emails from transcript endpoint ([#769](https://github.com/Monadical-SAS/reflector/issues/769)) ([d3a5cd1](https://github.com/Monadical-SAS/reflector/commit/d3a5cd12d2d0d9c32af2d5bd9322e030ef69b85d))
## [0.22.4](https://github.com/Monadical-SAS/reflector/compare/v0.22.3...v0.22.4) (2025-12-02)
### Bug Fixes
* Multitrack mixdown optimisation 2 ([#764](https://github.com/Monadical-SAS/reflector/issues/764)) ([bd5df1c](https://github.com/Monadical-SAS/reflector/commit/bd5df1ce2ebf35d7f3413b295e56937a9a28ef7b))
## [0.22.3](https://github.com/Monadical-SAS/reflector/compare/v0.22.2...v0.22.3) (2025-12-02)
### Bug Fixes
* align daily room settings ([#759](https://github.com/Monadical-SAS/reflector/issues/759)) ([28f87c0](https://github.com/Monadical-SAS/reflector/commit/28f87c09dc459846873d0dde65b03e3d7b2b9399))
## [0.22.2](https://github.com/Monadical-SAS/reflector/compare/v0.22.1...v0.22.2) (2025-12-02)
### Bug Fixes
* daily auto refresh fix ([#755](https://github.com/Monadical-SAS/reflector/issues/755)) ([fe47c46](https://github.com/Monadical-SAS/reflector/commit/fe47c46489c5aa0cc538109f7559cc9accb35c01))
* Skip mixdown for multitrack ([#760](https://github.com/Monadical-SAS/reflector/issues/760)) ([b51b7aa](https://github.com/Monadical-SAS/reflector/commit/b51b7aa9176c1a53ba57ad99f5e976c804a1e80c))
## [0.22.1](https://github.com/Monadical-SAS/reflector/compare/v0.22.0...v0.22.1) (2025-11-27)
### Bug Fixes
* participants update from daily ([#749](https://github.com/Monadical-SAS/reflector/issues/749)) ([7f0b728](https://github.com/Monadical-SAS/reflector/commit/7f0b728991c1b9f9aae702c96297eae63b561ef5))
## [0.22.0](https://github.com/Monadical-SAS/reflector/compare/v0.21.0...v0.22.0) (2025-11-26)
### Features
* Multitrack segmentation ([#747](https://github.com/Monadical-SAS/reflector/issues/747)) ([d63040e](https://github.com/Monadical-SAS/reflector/commit/d63040e2fdc07e7b272e85a39eb2411cd6a14798))
## [0.21.0](https://github.com/Monadical-SAS/reflector/compare/v0.20.0...v0.21.0) (2025-11-26)
### Features
* add transcript format parameter to GET endpoint ([#709](https://github.com/Monadical-SAS/reflector/issues/709)) ([f6ca075](https://github.com/Monadical-SAS/reflector/commit/f6ca07505f34483b02270a2ef3bd809e9d2e1045))
## [0.20.0](https://github.com/Monadical-SAS/reflector/compare/v0.19.0...v0.20.0) (2025-11-25)

View File

@@ -3,10 +3,8 @@
services:
web:
build:
context: ./www
dockerfile: Dockerfile
image: reflector-frontend:latest
image: monadicalsas/reflector-frontend:latest
pull_policy: always
environment:
- KV_URL=${KV_URL:-redis://redis:6379}
- SITE_URL=${SITE_URL}

View File

@@ -34,6 +34,20 @@ services:
environment:
ENTRYPOINT: beat
hatchet-worker:
build:
context: server
volumes:
- ./server/:/app/
- /app/.venv
env_file:
- ./server/.env
environment:
ENTRYPOINT: hatchet-worker
depends_on:
hatchet:
condition: service_healthy
redis:
image: redis:7.2
ports:
@@ -55,6 +69,7 @@ services:
postgres:
image: postgres:17
command: postgres -c 'max_connections=200'
ports:
- 5432:5432
environment:
@@ -63,6 +78,42 @@ services:
POSTGRES_DB: reflector
volumes:
- ./data/postgres:/var/lib/postgresql/data
- ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
hatchet:
image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
ports:
- "8889:8888"
- "7078:7077"
depends_on:
postgres:
condition: service_healthy
environment:
DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable"
SERVER_AUTH_COOKIE_DOMAIN: localhost
SERVER_AUTH_COOKIE_INSECURE: "t"
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
SERVER_GRPC_INSECURE: "t"
SERVER_GRPC_BROADCAST_ADDRESS: hatchet:7077
SERVER_GRPC_PORT: "7077"
SERVER_URL: http://localhost:8889
SERVER_AUTH_SET_EMAIL_VERIFIED: "t"
# SERVER_DEFAULT_ENGINE_VERSION: "V1" # default
SERVER_INTERNAL_CLIENT_INTERNAL_GRPC_BROADCAST_ADDRESS: hatchet:7077
volumes:
- ./data/hatchet-config:/config
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8888/api/live"]
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
networks:
default:

241
docs/transcript.md Normal file
View File

@@ -0,0 +1,241 @@
# Transcript Formats
The Reflector API provides multiple output formats for transcript data through the `transcript_format` query parameter on the GET `/v1/transcripts/{id}` endpoint.
## Overview
When retrieving a transcript, you can specify the desired format using the `transcript_format` query parameter. The API supports four formats optimized for different use cases:
- **text** - Plain text with speaker names (default)
- **text-timestamped** - Timestamped text with speaker names
- **webvtt-named** - WebVTT subtitle format with participant names
- **json** - Structured JSON segments with full metadata
All formats include participant information when available, resolving speaker IDs to actual names.
## Query Parameter Usage
```
GET /v1/transcripts/{id}?transcript_format={format}
```
### Parameters
- `transcript_format` (optional): The desired output format
- Type: `"text" | "text-timestamped" | "webvtt-named" | "json"`
- Default: `"text"`
## Format Descriptions
### Text Format (`text`)
**Use case:** Simple, human-readable transcript for display or export.
**Format:** Speaker names followed by their dialogue, one line per segment.
**Example:**
```
John Smith: Hello everyone
Jane Doe: Hi there
John Smith: How are you today?
```
**Request:**
```bash
GET /v1/transcripts/{id}?transcript_format=text
```
**Response:**
```json
{
"id": "transcript_123",
"name": "Meeting Recording",
"transcript_format": "text",
"transcript": "John Smith: Hello everyone\nJane Doe: Hi there\nJohn Smith: How are you today?",
"participants": [
{"id": "p1", "speaker": 0, "name": "John Smith"},
{"id": "p2", "speaker": 1, "name": "Jane Doe"}
],
...
}
```
### Text Timestamped Format (`text-timestamped`)
**Use case:** Transcript with timing information for navigation or reference.
**Format:** `[MM:SS]` timestamp prefix before each speaker and dialogue.
**Example:**
```
[00:00] John Smith: Hello everyone
[00:05] Jane Doe: Hi there
[00:12] John Smith: How are you today?
```
**Request:**
```bash
GET /v1/transcripts/{id}?transcript_format=text-timestamped
```
**Response:**
```json
{
"id": "transcript_123",
"name": "Meeting Recording",
"transcript_format": "text-timestamped",
"transcript": "[00:00] John Smith: Hello everyone\n[00:05] Jane Doe: Hi there\n[00:12] John Smith: How are you today?",
"participants": [
{"id": "p1", "speaker": 0, "name": "John Smith"},
{"id": "p2", "speaker": 1, "name": "Jane Doe"}
],
...
}
```
### WebVTT Named Format (`webvtt-named`)
**Use case:** Subtitle files for video players, accessibility tools, or video editing.
**Format:** Standard WebVTT subtitle format with voice tags using participant names.
**Example:**
```
WEBVTT
00:00:00.000 --> 00:00:05.000
<v John Smith>Hello everyone
00:00:05.000 --> 00:00:12.000
<v Jane Doe>Hi there
00:00:12.000 --> 00:00:18.000
<v John Smith>How are you today?
```
**Request:**
```bash
GET /v1/transcripts/{id}?transcript_format=webvtt-named
```
**Response:**
```json
{
"id": "transcript_123",
"name": "Meeting Recording",
"transcript_format": "webvtt-named",
"transcript": "WEBVTT\n\n00:00:00.000 --> 00:00:05.000\n<v John Smith>Hello everyone\n\n...",
"participants": [
{"id": "p1", "speaker": 0, "name": "John Smith"},
{"id": "p2", "speaker": 1, "name": "Jane Doe"}
],
...
}
```
### JSON Format (`json`)
**Use case:** Programmatic access with full timing and speaker metadata.
**Format:** Array of segment objects with speaker information, text content, and precise timing.
**Example:**
```json
[
{
"speaker": 0,
"speaker_name": "John Smith",
"text": "Hello everyone",
"start": 0.0,
"end": 5.0
},
{
"speaker": 1,
"speaker_name": "Jane Doe",
"text": "Hi there",
"start": 5.0,
"end": 12.0
},
{
"speaker": 0,
"speaker_name": "John Smith",
"text": "How are you today?",
"start": 12.0,
"end": 18.0
}
]
```
**Request:**
```bash
GET /v1/transcripts/{id}?transcript_format=json
```
**Response:**
```json
{
"id": "transcript_123",
"name": "Meeting Recording",
"transcript_format": "json",
"transcript": [
{
"speaker": 0,
"speaker_name": "John Smith",
"text": "Hello everyone",
"start": 0.0,
"end": 5.0
},
{
"speaker": 1,
"speaker_name": "Jane Doe",
"text": "Hi there",
"start": 5.0,
"end": 12.0
}
],
"participants": [
{"id": "p1", "speaker": 0, "name": "John Smith"},
{"id": "p2", "speaker": 1, "name": "Jane Doe"}
],
...
}
```
## Response Structure
All formats return the same base transcript metadata with an additional `transcript_format` field and format-specific `transcript` field:
### Common Fields
- `id`: Transcript identifier
- `user_id`: Owner user ID (if authenticated)
- `name`: Transcript name
- `status`: Processing status
- `locked`: Whether transcript is locked for editing
- `duration`: Total duration in seconds
- `title`: Auto-generated or custom title
- `short_summary`: Brief summary
- `long_summary`: Detailed summary
- `created_at`: Creation timestamp
- `share_mode`: Access control setting
- `source_language`: Original audio language
- `target_language`: Translation target language
- `reviewed`: Whether transcript has been reviewed
- `meeting_id`: Associated meeting ID (if applicable)
- `source_kind`: Source type (live, file, room)
- `room_id`: Associated room ID (if applicable)
- `audio_deleted`: Whether audio has been deleted
- `participants`: Array of participant objects with speaker mappings
### Format-Specific Fields
- `transcript_format`: The format identifier (discriminator field)
- `transcript`: The formatted transcript content (string for text/webvtt formats, array for json format)
## Speaker Name Resolution
All formats resolve speaker IDs to participant names when available:
- If a participant exists for the speaker ID, their name is used
- If no participant exists, a default name like "Speaker 0" is generated
- Speaker IDs are integers (0, 1, 2, etc.) assigned during diarization

View File

@@ -53,6 +53,36 @@ response = sqs.receive_message(QueueUrl=queue_url, ...)
uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID
```
## Hatchet Setup (Fresh DB)
After resetting the Hatchet database:
### Option A: Automatic (CLI)
```bash
# Get default tenant ID and create token in one command
TENANT_ID=$(docker compose exec -T postgres psql -U reflector -d hatchet -t -c \
"SELECT id FROM \"Tenant\" WHERE slug = 'default';" | tr -d ' \n') && \
TOKEN=$(docker compose exec -T hatchet /hatchet-admin token create \
--config /config --tenant-id "$TENANT_ID" 2>/dev/null | tr -d '\n') && \
echo "HATCHET_CLIENT_TOKEN=$TOKEN"
```
Copy the output to `server/.env`.
### Option B: Manual (UI)
1. Create API token at http://localhost:8889 → Settings → API Tokens
2. Update `server/.env`: `HATCHET_CLIENT_TOKEN=<new-token>`
### Then restart workers
```bash
docker compose restart server hatchet-worker
```
Workflows register automatically when hatchet-worker starts.
## Pipeline Management
### Continue stuck pipeline from final summaries (identify_participants) step:

View File

@@ -0,0 +1,2 @@
-- Create hatchet database for Hatchet workflow engine
CREATE DATABASE hatchet;

View File

@@ -0,0 +1,26 @@
"""add_action_items
Revision ID: 05f8688d6895
Revises: bbafedfa510c
Create Date: 2025-12-12 11:57:50.209658
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "05f8688d6895"
down_revision: Union[str, None] = "bbafedfa510c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.add_column("transcript", sa.Column("action_items", sa.JSON(), nullable=True))
def downgrade() -> None:
op.drop_column("transcript", "action_items")

View File

@@ -0,0 +1,28 @@
"""add workflow_run_id to transcript
Revision ID: 0f943fede0e0
Revises: 05f8688d6895
Create Date: 2025-12-16 01:54:13.855106
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "0f943fede0e0"
down_revision: Union[str, None] = "05f8688d6895"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.add_column(sa.Column("workflow_run_id", sa.String(), nullable=True))
def downgrade() -> None:
with op.batch_alter_table("transcript", schema=None) as batch_op:
batch_op.drop_column("workflow_run_id")

View File

@@ -0,0 +1,35 @@
"""add use_hatchet to room
Revision ID: bd3a729bb379
Revises: 0f943fede0e0
Create Date: 2025-12-16 16:34:03.594231
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "bd3a729bb379"
down_revision: Union[str, None] = "0f943fede0e0"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.add_column(
sa.Column(
"use_hatchet",
sa.Boolean(),
server_default=sa.text("false"),
nullable=False,
)
)
def downgrade() -> None:
with op.batch_alter_table("room", schema=None) as batch_op:
batch_op.drop_column("use_hatchet")

View File

@@ -39,6 +39,7 @@ dependencies = [
"pytest-env>=1.1.5",
"webvtt-py>=0.5.0",
"icalendar>=6.0.0",
"hatchet-sdk>=0.47.0",
]
[dependency-groups]
@@ -126,6 +127,7 @@ markers = [
select = [
"I", # isort - import sorting
"F401", # unused imports
"E402", # module level import not at top of file
"PLC0415", # import-outside-top-level - detect inline imports
]

View File

@@ -1,13 +1,19 @@
import asyncio
import functools
from uuid import uuid4
from celery import current_task
from reflector.db import get_database
from reflector.llm import llm_session_id
def asynctask(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
async def run_with_db():
task_id = current_task.request.id if current_task else None
llm_session_id.set(task_id or f"random-{uuid4().hex}")
database = get_database()
await database.connect()
try:

View File

@@ -18,6 +18,7 @@ from .requests import (
# Response models
from .responses import (
FinishedRecordingResponse,
MeetingParticipant,
MeetingParticipantsResponse,
MeetingResponse,
@@ -79,6 +80,7 @@ __all__ = [
"MeetingParticipant",
"MeetingResponse",
"RecordingResponse",
"FinishedRecordingResponse",
"RecordingS3Info",
"MeetingTokenResponse",
"WebhookResponse",

View File

@@ -47,7 +47,7 @@ class DailyApiError(Exception):
)
super().__init__(
f"Daily.co API error: {operation} failed with status {self.status_code}"
f"Daily.co API error: {operation} failed with status {self.status_code}: {response.text}"
)

View File

@@ -40,6 +40,10 @@ class RoomProperties(BaseModel):
)
enable_chat: bool = Field(default=True, description="Enable in-meeting chat")
enable_screenshare: bool = Field(default=True, description="Enable screen sharing")
enable_knocking: bool = Field(
default=False,
description="Enable knocking for private rooms (allows participants to request access)",
)
start_video_off: bool = Field(
default=False, description="Start with video off for all participants"
)

View File

@@ -68,7 +68,7 @@ class MeetingParticipant(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
"""
user_id: NonEmptyString = Field(description="User identifier")
user_id: NonEmptyString | None = Field(None, description="User identifier")
participant_id: NonEmptyString = Field(description="Participant session identifier")
user_name: NonEmptyString | None = Field(None, description="User display name")
join_time: int = Field(description="Join timestamp (Unix epoch seconds)")
@@ -121,7 +121,10 @@ class RecordingS3Info(BaseModel):
class RecordingResponse(BaseModel):
"""
Response from recording retrieval endpoint.
Response from recording retrieval endpoint (network layer).
Duration may be None for recordings still being processed by Daily.
Use FinishedRecordingResponse for recordings ready for processing.
Reference: https://docs.daily.co/reference/rest-api/recordings
"""
@@ -135,7 +138,9 @@ class RecordingResponse(BaseModel):
max_participants: int | None = Field(
None, description="Maximum participants during recording (may be missing)"
)
duration: int = Field(description="Recording duration in seconds")
duration: int | None = Field(
None, description="Recording duration in seconds (None if still processing)"
)
share_token: NonEmptyString | None = Field(
None, description="Token for sharing recording"
)
@@ -149,6 +154,25 @@ class RecordingResponse(BaseModel):
None, description="Meeting session identifier (may be missing)"
)
def to_finished(self) -> "FinishedRecordingResponse | None":
"""Convert to FinishedRecordingResponse if duration is available and status is finished."""
if self.duration is None or self.status != "finished":
return None
return FinishedRecordingResponse(**self.model_dump())
class FinishedRecordingResponse(RecordingResponse):
"""
Recording with confirmed duration - ready for processing.
This model guarantees duration is present and status is finished.
"""
status: Literal["finished"] = Field(
description="Recording status (always 'finished')"
)
duration: int = Field(description="Recording duration in seconds")
class MeetingTokenResponse(BaseModel):
"""

View File

@@ -3,6 +3,7 @@ from typing import Literal
import sqlalchemy as sa
from pydantic import BaseModel, Field
from sqlalchemy import or_
from reflector.db import get_database, metadata
from reflector.utils import generate_uuid4
@@ -35,8 +36,15 @@ class Recording(BaseModel):
status: Literal["pending", "processing", "completed", "failed"] = "pending"
meeting_id: str | None = None
# for multitrack reprocessing
# track_keys can be empty list [] if recording finished but no audio was captured (silence/muted)
# None means not a multitrack recording, [] means multitrack with no tracks
track_keys: list[str] | None = None
@property
def is_multitrack(self) -> bool:
"""True if recording has separate audio tracks (1+ tracks counts as multitrack)."""
return self.track_keys is not None and len(self.track_keys) > 0
class RecordingController:
async def create(self, recording: Recording):
@@ -72,5 +80,35 @@ class RecordingController:
results = await get_database().fetch_all(query)
return [Recording(**row) for row in results]
async def get_multitrack_needing_reprocessing(
self, bucket_name: str
) -> list[Recording]:
"""
Get multitrack recordings that need reprocessing:
- Have track_keys (multitrack)
- Either have no transcript OR transcript has error status
This is more efficient than fetching all recordings and filtering in Python.
"""
from reflector.db.transcripts import (
transcripts, # noqa: PLC0415 cyclic import
)
query = (
recordings.select()
.outerjoin(transcripts, recordings.c.id == transcripts.c.recording_id)
.where(
recordings.c.bucket_name == bucket_name,
recordings.c.track_keys.isnot(None),
or_(
transcripts.c.id.is_(None),
transcripts.c.status == "error",
),
)
)
results = await get_database().fetch_all(query)
recordings_list = [Recording(**row) for row in results]
return [r for r in recordings_list if r.is_multitrack]
recordings_controller = RecordingController()

View File

@@ -57,6 +57,12 @@ rooms = sqlalchemy.Table(
sqlalchemy.String,
nullable=False,
),
sqlalchemy.Column(
"use_hatchet",
sqlalchemy.Boolean,
nullable=False,
server_default=false(),
),
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
)
@@ -85,6 +91,7 @@ class Room(BaseModel):
ics_last_sync: datetime | None = None
ics_last_etag: str | None = None
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
use_hatchet: bool = False
class RoomController:

View File

@@ -44,6 +44,7 @@ transcripts = sqlalchemy.Table(
sqlalchemy.Column("title", sqlalchemy.String),
sqlalchemy.Column("short_summary", sqlalchemy.String),
sqlalchemy.Column("long_summary", sqlalchemy.String),
sqlalchemy.Column("action_items", sqlalchemy.JSON),
sqlalchemy.Column("topics", sqlalchemy.JSON),
sqlalchemy.Column("events", sqlalchemy.JSON),
sqlalchemy.Column("participants", sqlalchemy.JSON),
@@ -83,6 +84,8 @@ transcripts = sqlalchemy.Table(
sqlalchemy.Column("audio_deleted", sqlalchemy.Boolean),
sqlalchemy.Column("room_id", sqlalchemy.String),
sqlalchemy.Column("webvtt", sqlalchemy.Text),
# Hatchet workflow run ID for resumption of failed workflows
sqlalchemy.Column("workflow_run_id", sqlalchemy.String),
sqlalchemy.Index("idx_transcript_recording_id", "recording_id"),
sqlalchemy.Index("idx_transcript_user_id", "user_id"),
sqlalchemy.Index("idx_transcript_created_at", "created_at"),
@@ -164,6 +167,10 @@ class TranscriptFinalLongSummary(BaseModel):
long_summary: str
class TranscriptActionItems(BaseModel):
action_items: dict
class TranscriptFinalTitle(BaseModel):
title: str
@@ -204,6 +211,7 @@ class Transcript(BaseModel):
locked: bool = False
short_summary: str | None = None
long_summary: str | None = None
action_items: dict | None = None
topics: list[TranscriptTopic] = []
events: list[TranscriptEvent] = []
participants: list[TranscriptParticipant] | None = []
@@ -217,6 +225,7 @@ class Transcript(BaseModel):
zulip_message_id: int | None = None
audio_deleted: bool | None = None
webvtt: str | None = None
workflow_run_id: str | None = None # Hatchet workflow run ID for resumption
@field_serializer("created_at", when_used="json")
def serialize_datetime(self, dt: datetime) -> str:
@@ -368,7 +377,12 @@ class TranscriptController:
room_id: str | None = None,
search_term: str | None = None,
return_query: bool = False,
exclude_columns: list[str] = ["topics", "events", "participants"],
exclude_columns: list[str] = [
"topics",
"events",
"participants",
"action_items",
],
) -> list[Transcript]:
"""
Get all transcripts

View File

@@ -88,5 +88,11 @@ class UserController:
results = await get_database().fetch_all(query)
return [User(**r) for r in results]
@staticmethod
async def get_by_ids(user_ids: list[NonEmptyString]) -> dict[str, User]:
query = users.select().where(users.c.id.in_(user_ids))
results = await get_database().fetch_all(query)
return {user.id: User(**user) for user in results}
user_controller = UserController()

View File

@@ -0,0 +1,5 @@
"""Hatchet workflow orchestration for Reflector."""
from reflector.hatchet.client import HatchetClientManager
__all__ = ["HatchetClientManager"]

View File

@@ -0,0 +1,98 @@
"""WebSocket broadcasting helpers for Hatchet workflows.
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
decorator behavior. Events are broadcast to transcript rooms and user rooms.
"""
from typing import Any
import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.utils.string import NonEmptyString
from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(
transcript_id: NonEmptyString,
event: TranscriptEvent,
logger: structlog.BoundLogger,
) -> None:
"""Broadcast a TranscriptEvent to WebSocket subscribers.
Fire-and-forget: errors are logged but don't interrupt workflow execution.
"""
logger.info(
"Broadcasting event",
transcript_id=transcript_id,
event_type=event.event,
)
try:
ws_manager = get_ws_manager()
await ws_manager.send_json(
room_id=f"ts:{transcript_id}",
message=event.model_dump(mode="json"),
)
logger.info(
"Event sent to transcript room",
transcript_id=transcript_id,
event_type=event.event,
)
if event.event in USER_ROOM_EVENTS:
transcript = await transcripts_controller.get_by_id(transcript_id)
if transcript and transcript.user_id:
await ws_manager.send_json(
room_id=f"user:{transcript.user_id}",
message={
"event": f"TRANSCRIPT_{event.event}",
"data": {"id": transcript_id, **event.data},
},
)
except Exception as e:
logger.warning(
"Failed to broadcast event",
error=str(e),
transcript_id=transcript_id,
event_type=event.event,
)
async def set_status_and_broadcast(
transcript_id: NonEmptyString,
status: str,
logger: structlog.BoundLogger,
) -> None:
"""Set transcript status and broadcast to WebSocket.
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
"""
event = await transcripts_controller.set_status(transcript_id, status)
if event:
await broadcast_event(transcript_id, event, logger=logger)
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: str,
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:
"""Append event to transcript and broadcast to WebSocket.
Wrapper around transcripts_controller.append_event that adds WebSocket broadcasting.
"""
event = await transcripts_controller.append_event(
transcript=transcript,
event=event_name,
data=data,
)
await broadcast_event(transcript_id, event, logger=logger)
return event

View File

@@ -0,0 +1,111 @@
"""Hatchet Python client wrapper.
Uses singleton pattern because:
1. Hatchet client maintains persistent gRPC connections for workflow registration
2. Creating multiple clients would cause registration conflicts and resource leaks
3. The SDK is designed for a single client instance per process
4. Tests use `HatchetClientManager.reset()` to isolate state between tests
"""
import logging
import threading
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.logger import logger
from reflector.settings import settings
class HatchetClientManager:
"""Singleton manager for Hatchet client connections.
See module docstring for rationale. For test isolation, use `reset()`.
"""
_instance: Hatchet | None = None
_lock = threading.Lock()
@classmethod
def get_client(cls) -> Hatchet:
"""Get or create the Hatchet client (thread-safe singleton)."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
if not settings.HATCHET_CLIENT_TOKEN:
raise ValueError("HATCHET_CLIENT_TOKEN must be set")
# Pass root logger to Hatchet so workflow logs appear in dashboard
root_logger = logging.getLogger()
cls._instance = Hatchet(
debug=settings.HATCHET_DEBUG,
config=ClientConfig(logger=root_logger),
)
return cls._instance
@classmethod
async def start_workflow(
cls,
workflow_name: str,
input_data: dict,
additional_metadata: dict | None = None,
) -> str:
"""Start a workflow and return the workflow run ID.
Args:
workflow_name: Name of the workflow to trigger.
input_data: Input data for the workflow run.
additional_metadata: Optional metadata for filtering in dashboard
(e.g., transcript_id, recording_id).
"""
client = cls.get_client()
result = await client.runs.aio_create(
workflow_name,
input_data,
additional_metadata=additional_metadata,
)
return result.run.metadata.id
@classmethod
async def get_workflow_run_status(cls, workflow_run_id: str) -> V1TaskStatus:
client = cls.get_client()
return await client.runs.aio_get_status(workflow_run_id)
@classmethod
async def cancel_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_cancel(workflow_run_id)
logger.info("[Hatchet] Cancelled workflow", workflow_run_id=workflow_run_id)
@classmethod
async def replay_workflow(cls, workflow_run_id: str) -> None:
client = cls.get_client()
await client.runs.aio_replay(workflow_run_id)
logger.info("[Hatchet] Replaying workflow", workflow_run_id=workflow_run_id)
@classmethod
async def can_replay(cls, workflow_run_id: str) -> bool:
"""Check if workflow can be replayed (is FAILED)."""
try:
status = await cls.get_workflow_run_status(workflow_run_id)
return status == V1TaskStatus.FAILED or status == V1TaskStatus.CANCELLED
except Exception as e:
logger.warning(
"[Hatchet] Failed to check replay status",
workflow_run_id=workflow_run_id,
error=str(e),
)
return False
@classmethod
async def get_workflow_status(cls, workflow_run_id: str) -> dict:
"""Get the full workflow run details as dict."""
client = cls.get_client()
run = await client.runs.aio_get(workflow_run_id)
return run.to_dict()
@classmethod
def reset(cls) -> None:
"""Reset the client instance (for testing)."""
with cls._lock:
cls._instance = None

View File

@@ -0,0 +1,63 @@
"""
Run Hatchet workers for the diarization pipeline.
Runs as a separate process, just like Celery workers.
Usage:
uv run -m reflector.hatchet.run_workers
# Or via docker:
docker compose exec server uv run -m reflector.hatchet.run_workers
"""
import signal
import sys
from reflector.logger import logger
from reflector.settings import settings
def main() -> None:
"""Start Hatchet worker polling."""
if not settings.HATCHET_ENABLED:
logger.error("HATCHET_ENABLED is False, not starting workers")
sys.exit(1)
if not settings.HATCHET_CLIENT_TOKEN:
logger.error("HATCHET_CLIENT_TOKEN is not set")
sys.exit(1)
logger.info(
"Starting Hatchet workers",
debug=settings.HATCHET_DEBUG,
)
# Import here (not top-level) - workflow modules call HatchetClientManager.get_client()
# at module level because Hatchet SDK decorators (@workflow.task) bind at import time.
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
worker = hatchet.worker(
"reflector-diarization-worker",
workflows=[diarization_pipeline, track_workflow],
)
def shutdown_handler(signum: int, frame) -> None:
logger.info("Received shutdown signal, stopping workers...")
# Worker cleanup happens automatically on exit
sys.exit(0)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
logger.info("Starting Hatchet worker polling...")
worker.start()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,14 @@
"""Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import (
PipelineInput,
diarization_pipeline,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
"track_workflow",
"PipelineInput",
"TrackInput",
]

View File

@@ -0,0 +1,938 @@
"""
Hatchet main workflow: DiarizationPipeline
Multitrack diarization pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript.
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
"""
import asyncio
import functools
import tempfile
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from typing import Callable
import httpx
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.dailyco_api.client import DailyApiClient
from reflector.hatchet.broadcast import (
append_event_and_broadcast,
set_status_and_broadcast,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import (
ConsentResult,
FinalizeResult,
MixdownResult,
PaddedTrackInfo,
ParticipantsResult,
ProcessTracksResult,
RecordingResult,
SummaryResult,
TitleResult,
TopicsResult,
WaveformResult,
WebhookResult,
ZulipResult,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.types import (
TitleSummary,
TitleSummaryWithId,
Word,
)
from reflector.processors.types import (
Transcript as TranscriptType,
)
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
from reflector.utils.audio_constants import (
PRESIGNED_URL_EXPIRATION_SECONDS,
WAVEFORM_SEGMENTS,
)
from reflector.utils.audio_mixdown import (
detect_sample_rate_from_tracks,
mixdown_tracks_pyav,
)
from reflector.utils.audio_waveform import get_audio_waveform
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
"""Input to trigger the diarization pipeline."""
recording_id: NonEmptyString
tracks: list[dict] # List of {"s3_key": str}
bucket_name: NonEmptyString
transcript_id: NonEmptyString
room_id: NonEmptyString | None = None
hatchet = HatchetClientManager.get_client()
diarization_pipeline = hatchet.workflow(
name="DiarizationPipeline", input_validator=PipelineInput
)
@asynccontextmanager
async def fresh_db_connection():
"""Context manager for database connections in Hatchet workers.
TECH DEBT: Made to make connection fork-aware without changing db code too much.
The real fix would be making the db module fork-aware instead of bypassing it.
Current pattern is acceptable given Hatchet's process model.
"""
import databases # noqa: PLC0415
from reflector.db import _database_context # noqa: PLC0415
_database_context.set(None)
db = databases.Database(settings.DATABASE_URL)
_database_context.set(db)
await db.connect()
try:
yield db
finally:
await db.disconnect()
_database_context.set(None)
async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
"""Set transcript status to 'error' on workflow failure.
Returns:
True if status was set successfully, False if failed.
Failure is logged as CRITICAL since it means transcript may be stuck.
"""
try:
async with fresh_db_connection():
await set_status_and_broadcast(transcript_id, "error", logger=logger)
return True
except Exception as e:
logger.critical(
"[Hatchet] CRITICAL: Failed to set error status - transcript may be stuck in 'processing'",
transcript_id=transcript_id,
error=str(e),
exc_info=True,
)
return False
def _spawn_storage():
"""Create fresh storage instance."""
return AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
"""Decorator that handles task failures uniformly.
Args:
step_name: Name of the step for logging and progress tracking.
set_error_status: Whether to set transcript status to 'error' on failure.
"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context):
try:
return await func(input, ctx)
except Exception as e:
logger.error(
f"[Hatchet] {step_name} failed",
transcript_id=input.transcript_id,
error=str(e),
exc_info=True,
)
if set_error_status:
await set_workflow_error_status(input.transcript_id)
raise
return wrapper
return decorator
@diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3)
@with_error_handling("get_recording")
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: recording_id={input.recording_id}")
# Set transcript status to "processing" at workflow start (broadcasts to WebSocket)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await set_status_and_broadcast(
input.transcript_id, "processing", logger=logger
)
ctx.log(f"Set transcript status to processing: {input.transcript_id}")
if not settings.DAILY_API_KEY:
raise ValueError("DAILY_API_KEY not configured")
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
recording = await client.get_recording(input.recording_id)
ctx.log(
f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
)
return RecordingResult(
id=recording.id,
mtg_session_id=recording.mtgSessionId,
duration=recording.duration,
)
@diarization_pipeline.task(
parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3
)
@with_error_handling("get_participants")
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
recording = ctx.task_output(get_recording)
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptParticipant,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
# Note: title NOT cleared - preserves existing titles
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
},
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
)
daily_api_key = assert_non_none_and_non_empty(
settings.DAILY_API_KEY, "DAILY_API_KEY is required"
)
async with DailyApiClient(api_key=daily_api_key) as client:
participants = await client.get_meeting_participants(mtg_session_id)
id_to_name = {}
id_to_user_id = {}
for p in participants.data:
if p.user_name:
id_to_name[p.participant_id] = p.user_name
if p.user_id:
id_to_user_id[p.participant_id] = p.user_id
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
participants_list = []
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
{
"participant_id": participant_id,
"user_name": name,
"speaker": idx,
}
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
return ParticipantsResult(
participants=participants_list,
num_tracks=len(input.tracks),
source_language=transcript.source_language if transcript else "en",
target_language=transcript.target_language if transcript else "en",
)
@diarization_pipeline.task(
parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3
)
@with_error_handling("process_tracks")
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
child_coroutines = [
track_workflow.aio_run(
TrackInput(
track_index=i,
s3_key=track["s3_key"],
bucket_name=input.bucket_name,
transcript_id=input.transcript_id,
language=source_language,
)
)
for i, track in enumerate(input.tracks)
]
results = await asyncio.gather(*child_coroutines)
target_language = participants_result.target_language
track_words = []
padded_tracks = []
created_padded_files = set()
for result in results:
transcribe_result = result.get("transcribe_track", {})
track_words.append(transcribe_result.get("words", []))
pad_result = result.get("pad_track", {})
padded_key = pad_result.get("padded_key")
bucket_name = pad_result.get("bucket_name")
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
if padded_key:
padded_tracks.append(
PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
)
track_index = pad_result.get("track_index")
if pad_result.get("size", 0) > 0 and track_index is not None:
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
created_padded_files.add(storage_path)
all_words = [word for words in track_words for word in words]
all_words.sort(key=lambda w: w.get("start", 0))
ctx.log(
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
)
return ProcessTracksResult(
all_words=all_words,
padded_tracks=padded_tracks,
word_count=len(all_words),
num_tracks=len(input.tracks),
target_language=target_language,
created_padded_files=list(created_padded_files),
)
@diarization_pipeline.task(
parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("mixdown_tracks")
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
track_result = ctx.task_output(process_tracks)
padded_tracks = track_result.padded_tracks
# TODO think of NonEmpty type to avoid those checks, e.g. sized.NonEmpty from https://github.com/antonagestam/phantom-types/
if not padded_tracks:
raise ValueError("No padded tracks to mixdown")
storage = _spawn_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
padded_urls = []
for track_info in padded_tracks:
if track_info.key:
url = await storage.get_file_url(
track_info.key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=track_info.bucket_name,
)
padded_urls.append(url)
valid_urls = [url for url in padded_urls if url]
if not valid_urls:
raise ValueError("No valid padded tracks to mixdown")
target_sample_rate = detect_sample_rate_from_tracks(valid_urls, logger=logger)
if not target_sample_rate:
logger.error("Mixdown failed - no decodable audio frames found")
raise ValueError("No decodable audio frames in any track")
output_path = tempfile.mktemp(suffix=".mp3")
duration_ms_callback_capture_container = [0.0]
async def capture_duration(d):
duration_ms_callback_capture_container[0] = d
writer = AudioFileWriterProcessor(path=output_path, on_duration=capture_duration)
await mixdown_tracks_pyav(
valid_urls,
writer,
target_sample_rate,
offsets_seconds=None,
logger=logger,
)
await writer.flush()
file_size = Path(output_path).stat().st_size
storage_path = f"{input.transcript_id}/audio.mp3"
with open(output_path, "rb") as mixed_file:
await storage.put_file(storage_path, mixed_file)
Path(output_path).unlink(missing_ok=True)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await transcripts_controller.update(
transcript, {"audio_location": "storage"}
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
return MixdownResult(
audio_key=storage_path,
duration=duration_ms_callback_capture_container[0],
tracks_mixed=len(valid_urls),
)
@diarization_pipeline.task(
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_waveform")
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
"""Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptWaveform,
transcripts_controller,
)
mixdown_result = ctx.task_output(mixdown_tracks)
audio_key = mixdown_result.audio_key
storage = _spawn_storage()
audio_url = await storage.get_file_url(
audio_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
# Download MP3 to temp file (AudioWaveformProcessor needs local file)
with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as temp_file:
temp_path = temp_file.name
try:
async with httpx.AsyncClient() as client:
response = await client.get(audio_url, timeout=120)
response.raise_for_status()
with open(temp_path, "wb") as f:
f.write(response.content)
waveform = get_audio_waveform(
path=Path(temp_path), segments_count=WAVEFORM_SEGMENTS
)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
waveform_data = TranscriptWaveform(waveform=waveform)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"WAVEFORM",
waveform_data,
logger=logger,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log("generate_waveform complete")
return WaveformResult(waveform_generated=True)
@diarization_pipeline.task(
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("detect_topics")
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
ctx.log("detect_topics: analyzing transcript for topics")
track_result = ctx.task_output(process_tracks)
words = track_result.all_words
target_language = track_result.target_language
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptTopic,
transcripts_controller,
)
word_objects = [Word(**w) for w in words]
transcript_type = TranscriptType(words=word_objects)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_topic_callback(data):
topic = TranscriptTopic(
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
transcript=data.transcript.text,
words=data.transcript.words,
)
if isinstance(
data, TitleSummaryWithId
): # Celery parity: main_live_pipeline.py
topic.id = data.id
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
topics = await topic_processing.detect_topics(
transcript_type,
target_language,
on_topic_callback=on_topic_callback,
empty_pipeline=empty_pipeline,
)
topics_list = [t.model_dump() for t in topics]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
return TopicsResult(topics=topics_list)
@diarization_pipeline.task(
parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_title")
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
ctx.log("generate_title: generating title from topics")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalTitle,
transcripts_controller,
)
topic_objects = [TitleSummary(**t) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
title_result = None
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_title_callback(data):
nonlocal title_result
title_result = data.title
final_title = TranscriptFinalTitle(title=data.title)
if not transcript.title:
await transcripts_controller.update(
transcript,
{"title": final_title.title},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_TITLE",
final_title,
logger=logger,
)
await topic_processing.generate_title(
topic_objects,
on_title_callback=on_title_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log(f"generate_title complete: '{title_result}'")
return TitleResult(title=title_result)
@diarization_pipeline.task(
parents=[detect_topics], execution_timeout=timedelta(seconds=300), retries=3
)
@with_error_handling("generate_summary")
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
ctx.log("generate_summary: generating long and short summaries")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
topic_objects = [TitleSummary(**t) for t in topics]
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
summary_result = None
short_summary_result = None
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_long_summary_callback(data):
nonlocal summary_result
summary_result = data.long_summary
final_long_summary = TranscriptFinalLongSummary(
long_summary=data.long_summary
)
await transcripts_controller.update(
transcript,
{"long_summary": final_long_summary.long_summary},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_LONG_SUMMARY",
final_long_summary,
logger=logger,
)
async def on_short_summary_callback(data):
nonlocal short_summary_result
short_summary_result = data.short_summary
final_short_summary = TranscriptFinalShortSummary(
short_summary=data.short_summary
)
await transcripts_controller.update(
transcript,
{"short_summary": final_short_summary.short_summary},
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_SHORT_SUMMARY",
final_short_summary,
logger=logger,
)
await topic_processing.generate_summaries(
topic_objects,
transcript, # DB transcript for context
on_long_summary_callback=on_long_summary_callback,
on_short_summary_callback=on_short_summary_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log("generate_summary complete")
return SummaryResult(summary=summary_result, short_summary=short_summary_result)
@diarization_pipeline.task(
parents=[generate_waveform, generate_title, generate_summary],
execution_timeout=timedelta(seconds=60),
retries=3,
)
@with_error_handling("finalize")
async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
"""Finalize transcript: save words, emit TRANSCRIPT event, set status to 'ended'.
Matches Celery's on_transcript + set_status behavior.
Note: Title and summaries are already saved by their respective task callbacks.
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
mixdown_result = ctx.task_output(mixdown_tracks)
track_result = ctx.task_output(process_tracks)
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files
if created_padded_files:
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage()
cleanup_results = await asyncio.gather(
*[storage.delete_file(path) for path in created_padded_files],
return_exceptions=True,
)
for storage_path, result in zip(created_padded_files, cleanup_results):
if isinstance(result, Exception):
logger.warning(
"[Hatchet] Failed to cleanup temporary padded track",
storage_path=storage_path,
error=str(result),
)
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
TranscriptText,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database")
word_objects = [Word(**w) for w in all_words]
merged_transcript = TranscriptType(words=word_objects, translation=None)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"TRANSCRIPT",
TranscriptText(
text=merged_transcript.text,
translation=merged_transcript.translation,
),
logger=logger,
)
# Save duration and clear workflow_run_id (workflow completed successfully)
# Note: title/long_summary/short_summary already saved by their callbacks
await transcripts_controller.update(
transcript,
{
"duration": duration,
"workflow_run_id": None, # Clear on success - no need to resume
},
)
duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data, logger=logger
)
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log(
f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
)
return FinalizeResult(status="COMPLETED")
@diarization_pipeline.task(
parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3
)
@with_error_handling("cleanup_consent", set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
"""Check consent and delete audio files if any participant denied."""
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.meetings import ( # noqa: PLC0415
meeting_consent_controller,
meetings_controller,
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import get_transcripts_storage # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
ctx.log("cleanup_consent: transcript not found")
return ConsentResult()
consent_denied = False
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting:
consent_denied = await meeting_consent_controller.has_any_denial(
meeting.id
)
if not consent_denied:
ctx.log("cleanup_consent: consent approved, keeping all files")
return ConsentResult()
ctx.log("cleanup_consent: consent denied, deleting audio files")
input_track_keys = set(t["s3_key"] for t in input.tracks)
# Detect if recording.track_keys was manually modified after workflow started
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.track_keys:
db_track_keys = set(filter_cam_audio_tracks(recording.track_keys))
if input_track_keys != db_track_keys:
added = db_track_keys - input_track_keys
removed = input_track_keys - db_track_keys
logger.warning(
"[Hatchet] Track keys mismatch: DB changed since workflow start",
transcript_id=input.transcript_id,
recording_id=transcript.recording_id,
input_count=len(input_track_keys),
db_count=len(db_track_keys),
added_in_db=list(added) if added else None,
removed_from_db=list(removed) if removed else None,
)
ctx.log(
f"WARNING: track_keys mismatch - "
f"input has {len(input_track_keys)}, DB has {len(db_track_keys)}. "
f"Using input tracks for deletion."
)
deletion_errors = []
if input_track_keys and input.bucket_name:
master_storage = get_transcripts_storage()
for key in input_track_keys:
try:
await master_storage.delete_file(key, bucket=input.bucket_name)
ctx.log(f"Deleted recording file: {input.bucket_name}/{key}")
except Exception as e:
error_msg = f"Failed to delete {key}: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
except Exception as e:
error_msg = f"Failed to delete processed audio: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
transcript_id=input.transcript_id,
error_count=len(deletion_errors),
errors=deletion_errors,
)
ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio deleted successfully")
return ConsentResult()
@diarization_pipeline.task(
parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5
)
@with_error_handling("post_zulip", set_error_status=False)
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
"""Post notification to Zulip."""
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
if not settings.ZULIP_REALM:
ctx.log("post_zulip skipped (Zulip not configured)")
return ZulipResult(zulip_message_id=None, skipped=True)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
message_id = await post_transcript_notification(transcript)
ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
else:
message_id = None
return ZulipResult(zulip_message_id=message_id)
@diarization_pipeline.task(
parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30
)
@with_error_handling("send_webhook", set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
ctx.log("send_webhook skipped (no room_id)")
return WebhookResult(webhook_sent=False, skipped=True)
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
room = await rooms_controller.get_by_id(input.room_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if room and room.webhook_url and transcript:
webhook_payload = {
"event": "transcript.completed",
"transcript_id": input.transcript_id,
"title": transcript.title,
"duration": transcript.duration,
}
async with httpx.AsyncClient() as client:
response = await client.post(
room.webhook_url, json=webhook_payload, timeout=30
)
response.raise_for_status()
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
return WebhookResult(webhook_sent=False, skipped=True)

View File

@@ -0,0 +1,123 @@
"""
Pydantic models for Hatchet workflow task return types.
Provides static typing for all task outputs, enabling type checking
and better IDE support.
"""
from typing import Any
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
class PadTrackResult(BaseModel):
"""Result from pad_track task."""
padded_key: NonEmptyString # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
bucket_name: (
NonEmptyString | None
) # None means use default transcript storage bucket
size: int
track_index: int
class TranscribeTrackResult(BaseModel):
"""Result from transcribe_track task."""
words: list[dict[str, Any]]
track_index: int
class RecordingResult(BaseModel):
"""Result from get_recording task."""
id: NonEmptyString | None
mtg_session_id: NonEmptyString | None
duration: float
class ParticipantsResult(BaseModel):
"""Result from get_participants task."""
participants: list[dict[str, Any]]
num_tracks: int
source_language: NonEmptyString
target_language: NonEmptyString
class PaddedTrackInfo(BaseModel):
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
key: NonEmptyString
bucket_name: NonEmptyString | None # None = use default storage bucket
class ProcessTracksResult(BaseModel):
"""Result from process_tracks task."""
all_words: list[dict[str, Any]]
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
word_count: int
num_tracks: int
target_language: NonEmptyString
created_padded_files: list[NonEmptyString]
class MixdownResult(BaseModel):
"""Result from mixdown_tracks task."""
audio_key: NonEmptyString
duration: float
tracks_mixed: int
class WaveformResult(BaseModel):
"""Result from generate_waveform task."""
waveform_generated: bool
class TopicsResult(BaseModel):
"""Result from detect_topics task."""
topics: list[dict[str, Any]]
class TitleResult(BaseModel):
"""Result from generate_title task."""
title: str | None
class SummaryResult(BaseModel):
"""Result from generate_summary task."""
summary: str | None
short_summary: str | None
class FinalizeResult(BaseModel):
"""Result from finalize task."""
status: NonEmptyString
class ConsentResult(BaseModel):
"""Result from cleanup_consent task."""
class ZulipResult(BaseModel):
"""Result from post_zulip task."""
zulip_message_id: int | None = None
skipped: bool = False
class WebhookResult(BaseModel):
"""Result from send_webhook task."""
webhook_sent: bool
skipped: bool = False
response_code: int | None = None

View File

@@ -0,0 +1,222 @@
"""
Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
"""
import tempfile
from datetime import timedelta
from pathlib import Path
import av
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
class TrackInput(BaseModel):
"""Input for individual track processing."""
track_index: int
s3_key: str
bucket_name: str
transcript_id: str
language: str = "en"
hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
s3_key=input.s3_key,
transcript_id=input.transcript_id,
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
source_url = await storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=input.bucket_name,
)
with av.open(source_url) as in_container:
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
track_index=input.track_index,
)
return PadTrackResult(
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as temp_file:
temp_path = temp_file.name
try:
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
input.track_index,
logger=logger,
)
file_size = Path(temp_path).stat().st_size
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
logger.info(
f"About to upload padded track",
key=storage_path,
size=file_size,
)
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
logger.info(
f"Uploaded padded track to S3",
key=storage_path,
size=file_size,
)
finally:
Path(temp_path).unlink(missing_ok=True)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_key=storage_path,
)
# Return S3 key (not presigned URL) - consumer tasks presign on demand
# This avoids stale URLs when workflow is replayed
return PadTrackResult(
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] pad_track failed", error=str(e), exc_info=True)
raise
@track_workflow.task(
parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
language=input.language,
)
try:
pad_result = ctx.task_output(pad_track)
padded_key = pad_result.padded_key
bucket_name = pad_result.bucket_name
if not padded_key:
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index
words = []
for word in transcript.words:
word_dict = word.model_dump()
word_dict["speaker"] = input.track_index
words.append(word_dict)
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
word_count=len(words),
)
return TranscribeTrackResult(
words=words,
track_index=input.track_index,
)
except Exception as e:
logger.error("[Hatchet] transcribe_track failed", error=str(e), exc_info=True)
raise

View File

@@ -1,14 +1,32 @@
import logging
from typing import Type, TypeVar
from contextvars import ContextVar
from typing import Generic, Type, TypeVar
from uuid import uuid4
from llama_index.core import Settings
from llama_index.core.output_parsers import PydanticOutputParser
from llama_index.core.program import LLMTextCompletionProgram
from llama_index.core.response_synthesizers import TreeSummarize
from llama_index.core.workflow import (
Context,
Event,
StartEvent,
StopEvent,
Workflow,
step,
)
from llama_index.llms.openai_like import OpenAILike
from pydantic import BaseModel, ValidationError
from workflows.errors import WorkflowTimeoutError
from reflector.utils.retry import retry
T = TypeVar("T", bound=BaseModel)
OutputT = TypeVar("OutputT", bound=BaseModel)
# Session ID for LiteLLM request grouping - set per processing run
llm_session_id: ContextVar[str | None] = ContextVar("llm_session_id", default=None)
logger = logging.getLogger(__name__)
STRUCTURED_RESPONSE_PROMPT_TEMPLATE = """
Based on the following analysis, provide the information in the requested JSON format:
@@ -20,6 +38,158 @@ Analysis:
"""
class LLMParseError(Exception):
"""Raised when LLM output cannot be parsed after retries."""
def __init__(self, output_cls: Type[BaseModel], error_msg: str, attempts: int):
self.output_cls = output_cls
self.error_msg = error_msg
self.attempts = attempts
super().__init__(
f"Failed to parse {output_cls.__name__} after {attempts} attempts: {error_msg}"
)
class ExtractionDone(Event):
"""Event emitted when LLM JSON formatting completes."""
output: str
class ValidationErrorEvent(Event):
"""Event emitted when validation fails."""
error: str
wrong_output: str
class StructuredOutputWorkflow(Workflow, Generic[OutputT]):
"""Workflow for structured output extraction with validation retry.
This workflow handles parse/validation retries only. Network error retries
are handled internally by Settings.llm (OpenAILike max_retries=3).
The caller should NOT wrap this workflow in additional retry logic.
"""
def __init__(
self,
output_cls: Type[OutputT],
max_retries: int = 3,
**kwargs,
):
super().__init__(**kwargs)
self.output_cls: Type[OutputT] = output_cls
self.max_retries = max_retries
self.output_parser = PydanticOutputParser(output_cls)
@step
async def extract(
self, ctx: Context, ev: StartEvent | ValidationErrorEvent
) -> StopEvent | ExtractionDone:
"""Extract structured data from text using two-step LLM process.
Step 1 (first call only): TreeSummarize generates text analysis
Step 2 (every call): Settings.llm.acomplete formats analysis as JSON
"""
current_retries = await ctx.store.get("retries", default=0)
await ctx.store.set("retries", current_retries + 1)
if current_retries >= self.max_retries:
last_error = await ctx.store.get("last_error", default=None)
logger.error(
f"Max retries ({self.max_retries}) reached for {self.output_cls.__name__}"
)
return StopEvent(result={"error": last_error, "attempts": current_retries})
if isinstance(ev, StartEvent):
# First call: run TreeSummarize to get analysis, store in context
prompt = ev.get("prompt")
texts = ev.get("texts")
tone_name = ev.get("tone_name")
if not prompt or not isinstance(texts, list):
raise ValueError(
"StartEvent must contain 'prompt' (str) and 'texts' (list)"
)
summarizer = TreeSummarize(verbose=False)
analysis = await summarizer.aget_response(
prompt, texts, tone_name=tone_name
)
await ctx.store.set("analysis", str(analysis))
reflection = ""
else:
# Retry: reuse analysis from context
analysis = await ctx.store.get("analysis")
if not analysis:
raise RuntimeError("Internal error: analysis not found in context")
wrong_output = ev.wrong_output
if len(wrong_output) > 2000:
wrong_output = wrong_output[:2000] + "... [truncated]"
reflection = (
f"\n\nYour previous response could not be parsed:\n{wrong_output}\n\n"
f"Error:\n{ev.error}\n\n"
"Please try again. Return ONLY valid JSON matching the schema above, "
"with no markdown formatting or extra text."
)
# Step 2: Format analysis as JSON using LLM completion
format_instructions = self.output_parser.format(
"Please structure the above information in the following JSON format:"
)
json_prompt = STRUCTURED_RESPONSE_PROMPT_TEMPLATE.format(
analysis=analysis,
format_instructions=format_instructions + reflection,
)
# Network retries handled by OpenAILike (max_retries=3)
response = await Settings.llm.acomplete(json_prompt)
return ExtractionDone(output=response.text)
@step
async def validate(
self, ctx: Context, ev: ExtractionDone
) -> StopEvent | ValidationErrorEvent:
"""Validate extracted output against Pydantic schema."""
raw_output = ev.output
retries = await ctx.store.get("retries", default=0)
try:
parsed = self.output_parser.parse(raw_output)
if retries > 1:
logger.info(
f"LLM parse succeeded on attempt {retries}/{self.max_retries} "
f"for {self.output_cls.__name__}"
)
return StopEvent(result={"success": parsed})
except (ValidationError, ValueError) as e:
error_msg = self._format_error(e, raw_output)
await ctx.store.set("last_error", error_msg)
logger.error(
f"LLM parse error (attempt {retries}/{self.max_retries}): "
f"{type(e).__name__}: {e}\nRaw response: {raw_output[:500]}"
)
return ValidationErrorEvent(
error=error_msg,
wrong_output=raw_output,
)
def _format_error(self, error: Exception, raw_output: str) -> str:
"""Format error for LLM feedback."""
if isinstance(error, ValidationError):
error_messages = []
for err in error.errors():
field = ".".join(str(loc) for loc in err["loc"])
error_messages.append(f"- {err['msg']} in field '{field}'")
return "Schema validation errors:\n" + "\n".join(error_messages)
else:
return f"Parse error: {str(error)}"
class LLM:
def __init__(self, settings, temperature: float = 0.4, max_tokens: int = 2048):
self.settings_obj = settings
@@ -30,11 +200,12 @@ class LLM:
self.temperature = temperature
self.max_tokens = max_tokens
# Configure llamaindex Settings
self._configure_llamaindex()
def _configure_llamaindex(self):
"""Configure llamaindex Settings with OpenAILike LLM"""
session_id = llm_session_id.get() or f"fallback-{uuid4().hex}"
Settings.llm = OpenAILike(
model=self.model_name,
api_base=self.url,
@@ -44,6 +215,7 @@ class LLM:
is_function_calling_model=False,
temperature=self.temperature,
max_tokens=self.max_tokens,
additional_kwargs={"extra_body": {"litellm_session_id": session_id}},
)
async def get_response(
@@ -60,44 +232,38 @@ class LLM:
texts: list[str],
output_cls: Type[T],
tone_name: str | None = None,
timeout: int | None = None,
) -> T:
"""Get structured output from LLM for non-function-calling models"""
logger = logging.getLogger(__name__)
"""Get structured output from LLM with validation retry via Workflow."""
if timeout is None:
timeout = self.settings_obj.LLM_STRUCTURED_RESPONSE_TIMEOUT
summarizer = TreeSummarize(verbose=True)
response = await summarizer.aget_response(prompt, texts, tone_name=tone_name)
output_parser = PydanticOutputParser(output_cls)
program = LLMTextCompletionProgram.from_defaults(
output_parser=output_parser,
prompt_template_str=STRUCTURED_RESPONSE_PROMPT_TEMPLATE,
verbose=False,
async def run_workflow():
workflow = StructuredOutputWorkflow(
output_cls=output_cls,
max_retries=self.settings_obj.LLM_PARSE_MAX_RETRIES + 1,
timeout=timeout,
)
format_instructions = output_parser.format(
"Please structure the above information in the following JSON format:"
result = await workflow.run(
prompt=prompt,
texts=texts,
tone_name=tone_name,
)
try:
output = await program.acall(
analysis=str(response), format_instructions=format_instructions
if "error" in result:
error_msg = result["error"] or "Max retries exceeded"
raise LLMParseError(
output_cls=output_cls,
error_msg=error_msg,
attempts=result.get("attempts", 0),
)
except ValidationError as e:
# Extract the raw JSON from the error details
errors = e.errors()
if errors and "input" in errors[0]:
raw_json = errors[0]["input"]
logger.error(
f"JSON validation failed for {output_cls.__name__}. "
f"Full raw JSON output:\n{raw_json}\n"
f"Validation errors: {errors}"
)
else:
logger.error(
f"JSON validation failed for {output_cls.__name__}. "
f"Validation errors: {errors}"
)
raise
return output
return result["success"]
return await retry(run_workflow)(
retry_attempts=3,
retry_backoff_interval=1.0,
retry_backoff_max=30.0,
retry_ignore_exc_types=(WorkflowTimeoutError,),
)

View File

@@ -97,13 +97,8 @@ class PipelineMainFile(PipelineMainBase):
},
)
# Extract audio and write to transcript location
audio_path = await self.extract_and_write_audio(file_path, transcript)
# Upload for processing
audio_url = await self.upload_audio(audio_path, transcript)
# Run parallel processing
await self.run_parallel_processing(
audio_path,
audio_url,
@@ -197,7 +192,6 @@ class PipelineMainFile(PipelineMainBase):
transcript_result = results[0]
diarization_result = results[1]
# Handle errors - raise any exception that occurred
self._handle_gather_exceptions(results, "parallel processing")
for result in results:
if isinstance(result, Exception):
@@ -212,7 +206,6 @@ class PipelineMainFile(PipelineMainBase):
transcript=transcript_result, diarization=diarization_result or []
)
# Store result for retrieval
diarized_transcript: Transcript | None = None
async def capture_result(transcript):
@@ -309,6 +302,7 @@ class PipelineMainFile(PipelineMainBase):
transcript,
on_long_summary_callback=self.on_long_summary,
on_short_summary_callback=self.on_short_summary,
on_action_items_callback=self.on_action_items,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)
@@ -340,7 +334,6 @@ async def task_send_webhook_if_needed(*, transcript_id: str):
@asynctask
async def task_pipeline_file_process(*, transcript_id: str):
"""Celery task for file pipeline processing"""
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
raise Exception(f"Transcript {transcript_id} not found")
@@ -349,7 +342,6 @@ async def task_pipeline_file_process(*, transcript_id: str):
try:
await pipeline.set_status(transcript_id, "processing")
# Find the file to process
audio_file = next(transcript.data_path.glob("upload.*"), None)
if not audio_file:
audio_file = next(transcript.data_path.glob("audio.*"), None)

View File

@@ -27,6 +27,7 @@ from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
Transcript,
TranscriptActionItems,
TranscriptDuration,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
@@ -306,6 +307,23 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage]
data=final_short_summary,
)
@broadcast_to_sockets
async def on_action_items(self, data):
action_items = TranscriptActionItems(action_items=data.action_items)
async with self.transaction():
transcript = await self.get_transcript()
await transcripts_controller.update(
transcript,
{
"action_items": action_items.action_items,
},
)
return await transcripts_controller.append_event(
transcript=transcript,
event="ACTION_ITEMS",
data=action_items,
)
@broadcast_to_sockets
async def on_duration(self, data):
async with self.transaction():
@@ -465,6 +483,7 @@ class PipelineMainFinalSummaries(PipelineMainFromTopics):
transcript=self._transcript,
callback=self.on_long_summary,
on_short_summary=self.on_short_summary,
on_action_items=self.on_action_items,
),
]

View File

@@ -1,15 +1,15 @@
import asyncio
import math
import tempfile
from fractions import Fraction
from pathlib import Path
import av
from av.audio.resampler import AudioResampler
from celery import chain, shared_task
from reflector.asynctask import asynctask
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.db.transcripts import (
Transcript,
TranscriptParticipant,
TranscriptStatus,
TranscriptWaveform,
transcripts_controller,
@@ -29,14 +29,21 @@ from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
from reflector.storage import Storage, get_transcripts_storage
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_mixdown import (
detect_sample_rate_from_tracks,
mixdown_tracks_pyav,
)
from reflector.utils.audio_padding import (
apply_audio_padding_to_file,
extract_stream_start_time_from_container,
)
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString
# Audio encoding constants
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 128000
# Storage operation constants
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
from reflector.video_platforms.factory import create_platform_client
class PipelineMainMultitrack(PipelineMainBase):
@@ -117,8 +124,8 @@ class PipelineMainMultitrack(PipelineMainBase):
try:
# PyAV streams input from S3 URL efficiently (2-5MB fixed overhead for codec/filters)
with av.open(track_url) as in_container:
start_time_seconds = self._extract_stream_start_time_from_container(
in_container, track_idx
start_time_seconds = extract_stream_start_time_from_container(
in_container, track_idx, logger=self.logger
)
if start_time_seconds <= 0:
@@ -136,8 +143,12 @@ class PipelineMainMultitrack(PipelineMainBase):
temp_path = temp_file.name
try:
self._apply_audio_padding_to_file(
in_container, temp_path, start_time_seconds, track_idx
apply_audio_padding_to_file(
in_container,
temp_path,
start_time_seconds,
track_idx,
logger=self.logger,
)
storage_path = (
@@ -148,7 +159,6 @@ class PipelineMainMultitrack(PipelineMainBase):
with open(temp_path, "rb") as padded_file:
await storage.put_file(storage_path, padded_file)
finally:
# Clean up temp file
Path(temp_path).unlink(missing_ok=True)
padded_url = await storage.get_file_url(
@@ -178,308 +188,27 @@ class PipelineMainMultitrack(PipelineMainBase):
f"Track {track_idx} padding failed - transcript would have incorrect timestamps"
) from e
def _extract_stream_start_time_from_container(
self, container, track_idx: int
) -> float:
"""
Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
self.logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
self.logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def _apply_audio_padding_to_file(
self,
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph, writing to file"""
delay_ms = math.floor(start_time_seconds * 1000)
self.logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
self.logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise
async def mixdown_tracks(
self,
track_urls: list[str],
writer: AudioFileWriterProcessor,
offsets_seconds: list[float] | None = None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs"""
target_sample_rate: int | None = None
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
target_sample_rate = frame.sample_rate
break
except Exception:
continue
finally:
if container is not None:
container.close()
if target_sample_rate:
break
"""Multi-track mixdown using PyAV filter graph (amix), reading from S3 presigned URLs."""
target_sample_rate = detect_sample_rate_from_tracks(
track_urls, logger=self.logger
)
if not target_sample_rate:
self.logger.error("Mixdown failed - no decodable audio frames found")
raise Exception("Mixdown failed: No decodable audio frames in any track")
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
valid_track_urls = [url for url in track_urls if url]
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
await mixdown_tracks_pyav(
track_urls,
writer,
target_sample_rate,
offsets_seconds=offsets_seconds,
logger=self.logger,
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
if not inputs:
self.logger.error("Mixdown failed - no valid inputs for graph")
raise Exception("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=(
f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}"
),
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(url)
containers.append(c)
except Exception as e:
self.logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
self.logger.error("Mixdown failed - no valid containers opened")
raise Exception("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
for in_ctx in inputs:
in_ctx.push(None)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup
@broadcast_to_sockets
async def set_status(self, transcript_id: str, status: TranscriptStatus):
@@ -494,6 +223,90 @@ class PipelineMainMultitrack(PipelineMainBase):
transcript=transcript, event="WAVEFORM", data=waveform
)
async def update_participants_from_daily(
self, transcript: Transcript, track_keys: list[str]
) -> None:
"""Update transcript participants with user_id and names from Daily.co API."""
if not transcript.recording_id:
return
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(
transcript.recording_id
)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(
mtg_session_id
)
)
for p in payload.data:
pid = p.participant_id
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
self.logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
self.logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=transcript.recording_id,
)
except Exception as e:
self.logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=transcript.recording_id,
exc_info=True,
)
return
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
self.logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(
transcript, participant
)
except Exception as e:
self.logger.warning(
"Failed to map participant names", error=str(e), exc_info=True
)
async def process(self, bucket_name: str, track_keys: list[str]):
transcript = await self.get_transcript()
async with self.transaction():
@@ -502,9 +315,12 @@ class PipelineMainMultitrack(PipelineMainBase):
{
"events": [],
"topics": [],
"participants": [],
},
)
await self.update_participants_from_daily(transcript, track_keys)
source_storage = get_transcripts_storage()
transcript_storage = source_storage
@@ -669,6 +485,7 @@ class PipelineMainMultitrack(PipelineMainBase):
transcript,
on_long_summary_callback=self.on_long_summary,
on_short_summary_callback=self.on_short_summary,
on_action_items_callback=self.on_action_items,
empty_pipeline=self.empty_pipeline,
logger=self.logger,
)

View File

@@ -89,6 +89,7 @@ async def generate_summaries(
*,
on_long_summary_callback: Callable,
on_short_summary_callback: Callable,
on_action_items_callback: Callable,
empty_pipeline: EmptyPipeline,
logger: structlog.BoundLogger,
):
@@ -96,11 +97,14 @@ async def generate_summaries(
logger.warning("No topics for summary generation")
return
processor = TranscriptFinalSummaryProcessor(
transcript=transcript,
callback=on_long_summary_callback,
on_short_summary=on_short_summary_callback,
)
processor_kwargs = {
"transcript": transcript,
"callback": on_long_summary_callback,
"on_short_summary": on_short_summary_callback,
"on_action_items": on_action_items_callback,
}
processor = TranscriptFinalSummaryProcessor(**processor_kwargs)
processor.set_pipeline(empty_pipeline)
for topic in topics:

View File

@@ -96,6 +96,36 @@ RECAP_PROMPT = dedent(
"""
).strip()
ACTION_ITEMS_PROMPT = dedent(
"""
Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
Look for:
1. **Decisions Made**: Any decisions, choices, or conclusions reached during the meeting. For each decision:
- What was decided? (be specific)
- Who made the decision or was involved? (use actual participant names)
- Why was this decision made? (key factors, reasoning, or rationale)
2. **Next Steps / Action Items**: Any tasks, follow-ups, or actions that were mentioned or assigned. For each action item:
- What specific task needs to be done? (be concrete and actionable)
- Who is responsible? (use actual participant names if mentioned, or "team" if unclear)
- When is it due? (any deadlines, timeframes, or "by next meeting" type commitments)
- What context is needed? (any additional details that help understand the task)
Guidelines:
- Be thorough and identify all action items, even if they seem minor
- Include items that were agreed upon, assigned, or committed to
- Include decisions even if they seem obvious or implicit
- If someone says "I'll do X" or "We should do Y", that's an action item
- If someone says "Let's go with option A", that's a decision
- Use the exact participant names from the transcript
- If no participant name is mentioned, you can leave assigned_to/decided_by as null
Only return empty lists if the transcript contains NO decisions and NO action items whatsoever.
"""
).strip()
STRUCTURED_RESPONSE_PROMPT_TEMPLATE = dedent(
"""
Based on the following analysis, provide the information in the requested JSON format:
@@ -155,6 +185,53 @@ class SubjectsResponse(BaseModel):
)
class ActionItem(BaseModel):
"""A single action item from the meeting"""
task: str = Field(description="The task or action item to be completed")
assigned_to: str | None = Field(
default=None, description="Person or team assigned to this task (name)"
)
assigned_to_participant_id: str | None = Field(
default=None, description="Participant ID if assigned_to matches a participant"
)
deadline: str | None = Field(
default=None, description="Deadline or timeframe mentioned for this task"
)
context: str | None = Field(
default=None, description="Additional context or notes about this task"
)
class Decision(BaseModel):
"""A decision made during the meeting"""
decision: str = Field(description="What was decided")
rationale: str | None = Field(
default=None,
description="Reasoning or key factors that influenced this decision",
)
decided_by: str | None = Field(
default=None, description="Person or group who made the decision (name)"
)
decided_by_participant_id: str | None = Field(
default=None, description="Participant ID if decided_by matches a participant"
)
class ActionItemsResponse(BaseModel):
"""Pydantic model for identified action items"""
decisions: list[Decision] = Field(
default_factory=list,
description="List of decisions made during the meeting",
)
next_steps: list[ActionItem] = Field(
default_factory=list,
description="List of action items and next steps to be taken",
)
class SummaryBuilder:
def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None:
self.transcript: str | None = None
@@ -166,6 +243,8 @@ class SummaryBuilder:
self.model_name: str = llm.model_name
self.logger = logger or structlog.get_logger()
self.participant_instructions: str | None = None
self.action_items: ActionItemsResponse | None = None
self.participant_name_to_id: dict[str, str] = {}
if filename:
self.read_transcript_from_file(filename)
@@ -189,13 +268,20 @@ class SummaryBuilder:
self.llm = llm
async def _get_structured_response(
self, prompt: str, output_cls: Type[T], tone_name: str | None = None
self,
prompt: str,
output_cls: Type[T],
tone_name: str | None = None,
timeout: int | None = None,
) -> T:
"""Generic function to get structured output from LLM for non-function-calling models."""
# Add participant instructions to the prompt if available
enhanced_prompt = self._enhance_prompt_with_participants(prompt)
return await self.llm.get_structured_response(
enhanced_prompt, [self.transcript], output_cls, tone_name=tone_name
enhanced_prompt,
[self.transcript],
output_cls,
tone_name=tone_name,
timeout=timeout,
)
async def _get_response(
@@ -216,11 +302,19 @@ class SummaryBuilder:
# Participants
# ----------------------------------------------------------------------------
def set_known_participants(self, participants: list[str]) -> None:
def set_known_participants(
self,
participants: list[str],
participant_name_to_id: dict[str, str] | None = None,
) -> None:
"""
Set known participants directly without LLM identification.
This is used when participants are already identified and stored.
They are appended at the end of the transcript, providing more context for the assistant.
Args:
participants: List of participant names
participant_name_to_id: Optional mapping of participant names to their IDs
"""
if not participants:
self.logger.warning("No participants provided")
@@ -231,10 +325,12 @@ class SummaryBuilder:
participants=participants,
)
if participant_name_to_id:
self.participant_name_to_id = participant_name_to_id
participants_md = self.format_list_md(participants)
self.transcript += f"\n\n# Participants\n\n{participants_md}"
# Set instructions that will be automatically added to all prompts
participants_list = ", ".join(participants)
self.participant_instructions = dedent(
f"""
@@ -413,6 +509,92 @@ class SummaryBuilder:
self.recap = str(recap_response)
self.logger.info(f"Quick recap: {self.recap}")
def _map_participant_names_to_ids(
self, response: ActionItemsResponse
) -> ActionItemsResponse:
"""Map participant names in action items to participant IDs."""
if not self.participant_name_to_id:
return response
decisions = []
for decision in response.decisions:
new_decision = decision.model_copy()
if (
decision.decided_by
and decision.decided_by in self.participant_name_to_id
):
new_decision.decided_by_participant_id = self.participant_name_to_id[
decision.decided_by
]
decisions.append(new_decision)
next_steps = []
for item in response.next_steps:
new_item = item.model_copy()
if item.assigned_to and item.assigned_to in self.participant_name_to_id:
new_item.assigned_to_participant_id = self.participant_name_to_id[
item.assigned_to
]
next_steps.append(new_item)
return ActionItemsResponse(decisions=decisions, next_steps=next_steps)
async def identify_action_items(self) -> ActionItemsResponse | None:
"""Identify action items (decisions and next steps) from the transcript."""
self.logger.info("--- identify action items using TreeSummarize")
if not self.transcript:
self.logger.warning(
"No transcript available for action items identification"
)
self.action_items = None
return None
action_items_prompt = ACTION_ITEMS_PROMPT
try:
response = await self._get_structured_response(
action_items_prompt,
ActionItemsResponse,
tone_name="Action item identifier",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
response = self._map_participant_names_to_ids(response)
self.action_items = response
self.logger.info(
f"Identified {len(response.decisions)} decisions and {len(response.next_steps)} action items",
decisions_count=len(response.decisions),
next_steps_count=len(response.next_steps),
)
if response.decisions:
self.logger.debug(
"Decisions identified",
decisions=[d.decision for d in response.decisions],
)
if response.next_steps:
self.logger.debug(
"Action items identified",
tasks=[item.task for item in response.next_steps],
)
if not response.decisions and not response.next_steps:
self.logger.warning(
"No action items identified from transcript",
transcript_length=len(self.transcript),
)
return response
except Exception as e:
self.logger.error(
f"Error identifying action items: {e}",
exc_info=True,
)
self.action_items = None
return None
async def generate_summary(self, only_subjects: bool = False) -> None:
"""
Generate summary by extracting subjects, creating summaries for each, and generating a recap.
@@ -424,6 +606,7 @@ class SummaryBuilder:
await self.generate_subject_summaries()
await self.generate_recap()
await self.identify_action_items()
# ----------------------------------------------------------------------------
# Markdown
@@ -526,8 +709,6 @@ if __name__ == "__main__":
if args.summary:
await sm.generate_summary()
# Note: action items generation has been removed
print("")
print("-" * 80)
print("")

View File

@@ -1,7 +1,12 @@
from reflector.llm import LLM
from reflector.processors.base import Processor
from reflector.processors.summary.summary_builder import SummaryBuilder
from reflector.processors.types import FinalLongSummary, FinalShortSummary, TitleSummary
from reflector.processors.types import (
ActionItems,
FinalLongSummary,
FinalShortSummary,
TitleSummary,
)
from reflector.settings import settings
@@ -27,15 +32,20 @@ class TranscriptFinalSummaryProcessor(Processor):
builder = SummaryBuilder(self.llm, logger=self.logger)
builder.set_transcript(text)
# Use known participants if available, otherwise identify them
if self.transcript and self.transcript.participants:
# Extract participant names from the stored participants
participant_names = [p.name for p in self.transcript.participants if p.name]
if participant_names:
self.logger.info(
f"Using {len(participant_names)} known participants from transcript"
)
builder.set_known_participants(participant_names)
participant_name_to_id = {
p.name: p.id
for p in self.transcript.participants
if p.name and p.id
}
builder.set_known_participants(
participant_names, participant_name_to_id=participant_name_to_id
)
else:
self.logger.info(
"Participants field exists but is empty, identifying participants"
@@ -63,7 +73,6 @@ class TranscriptFinalSummaryProcessor(Processor):
self.logger.warning("No summary to output")
return
# build the speakermap from the transcript
speakermap = {}
if self.transcript:
speakermap = {
@@ -76,8 +85,6 @@ class TranscriptFinalSummaryProcessor(Processor):
speakermap=speakermap,
)
# build the transcript as a single string
# Replace speaker IDs with actual participant names if available
text_transcript = []
unique_speakers = set()
for topic in self.chunks:
@@ -111,4 +118,9 @@ class TranscriptFinalSummaryProcessor(Processor):
)
await self.emit(final_short_summary, name="short_summary")
if self.builder and self.builder.action_items:
action_items = self.builder.action_items.model_dump()
action_items = ActionItems(action_items=action_items)
await self.emit(action_items, name="action_items")
await self.emit(final_long_summary)

View File

@@ -78,7 +78,11 @@ class TranscriptTopicDetectorProcessor(Processor):
"""
prompt = TOPIC_PROMPT.format(text=text)
response = await self.llm.get_structured_response(
prompt, [text], TopicResponse, tone_name="Topic analyzer"
prompt,
[text],
TopicResponse,
tone_name="Topic analyzer",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
return response

View File

@@ -1,6 +1,7 @@
import io
import re
import tempfile
from collections import defaultdict
from pathlib import Path
from typing import Annotated, TypedDict
@@ -16,6 +17,17 @@ class DiarizationSegment(TypedDict):
PUNC_RE = re.compile(r"[.;:?!…]")
SENTENCE_END_RE = re.compile(r"[.?!…]$")
# Max segment length for words_to_segments() - breaks on any punctuation (. ; : ? ! …)
# when segment exceeds this limit. Used for non-multitrack recordings.
MAX_SEGMENT_CHARS = 120
# Max segment length for words_to_segments_by_sentence() - only breaks on sentence-ending
# punctuation (. ? ! …) when segment exceeds this limit. Higher threshold allows complete
# sentences in multitrack recordings where speakers overlap.
# similar number to server/reflector/processors/transcript_liner.py
MAX_SENTENCE_SEGMENT_CHARS = 1000
class AudioFile(BaseModel):
@@ -76,7 +88,6 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
# but separate if the speaker changes, or if the punctuation is a . , ; : ? !
segments = []
current_segment = None
MAX_SEGMENT_LENGTH = 120
for word in words:
if current_segment is None:
@@ -106,7 +117,7 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
current_segment.end = word.end
have_punc = PUNC_RE.search(word.text)
if have_punc and (len(current_segment.text) > MAX_SEGMENT_LENGTH):
if have_punc and (len(current_segment.text) > MAX_SEGMENT_CHARS):
segments.append(current_segment)
current_segment = None
@@ -116,6 +127,70 @@ def words_to_segments(words: list[Word]) -> list[TranscriptSegment]:
return segments
def words_to_segments_by_sentence(words: list[Word]) -> list[TranscriptSegment]:
"""Group words by speaker, then split into sentences.
For multitrack recordings where words from different speakers are interleaved
by timestamp, this function first groups all words by speaker, then creates
segments based on sentence boundaries within each speaker's words.
This produces cleaner output than words_to_segments() which breaks on every
speaker change, resulting in many tiny segments when speakers overlap.
"""
if not words:
return []
# Group words by speaker, preserving order within each speaker
by_speaker: dict[int, list[Word]] = defaultdict(list)
for w in words:
by_speaker[w.speaker].append(w)
segments: list[TranscriptSegment] = []
for speaker, speaker_words in by_speaker.items():
current_text = ""
current_start: float | None = None
current_end: float = 0.0
for word in speaker_words:
if current_start is None:
current_start = word.start
current_text += word.text
current_end = word.end
# Check for sentence end or max length
is_sentence_end = SENTENCE_END_RE.search(word.text.strip())
is_too_long = len(current_text) >= MAX_SENTENCE_SEGMENT_CHARS
if is_sentence_end or is_too_long:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
current_text = ""
current_start = None
# Flush remaining words for this speaker
if current_text and current_start is not None:
segments.append(
TranscriptSegment(
text=current_text,
start=current_start,
end=current_end,
speaker=speaker,
)
)
# Sort segments by start time
segments.sort(key=lambda s: s.start)
return segments
class Transcript(BaseModel):
translation: str | None = None
words: list[Word] = []
@@ -154,7 +229,9 @@ class Transcript(BaseModel):
word.start += offset
word.end += offset
def as_segments(self) -> list[TranscriptSegment]:
def as_segments(self, is_multitrack: bool = False) -> list[TranscriptSegment]:
if is_multitrack:
return words_to_segments_by_sentence(self.words)
return words_to_segments(self.words)
@@ -187,6 +264,10 @@ class FinalShortSummary(BaseModel):
duration: float
class ActionItems(BaseModel):
action_items: dict # JSON-serializable dict from ActionItemsResponse
class FinalTitle(BaseModel):
title: str

View File

@@ -0,0 +1,17 @@
"""Schema definitions for transcript format types and segments."""
from typing import Literal
from pydantic import BaseModel
TranscriptFormat = Literal["text", "text-timestamped", "webvtt-named", "json"]
class TranscriptSegment(BaseModel):
"""A single transcript segment with speaker and timing information."""
speaker: int
speaker_name: str
text: str
start: float
end: float

View File

@@ -7,18 +7,23 @@ This module provides result-based error handling that works in both contexts:
"""
from dataclasses import dataclass
from typing import Literal, Union
from typing import Literal, Union, assert_never
import celery
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller
from reflector.db.transcripts import Transcript
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.utils.match import absurd
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
@@ -38,6 +43,8 @@ class MultitrackProcessingConfig:
transcript_id: NonEmptyString
bucket_name: NonEmptyString
track_keys: list[str]
recording_id: NonEmptyString | None = None
room_id: NonEmptyString | None = None
mode: Literal["multitrack"] = "multitrack"
@@ -50,6 +57,7 @@ class ValidationOk:
# transcript currently doesnt always have recording_id
recording_id: NonEmptyString | None
transcript_id: NonEmptyString
room_id: NonEmptyString | None = None
@dataclass
@@ -97,6 +105,7 @@ async def validate_transcript_for_processing(
if transcript.status == "idle":
return ValidationNotReady(detail="Recording is not ready for processing")
# Check Celery tasks
if task_is_scheduled_or_active(
"reflector.pipelines.main_file_pipeline.task_pipeline_file_process",
transcript_id=transcript.id,
@@ -106,8 +115,25 @@ async def validate_transcript_for_processing(
):
return ValidationAlreadyScheduled(detail="already running")
# Check Hatchet workflows (if enabled)
if settings.HATCHET_ENABLED and transcript.workflow_run_id:
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
# If workflow is running or queued, don't allow new processing
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
return ValidationAlreadyScheduled(
detail="Hatchet workflow already running"
)
except ApiException:
# Workflow might be gone (404) or API issue - allow processing
pass
return ValidationOk(
recording_id=transcript.recording_id, transcript_id=transcript.id
recording_id=transcript.recording_id,
transcript_id=transcript.id,
room_id=transcript.room_id,
)
@@ -117,6 +143,7 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
"""
bucket_name: str | None = None
track_keys: list[str] | None = None
recording_id: str | None = validation.recording_id
if validation.recording_id:
recording = await recordings_controller.get_by_id(validation.recording_id)
@@ -138,6 +165,8 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
bucket_name=bucket_name, # type: ignore (validated above)
track_keys=track_keys,
transcript_id=validation.transcript_id,
recording_id=recording_id,
room_id=validation.room_id,
)
return FileProcessingConfig(
@@ -145,8 +174,104 @@ async def prepare_transcript_processing(validation: ValidationOk) -> PrepareResu
)
def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
async def dispatch_transcript_processing(
config: ProcessingConfig, force: bool = False
) -> AsyncResult | None:
"""Dispatch transcript processing to appropriate backend (Hatchet or Celery).
Returns AsyncResult for Celery tasks, None for Hatchet workflows.
"""
if isinstance(config, MultitrackProcessingConfig):
# Check if room has use_hatchet=True (overrides env vars)
room_forces_hatchet = False
if config.room_id:
room = await rooms_controller.get_by_id(config.room_id)
room_forces_hatchet = room.use_hatchet if room else False
# Start durable workflow if enabled (Hatchet)
# or if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
if room_forces_hatchet:
logger.info(
"Room forces Hatchet workflow",
room_id=config.room_id,
transcript_id=config.transcript_id,
)
if use_hatchet:
# First check if we can replay (outside transaction since it's read-only)
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id and not force:
can_replay = await HatchetClientManager.can_replay(
transcript.workflow_run_id
)
if can_replay:
await HatchetClientManager.replay_workflow(
transcript.workflow_run_id
)
logger.info(
"Replaying Hatchet workflow",
workflow_id=transcript.workflow_run_id,
)
return None
# Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id)
logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update(
transcript, {"workflow_run_id": None}
)
# Re-fetch and check for concurrent dispatch (optimistic approach).
# No database lock - worst case is duplicate dispatch, but Hatchet
# workflows are idempotent so this is acceptable.
transcript = await transcripts_controller.get_by_id(config.transcript_id)
if transcript and transcript.workflow_run_id:
# Another process started a workflow between validation and now
try:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
logger.info(
"Concurrent workflow detected, skipping dispatch",
workflow_id=transcript.workflow_run_id,
)
return None
except ApiException:
# Workflow might be gone (404) or API issue - proceed with new workflow
pass
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": config.recording_id,
"tracks": [{"s3_key": k} for k in config.track_keys],
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
},
additional_metadata={
"transcript_id": config.transcript_id,
"recording_id": config.recording_id,
"daily_recording_id": config.recording_id,
},
)
if transcript:
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
logger.info("Hatchet workflow dispatched", workflow_id=workflow_id)
return None
# Celery pipeline (durable workflows disabled)
return task_pipeline_multitrack_process.delay(
transcript_id=config.transcript_id,
bucket_name=config.bucket_name,
@@ -155,13 +280,16 @@ def dispatch_transcript_processing(config: ProcessingConfig) -> AsyncResult:
elif isinstance(config, FileProcessingConfig):
return task_pipeline_file_process.delay(transcript_id=config.transcript_id)
else:
absurd(config)
assert_never(config)
def task_is_scheduled_or_active(task_name: str, **kwargs):
inspect = celery.current_app.control.inspect()
for worker, tasks in (inspect.scheduled() | inspect.active()).items():
scheduled = inspect.scheduled() or {}
active = inspect.active() or {}
all = scheduled | active
for worker, tasks in all.items():
for task in tasks:
if task["name"] == task_name and task["kwargs"] == kwargs:
return True

View File

@@ -74,6 +74,13 @@ class Settings(BaseSettings):
LLM_API_KEY: str | None = None
LLM_CONTEXT_WINDOW: int = 16000
LLM_PARSE_MAX_RETRIES: int = (
3 # Max retries for JSON/validation errors (total attempts = retries + 1)
)
LLM_STRUCTURED_RESPONSE_TIMEOUT: int = (
300 # Timeout in seconds for structured responses (5 minutes)
)
# Diarization
DIARIZATION_ENABLED: bool = True
DIARIZATION_BACKEND: str = "modal"
@@ -146,5 +153,19 @@ class Settings(BaseSettings):
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
# Durable workflow orchestration
# Provider: "hatchet" (or "none" to disable)
DURABLE_WORKFLOW_PROVIDER: str = "none"
# Hatchet workflow orchestration
HATCHET_CLIENT_TOKEN: str | None = None
HATCHET_CLIENT_TLS_STRATEGY: str = "none" # none, tls, mtls
HATCHET_DEBUG: bool = False
@property
def HATCHET_ENABLED(self) -> bool:
"""True if Hatchet is the active provider."""
return self.DURABLE_WORKFLOW_PROVIDER == "hatchet"
settings = Settings()

View File

@@ -15,8 +15,11 @@ import time
from typing import Callable
from celery.result import AsyncResult
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db import get_database
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.hatchet.client import HatchetClientManager
from reflector.services.transcript_process import (
FileProcessingConfig,
MultitrackProcessingConfig,
@@ -34,24 +37,26 @@ async def process_transcript_inner(
transcript: Transcript,
on_validation: Callable[[ValidationResult], None],
on_preprocess: Callable[[PrepareResult], None],
) -> AsyncResult:
force: bool = False,
) -> AsyncResult | None:
validation = await validate_transcript_for_processing(transcript)
on_validation(validation)
config = await prepare_transcript_processing(validation)
on_preprocess(config)
return dispatch_transcript_processing(config)
return await dispatch_transcript_processing(config, force=force)
async def process_transcript(transcript_id: str, sync: bool = False) -> None:
async def process_transcript(
transcript_id: str, sync: bool = False, force: bool = False
) -> None:
"""
Process a transcript by ID, auto-detecting multitrack vs file pipeline.
Args:
transcript_id: The transcript UUID
sync: If True, wait for task completion. If False, dispatch and exit.
force: If True, cancel old workflow and start new (latest code). If False, replay failed workflow.
"""
from reflector.db import get_database
database = get_database()
await database.connect()
@@ -82,10 +87,42 @@ async def process_transcript(transcript_id: str, sync: bool = False) -> None:
print(f"Dispatching file pipeline", file=sys.stderr)
result = await process_transcript_inner(
transcript, on_validation=on_validation, on_preprocess=on_preprocess
transcript,
on_validation=on_validation,
on_preprocess=on_preprocess,
force=force,
)
if result is None:
# Hatchet workflow dispatched
if sync:
# Re-fetch transcript to get workflow_run_id
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript or not transcript.workflow_run_id:
print("Error: workflow_run_id not found", file=sys.stderr)
sys.exit(1)
print("Waiting for Hatchet workflow...", file=sys.stderr)
while True:
status = await HatchetClientManager.get_workflow_run_status(
transcript.workflow_run_id
)
print(f" Status: {status.value}", file=sys.stderr)
if status == V1TaskStatus.COMPLETED:
print("Workflow completed successfully", file=sys.stderr)
break
elif status in (V1TaskStatus.FAILED, V1TaskStatus.CANCELLED):
print(f"Workflow failed: {status}", file=sys.stderr)
sys.exit(1)
await asyncio.sleep(5)
else:
print(
"Task dispatched (use --sync to wait for completion)",
file=sys.stderr,
)
elif sync:
print("Waiting for task completion...", file=sys.stderr)
while not result.ready():
print(f" Status: {result.state}", file=sys.stderr)
@@ -118,9 +155,16 @@ def main():
action="store_true",
help="Wait for task completion instead of just dispatching",
)
parser.add_argument(
"--force",
action="store_true",
help="Cancel old workflow and start new (uses latest code instead of replaying)",
)
args = parser.parse_args()
asyncio.run(process_transcript(args.transcript_id, sync=args.sync))
asyncio.run(
process_transcript(args.transcript_id, sync=args.sync, force=args.force)
)
if __name__ == "__main__":

View File

@@ -0,0 +1,15 @@
"""
Shared audio processing constants.
Used by both Hatchet workflows and Celery pipelines for consistent audio encoding.
"""
# Opus codec settings
OPUS_STANDARD_SAMPLE_RATE = 48000
OPUS_DEFAULT_BIT_RATE = 128000 # 128kbps for good speech quality
# S3 presigned URL expiration
PRESIGNED_URL_EXPIRATION_SECONDS = 7200 # 2 hours
# Waveform visualization
WAVEFORM_SEGMENTS = 255

View File

@@ -0,0 +1,227 @@
"""
Audio track mixdown utilities.
Shared PyAV-based functions for mixing multiple audio tracks into a single output.
Used by both Hatchet workflows and Celery pipelines.
"""
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
def detect_sample_rate_from_tracks(track_urls: list[str], logger=None) -> int | None:
"""Detect sample rate from first decodable audio frame.
Args:
track_urls: List of URLs to audio files (S3 presigned or local)
logger: Optional logger instance
Returns:
Sample rate in Hz, or None if no decodable frames found
"""
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
return frame.sample_rate
except Exception:
continue
finally:
if container is not None:
container.close()
return None
async def mixdown_tracks_pyav(
track_urls: list[str],
writer,
target_sample_rate: int,
offsets_seconds: list[float] | None = None,
logger=None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix).
Builds a filter graph: N abuffer -> optional adelay -> amix -> aformat -> sink
Reads from S3 presigned URLs or local files, pushes mixed frames to writer.
Args:
track_urls: List of URLs to audio tracks (S3 presigned or local)
writer: AudioFileWriterProcessor instance with async push() method
target_sample_rate: Sample rate for output (Hz)
offsets_seconds: Optional per-track delays in seconds for alignment.
If provided, must have same length as track_urls. Delays are relative
to the minimum offset (earliest track has delay=0).
logger: Optional logger instance
Raises:
ValueError: If offsets_seconds length doesn't match track_urls,
no valid tracks provided, or no containers can be opened
"""
if offsets_seconds is not None and len(offsets_seconds) != len(track_urls):
raise ValueError(
f"offsets_seconds length ({len(offsets_seconds)}) must match track_urls ({len(track_urls)})"
)
valid_track_urls = [url for url in track_urls if url]
if not valid_track_urls:
if logger:
logger.error("Mixdown failed - no valid track URLs provided")
raise ValueError("Mixdown failed: No valid track URLs")
# Calculate per-input delays if offsets provided
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
if not inputs:
if logger:
logger.error("Mixdown failed - no valid inputs for graph")
raise ValueError("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(
url,
options={
# S3 streaming options
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
if logger:
logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
if logger:
logger.error("Mixdown failed - no valid containers opened")
raise ValueError("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
# Signal end of stream to filter graph
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
# Flush remaining frames from filter graph
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup

View File

@@ -0,0 +1,186 @@
"""
Audio track padding utilities.
Shared PyAV-based functions for extracting stream metadata and applying
silence padding to audio tracks. Used by both Hatchet workflows and Celery pipelines.
"""
import math
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
from reflector.utils.audio_constants import (
OPUS_DEFAULT_BIT_RATE,
OPUS_STANDARD_SAMPLE_RATE,
)
def extract_stream_start_time_from_container(
container,
track_idx: int,
logger=None,
) -> float:
"""Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
Args:
container: PyAV container opened from audio file/URL
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Returns:
Start time in seconds (0.0 if extraction fails)
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
if logger:
logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
if logger:
logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def apply_audio_padding_to_file(
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
logger=None,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph.
Uses adelay filter to prepend silence, aligning track to meeting start time.
Output is WebM/Opus format.
Args:
in_container: PyAV container opened from source audio
output_path: Path for output WebM file
start_time_seconds: Amount of silence to prepend (in seconds)
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Raises:
Exception: If no audio stream found or PyAV processing fails
"""
delay_ms = math.floor(start_time_seconds * 1000)
if logger:
logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush remaining frames from filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
if logger:
logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise

View File

@@ -0,0 +1,4 @@
def assert_not_none[T](value: T | None, message: str = "Value is None") -> T:
if value is None:
raise ValueError(message)
return value

View File

@@ -64,6 +64,11 @@ def recording_lock_key(recording_id: NonEmptyString) -> NonEmptyString:
return f"recording:{recording_id}"
def filter_cam_audio_tracks(track_keys: list[str]) -> list[str]:
"""Filter track keys to cam-audio tracks only (skip screen-audio, etc.)."""
return [k for k in track_keys if "cam-audio" in k]
def extract_base_room_name(daily_room_name: DailyRoomName) -> NonEmptyString:
"""
Extract base room name from Daily.co timestamped room name.

View File

@@ -1,10 +0,0 @@
from typing import NoReturn
def assert_exhaustiveness(x: NoReturn) -> NoReturn:
"""Provide an assertion at type-check time that this function is never called."""
raise AssertionError(f"Invalid value: {x!r}")
def absurd(x: NoReturn) -> NoReturn:
return assert_exhaustiveness(x)

View File

@@ -2,6 +2,17 @@ from typing import Annotated, TypeVar
from pydantic import Field, TypeAdapter, constr
T_NotNone = TypeVar("T_NotNone")
def assert_not_none(
value: T_NotNone | None, message: str = "Value is None"
) -> T_NotNone:
if value is None:
raise ValueError(message)
return value
NonEmptyStringBase = constr(min_length=1, strip_whitespace=False)
NonEmptyString = Annotated[
NonEmptyStringBase,
@@ -23,10 +34,18 @@ def try_parse_non_empty_string(s: str) -> NonEmptyString | None:
return parse_non_empty_string(s)
T = TypeVar("T", bound=str)
T_Str = TypeVar("T_Str", bound=str)
def assert_equal[T](s1: T, s2: T) -> T:
def assert_equal(s1: T_Str, s2: T_Str) -> T_Str:
if s1 != s2:
raise ValueError(f"assert_equal: {s1} != {s2}")
return s1
def assert_non_none_and_non_empty(
value: str | None, error: str | None = None
) -> NonEmptyString:
return parse_non_empty_string(
assert_not_none(value, error or "Value is None"), error
)

View File

@@ -0,0 +1,133 @@
"""Utilities for converting transcript data to various output formats."""
import webvtt
from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic
from reflector.processors.types import (
Transcript as ProcessorTranscript,
)
from reflector.schemas.transcript_formats import TranscriptSegment
from reflector.utils.webvtt import seconds_to_timestamp
def get_speaker_name(
speaker: int, participants: list[TranscriptParticipant] | None
) -> str:
"""Get participant name for speaker or default to 'Speaker N'."""
if participants:
for participant in participants:
if participant.speaker == speaker:
return participant.name
return f"Speaker {speaker}"
def format_timestamp_mmss(seconds: float | int) -> str:
"""Format seconds as MM:SS timestamp."""
minutes = int(seconds // 60)
secs = int(seconds % 60)
return f"{minutes:02d}:{secs:02d}"
def transcript_to_text(
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to plain text with speaker names."""
lines = []
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
text = segment.text.strip()
lines.append(f"{speaker_name}: {text}")
return "\n".join(lines)
def transcript_to_text_timestamped(
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to timestamped text with speaker names."""
lines = []
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
timestamp = format_timestamp_mmss(segment.start)
text = segment.text.strip()
lines.append(f"[{timestamp}] {speaker_name}: {text}")
return "\n".join(lines)
def topics_to_webvtt_named(
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> str:
"""Convert transcript topics to WebVTT format with participant names."""
vtt = webvtt.WebVTT()
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
text = segment.text.strip()
text = f"<v {speaker_name}>{text}"
caption = webvtt.Caption(
start=seconds_to_timestamp(segment.start),
end=seconds_to_timestamp(segment.end),
text=text,
)
vtt.captions.append(caption)
return vtt.content
def transcript_to_json_segments(
topics: list[TranscriptTopic],
participants: list[TranscriptParticipant] | None,
is_multitrack: bool = False,
) -> list[TranscriptSegment]:
"""Convert transcript topics to a flat list of JSON segments."""
result = []
for topic in topics:
if not topic.words:
continue
transcript = ProcessorTranscript(words=topic.words)
segments = transcript.as_segments(is_multitrack)
for segment in segments:
speaker_name = get_speaker_name(segment.speaker, participants)
result.append(
TranscriptSegment(
speaker=segment.speaker,
speaker_name=speaker_name,
text=segment.text.strip(),
start=segment.start,
end=segment.end,
)
)
return result

View File

@@ -13,7 +13,7 @@ VttTimestamp = Annotated[str, "vtt_timestamp"]
WebVTTStr = Annotated[str, "webvtt_str"]
def _seconds_to_timestamp(seconds: Seconds) -> VttTimestamp:
def seconds_to_timestamp(seconds: Seconds) -> VttTimestamp:
# lib doesn't do that
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
@@ -37,8 +37,8 @@ def words_to_webvtt(words: list[Word]) -> WebVTTStr:
text = f"<v Speaker{segment.speaker}>{text}"
caption = webvtt.Caption(
start=_seconds_to_timestamp(segment.start),
end=_seconds_to_timestamp(segment.end),
start=seconds_to_timestamp(segment.start),
end=seconds_to_timestamp(segment.end),
text=text,
)
vtt.captions.append(caption)

View File

@@ -31,6 +31,7 @@ class DailyClient(VideoPlatformClient):
PLATFORM_NAME: Platform = "daily"
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"
RECORDING_NONE: RecordingType = "none"
RECORDING_LOCAL: RecordingType = "local"
RECORDING_CLOUD: RecordingType = "cloud"
def __init__(self, config: VideoPlatformConfig):
@@ -54,19 +55,23 @@ class DailyClient(VideoPlatformClient):
timestamp = datetime.now().strftime(self.TIMESTAMP_FORMAT)
room_name = f"{room_name_prefix}{ROOM_PREFIX_SEPARATOR}{timestamp}"
properties = RoomProperties(
enable_recording = None
if room.recording_type == self.RECORDING_LOCAL:
enable_recording = "local"
elif room.recording_type == self.RECORDING_CLOUD:
enable_recording = "raw-tracks"
if room.recording_type != self.RECORDING_NONE
else False,
properties = RoomProperties(
enable_recording=enable_recording,
enable_chat=True,
enable_screenshare=True,
enable_knocking=room.is_locked,
start_video_off=False,
start_audio_off=False,
exp=int(end_date.timestamp()),
)
# Only configure recordings_bucket if recording is enabled
if room.recording_type != self.RECORDING_NONE:
if room.recording_type == self.RECORDING_CLOUD:
daily_storage = get_dailyco_storage()
assert daily_storage.bucket_name, "S3 bucket must be configured"
properties.recordings_bucket = RecordingsBucketConfig(
@@ -172,16 +177,18 @@ class DailyClient(VideoPlatformClient):
async def create_meeting_token(
self,
room_name: DailyRoomName,
enable_recording: bool,
user_id: str | None = None,
) -> str:
start_cloud_recording: bool,
enable_recording_ui: bool,
user_id: NonEmptyString | None = None,
is_owner: bool = False,
) -> NonEmptyString:
properties = MeetingTokenProperties(
room_name=room_name,
user_id=user_id,
start_cloud_recording=enable_recording,
enable_recording_ui=not enable_recording,
start_cloud_recording=start_cloud_recording,
enable_recording_ui=enable_recording_ui,
is_owner=is_owner,
)
request = CreateMeetingTokenRequest(properties=properties)
result = await self._api_client.create_meeting_token(request)
return result.token

View File

@@ -89,7 +89,7 @@ class CreateRoom(BaseModel):
ics_url: Optional[str] = None
ics_fetch_interval: int = 300
ics_enabled: bool = False
platform: Optional[Platform] = None
platform: Platform
class UpdateRoom(BaseModel):
@@ -310,6 +310,22 @@ async def rooms_create_meeting(
room=room, current_time=current_time
)
if meeting is not None:
settings_match = (
meeting.is_locked == room.is_locked
and meeting.room_mode == room.room_mode
and meeting.recording_type == room.recording_type
and meeting.recording_trigger == room.recording_trigger
and meeting.platform == room.platform
)
if not settings_match:
logger.info(
f"Room settings changed for {room_name}, creating new meeting",
room_id=room.id,
old_meeting_id=meeting.id,
)
meeting = None
if meeting is None:
end_date = current_time + timedelta(hours=8)
@@ -549,20 +565,16 @@ async def rooms_join_meeting(
if meeting.end_date <= current_time:
raise HTTPException(status_code=400, detail="Meeting has ended")
if meeting.platform == "daily":
if meeting.platform == "daily" and user_id is not None:
client = create_platform_client(meeting.platform)
enable_recording = room.recording_trigger != "none"
token = await client.create_meeting_token(
meeting.room_name,
enable_recording=enable_recording,
start_cloud_recording=meeting.recording_type == "cloud",
enable_recording_ui=meeting.recording_type == "local",
user_id=user_id,
is_owner=user_id == room.user_id,
)
meeting = meeting.model_copy()
meeting.room_url = add_query_param(meeting.room_url, "t", token)
if meeting.host_room_url:
meeting.host_room_url = add_query_param(meeting.host_room_url, "t", token)
if user_id != room.user_id and meeting.platform == "whereby":
meeting.host_room_url = ""
return meeting

View File

@@ -1,14 +1,23 @@
from datetime import datetime, timedelta, timezone
from typing import Annotated, Literal, Optional
from typing import Annotated, Literal, Optional, assert_never
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi_pagination import Page
from fastapi_pagination.ext.databases import apaginate
from jose import jwt
from pydantic import AwareDatetime, BaseModel, Field, constr, field_serializer
from pydantic import (
AwareDatetime,
BaseModel,
Discriminator,
Field,
constr,
field_serializer,
)
import reflector.auth as auth
from reflector.db import get_database
from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.search import (
DEFAULT_SEARCH_LIMIT,
SearchLimit,
@@ -29,9 +38,17 @@ from reflector.db.transcripts import (
TranscriptTopic,
transcripts_controller,
)
from reflector.db.users import user_controller
from reflector.processors.types import Transcript as ProcessorTranscript
from reflector.processors.types import Word
from reflector.schemas.transcript_formats import TranscriptFormat, TranscriptSegment
from reflector.settings import settings
from reflector.utils.transcript_formats import (
topics_to_webvtt_named,
transcript_to_json_segments,
transcript_to_text,
transcript_to_text_timestamped,
)
from reflector.ws_manager import get_ws_manager
from reflector.zulip import (
InvalidMessageError,
@@ -46,6 +63,14 @@ ALGORITHM = "HS256"
DOWNLOAD_EXPIRE_MINUTES = 60
async def _get_is_multitrack(transcript) -> bool:
"""Detect if transcript is from multitrack recording."""
if not transcript.recording_id:
return False
recording = await recordings_controller.get_by_id(transcript.recording_id)
return recording is not None and recording.is_multitrack
def create_access_token(data: dict, expires_delta: timedelta):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + expires_delta
@@ -88,8 +113,86 @@ class GetTranscriptMinimal(BaseModel):
audio_deleted: bool | None = None
class GetTranscript(GetTranscriptMinimal):
participants: list[TranscriptParticipant] | None
class TranscriptParticipantWithEmail(TranscriptParticipant):
email: str | None = None
class GetTranscriptWithParticipants(GetTranscriptMinimal):
participants: list[TranscriptParticipantWithEmail] | None
class GetTranscriptWithText(GetTranscriptWithParticipants):
"""
Transcript response with plain text format.
Format: Speaker names followed by their dialogue, one line per segment.
Example:
John Smith: Hello everyone
Jane Doe: Hi there
"""
transcript_format: Literal["text"] = "text"
transcript: str
class GetTranscriptWithTextTimestamped(GetTranscriptWithParticipants):
"""
Transcript response with timestamped text format.
Format: [MM:SS] timestamp prefix before each speaker and dialogue.
Example:
[00:00] John Smith: Hello everyone
[00:05] Jane Doe: Hi there
"""
transcript_format: Literal["text-timestamped"] = "text-timestamped"
transcript: str
class GetTranscriptWithWebVTTNamed(GetTranscriptWithParticipants):
"""
Transcript response in WebVTT subtitle format with participant names.
Format: Standard WebVTT with voice tags using participant names.
Example:
WEBVTT
00:00:00.000 --> 00:00:05.000
<v John Smith>Hello everyone
"""
transcript_format: Literal["webvtt-named"] = "webvtt-named"
transcript: str
class GetTranscriptWithJSON(GetTranscriptWithParticipants):
"""
Transcript response as structured JSON segments.
Format: Array of segment objects with speaker info, text, and timing.
Example:
[
{
"speaker": 0,
"speaker_name": "John Smith",
"text": "Hello everyone",
"start": 0.0,
"end": 5.0
}
]
"""
transcript_format: Literal["json"] = "json"
transcript: list[TranscriptSegment]
GetTranscript = Annotated[
GetTranscriptWithText
| GetTranscriptWithTextTimestamped
| GetTranscriptWithWebVTTNamed
| GetTranscriptWithJSON,
Discriminator("transcript_format"),
]
class CreateTranscript(BaseModel):
@@ -228,7 +331,7 @@ async def transcripts_search(
)
@router.post("/transcripts", response_model=GetTranscript)
@router.post("/transcripts", response_model=GetTranscriptWithParticipants)
async def transcripts_create(
info: CreateTranscript,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
@@ -272,7 +375,7 @@ class GetTranscriptTopic(BaseModel):
segments: list[GetTranscriptSegmentTopic] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
if not topic.words:
# In previous version, words were missing
# Just output a segment with speaker 0
@@ -296,7 +399,7 @@ class GetTranscriptTopic(BaseModel):
start=segment.start,
speaker=segment.speaker,
)
for segment in transcript.as_segments()
for segment in transcript.as_segments(is_multitrack)
]
return cls(
id=topic.id,
@@ -313,8 +416,8 @@ class GetTranscriptTopicWithWords(GetTranscriptTopic):
words: list[Word] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
instance.words = topic.words
return instance
@@ -329,8 +432,8 @@ class GetTranscriptTopicWithWordsPerSpeaker(GetTranscriptTopic):
words_per_speaker: list[SpeakerWords] = []
@classmethod
def from_transcript_topic(cls, topic: TranscriptTopic):
instance = super().from_transcript_topic(topic)
def from_transcript_topic(cls, topic: TranscriptTopic, is_multitrack: bool = False):
instance = super().from_transcript_topic(topic, is_multitrack)
if topic.words:
words_per_speakers = []
# group words by speaker
@@ -362,14 +465,95 @@ class GetTranscriptTopicWithWordsPerSpeaker(GetTranscriptTopic):
async def transcript_get(
transcript_id: str,
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
transcript_format: TranscriptFormat = "text",
):
user_id = user["sub"] if user else None
return await transcripts_controller.get_by_id_for_http(
transcript = await transcripts_controller.get_by_id_for_http(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
@router.patch("/transcripts/{transcript_id}", response_model=GetTranscript)
room_name = None
if transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
room_name = room.name if room else None
participants = []
if transcript.participants:
user_ids = [p.user_id for p in transcript.participants if p.user_id is not None]
users_dict = await user_controller.get_by_ids(user_ids) if user_ids else {}
for p in transcript.participants:
user = users_dict.get(p.user_id) if p.user_id else None
participants.append(
TranscriptParticipantWithEmail(
**p.model_dump(), email=user.email if user else None
)
)
base_data = {
"id": transcript.id,
"user_id": transcript.user_id,
"name": transcript.name,
"status": transcript.status,
"locked": transcript.locked,
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"action_items": transcript.action_items,
"created_at": transcript.created_at,
"share_mode": transcript.share_mode,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"reviewed": transcript.reviewed,
"meeting_id": transcript.meeting_id,
"source_kind": transcript.source_kind,
"room_id": transcript.room_id,
"room_name": room_name,
"audio_deleted": transcript.audio_deleted,
"participants": participants,
}
if transcript_format == "text":
return GetTranscriptWithText(
**base_data,
transcript_format="text",
transcript=transcript_to_text(
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "text-timestamped":
return GetTranscriptWithTextTimestamped(
**base_data,
transcript_format="text-timestamped",
transcript=transcript_to_text_timestamped(
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "webvtt-named":
return GetTranscriptWithWebVTTNamed(
**base_data,
transcript_format="webvtt-named",
transcript=topics_to_webvtt_named(
transcript.topics, transcript.participants, is_multitrack
),
)
elif transcript_format == "json":
return GetTranscriptWithJSON(
**base_data,
transcript_format="json",
transcript=transcript_to_json_segments(
transcript.topics, transcript.participants, is_multitrack
),
)
else:
assert_never(transcript_format)
@router.patch(
"/transcripts/{transcript_id}", response_model=GetTranscriptWithParticipants
)
async def transcript_update(
transcript_id: str,
info: UpdateTranscript,
@@ -419,9 +603,12 @@ async def transcript_get_topics(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopic
return [
GetTranscriptTopic.from_transcript_topic(topic) for topic in transcript.topics
GetTranscriptTopic.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -438,9 +625,11 @@ async def transcript_get_topics_with_words(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# convert to GetTranscriptTopicWithWords
return [
GetTranscriptTopicWithWords.from_transcript_topic(topic)
GetTranscriptTopicWithWords.from_transcript_topic(topic, is_multitrack)
for topic in transcript.topics
]
@@ -459,13 +648,17 @@ async def transcript_get_topics_with_words_per_speaker(
transcript_id, user_id=user_id
)
is_multitrack = await _get_is_multitrack(transcript)
# get the topic from the transcript
topic = next((t for t in transcript.topics if t.id == topic_id), None)
if not topic:
raise HTTPException(status_code=404, detail="Topic not found")
# convert to GetTranscriptTopicWithWordsPerSpeaker
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(topic)
return GetTranscriptTopicWithWordsPerSpeaker.from_transcript_topic(
topic, is_multitrack
)
@router.post("/transcripts/{transcript_id}/zulip")

View File

@@ -1,4 +1,4 @@
from typing import Annotated, Optional
from typing import Annotated, Optional, assert_never
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
@@ -15,7 +15,6 @@ from reflector.services.transcript_process import (
prepare_transcript_processing,
validate_transcript_for_processing,
)
from reflector.utils.match import absurd
router = APIRouter()
@@ -44,12 +43,12 @@ async def transcript_process(
elif isinstance(validation, ValidationOk):
pass
else:
absurd(validation)
assert_never(validation)
config = await prepare_transcript_processing(validation)
if isinstance(config, ProcessError):
raise HTTPException(status_code=500, detail=config.detail)
else:
dispatch_transcript_processing(config)
await dispatch_transcript_processing(config)
return ProcessStatus(status="ok")

View File

@@ -38,6 +38,10 @@ else:
"task": "reflector.worker.process.reprocess_failed_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
},
"reprocess_failed_daily_recordings": {
"task": "reflector.worker.process.reprocess_failed_daily_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": 180.0, # Every 3 minutes (configurable lookback window)

View File

@@ -2,6 +2,7 @@ import json
import os
import re
from datetime import datetime, timezone
from typing import List
from urllib.parse import unquote
import av
@@ -11,7 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
@@ -21,9 +22,9 @@ from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
TranscriptParticipant,
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
@@ -38,7 +39,7 @@ from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
parse_daily_recording_filename,
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.video_platforms.factory import create_platform_client
@@ -273,15 +274,7 @@ async def _process_multitrack_recording_inner(
# else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript:
await transcripts_controller.update(
transcript,
{
"topics": [],
"participants": [],
},
)
else:
if not transcript:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
@@ -294,79 +287,49 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
# Start durable workflow if enabled (Hatchet) or room overrides it
durable_started = False
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
try:
rec_details = await daily_client.get_recording(recording_id)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(mtg_session_id)
)
for p in payload.data:
pid = p.participant_id
assert (
pid is not None
), "panic! participant id cannot be None"
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=recording_id,
)
except Exception as e:
logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=recording_id,
exc_info=True,
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
logger.info(
"Room forces Hatchet workflow",
room_id=room.id,
transcript_id=transcript.id,
)
for idx, key in enumerate(track_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
},
additional_metadata={
"transcript_id": transcript.id,
"recording_id": recording_id,
"daily_recording_id": recording_id,
},
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
logger.info(
"Started Hatchet workflow",
workflow_id=workflow_id,
transcript_id=transcript.id,
)
await transcripts_controller.upsert_participant(transcript, participant)
except Exception as e:
logger.warning("Failed to map participant names", error=str(e), exc_info=True)
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
return
# Celery pipeline (runs when durable workflows disabled)
task_pipeline_multitrack_process.delay(
transcript_id=transcript.id,
bucket_name=bucket_name,
track_keys=track_keys,
track_keys=filter_cam_audio_tracks(track_keys),
)
@@ -391,7 +354,7 @@ async def poll_daily_recordings():
async with create_platform_client("daily") as daily_client:
# latest 100. TODO cursor-based state
api_recordings = await daily_client.list_recordings()
api_recordings: List[RecordingResponse] = await daily_client.list_recordings()
if not api_recordings:
logger.debug(
@@ -399,16 +362,38 @@ async def poll_daily_recordings():
)
return
recording_ids = [rec.id for rec in api_recordings]
finished_recordings: List[FinishedRecordingResponse] = []
for rec in api_recordings:
finished = rec.to_finished()
if finished is None:
logger.debug(
"Skipping unfinished recording",
recording_id=rec.id,
room_name=rec.room_name,
status=rec.status,
)
continue
finished_recordings.append(finished)
if not finished_recordings:
logger.debug(
"No finished recordings found from Daily.co API",
total_api_count=len(api_recordings),
)
return
recording_ids = [rec.id for rec in finished_recordings]
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
existing_ids = {rec.id for rec in existing_recordings}
missing_recordings = [rec for rec in api_recordings if rec.id not in existing_ids]
missing_recordings = [
rec for rec in finished_recordings if rec.id not in existing_ids
]
if not missing_recordings:
logger.debug(
"All recordings already in DB",
api_count=len(api_recordings),
api_count=len(finished_recordings),
existing_count=len(existing_recordings),
)
return
@@ -416,17 +401,19 @@ async def poll_daily_recordings():
logger.info(
"Found recordings missing from DB",
missing_count=len(missing_recordings),
total_api_count=len(api_recordings),
total_api_count=len(finished_recordings),
existing_count=len(existing_recordings),
)
for recording in missing_recordings:
if not recording.tracks:
assert recording.status != "finished", (
f"Recording {recording.id} has status='finished' but no tracks. "
f"Daily.co API guarantees finished recordings have tracks available. "
f"room_name={recording.room_name}"
if recording.status == "finished":
logger.warning(
"Finished recording has no tracks (no audio captured)",
recording_id=recording.id,
room_name=recording.room_name,
)
else:
logger.debug(
"No tracks in recording yet",
recording_id=recording.id,
@@ -724,7 +711,7 @@ async def reprocess_failed_recordings():
Find recordings in Whereby S3 bucket and check if they have proper transcriptions.
If not, requeue them for processing.
Note: Daily.co recordings are processed via webhooks, not this cron job.
Note: Daily.co multitrack recordings are handled by reprocess_failed_daily_recordings.
"""
logger.info("Checking Whereby recordings that need processing or reprocessing")
@@ -777,6 +764,103 @@ async def reprocess_failed_recordings():
return reprocessed_count
@shared_task
@asynctask
async def reprocess_failed_daily_recordings():
"""
Find Daily.co multitrack recordings in the database and check if they have proper transcriptions.
If not, requeue them for processing.
"""
logger.info(
"Checking Daily.co multitrack recordings that need processing or reprocessing"
)
if not settings.DAILYCO_STORAGE_AWS_BUCKET_NAME:
logger.debug(
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping Daily recording reprocessing"
)
return 0
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
reprocessed_count = 0
try:
multitrack_recordings = (
await recordings_controller.get_multitrack_needing_reprocessing(bucket_name)
)
logger.info(
"Found multitrack recordings needing reprocessing",
count=len(multitrack_recordings),
bucket=bucket_name,
)
for recording in multitrack_recordings:
if not recording.meeting_id:
logger.debug(
"Skipping recording without meeting_id",
recording_id=recording.id,
)
continue
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if not meeting:
logger.warning(
"Meeting not found for recording",
recording_id=recording.id,
meeting_id=recording.meeting_id,
)
continue
transcript = None
try:
transcript = await transcripts_controller.get_by_recording_id(
recording.id
)
except ValidationError:
await transcripts_controller.remove_by_recording_id(recording.id)
logger.warning(
"Removed invalid transcript for recording",
recording_id=recording.id,
)
if not recording.track_keys:
logger.warning(
"Recording has no track_keys, cannot reprocess",
recording_id=recording.id,
)
continue
logger.info(
"Queueing Daily recording for reprocessing",
recording_id=recording.id,
room_name=meeting.room_name,
track_count=len(recording.track_keys),
transcript_status=transcript.status if transcript else None,
)
process_multitrack_recording.delay(
bucket_name=bucket_name,
daily_room_name=meeting.room_name,
recording_id=recording.id,
track_keys=recording.track_keys,
)
reprocessed_count += 1
except Exception as e:
logger.error(
"Error checking Daily multitrack recordings",
error=str(e),
exc_info=True,
)
logger.info(
"Daily reprocessing complete",
requeued_count=reprocessed_count,
)
return reprocessed_count
@shared_task
@asynctask
async def trigger_daily_reconciliation() -> None:

View File

@@ -123,6 +123,7 @@ async def send_transcript_webhook(
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,

View File

@@ -16,6 +16,7 @@ import threading
import redis.asyncio as redis
from fastapi import WebSocket
from reflector.events import subscribers_shutdown
from reflector.settings import settings
@@ -109,29 +110,30 @@ class WebsocketManager:
await socket.send_json(data)
_ws_manager_instance: WebsocketManager | None = None
_ws_manager_lock = threading.Lock()
def get_ws_manager() -> WebsocketManager:
"""
Returns the WebsocketManager instance for managing websockets.
This function initializes and returns the WebsocketManager instance,
which is responsible for managing websockets and handling websocket
connections.
Returns:
WebsocketManager: The initialized WebsocketManager instance.
Raises:
ImportError: If the 'reflector.settings' module cannot be imported.
RedisConnectionError: If there is an error connecting to the Redis server.
"""
local = threading.local()
if hasattr(local, "ws_manager"):
return local.ws_manager
"""Returns the WebsocketManager singleton instance."""
global _ws_manager_instance
if _ws_manager_instance is None:
with _ws_manager_lock:
if _ws_manager_instance is None:
pubsub_client = RedisPubSubManager(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
)
ws_manager = WebsocketManager(pubsub_client=pubsub_client)
local.ws_manager = ws_manager
return ws_manager
_ws_manager_instance = WebsocketManager(pubsub_client=pubsub_client)
return _ws_manager_instance
async def cleanup_ws_manager(_app=None) -> None:
"""Cleanup WebsocketManager singleton on shutdown."""
global _ws_manager_instance
if _ws_manager_instance is not None:
await _ws_manager_instance.pubsub_client.disconnect()
_ws_manager_instance = None
subscribers_shutdown.append(cleanup_ws_manager)

View File

@@ -3,7 +3,8 @@ from urllib.parse import urlparse
import httpx
from reflector.db.transcripts import Transcript
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import Transcript, transcripts_controller
from reflector.settings import settings
@@ -113,6 +114,49 @@ def get_zulip_message(transcript: Transcript, include_topics: bool):
return message
async def post_transcript_notification(transcript: Transcript) -> int | None:
"""Post or update transcript notification in Zulip.
Uses transcript.room_id directly (Hatchet flow).
Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
DUPLICATION NOTE: This function will stay when we use Celery no more, and Celery one will be removed.
"""
if not transcript.room_id:
return None
room = await rooms_controller.get_by_id(transcript.room_id)
if not room or not room.zulip_stream or not room.zulip_auto_post:
return None
message = get_zulip_message(transcript=transcript, include_topics=True)
message_updated = False
if transcript.zulip_message_id:
try:
await update_zulip_message(
transcript.zulip_message_id,
room.zulip_stream,
room.zulip_topic,
message,
)
message_updated = True
except Exception:
pass
if not message_updated:
response = await send_message_to_zulip(
room.zulip_stream, room.zulip_topic, message
)
message_id = response.get("id")
if message_id:
await transcripts_controller.update(
transcript, {"zulip_message_id": message_id}
)
return message_id
return transcript.zulip_message_id
def extract_domain(url: str) -> str:
return urlparse(url).netloc

View File

@@ -7,6 +7,8 @@ elif [ "${ENTRYPOINT}" = "worker" ]; then
uv run celery -A reflector.worker.app worker --loglevel=info
elif [ "${ENTRYPOINT}" = "beat" ]; then
uv run celery -A reflector.worker.app beat --loglevel=info
elif [ "${ENTRYPOINT}" = "hatchet-worker" ]; then
uv run python -m reflector.hatchet.run_workers
else
echo "Unknown command"
fi

View File

@@ -318,6 +318,14 @@ async def dummy_storage():
yield
@pytest.fixture
def test_settings():
"""Provide isolated settings for tests to avoid modifying global settings"""
from reflector.settings import Settings
return Settings()
@pytest.fixture(scope="session")
def celery_enable_logging():
return True
@@ -519,6 +527,22 @@ def fake_mp3_upload():
yield
@pytest.fixture(autouse=True)
def reset_hatchet_client():
"""Reset HatchetClientManager singleton before and after each test.
This ensures test isolation - each test starts with a fresh client state.
The fixture is autouse=True so it applies to all tests automatically.
"""
from reflector.hatchet.client import HatchetClientManager
# Reset before test
HatchetClientManager.reset()
yield
# Reset after test to clean up
HatchetClientManager.reset()
@pytest.fixture
async def fake_transcript_with_topics(tmpdir, client):
import shutil

View File

@@ -0,0 +1,54 @@
"""
Tests for HatchetClientManager error handling and validation.
Only tests that catch real bugs - not mock verification tests.
Note: The `reset_hatchet_client` fixture (autouse=True in conftest.py)
automatically resets the singleton before and after each test.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@pytest.mark.asyncio
async def test_hatchet_client_can_replay_handles_exception():
"""Test can_replay returns False when status check fails.
Useful: Ensures network/API errors don't crash the system and
gracefully allow reprocessing when workflow state is unknown.
"""
from reflector.hatchet.client import HatchetClientManager
with patch("reflector.hatchet.client.settings") as mock_settings:
mock_settings.HATCHET_CLIENT_TOKEN = "test-token"
mock_settings.HATCHET_DEBUG = False
with patch("reflector.hatchet.client.Hatchet") as mock_hatchet_class:
mock_client = MagicMock()
mock_hatchet_class.return_value = mock_client
mock_client.runs.aio_get_status = AsyncMock(
side_effect=Exception("Network error")
)
can_replay = await HatchetClientManager.can_replay("workflow-123")
# Should return False on error (workflow might be gone)
assert can_replay is False
def test_hatchet_client_raises_without_token():
"""Test that get_client raises ValueError without token.
Useful: Catches if someone removes the token validation,
which would cause cryptic errors later.
"""
from reflector.hatchet.client import HatchetClientManager
with patch("reflector.hatchet.client.settings") as mock_settings:
mock_settings.HATCHET_CLIENT_TOKEN = None
with pytest.raises(ValueError, match="HATCHET_CLIENT_TOKEN must be set"):
HatchetClientManager.get_client()

View File

@@ -0,0 +1,398 @@
"""
Tests for Hatchet workflow dispatch and routing logic.
These tests verify:
1. Routing to Hatchet when HATCHET_ENABLED=True
2. Replay logic for failed workflows
3. Force flag to cancel and restart
4. Validation prevents concurrent workflows
"""
from unittest.mock import AsyncMock, patch
import pytest
from hatchet_sdk.clients.rest.exceptions import ApiException
from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.transcripts import Transcript
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_blocks_running_workflow():
"""Test that validation blocks reprocessing when workflow is running."""
from reflector.services.transcript_process import (
ValidationAlreadyScheduled,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
workflow_run_id="running-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.RUNNING
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
assert "running" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_blocks_queued_workflow():
"""Test that validation blocks reprocessing when workflow is queued."""
from reflector.services.transcript_process import (
ValidationAlreadyScheduled,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="processing",
source_kind="room",
workflow_run_id="queued-workflow-123",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.QUEUED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationAlreadyScheduled)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_failed_workflow():
"""Test that validation allows reprocessing when workflow has failed."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="error",
source_kind="room",
workflow_run_id="failed-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.FAILED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
assert result.transcript_id == "test-transcript-id"
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_completed_workflow():
"""Test that validation allows reprocessing when workflow has completed."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="ended",
source_kind="room",
workflow_run_id="completed-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
mock_hatchet.get_workflow_run_status = AsyncMock(
return_value=V1TaskStatus.COMPLETED
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_allows_when_status_check_fails():
"""Test that validation allows reprocessing when status check fails (workflow might be gone)."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="error",
source_kind="room",
workflow_run_id="old-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Status check fails (workflow might be deleted)
mock_hatchet.get_workflow_run_status = AsyncMock(
side_effect=ApiException("Workflow not found")
)
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should allow processing when we can't get status
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_no_workflow_id():
"""Test that Hatchet validation is skipped when transcript has no workflow_run_id."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id=None, # No workflow yet
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = True
with patch(
"reflector.services.transcript_process.HatchetClientManager"
) as mock_hatchet:
# Should not be called
mock_hatchet.get_workflow_run_status = AsyncMock()
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet status
mock_hatchet.get_workflow_run_status.assert_not_called()
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_hatchet_validation_skipped_when_disabled():
"""Test that Hatchet validation is skipped when HATCHET_ENABLED is False."""
from reflector.services.transcript_process import (
ValidationOk,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="uploaded",
source_kind="room",
workflow_run_id="some-workflow-123",
recording_id="test-recording-id",
)
with patch("reflector.services.transcript_process.settings") as mock_settings:
mock_settings.HATCHET_ENABLED = False # Hatchet disabled
with patch(
"reflector.services.transcript_process.task_is_scheduled_or_active"
) as mock_celery_check:
mock_celery_check.return_value = False
result = await validate_transcript_for_processing(mock_transcript)
# Should not check Hatchet at all
assert isinstance(result, ValidationOk)
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_validation_locked_transcript():
"""Test that validation rejects locked transcripts."""
from reflector.services.transcript_process import (
ValidationLocked,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="ended",
source_kind="room",
locked=True,
)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationLocked)
assert "locked" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_validation_idle_transcript():
"""Test that validation rejects idle transcripts (not ready)."""
from reflector.services.transcript_process import (
ValidationNotReady,
validate_transcript_for_processing,
)
mock_transcript = Transcript(
id="test-transcript-id",
name="Test",
status="idle",
source_kind="room",
)
result = await validate_transcript_for_processing(mock_transcript)
assert isinstance(result, ValidationNotReady)
assert "not ready" in result.detail.lower()
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_prepare_multitrack_config():
"""Test preparing multitrack processing config."""
from reflector.db.recordings import Recording
from reflector.services.transcript_process import (
MultitrackProcessingConfig,
ValidationOk,
prepare_transcript_processing,
)
validation = ValidationOk(
recording_id="test-recording-id",
transcript_id="test-transcript-id",
)
mock_recording = Recording(
id="test-recording-id",
bucket_name="test-bucket",
object_key="recordings/test",
recorded_at="2024-01-01T00:00:00Z",
track_keys=["track1.webm", "track2.webm"],
)
with patch(
"reflector.services.transcript_process.recordings_controller"
) as mock_rc:
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
result = await prepare_transcript_processing(validation)
assert isinstance(result, MultitrackProcessingConfig)
assert result.bucket_name == "test-bucket"
assert result.track_keys == ["track1.webm", "track2.webm"]
assert result.transcript_id == "test-transcript-id"
assert result.room_id is None # ValidationOk didn't specify room_id
@pytest.mark.usefixtures("setup_database")
@pytest.mark.asyncio
async def test_prepare_file_config():
"""Test preparing file processing config (no track keys)."""
from reflector.db.recordings import Recording
from reflector.services.transcript_process import (
FileProcessingConfig,
ValidationOk,
prepare_transcript_processing,
)
validation = ValidationOk(
recording_id="test-recording-id",
transcript_id="test-transcript-id",
)
mock_recording = Recording(
id="test-recording-id",
bucket_name="test-bucket",
object_key="recordings/test.mp4",
recorded_at="2024-01-01T00:00:00Z",
track_keys=None, # No track keys = file pipeline
)
with patch(
"reflector.services.transcript_process.recordings_controller"
) as mock_rc:
mock_rc.get_by_id = AsyncMock(return_value=mock_recording)
result = await prepare_transcript_processing(validation)
assert isinstance(result, FileProcessingConfig)
assert result.transcript_id == "test-transcript-id"

View File

@@ -0,0 +1,488 @@
"""Tests for LLM parse error recovery using llama-index Workflow"""
from time import monotonic
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pydantic import BaseModel, Field
from workflows.errors import WorkflowRuntimeError, WorkflowTimeoutError
from reflector.llm import LLM, LLMParseError, StructuredOutputWorkflow
from reflector.utils.retry import RetryException
class TestResponse(BaseModel):
"""Test response model for structured output"""
title: str = Field(description="A title")
summary: str = Field(description="A summary")
confidence: float = Field(description="Confidence score", ge=0, le=1)
def make_completion_response(text: str):
"""Create a mock CompletionResponse with .text attribute"""
response = MagicMock()
response.text = text
return response
class TestLLMParseErrorRecovery:
"""Test parse error recovery with Workflow feedback loop"""
@pytest.mark.asyncio
async def test_parse_error_recovery_with_feedback(self, test_settings):
"""Test that parse errors trigger retry with error feedback"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
# TreeSummarize returns plain text analysis (step 1)
mock_summarizer.aget_response = AsyncMock(
return_value="The analysis shows a test with summary and high confidence."
)
call_count = {"count": 0}
async def acomplete_handler(prompt, *args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
# First JSON formatting call returns invalid JSON
return make_completion_response('{"title": "Test"}')
else:
# Second call should have error feedback in prompt
assert "Your previous response could not be parsed:" in prompt
assert '{"title": "Test"}' in prompt
assert "Error:" in prompt
assert "Please try again" in prompt
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert result.summary == "Summary"
assert result.confidence == 0.95
# TreeSummarize called once, Settings.llm.acomplete called twice
assert mock_summarizer.aget_response.call_count == 1
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_max_parse_retry_attempts(self, test_settings):
"""Test that parse error retry stops after max attempts"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Always return invalid JSON from acomplete
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"invalid": "missing required fields"}'
)
)
with pytest.raises(LLMParseError, match="Failed to parse"):
await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
expected_attempts = test_settings.LLM_PARSE_MAX_RETRIES + 1
# TreeSummarize called once, acomplete called max_retries times
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == expected_attempts
@pytest.mark.asyncio
async def test_raw_response_logging_on_parse_error(self, test_settings, caplog):
"""Test that raw response is logged when parse error occurs"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
caplog.at_level("ERROR"),
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(*args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
return make_completion_response('{"title": "Test"}') # Invalid
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
error_logs = [r for r in caplog.records if r.levelname == "ERROR"]
raw_response_logged = any("Raw response:" in r.message for r in error_logs)
assert raw_response_logged, "Raw response should be logged on parse error"
@pytest.mark.asyncio
async def test_multiple_validation_errors_in_feedback(self, test_settings):
"""Test that validation errors are included in feedback"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(prompt, *args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
# Missing title and summary
return make_completion_response('{"confidence": 0.5}')
else:
# Should have schema validation errors in prompt
assert (
"Schema validation errors" in prompt
or "error" in prompt.lower()
)
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_success_on_first_attempt(self, test_settings):
"""Test that no retry happens when first attempt succeeds"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert result.summary == "Summary"
assert result.confidence == 0.95
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == 1
class TestStructuredOutputWorkflow:
"""Direct tests for the StructuredOutputWorkflow"""
@pytest.mark.asyncio
async def test_workflow_retries_on_validation_error(self):
"""Test workflow retries when validation fails"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=3,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
call_count = {"count": 0}
async def acomplete_handler(*args, **kwargs):
call_count["count"] += 1
if call_count["count"] < 2:
return make_completion_response('{"title": "Only title"}')
return make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.9}'
)
mock_settings.llm.acomplete = AsyncMock(side_effect=acomplete_handler)
result = await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
assert "success" in result
assert result["success"].title == "Test"
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_workflow_returns_error_after_max_retries(self):
"""Test workflow returns error after exhausting retries"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=2,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Always return invalid JSON
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response('{"invalid": true}')
)
result = await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
assert "error" in result
# TreeSummarize called once, acomplete called max_retries times
assert mock_summarizer.aget_response.call_count == 1
assert mock_settings.llm.acomplete.call_count == 2
class TestNetworkErrorRetries:
"""Test that network error retries are handled by OpenAILike, not Workflow"""
@pytest.mark.asyncio
async def test_network_error_propagates_after_openai_retries(self, test_settings):
"""Test that network errors are retried by OpenAILike and then propagate.
Network retries are handled by OpenAILike (max_retries=3), not by our
StructuredOutputWorkflow. This test verifies that network errors propagate
up after OpenAILike exhausts its retries.
"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Simulate network error from acomplete (after OpenAILike retries exhausted)
network_error = ConnectionError("Connection refused")
mock_settings.llm.acomplete = AsyncMock(side_effect=network_error)
# Network error wrapped in WorkflowRuntimeError
with pytest.raises(WorkflowRuntimeError, match="Connection refused"):
await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
# acomplete called only once - network error propagates, not retried by Workflow
assert mock_settings.llm.acomplete.call_count == 1
@pytest.mark.asyncio
async def test_network_error_not_retried_by_workflow(self, test_settings):
"""Test that Workflow does NOT retry network errors (OpenAILike handles those).
This verifies the separation of concerns:
- StructuredOutputWorkflow: retries parse/validation errors
- OpenAILike: retries network errors (internally, max_retries=3)
"""
workflow = StructuredOutputWorkflow(
output_cls=TestResponse,
max_retries=3,
timeout=30,
)
with (
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
# Network error should propagate immediately, not trigger Workflow retry
mock_settings.llm.acomplete = AsyncMock(
side_effect=TimeoutError("Request timed out")
)
# Network error wrapped in WorkflowRuntimeError
with pytest.raises(WorkflowRuntimeError, match="Request timed out"):
await workflow.run(
prompt="Extract data",
texts=["Some text"],
tone_name=None,
)
# Only called once - Workflow doesn't retry network errors
assert mock_settings.llm.acomplete.call_count == 1
class TestWorkflowTimeoutRetry:
"""Test timeout retry mechanism in get_structured_response"""
@pytest.mark.asyncio
async def test_timeout_retry_succeeds_on_retry(self, test_settings):
"""Test that WorkflowTimeoutError triggers retry and succeeds"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
call_count = {"count": 0}
async def workflow_run_side_effect(*args, **kwargs):
call_count["count"] += 1
if call_count["count"] == 1:
raise WorkflowTimeoutError("Operation timed out after 120 seconds")
return {
"success": TestResponse(
title="Test", summary="Summary", confidence=0.95
)
}
with (
patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_workflow = MagicMock()
mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
mock_workflow_class.return_value = mock_workflow
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
assert result.summary == "Summary"
assert call_count["count"] == 2
@pytest.mark.asyncio
async def test_timeout_retry_exhausts_after_max_attempts(self, test_settings):
"""Test that timeout retry stops after max attempts"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
call_count = {"count": 0}
async def workflow_run_side_effect(*args, **kwargs):
call_count["count"] += 1
raise WorkflowTimeoutError("Operation timed out after 120 seconds")
with (
patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_workflow = MagicMock()
mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
mock_workflow_class.return_value = mock_workflow
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
)
with pytest.raises(RetryException, match="Retry attempts exceeded"):
await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert call_count["count"] == 3
@pytest.mark.asyncio
async def test_timeout_retry_with_backoff(self, test_settings):
"""Test that exponential backoff is applied between retries"""
llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
call_times = []
async def workflow_run_side_effect(*args, **kwargs):
call_times.append(monotonic())
if len(call_times) < 3:
raise WorkflowTimeoutError("Operation timed out after 120 seconds")
return {
"success": TestResponse(
title="Test", summary="Summary", confidence=0.95
)
}
with (
patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
patch("reflector.llm.TreeSummarize") as mock_summarize,
patch("reflector.llm.Settings") as mock_settings,
):
mock_workflow = MagicMock()
mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
mock_workflow_class.return_value = mock_workflow
mock_summarizer = MagicMock()
mock_summarize.return_value = mock_summarizer
mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
mock_settings.llm.acomplete = AsyncMock(
return_value=make_completion_response(
'{"title": "Test", "summary": "Summary", "confidence": 0.95}'
)
)
result = await llm.get_structured_response(
prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
)
assert result.title == "Test"
if len(call_times) >= 2:
time_between_calls = call_times[1] - call_times[0]
assert (
time_between_calls >= 1.5
), f"Expected ~2s backoff, got {time_between_calls}s"

View File

@@ -266,7 +266,11 @@ async def mock_summary_processor():
# When flush is called, simulate summary generation by calling the callbacks
async def flush_with_callback():
mock_summary.flush_called = True
from reflector.processors.types import FinalLongSummary, FinalShortSummary
from reflector.processors.types import (
ActionItems,
FinalLongSummary,
FinalShortSummary,
)
if hasattr(mock_summary, "_callback"):
await mock_summary._callback(
@@ -276,12 +280,19 @@ async def mock_summary_processor():
await mock_summary._on_short_summary(
FinalShortSummary(short_summary="Test short summary", duration=10.0)
)
if hasattr(mock_summary, "_on_action_items"):
await mock_summary._on_action_items(
ActionItems(action_items={"test": "action item"})
)
mock_summary.flush = flush_with_callback
def init_with_callback(transcript=None, callback=None, on_short_summary=None):
def init_with_callback(
transcript=None, callback=None, on_short_summary=None, on_action_items=None
):
mock_summary._callback = callback
mock_summary._on_short_summary = on_short_summary
mock_summary._on_action_items = on_action_items
return mock_summary
mock_summary_class.side_effect = init_with_callback

View File

@@ -159,3 +159,78 @@ def test_processor_transcript_segment():
assert segments[3].start == 30.72
assert segments[4].start == 31.56
assert segments[5].start == 32.38
def test_processor_transcript_segment_multitrack_interleaved():
"""Test as_segments(is_multitrack=True) with interleaved speakers.
Multitrack recordings have words from different speakers sorted by start time,
causing frequent speaker alternation. The multitrack mode should group by
speaker first, then split into sentences.
"""
from reflector.processors.types import Transcript, Word
# Simulate real multitrack data: words sorted by start time, speakers interleave
# Speaker 0 says: "Hello there."
# Speaker 1 says: "I'm good."
# When sorted by time, words interleave
transcript = Transcript(
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
]
)
# Default behavior (is_multitrack=False): breaks on every speaker change = 4 segments
segments_default = transcript.as_segments(is_multitrack=False)
assert len(segments_default) == 4
# Multitrack behavior: groups by speaker, then sentences = 2 segments
segments_multitrack = transcript.as_segments(is_multitrack=True)
assert len(segments_multitrack) == 2
# Check content - sorted by start time
assert segments_multitrack[0].speaker == 0
assert segments_multitrack[0].text == "Hello there."
assert segments_multitrack[0].start == 0.0
assert segments_multitrack[0].end == 1.0
assert segments_multitrack[1].speaker == 1
assert segments_multitrack[1].text == "I'm good."
assert segments_multitrack[1].start == 0.5
assert segments_multitrack[1].end == 1.5
def test_processor_transcript_segment_multitrack_overlapping_timestamps():
"""Test multitrack with exactly overlapping timestamps (real Daily.co data pattern)."""
from reflector.processors.types import Transcript, Word
# Real pattern from transcript 38d84d57: words with identical timestamps
transcript = Transcript(
words=[
Word(text="speaking ", start=6.71, end=7.11, speaker=0),
Word(text="Speaking ", start=6.71, end=7.11, speaker=1),
Word(text="at ", start=7.11, end=7.27, speaker=0),
Word(text="at ", start=7.11, end=7.27, speaker=1),
Word(text="the ", start=7.27, end=7.43, speaker=0),
Word(text="the ", start=7.27, end=7.43, speaker=1),
Word(text="same ", start=7.43, end=7.59, speaker=0),
Word(text="same ", start=7.43, end=7.59, speaker=1),
Word(text="time.", start=7.59, end=8.0, speaker=0),
Word(text="time.", start=7.59, end=8.0, speaker=1),
]
)
# Default: 10 segments (one per speaker change)
segments_default = transcript.as_segments(is_multitrack=False)
assert len(segments_default) == 10
# Multitrack: 2 segments (one per speaker sentence)
segments_multitrack = transcript.as_segments(is_multitrack=True)
assert len(segments_multitrack) == 2
# Both should have complete sentences
assert "speaking at the same time." in segments_multitrack[0].text
assert "Speaking at the same time." in segments_multitrack[1].text

View File

@@ -0,0 +1,779 @@
"""Tests for transcript format conversion functionality."""
import pytest
from reflector.db.transcripts import TranscriptParticipant, TranscriptTopic
from reflector.processors.types import Word
from reflector.utils.transcript_formats import (
format_timestamp_mmss,
get_speaker_name,
topics_to_webvtt_named,
transcript_to_json_segments,
transcript_to_text,
transcript_to_text_timestamped,
)
@pytest.mark.asyncio
async def test_get_speaker_name_with_participants():
"""Test speaker name resolution with participants list."""
participants = [
TranscriptParticipant(id="1", speaker=0, name="John Smith"),
TranscriptParticipant(id="2", speaker=1, name="Jane Doe"),
]
assert get_speaker_name(0, participants) == "John Smith"
assert get_speaker_name(1, participants) == "Jane Doe"
assert get_speaker_name(2, participants) == "Speaker 2"
@pytest.mark.asyncio
async def test_get_speaker_name_without_participants():
"""Test speaker name resolution without participants list."""
assert get_speaker_name(0, None) == "Speaker 0"
assert get_speaker_name(1, None) == "Speaker 1"
assert get_speaker_name(5, []) == "Speaker 5"
@pytest.mark.asyncio
async def test_format_timestamp_mmss():
"""Test timestamp formatting to MM:SS."""
assert format_timestamp_mmss(0) == "00:00"
assert format_timestamp_mmss(5) == "00:05"
assert format_timestamp_mmss(65) == "01:05"
assert format_timestamp_mmss(125.7) == "02:05"
assert format_timestamp_mmss(3661) == "61:01"
@pytest.mark.asyncio
async def test_transcript_to_text():
"""Test plain text format conversion."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=1.0, speaker=0),
Word(text=" world.", start=1.0, end=2.0, speaker=0),
],
),
TranscriptTopic(
id="2",
title="Topic 2",
summary="Summary 2",
timestamp=2.0,
words=[
Word(text="How", start=2.0, end=3.0, speaker=1),
Word(text=" are", start=3.0, end=4.0, speaker=1),
Word(text=" you?", start=4.0, end=5.0, speaker=1),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="John Smith"),
TranscriptParticipant(id="2", speaker=1, name="Jane Doe"),
]
result = transcript_to_text(topics, participants)
lines = result.split("\n")
assert len(lines) == 2
assert lines[0] == "John Smith: Hello world."
assert lines[1] == "Jane Doe: How are you?"
@pytest.mark.asyncio
async def test_transcript_to_text_timestamped():
"""Test timestamped text format conversion."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=1.0, speaker=0),
Word(text=" world.", start=1.0, end=2.0, speaker=0),
],
),
TranscriptTopic(
id="2",
title="Topic 2",
summary="Summary 2",
timestamp=65.0,
words=[
Word(text="How", start=65.0, end=66.0, speaker=1),
Word(text=" are", start=66.0, end=67.0, speaker=1),
Word(text=" you?", start=67.0, end=68.0, speaker=1),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="John Smith"),
TranscriptParticipant(id="2", speaker=1, name="Jane Doe"),
]
result = transcript_to_text_timestamped(topics, participants)
lines = result.split("\n")
assert len(lines) == 2
assert lines[0] == "[00:00] John Smith: Hello world."
assert lines[1] == "[01:05] Jane Doe: How are you?"
@pytest.mark.asyncio
async def test_topics_to_webvtt_named():
"""Test WebVTT format conversion with participant names."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=1.0, speaker=0),
Word(text=" world.", start=1.0, end=2.0, speaker=0),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="John Smith"),
]
result = topics_to_webvtt_named(topics, participants)
assert result.startswith("WEBVTT")
assert "<v John Smith>" in result
assert "00:00:00.000 --> 00:00:02.000" in result
assert "Hello world." in result
@pytest.mark.asyncio
async def test_transcript_to_json_segments():
"""Test JSON segments format conversion."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=1.0, speaker=0),
Word(text=" world.", start=1.0, end=2.0, speaker=0),
],
),
TranscriptTopic(
id="2",
title="Topic 2",
summary="Summary 2",
timestamp=2.0,
words=[
Word(text="How", start=2.0, end=3.0, speaker=1),
Word(text=" are", start=3.0, end=4.0, speaker=1),
Word(text=" you?", start=4.0, end=5.0, speaker=1),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="John Smith"),
TranscriptParticipant(id="2", speaker=1, name="Jane Doe"),
]
result = transcript_to_json_segments(topics, participants)
assert len(result) == 2
assert result[0].speaker == 0
assert result[0].speaker_name == "John Smith"
assert result[0].text == "Hello world."
assert result[0].start == 0.0
assert result[0].end == 2.0
assert result[1].speaker == 1
assert result[1].speaker_name == "Jane Doe"
assert result[1].text == "How are you?"
assert result[1].start == 2.0
assert result[1].end == 5.0
@pytest.mark.asyncio
async def test_transcript_formats_with_empty_topics():
"""Test format conversion with empty topics list."""
topics = []
participants = []
assert transcript_to_text(topics, participants) == ""
assert transcript_to_text_timestamped(topics, participants) == ""
assert "WEBVTT" in topics_to_webvtt_named(topics, participants)
assert transcript_to_json_segments(topics, participants) == []
@pytest.mark.asyncio
async def test_transcript_formats_with_empty_words():
"""Test format conversion with topics containing no words."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[],
),
]
participants = []
assert transcript_to_text(topics, participants) == ""
assert transcript_to_text_timestamped(topics, participants) == ""
assert "WEBVTT" in topics_to_webvtt_named(topics, participants)
assert transcript_to_json_segments(topics, participants) == []
@pytest.mark.asyncio
async def test_transcript_formats_with_multiple_speakers():
"""Test format conversion with multiple speaker changes."""
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello", start=0.0, end=1.0, speaker=0),
Word(text=" there.", start=1.0, end=2.0, speaker=0),
Word(text="Hi", start=2.0, end=3.0, speaker=1),
Word(text=" back.", start=3.0, end=4.0, speaker=1),
Word(text="Good", start=4.0, end=5.0, speaker=0),
Word(text=" morning.", start=5.0, end=6.0, speaker=0),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="Alice"),
TranscriptParticipant(id="2", speaker=1, name="Bob"),
]
text_result = transcript_to_text(topics, participants)
lines = text_result.split("\n")
assert len(lines) == 3
assert "Alice: Hello there." in lines[0]
assert "Bob: Hi back." in lines[1]
assert "Alice: Good morning." in lines[2]
json_result = transcript_to_json_segments(topics, participants)
assert len(json_result) == 3
assert json_result[0].speaker_name == "Alice"
assert json_result[1].speaker_name == "Bob"
assert json_result[2].speaker_name == "Alice"
@pytest.mark.asyncio
async def test_transcript_formats_with_overlapping_speakers_multitrack():
"""Test format conversion for multitrack recordings with truly interleaved words.
Multitrack recordings have words from different speakers sorted by start time,
causing frequent speaker alternation. This tests the sentence-based segmentation
that groups each speaker's words into complete sentences.
"""
# Real multitrack data: words sorted by start time, speakers interleave
# Alice says: "Hello there." (0.0-1.0)
# Bob says: "I'm good." (0.5-1.5)
# When sorted by time, words interleave: Hello, I'm, there., good.
topics = [
TranscriptTopic(
id="1",
title="Topic 1",
summary="Summary 1",
timestamp=0.0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
]
participants = [
TranscriptParticipant(id="1", speaker=0, name="Alice"),
TranscriptParticipant(id="2", speaker=1, name="Bob"),
]
# With is_multitrack=True, should produce 2 segments (one per speaker sentence)
# not 4 segments (one per speaker change)
webvtt_result = topics_to_webvtt_named(topics, participants, is_multitrack=True)
expected_webvtt = """WEBVTT
00:00:00.000 --> 00:00:01.000
<v Alice>Hello there.
00:00:00.500 --> 00:00:01.500
<v Bob>I'm good.
"""
assert webvtt_result == expected_webvtt
text_result = transcript_to_text(topics, participants, is_multitrack=True)
lines = text_result.split("\n")
assert len(lines) == 2
assert "Alice: Hello there." in lines[0]
assert "Bob: I'm good." in lines[1]
timestamped_result = transcript_to_text_timestamped(
topics, participants, is_multitrack=True
)
timestamped_lines = timestamped_result.split("\n")
assert len(timestamped_lines) == 2
assert "[00:00] Alice: Hello there." in timestamped_lines[0]
assert "[00:00] Bob: I'm good." in timestamped_lines[1]
segments = transcript_to_json_segments(topics, participants, is_multitrack=True)
assert len(segments) == 2
assert segments[0].speaker_name == "Alice"
assert segments[0].text == "Hello there."
assert segments[1].speaker_name == "Bob"
assert segments[1].text == "I'm good."
@pytest.mark.asyncio
async def test_api_transcript_format_text(client):
"""Test GET /transcripts/{id} with transcript_format=text."""
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(
id="1", speaker=0, name="John Smith"
).model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Jane Doe").model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello", start=0, end=1, speaker=0),
Word(text=" world.", start=1, end=2, speaker=0),
],
),
)
response = await client.get(f"/transcripts/{tid}?transcript_format=text")
assert response.status_code == 200
data = response.json()
assert data["transcript_format"] == "text"
assert "transcript" in data
assert "John Smith: Hello world." in data["transcript"]
@pytest.mark.asyncio
async def test_api_transcript_format_text_timestamped(client):
"""Test GET /transcripts/{id} with transcript_format=text-timestamped."""
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(
id="1", speaker=0, name="John Smith"
).model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello", start=65, end=66, speaker=0),
Word(text=" world.", start=66, end=67, speaker=0),
],
),
)
response = await client.get(
f"/transcripts/{tid}?transcript_format=text-timestamped"
)
assert response.status_code == 200
data = response.json()
assert data["transcript_format"] == "text-timestamped"
assert "transcript" in data
assert "[01:05] John Smith: Hello world." in data["transcript"]
@pytest.mark.asyncio
async def test_api_transcript_format_webvtt_named(client):
"""Test GET /transcripts/{id} with transcript_format=webvtt-named."""
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(
id="1", speaker=0, name="John Smith"
).model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello", start=0, end=1, speaker=0),
Word(text=" world.", start=1, end=2, speaker=0),
],
),
)
response = await client.get(f"/transcripts/{tid}?transcript_format=webvtt-named")
assert response.status_code == 200
data = response.json()
assert data["transcript_format"] == "webvtt-named"
assert "transcript" in data
assert "WEBVTT" in data["transcript"]
assert "<v John Smith>" in data["transcript"]
@pytest.mark.asyncio
async def test_api_transcript_format_json(client):
"""Test GET /transcripts/{id} with transcript_format=json."""
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(
id="1", speaker=0, name="John Smith"
).model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello", start=0, end=1, speaker=0),
Word(text=" world.", start=1, end=2, speaker=0),
],
),
)
response = await client.get(f"/transcripts/{tid}?transcript_format=json")
assert response.status_code == 200
data = response.json()
assert data["transcript_format"] == "json"
assert "transcript" in data
assert isinstance(data["transcript"], list)
assert len(data["transcript"]) == 1
assert data["transcript"][0]["speaker"] == 0
assert data["transcript"][0]["speaker_name"] == "John Smith"
assert data["transcript"][0]["text"] == "Hello world."
@pytest.mark.asyncio
async def test_api_transcript_format_default_is_text(client):
"""Test GET /transcripts/{id} defaults to text format."""
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
from reflector.db.transcripts import TranscriptTopic, transcripts_controller
from reflector.processors.types import Word
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello", start=0, end=1, speaker=0),
],
),
)
response = await client.get(f"/transcripts/{tid}")
assert response.status_code == 200
data = response.json()
assert data["transcript_format"] == "text"
assert "transcript" in data
@pytest.mark.asyncio
async def test_api_topics_endpoint_multitrack_segmentation(client):
"""Test GET /transcripts/{id}/topics uses sentence-based segmentation for multitrack.
This tests the fix for TASKS2.md - ensuring /topics endpoints correctly detect
multitrack recordings and use sentence-based segmentation instead of fragmenting
on every speaker change.
"""
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create a multitrack recording (has track_keys)
recording = Recording(
bucket_name="test-bucket",
object_key="test-key",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"], # This makes it multitrack
)
await recordings_controller.create(recording)
# Create transcript linked to the recording
transcript = await transcripts_controller.add(
name="Multitrack Test",
source_kind="file",
recording_id=recording.id,
)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
# Add interleaved words (as they appear in real multitrack data)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
# Test /topics endpoint
response = await client.get(f"/transcripts/{transcript.id}/topics")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Key assertion: multitrack should produce 2 segments (one per speaker sentence)
# Not 4 segments (one per speaker change)
assert len(topic["segments"]) == 2
# Check content
segment_texts = [s["text"] for s in topic["segments"]]
assert "Hello there." in segment_texts
assert "I'm good." in segment_texts
@pytest.mark.asyncio
async def test_api_topics_endpoint_non_multitrack_segmentation(client):
"""Test GET /transcripts/{id}/topics uses default segmentation for non-multitrack.
Ensures backward compatibility - transcripts without multitrack recordings
should continue using the default speaker-change-based segmentation.
"""
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create transcript WITHOUT recording (defaulted as not multitrack) TODO better heuristic
response = await client.post("/transcripts", json={"name": "Test transcript"})
assert response.status_code == 200
tid = response.json()["id"]
transcript = await transcripts_controller.get_by_id(tid)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
# Add interleaved words
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
# Test /topics endpoint
response = await client.get(f"/transcripts/{tid}/topics")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Non-multitrack: should produce 4 segments (one per speaker change)
assert len(topic["segments"]) == 4
@pytest.mark.asyncio
async def test_api_topics_with_words_endpoint_multitrack(client):
"""Test GET /transcripts/{id}/topics/with-words uses multitrack segmentation."""
from datetime import datetime, timezone
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.transcripts import (
TranscriptParticipant,
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.types import Word
# Create multitrack recording
recording = Recording(
bucket_name="test-bucket",
object_key="test-key-2",
recorded_at=datetime.now(timezone.utc),
track_keys=["track1.webm", "track2.webm"],
)
await recordings_controller.create(recording)
transcript = await transcripts_controller.add(
name="Multitrack Test 2",
source_kind="file",
recording_id=recording.id,
)
await transcripts_controller.update(
transcript,
{
"participants": [
TranscriptParticipant(id="1", speaker=0, name="Alice").model_dump(),
TranscriptParticipant(id="2", speaker=1, name="Bob").model_dump(),
]
},
)
await transcripts_controller.upsert_topic(
transcript,
TranscriptTopic(
title="Topic 1",
summary="Summary 1",
timestamp=0,
words=[
Word(text="Hello ", start=0.0, end=0.5, speaker=0),
Word(text="I'm ", start=0.5, end=0.8, speaker=1),
Word(text="there.", start=0.5, end=1.0, speaker=0),
Word(text="good.", start=1.0, end=1.5, speaker=1),
],
),
)
response = await client.get(f"/transcripts/{transcript.id}/topics/with-words")
assert response.status_code == 200
data = response.json()
assert len(data) == 1
topic = data[0]
# Should have 2 segments (multitrack sentence-based)
assert len(topic["segments"]) == 2
# Should also have words field
assert "words" in topic
assert len(topic["words"]) == 4

View File

@@ -1,5 +1,8 @@
import pytest
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
@pytest.mark.asyncio
async def test_transcript_create(client):
@@ -182,3 +185,51 @@ async def test_transcript_mark_reviewed(authenticated_client, client):
response = await client.get(f"/transcripts/{tid}")
assert response.status_code == 200
assert response.json()["reviewed"] is True
@pytest.mark.asyncio
async def test_transcript_get_returns_room_name(authenticated_client, client):
"""Test that getting a transcript returns its room_name when linked to a room."""
# Create a room
room = await rooms_controller.add(
name="test-room-for-transcript",
user_id="test-user",
zulip_auto_post=False,
zulip_stream="",
zulip_topic="",
is_locked=False,
room_mode="normal",
recording_type="cloud",
recording_trigger="automatic-2nd-participant",
is_shared=False,
webhook_url="",
webhook_secret="",
)
# Create a transcript linked to the room
transcript = await transcripts_controller.add(
name="transcript-with-room",
source_kind="file",
room_id=room.id,
)
# Get the transcript and verify room_name is returned
response = await client.get(f"/transcripts/{transcript.id}")
assert response.status_code == 200
assert response.json()["room_id"] == room.id
assert response.json()["room_name"] == "test-room-for-transcript"
@pytest.mark.asyncio
async def test_transcript_get_returns_null_room_name_when_no_room(
authenticated_client, client
):
"""Test that room_name is null when transcript has no room."""
response = await client.post("/transcripts", json={"name": "no-room-transcript"})
assert response.status_code == 200
tid = response.json()["id"]
response = await client.get(f"/transcripts/{tid}")
assert response.status_code == 200
assert response.json()["room_id"] is None
assert response.json()["room_name"] is None

3416
server/uv.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -15,9 +15,12 @@ import {
createListCollection,
useDisclosure,
Tabs,
Popover,
Text,
HStack,
} from "@chakra-ui/react";
import { useEffect, useMemo, useState } from "react";
import { LuEye, LuEyeOff } from "react-icons/lu";
import { LuEye, LuEyeOff, LuInfo } from "react-icons/lu";
import useRoomList from "./useRoomList";
import type { components } from "../../reflector-api";
import {
@@ -67,6 +70,11 @@ const recordingTypeOptions: SelectOption[] = [
{ label: "Cloud", value: "cloud" },
];
const platformOptions: SelectOption[] = [
{ label: "Whereby", value: "whereby" },
{ label: "Daily", value: "daily" },
];
const roomInitialState = {
name: "",
zulipAutoPost: false,
@@ -82,6 +90,7 @@ const roomInitialState = {
icsUrl: "",
icsEnabled: false,
icsFetchInterval: 5,
platform: "whereby",
};
export default function RoomsList() {
@@ -99,6 +108,11 @@ export default function RoomsList() {
const recordingTypeCollection = createListCollection({
items: recordingTypeOptions,
});
const platformCollection = createListCollection({
items: platformOptions,
});
const [roomInput, setRoomInput] = useState<null | typeof roomInitialState>(
null,
);
@@ -143,15 +157,24 @@ export default function RoomsList() {
zulipStream: detailedEditedRoom.zulip_stream,
zulipTopic: detailedEditedRoom.zulip_topic,
isLocked: detailedEditedRoom.is_locked,
roomMode: detailedEditedRoom.room_mode,
roomMode:
detailedEditedRoom.platform === "daily"
? "group"
: detailedEditedRoom.room_mode,
recordingType: detailedEditedRoom.recording_type,
recordingTrigger: detailedEditedRoom.recording_trigger,
recordingTrigger:
detailedEditedRoom.platform === "daily"
? detailedEditedRoom.recording_type === "cloud"
? "automatic-2nd-participant"
: "none"
: detailedEditedRoom.recording_trigger,
isShared: detailedEditedRoom.is_shared,
webhookUrl: detailedEditedRoom.webhook_url || "",
webhookSecret: detailedEditedRoom.webhook_secret || "",
icsUrl: detailedEditedRoom.ics_url || "",
icsEnabled: detailedEditedRoom.ics_enabled || false,
icsFetchInterval: detailedEditedRoom.ics_fetch_interval || 5,
platform: detailedEditedRoom.platform,
}
: null,
[detailedEditedRoom],
@@ -277,21 +300,32 @@ export default function RoomsList() {
return;
}
const platform: "whereby" | "daily" | null =
room.platform === "whereby" || room.platform === "daily"
? room.platform
: null;
const roomData = {
name: room.name,
zulip_auto_post: room.zulipAutoPost,
zulip_stream: room.zulipStream,
zulip_topic: room.zulipTopic,
is_locked: room.isLocked,
room_mode: room.roomMode,
room_mode: platform === "daily" ? "group" : room.roomMode,
recording_type: room.recordingType,
recording_trigger: room.recordingTrigger,
recording_trigger:
platform === "daily"
? room.recordingType === "cloud"
? "automatic-2nd-participant"
: "none"
: room.recordingTrigger,
is_shared: room.isShared,
webhook_url: room.webhookUrl,
webhook_secret: room.webhookSecret,
ics_url: room.icsUrl,
ics_enabled: room.icsEnabled,
ics_fetch_interval: room.icsFetchInterval,
platform,
};
if (isEditing) {
@@ -339,15 +373,21 @@ export default function RoomsList() {
zulipStream: roomData.zulip_stream,
zulipTopic: roomData.zulip_topic,
isLocked: roomData.is_locked,
roomMode: roomData.room_mode,
roomMode: roomData.platform === "daily" ? "group" : roomData.room_mode, // Daily always uses 2-200
recordingType: roomData.recording_type,
recordingTrigger: roomData.recording_trigger,
recordingTrigger:
roomData.platform === "daily"
? roomData.recording_type === "cloud"
? "automatic-2nd-participant"
: "none"
: roomData.recording_trigger,
isShared: roomData.is_shared,
webhookUrl: roomData.webhook_url || "",
webhookSecret: roomData.webhook_secret || "",
icsUrl: roomData.ics_url || "",
icsEnabled: roomData.ics_enabled || false,
icsFetchInterval: roomData.ics_fetch_interval || 5,
platform: roomData.platform,
});
setEditRoomId(roomId);
setIsEditing(true);
@@ -482,6 +522,52 @@ export default function RoomsList() {
)}
</Field.Root>
<Field.Root mt={4}>
<Field.Label>Platform</Field.Label>
<Select.Root
value={[room.platform]}
onValueChange={(e) => {
const newPlatform = e.value[0] as "whereby" | "daily";
const updates: Partial<typeof room> = {
platform: newPlatform,
};
if (newPlatform === "daily") {
updates.roomMode = "group";
updates.recordingTrigger =
room.recordingType === "cloud"
? "automatic-2nd-participant"
: "none";
} else {
if (room.recordingType !== "cloud") {
updates.recordingTrigger = "none";
}
}
setRoomInput({ ...room, ...updates });
}}
collection={platformCollection}
>
<Select.HiddenSelect />
<Select.Control>
<Select.Trigger>
<Select.ValueText placeholder="Select platform" />
</Select.Trigger>
<Select.IndicatorGroup>
<Select.Indicator />
</Select.IndicatorGroup>
</Select.Control>
<Select.Positioner>
<Select.Content>
{platformOptions.map((option) => (
<Select.Item key={option.value} item={option}>
{option.label}
<Select.ItemIndicator />
</Select.Item>
))}
</Select.Content>
</Select.Positioner>
</Select.Root>
</Field.Root>
<Field.Root mt={4}>
<Checkbox.Root
name="isLocked"
@@ -504,6 +590,7 @@ export default function RoomsList() {
<Checkbox.Label>Locked room</Checkbox.Label>
</Checkbox.Root>
</Field.Root>
{room.platform !== "daily" && (
<Field.Root mt={4}>
<Field.Label>Room size</Field.Label>
<Select.Root
@@ -534,20 +621,64 @@ export default function RoomsList() {
</Select.Positioner>
</Select.Root>
</Field.Root>
)}
<Field.Root mt={4}>
<HStack gap={2} alignItems="center">
<Field.Label>Recording type</Field.Label>
<Popover.Root>
<Popover.Trigger asChild>
<IconButton
aria-label="Recording type help"
variant="ghost"
size="xs"
colorPalette="gray"
>
<LuInfo />
</IconButton>
</Popover.Trigger>
<Popover.Positioner>
<Popover.Content>
<Popover.Arrow />
<Popover.Body>
<Text fontSize="sm" lineHeight="1.6">
<strong>None:</strong> No recording will be
created.
<br />
<br />
<strong>Local:</strong> Recording happens on
each participant's device. Files are saved
locally.
<br />
<br />
<strong>Cloud:</strong> Recording happens on
the platform's servers and is available after
the meeting ends.
</Text>
</Popover.Body>
</Popover.Content>
</Popover.Positioner>
</Popover.Root>
</HStack>
<Select.Root
value={[room.recordingType]}
onValueChange={(e) =>
setRoomInput({
...room,
recordingType: e.value[0],
recordingTrigger:
e.value[0] !== "cloud"
onValueChange={(e) => {
const newRecordingType = e.value[0];
const updates: Partial<typeof room> = {
recordingType: newRecordingType,
};
if (room.platform === "daily") {
updates.recordingTrigger =
newRecordingType === "cloud"
? "automatic-2nd-participant"
: "none";
} else {
updates.recordingTrigger =
newRecordingType !== "cloud"
? "none"
: room.recordingTrigger,
})
: room.recordingTrigger;
}
setRoomInput({ ...room, ...updates });
}}
collection={recordingTypeCollection}
>
<Select.HiddenSelect />
@@ -571,8 +702,45 @@ export default function RoomsList() {
</Select.Positioner>
</Select.Root>
</Field.Root>
{room.recordingType === "cloud" &&
room.platform !== "daily" && (
<Field.Root mt={4}>
<Field.Label>Cloud recording start trigger</Field.Label>
<HStack gap={2} alignItems="center">
<Field.Label>Recording start trigger</Field.Label>
<Popover.Root>
<Popover.Trigger asChild>
<IconButton
aria-label="Recording start trigger help"
variant="ghost"
size="xs"
colorPalette="gray"
>
<LuInfo />
</IconButton>
</Popover.Trigger>
<Popover.Positioner>
<Popover.Content>
<Popover.Arrow />
<Popover.Body>
<Text fontSize="sm" lineHeight="1.6">
<strong>None:</strong> Recording must be
started manually by a participant.
<br />
<br />
<strong>Prompt:</strong> Participants will
be prompted to start recording when they
join.
<br />
<br />
<strong>Automatic:</strong> Recording
starts automatically when a second
participant joins.
</Text>
</Popover.Body>
</Popover.Content>
</Popover.Positioner>
</Popover.Root>
</HStack>
<Select.Root
value={[room.recordingTrigger]}
onValueChange={(e) =>
@@ -582,7 +750,6 @@ export default function RoomsList() {
})
}
collection={recordingTriggerCollection}
disabled={room.recordingType !== "cloud"}
>
<Select.HiddenSelect />
<Select.Control>
@@ -605,6 +772,7 @@ export default function RoomsList() {
</Select.Positioner>
</Select.Root>
</Field.Root>
)}
<Field.Root mt={4}>
<Checkbox.Root

View File

@@ -117,15 +117,6 @@ export default function TranscriptDetails(details: TranscriptDetails) {
return <Modal title="Loading" text={"Loading transcript..."} />;
}
if (mp3.error) {
return (
<Modal
title="Transcription error"
text={`There was an error loading the recording. Error: ${mp3.error}`}
/>
);
}
return (
<>
<Grid
@@ -147,7 +138,12 @@ export default function TranscriptDetails(details: TranscriptDetails) {
/>
) : !mp3.loading && (waveform.error || mp3.error) ? (
<Box p={4} bg="red.100" borderRadius="md">
<Text>Error loading this recording</Text>
<Text>
Error loading{" "}
{[waveform.error && "waveform", mp3.error && "mp3"]
.filter(Boolean)
.join(" and ")}
</Text>
</Box>
) : (
<Skeleton h={14} />

View File

@@ -1,14 +1,16 @@
import { useState } from "react";
import type { components } from "../../reflector-api";
type GetTranscript = components["schemas"]["GetTranscript"];
import type { components, operations } from "../../reflector-api";
type GetTranscriptWithParticipants =
components["schemas"]["GetTranscriptWithParticipants"];
type GetTranscriptTopic = components["schemas"]["GetTranscriptTopic"];
import { Button, BoxProps, Box } from "@chakra-ui/react";
import { buildTranscriptWithTopics } from "./buildTranscriptWithTopics";
import { useTranscriptParticipants } from "../../lib/apiHooks";
import { Button, BoxProps, Box, Menu, Text } from "@chakra-ui/react";
import { LuChevronDown } from "react-icons/lu";
import { client } from "../../lib/apiClient";
import { toaster } from "../../components/ui/toaster";
type ShareCopyProps = {
finalSummaryElement: HTMLDivElement | null;
transcript: GetTranscript;
transcript: GetTranscriptWithParticipants;
topics: GetTranscriptTopic[];
};
@@ -20,11 +22,33 @@ export default function ShareCopy({
}: ShareCopyProps & BoxProps) {
const [isCopiedSummary, setIsCopiedSummary] = useState(false);
const [isCopiedTranscript, setIsCopiedTranscript] = useState(false);
const participantsQuery = useTranscriptParticipants(transcript?.id || null);
const [isCopying, setIsCopying] = useState(false);
type ApiTranscriptFormat = NonNullable<
operations["v1_transcript_get"]["parameters"]["query"]
>["transcript_format"];
const TRANSCRIPT_FORMATS = [
"text",
"text-timestamped",
"webvtt-named",
"json",
] as const satisfies ApiTranscriptFormat[];
type TranscriptFormat = (typeof TRANSCRIPT_FORMATS)[number];
const TRANSCRIPT_FORMAT_LABELS: { [k in TranscriptFormat]: string } = {
text: "Plain text",
"text-timestamped": "Text + timestamps",
"webvtt-named": "WebVTT (named)",
json: "JSON",
};
const formatOptions = TRANSCRIPT_FORMATS.map((f) => ({
value: f,
label: TRANSCRIPT_FORMAT_LABELS[f],
}));
const onCopySummaryClick = () => {
const text_to_copy = finalSummaryElement?.innerText;
if (text_to_copy) {
navigator.clipboard.writeText(text_to_copy).then(() => {
setIsCopiedSummary(true);
@@ -34,27 +58,91 @@ export default function ShareCopy({
}
};
const onCopyTranscriptClick = () => {
const text_to_copy =
buildTranscriptWithTopics(
topics || [],
participantsQuery?.data || null,
transcript?.title || null,
) || "";
text_to_copy &&
navigator.clipboard.writeText(text_to_copy).then(() => {
setIsCopiedTranscript(true);
// Reset the copied state after 2 seconds
setTimeout(() => setIsCopiedTranscript(false), 2000);
const onCopyTranscriptFormatClick = async (format: TranscriptFormat) => {
try {
setIsCopying(true);
const { data, error } = await client.GET(
"/v1/transcripts/{transcript_id}",
{
params: {
path: { transcript_id: transcript.id },
query: { transcript_format: format },
},
},
);
if (error) {
console.error("Failed to copy transcript:", error);
toaster.create({
duration: 3000,
render: () => (
<Box bg="red.500" color="white" px={4} py={3} borderRadius="md">
<Text fontWeight="bold">Error</Text>
<Text fontSize="sm">Failed to fetch transcript</Text>
</Box>
),
});
return;
}
const copiedText =
format === "json"
? JSON.stringify(data?.transcript ?? {}, null, 2)
: String(data?.transcript ?? "");
if (copiedText) {
await navigator.clipboard.writeText(copiedText);
setIsCopiedTranscript(true);
setTimeout(() => setIsCopiedTranscript(false), 2000);
}
} catch (e) {
console.error("Failed to copy transcript:", e);
toaster.create({
duration: 3000,
render: () => (
<Box bg="red.500" color="white" px={4} py={3} borderRadius="md">
<Text fontWeight="bold">Error</Text>
<Text fontSize="sm">Failed to copy transcript</Text>
</Box>
),
});
} finally {
setIsCopying(false);
}
};
return (
<Box {...boxProps}>
<Button onClick={onCopyTranscriptClick} mr={2} variant="subtle">
<Menu.Root
closeOnSelect={true}
lazyMount={true}
positioning={{ gutter: 4 }}
>
<Menu.Trigger asChild>
<Button
mr={2}
variant="subtle"
loading={isCopying}
loadingText="Copying..."
>
{isCopiedTranscript ? "Copied!" : "Copy Transcript"}
<LuChevronDown style={{ marginLeft: 6 }} />
</Button>
</Menu.Trigger>
<Menu.Positioner>
<Menu.Content>
{formatOptions.map((opt) => (
<Menu.Item
key={opt.value}
value={opt.value}
_hover={{ backgroundColor: "gray.100" }}
onClick={() => onCopyTranscriptFormatClick(opt.value)}
>
{opt.label}
</Menu.Item>
))}
</Menu.Content>
</Menu.Positioner>
</Menu.Root>
<Button onClick={onCopySummaryClick} variant="subtle">
{isCopiedSummary ? "Copied!" : "Copy Summary"}
</Button>

View File

@@ -2,20 +2,29 @@
import { Spinner, Link } from "@chakra-ui/react";
import { useAuth } from "../lib/AuthProvider";
import { usePathname } from "next/navigation";
import { getLogoutRedirectUrl } from "../lib/auth";
export default function UserInfo() {
const auth = useAuth();
const pathname = usePathname();
const status = auth.status;
const isLoading = status === "loading";
const isAuthenticated = status === "authenticated";
const isRefreshing = status === "refreshing";
const callbackUrl = getLogoutRedirectUrl(pathname);
return isLoading ? (
<Spinner size="xs" className="mx-3" />
) : !isAuthenticated && !isRefreshing ? (
<Link
href="/"
href="#"
className="font-light px-2"
onClick={() => auth.signIn("authentik")}
onClick={(e) => {
e.preventDefault();
auth.signIn("authentik");
}}
>
Log in
</Link>
@@ -23,7 +32,7 @@ export default function UserInfo() {
<Link
href="#"
className="font-light px-2"
onClick={() => auth.signOut({ callbackUrl: "/" })}
onClick={() => auth.signOut({ callbackUrl })}
>
Log out
</Link>

View File

@@ -11,6 +11,7 @@ import {
recordingTypeRequiresConsent,
} from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks";
import { assertExists } from "../../lib/utils";
type Meeting = components["schemas"]["Meeting"];
@@ -22,16 +23,15 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
const router = useRouter();
const params = useParams();
const auth = useAuth();
const status = auth.status;
const authLastUserId = auth.lastUserId;
const containerRef = useRef<HTMLDivElement>(null);
const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomName = params?.roomName as string;
// Always call /join to get a fresh token with user_id
useEffect(() => {
if (status === "loading" || !meeting?.id || !roomName) return;
if (authLastUserId === undefined || !meeting?.id || !roomName) return;
const join = async () => {
try {
@@ -50,18 +50,17 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
};
join();
}, [meeting?.id, roomName, status]);
}, [meeting?.id, roomName, authLastUserId]);
const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url;
const isLoading =
status === "loading" || joinMutation.isPending || !joinedMeeting;
const roomUrl = joinedMeeting?.room_url;
const handleLeave = useCallback(() => {
router.push("/browse");
}, [router]);
useEffect(() => {
if (isLoading || !roomUrl || !containerRef.current) return;
if (authLastUserId === undefined || !roomUrl || !containerRef.current)
return;
let frame: DailyCall | null = null;
let destroyed = false;
@@ -92,19 +91,41 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
frame.on("joined-meeting", async () => {
try {
await frame.startRecording({ type: "raw-tracks" });
const frameInstance = assertExists(
frame,
"frame object got lost somewhere after frame.on was called",
);
if (meeting.recording_type === "cloud") {
console.log("Starting cloud recording");
await frameInstance.startRecording({ type: "raw-tracks" });
}
} catch (error) {
console.error("Failed to start recording:", error);
}
});
await frame.join({ url: roomUrl });
await frame.join({
url: roomUrl,
sendSettings: {
video: {
// Optimize bandwidth for camera video
// allowAdaptiveLayers automatically adjusts quality based on network conditions
allowAdaptiveLayers: true,
// Use bandwidth-optimized preset as fallback for browsers without adaptive support
maxQuality: "medium",
},
// Note: screenVideo intentionally not configured to preserve full quality for screen shares
},
});
} catch (error) {
console.error("Error creating Daily frame:", error);
}
};
createAndJoin();
createAndJoin().catch((error) => {
console.error("Failed to create and join meeting:", error);
});
return () => {
destroyed = true;
@@ -114,9 +135,9 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
});
}
};
}, [roomUrl, isLoading, handleLeave]);
}, [roomUrl, authLastUserId, handleLeave]);
if (isLoading) {
if (authLastUserId === undefined) {
return (
<Center width="100vw" height="100vh">
<Spinner size="xl" />

View File

@@ -1,6 +1,6 @@
"use client";
import { createContext, useContext } from "react";
import { createContext, useContext, useRef } from "react";
import { useSession as useNextAuthSession } from "next-auth/react";
import { signOut, signIn } from "next-auth/react";
import { configureApiAuth } from "./apiClient";
@@ -25,6 +25,9 @@ type AuthContextType = (
update: () => Promise<Session | null>;
signIn: typeof signIn;
signOut: typeof signOut;
// TODO probably rename isLoading to isReloading and make THIS field "isLoading"
// undefined is "not known", null is "is certainly logged out"
lastUserId: CustomSession["user"]["id"] | null | undefined;
};
const AuthContext = createContext<AuthContextType | undefined>(undefined);
@@ -41,10 +44,15 @@ const noopAuthContext: AuthContextType = {
signOut: async () => {
throw new Error("signOut not supposed to be called");
},
lastUserId: undefined,
};
export function AuthProvider({ children }: { children: React.ReactNode }) {
const { data: session, status, update } = useNextAuthSession();
// referential comparison done in component, must be primitive /or cached
const lastUserId = useRef<CustomSession["user"]["id"] | null | undefined>(
null,
);
const contextValue: AuthContextType = isAuthEnabled
? {
@@ -73,11 +81,16 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
case "authenticated": {
const customSession = assertCustomSession(session);
if (customSession?.error === REFRESH_ACCESS_TOKEN_ERROR) {
// warning: call order-dependent
lastUserId.current = null;
// token had expired but next auth still returns "authenticated" so show user unauthenticated state
return {
status: "unauthenticated" as const,
};
} else if (customSession?.accessToken) {
// updates anyways with updated properties below
// warning! execution order conscience, must be ran before reading lastUserId.current below
lastUserId.current = customSession.user.id;
return {
status,
accessToken: customSession.accessToken,
@@ -92,6 +105,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
}
}
case "unauthenticated": {
// warning: call order-dependent
lastUserId.current = null;
return { status: "unauthenticated" as const };
}
default: {
@@ -103,6 +118,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
update,
signIn,
signOut,
// for optimistic cases when we assume "loading" doesn't immediately invalidate the user
lastUserId: lastUserId.current,
}
: noopAuthContext;

View File

@@ -18,3 +18,8 @@ export const LOGIN_REQUIRED_PAGES = [
export const PROTECTED_PAGES = new RegExp(
LOGIN_REQUIRED_PAGES.map((page) => `^${page}$`).join("|"),
);
export function getLogoutRedirectUrl(pathname: string): string {
const transcriptPagePattern = /^\/transcripts\/[^/]+$/;
return transcriptPagePattern.test(pathname) ? pathname : "/";
}

View File

@@ -32,6 +32,11 @@ async function getUserId(accessToken: string): Promise<string | null> {
});
if (!response.ok) {
try {
console.error(await response.text());
} catch (e) {
console.error("Failed to parse error response", e);
}
return null;
}
@@ -143,7 +148,7 @@ export const authOptions = (): AuthOptions =>
},
async session({ session, token }) {
const extendedToken = token as JWTWithAccessToken;
console.log("extendedToken", extendedToken);
const userId = await getUserId(extendedToken.accessToken);
return {

View File

@@ -696,7 +696,7 @@ export interface paths {
patch?: never;
trace?: never;
};
"/v1/webhook": {
"/v1/daily/webhook": {
parameters: {
query?: never;
header?: never;
@@ -708,6 +708,27 @@ export interface paths {
/**
* Webhook
* @description Handle Daily webhook events.
*
* Example webhook payload:
* {
* "version": "1.0.0",
* "type": "recording.ready-to-download",
* "id": "rec-rtd-c3df927c-f738-4471-a2b7-066fa7e95a6b-1692124192",
* "payload": {
* "recording_id": "08fa0b24-9220-44c5-846c-3f116cf8e738",
* "room_name": "Xcm97xRZ08b2dePKb78g",
* "start_ts": 1692124183,
* "status": "finished",
* "max_participants": 1,
* "duration": 9,
* "share_token": "ntDCL5k98Ulq", #gitleaks:allow
* "s3_key": "api-test-1j8fizhzd30c/Xcm97xRZ08b2dePKb78g/1692124183028"
* },
* "event_ts": 1692124192
* }
*
* Daily.co circuit-breaker: After 3+ failed responses (4xx/5xx), webhook
* state→FAILED, stops sending events. Reset: scripts/recreate_daily_webhook.py
*/
post: operations["v1_webhook"];
delete?: never;
@@ -899,81 +920,11 @@ export interface components {
target_language: string;
source_kind?: components["schemas"]["SourceKind"] | null;
};
/**
* DailyWebhookEvent
* @description Daily webhook event structure.
*/
DailyWebhookEvent: {
/** Type */
type: string;
/** Id */
id: string;
/** Ts */
ts: number;
/** Data */
data: {
[key: string]: unknown;
};
};
/** DeletionStatus */
DeletionStatus: {
/** Status */
status: string;
};
/** GetTranscript */
GetTranscript: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
};
/** GetTranscriptMinimal */
GetTranscriptMinimal: {
/** Id */
@@ -1105,6 +1056,345 @@ export interface components {
*/
words_per_speaker: components["schemas"]["SpeakerWords"][];
};
/**
* GetTranscriptWithJSON
* @description Transcript response as structured JSON segments.
*
* Format: Array of segment objects with speaker info, text, and timing.
* Example:
* [
* {
* "speaker": 0,
* "speaker_name": "John Smith",
* "text": "Hello everyone",
* "start": 0.0,
* "end": 5.0
* }
* ]
*/
GetTranscriptWithJSON: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
transcript_format: "json";
/** Transcript */
transcript: components["schemas"]["TranscriptSegment"][];
};
/** GetTranscriptWithParticipants */
GetTranscriptWithParticipants: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
};
/**
* GetTranscriptWithText
* @description Transcript response with plain text format.
*
* Format: Speaker names followed by their dialogue, one line per segment.
* Example:
* John Smith: Hello everyone
* Jane Doe: Hi there
*/
GetTranscriptWithText: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
transcript_format: "text";
/** Transcript */
transcript: string;
};
/**
* GetTranscriptWithTextTimestamped
* @description Transcript response with timestamped text format.
*
* Format: [MM:SS] timestamp prefix before each speaker and dialogue.
* Example:
* [00:00] John Smith: Hello everyone
* [00:05] Jane Doe: Hi there
*/
GetTranscriptWithTextTimestamped: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
transcript_format: "text-timestamped";
/** Transcript */
transcript: string;
};
/**
* GetTranscriptWithWebVTTNamed
* @description Transcript response in WebVTT subtitle format with participant names.
*
* Format: Standard WebVTT with voice tags using participant names.
* Example:
* WEBVTT
*
* 00:00:00.000 --> 00:00:05.000
* <v John Smith>Hello everyone
*/
GetTranscriptWithWebVTTNamed: {
/** Id */
id: string;
/** User Id */
user_id: string | null;
/** Name */
name: string;
/**
* Status
* @enum {string}
*/
status:
| "idle"
| "uploaded"
| "recording"
| "processing"
| "error"
| "ended";
/** Locked */
locked: boolean;
/** Duration */
duration: number;
/** Title */
title: string | null;
/** Short Summary */
short_summary: string | null;
/** Long Summary */
long_summary: string | null;
/** Created At */
created_at: string;
/**
* Share Mode
* @default private
*/
share_mode: string;
/** Source Language */
source_language: string | null;
/** Target Language */
target_language: string | null;
/** Reviewed */
reviewed: boolean;
/** Meeting Id */
meeting_id: string | null;
source_kind: components["schemas"]["SourceKind"];
/** Room Id */
room_id?: string | null;
/** Room Name */
room_name?: string | null;
/** Audio Deleted */
audio_deleted?: boolean | null;
/** Participants */
participants: components["schemas"]["TranscriptParticipant"][] | null;
/**
* @description discriminator enum property added by openapi-typescript
* @enum {string}
*/
transcript_format: "webvtt-named";
/** Transcript */
transcript: string;
};
/** HTTPValidationError */
HTTPValidationError: {
/** Detail */
@@ -1233,7 +1523,6 @@ export interface components {
} | null;
/**
* Platform
* @default whereby
* @enum {string}
*/
platform: "whereby" | "daily";
@@ -1325,7 +1614,6 @@ export interface components {
ics_last_etag?: string | null;
/**
* Platform
* @default whereby
* @enum {string}
*/
platform: "whereby" | "daily";
@@ -1377,7 +1665,6 @@ export interface components {
ics_last_etag?: string | null;
/**
* Platform
* @default whereby
* @enum {string}
*/
platform: "whereby" | "daily";
@@ -1523,6 +1810,24 @@ export interface components {
speaker: number | null;
/** Name */
name: string;
/** User Id */
user_id?: string | null;
};
/**
* TranscriptSegment
* @description A single transcript segment with speaker and timing information.
*/
TranscriptSegment: {
/** Speaker */
speaker: number;
/** Speaker Name */
speaker_name: string;
/** Text */
text: string;
/** Start */
start: number;
/** End */
end: number;
};
/** UpdateParticipant */
UpdateParticipant: {
@@ -2311,7 +2616,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["GetTranscript"];
"application/json": components["schemas"]["GetTranscriptWithParticipants"];
};
};
/** @description Validation Error */
@@ -2369,7 +2674,13 @@ export interface operations {
};
v1_transcript_get: {
parameters: {
query?: never;
query?: {
transcript_format?:
| "text"
| "text-timestamped"
| "webvtt-named"
| "json";
};
header?: never;
path: {
transcript_id: string;
@@ -2384,7 +2695,11 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["GetTranscript"];
"application/json":
| components["schemas"]["GetTranscriptWithText"]
| components["schemas"]["GetTranscriptWithTextTimestamped"]
| components["schemas"]["GetTranscriptWithWebVTTNamed"]
| components["schemas"]["GetTranscriptWithJSON"];
};
};
/** @description Validation Error */
@@ -2450,7 +2765,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["GetTranscript"];
"application/json": components["schemas"]["GetTranscriptWithParticipants"];
};
};
/** @description Validation Error */
@@ -3256,11 +3571,7 @@ export interface operations {
path?: never;
cookie?: never;
};
requestBody: {
content: {
"application/json": components["schemas"]["DailyWebhookEvent"];
};
};
requestBody?: never;
responses: {
/** @description Successful Response */
200: {
@@ -3271,15 +3582,6 @@ export interface operations {
"application/json": unknown;
};
};
/** @description Validation Error */
422: {
headers: {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["HTTPValidationError"];
};
};
};
};
}

View File

@@ -31,7 +31,7 @@
"ioredis": "^5.7.0",
"jest-worker": "^29.6.2",
"lucide-react": "^0.525.0",
"next": "^15.5.3",
"next": "^15.5.9",
"next-auth": "^4.24.7",
"next-themes": "^0.4.6",
"nuqs": "^2.4.3",

508
www/pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff