Compare commits

..

21 Commits

Author SHA1 Message Date
Igor Loskutov
f6201dd378 fix: set source_kind to FILE on audio file upload
The upload endpoint left source_kind as the default LIVE even when
a file was uploaded. Now sets it to FILE when the upload completes.
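The fix amounts to stamping the source kind at upload time instead of relying on the model default. A minimal sketch of that idea (the `SourceKind` enum, `Transcript` shape, and `complete_upload` helper are hypothetical, not Reflector's actual code):

```python
from dataclasses import dataclass
from enum import Enum


class SourceKind(str, Enum):
    LIVE = "live"
    FILE = "file"


@dataclass
class Transcript:
    id: str
    # Model default is LIVE, correct for browser recordings but wrong
    # for uploads unless explicitly overridden.
    source_kind: SourceKind = SourceKind.LIVE


def complete_upload(transcript: Transcript) -> Transcript:
    # The bug: omitting this line left an uploaded file marked LIVE.
    transcript.source_kind = SourceKind.FILE
    return transcript


t = complete_upload(Transcript(id="t1"))
print(t.source_kind)  # SourceKind.FILE
```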
2026-02-11 13:37:55 -05:00
Igor Loskutov
9f62959069 feat: standalone uses self-hosted GPU service for transcription+diarization
Replace in-process pyannote approach with self-hosted gpu/self_hosted/ service.
Same HTTP API as Modal — just TRANSCRIPT_URL/DIARIZATION_URL point to local container.

- Add gpu/self_hosted/Dockerfile.cpu (GPU Dockerfile minus NVIDIA CUDA)
- Add S3 model bundle fallback in diarizer.py when HF_TOKEN not set
- Add gpu service to docker-compose.standalone.yml with compose env overrides
- Fix /browse empty in PUBLIC_MODE (search+list queries filtered out roomless transcripts)
- Remove audio_diarization_pyannote.py, file_diarization_pyannote.py and tests
- Remove pyannote-audio from server local deps
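Because the self-hosted container exposes the same HTTP API as Modal, the server only needs different URLs; the calling code is identical either way. A rough sketch of that settings indirection, with a hypothetical `GpuEndpoints` holder built from the TRANSCRIPT_URL/DIARIZATION_URL variables named above:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class GpuEndpoints:
    """Where transcription/diarization requests go. The HTTP client is
    the same for Modal and the self-hosted container; only URLs differ."""

    transcript_url: str
    diarization_url: str

    @classmethod
    def from_env(cls) -> "GpuEndpoints":
        # Standalone compose overrides these to point at the local `cpu`
        # service; production points them at Modal.
        return cls(
            transcript_url=os.environ.get("TRANSCRIPT_URL", "http://cpu:8000"),
            diarization_url=os.environ.get("DIARIZATION_URL", "http://cpu:8000"),
        )


endpoints = GpuEndpoints.from_env()
print(endpoints.transcript_url)
```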
2026-02-11 13:37:55 -05:00
Igor Loskutov
0353c23a94 feat: add local pyannote file diarization processor
Enables file diarization without Modal by using pyannote.audio locally.
Downloads model bundle from S3 on first use, caches locally, patches
config to use local paths. Set DIARIZATION_BACKEND=pyannote to enable.
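The download-once caching described above can be sketched as a small guard around the S3 fetch: hit S3 only when the local cache is empty, reuse it on every later call. This is an illustration of the pattern, not the actual diarizer code (the marker-file convention is an assumption):

```python
import tempfile
from pathlib import Path
from typing import Callable


def ensure_model_bundle(cache_dir: Path, fetch_from_s3: Callable[[Path], None]) -> Path:
    """Fetch the model bundle from S3 only if the local cache is missing,
    then reuse the cached copy on every later call."""
    marker = cache_dir / ".complete"
    if not marker.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)
        fetch_from_s3(cache_dir)  # e.g. download + unpack model files
        marker.write_text("ok")
    return cache_dir


# Simulated fetch that counts how often S3 is actually hit.
calls = []
with tempfile.TemporaryDirectory() as tmp:
    cache = Path(tmp) / "pyannote"
    ensure_model_bundle(cache, lambda d: calls.append(d))
    ensure_model_bundle(cache, lambda d: calls.append(d))
print(len(calls))  # S3 fetched exactly once
```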
2026-02-11 13:37:12 -05:00
Sergey Mankovsky
7372f80530 Allow reprocessing idle multitrack transcripts 2026-02-11 19:29:29 +01:00
Sergey Mankovsky
208361c8cc Fix "Event loop is closed" errors in Celery workers 2026-02-11 19:29:23 +01:00
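Celery runs each task under `asyncio.run`, which closes the previous event loop, so any async Redis connection cached from an earlier task is bound to a dead loop and raises `RuntimeError` on use. A sketch of the reconnect-and-retry pattern such a fix typically uses (with a fake Redis client so it runs offline; not the exact Reflector code):

```python
import asyncio


class FakeRedis:
    """Stand-in for an async Redis client whose cached connection may be
    bound to an already-closed event loop."""

    def __init__(self, broken: bool = False):
        self.broken = broken
        self.published = []

    async def publish(self, channel: str, message: str):
        if self.broken:
            raise RuntimeError("Event loop is closed")
        self.published.append((channel, message))


class PubSubManager:
    def __init__(self):
        self.conn = None

    async def connect(self):
        self.conn = FakeRedis()

    async def publish(self, channel: str, message: str):
        if self.conn is None:
            await self.connect()
        try:
            await self.conn.publish(channel, message)
        except RuntimeError:
            # Cached connection belongs to a closed loop: drop it,
            # reconnect on the current loop, and retry once.
            self.conn = None
            await self.connect()
            await self.conn.publish(channel, message)


async def main():
    mgr = PubSubManager()
    mgr.conn = FakeRedis(broken=True)  # simulate the stale connection
    await mgr.publish("room-1", "hello")
    return mgr.conn.published


result = asyncio.run(main())
print(result)  # [('room-1', 'hello')]
```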
Sergey Mankovsky
70d17997ef Fix websocket disconnect errors 2026-02-11 19:29:16 +01:00
adc4c20bf4 feat: add local pyannote file diarization processor (#858)
* feat: add local pyannote file diarization processor

Enables file diarization without Modal by using pyannote.audio locally.
Downloads model bundle from S3 on first use, caches locally, patches
config to use local paths. Set DIARIZATION_BACKEND=pyannote to enable.

* fix: standalone setup enables pyannote diarization and public mode

Replace DIARIZATION_ENABLED=false with DIARIZATION_BACKEND=pyannote so
file uploads get speaker diarization out of the box. Add PUBLIC_MODE=true
so unauthenticated users can list/browse transcripts.

* fix: touch env files before first compose_cmd in standalone setup

docker-compose.yml references www/.env.local as env_file, but the
setup script only creates it in step 4. compose_cmd calls in step 3
(Garage) fail on a fresh clone when the file doesn't exist yet.

* feat: standalone uses self-hosted GPU service for transcription+diarization

Replace in-process pyannote approach with self-hosted gpu/self_hosted/ service.
Same HTTP API as Modal — just TRANSCRIPT_URL/DIARIZATION_URL point to local container.

- Add gpu/self_hosted/Dockerfile.cpu (GPU Dockerfile minus NVIDIA CUDA)
- Add S3 model bundle fallback in diarizer.py when HF_TOKEN not set
- Add gpu service to docker-compose.standalone.yml with compose env overrides
- Fix /browse empty in PUBLIC_MODE (search+list queries filtered out roomless transcripts)
- Remove audio_diarization_pyannote.py, file_diarization_pyannote.py and tests
- Remove pyannote-audio from server local deps

* fix: allow unauthenticated GPU requests when no API key configured

OAuth2PasswordBearer with auto_error=True rejects requests without
Authorization header before apikey_auth can check if auth is needed.
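The failure mode: with `auto_error=True`, FastAPI's `OAuth2PasswordBearer` returns a 401 for a missing Authorization header before the downstream dependency ever runs; with `auto_error=False`, the token arrives as `None` and the dependency itself decides. A FastAPI-free sketch of that control flow (names like `apikey_auth` mirror the commit message; the rest is illustrative):

```python
from typing import Optional

API_KEY: Optional[str] = None  # no key configured -> auth disabled


class AuthRequired(Exception):
    pass


def bearer_scheme(auth_header: Optional[str], auto_error: bool) -> Optional[str]:
    """Mimics OAuth2PasswordBearer: with auto_error=True a missing
    Authorization header is rejected here, before our own check runs."""
    if auth_header is None:
        if auto_error:
            raise AuthRequired("401 before apikey_auth ever runs")
        return None
    return auth_header.removeprefix("Bearer ").strip()


def apikey_auth(token: Optional[str]) -> bool:
    if API_KEY is None:
        return True  # no key configured: allow unauthenticated requests
    return token == API_KEY


# auto_error=True: the request dies even though no API key is configured.
try:
    bearer_scheme(None, auto_error=True)
    died_early = False
except AuthRequired:
    died_early = True

# auto_error=False: token is simply None and apikey_auth decides.
token = bearer_scheme(None, auto_error=False)
allowed = apikey_auth(token)
print(died_early, allowed)  # True True
```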

* fix: rename standalone gpu service to cpu to match Dockerfile.cpu usage

* docs: add programmatic testing section and fix gpu->cpu naming in setup script/docs

- Add "Testing programmatically" section to standalone docs with curl commands
  for creating transcript, uploading audio, polling status, checking result
- Fix setup-standalone.sh to reference `cpu` service (was still `gpu` after rename)
- Update all docs references from gpu to cpu service naming
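The polling step of that test flow reduces to a loop with a deadline. A sketch in Python rather than curl, with the backend stubbed out so it runs offline (the `ended`/`error` status names and the idea of reading a status field from `GET /v1/transcripts/{id}` are assumptions about the API):

```python
import time
from typing import Callable


def wait_for_transcript(get_status: Callable[[], str],
                        timeout_s: float = 300.0,
                        poll_interval_s: float = 0.0) -> str:
    """Poll until the transcript reaches a terminal status, as the curl
    loop in the docs would do by hand."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = get_status()  # e.g. read the status field of the transcript
        if status in ("ended", "error"):
            return status
        time.sleep(poll_interval_s)
    raise TimeoutError("transcript did not finish in time")


# Fake backend that finishes after two polls.
statuses = iter(["uploaded", "processing", "ended"])
final = wait_for_transcript(lambda: next(statuses))
print(final)  # ended
```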

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-11 12:41:32 -05:00
Sergey Mankovsky
ec4f356b4c fix: local env setup (#855)
* Ensure rate limit

* Increase nextjs compilation speed

* Fix daily no content handling

* Simplify daily webhook creation

* Fix webhook request validation
2026-02-11 16:59:21 +01:00
Igor Loskutov
39573626e9 fix: invalidate transcript query on STATUS websocket event
Without this, the processing page never redirects after completion
because the redirect logic watches the REST query data, not the
WebSocket status state.

Cherry-picked from feat-dag-progress (faec509a).
2026-02-10 20:27:34 -05:00
Igor Loskutov
d9aa6d6eb0 docs: add troubleshooting section + port conflict check in setup script
Port conflicts from stale next dev / other worktree processes silently
shadow Docker container port mappings, causing env vars to appear ignored.
2026-02-10 19:54:04 -05:00
Igor Loskutov
e1ea914675 docs: update standalone md — symlink handling, garage config template 2026-02-10 19:05:02 -05:00
Igor Loskutov
7200f3c65f fix: standalone setup — garage config, symlink handling, healthcheck
- garage.toml: fix rpc_secret field name (was secret_transmitter),
  move to top-level per Garage v1.1.0 spec, remove unused [s3_web]
- setup-standalone.sh: resolve symlinked .env files before writing,
  always ensure all standalone-critical vars via env_set,
  fix garage key create/info syntax (positional arg, not --name),
  avoid overwriting key secret with "(redacted)" on re-run,
  use compose_cmd in health check
- docker-compose.standalone.yml: fix garage healthcheck (no curl in
  image, use /garage stats instead)
2026-02-10 19:04:42 -05:00
Igor Loskutov
2f669dfd89 feat: add custom S3 endpoint support + Garage standalone storage
Add TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL setting to enable S3-compatible
backends (Garage, MinIO). When set, uses path-style addressing and
routes all requests to the custom endpoint. When unset, AWS behavior
is unchanged.

- AwsStorage: accept aws_endpoint_url, pass to all 6 session.client()
  calls, configure path-style addressing and base_url
- Fix 4 direct AwsStorage constructions in Hatchet workflows to pass
  endpoint_url (would have silently targeted wrong endpoint)
- Standalone: add Garage service to docker-compose.standalone.yml,
  setup script initializes layout/bucket/key and writes credentials
- Fix compose_cmd() bug: Mac path was missing standalone yml
- garage.toml template with runtime secret generation via openssl
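The endpoint switch boils down to: when the setting is present, pass it as `endpoint_url` and force path-style addressing; when absent, build the S3 client exactly as before. A dependency-free sketch of assembling those client arguments (only the setting name comes from the commit; with real boto3 the config entry would be `botocore.config.Config(s3={"addressing_style": "path"})` passed to `session.client("s3", ...)`):

```python
from typing import Optional


def s3_client_kwargs(endpoint_url: Optional[str]) -> dict:
    """Extra arguments for building the S3 client. Path-style addressing
    is needed by Garage/MinIO, which don't serve bucket subdomains."""
    kwargs: dict = {}
    if endpoint_url:
        kwargs["endpoint_url"] = endpoint_url
        # Plain data here to stay dependency-free; see lead-in for the
        # real botocore Config equivalent.
        kwargs["config"] = {"s3": {"addressing_style": "path"}}
    return kwargs


print(s3_client_kwargs("http://garage:3900"))
print(s3_client_kwargs(None))  # {} -> unchanged AWS behavior
```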
2026-02-10 18:40:23 -05:00
Igor Loskutov
d25d77333c chore: rename to setup-standalone, remove redundant setup-local-llm.sh 2026-02-10 17:51:03 -05:00
Igor Loskutov
427254fe33 feat: add unified setup-local-dev.sh for standalone deployment
Single script takes fresh clone to working Reflector: Ollama/LLM setup,
env file generation (server/.env + www/.env.local), docker compose up,
health checks. No Hatchet in standalone — live pipeline is pure Celery.
2026-02-10 17:47:12 -05:00
Igor Loskutov
46750abad9 docs: add TASKS.md for standalone env defaults + setup script work 2026-02-10 17:12:01 -05:00
Igor Loskutov
f36b95b09f docs: resolve standalone storage step — skip S3 for live-only mode 2026-02-10 16:48:18 -05:00
Igor Loskutov
608a3805c5 chore: remove completed PRD, rename setup doc, drop response_format tests
- Remove docs/01_ollama.prd.md (implementation complete)
- Rename local-dev-setup.md -> standalone-local-setup.md
- Remove TestResponseFormat class from test_llm_retry.py
2026-02-10 16:14:33 -05:00
Igor Loskutov
d0af8ffdb7 fix: correct PRD goal (demo/eval, not dev replacement) and processor naming 2026-02-10 16:07:16 -05:00
Igor Loskutov
33a93db802 refactor: move Ollama services to docker-compose.standalone.yml
Ollama profiles (ollama-gpu, ollama-cpu) are only for Linux standalone
deployment. Mac devs never use them. Separate file keeps the main
compose clean and provides a natural home for future standalone services
(MinIO, etc.).

Linux: docker compose -f docker-compose.yml -f docker-compose.standalone.yml --profile ollama-gpu up -d
Mac: docker compose up -d (native Ollama, no standalone file needed)
2026-02-10 16:02:28 -05:00
Igor Loskutov
663345ece6 feat: local LLM via Ollama + structured output response_format
- Add setup script (scripts/setup-local-llm.sh) for one-command Ollama setup
  Mac: native Metal GPU, Linux: containerized via docker-compose profiles
- Add ollama-gpu and ollama-cpu docker-compose profiles for Linux
- Add extra_hosts to server/hatchet-worker-llm for host.docker.internal
- Pass response_format JSON schema in StructuredOutputWorkflow.extract()
  enabling grammar-based constrained decoding on Ollama/llama.cpp/vLLM/OpenAI
- Update .env.example with Ollama as default LLM option
- Add Ollama PRD and local dev setup docs
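Grammar-constrained decoding works by sending a JSON schema in the `response_format` field of an OpenAI-compatible chat request; recent Ollama versions, llama.cpp's server, and vLLM accept this shape (exact support varies by version). A sketch of building such a request body; the topic schema itself is a made-up example:

```python
import json


def structured_request(model: str, prompt: str, schema_name: str, schema: dict) -> dict:
    """Chat-completions body with a json_schema response_format, the
    shape OpenAI-compatible servers use for constrained decoding."""
    return {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "response_format": {
            "type": "json_schema",
            "json_schema": {"name": schema_name, "strict": True, "schema": schema},
        },
    }


body = structured_request(
    model="llama3.1",
    prompt="Extract the topic title and summary.",
    schema_name="topic",
    schema={
        "type": "object",
        "properties": {"title": {"type": "string"}, "summary": {"type": "string"}},
        "required": ["title", "summary"],
        "additionalProperties": False,
    },
)
print(json.dumps(body["response_format"]["type"]))  # "json_schema"
```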
2026-02-10 15:55:21 -05:00
14 changed files with 54 additions and 295 deletions

View File

@@ -63,16 +63,14 @@ services:
   server:
     environment:
       TRANSCRIPT_BACKEND: modal
-      TRANSCRIPT_URL: http://localhost:8100
-      TRANSCRIPT_MODAL_API_KEY: local
+      TRANSCRIPT_URL: http://cpu:8000
       DIARIZATION_BACKEND: modal
-      DIARIZATION_URL: http://localhost:8100
+      DIARIZATION_URL: http://cpu:8000
   worker:
     environment:
       TRANSCRIPT_BACKEND: modal
       TRANSCRIPT_URL: http://cpu:8000
-      TRANSCRIPT_MODAL_API_KEY: local
       DIARIZATION_BACKEND: modal
       DIARIZATION_URL: http://cpu:8000
@@ -80,8 +78,6 @@ services:
     build:
       context: ./gpu/self_hosted
       dockerfile: Dockerfile.cpu
-    ports:
-      - "8100:8000"
     volumes:
       - gpu_cache:/root/.cache
     restart: unless-stopped

View File

@@ -2,7 +2,8 @@ services:
   server:
     build:
       context: server
-    network_mode: host
+    ports:
+      - 1250:1250
     volumes:
       - ./server/:/app/
       - /app/.venv
@@ -10,12 +11,8 @@ services:
      - ./server/.env
     environment:
       ENTRYPOINT: server
-      DATABASE_URL: postgresql+asyncpg://reflector:reflector@localhost:5432/reflector
-      REDIS_HOST: localhost
-      CELERY_BROKER_URL: redis://localhost:6379/1
-      CELERY_RESULT_BACKEND: redis://localhost:6379/1
-      HATCHET_CLIENT_SERVER_URL: http://localhost:8889
-      HATCHET_CLIENT_HOST_PORT: localhost:7078
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
   worker:
     build:
@@ -27,11 +24,6 @@ services:
      - ./server/.env
     environment:
       ENTRYPOINT: worker
-      HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
-      HATCHET_CLIENT_HOST_PORT: hatchet:7077
-    depends_on:
-      redis:
-        condition: service_started
   beat:
     build:
@@ -43,9 +35,6 @@ services:
      - ./server/.env
     environment:
       ENTRYPOINT: beat
-    depends_on:
-      redis:
-        condition: service_started
   hatchet-worker-cpu:
     build:
@@ -57,8 +46,6 @@ services:
      - ./server/.env
     environment:
       ENTRYPOINT: hatchet-worker-cpu
-      HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
-      HATCHET_CLIENT_HOST_PORT: hatchet:7077
     depends_on:
       hatchet:
         condition: service_healthy
@@ -72,8 +59,8 @@ services:
      - ./server/.env
     environment:
       ENTRYPOINT: hatchet-worker-llm
-      HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
-      HATCHET_CLIENT_HOST_PORT: hatchet:7077
+    extra_hosts:
+      - "host.docker.internal:host-gateway"
     depends_on:
       hatchet:
         condition: service_healthy
@@ -97,11 +84,6 @@ services:
      - ./www/.env.local
     environment:
       - NODE_ENV=development
-      - SERVER_API_URL=http://host.docker.internal:1250
-    extra_hosts:
-      - "host.docker.internal:host-gateway"
-    depends_on:
-      - server
   postgres:
     image: postgres:17
@@ -117,14 +99,13 @@ services:
       - ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
     healthcheck:
       test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
-      interval: 5s
-      timeout: 5s
-      retries: 10
-      start_period: 15s
+      interval: 10s
+      timeout: 10s
+      retries: 5
+      start_period: 10s
   hatchet:
     image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
-    restart: on-failure
     ports:
       - "8889:8888"
       - "7078:7077"
@@ -132,7 +113,7 @@ services:
       postgres:
         condition: service_healthy
     environment:
-      DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable&connect_timeout=30"
+      DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable"
       SERVER_AUTH_COOKIE_DOMAIN: localhost
       SERVER_AUTH_COOKIE_INSECURE: "t"
       SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
@@ -154,3 +135,7 @@ services:
 volumes:
   next_cache:
+
+networks:
+  default:
+    attachable: true

View File

@@ -168,8 +168,6 @@ If the frontend or backend behaves unexpectedly (e.g., env vars seem ignored, ch
 lsof -i :3000  # frontend
 lsof -i :1250  # backend
 lsof -i :5432  # postgres
-lsof -i :3900  # Garage S3 API
-lsof -i :6379  # Redis

 # Kill stale processes on a port
 lsof -ti :3000 | xargs kill
@@ -177,13 +175,9 @@ lsof -ti :3000 | xargs kill
 Common causes:

 - A stale `next dev` or `pnpm dev` process from another terminal/worktree
-- Another Docker Compose project (different worktree) with containers on the same ports — the setup script only manages its own project; containers from other projects must be stopped manually (`docker ps` to find them, `docker stop` to kill them)
+- Another Docker Compose project (different worktree) with containers on the same ports

-The setup script checks ports 3000, 1250, 5432, 6379, 3900, 3903 for conflicts before starting services. It ignores OrbStack/Docker Desktop port forwarding processes (which always bind these ports but are not real conflicts).
+The setup script checks for port conflicts before starting services.

-### OrbStack false port-conflict warnings (Mac)
-
-If you use OrbStack as your Docker runtime, `lsof` will show OrbStack binding ports like 3000, 1250, etc. even when no containers are running. This is OrbStack's port forwarding mechanism — not a real conflict. The setup script filters these out automatically.

 ### Re-enabling authentication

View File

@@ -113,7 +113,7 @@ step_llm() {
     echo ""

     # Pull model if not already present
-    if ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
+    if ollama list 2>/dev/null | grep -q "$MODEL"; then
         ok "Model $MODEL already pulled"
     else
         info "Pulling model $MODEL (this may take a while)..."
@@ -143,7 +143,7 @@ step_llm() {
     echo ""

     # Pull model inside container
-    if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
+    if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | grep -q "$MODEL"; then
         ok "Model $MODEL already pulled"
     else
         info "Pulling model $MODEL inside container (this may take a while)..."
@@ -290,23 +290,16 @@ ENVEOF
 step_services() {
     info "Step 5: Starting Docker services"

-    # Check for port conflicts — stale processes silently shadow Docker port mappings.
-    # OrbStack/Docker Desktop bind ports for forwarding; ignore those PIDs.
+    # Check for port conflicts — stale processes silently shadow Docker port mappings
     local ports_ok=true
-    for port in 3000 1250 5432 6379 3900 3903; do
-        local pids
-        pids=$(lsof -ti :"$port" 2>/dev/null || true)
-        for pid in $pids; do
-            local pname
-            pname=$(ps -p "$pid" -o comm= 2>/dev/null || true)
-            # OrbStack and Docker Desktop own port forwarding — not real conflicts
-            if [[ "$pname" == *"OrbStack"* ]] || [[ "$pname" == *"com.docker"* ]] || [[ "$pname" == *"vpnkit"* ]]; then
-                continue
-            fi
-            warn "Port $port already in use by PID $pid ($pname)"
+    for port in 3000 1250; do
+        local pid
+        pid=$(lsof -ti :"$port" 2>/dev/null || true)
+        if [[ -n "$pid" ]]; then
+            warn "Port $port already in use by PID $pid"
             warn "Kill it with: lsof -ti :$port | xargs kill"
             ports_ok=false
-        done
+        fi
     done

     if [[ "$ports_ok" == "false" ]]; then
         warn "Port conflicts detected — Docker containers may not be reachable"

View File

@@ -1,5 +1,11 @@
+from typing import Annotated
+
+from fastapi import Depends
+from fastapi.security import OAuth2PasswordBearer
 from pydantic import BaseModel

+oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
+

 class UserInfo(BaseModel):
     sub: str
@@ -9,13 +15,13 @@ class AccessTokenInfo(BaseModel):
     pass


-def authenticated():
+def authenticated(token: Annotated[str, Depends(oauth2_scheme)]):
     return None


-def current_user():
+def current_user(token: Annotated[str, Depends(oauth2_scheme)]):
     return None


-def current_user_optional():
+def current_user_optional(token: Annotated[str, Depends(oauth2_scheme)]):
     return None

View File

@@ -720,6 +720,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
                 chunk_text=chunk["text"],
                 timestamp=chunk["timestamp"],
                 duration=chunk["duration"],
+                words=chunk["words"],
             )
         )
         for chunk in chunks
@@ -731,41 +732,31 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
         TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
     ]

-    # Build index-to-words map from local chunks (words not in child workflow results)
-    chunks_by_index = {chunk["index"]: chunk["words"] for chunk in chunks}
-
     async with fresh_db_connection():
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if not transcript:
             raise ValueError(f"Transcript {input.transcript_id} not found")

-        # Clear topics for idempotency on retry (each topic gets a fresh UUID,
-        # so upsert_topic would append duplicates without this)
-        await transcripts_controller.update(transcript, {"topics": []})
-
         for chunk in topic_chunks:
-            chunk_words = chunks_by_index[chunk.chunk_index]
             topic = TranscriptTopic(
                 title=chunk.title,
                 summary=chunk.summary,
                 timestamp=chunk.timestamp,
-                transcript=" ".join(w.text for w in chunk_words),
-                words=chunk_words,
+                transcript=" ".join(w.text for w in chunk.words),
+                words=chunk.words,
             )
             await transcripts_controller.upsert_topic(transcript, topic)
             await append_event_and_broadcast(
                 input.transcript_id, transcript, "TOPIC", topic, logger=logger
             )

-    # Words omitted from TopicsResult — already persisted to DB above.
-    # Downstream tasks that need words refetch from DB.
     topics_list = [
         TitleSummary(
             title=chunk.title,
             summary=chunk.summary,
             timestamp=chunk.timestamp,
             duration=chunk.duration,
-            transcript=TranscriptType(words=[]),
+            transcript=TranscriptType(words=chunk.words),
         )
         for chunk in topic_chunks
     ]
@@ -851,8 +842,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
     ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")

     topics_result = ctx.task_output(detect_topics)
+    topics = topics_result.topics

-    if not topics_result.topics:
+    if not topics:
         ctx.log("extract_subjects: no topics, returning empty subjects")
         return SubjectsResult(
             subjects=[],
@@ -865,13 +857,11 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
     # sharing DB connections and LLM HTTP pools across forks
     from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
     from reflector.llm import LLM  # noqa: PLC0415
-    from reflector.processors.types import words_to_segments  # noqa: PLC0415

     async with fresh_db_connection():
         transcript = await transcripts_controller.get_by_id(input.transcript_id)

-    # Build transcript text from DB topics (words omitted from task output
-    # to reduce Hatchet payload size — refetch from DB where they were persisted)
+    # Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
     speakermap = {}
     if transcript and transcript.participants:
         speakermap = {
@@ -881,8 +871,8 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
         }

     text_lines = []
-    for db_topic in transcript.topics:
-        for segment in words_to_segments(db_topic.words):
+    for topic in topics:
+        for segment in topic.transcript.as_segments():
             name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
             text_lines.append(f"{name}: {segment.text}")

View File

@@ -95,6 +95,7 @@ class TopicChunkResult(BaseModel):
     summary: str
     timestamp: float
     duration: float
+    words: list[Word]


 class TopicsResult(BaseModel):

View File

@@ -20,6 +20,7 @@ from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
 from reflector.hatchet.workflows.models import TopicChunkResult
 from reflector.logger import logger
 from reflector.processors.prompts import TOPIC_PROMPT
+from reflector.processors.types import Word


 class TopicChunkInput(BaseModel):
@@ -29,6 +30,7 @@ class TopicChunkInput(BaseModel):
     chunk_text: str
     timestamp: float
     duration: float
+    words: list[Word]


 hatchet = HatchetClientManager.get_client()
@@ -97,4 +99,5 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
         summary=response.summary,
         timestamp=input.timestamp,
         duration=input.duration,
+        words=input.words,
     )

View File

@@ -48,15 +48,7 @@ class RedisPubSubManager:
         if not self.redis_connection:
             await self.connect()
         message = json.dumps(message)
-        try:
-            await self.redis_connection.publish(room_id, message)
-        except RuntimeError:
-            # Celery workers run each task in a new event loop (asyncio.run),
-            # which closes the previous loop. Cached Redis connection is dead.
-            # Reconnect on the current loop and retry.
-            self.redis_connection = None
-            await self.connect()
-            await self.redis_connection.publish(room_id, message)
+        await self.redis_connection.publish(room_id, message)

     async def subscribe(self, room_id: str) -> redis.Redis:
         await self.pubsub.subscribe(room_id)

View File

@@ -291,12 +291,7 @@ async def test_validation_idle_transcript_with_recording_allowed():
         recording_id="test-recording-id",
     )

-    with patch(
-        "reflector.services.transcript_process.task_is_scheduled_or_active"
-    ) as mock_celery_check:
-        mock_celery_check.return_value = False
-        result = await validate_transcript_for_processing(mock_transcript)
+    result = await validate_transcript_for_processing(mock_transcript)

     assert isinstance(result, ValidationOk)
     assert result.recording_id == "test-recording-id"

View File

@@ -1,185 +0,0 @@
"""
Tests for Hatchet payload thinning optimizations.
Verifies that:
1. TopicChunkInput no longer carries words
2. TopicChunkResult no longer carries words
3. words_to_segments() matches Transcript.as_segments(is_multitrack=False) — behavioral equivalence
for the extract_subjects refactoring
4. TopicsResult can be constructed with empty transcript words
"""
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.hatchet.workflows.topic_chunk_processing import TopicChunkInput
from reflector.processors.types import Word
def _make_words(speaker: int = 0, start: float = 0.0) -> list[Word]:
return [
Word(text="Hello", start=start, end=start + 0.5, speaker=speaker),
Word(text=" world.", start=start + 0.5, end=start + 1.0, speaker=speaker),
]
class TestTopicChunkInputNoWords:
"""TopicChunkInput must not have a words field."""
def test_no_words_field(self):
assert "words" not in TopicChunkInput.model_fields
def test_construction_without_words(self):
inp = TopicChunkInput(
chunk_index=0, chunk_text="Hello world.", timestamp=0.0, duration=1.0
)
assert inp.chunk_index == 0
assert inp.chunk_text == "Hello world."
def test_rejects_words_kwarg(self):
"""Passing words= should raise a validation error (field doesn't exist)."""
import pydantic
try:
TopicChunkInput(
chunk_index=0,
chunk_text="text",
timestamp=0.0,
duration=1.0,
words=_make_words(),
)
# If pydantic is configured to ignore extra, this won't raise.
# Verify the field is still absent from the model.
assert "words" not in TopicChunkInput.model_fields
except pydantic.ValidationError:
pass # Expected
class TestTopicChunkResultNoWords:
"""TopicChunkResult must not have a words field."""
def test_no_words_field(self):
assert "words" not in TopicChunkResult.model_fields
def test_construction_without_words(self):
result = TopicChunkResult(
chunk_index=0,
title="Test",
summary="Summary",
timestamp=0.0,
duration=1.0,
)
assert result.title == "Test"
assert result.chunk_index == 0
def test_serialization_roundtrip(self):
"""Serialized TopicChunkResult has no words key."""
result = TopicChunkResult(
chunk_index=0,
title="Test",
summary="Summary",
timestamp=0.0,
duration=1.0,
)
data = result.model_dump()
assert "words" not in data
reconstructed = TopicChunkResult(**data)
assert reconstructed == result
class TestWordsToSegmentsBehavioralEquivalence:
"""words_to_segments() must produce same output as Transcript.as_segments(is_multitrack=False).
This ensures the extract_subjects refactoring (from task output topic.transcript.as_segments()
to words_to_segments(db_topic.words)) preserves identical behavior.
"""
def test_single_speaker(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
words = _make_words(speaker=0)
direct = words_to_segments(words)
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
assert len(direct) == len(via_transcript)
for d, v in zip(direct, via_transcript):
assert d.text == v.text
assert d.speaker == v.speaker
assert d.start == v.start
assert d.end == v.end
def test_multiple_speakers(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
words = [
Word(text="Hello", start=0.0, end=0.5, speaker=0),
Word(text=" world.", start=0.5, end=1.0, speaker=0),
Word(text=" How", start=1.0, end=1.5, speaker=1),
Word(text=" are", start=1.5, end=2.0, speaker=1),
Word(text=" you?", start=2.0, end=2.5, speaker=1),
]
direct = words_to_segments(words)
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
assert len(direct) == len(via_transcript)
for d, v in zip(direct, via_transcript):
assert d.text == v.text
assert d.speaker == v.speaker
def test_empty_words(self):
from reflector.processors.types import Transcript as TranscriptType
from reflector.processors.types import words_to_segments
assert words_to_segments([]) == []
assert TranscriptType(words=[]).as_segments(is_multitrack=False) == []
class TestTopicsResultEmptyWords:
"""TopicsResult can carry topics with empty transcript words."""
def test_construction_with_empty_words(self):
from reflector.hatchet.workflows.models import TopicsResult
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
topics = [
TitleSummary(
title="Topic A",
summary="Summary A",
timestamp=0.0,
duration=5.0,
transcript=TranscriptType(words=[]),
),
TitleSummary(
title="Topic B",
summary="Summary B",
timestamp=5.0,
duration=5.0,
transcript=TranscriptType(words=[]),
),
]
result = TopicsResult(topics=topics)
assert len(result.topics) == 2
for t in result.topics:
assert t.transcript.words == []
def test_serialization_roundtrip(self):
from reflector.hatchet.workflows.models import TopicsResult
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
topics = [
TitleSummary(
title="Topic",
summary="Summary",
timestamp=0.0,
duration=1.0,
transcript=TranscriptType(words=[]),
)
]
result = TopicsResult(topics=topics)
data = result.model_dump()
reconstructed = TopicsResult(**data)
assert len(reconstructed.topics) == 1
assert reconstructed.topics[0].transcript.words == []

View File

@@ -11,7 +11,6 @@ import {
 import { useRouter } from "next/navigation";
 import { useTranscriptGet } from "../../../../lib/apiHooks";
 import { parseNonEmptyString } from "../../../../lib/utils";
-import { useWebSockets } from "../../useWebSockets";

 type TranscriptProcessing = {
   params: Promise<{
@@ -25,7 +24,6 @@ export default function TranscriptProcessing(details: TranscriptProcessing) {
   const router = useRouter();
   const transcript = useTranscriptGet(transcriptId);
-  useWebSockets(transcriptId);

   useEffect(() => {
     const status = transcript.data?.status;

View File

@@ -23,16 +23,7 @@ const useWebRTC = (
     let p: Peer;
     try {
-      p = new Peer({
-        initiator: true,
-        stream: stream,
-        // Disable trickle ICE: single SDP exchange (offer + answer) with all candidates.
-        // Required for HTTP-based signaling; trickle needs WebSocket for candidate exchange.
-        trickle: false,
-        config: {
-          iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
-        },
-      });
+      p = new Peer({ initiator: true, stream: stream });
     } catch (error) {
       setError(error as Error, "Error creating WebRTC");
       return;

View File

@@ -3,7 +3,7 @@
   "version": "0.1.0",
   "private": true,
   "scripts": {
-    "dev": "next dev",
+    "dev": "next dev --turbopack",
     "build": "next build",
     "build-production": "next build --experimental-build-mode compile",
     "start": "next start",