Files
reflector/server
Igor Monadical 972a52d22f fix: live flow real-time updates during processing (#861)
* fix: live flow real-time updates during processing

Three gaps caused transcript pages to require manual refresh after
live recording/processing:

1. UserEventsProvider only invalidated list queries on TRANSCRIPT_STATUS,
   not individual transcript queries. Now parses data.id from the event
   and calls invalidateTranscript for the specific transcript.

2. useWebSockets had no reconnection logic — a dropped WS silently
   killed all real-time updates. Added exponential backoff reconnection
   (1s-30s, max 10 retries) with intentional close detection.

3. No polling fallback — WS was single point of failure. Added
   conditional refetchInterval to useTranscriptGet that polls every 5s
   when transcript status is processing/uploaded/recording.

* feat: type-safe WebSocket events via OpenAPI stub

Define Pydantic models with Literal discriminators for all WS events
(9 transcript-level, 5 user-level). Expose via stub GET endpoints so
pnpm openapi generates TS discriminated unions with exhaustive switch
narrowing on the frontend.

- New server/reflector/ws_events.py with TranscriptWsEvent and UserWsEvent
- Tighten backend emit signatures with TranscriptEventName literal
- Frontend uses generated types, removes Zod schema and manual casts
- Fix pre-existing bugs: waveform mapping, FINAL_LONG_SUMMARY field name
- STATUS value now typed as TranscriptStatus literal end-to-end
- TOPIC handler simplified to query invalidation only (avoids shape mismatch)

* fix: restore TOPIC WS handler with immediate state update

The setTopics call provides instant topic rendering during live
transcription. Query invalidation still follows for full data sync.

* fix: align TOPIC WS event data with GetTranscriptTopic shape

Convert TranscriptTopic → GetTranscriptTopic in pipeline before
emitting, so WS sends segments instead of words. Removes the
`as unknown as Topic` cast on the frontend.

* fix: use NonEmptyString and TranscriptStatus in user WS event models

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2026-02-12 14:49:57 -05:00
..
2025-12-22 12:09:20 -05:00
2026-01-23 12:33:06 -05:00
2025-02-03 16:11:01 +01:00
2025-08-20 20:56:45 -04:00
2025-07-16 18:10:11 -06:00
2023-08-29 10:58:27 +02:00
2025-12-22 12:09:20 -05:00
2026-01-20 12:27:16 -05:00
2025-09-17 16:43:20 -06:00

API Key Management

Finding Your User ID

# Get your OAuth sub (user ID) - requires authentication
curl -H "Authorization: Bearer <your_jwt>" http://localhost:1250/v1/me
# Returns: {"sub": "your-oauth-sub-here", "email": "...", ...}

Creating API Keys

curl -X POST http://localhost:1250/v1/user/api-keys \
  -H "Authorization: Bearer <your_jwt>" \
  -H "Content-Type: application/json" \
  -d '{"name": "My API Key"}'

Using API Keys

# Use X-API-Key header instead of Authorization
curl -H "X-API-Key: <your_api_key>" http://localhost:1250/v1/transcripts

AWS S3/SQS usage clarification

Whereby.com uploads recordings directly to our S3 bucket when meetings end.

SQS Queue (AWS_PROCESS_RECORDING_QUEUE_URL)

Filled by: AWS S3 Event Notifications

The S3 bucket is configured to send notifications to our SQS queue when new objects are created. This is standard AWS infrastructure - not in our codebase.

AWS S3 → SQS Event Configuration:

  • Event Type: s3:ObjectCreated:*
  • Filter: *.mp4 files
  • Destination: Our SQS queue

Our System's Role

Polls SQS every 60 seconds via /server/reflector/worker/process.py:24-62:

Every 60 seconds, check for new recordings

sqs = boto3.client("sqs", ...) response = sqs.receive_message(QueueUrl=queue_url, ...)

Requeue

uv run /app/requeue_uploaded_file.py TRANSCRIPT_ID

Hatchet Setup (Fresh DB)

After resetting the Hatchet database:

Option A: Automatic (CLI)

# Get default tenant ID and create token in one command
TENANT_ID=$(docker compose exec -T postgres psql -U reflector -d hatchet -t -c \
  "SELECT id FROM \"Tenant\" WHERE slug = 'default';" | tr -d ' \n') && \
TOKEN=$(docker compose exec -T hatchet /hatchet-admin token create \
  --config /config --tenant-id "$TENANT_ID" 2>/dev/null | tr -d '\n') && \
echo "HATCHET_CLIENT_TOKEN=$TOKEN"

Copy the output to server/.env.

Option B: Manual (UI)

  1. Create API token at http://localhost:8889 → Settings → API Tokens
  2. Update server/.env: HATCHET_CLIENT_TOKEN=<new-token>

Then restart workers

docker compose restart server hatchet-worker

Workflows register automatically when hatchet-worker starts.

Pipeline Management

Continue stuck pipeline from final summaries (identify_participants) step:

uv run python -c "from reflector.pipelines.main_live_pipeline import task_pipeline_final_summaries; result = task_pipeline_final_summaries.delay(transcript_id='TRANSCRIPT_ID'); print(f'Task queued: {result.id}')"

Run full post-processing pipeline (continues to completion):

uv run python -c "from reflector.pipelines.main_live_pipeline import pipeline_post; pipeline_post(transcript_id='TRANSCRIPT_ID')"

.