Compare commits

..

7 Commits

Author         SHA1        Message                             Date
Igor Loskutov  2c1c48a219  self-review                         2025-12-22 18:35:33 -05:00
Igor Loskutov  4b495417b5  self-review                         2025-12-22 18:18:03 -05:00
Igor Loskutov  2cbc373cc3  self-review                         2025-12-22 18:06:59 -05:00
Igor Loskutov  8665204ab1  self-review                         2025-12-22 17:19:00 -05:00
Igor Loskutov  42bde5adb6  comments                            2025-12-22 15:59:52 -05:00
Igor Loskutov  b9698c2aaf  dry (no-mistakes) (minimal)         2025-12-22 15:58:13 -05:00
Igor Loskutov  1f41f16928  parallelize hatchet (no-mistakes)   2025-12-22 15:23:54 -05:00
14 changed files with 767 additions and 274 deletions

View File

@@ -37,6 +37,8 @@ def main() -> None:
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
         diarization_pipeline,
+        subject_workflow,
+        topic_chunk_workflow,
         track_workflow,
     )
@@ -44,7 +46,12 @@ def main() -> None:
     worker = hatchet.worker(
         "reflector-diarization-worker",
-        workflows=[diarization_pipeline, track_workflow],
+        workflows=[
+            diarization_pipeline,
+            subject_workflow,
+            topic_chunk_workflow,
+            track_workflow,
+        ],
     )

     def shutdown_handler(signum: int, frame) -> None:

View File

@@ -4,11 +4,23 @@ from reflector.hatchet.workflows.diarization_pipeline import (
     PipelineInput,
     diarization_pipeline,
 )
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
+)
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow

 __all__ = [
     "diarization_pipeline",
+    "subject_workflow",
+    "topic_chunk_workflow",
     "track_workflow",
     "PipelineInput",
+    "SubjectInput",
+    "TopicChunkInput",
     "TrackInput",
 ]

View File

@@ -29,32 +29,42 @@ from reflector.hatchet.broadcast import (
 )
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.workflows.models import (
+    ActionItemsResult,
     ConsentResult,
     FinalizeResult,
     MixdownResult,
     PaddedTrackInfo,
+    PadTrackResult,
+    ParticipantInfo,
     ParticipantsResult,
+    ProcessSubjectsResult,
     ProcessTracksResult,
+    RecapResult,
     RecordingResult,
-    SummaryResult,
+    SubjectsResult,
+    SubjectSummaryResult,
     TitleResult,
+    TopicChunkResult,
     TopicsResult,
+    TranscribeTrackResult,
     WaveformResult,
     WebhookResult,
     ZulipResult,
 )
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
+)
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
 from reflector.processors import AudioFileWriterProcessor
-from reflector.processors.types import (
-    TitleSummary,
-    TitleSummaryWithId,
-    Word,
-)
-from reflector.processors.types import (
-    Transcript as TranscriptType,
-)
+from reflector.processors.types import TitleSummary, Word
+from reflector.processors.types import Transcript as TranscriptType
 from reflector.settings import settings
 from reflector.storage.storage_aws import AwsStorage
 from reflector.utils.audio_constants import (
@@ -71,6 +81,7 @@ from reflector.utils.daily import (
     parse_daily_recording_filename,
 )
 from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
+from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
 from reflector.zulip import post_transcript_notification
@@ -274,7 +285,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     track_keys = [t["s3_key"] for t in input.tracks]
     cam_audio_keys = filter_cam_audio_tracks(track_keys)

-    participants_list = []
+    participants_list: list[ParticipantInfo] = []
     for idx, key in enumerate(cam_audio_keys):
         try:
             parsed = parse_daily_recording_filename(key)
@@ -296,11 +307,11 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
             )
             await transcripts_controller.upsert_participant(transcript, participant)
             participants_list.append(
-                {
-                    "participant_id": participant_id,
-                    "user_name": name,
-                    "speaker": idx,
-                }
+                ParticipantInfo(
+                    participant_id=participant_id,
+                    user_name=name,
+                    speaker=idx,
+                )
             )

     ctx.log(f"get_participants complete: {len(participants_list)} participants")
@@ -324,9 +335,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     participants_result = ctx.task_output(get_participants)
     source_language = participants_result.source_language

-    child_coroutines = [
-        track_workflow.aio_run(
-            TrackInput(
+    bulk_runs = [
+        track_workflow.create_bulk_run_item(
+            input=TrackInput(
                 track_index=i,
                 s3_key=track["s3_key"],
                 bucket_name=input.bucket_name,
@@ -337,35 +348,34 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
         for i, track in enumerate(input.tracks)
     ]
-    results = await asyncio.gather(*child_coroutines)
+    results = await track_workflow.aio_run_many(bulk_runs)

     target_language = participants_result.target_language
-    track_words = []
+    track_words: list[list[Word]] = []
     padded_tracks = []
     created_padded_files = set()
     for result in results:
-        transcribe_result = result.get("transcribe_track", {})
-        track_words.append(transcribe_result.get("words", []))
+        transcribe_result = TranscribeTrackResult(**result["transcribe_track"])
+        track_words.append(transcribe_result.words)

-        pad_result = result.get("pad_track", {})
-        padded_key = pad_result.get("padded_key")
-        bucket_name = pad_result.get("bucket_name")
+        pad_result = PadTrackResult(**result["pad_track"])

         # Store S3 key info (not presigned URL) - consumer tasks presign on demand
-        if padded_key:
+        if pad_result.padded_key:
             padded_tracks.append(
-                PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
+                PaddedTrackInfo(
+                    key=pad_result.padded_key, bucket_name=pad_result.bucket_name
+                )
             )

-        track_index = pad_result.get("track_index")
-        if pad_result.get("size", 0) > 0 and track_index is not None:
-            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
+        if pad_result.size > 0:
+            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
             created_padded_files.add(storage_path)

     all_words = [word for words in track_words for word in words]
-    all_words.sort(key=lambda w: w.get("start", 0))
+    all_words.sort(key=lambda w: w.start)

     ctx.log(
         f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
@@ -529,55 +539,100 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
 @diarization_pipeline.task(
-    parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
+    parents=[mixdown_tracks], execution_timeout=timedelta(seconds=600), retries=3
 )
 @with_error_handling("detect_topics")
 async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
-    """Detect topics using LLM and save to database (matches Celery on_topic callback)."""
+    """Detect topics using parallel child workflows (one per chunk)."""
     ctx.log("detect_topics: analyzing transcript for topics")

     track_result = ctx.task_output(process_tracks)
     words = track_result.all_words
-    target_language = track_result.target_language

+    if not words:
+        ctx.log("detect_topics: no words, returning empty topics")
+        return TopicsResult(topics=[])
+
+    # Deferred imports: Hatchet workers fork processes
     from reflector.db.transcripts import (  # noqa: PLC0415
         TranscriptTopic,
         transcripts_controller,
     )

-    word_objects = [Word(**w) for w in words]
-    transcript_type = TranscriptType(words=word_objects)
-
-    empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
+    chunk_size = TOPIC_CHUNK_WORD_COUNT
+    chunks = []
+    for i in range(0, len(words), chunk_size):
+        chunk_words = words[i : i + chunk_size]
+        if not chunk_words:
+            continue
+
+        first_word = chunk_words[0]
+        last_word = chunk_words[-1]
+        timestamp = first_word.start
+        duration = last_word.end - timestamp
+        chunk_text = " ".join(w.text for w in chunk_words)
+        chunks.append(
+            {
+                "index": len(chunks),
+                "text": chunk_text,
+                "timestamp": timestamp,
+                "duration": duration,
+                "words": chunk_words,
+            }
+        )
+
+    if not chunks:
+        ctx.log("detect_topics: no chunks generated, returning empty topics")
+        return TopicsResult(topics=[])
+
+    ctx.log(f"detect_topics: spawning {len(chunks)} topic chunk workflows in parallel")
+    bulk_runs = [
+        topic_chunk_workflow.create_bulk_run_item(
+            input=TopicChunkInput(
+                chunk_index=chunk["index"],
+                chunk_text=chunk["text"],
+                timestamp=chunk["timestamp"],
+                duration=chunk["duration"],
+                words=chunk["words"],
+            )
+        )
+        for chunk in chunks
+    ]
+    results = await topic_chunk_workflow.aio_run_many(bulk_runs)
+    topic_chunks = [
+        TopicChunkResult(**result["detect_chunk_topic"]) for result in results
+    ]

     async with fresh_db_connection():
         transcript = await transcripts_controller.get_by_id(input.transcript_id)

-        async def on_topic_callback(data):
+        for chunk in topic_chunks:
             topic = TranscriptTopic(
-                title=data.title,
-                summary=data.summary,
-                timestamp=data.timestamp,
-                transcript=data.transcript.text,
-                words=data.transcript.words,
+                title=chunk.title,
+                summary=chunk.summary,
+                timestamp=chunk.timestamp,
+                transcript=" ".join(w.text for w in chunk.words),
+                words=[w.model_dump() for w in chunk.words],
             )
-            if isinstance(
-                data, TitleSummaryWithId
-            ):  # Celery parity: main_live_pipeline.py
-                topic.id = data.id
             await transcripts_controller.upsert_topic(transcript, topic)
             await append_event_and_broadcast(
                 input.transcript_id, transcript, "TOPIC", topic, logger=logger
             )

-        topics = await topic_processing.detect_topics(
-            transcript_type,
-            target_language,
-            on_topic_callback=on_topic_callback,
-            empty_pipeline=empty_pipeline,
-        )
-
-    topics_list = [t.model_dump() for t in topics]
+    topics_list = [
+        TitleSummary(
+            title=chunk.title,
+            summary=chunk.summary,
+            timestamp=chunk.timestamp,
+            duration=chunk.duration,
+            transcript=TranscriptType(words=chunk.words),
+        )
+        for chunk in topic_chunks
+    ]
     ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
@@ -601,8 +656,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
         transcripts_controller,
     )

-    topic_objects = [TitleSummary(**t) for t in topics]
-    ctx.log(f"generate_title: created {len(topic_objects)} TitleSummary objects")
+    ctx.log(f"generate_title: received {len(topics)} TitleSummary objects")

     empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
     title_result = None
@@ -634,7 +688,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
ctx.log("generate_title: calling topic_processing.generate_title (LLM call)...") ctx.log("generate_title: calling topic_processing.generate_title (LLM call)...")
await topic_processing.generate_title( await topic_processing.generate_title(
topic_objects, topics,
on_title_callback=on_title_callback, on_title_callback=on_title_callback,
empty_pipeline=empty_pipeline, empty_pipeline=empty_pipeline,
logger=logger, logger=logger,
@@ -647,97 +701,269 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
 @diarization_pipeline.task(
-    parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
+    parents=[detect_topics], execution_timeout=timedelta(seconds=120), retries=3
 )
-@with_error_handling("generate_summary")
-async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
-    """Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
-    ctx.log(f"generate_summary: starting for transcript_id={input.transcript_id}")
+@with_error_handling("extract_subjects")
+async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult:
+    """Extract main subjects/topics from transcript for parallel processing."""
+    ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")

     topics_result = ctx.task_output(detect_topics)
     topics = topics_result.topics
-    ctx.log(f"generate_summary: received {len(topics)} topics from detect_topics")

+    if not topics:
+        ctx.log("extract_subjects: no topics, returning empty subjects")
+        return SubjectsResult(
+            subjects=[],
+            transcript_text="",
+            participant_names=[],
+            participant_name_to_id={},
+        )
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
+    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+        SummaryBuilder,
+    )
+
+    async with fresh_db_connection():
+        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+
+    # Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
+    speakermap = {}
+    if transcript and transcript.participants:
+        speakermap = {
+            p.speaker: p.name
+            for p in transcript.participants
+            if p.speaker is not None and p.name
+        }
+
+    text_lines = []
+    for topic in topics:
+        for segment in topic.transcript.as_segments():
+            name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
+            text_lines.append(f"{name}: {segment.text}")
+    transcript_text = "\n".join(text_lines)
+
+    participant_names = []
+    participant_name_to_id = {}
+    if transcript and transcript.participants:
+        participant_names = [p.name for p in transcript.participants if p.name]
+        participant_name_to_id = {
+            p.name: p.id for p in transcript.participants if p.name and p.id
+        }
+
+    # TODO: refactor SummaryBuilder methods into standalone functions
+    llm = LLM(settings=settings)
+    builder = SummaryBuilder(llm, logger=logger)
+    builder.set_transcript(transcript_text)
+    if participant_names:
+        builder.set_known_participants(
+            participant_names, participant_name_to_id=participant_name_to_id
+        )
+
+    ctx.log("extract_subjects: calling LLM to extract subjects")
+    await builder.extract_subjects()
+
+    ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")
+    return SubjectsResult(
+        subjects=builder.subjects,
+        transcript_text=transcript_text,
+        participant_names=participant_names,
+        participant_name_to_id=participant_name_to_id,
+    )
+
+
+@diarization_pipeline.task(
+    parents=[extract_subjects], execution_timeout=timedelta(seconds=600), retries=3
+)
+@with_error_handling("process_subjects")
+async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubjectsResult:
+    """Spawn child workflows for each subject (dynamic fan-out, parallel LLM calls)."""
+    subjects_result = ctx.task_output(extract_subjects)
+    subjects = subjects_result.subjects
+
+    if not subjects:
+        ctx.log("process_subjects: no subjects to process")
+        return ProcessSubjectsResult(subject_summaries=[])
+
+    ctx.log(f"process_subjects: spawning {len(subjects)} subject workflows in parallel")
+    bulk_runs = [
+        subject_workflow.create_bulk_run_item(
+            input=SubjectInput(
+                subject=subject,
+                subject_index=i,
+                transcript_text=subjects_result.transcript_text,
+                participant_names=subjects_result.participant_names,
+                participant_name_to_id=subjects_result.participant_name_to_id,
+            )
+        )
+        for i, subject in enumerate(subjects)
+    ]
+    results = await subject_workflow.aio_run_many(bulk_runs)
+    subject_summaries = [
+        SubjectSummaryResult(**result["generate_detailed_summary"])
+        for result in results
+    ]
+
+    ctx.log(f"process_subjects complete: {len(subject_summaries)} summaries")
+    return ProcessSubjectsResult(subject_summaries=subject_summaries)
+
+
+@diarization_pipeline.task(
+    parents=[process_subjects], execution_timeout=timedelta(seconds=120), retries=3
+)
+@with_error_handling("generate_recap")
+async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
+    """Generate recap and long summary from subject summaries, save to database."""
+    ctx.log(f"generate_recap: starting for transcript_id={input.transcript_id}")
+
+    subjects_result = ctx.task_output(extract_subjects)
+    process_result = ctx.task_output(process_subjects)
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
     from reflector.db.transcripts import (  # noqa: PLC0415
-        TranscriptActionItems,
         TranscriptFinalLongSummary,
         TranscriptFinalShortSummary,
         transcripts_controller,
     )
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.prompts import (  # noqa: PLC0415
+        RECAP_PROMPT,
+        build_participant_instructions,
+        build_summary_markdown,
+    )

-    topic_objects = [TitleSummary(**t) for t in topics]
-    ctx.log(f"generate_summary: created {len(topic_objects)} TitleSummary objects")
-
-    empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
-    summary_result = None
-    short_summary_result = None
-    action_items_result = None
+    subject_summaries = process_result.subject_summaries
+    if not subject_summaries:
+        ctx.log("generate_recap: no subject summaries, returning empty")
+        return RecapResult(short_summary="", long_summary="")
+
+    summaries = [
+        {"subject": s.subject, "summary": s.paragraph_summary}
+        for s in subject_summaries
+    ]
+    summaries_text = "\n\n".join([f"{s['subject']}: {s['summary']}" for s in summaries])
+
+    llm = LLM(settings=settings)
+    participant_instructions = build_participant_instructions(
+        subjects_result.participant_names
+    )
+
+    recap_prompt = RECAP_PROMPT
+    if participant_instructions:
+        recap_prompt = f"{recap_prompt}\n\n{participant_instructions}"
+
+    ctx.log("generate_recap: calling LLM for recap")
+    recap_response = await llm.get_response(
+        recap_prompt,
+        [summaries_text],
+        tone_name="Recap summarizer",
+    )
+    short_summary = str(recap_response)
+    long_summary = build_summary_markdown(short_summary, summaries)

     async with fresh_db_connection():
-        ctx.log("generate_summary: DB connection established")
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
-        ctx.log(
-            f"generate_summary: fetched transcript, exists={transcript is not None}"
-        )
-
-        async def on_long_summary_callback(data):
-            nonlocal summary_result
-            ctx.log(
-                f"generate_summary: on_long_summary_callback received ({len(data.long_summary)} chars)"
-            )
-            summary_result = data.long_summary
-            final_long_summary = TranscriptFinalLongSummary(
-                long_summary=data.long_summary
-            )
+        if transcript:
             await transcripts_controller.update(
                 transcript,
-                {"long_summary": final_long_summary.long_summary},
+                {
+                    "short_summary": short_summary,
+                    "long_summary": long_summary,
+                },
             )
-            ctx.log("generate_summary: saved long_summary to DB")
-            await append_event_and_broadcast(
-                input.transcript_id,
-                transcript,
-                "FINAL_LONG_SUMMARY",
-                final_long_summary,
-                logger=logger,
-            )
-            ctx.log("generate_summary: broadcasted FINAL_LONG_SUMMARY event")

-        async def on_short_summary_callback(data):
-            nonlocal short_summary_result
-            ctx.log(
-                f"generate_summary: on_short_summary_callback received ({len(data.short_summary)} chars)"
-            )
-            short_summary_result = data.short_summary
-            final_short_summary = TranscriptFinalShortSummary(
-                short_summary=data.short_summary
-            )
-            await transcripts_controller.update(
-                transcript,
-                {"short_summary": final_short_summary.short_summary},
-            )
-            ctx.log("generate_summary: saved short_summary to DB")
+            final_short = TranscriptFinalShortSummary(short_summary=short_summary)
             await append_event_and_broadcast(
                 input.transcript_id,
                 transcript,
                 "FINAL_SHORT_SUMMARY",
-                final_short_summary,
+                final_short,
                 logger=logger,
             )
-            ctx.log("generate_summary: broadcasted FINAL_SHORT_SUMMARY event")

-        async def on_action_items_callback(data):
-            nonlocal action_items_result
-            ctx.log(
-                f"generate_summary: on_action_items_callback received ({len(data.action_items)} items)"
-            )
-            action_items_result = data.action_items
-            action_items = TranscriptActionItems(action_items=data.action_items)
-            await transcripts_controller.update(
-                transcript,
-                {"action_items": action_items.action_items},
-            )
-            ctx.log("generate_summary: saved action_items to DB")
+            final_long = TranscriptFinalLongSummary(long_summary=long_summary)
+            await append_event_and_broadcast(
+                input.transcript_id,
+                transcript,
+                "FINAL_LONG_SUMMARY",
+                final_long,
+                logger=logger,
+            )
+
+    ctx.log("generate_recap complete")
+    return RecapResult(short_summary=short_summary, long_summary=long_summary)
+
+
+@diarization_pipeline.task(
+    parents=[extract_subjects], execution_timeout=timedelta(seconds=180), retries=3
+)
+@with_error_handling("identify_action_items")
+async def identify_action_items(
+    input: PipelineInput, ctx: Context
+) -> ActionItemsResult:
+    """Identify action items from transcript (parallel with subject processing)."""
+    ctx.log(f"identify_action_items: starting for transcript_id={input.transcript_id}")
+
+    subjects_result = ctx.task_output(extract_subjects)
+    if not subjects_result.transcript_text:
+        ctx.log("identify_action_items: no transcript text, returning empty")
+        return ActionItemsResult(action_items={"decisions": [], "next_steps": []})
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
+    from reflector.db.transcripts import (  # noqa: PLC0415
+        TranscriptActionItems,
+        transcripts_controller,
+    )
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+        SummaryBuilder,
+    )
+
+    # TODO: refactor SummaryBuilder methods into standalone functions
+    llm = LLM(settings=settings)
+    builder = SummaryBuilder(llm, logger=logger)
+    builder.set_transcript(subjects_result.transcript_text)
+    if subjects_result.participant_names:
+        builder.set_known_participants(
+            subjects_result.participant_names,
+            participant_name_to_id=subjects_result.participant_name_to_id,
+        )
+
+    ctx.log("identify_action_items: calling LLM")
+    action_items_response = await builder.identify_action_items()
+    if action_items_response is None:
+        raise RuntimeError("Failed to identify action items - LLM call failed")
+    action_items_dict = action_items_response.model_dump()
+
+    async with fresh_db_connection():
+        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+        if transcript:
+            action_items = TranscriptActionItems(action_items=action_items_dict)
+            await transcripts_controller.update(
+                transcript, {"action_items": action_items.action_items}
+            )
             await append_event_and_broadcast(
                 input.transcript_id,
                 transcript,
@@ -745,33 +971,17 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
                 action_items,
                 logger=logger,
             )
-            ctx.log("generate_summary: broadcasted ACTION_ITEMS event")

-        ctx.log(
-            "generate_summary: calling topic_processing.generate_summaries (LLM calls)..."
-        )
-        await topic_processing.generate_summaries(
-            topic_objects,
-            transcript,
-            on_long_summary_callback=on_long_summary_callback,
-            on_short_summary_callback=on_short_summary_callback,
-            on_action_items_callback=on_action_items_callback,
-            empty_pipeline=empty_pipeline,
-            logger=logger,
-        )
-        ctx.log("generate_summary: topic_processing.generate_summaries returned")
-
-    ctx.log("generate_summary complete")
-    return SummaryResult(
-        summary=summary_result,
-        short_summary=short_summary_result,
-        action_items=action_items_result,
-    )
+    ctx.log(
+        f"identify_action_items complete: {len(action_items_dict.get('decisions', []))} decisions, "
+        f"{len(action_items_dict.get('next_steps', []))} next steps"
+    )
+    return ActionItemsResult(action_items=action_items_dict)


 @diarization_pipeline.task(
-    parents=[generate_waveform, generate_title, generate_summary],
+    parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=60),
     retries=3,
 )
@@ -818,8 +1028,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     if transcript is None:
         raise ValueError(f"Transcript {input.transcript_id} not found in database")

-    word_objects = [Word(**w) for w in all_words]
-    merged_transcript = TranscriptType(words=word_objects, translation=None)
+    merged_transcript = TranscriptType(words=all_words, translation=None)

     await append_event_and_broadcast(
         input.transcript_id,
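
Aside: the hunks above replace asyncio.gather over aio_run() coroutines with Hatchet bulk runs. Below is a minimal sketch of that parent/child fan-out pattern, assuming only the hatchet_sdk API already used in this diff (hatchet.workflow, @workflow.task, create_bulk_run_item, aio_run_many); the workflow, task, and input names are illustrative, not the repository's real ones.

# Illustrative sketch of the bulk fan-out pattern; names here are hypothetical.
from datetime import timedelta

from hatchet_sdk import Context
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager

hatchet = HatchetClientManager.get_client()


class ChunkInput(BaseModel):
    index: int
    text: str


class ChunkResult(BaseModel):
    index: int
    length: int


chunk_workflow = hatchet.workflow(name="ExampleChunk", input_validator=ChunkInput)


@chunk_workflow.task(execution_timeout=timedelta(seconds=60), retries=3)
async def handle_chunk(input: ChunkInput, ctx: Context) -> ChunkResult:
    # Child task: runs once per bulk-run item, in parallel with its siblings.
    ctx.log(f"handle_chunk: {input.index}")
    return ChunkResult(index=input.index, length=len(input.text))


async def fan_out(chunks: list[str]) -> list[ChunkResult]:
    # Parent side: one bulk-run item per chunk instead of gathering aio_run() coroutines.
    bulk_runs = [
        chunk_workflow.create_bulk_run_item(input=ChunkInput(index=i, text=text))
        for i, text in enumerate(chunks)
    ]
    results = await chunk_workflow.aio_run_many(bulk_runs)
    # Each result dict is keyed by child task name, mirroring result["detect_chunk_topic"] above.
    return [ChunkResult(**result["handle_chunk"]) for result in results]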

View File

@@ -5,13 +5,20 @@ Provides static typing for all task outputs, enabling type checking
 and better IDE support.
 """

-from typing import Any
-
 from pydantic import BaseModel

+from reflector.processors.types import TitleSummary, Word
 from reflector.utils.string import NonEmptyString


+class ParticipantInfo(BaseModel):
+    """Participant info with speaker index for workflow result."""
+
+    participant_id: NonEmptyString
+    user_name: NonEmptyString
+    speaker: int
+
+
 class PadTrackResult(BaseModel):
     """Result from pad_track task."""
@@ -26,7 +33,7 @@ class PadTrackResult(BaseModel):
 class TranscribeTrackResult(BaseModel):
     """Result from transcribe_track task."""

-    words: list[dict[str, Any]]
+    words: list[Word]
     track_index: int
@@ -41,7 +48,7 @@ class RecordingResult(BaseModel):
 class ParticipantsResult(BaseModel):
     """Result from get_participants task."""

-    participants: list[dict[str, Any]]
+    participants: list[ParticipantInfo]
     num_tracks: int
     source_language: NonEmptyString
     target_language: NonEmptyString
@@ -57,7 +64,7 @@ class PaddedTrackInfo(BaseModel):
 class ProcessTracksResult(BaseModel):
     """Result from process_tracks task."""

-    all_words: list[dict[str, Any]]
+    all_words: list[Word]
     padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
     word_count: int
     num_tracks: int
@@ -79,10 +86,21 @@ class WaveformResult(BaseModel):
     waveform_generated: bool


+class TopicChunkResult(BaseModel):
+    """Result from topic chunk child workflow."""
+
+    chunk_index: int
+    title: str
+    summary: str
+    timestamp: float
+    duration: float
+    words: list[Word]
+
+
 class TopicsResult(BaseModel):
     """Result from detect_topics task."""

-    topics: list[dict[str, Any]]
+    topics: list[TitleSummary]


 class TitleResult(BaseModel):
@@ -91,12 +109,41 @@ class TitleResult(BaseModel):
     title: str | None


-class SummaryResult(BaseModel):
-    """Result from generate_summary task."""
-
-    summary: str | None
-    short_summary: str | None
-    action_items: dict | None = None
+class SubjectsResult(BaseModel):
+    """Result from extract_subjects task."""
+
+    subjects: list[str]
+    transcript_text: str  # Formatted transcript for LLM consumption
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+
+
+class SubjectSummaryResult(BaseModel):
+    """Result from subject summary child workflow."""
+
+    subject: str
+    subject_index: int
+    detailed_summary: str
+    paragraph_summary: str
+
+
+class ProcessSubjectsResult(BaseModel):
+    """Result from process_subjects fan-out task."""
+
+    subject_summaries: list[SubjectSummaryResult]
+
+
+class RecapResult(BaseModel):
+    """Result from generate_recap task."""
+
+    short_summary: str  # Recap paragraph
+    long_summary: str  # Full markdown summary
+
+
+class ActionItemsResult(BaseModel):
+    """Result from identify_action_items task."""
+
+    action_items: dict  # ActionItemsResponse as dict (may have empty lists)


 class FinalizeResult(BaseModel):

View File

@@ -0,0 +1,101 @@
"""
Hatchet child workflow: SubjectProcessing
Handles individual subject/topic summary generation.
Spawned dynamically by the main diarization pipeline for each extracted subject
via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.logger import logger
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
build_participant_instructions,
)
class SubjectInput(BaseModel):
"""Input for individual subject processing."""
subject: str
subject_index: int
transcript_text: str
participant_names: list[str]
participant_name_to_id: dict[str, str]
hatchet = HatchetClientManager.get_client()
subject_workflow = hatchet.workflow(
name="SubjectProcessing", input_validator=SubjectInput
)
@subject_workflow.task(execution_timeout=timedelta(seconds=120), retries=3)
async def generate_detailed_summary(
input: SubjectInput, ctx: Context
) -> SubjectSummaryResult:
"""Generate detailed analysis for a single subject, then condense to paragraph."""
ctx.log(
f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
)
logger.info(
"[Hatchet] generate_detailed_summary",
subject=input.subject,
subject_index=input.subject_index,
)
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
# LLM HTTP connection pools aren't shared across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.settings import settings # noqa: PLC0415
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(input.participant_names)
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
if participant_instructions:
detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
detailed_response = await llm.get_response(
detailed_prompt,
[input.transcript_text],
tone_name="Topic assistant",
)
detailed_summary = str(detailed_response)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
if participant_instructions:
paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
paragraph_response = await llm.get_response(
paragraph_prompt,
[detailed_summary],
tone_name="Topic summarizer",
)
paragraph_summary = str(paragraph_response)
ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
logger.info(
"[Hatchet] generate_detailed_summary complete",
subject=input.subject,
subject_index=input.subject_index,
detailed_len=len(detailed_summary),
paragraph_len=len(paragraph_summary),
)
return SubjectSummaryResult(
subject=input.subject,
subject_index=input.subject_index,
detailed_summary=detailed_summary,
paragraph_summary=paragraph_summary,
)

View File

@@ -0,0 +1,85 @@
"""
Hatchet child workflow: TopicChunkProcessing
Handles topic detection for individual transcript chunks.
Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import Word
class TopicChunkInput(BaseModel):
"""Input for individual topic chunk processing."""
chunk_index: int
chunk_text: str
timestamp: float
duration: float
words: list[Word]
hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing", input_validator=TopicChunkInput
)
@topic_chunk_workflow.task(execution_timeout=timedelta(seconds=60), retries=3)
async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
"""Detect topic for a single transcript chunk."""
ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
logger.info(
"[Hatchet] detect_chunk_topic",
chunk_index=input.chunk_index,
text_length=len(input.chunk_text),
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing LLM HTTP connection pools across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
TopicResponse,
)
from reflector.settings import settings # noqa: PLC0415
from reflector.utils.text import clean_title # noqa: PLC0415
llm = LLM(settings=settings, temperature=0.9, max_tokens=500)
prompt = TOPIC_PROMPT.format(text=input.chunk_text)
response = await llm.get_structured_response(
prompt,
[input.chunk_text],
TopicResponse,
tone_name="Topic analyzer",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
title = clean_title(response.title)
ctx.log(
f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
)
logger.info(
"[Hatchet] detect_chunk_topic complete",
chunk_index=input.chunk_index,
title=title[:50],
)
return TopicChunkResult(
chunk_index=input.chunk_index,
title=title,
summary=response.summary,
timestamp=input.timestamp,
duration=input.duration,
words=input.words,
)

View File

@@ -197,23 +197,20 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
     transcript = await transcribe_file_with_processor(audio_url, input.language)

     # Tag all words with speaker index
-    words = []
     for word in transcript.words:
-        word_dict = word.model_dump()
-        word_dict["speaker"] = input.track_index
-        words.append(word_dict)
+        word.speaker = input.track_index

     ctx.log(
-        f"transcribe_track complete: track {input.track_index}, {len(words)} words"
+        f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
     )
     logger.info(
         "[Hatchet] transcribe_track complete",
         track_index=input.track_index,
-        word_count=len(words),
+        word_count=len(transcript.words),
     )

     return TranscribeTrackResult(
-        words=words,
+        words=transcript.words,
         track_index=input.track_index,
     )

View File

@@ -18,6 +18,7 @@ from reflector.processors import (
 )
 from reflector.processors.types import TitleSummary
 from reflector.processors.types import Transcript as TranscriptType
+from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT


 class EmptyPipeline:
@@ -38,7 +39,7 @@ async def detect_topics(
     on_topic_callback: Callable,
     empty_pipeline: EmptyPipeline,
 ) -> list[TitleSummary]:
-    chunk_size = 300
+    chunk_size = TOPIC_CHUNK_WORD_COUNT
     topics: list[TitleSummary] = []

     async def on_topic(topic: TitleSummary):

View File

@@ -0,0 +1,30 @@
"""
LLM prompts for transcript processing.
Extracted to a separate module to avoid circular imports when importing
from processor modules (which import LLM/settings at module level).
"""
from textwrap import dedent
TOPIC_PROMPT = dedent(
"""
Analyze the following transcript segment and extract the main topic being discussed.
Focus on the substantive content and ignore small talk or administrative chatter.
Create a title that:
- Captures the specific subject matter being discussed
- Is descriptive and self-explanatory
- Uses professional language
- Is specific rather than generic
For the summary:
- Summarize the key points in maximum two sentences
- Focus on what was discussed, decided, or accomplished
- Be concise but informative
<transcript>
{text}
</transcript>
"""
).strip()

View File

@@ -0,0 +1,91 @@
"""
LLM prompts for summary generation.
Extracted to a separate module to avoid circular imports when importing
from summary_builder.py (which imports LLM/settings at module level).
"""
from textwrap import dedent
def build_participant_instructions(participant_names: list[str]) -> str:
"""Build participant context instructions for LLM prompts."""
if not participant_names:
return ""
participants_list = ", ".join(participant_names)
return dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()
def build_summary_markdown(recap: str, summaries: list[dict[str, str]]) -> str:
"""Build markdown summary from recap and subject summaries."""
lines: list[str] = []
if recap:
lines.append("# Quick recap")
lines.append("")
lines.append(recap)
lines.append("")
if summaries:
lines.append("# Summary")
lines.append("")
for summary in summaries:
lines.append(f"**{summary['subject']}**")
lines.append(summary["summary"])
lines.append("")
return "\n".join(lines)

View File

@@ -15,6 +15,13 @@ import structlog
 from pydantic import BaseModel, Field

 from reflector.llm import LLM
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    RECAP_PROMPT,
+    build_participant_instructions,
+    build_summary_markdown,
+)
 from reflector.settings import settings

 T = TypeVar("T", bound=BaseModel)
@@ -52,50 +59,6 @@ SUBJECTS_PROMPT = dedent(
     """
 ).strip()

-DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
-    """
-    Get me information about the topic "{subject}"
-
-    # RESPONSE GUIDELINES
-
-    Follow this structured approach to create the topic summary:
-
-    - Highlight important arguments, insights, or data presented.
-    - Outline decisions made.
-    - Indicate any decisions reached, including any rationale or key factors
-      that influenced these decisions.
-    - Detail action items and responsibilities.
-    - For each decision or unresolved issue, list specific action items agreed
-      upon, along with assigned individuals or teams responsible for each task.
-    - Specify deadlines or timelines if mentioned. For each action item,
-      include any deadlines or timeframes discussed for completion or follow-up.
-    - Mention unresolved issues or topics needing further discussion, aiding in
-      planning future meetings or follow-up actions.
-    - Do not include topic unrelated to {subject}.
-
-    # OUTPUT
-
-    Your summary should be clear, concise, and structured, covering all major
-    points, decisions, and action items from the meeting. It should be easy to
-    understand for someone not present, providing a comprehensive understanding
-    of what transpired and what needs to be done next. The summary should not
-    exceed one page to ensure brevity and focus.
-    """
-).strip()
-
-PARAGRAPH_SUMMARY_PROMPT = dedent(
-    """
-    Summarize the mentioned topic in 1 paragraph.
-    It will be integrated into the final summary, so just for this topic.
-    """
-).strip()
-
-RECAP_PROMPT = dedent(
-    """
-    Provide a high-level quick recap of the following meeting, fitting in one paragraph.
-    Do not include decisions, action items or unresolved issue, just highlight the high moments.
-    Just dive into the meeting, be concise and do not include unnecessary details.
-    As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
-    """
-).strip()
-
 ACTION_ITEMS_PROMPT = dedent(
     """
     Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -331,17 +294,7 @@ class SummaryBuilder:
         participants_md = self.format_list_md(participants)
         self.transcript += f"\n\n# Participants\n\n{participants_md}"

-        participants_list = ", ".join(participants)
-        self.participant_instructions = dedent(
-            f"""
-            # IMPORTANT: Participant Names
-            The following participants are identified in this conversation: {participants_list}
-            You MUST use these specific participant names when referring to people in your response.
-            Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-            Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-            """
-        ).strip()
+        self.participant_instructions = build_participant_instructions(participants)

     async def identify_participants(self) -> None:
         """
@@ -377,18 +330,9 @@ class SummaryBuilder:
             participants_md = self.format_list_md(unique_participants)
             self.transcript += f"\n\n# Participants\n\n{participants_md}"

-            # Set instructions that will be automatically added to all prompts
-            participants_list = ", ".join(unique_participants)
-            self.participant_instructions = dedent(
-                f"""
-                # IMPORTANT: Participant Names
-                The following participants are identified in this conversation: {participants_list}
-                You MUST use these specific participant names when referring to people in your response.
-                Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-                Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-                """
-            ).strip()
+            self.participant_instructions = build_participant_instructions(
+                unique_participants
+            )
         else:
             self.logger.warning("No participants identified in the transcript")
@@ -613,22 +557,7 @@ class SummaryBuilder:
     # ----------------------------------------------------------------------------

     def as_markdown(self) -> str:
-        lines: list[str] = []
-        if self.recap:
-            lines.append("# Quick recap")
-            lines.append("")
-            lines.append(self.recap)
-            lines.append("")
-        if self.summaries:
-            lines.append("# Summary")
-            lines.append("")
-            for summary in self.summaries:
-                lines.append(f"**{summary['subject']}**")
-                lines.append(summary["summary"])
-                lines.append("")
-        return "\n".join(lines)
+        return build_summary_markdown(self.recap, self.summaries)

     def format_list_md(self, data: list[str]) -> str:
         return "\n".join([f"- {item}" for item in data])

View File

@@ -1,35 +1,12 @@
-from textwrap import dedent
-
 from pydantic import AliasChoices, BaseModel, Field

 from reflector.llm import LLM
 from reflector.processors.base import Processor
+from reflector.processors.prompts import TOPIC_PROMPT
 from reflector.processors.types import TitleSummary, Transcript
 from reflector.settings import settings
 from reflector.utils.text import clean_title

-TOPIC_PROMPT = dedent(
-    """
-    Analyze the following transcript segment and extract the main topic being discussed.
-    Focus on the substantive content and ignore small talk or administrative chatter.
-
-    Create a title that:
-    - Captures the specific subject matter being discussed
-    - Is descriptive and self-explanatory
-    - Uses professional language
-    - Is specific rather than generic
-
-    For the summary:
-    - Summarize the key points in maximum two sentences
-    - Focus on what was discussed, decided, or accomplished
-    - Be concise but informative
-
-    <transcript>
-    {text}
-    </transcript>
-    """
-).strip()
-

 class TopicResponse(BaseModel):
     """Structured response for topic detection"""

View File

@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
     if transcript.locked:
         return ValidationLocked(detail="Recording is locked")

-    if transcript.status == "idle":
+    # hatchet is idempotent anyways + if it wasn't dispatched successfully
+    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
         return ValidationNotReady(detail="Recording is not ready for processing")

     # Check Celery tasks
@@ -115,13 +116,11 @@
     ):
         return ValidationAlreadyScheduled(detail="already running")

-    # Check Hatchet workflows (if enabled)
     if settings.HATCHET_ENABLED and transcript.workflow_run_id:
         try:
             status = await HatchetClientManager.get_workflow_run_status(
                 transcript.workflow_run_id
             )
-            # If workflow is running or queued, don't allow new processing
             if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                 return ValidationAlreadyScheduled(
                     detail="Hatchet workflow already running"

View File

@@ -0,0 +1,8 @@
"""
Shared transcript processing constants.
Used by both Hatchet workflows and Celery pipelines for consistent processing.
"""
# Topic detection: number of words per chunk for topic extraction
TOPIC_CHUNK_WORD_COUNT = 300
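
As a rough illustration of how this constant drives the chunking loop in detect_topics and pipelines.topic_processing above; the word list here uses plain strings for brevity, whereas the real Word objects also carry timing and speaker fields.

from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT


def chunk_words(words: list) -> list[list]:
    # One chunk per TOPIC_CHUNK_WORD_COUNT words; the final chunk may be shorter.
    return [
        words[i : i + TOPIC_CHUNK_WORD_COUNT]
        for i in range(0, len(words), TOPIC_CHUNK_WORD_COUNT)
    ]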