parallelize hatchet (no-mistakes)

Author: Igor Loskutov
Date: 2025-12-22 15:23:54 -05:00
parent 7c2d0698ed
commit 1f41f16928
11 changed files with 709 additions and 213 deletions

View File

@@ -37,6 +37,8 @@ def main() -> None:
    from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
    from reflector.hatchet.workflows import (  # noqa: PLC0415
        diarization_pipeline,
+        subject_workflow,
+        topic_chunk_workflow,
        track_workflow,
    )
@@ -44,7 +46,12 @@ def main() -> None:
    worker = hatchet.worker(
        "reflector-diarization-worker",
-        workflows=[diarization_pipeline, track_workflow],
+        workflows=[
+            diarization_pipeline,
+            subject_workflow,
+            topic_chunk_workflow,
+            track_workflow,
+        ],
    )
    def shutdown_handler(signum: int, frame) -> None:

View File

@@ -4,11 +4,23 @@ from reflector.hatchet.workflows.diarization_pipeline import (
    PipelineInput,
    diarization_pipeline,
)
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
+)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
    "diarization_pipeline",
+    "subject_workflow",
+    "topic_chunk_workflow",
    "track_workflow",
    "PipelineInput",
+    "SubjectInput",
+    "TopicChunkInput",
    "TrackInput",
]

View File

@@ -29,27 +29,37 @@ from reflector.hatchet.broadcast import (
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import (
ActionItemsResult,
ConsentResult,
FinalizeResult,
MixdownResult,
PaddedTrackInfo,
ParticipantsResult,
ProcessSubjectsResult,
ProcessTracksResult,
RecapResult,
RecordingResult,
SummaryResult,
SubjectsResult,
TitleResult,
TopicsResult,
WaveformResult,
WebhookResult,
ZulipResult,
)
from reflector.hatchet.workflows.subject_processing import (
SubjectInput,
subject_workflow,
)
from reflector.hatchet.workflows.topic_chunk_processing import (
TopicChunkInput,
topic_chunk_workflow,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.types import (
TitleSummary,
TitleSummaryWithId,
Word,
)
from reflector.processors.types import (
@@ -324,9 +334,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
child_coroutines = [
track_workflow.aio_run(
TrackInput(
bulk_runs = [
track_workflow.create_bulk_run_item(
input=TrackInput(
track_index=i,
s3_key=track["s3_key"],
bucket_name=input.bucket_name,
@@ -337,7 +347,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
for i, track in enumerate(input.tracks)
]
results = await asyncio.gather(*child_coroutines)
results = await track_workflow.aio_run_many(bulk_runs)
target_language = participants_result.target_language
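Note on the hunk above: the per-track `asyncio.gather` over `aio_run()` calls is replaced with Hatchet's bulk-run API. A minimal sketch of that fan-out pattern, using only the SDK calls that already appear in this diff (`create_bulk_run_item`, `aio_run_many`); `child_workflow`, `ItemInput`, and `items` are placeholders, not names from this commit:

```python
# Sketch only: one bulk-run item per input, scheduled together as a single request.
bulk_runs = [
    child_workflow.create_bulk_run_item(input=ItemInput(index=i, payload=item))
    for i, item in enumerate(items)
]
# Replaces `await asyncio.gather(*[child_workflow.aio_run(...) for ...])`;
# each returned entry is a dict keyed by the child task's name.
results = await child_workflow.aio_run_many(bulk_runs)
```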
@@ -529,55 +539,107 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
@diarization_pipeline.task(
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=600), retries=3
)
@with_error_handling("detect_topics")
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
"""Detect topics using parallel child workflows (one per chunk)."""
ctx.log("detect_topics: analyzing transcript for topics")
track_result = ctx.task_output(process_tracks)
words = track_result.all_words
target_language = track_result.target_language
if not words:
ctx.log("detect_topics: no words, returning empty topics")
return TopicsResult(topics=[])
# Deferred imports: Hatchet workers fork processes
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptTopic,
transcripts_controller,
)
word_objects = [Word(**w) for w in words]
transcript_type = TranscriptType(words=word_objects)
chunk_size = 300
chunks = []
for i in range(0, len(words), chunk_size):
chunk_words = words[i : i + chunk_size]
if not chunk_words:
continue
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
first_word = chunk_words[0]
last_word = chunk_words[-1]
timestamp = first_word.get("start", 0)
duration = last_word.get("end", 0) - timestamp
chunk_text = " ".join(w.get("word", "") for w in chunk_words)
chunks.append(
{
"index": len(chunks),
"text": chunk_text,
"timestamp": timestamp,
"duration": duration,
"words": chunk_words,
}
)
if not chunks:
ctx.log("detect_topics: no chunks generated, returning empty topics")
return TopicsResult(topics=[])
ctx.log(f"detect_topics: spawning {len(chunks)} topic chunk workflows in parallel")
bulk_runs = [
topic_chunk_workflow.create_bulk_run_item(
input=TopicChunkInput(
chunk_index=chunk["index"],
chunk_text=chunk["text"],
timestamp=chunk["timestamp"],
duration=chunk["duration"],
words=chunk["words"],
)
)
for chunk in chunks
]
results = await topic_chunk_workflow.aio_run_many(bulk_runs)
topic_results = []
for result in results:
task_result = result.get("detect_chunk_topic", {})
if task_result:
topic_results.append(task_result)
topic_results.sort(key=lambda t: t.get("timestamp", 0))
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
async def on_topic_callback(data):
for topic_data in topic_results:
topic = TranscriptTopic(
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
transcript=data.transcript.text,
words=data.transcript.words,
title=topic_data.get("title", ""),
summary=topic_data.get("summary", ""),
timestamp=topic_data.get("timestamp", 0),
transcript=" ".join(
w.get("word", "") for w in topic_data.get("words", [])
),
words=topic_data.get("words", []),
)
if isinstance(
data, TitleSummaryWithId
): # Celery parity: main_live_pipeline.py
topic.id = data.id
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
topics = await topic_processing.detect_topics(
transcript_type,
target_language,
on_topic_callback=on_topic_callback,
empty_pipeline=empty_pipeline,
)
topics_list = [t.model_dump() for t in topics]
# Convert to TitleSummary format for downstream steps
topics_list = [
{
"title": t.get("title", ""),
"summary": t.get("summary", ""),
"timestamp": t.get("timestamp", 0),
"duration": t.get("duration", 0),
"transcript": {"words": t.get("words", [])},
}
for t in topic_results
]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
@@ -647,97 +709,280 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
@diarization_pipeline.task(
parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_summary")
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
ctx.log(f"generate_summary: starting for transcript_id={input.transcript_id}")
@with_error_handling("extract_subjects")
async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult:
"""Extract main subjects/topics from transcript for parallel processing."""
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
ctx.log(f"generate_summary: received {len(topics)} topics from detect_topics")
if not topics:
ctx.log("extract_subjects: no topics, returning empty subjects")
return SubjectsResult(
subjects=[],
transcript_text="",
participant_names=[],
participant_name_to_id={},
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.summary_builder import ( # noqa: PLC0415
SummaryBuilder,
)
topic_objects = [TitleSummary(**t) for t in topics]
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
# Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
speakermap = {}
if transcript and transcript.participants:
speakermap = {
p.speaker: p.name
for p in transcript.participants
if p.speaker is not None and p.name
}
text_lines = []
for topic in topic_objects:
for segment in topic.transcript.as_segments():
name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
text_lines.append(f"{name}: {segment.text}")
transcript_text = "\n".join(text_lines)
participant_names = []
participant_name_to_id = {}
if transcript and transcript.participants:
participant_names = [p.name for p in transcript.participants if p.name]
participant_name_to_id = {
p.name: p.id for p in transcript.participants if p.name and p.id
}
llm = LLM(settings=settings)
builder = SummaryBuilder(llm, logger=logger)
builder.set_transcript(transcript_text)
if participant_names:
builder.set_known_participants(
participant_names, participant_name_to_id=participant_name_to_id
)
ctx.log("extract_subjects: calling LLM to extract subjects")
await builder.extract_subjects()
ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")
return SubjectsResult(
subjects=builder.subjects,
transcript_text=transcript_text,
participant_names=participant_names,
participant_name_to_id=participant_name_to_id,
)
@diarization_pipeline.task(
parents=[extract_subjects], execution_timeout=timedelta(seconds=600), retries=3
)
@with_error_handling("process_subjects")
async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubjectsResult:
"""Spawn child workflows for each subject (dynamic fan-out, parallel LLM calls)."""
subjects_result = ctx.task_output(extract_subjects)
subjects = subjects_result.subjects
if not subjects:
ctx.log("process_subjects: no subjects to process")
return ProcessSubjectsResult(subject_summaries=[])
ctx.log(f"process_subjects: spawning {len(subjects)} subject workflows in parallel")
bulk_runs = [
subject_workflow.create_bulk_run_item(
input=SubjectInput(
subject=subject,
subject_index=i,
transcript_text=subjects_result.transcript_text,
participant_names=subjects_result.participant_names,
participant_name_to_id=subjects_result.participant_name_to_id,
)
)
for i, subject in enumerate(subjects)
]
results = await subject_workflow.aio_run_many(bulk_runs)
subject_summaries = []
for result in results:
task_result = result.get("generate_detailed_summary", {})
subject_summaries.append(task_result)
ctx.log(f"process_subjects complete: {len(subject_summaries)} summaries")
return ProcessSubjectsResult(subject_summaries=subject_summaries)
@diarization_pipeline.task(
parents=[process_subjects], execution_timeout=timedelta(seconds=120), retries=3
)
@with_error_handling("generate_recap")
async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
"""Generate recap and long summary from subject summaries, save to database."""
ctx.log(f"generate_recap: starting for transcript_id={input.transcript_id}")
subjects_result = ctx.task_output(extract_subjects)
process_result = ctx.task_output(process_subjects)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptActionItems,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.prompts import ( # noqa: PLC0415
RECAP_PROMPT,
build_participant_instructions,
)
topic_objects = [TitleSummary(**t) for t in topics]
ctx.log(f"generate_summary: created {len(topic_objects)} TitleSummary objects")
subject_summaries = process_result.subject_summaries
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
summary_result = None
short_summary_result = None
action_items_result = None
if not subject_summaries:
ctx.log("generate_recap: no subject summaries, returning empty")
return RecapResult(short_summary="", long_summary="")
summaries = [
{"subject": s.get("subject", ""), "summary": s.get("paragraph_summary", "")}
for s in subject_summaries
]
summaries_text = "\n\n".join([f"{s['subject']}: {s['summary']}" for s in summaries])
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(
subjects_result.participant_names
)
recap_prompt = RECAP_PROMPT
if participant_instructions:
recap_prompt = f"{recap_prompt}\n\n{participant_instructions}"
ctx.log("generate_recap: calling LLM for recap")
recap_response = await llm.get_response(
recap_prompt,
[summaries_text],
tone_name="Recap summarizer",
)
short_summary = str(recap_response)
lines = []
lines.append("# Quick recap")
lines.append("")
lines.append(short_summary)
lines.append("")
lines.append("# Summary")
lines.append("")
for s in summaries:
lines.append(f"**{s['subject']}**")
lines.append(s["summary"])
lines.append("")
long_summary = "\n".join(lines)
async with fresh_db_connection():
ctx.log("generate_summary: DB connection established")
transcript = await transcripts_controller.get_by_id(input.transcript_id)
ctx.log(
f"generate_summary: fetched transcript, exists={transcript is not None}"
)
async def on_long_summary_callback(data):
nonlocal summary_result
ctx.log(
f"generate_summary: on_long_summary_callback received ({len(data.long_summary)} chars)"
)
summary_result = data.long_summary
final_long_summary = TranscriptFinalLongSummary(
long_summary=data.long_summary
)
if transcript:
await transcripts_controller.update(
transcript,
{"long_summary": final_long_summary.long_summary},
{
"short_summary": short_summary,
"long_summary": long_summary,
},
)
ctx.log("generate_summary: saved long_summary to DB")
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_LONG_SUMMARY",
final_long_summary,
logger=logger,
)
ctx.log("generate_summary: broadcasted FINAL_LONG_SUMMARY event")
async def on_short_summary_callback(data):
nonlocal short_summary_result
ctx.log(
f"generate_summary: on_short_summary_callback received ({len(data.short_summary)} chars)"
)
short_summary_result = data.short_summary
final_short_summary = TranscriptFinalShortSummary(
short_summary=data.short_summary
)
await transcripts_controller.update(
transcript,
{"short_summary": final_short_summary.short_summary},
)
ctx.log("generate_summary: saved short_summary to DB")
final_short = TranscriptFinalShortSummary(short_summary=short_summary)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_SHORT_SUMMARY",
final_short_summary,
final_short,
logger=logger,
)
ctx.log("generate_summary: broadcasted FINAL_SHORT_SUMMARY event")
async def on_action_items_callback(data):
nonlocal action_items_result
ctx.log(
f"generate_summary: on_action_items_callback received ({len(data.action_items)} items)"
)
action_items_result = data.action_items
action_items = TranscriptActionItems(action_items=data.action_items)
await transcripts_controller.update(
final_long = TranscriptFinalLongSummary(long_summary=long_summary)
await append_event_and_broadcast(
input.transcript_id,
transcript,
{"action_items": action_items.action_items},
"FINAL_LONG_SUMMARY",
final_long,
logger=logger,
)
ctx.log("generate_recap complete")
return RecapResult(short_summary=short_summary, long_summary=long_summary)
@diarization_pipeline.task(
parents=[extract_subjects], execution_timeout=timedelta(seconds=180), retries=3
)
@with_error_handling("identify_action_items")
async def identify_action_items(
input: PipelineInput, ctx: Context
) -> ActionItemsResult:
"""Identify action items from transcript (parallel with subject processing)."""
ctx.log(f"identify_action_items: starting for transcript_id={input.transcript_id}")
subjects_result = ctx.task_output(extract_subjects)
if not subjects_result.transcript_text:
ctx.log("identify_action_items: no transcript text, returning empty")
return ActionItemsResult(action_items={"decisions": [], "next_steps": []})
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptActionItems,
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.summary_builder import ( # noqa: PLC0415
SummaryBuilder,
)
llm = LLM(settings=settings)
builder = SummaryBuilder(llm, logger=logger)
builder.set_transcript(subjects_result.transcript_text)
if subjects_result.participant_names:
builder.set_known_participants(
subjects_result.participant_names,
participant_name_to_id=subjects_result.participant_name_to_id,
)
ctx.log("identify_action_items: calling LLM")
action_items_response = await builder.identify_action_items()
if action_items_response is None:
raise RuntimeError("Failed to identify action items - LLM call failed")
action_items_dict = action_items_response.model_dump()
# Save to database and broadcast
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
action_items = TranscriptActionItems(action_items=action_items_dict)
await transcripts_controller.update(
transcript, {"action_items": action_items.action_items}
)
ctx.log("generate_summary: saved action_items to DB")
await append_event_and_broadcast(
input.transcript_id,
transcript,
@@ -745,33 +990,17 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
action_items,
logger=logger,
)
ctx.log("generate_summary: broadcasted ACTION_ITEMS event")
ctx.log(
"generate_summary: calling topic_processing.generate_summaries (LLM calls)..."
)
await topic_processing.generate_summaries(
topic_objects,
transcript,
on_long_summary_callback=on_long_summary_callback,
on_short_summary_callback=on_short_summary_callback,
on_action_items_callback=on_action_items_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log("generate_summary: topic_processing.generate_summaries returned")
ctx.log("generate_summary complete")
return SummaryResult(
summary=summary_result,
short_summary=short_summary_result,
action_items=action_items_result,
ctx.log(
f"identify_action_items complete: {len(action_items_dict.get('decisions', []))} decisions, "
f"{len(action_items_dict.get('next_steps', []))} next steps"
)
return ActionItemsResult(action_items=action_items_dict)
@diarization_pipeline.task(
parents=[generate_waveform, generate_title, generate_summary],
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=60),
retries=3,
)

View File

@@ -79,6 +79,17 @@ class WaveformResult(BaseModel):
    waveform_generated: bool
+class TopicChunkResult(BaseModel):
+    """Result from topic chunk child workflow."""
+    chunk_index: int
+    title: str
+    summary: str
+    timestamp: float
+    duration: float
+    words: list[dict[str, Any]]
class TopicsResult(BaseModel):
    """Result from detect_topics task."""
@@ -91,12 +102,41 @@ class TitleResult(BaseModel):
    title: str | None
-class SummaryResult(BaseModel):
-    """Result from generate_summary task."""
+class SubjectsResult(BaseModel):
+    """Result from extract_subjects task."""
-    summary: str | None
-    short_summary: str | None
-    action_items: dict | None = None
+    subjects: list[str]
+    transcript_text: str  # Formatted transcript for LLM consumption
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+class SubjectSummaryResult(BaseModel):
+    """Result from subject summary child workflow."""
+    subject: str
+    subject_index: int
+    detailed_summary: str
+    paragraph_summary: str
+class ProcessSubjectsResult(BaseModel):
+    """Result from process_subjects fan-out task."""
+    subject_summaries: list[dict[str, Any]]  # List of SubjectSummaryResult dicts
+class RecapResult(BaseModel):
+    """Result from generate_recap task."""
+    short_summary: str  # Recap paragraph
+    long_summary: str  # Full markdown summary
+class ActionItemsResult(BaseModel):
+    """Result from identify_action_items task."""
+    action_items: dict  # ActionItemsResponse as dict (may have empty lists)
class FinalizeResult(BaseModel):

View File

@@ -0,0 +1,101 @@
"""
Hatchet child workflow: SubjectProcessing
Handles individual subject/topic summary generation.
Spawned dynamically by the main diarization pipeline for each extracted subject
via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.logger import logger
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
build_participant_instructions,
)
class SubjectInput(BaseModel):
"""Input for individual subject processing."""
subject: str
subject_index: int
transcript_text: str
participant_names: list[str]
participant_name_to_id: dict[str, str]
hatchet = HatchetClientManager.get_client()
subject_workflow = hatchet.workflow(
name="SubjectProcessing", input_validator=SubjectInput
)
@subject_workflow.task(execution_timeout=timedelta(seconds=120), retries=3)
async def generate_detailed_summary(
input: SubjectInput, ctx: Context
) -> SubjectSummaryResult:
"""Generate detailed analysis for a single subject, then condense to paragraph."""
ctx.log(
f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
)
logger.info(
"[Hatchet] generate_detailed_summary",
subject=input.subject,
subject_index=input.subject_index,
)
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
# LLM HTTP connection pools aren't shared across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.settings import settings # noqa: PLC0415
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(input.participant_names)
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
if participant_instructions:
detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
detailed_response = await llm.get_response(
detailed_prompt,
[input.transcript_text],
tone_name="Topic assistant",
)
detailed_summary = str(detailed_response)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
if participant_instructions:
paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
paragraph_response = await llm.get_response(
paragraph_prompt,
[detailed_summary],
tone_name="Topic summarizer",
)
paragraph_summary = str(paragraph_response)
ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
logger.info(
"[Hatchet] generate_detailed_summary complete",
subject=input.subject,
subject_index=input.subject_index,
detailed_len=len(detailed_summary),
paragraph_len=len(paragraph_summary),
)
return SubjectSummaryResult(
subject=input.subject,
subject_index=input.subject_index,
detailed_summary=detailed_summary,
paragraph_summary=paragraph_summary,
)
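For reference, the caller side of this child workflow (the `process_subjects` task earlier in this diff) fans it out with the same bulk-run API and reads each result back as a dict keyed by task name. A condensed sketch; the input values are assumed to be in scope, as they are in `process_subjects`:

```python
bulk_runs = [
    subject_workflow.create_bulk_run_item(
        input=SubjectInput(
            subject=subject,
            subject_index=i,
            transcript_text=transcript_text,  # built by extract_subjects upstream
            participant_names=participant_names,
            participant_name_to_id=participant_name_to_id,
        )
    )
    for i, subject in enumerate(subjects)
]
results = await subject_workflow.aio_run_many(bulk_runs)
# Each result is keyed by task name, so the output of this file's task is read
# back under "generate_detailed_summary".
summaries = [r.get("generate_detailed_summary", {}) for r in results]
```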

View File

@@ -0,0 +1,85 @@
"""
Hatchet child workflow: TopicChunkProcessing
Handles topic detection for individual transcript chunks.
Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from typing import Any
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
class TopicChunkInput(BaseModel):
"""Input for individual topic chunk processing."""
chunk_index: int
chunk_text: str
timestamp: float
duration: float
words: list[dict[str, Any]]
hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing", input_validator=TopicChunkInput
)
@topic_chunk_workflow.task(execution_timeout=timedelta(seconds=60), retries=3)
async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
"""Detect topic for a single transcript chunk."""
ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
logger.info(
"[Hatchet] detect_chunk_topic",
chunk_index=input.chunk_index,
text_length=len(input.chunk_text),
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing LLM HTTP connection pools across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
TopicResponse,
)
from reflector.settings import settings # noqa: PLC0415
from reflector.utils.text import clean_title # noqa: PLC0415
llm = LLM(settings=settings, temperature=0.9, max_tokens=500)
prompt = TOPIC_PROMPT.format(text=input.chunk_text)
response = await llm.get_structured_response(
prompt,
[input.chunk_text],
TopicResponse,
tone_name="Topic analyzer",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
title = clean_title(response.title)
ctx.log(
f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
)
logger.info(
"[Hatchet] detect_chunk_topic complete",
chunk_index=input.chunk_index,
title=title[:50],
)
return TopicChunkResult(
chunk_index=input.chunk_index,
title=title,
summary=response.summary,
timestamp=input.timestamp,
duration=input.duration,
words=input.words,
)

View File

@@ -0,0 +1,30 @@
"""
LLM prompts for transcript processing.
Extracted to a separate module to avoid circular imports when importing
from processor modules (which import LLM/settings at module level).
"""
from textwrap import dedent
TOPIC_PROMPT = dedent(
"""
Analyze the following transcript segment and extract the main topic being discussed.
Focus on the substantive content and ignore small talk or administrative chatter.
Create a title that:
- Captures the specific subject matter being discussed
- Is descriptive and self-explanatory
- Uses professional language
- Is specific rather than generic
For the summary:
- Summarize the key points in maximum two sentences
- Focus on what was discussed, decided, or accomplished
- Be concise but informative
<transcript>
{text}
</transcript>
"""
).strip()

View File

@@ -0,0 +1,71 @@
"""
LLM prompts for summary generation.
Extracted to a separate module to avoid circular imports when importing
from summary_builder.py (which imports LLM/settings at module level).
"""
from textwrap import dedent
def build_participant_instructions(participant_names: list[str]) -> str:
"""Build participant context instructions for LLM prompts."""
if not participant_names:
return ""
participants_list = ", ".join(participant_names)
return dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()

View File

@@ -15,6 +15,12 @@ import structlog
from pydantic import BaseModel, Field
from reflector.llm import LLM
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    RECAP_PROMPT,
+    build_participant_instructions,
+)
from reflector.settings import settings
T = TypeVar("T", bound=BaseModel)
@@ -52,50 +58,6 @@ SUBJECTS_PROMPT = dedent(
"""
).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()
ACTION_ITEMS_PROMPT = dedent(
"""
Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -331,17 +293,7 @@ class SummaryBuilder:
        participants_md = self.format_list_md(participants)
        self.transcript += f"\n\n# Participants\n\n{participants_md}"
-        participants_list = ", ".join(participants)
-        self.participant_instructions = dedent(
-            f"""
-            # IMPORTANT: Participant Names
-            The following participants are identified in this conversation: {participants_list}
-            You MUST use these specific participant names when referring to people in your response.
-            Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-            Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-            """
-        ).strip()
+        self.participant_instructions = build_participant_instructions(participants)
    async def identify_participants(self) -> None:
        """
@@ -377,18 +329,9 @@ class SummaryBuilder:
            participants_md = self.format_list_md(unique_participants)
            self.transcript += f"\n\n# Participants\n\n{participants_md}"
-            # Set instructions that will be automatically added to all prompts
-            participants_list = ", ".join(unique_participants)
-            self.participant_instructions = dedent(
-                f"""
-                # IMPORTANT: Participant Names
-                The following participants are identified in this conversation: {participants_list}
-                You MUST use these specific participant names when referring to people in your response.
-                Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-                Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-                """
-            ).strip()
+            self.participant_instructions = build_participant_instructions(
+                unique_participants
+            )
        else:
            self.logger.warning("No participants identified in the transcript")

View File

@@ -1,35 +1,12 @@
-from textwrap import dedent
from pydantic import AliasChoices, BaseModel, Field
from reflector.llm import LLM
from reflector.processors.base import Processor
+from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import TitleSummary, Transcript
from reflector.settings import settings
from reflector.utils.text import clean_title
-TOPIC_PROMPT = dedent(
-    """
-    Analyze the following transcript segment and extract the main topic being discussed.
-    Focus on the substantive content and ignore small talk or administrative chatter.
-    Create a title that:
-    - Captures the specific subject matter being discussed
-    - Is descriptive and self-explanatory
-    - Uses professional language
-    - Is specific rather than generic
-    For the summary:
-    - Summarize the key points in maximum two sentences
-    - Focus on what was discussed, decided, or accomplished
-    - Be concise but informative
-    <transcript>
-    {text}
-    </transcript>
-    """
-).strip()
class TopicResponse(BaseModel):
    """Structured response for topic detection"""

View File

@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
    if transcript.locked:
        return ValidationLocked(detail="Recording is locked")
-    if transcript.status == "idle":
+    # hatchet is idempotent anyways + if it wasn't dispatched successfully
+    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
        return ValidationNotReady(detail="Recording is not ready for processing")
    # Check Celery tasks