mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-04-22 21:25:18 +00:00
Compare commits
21 Commits
feat/paylo
...
feat/file-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f6201dd378 | ||
|
|
9f62959069 | ||
|
|
0353c23a94 | ||
|
|
7372f80530 | ||
|
|
208361c8cc | ||
|
|
70d17997ef | ||
| adc4c20bf4 | |||
|
|
ec4f356b4c | ||
|
|
39573626e9 | ||
|
|
d9aa6d6eb0 | ||
|
|
e1ea914675 | ||
|
|
7200f3c65f | ||
|
|
2f669dfd89 | ||
|
|
d25d77333c | ||
|
|
427254fe33 | ||
|
|
46750abad9 | ||
|
|
f36b95b09f | ||
|
|
608a3805c5 | ||
|
|
d0af8ffdb7 | ||
|
|
33a93db802 | ||
|
|
663345ece6 |
@@ -63,16 +63,14 @@ services:
|
|||||||
server:
|
server:
|
||||||
environment:
|
environment:
|
||||||
TRANSCRIPT_BACKEND: modal
|
TRANSCRIPT_BACKEND: modal
|
||||||
TRANSCRIPT_URL: http://localhost:8100
|
TRANSCRIPT_URL: http://cpu:8000
|
||||||
TRANSCRIPT_MODAL_API_KEY: local
|
|
||||||
DIARIZATION_BACKEND: modal
|
DIARIZATION_BACKEND: modal
|
||||||
DIARIZATION_URL: http://localhost:8100
|
DIARIZATION_URL: http://cpu:8000
|
||||||
|
|
||||||
worker:
|
worker:
|
||||||
environment:
|
environment:
|
||||||
TRANSCRIPT_BACKEND: modal
|
TRANSCRIPT_BACKEND: modal
|
||||||
TRANSCRIPT_URL: http://cpu:8000
|
TRANSCRIPT_URL: http://cpu:8000
|
||||||
TRANSCRIPT_MODAL_API_KEY: local
|
|
||||||
DIARIZATION_BACKEND: modal
|
DIARIZATION_BACKEND: modal
|
||||||
DIARIZATION_URL: http://cpu:8000
|
DIARIZATION_URL: http://cpu:8000
|
||||||
|
|
||||||
@@ -80,8 +78,6 @@ services:
|
|||||||
build:
|
build:
|
||||||
context: ./gpu/self_hosted
|
context: ./gpu/self_hosted
|
||||||
dockerfile: Dockerfile.cpu
|
dockerfile: Dockerfile.cpu
|
||||||
ports:
|
|
||||||
- "8100:8000"
|
|
||||||
volumes:
|
volumes:
|
||||||
- gpu_cache:/root/.cache
|
- gpu_cache:/root/.cache
|
||||||
restart: unless-stopped
|
restart: unless-stopped
|
||||||
|
|||||||
@@ -2,7 +2,8 @@ services:
|
|||||||
server:
|
server:
|
||||||
build:
|
build:
|
||||||
context: server
|
context: server
|
||||||
network_mode: host
|
ports:
|
||||||
|
- 1250:1250
|
||||||
volumes:
|
volumes:
|
||||||
- ./server/:/app/
|
- ./server/:/app/
|
||||||
- /app/.venv
|
- /app/.venv
|
||||||
@@ -10,12 +11,8 @@ services:
|
|||||||
- ./server/.env
|
- ./server/.env
|
||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: server
|
ENTRYPOINT: server
|
||||||
DATABASE_URL: postgresql+asyncpg://reflector:reflector@localhost:5432/reflector
|
extra_hosts:
|
||||||
REDIS_HOST: localhost
|
- "host.docker.internal:host-gateway"
|
||||||
CELERY_BROKER_URL: redis://localhost:6379/1
|
|
||||||
CELERY_RESULT_BACKEND: redis://localhost:6379/1
|
|
||||||
HATCHET_CLIENT_SERVER_URL: http://localhost:8889
|
|
||||||
HATCHET_CLIENT_HOST_PORT: localhost:7078
|
|
||||||
|
|
||||||
worker:
|
worker:
|
||||||
build:
|
build:
|
||||||
@@ -27,11 +24,6 @@ services:
|
|||||||
- ./server/.env
|
- ./server/.env
|
||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: worker
|
ENTRYPOINT: worker
|
||||||
HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
|
|
||||||
HATCHET_CLIENT_HOST_PORT: hatchet:7077
|
|
||||||
depends_on:
|
|
||||||
redis:
|
|
||||||
condition: service_started
|
|
||||||
|
|
||||||
beat:
|
beat:
|
||||||
build:
|
build:
|
||||||
@@ -43,9 +35,6 @@ services:
|
|||||||
- ./server/.env
|
- ./server/.env
|
||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: beat
|
ENTRYPOINT: beat
|
||||||
depends_on:
|
|
||||||
redis:
|
|
||||||
condition: service_started
|
|
||||||
|
|
||||||
hatchet-worker-cpu:
|
hatchet-worker-cpu:
|
||||||
build:
|
build:
|
||||||
@@ -57,8 +46,6 @@ services:
|
|||||||
- ./server/.env
|
- ./server/.env
|
||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: hatchet-worker-cpu
|
ENTRYPOINT: hatchet-worker-cpu
|
||||||
HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
|
|
||||||
HATCHET_CLIENT_HOST_PORT: hatchet:7077
|
|
||||||
depends_on:
|
depends_on:
|
||||||
hatchet:
|
hatchet:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
@@ -72,8 +59,8 @@ services:
|
|||||||
- ./server/.env
|
- ./server/.env
|
||||||
environment:
|
environment:
|
||||||
ENTRYPOINT: hatchet-worker-llm
|
ENTRYPOINT: hatchet-worker-llm
|
||||||
HATCHET_CLIENT_SERVER_URL: http://hatchet:8888
|
extra_hosts:
|
||||||
HATCHET_CLIENT_HOST_PORT: hatchet:7077
|
- "host.docker.internal:host-gateway"
|
||||||
depends_on:
|
depends_on:
|
||||||
hatchet:
|
hatchet:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
@@ -97,11 +84,6 @@ services:
|
|||||||
- ./www/.env.local
|
- ./www/.env.local
|
||||||
environment:
|
environment:
|
||||||
- NODE_ENV=development
|
- NODE_ENV=development
|
||||||
- SERVER_API_URL=http://host.docker.internal:1250
|
|
||||||
extra_hosts:
|
|
||||||
- "host.docker.internal:host-gateway"
|
|
||||||
depends_on:
|
|
||||||
- server
|
|
||||||
|
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:17
|
image: postgres:17
|
||||||
@@ -117,14 +99,13 @@ services:
|
|||||||
- ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
|
- ./server/docker/init-hatchet-db.sql:/docker-entrypoint-initdb.d/init-hatchet-db.sql:ro
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
|
test: ["CMD-SHELL", "pg_isready -d reflector -U reflector"]
|
||||||
interval: 5s
|
interval: 10s
|
||||||
timeout: 5s
|
timeout: 10s
|
||||||
retries: 10
|
retries: 5
|
||||||
start_period: 15s
|
start_period: 10s
|
||||||
|
|
||||||
hatchet:
|
hatchet:
|
||||||
image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
|
image: ghcr.io/hatchet-dev/hatchet/hatchet-lite:latest
|
||||||
restart: on-failure
|
|
||||||
ports:
|
ports:
|
||||||
- "8889:8888"
|
- "8889:8888"
|
||||||
- "7078:7077"
|
- "7078:7077"
|
||||||
@@ -132,7 +113,7 @@ services:
|
|||||||
postgres:
|
postgres:
|
||||||
condition: service_healthy
|
condition: service_healthy
|
||||||
environment:
|
environment:
|
||||||
DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable&connect_timeout=30"
|
DATABASE_URL: "postgresql://reflector:reflector@postgres:5432/hatchet?sslmode=disable"
|
||||||
SERVER_AUTH_COOKIE_DOMAIN: localhost
|
SERVER_AUTH_COOKIE_DOMAIN: localhost
|
||||||
SERVER_AUTH_COOKIE_INSECURE: "t"
|
SERVER_AUTH_COOKIE_INSECURE: "t"
|
||||||
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
|
SERVER_GRPC_BIND_ADDRESS: "0.0.0.0"
|
||||||
@@ -154,3 +135,7 @@ services:
|
|||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
next_cache:
|
next_cache:
|
||||||
|
|
||||||
|
networks:
|
||||||
|
default:
|
||||||
|
attachable: true
|
||||||
|
|||||||
@@ -168,8 +168,6 @@ If the frontend or backend behaves unexpectedly (e.g., env vars seem ignored, ch
|
|||||||
lsof -i :3000 # frontend
|
lsof -i :3000 # frontend
|
||||||
lsof -i :1250 # backend
|
lsof -i :1250 # backend
|
||||||
lsof -i :5432 # postgres
|
lsof -i :5432 # postgres
|
||||||
lsof -i :3900 # Garage S3 API
|
|
||||||
lsof -i :6379 # Redis
|
|
||||||
|
|
||||||
# Kill stale processes on a port
|
# Kill stale processes on a port
|
||||||
lsof -ti :3000 | xargs kill
|
lsof -ti :3000 | xargs kill
|
||||||
@@ -177,13 +175,9 @@ lsof -ti :3000 | xargs kill
|
|||||||
|
|
||||||
Common causes:
|
Common causes:
|
||||||
- A stale `next dev` or `pnpm dev` process from another terminal/worktree
|
- A stale `next dev` or `pnpm dev` process from another terminal/worktree
|
||||||
- Another Docker Compose project (different worktree) with containers on the same ports — the setup script only manages its own project; containers from other projects must be stopped manually (`docker ps` to find them, `docker stop` to kill them)
|
- Another Docker Compose project (different worktree) with containers on the same ports
|
||||||
|
|
||||||
The setup script checks ports 3000, 1250, 5432, 6379, 3900, 3903 for conflicts before starting services. It ignores OrbStack/Docker Desktop port forwarding processes (which always bind these ports but are not real conflicts).
|
The setup script checks for port conflicts before starting services.
|
||||||
|
|
||||||
### OrbStack false port-conflict warnings (Mac)
|
|
||||||
|
|
||||||
If you use OrbStack as your Docker runtime, `lsof` will show OrbStack binding ports like 3000, 1250, etc. even when no containers are running. This is OrbStack's port forwarding mechanism — not a real conflict. The setup script filters these out automatically.
|
|
||||||
|
|
||||||
### Re-enabling authentication
|
### Re-enabling authentication
|
||||||
|
|
||||||
|
|||||||
@@ -113,7 +113,7 @@ step_llm() {
|
|||||||
echo ""
|
echo ""
|
||||||
|
|
||||||
# Pull model if not already present
|
# Pull model if not already present
|
||||||
if ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
|
if ollama list 2>/dev/null | grep -q "$MODEL"; then
|
||||||
ok "Model $MODEL already pulled"
|
ok "Model $MODEL already pulled"
|
||||||
else
|
else
|
||||||
info "Pulling model $MODEL (this may take a while)..."
|
info "Pulling model $MODEL (this may take a while)..."
|
||||||
@@ -143,7 +143,7 @@ step_llm() {
|
|||||||
echo ""
|
echo ""
|
||||||
|
|
||||||
# Pull model inside container
|
# Pull model inside container
|
||||||
if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | awk '{print $1}' | grep -qx "$MODEL"; then
|
if compose_cmd exec "$OLLAMA_SVC" ollama list 2>/dev/null | grep -q "$MODEL"; then
|
||||||
ok "Model $MODEL already pulled"
|
ok "Model $MODEL already pulled"
|
||||||
else
|
else
|
||||||
info "Pulling model $MODEL inside container (this may take a while)..."
|
info "Pulling model $MODEL inside container (this may take a while)..."
|
||||||
@@ -290,23 +290,16 @@ ENVEOF
|
|||||||
step_services() {
|
step_services() {
|
||||||
info "Step 5: Starting Docker services"
|
info "Step 5: Starting Docker services"
|
||||||
|
|
||||||
# Check for port conflicts — stale processes silently shadow Docker port mappings.
|
# Check for port conflicts — stale processes silently shadow Docker port mappings
|
||||||
# OrbStack/Docker Desktop bind ports for forwarding; ignore those PIDs.
|
|
||||||
local ports_ok=true
|
local ports_ok=true
|
||||||
for port in 3000 1250 5432 6379 3900 3903; do
|
for port in 3000 1250; do
|
||||||
local pids
|
local pid
|
||||||
pids=$(lsof -ti :"$port" 2>/dev/null || true)
|
pid=$(lsof -ti :"$port" 2>/dev/null || true)
|
||||||
for pid in $pids; do
|
if [[ -n "$pid" ]]; then
|
||||||
local pname
|
warn "Port $port already in use by PID $pid"
|
||||||
pname=$(ps -p "$pid" -o comm= 2>/dev/null || true)
|
|
||||||
# OrbStack and Docker Desktop own port forwarding — not real conflicts
|
|
||||||
if [[ "$pname" == *"OrbStack"* ]] || [[ "$pname" == *"com.docker"* ]] || [[ "$pname" == *"vpnkit"* ]]; then
|
|
||||||
continue
|
|
||||||
fi
|
|
||||||
warn "Port $port already in use by PID $pid ($pname)"
|
|
||||||
warn "Kill it with: lsof -ti :$port | xargs kill"
|
warn "Kill it with: lsof -ti :$port | xargs kill"
|
||||||
ports_ok=false
|
ports_ok=false
|
||||||
done
|
fi
|
||||||
done
|
done
|
||||||
if [[ "$ports_ok" == "false" ]]; then
|
if [[ "$ports_ok" == "false" ]]; then
|
||||||
warn "Port conflicts detected — Docker containers may not be reachable"
|
warn "Port conflicts detected — Docker containers may not be reachable"
|
||||||
|
|||||||
@@ -1,5 +1,11 @@
|
|||||||
|
from typing import Annotated
|
||||||
|
|
||||||
|
from fastapi import Depends
|
||||||
|
from fastapi.security import OAuth2PasswordBearer
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
|
|
||||||
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False)
|
||||||
|
|
||||||
|
|
||||||
class UserInfo(BaseModel):
|
class UserInfo(BaseModel):
|
||||||
sub: str
|
sub: str
|
||||||
@@ -9,13 +15,13 @@ class AccessTokenInfo(BaseModel):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def authenticated():
|
def authenticated(token: Annotated[str, Depends(oauth2_scheme)]):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def current_user():
|
def current_user(token: Annotated[str, Depends(oauth2_scheme)]):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
def current_user_optional():
|
def current_user_optional(token: Annotated[str, Depends(oauth2_scheme)]):
|
||||||
return None
|
return None
|
||||||
|
|||||||
@@ -720,6 +720,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
|||||||
chunk_text=chunk["text"],
|
chunk_text=chunk["text"],
|
||||||
timestamp=chunk["timestamp"],
|
timestamp=chunk["timestamp"],
|
||||||
duration=chunk["duration"],
|
duration=chunk["duration"],
|
||||||
|
words=chunk["words"],
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
for chunk in chunks
|
for chunk in chunks
|
||||||
@@ -731,41 +732,31 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
|||||||
TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
|
TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
|
||||||
]
|
]
|
||||||
|
|
||||||
# Build index-to-words map from local chunks (words not in child workflow results)
|
|
||||||
chunks_by_index = {chunk["index"]: chunk["words"] for chunk in chunks}
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
async with fresh_db_connection():
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||||
if not transcript:
|
if not transcript:
|
||||||
raise ValueError(f"Transcript {input.transcript_id} not found")
|
raise ValueError(f"Transcript {input.transcript_id} not found")
|
||||||
|
|
||||||
# Clear topics for idempotency on retry (each topic gets a fresh UUID,
|
|
||||||
# so upsert_topic would append duplicates without this)
|
|
||||||
await transcripts_controller.update(transcript, {"topics": []})
|
|
||||||
|
|
||||||
for chunk in topic_chunks:
|
for chunk in topic_chunks:
|
||||||
chunk_words = chunks_by_index[chunk.chunk_index]
|
|
||||||
topic = TranscriptTopic(
|
topic = TranscriptTopic(
|
||||||
title=chunk.title,
|
title=chunk.title,
|
||||||
summary=chunk.summary,
|
summary=chunk.summary,
|
||||||
timestamp=chunk.timestamp,
|
timestamp=chunk.timestamp,
|
||||||
transcript=" ".join(w.text for w in chunk_words),
|
transcript=" ".join(w.text for w in chunk.words),
|
||||||
words=chunk_words,
|
words=chunk.words,
|
||||||
)
|
)
|
||||||
await transcripts_controller.upsert_topic(transcript, topic)
|
await transcripts_controller.upsert_topic(transcript, topic)
|
||||||
await append_event_and_broadcast(
|
await append_event_and_broadcast(
|
||||||
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
||||||
)
|
)
|
||||||
|
|
||||||
# Words omitted from TopicsResult — already persisted to DB above.
|
|
||||||
# Downstream tasks that need words refetch from DB.
|
|
||||||
topics_list = [
|
topics_list = [
|
||||||
TitleSummary(
|
TitleSummary(
|
||||||
title=chunk.title,
|
title=chunk.title,
|
||||||
summary=chunk.summary,
|
summary=chunk.summary,
|
||||||
timestamp=chunk.timestamp,
|
timestamp=chunk.timestamp,
|
||||||
duration=chunk.duration,
|
duration=chunk.duration,
|
||||||
transcript=TranscriptType(words=[]),
|
transcript=TranscriptType(words=chunk.words),
|
||||||
)
|
)
|
||||||
for chunk in topic_chunks
|
for chunk in topic_chunks
|
||||||
]
|
]
|
||||||
@@ -851,8 +842,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
|||||||
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
|
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
|
||||||
|
|
||||||
topics_result = ctx.task_output(detect_topics)
|
topics_result = ctx.task_output(detect_topics)
|
||||||
|
topics = topics_result.topics
|
||||||
|
|
||||||
if not topics_result.topics:
|
if not topics:
|
||||||
ctx.log("extract_subjects: no topics, returning empty subjects")
|
ctx.log("extract_subjects: no topics, returning empty subjects")
|
||||||
return SubjectsResult(
|
return SubjectsResult(
|
||||||
subjects=[],
|
subjects=[],
|
||||||
@@ -865,13 +857,11 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
|||||||
# sharing DB connections and LLM HTTP pools across forks
|
# sharing DB connections and LLM HTTP pools across forks
|
||||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||||
from reflector.llm import LLM # noqa: PLC0415
|
from reflector.llm import LLM # noqa: PLC0415
|
||||||
from reflector.processors.types import words_to_segments # noqa: PLC0415
|
|
||||||
|
|
||||||
async with fresh_db_connection():
|
async with fresh_db_connection():
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||||
|
|
||||||
# Build transcript text from DB topics (words omitted from task output
|
# Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
|
||||||
# to reduce Hatchet payload size — refetch from DB where they were persisted)
|
|
||||||
speakermap = {}
|
speakermap = {}
|
||||||
if transcript and transcript.participants:
|
if transcript and transcript.participants:
|
||||||
speakermap = {
|
speakermap = {
|
||||||
@@ -881,8 +871,8 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
|||||||
}
|
}
|
||||||
|
|
||||||
text_lines = []
|
text_lines = []
|
||||||
for db_topic in transcript.topics:
|
for topic in topics:
|
||||||
for segment in words_to_segments(db_topic.words):
|
for segment in topic.transcript.as_segments():
|
||||||
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
|
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
|
||||||
text_lines.append(f"{name}: {segment.text}")
|
text_lines.append(f"{name}: {segment.text}")
|
||||||
|
|
||||||
|
|||||||
@@ -95,6 +95,7 @@ class TopicChunkResult(BaseModel):
|
|||||||
summary: str
|
summary: str
|
||||||
timestamp: float
|
timestamp: float
|
||||||
duration: float
|
duration: float
|
||||||
|
words: list[Word]
|
||||||
|
|
||||||
|
|
||||||
class TopicsResult(BaseModel):
|
class TopicsResult(BaseModel):
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
|
|||||||
from reflector.hatchet.workflows.models import TopicChunkResult
|
from reflector.hatchet.workflows.models import TopicChunkResult
|
||||||
from reflector.logger import logger
|
from reflector.logger import logger
|
||||||
from reflector.processors.prompts import TOPIC_PROMPT
|
from reflector.processors.prompts import TOPIC_PROMPT
|
||||||
|
from reflector.processors.types import Word
|
||||||
|
|
||||||
|
|
||||||
class TopicChunkInput(BaseModel):
|
class TopicChunkInput(BaseModel):
|
||||||
@@ -29,6 +30,7 @@ class TopicChunkInput(BaseModel):
|
|||||||
chunk_text: str
|
chunk_text: str
|
||||||
timestamp: float
|
timestamp: float
|
||||||
duration: float
|
duration: float
|
||||||
|
words: list[Word]
|
||||||
|
|
||||||
|
|
||||||
hatchet = HatchetClientManager.get_client()
|
hatchet = HatchetClientManager.get_client()
|
||||||
@@ -97,4 +99,5 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
|
|||||||
summary=response.summary,
|
summary=response.summary,
|
||||||
timestamp=input.timestamp,
|
timestamp=input.timestamp,
|
||||||
duration=input.duration,
|
duration=input.duration,
|
||||||
|
words=input.words,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -48,15 +48,7 @@ class RedisPubSubManager:
|
|||||||
if not self.redis_connection:
|
if not self.redis_connection:
|
||||||
await self.connect()
|
await self.connect()
|
||||||
message = json.dumps(message)
|
message = json.dumps(message)
|
||||||
try:
|
await self.redis_connection.publish(room_id, message)
|
||||||
await self.redis_connection.publish(room_id, message)
|
|
||||||
except RuntimeError:
|
|
||||||
# Celery workers run each task in a new event loop (asyncio.run),
|
|
||||||
# which closes the previous loop. Cached Redis connection is dead.
|
|
||||||
# Reconnect on the current loop and retry.
|
|
||||||
self.redis_connection = None
|
|
||||||
await self.connect()
|
|
||||||
await self.redis_connection.publish(room_id, message)
|
|
||||||
|
|
||||||
async def subscribe(self, room_id: str) -> redis.Redis:
|
async def subscribe(self, room_id: str) -> redis.Redis:
|
||||||
await self.pubsub.subscribe(room_id)
|
await self.pubsub.subscribe(room_id)
|
||||||
|
|||||||
@@ -291,12 +291,7 @@ async def test_validation_idle_transcript_with_recording_allowed():
|
|||||||
recording_id="test-recording-id",
|
recording_id="test-recording-id",
|
||||||
)
|
)
|
||||||
|
|
||||||
with patch(
|
result = await validate_transcript_for_processing(mock_transcript)
|
||||||
"reflector.services.transcript_process.task_is_scheduled_or_active"
|
|
||||||
) as mock_celery_check:
|
|
||||||
mock_celery_check.return_value = False
|
|
||||||
|
|
||||||
result = await validate_transcript_for_processing(mock_transcript)
|
|
||||||
|
|
||||||
assert isinstance(result, ValidationOk)
|
assert isinstance(result, ValidationOk)
|
||||||
assert result.recording_id == "test-recording-id"
|
assert result.recording_id == "test-recording-id"
|
||||||
|
|||||||
@@ -1,185 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for Hatchet payload thinning optimizations.
|
|
||||||
|
|
||||||
Verifies that:
|
|
||||||
1. TopicChunkInput no longer carries words
|
|
||||||
2. TopicChunkResult no longer carries words
|
|
||||||
3. words_to_segments() matches Transcript.as_segments(is_multitrack=False) — behavioral equivalence
|
|
||||||
for the extract_subjects refactoring
|
|
||||||
4. TopicsResult can be constructed with empty transcript words
|
|
||||||
"""
|
|
||||||
|
|
||||||
from reflector.hatchet.workflows.models import TopicChunkResult
|
|
||||||
from reflector.hatchet.workflows.topic_chunk_processing import TopicChunkInput
|
|
||||||
from reflector.processors.types import Word
|
|
||||||
|
|
||||||
|
|
||||||
def _make_words(speaker: int = 0, start: float = 0.0) -> list[Word]:
|
|
||||||
return [
|
|
||||||
Word(text="Hello", start=start, end=start + 0.5, speaker=speaker),
|
|
||||||
Word(text=" world.", start=start + 0.5, end=start + 1.0, speaker=speaker),
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class TestTopicChunkInputNoWords:
|
|
||||||
"""TopicChunkInput must not have a words field."""
|
|
||||||
|
|
||||||
def test_no_words_field(self):
|
|
||||||
assert "words" not in TopicChunkInput.model_fields
|
|
||||||
|
|
||||||
def test_construction_without_words(self):
|
|
||||||
inp = TopicChunkInput(
|
|
||||||
chunk_index=0, chunk_text="Hello world.", timestamp=0.0, duration=1.0
|
|
||||||
)
|
|
||||||
assert inp.chunk_index == 0
|
|
||||||
assert inp.chunk_text == "Hello world."
|
|
||||||
|
|
||||||
def test_rejects_words_kwarg(self):
|
|
||||||
"""Passing words= should raise a validation error (field doesn't exist)."""
|
|
||||||
import pydantic
|
|
||||||
|
|
||||||
try:
|
|
||||||
TopicChunkInput(
|
|
||||||
chunk_index=0,
|
|
||||||
chunk_text="text",
|
|
||||||
timestamp=0.0,
|
|
||||||
duration=1.0,
|
|
||||||
words=_make_words(),
|
|
||||||
)
|
|
||||||
# If pydantic is configured to ignore extra, this won't raise.
|
|
||||||
# Verify the field is still absent from the model.
|
|
||||||
assert "words" not in TopicChunkInput.model_fields
|
|
||||||
except pydantic.ValidationError:
|
|
||||||
pass # Expected
|
|
||||||
|
|
||||||
|
|
||||||
class TestTopicChunkResultNoWords:
|
|
||||||
"""TopicChunkResult must not have a words field."""
|
|
||||||
|
|
||||||
def test_no_words_field(self):
|
|
||||||
assert "words" not in TopicChunkResult.model_fields
|
|
||||||
|
|
||||||
def test_construction_without_words(self):
|
|
||||||
result = TopicChunkResult(
|
|
||||||
chunk_index=0,
|
|
||||||
title="Test",
|
|
||||||
summary="Summary",
|
|
||||||
timestamp=0.0,
|
|
||||||
duration=1.0,
|
|
||||||
)
|
|
||||||
assert result.title == "Test"
|
|
||||||
assert result.chunk_index == 0
|
|
||||||
|
|
||||||
def test_serialization_roundtrip(self):
|
|
||||||
"""Serialized TopicChunkResult has no words key."""
|
|
||||||
result = TopicChunkResult(
|
|
||||||
chunk_index=0,
|
|
||||||
title="Test",
|
|
||||||
summary="Summary",
|
|
||||||
timestamp=0.0,
|
|
||||||
duration=1.0,
|
|
||||||
)
|
|
||||||
data = result.model_dump()
|
|
||||||
assert "words" not in data
|
|
||||||
reconstructed = TopicChunkResult(**data)
|
|
||||||
assert reconstructed == result
|
|
||||||
|
|
||||||
|
|
||||||
class TestWordsToSegmentsBehavioralEquivalence:
|
|
||||||
"""words_to_segments() must produce same output as Transcript.as_segments(is_multitrack=False).
|
|
||||||
|
|
||||||
This ensures the extract_subjects refactoring (from task output topic.transcript.as_segments()
|
|
||||||
to words_to_segments(db_topic.words)) preserves identical behavior.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def test_single_speaker(self):
|
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
|
||||||
from reflector.processors.types import words_to_segments
|
|
||||||
|
|
||||||
words = _make_words(speaker=0)
|
|
||||||
direct = words_to_segments(words)
|
|
||||||
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
|
|
||||||
|
|
||||||
assert len(direct) == len(via_transcript)
|
|
||||||
for d, v in zip(direct, via_transcript):
|
|
||||||
assert d.text == v.text
|
|
||||||
assert d.speaker == v.speaker
|
|
||||||
assert d.start == v.start
|
|
||||||
assert d.end == v.end
|
|
||||||
|
|
||||||
def test_multiple_speakers(self):
|
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
|
||||||
from reflector.processors.types import words_to_segments
|
|
||||||
|
|
||||||
words = [
|
|
||||||
Word(text="Hello", start=0.0, end=0.5, speaker=0),
|
|
||||||
Word(text=" world.", start=0.5, end=1.0, speaker=0),
|
|
||||||
Word(text=" How", start=1.0, end=1.5, speaker=1),
|
|
||||||
Word(text=" are", start=1.5, end=2.0, speaker=1),
|
|
||||||
Word(text=" you?", start=2.0, end=2.5, speaker=1),
|
|
||||||
]
|
|
||||||
|
|
||||||
direct = words_to_segments(words)
|
|
||||||
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
|
|
||||||
|
|
||||||
assert len(direct) == len(via_transcript)
|
|
||||||
for d, v in zip(direct, via_transcript):
|
|
||||||
assert d.text == v.text
|
|
||||||
assert d.speaker == v.speaker
|
|
||||||
|
|
||||||
def test_empty_words(self):
|
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
|
||||||
from reflector.processors.types import words_to_segments
|
|
||||||
|
|
||||||
assert words_to_segments([]) == []
|
|
||||||
assert TranscriptType(words=[]).as_segments(is_multitrack=False) == []
|
|
||||||
|
|
||||||
|
|
||||||
class TestTopicsResultEmptyWords:
|
|
||||||
"""TopicsResult can carry topics with empty transcript words."""
|
|
||||||
|
|
||||||
def test_construction_with_empty_words(self):
|
|
||||||
from reflector.hatchet.workflows.models import TopicsResult
|
|
||||||
from reflector.processors.types import TitleSummary
|
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
|
||||||
|
|
||||||
topics = [
|
|
||||||
TitleSummary(
|
|
||||||
title="Topic A",
|
|
||||||
summary="Summary A",
|
|
||||||
timestamp=0.0,
|
|
||||||
duration=5.0,
|
|
||||||
transcript=TranscriptType(words=[]),
|
|
||||||
),
|
|
||||||
TitleSummary(
|
|
||||||
title="Topic B",
|
|
||||||
summary="Summary B",
|
|
||||||
timestamp=5.0,
|
|
||||||
duration=5.0,
|
|
||||||
transcript=TranscriptType(words=[]),
|
|
||||||
),
|
|
||||||
]
|
|
||||||
result = TopicsResult(topics=topics)
|
|
||||||
assert len(result.topics) == 2
|
|
||||||
for t in result.topics:
|
|
||||||
assert t.transcript.words == []
|
|
||||||
|
|
||||||
def test_serialization_roundtrip(self):
|
|
||||||
from reflector.hatchet.workflows.models import TopicsResult
|
|
||||||
from reflector.processors.types import TitleSummary
|
|
||||||
from reflector.processors.types import Transcript as TranscriptType
|
|
||||||
|
|
||||||
topics = [
|
|
||||||
TitleSummary(
|
|
||||||
title="Topic",
|
|
||||||
summary="Summary",
|
|
||||||
timestamp=0.0,
|
|
||||||
duration=1.0,
|
|
||||||
transcript=TranscriptType(words=[]),
|
|
||||||
)
|
|
||||||
]
|
|
||||||
result = TopicsResult(topics=topics)
|
|
||||||
data = result.model_dump()
|
|
||||||
reconstructed = TopicsResult(**data)
|
|
||||||
assert len(reconstructed.topics) == 1
|
|
||||||
assert reconstructed.topics[0].transcript.words == []
|
|
||||||
@@ -11,7 +11,6 @@ import {
|
|||||||
import { useRouter } from "next/navigation";
|
import { useRouter } from "next/navigation";
|
||||||
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
import { useTranscriptGet } from "../../../../lib/apiHooks";
|
||||||
import { parseNonEmptyString } from "../../../../lib/utils";
|
import { parseNonEmptyString } from "../../../../lib/utils";
|
||||||
import { useWebSockets } from "../../useWebSockets";
|
|
||||||
|
|
||||||
type TranscriptProcessing = {
|
type TranscriptProcessing = {
|
||||||
params: Promise<{
|
params: Promise<{
|
||||||
@@ -25,7 +24,6 @@ export default function TranscriptProcessing(details: TranscriptProcessing) {
|
|||||||
const router = useRouter();
|
const router = useRouter();
|
||||||
|
|
||||||
const transcript = useTranscriptGet(transcriptId);
|
const transcript = useTranscriptGet(transcriptId);
|
||||||
useWebSockets(transcriptId);
|
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
const status = transcript.data?.status;
|
const status = transcript.data?.status;
|
||||||
|
|||||||
@@ -23,16 +23,7 @@ const useWebRTC = (
|
|||||||
let p: Peer;
|
let p: Peer;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
p = new Peer({
|
p = new Peer({ initiator: true, stream: stream });
|
||||||
initiator: true,
|
|
||||||
stream: stream,
|
|
||||||
// Disable trickle ICE: single SDP exchange (offer + answer) with all candidates.
|
|
||||||
// Required for HTTP-based signaling; trickle needs WebSocket for candidate exchange.
|
|
||||||
trickle: false,
|
|
||||||
config: {
|
|
||||||
iceServers: [{ urls: "stun:stun.l.google.com:19302" }],
|
|
||||||
},
|
|
||||||
});
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
setError(error as Error, "Error creating WebRTC");
|
setError(error as Error, "Error creating WebRTC");
|
||||||
return;
|
return;
|
||||||
|
|||||||
@@ -3,7 +3,7 @@
|
|||||||
"version": "0.1.0",
|
"version": "0.1.0",
|
||||||
"private": true,
|
"private": true,
|
||||||
"scripts": {
|
"scripts": {
|
||||||
"dev": "next dev",
|
"dev": "next dev --turbopack",
|
||||||
"build": "next build",
|
"build": "next build",
|
||||||
"build-production": "next build --experimental-build-mode compile",
|
"build-production": "next build --experimental-build-mode compile",
|
||||||
"start": "next start",
|
"start": "next start",
|
||||||
|
|||||||
Reference in New Issue
Block a user