mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-03-23 23:46:47 +00:00
Compare commits
30 Commits
feat/paylo
...
local-llm-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d4c5c463c | ||
|
|
f6a23cfddd | ||
|
|
b1405af8c7 | ||
|
|
71ad8a294f | ||
|
|
bba272505f | ||
|
|
67aea78243 | ||
|
|
2d81321733 | ||
|
|
8c2b720564 | ||
|
|
88e945ec00 | ||
|
|
f6201dd378 | ||
|
|
9f62959069 | ||
|
|
0353c23a94 | ||
|
|
7372f80530 | ||
|
|
208361c8cc | ||
|
|
70d17997ef | ||
| adc4c20bf4 | |||
|
|
ec4f356b4c | ||
|
|
39573626e9 | ||
|
|
d9aa6d6eb0 | ||
|
|
e1ea914675 | ||
|
|
7200f3c65f | ||
|
|
2f669dfd89 | ||
|
|
d25d77333c | ||
|
|
427254fe33 | ||
|
|
46750abad9 | ||
|
|
f36b95b09f | ||
|
|
608a3805c5 | ||
|
|
d0af8ffdb7 | ||
|
|
33a93db802 | ||
|
|
663345ece6 |
10
COMPOSE_STANDALONE_TODO.md
Normal file
10
COMPOSE_STANDALONE_TODO.md
Normal file
@@ -0,0 +1,10 @@
|
||||
# Standalone Compose: Remaining Production Work
|
||||
|
||||
## Server/worker/beat: remove host network mode + bind mounts
|
||||
|
||||
Currently `server` uses `network_mode: host` and all three services bind-mount `./server/:/app/`. For full standalone prod:
|
||||
|
||||
- Remove `network_mode: host` from server
|
||||
- Remove bind-mount volumes from server, worker, beat (use built image only)
|
||||
- Update `compose_cmd` in `setup-standalone.sh` to not rely on host network
|
||||
- Change `SERVER_API_URL` from `http://host.docker.internal:1250` to `http://server:1250` (server reachable via Docker network once off host mode)
|
||||
@@ -76,6 +76,15 @@ services:
|
||||
DIARIZATION_BACKEND: modal
|
||||
DIARIZATION_URL: http://cpu:8000
|
||||
|
||||
web:
|
||||
image: reflector-frontend-standalone
|
||||
build:
|
||||
context: ./www
|
||||
command: ["node", "server.js"]
|
||||
volumes: !reset []
|
||||
environment:
|
||||
NODE_ENV: production
|
||||
|
||||
cpu:
|
||||
build:
|
||||
context: ./gpu/self_hosted
|
||||
|
||||
@@ -720,6 +720,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||
chunk_text=chunk["text"],
|
||||
timestamp=chunk["timestamp"],
|
||||
duration=chunk["duration"],
|
||||
words=chunk["words"],
|
||||
)
|
||||
)
|
||||
for chunk in chunks
|
||||
@@ -731,41 +732,31 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||
TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
|
||||
]
|
||||
|
||||
# Build index-to-words map from local chunks (words not in child workflow results)
|
||||
chunks_by_index = {chunk["index"]: chunk["words"] for chunk in chunks}
|
||||
|
||||
async with fresh_db_connection():
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if not transcript:
|
||||
raise ValueError(f"Transcript {input.transcript_id} not found")
|
||||
|
||||
# Clear topics for idempotency on retry (each topic gets a fresh UUID,
|
||||
# so upsert_topic would append duplicates without this)
|
||||
await transcripts_controller.update(transcript, {"topics": []})
|
||||
|
||||
for chunk in topic_chunks:
|
||||
chunk_words = chunks_by_index[chunk.chunk_index]
|
||||
topic = TranscriptTopic(
|
||||
title=chunk.title,
|
||||
summary=chunk.summary,
|
||||
timestamp=chunk.timestamp,
|
||||
transcript=" ".join(w.text for w in chunk_words),
|
||||
words=chunk_words,
|
||||
transcript=" ".join(w.text for w in chunk.words),
|
||||
words=chunk.words,
|
||||
)
|
||||
await transcripts_controller.upsert_topic(transcript, topic)
|
||||
await append_event_and_broadcast(
|
||||
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
||||
)
|
||||
|
||||
# Words omitted from TopicsResult — already persisted to DB above.
|
||||
# Downstream tasks that need words refetch from DB.
|
||||
topics_list = [
|
||||
TitleSummary(
|
||||
title=chunk.title,
|
||||
summary=chunk.summary,
|
||||
timestamp=chunk.timestamp,
|
||||
duration=chunk.duration,
|
||||
transcript=TranscriptType(words=[]),
|
||||
transcript=TranscriptType(words=chunk.words),
|
||||
)
|
||||
for chunk in topic_chunks
|
||||
]
|
||||
@@ -851,8 +842,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
||||
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
topics_result = ctx.task_output(detect_topics)
|
||||
topics = topics_result.topics
|
||||
|
||||
if not topics_result.topics:
|
||||
if not topics:
|
||||
ctx.log("extract_subjects: no topics, returning empty subjects")
|
||||
return SubjectsResult(
|
||||
subjects=[],
|
||||
@@ -865,13 +857,11 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
||||
# sharing DB connections and LLM HTTP pools across forks
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
from reflector.processors.types import words_to_segments # noqa: PLC0415
|
||||
|
||||
async with fresh_db_connection():
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
|
||||
# Build transcript text from DB topics (words omitted from task output
|
||||
# to reduce Hatchet payload size — refetch from DB where they were persisted)
|
||||
# Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
|
||||
speakermap = {}
|
||||
if transcript and transcript.participants:
|
||||
speakermap = {
|
||||
@@ -881,8 +871,8 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
||||
}
|
||||
|
||||
text_lines = []
|
||||
for db_topic in transcript.topics:
|
||||
for segment in words_to_segments(db_topic.words):
|
||||
for topic in topics:
|
||||
for segment in topic.transcript.as_segments():
|
||||
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
|
||||
text_lines.append(f"{name}: {segment.text}")
|
||||
|
||||
|
||||
@@ -95,6 +95,7 @@ class TopicChunkResult(BaseModel):
|
||||
summary: str
|
||||
timestamp: float
|
||||
duration: float
|
||||
words: list[Word]
|
||||
|
||||
|
||||
class TopicsResult(BaseModel):
|
||||
|
||||
@@ -20,6 +20,7 @@ from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
|
||||
from reflector.hatchet.workflows.models import TopicChunkResult
|
||||
from reflector.logger import logger
|
||||
from reflector.processors.prompts import TOPIC_PROMPT
|
||||
from reflector.processors.types import Word
|
||||
|
||||
|
||||
class TopicChunkInput(BaseModel):
|
||||
@@ -29,6 +30,7 @@ class TopicChunkInput(BaseModel):
|
||||
chunk_text: str
|
||||
timestamp: float
|
||||
duration: float
|
||||
words: list[Word]
|
||||
|
||||
|
||||
hatchet = HatchetClientManager.get_client()
|
||||
@@ -97,4 +99,5 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
|
||||
summary=response.summary,
|
||||
timestamp=input.timestamp,
|
||||
duration=input.duration,
|
||||
words=input.words,
|
||||
)
|
||||
|
||||
@@ -1,185 +0,0 @@
|
||||
"""
|
||||
Tests for Hatchet payload thinning optimizations.
|
||||
|
||||
Verifies that:
|
||||
1. TopicChunkInput no longer carries words
|
||||
2. TopicChunkResult no longer carries words
|
||||
3. words_to_segments() matches Transcript.as_segments(is_multitrack=False) — behavioral equivalence
|
||||
for the extract_subjects refactoring
|
||||
4. TopicsResult can be constructed with empty transcript words
|
||||
"""
|
||||
|
||||
from reflector.hatchet.workflows.models import TopicChunkResult
|
||||
from reflector.hatchet.workflows.topic_chunk_processing import TopicChunkInput
|
||||
from reflector.processors.types import Word
|
||||
|
||||
|
||||
def _make_words(speaker: int = 0, start: float = 0.0) -> list[Word]:
|
||||
return [
|
||||
Word(text="Hello", start=start, end=start + 0.5, speaker=speaker),
|
||||
Word(text=" world.", start=start + 0.5, end=start + 1.0, speaker=speaker),
|
||||
]
|
||||
|
||||
|
||||
class TestTopicChunkInputNoWords:
|
||||
"""TopicChunkInput must not have a words field."""
|
||||
|
||||
def test_no_words_field(self):
|
||||
assert "words" not in TopicChunkInput.model_fields
|
||||
|
||||
def test_construction_without_words(self):
|
||||
inp = TopicChunkInput(
|
||||
chunk_index=0, chunk_text="Hello world.", timestamp=0.0, duration=1.0
|
||||
)
|
||||
assert inp.chunk_index == 0
|
||||
assert inp.chunk_text == "Hello world."
|
||||
|
||||
def test_rejects_words_kwarg(self):
|
||||
"""Passing words= should raise a validation error (field doesn't exist)."""
|
||||
import pydantic
|
||||
|
||||
try:
|
||||
TopicChunkInput(
|
||||
chunk_index=0,
|
||||
chunk_text="text",
|
||||
timestamp=0.0,
|
||||
duration=1.0,
|
||||
words=_make_words(),
|
||||
)
|
||||
# If pydantic is configured to ignore extra, this won't raise.
|
||||
# Verify the field is still absent from the model.
|
||||
assert "words" not in TopicChunkInput.model_fields
|
||||
except pydantic.ValidationError:
|
||||
pass # Expected
|
||||
|
||||
|
||||
class TestTopicChunkResultNoWords:
|
||||
"""TopicChunkResult must not have a words field."""
|
||||
|
||||
def test_no_words_field(self):
|
||||
assert "words" not in TopicChunkResult.model_fields
|
||||
|
||||
def test_construction_without_words(self):
|
||||
result = TopicChunkResult(
|
||||
chunk_index=0,
|
||||
title="Test",
|
||||
summary="Summary",
|
||||
timestamp=0.0,
|
||||
duration=1.0,
|
||||
)
|
||||
assert result.title == "Test"
|
||||
assert result.chunk_index == 0
|
||||
|
||||
def test_serialization_roundtrip(self):
|
||||
"""Serialized TopicChunkResult has no words key."""
|
||||
result = TopicChunkResult(
|
||||
chunk_index=0,
|
||||
title="Test",
|
||||
summary="Summary",
|
||||
timestamp=0.0,
|
||||
duration=1.0,
|
||||
)
|
||||
data = result.model_dump()
|
||||
assert "words" not in data
|
||||
reconstructed = TopicChunkResult(**data)
|
||||
assert reconstructed == result
|
||||
|
||||
|
||||
class TestWordsToSegmentsBehavioralEquivalence:
|
||||
"""words_to_segments() must produce same output as Transcript.as_segments(is_multitrack=False).
|
||||
|
||||
This ensures the extract_subjects refactoring (from task output topic.transcript.as_segments()
|
||||
to words_to_segments(db_topic.words)) preserves identical behavior.
|
||||
"""
|
||||
|
||||
def test_single_speaker(self):
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.processors.types import words_to_segments
|
||||
|
||||
words = _make_words(speaker=0)
|
||||
direct = words_to_segments(words)
|
||||
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
|
||||
|
||||
assert len(direct) == len(via_transcript)
|
||||
for d, v in zip(direct, via_transcript):
|
||||
assert d.text == v.text
|
||||
assert d.speaker == v.speaker
|
||||
assert d.start == v.start
|
||||
assert d.end == v.end
|
||||
|
||||
def test_multiple_speakers(self):
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.processors.types import words_to_segments
|
||||
|
||||
words = [
|
||||
Word(text="Hello", start=0.0, end=0.5, speaker=0),
|
||||
Word(text=" world.", start=0.5, end=1.0, speaker=0),
|
||||
Word(text=" How", start=1.0, end=1.5, speaker=1),
|
||||
Word(text=" are", start=1.5, end=2.0, speaker=1),
|
||||
Word(text=" you?", start=2.0, end=2.5, speaker=1),
|
||||
]
|
||||
|
||||
direct = words_to_segments(words)
|
||||
via_transcript = TranscriptType(words=words).as_segments(is_multitrack=False)
|
||||
|
||||
assert len(direct) == len(via_transcript)
|
||||
for d, v in zip(direct, via_transcript):
|
||||
assert d.text == v.text
|
||||
assert d.speaker == v.speaker
|
||||
|
||||
def test_empty_words(self):
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.processors.types import words_to_segments
|
||||
|
||||
assert words_to_segments([]) == []
|
||||
assert TranscriptType(words=[]).as_segments(is_multitrack=False) == []
|
||||
|
||||
|
||||
class TestTopicsResultEmptyWords:
|
||||
"""TopicsResult can carry topics with empty transcript words."""
|
||||
|
||||
def test_construction_with_empty_words(self):
|
||||
from reflector.hatchet.workflows.models import TopicsResult
|
||||
from reflector.processors.types import TitleSummary
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
|
||||
topics = [
|
||||
TitleSummary(
|
||||
title="Topic A",
|
||||
summary="Summary A",
|
||||
timestamp=0.0,
|
||||
duration=5.0,
|
||||
transcript=TranscriptType(words=[]),
|
||||
),
|
||||
TitleSummary(
|
||||
title="Topic B",
|
||||
summary="Summary B",
|
||||
timestamp=5.0,
|
||||
duration=5.0,
|
||||
transcript=TranscriptType(words=[]),
|
||||
),
|
||||
]
|
||||
result = TopicsResult(topics=topics)
|
||||
assert len(result.topics) == 2
|
||||
for t in result.topics:
|
||||
assert t.transcript.words == []
|
||||
|
||||
def test_serialization_roundtrip(self):
|
||||
from reflector.hatchet.workflows.models import TopicsResult
|
||||
from reflector.processors.types import TitleSummary
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
|
||||
topics = [
|
||||
TitleSummary(
|
||||
title="Topic",
|
||||
summary="Summary",
|
||||
timestamp=0.0,
|
||||
duration=1.0,
|
||||
transcript=TranscriptType(words=[]),
|
||||
)
|
||||
]
|
||||
result = TopicsResult(topics=topics)
|
||||
data = result.model_dump()
|
||||
reconstructed = TopicsResult(**data)
|
||||
assert len(reconstructed.topics) == 1
|
||||
assert reconstructed.topics[0].transcript.words == []
|
||||
Reference in New Issue
Block a user