Compare commits

...

4 Commits

Author          SHA1        Message                                       Date
Igor Loskutov   10c17d7086  send_webhook fixes                            2025-12-23 16:56:43 -05:00
Igor Loskutov   7a1d1dc08d  pipeline fixes: whereby Hatchet preparation   2025-12-23 13:47:04 -05:00
                8598707c1c  chore(main): release 0.26.0 (#805)            2025-12-23 11:05:43 -05:00
Igor Monadical  594bcc09e0  feat: parallelize hatchet (#804)              2025-12-23 11:03:36 -05:00

Commit message for 594bcc09e0:

* parallelize hatchet (no-mistakes)

* dry (no-mistakes) (minimal)

* comments

* self-review

* self-review

* self-review

* self-review

* pr comments

* pr comments

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
21 changed files with 1278 additions and 505 deletions

View File

@@ -1,5 +1,12 @@
# Changelog
+ ## [0.26.0](https://github.com/Monadical-SAS/reflector/compare/v0.25.0...v0.26.0) (2025-12-23)
+ ### Features
+ * parallelize hatchet ([#804](https://github.com/Monadical-SAS/reflector/issues/804)) ([594bcc0](https://github.com/Monadical-SAS/reflector/commit/594bcc09e0ca744163de2f1525ebbf7c52a68448))
## [0.25.0](https://github.com/Monadical-SAS/reflector/compare/v0.24.0...v0.25.0) (2025-12-22)

View File

@@ -0,0 +1,16 @@
"""
Hatchet workflow constants.
"""
# Rate limit key for LLM API calls (shared across all LLM-calling tasks)
LLM_RATE_LIMIT_KEY = "llm"
# Max LLM calls per second across all tasks
LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks
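
These constants centralize the knobs the PR threads through every task decorator. A minimal sketch of how they are meant to be consumed, using the hatchet_sdk APIs that appear elsewhere in this diff (the `ExampleInput` model and `call_llm_once` task are hypothetical):

```python
from datetime import timedelta

from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM


class ExampleInput(BaseModel):
    text: str


hatchet = HatchetClientManager.get_client()
example_workflow = hatchet.workflow(name="Example", input_validator=ExampleInput)


@example_workflow.task(
    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
    retries=3,
    # One unit of the shared "llm" budget per run; the worker registers the
    # budget itself via hatchet.rate_limits.put(...) at startup (see below).
    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)
async def call_llm_once(input: ExampleInput, ctx: Context) -> dict:
    return {"echo": input.text}
```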

View File

@@ -12,6 +12,9 @@ Usage:
import signal
import sys
+ from hatchet_sdk.rate_limit import RateLimitDuration
+ from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
from reflector.logger import logger
from reflector.settings import settings
@@ -37,14 +40,25 @@ def main() -> None:
from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
from reflector.hatchet.workflows import (  # noqa: PLC0415
diarization_pipeline,
+ subject_workflow,
+ topic_chunk_workflow,
track_workflow,
)
hatchet = HatchetClientManager.get_client()
+ hatchet.rate_limits.put(
+ LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
+ )
worker = hatchet.worker(
- "reflector-diarization-worker",
+ "reflector-pipeline-worker",
- workflows=[diarization_pipeline, track_workflow],
+ workflows=[
+ diarization_pipeline,
+ subject_workflow,
+ topic_chunk_workflow,
+ track_workflow,
+ ],
)
def shutdown_handler(signum: int, frame) -> None:
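
For orientation, a hedged sketch of how the workflow this worker serves could be triggered from application code. `recording_id` and `tracks` are declared on `PipelineInput` in this diff; `bucket_name` and `transcript_id` are assumptions inferred from their usage inside the pipeline tasks, and `aio_run` follows the Hatchet pattern referenced in track_processing.py:

```python
from reflector.hatchet.workflows import PipelineInput, diarization_pipeline


async def start_pipeline(recording_id: str, s3_keys: list[str],
                         bucket_name: str, transcript_id: str) -> dict:
    # Returns a dict of task outputs keyed by task name once the run completes.
    return await diarization_pipeline.aio_run(
        PipelineInput(
            recording_id=recording_id,
            tracks=[{"s3_key": key} for key in s3_keys],
            bucket_name=bucket_name,        # assumed field, see pipeline tasks
            transcript_id=transcript_id,    # assumed field, see pipeline tasks
        )
    )
```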

View File

@@ -4,11 +4,23 @@ from reflector.hatchet.workflows.diarization_pipeline import (
PipelineInput,
diarization_pipeline,
)
+ from reflector.hatchet.workflows.subject_processing import (
+ SubjectInput,
+ subject_workflow,
+ )
+ from reflector.hatchet.workflows.topic_chunk_processing import (
+ TopicChunkInput,
+ topic_chunk_workflow,
+ )
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
+ "subject_workflow",
+ "topic_chunk_workflow",
"track_workflow",
"PipelineInput",
+ "SubjectInput",
+ "TopicChunkInput",
"TrackInput",
]

View File

@@ -1,9 +1,12 @@
""" """
Hatchet main workflow: DiarizationPipeline Hatchet main workflow: DailyMultitrackPipeline
Multitrack diarization pipeline for Daily.co recordings. Multitrack processing pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript. Orchestrates the full processing flow from recording metadata to final transcript.
Daily.co recordings don't require ML diarization - speaker identification comes from
track index (each participant's audio is a separate track).
Note: This file uses deferred imports (inside functions/tasks) intentionally. Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues. are not shared across forks, avoiding connection pooling issues.
@@ -28,33 +31,50 @@ from reflector.hatchet.broadcast import (
set_status_and_broadcast,
)
from reflector.hatchet.client import HatchetClientManager
+ from reflector.hatchet.constants import (
+ TIMEOUT_AUDIO,
+ TIMEOUT_HEAVY,
+ TIMEOUT_LONG,
+ TIMEOUT_MEDIUM,
+ TIMEOUT_SHORT,
+ )
from reflector.hatchet.workflows.models import (
+ ActionItemsResult,
ConsentResult,
FinalizeResult,
MixdownResult,
PaddedTrackInfo,
+ PadTrackResult,
+ ParticipantInfo,
ParticipantsResult,
+ ProcessSubjectsResult,
ProcessTracksResult,
+ RecapResult,
RecordingResult,
- SummaryResult,
+ SubjectsResult,
+ SubjectSummaryResult,
TitleResult,
+ TopicChunkResult,
TopicsResult,
+ TranscribeTrackResult,
WaveformResult,
WebhookResult,
ZulipResult,
)
+ from reflector.hatchet.workflows.subject_processing import (
+ SubjectInput,
+ subject_workflow,
+ )
+ from reflector.hatchet.workflows.topic_chunk_processing import (
+ TopicChunkInput,
+ topic_chunk_workflow,
+ )
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors import AudioFileWriterProcessor
- from reflector.processors.types import (
- TitleSummary,
- TitleSummaryWithId,
- Word,
- )
- from reflector.processors.types import (
- Transcript as TranscriptType,
- )
+ from reflector.processors.types import TitleSummary, Word
+ from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
from reflector.utils.audio_constants import (
@@ -71,11 +91,12 @@ from reflector.utils.daily import (
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
+ from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
- """Input to trigger the diarization pipeline."""
+ """Input to trigger the Daily.co multitrack pipeline."""
recording_id: NonEmptyString
tracks: list[dict]  # List of {"s3_key": str}
@@ -86,8 +107,8 @@ class PipelineInput(BaseModel):
hatchet = HatchetClientManager.get_client()
- diarization_pipeline = hatchet.workflow(
+ daily_multitrack_pipeline = hatchet.workflow(
- name="DiarizationPipeline", input_validator=PipelineInput
+ name="DailyMultitrackPipeline", input_validator=PipelineInput
)
@@ -173,7 +194,9 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
return decorator
- @diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3)
+ @daily_multitrack_pipeline.task(
+ execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
+ )
@with_error_handling("get_recording")
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
@@ -224,8 +247,10 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3
+ parents=[get_recording],
+ execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+ retries=3,
)
@with_error_handling("get_participants")
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
@@ -274,7 +299,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
track_keys = [t["s3_key"] for t in input.tracks]
cam_audio_keys = filter_cam_audio_tracks(track_keys)
- participants_list = []
+ participants_list: list[ParticipantInfo] = []
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
@@ -296,11 +321,11 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
await transcripts_controller.upsert_participant(transcript, participant)
participants_list.append(
- {
+ ParticipantInfo(
- "participant_id": participant_id,
+ participant_id=participant_id,
- "user_name": name,
+ user_name=name,
- "speaker": idx,
+ speaker=idx,
- }
+ )
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
@@ -313,8 +338,10 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3
+ parents=[get_participants],
+ execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+ retries=3,
)
@with_error_handling("process_tracks")
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
@@ -324,9 +351,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
participants_result = ctx.task_output(get_participants)
source_language = participants_result.source_language
- child_coroutines = [
+ bulk_runs = [
- track_workflow.aio_run(
+ track_workflow.create_bulk_run_item(
- TrackInput(
+ input=TrackInput(
track_index=i,
s3_key=track["s3_key"],
bucket_name=input.bucket_name,
@@ -337,35 +364,34 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
for i, track in enumerate(input.tracks)
]
- results = await asyncio.gather(*child_coroutines)
+ results = await track_workflow.aio_run_many(bulk_runs)
target_language = participants_result.target_language
- track_words = []
+ track_words: list[list[Word]] = []
padded_tracks = []
created_padded_files = set()
for result in results:
- transcribe_result = result.get("transcribe_track", {})
+ transcribe_result = TranscribeTrackResult(**result["transcribe_track"])
- track_words.append(transcribe_result.get("words", []))
+ track_words.append(transcribe_result.words)
- pad_result = result.get("pad_track", {})
+ pad_result = PadTrackResult(**result["pad_track"])
- padded_key = pad_result.get("padded_key")
- bucket_name = pad_result.get("bucket_name")
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
- if padded_key:
+ if pad_result.padded_key:
padded_tracks.append(
- PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
+ PaddedTrackInfo(
+ key=pad_result.padded_key, bucket_name=pad_result.bucket_name
+ )
)
- track_index = pad_result.get("track_index")
- if pad_result.get("size", 0) > 0 and track_index is not None:
+ if pad_result.size > 0:
- storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
+ storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
created_padded_files.add(storage_path)
all_words = [word for words in track_words for word in words]
- all_words.sort(key=lambda w: w.get("start", 0))
+ all_words.sort(key=lambda w: w.start)
ctx.log(
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
@@ -381,8 +407,10 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3
+ parents=[process_tracks],
+ execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
+ retries=3,
)
@with_error_handling("mixdown_tracks")
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
@@ -462,8 +490,10 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3
+ parents=[mixdown_tracks],
+ execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+ retries=3,
)
@with_error_handling("generate_waveform")
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
@@ -528,64 +558,113 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
return WaveformResult(waveform_generated=True)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
+ parents=[mixdown_tracks],
+ execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+ retries=3,
)
@with_error_handling("detect_topics")
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
- """Detect topics using LLM and save to database (matches Celery on_topic callback)."""
+ """Detect topics using parallel child workflows (one per chunk)."""
ctx.log("detect_topics: analyzing transcript for topics")
track_result = ctx.task_output(process_tracks)
words = track_result.all_words
- target_language = track_result.target_language
+ if not words:
+ ctx.log("detect_topics: no words, returning empty topics")
+ return TopicsResult(topics=[])
+ # Deferred imports: Hatchet workers fork processes
from reflector.db.transcripts import (  # noqa: PLC0415
TranscriptTopic,
transcripts_controller,
)
- word_objects = [Word(**w) for w in words]
- transcript_type = TranscriptType(words=word_objects)
- empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
+ chunk_size = TOPIC_CHUNK_WORD_COUNT
+ chunks = []
+ for i in range(0, len(words), chunk_size):
+ chunk_words = words[i : i + chunk_size]
+ if not chunk_words:
+ continue
+ first_word = chunk_words[0]
+ last_word = chunk_words[-1]
+ timestamp = first_word.start
+ duration = last_word.end - timestamp
+ chunk_text = " ".join(w.text for w in chunk_words)
+ chunks.append(
+ {
+ "index": len(chunks),
+ "text": chunk_text,
+ "timestamp": timestamp,
+ "duration": duration,
+ "words": chunk_words,
+ }
+ )
+ if not chunks:
+ ctx.log("detect_topics: no chunks generated, returning empty topics")
+ return TopicsResult(topics=[])
+ ctx.log(f"detect_topics: spawning {len(chunks)} topic chunk workflows in parallel")
+ bulk_runs = [
+ topic_chunk_workflow.create_bulk_run_item(
+ input=TopicChunkInput(
+ chunk_index=chunk["index"],
+ chunk_text=chunk["text"],
+ timestamp=chunk["timestamp"],
+ duration=chunk["duration"],
+ words=chunk["words"],
+ )
+ )
+ for chunk in chunks
+ ]
+ results = await topic_chunk_workflow.aio_run_many(bulk_runs)
+ topic_chunks = [
+ TopicChunkResult(**result["detect_chunk_topic"]) for result in results
+ ]
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
- async def on_topic_callback(data):
+ for chunk in topic_chunks:
topic = TranscriptTopic(
- title=data.title,
+ title=chunk.title,
- summary=data.summary,
+ summary=chunk.summary,
- timestamp=data.timestamp,
+ timestamp=chunk.timestamp,
- transcript=data.transcript.text,
+ transcript=" ".join(w.text for w in chunk.words),
- words=data.transcript.words,
+ words=[w.model_dump() for w in chunk.words],
)
- if isinstance(
- data, TitleSummaryWithId
- ):  # Celery parity: main_live_pipeline.py
- topic.id = data.id
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
- topics = await topic_processing.detect_topics(
- transcript_type,
- target_language,
- on_topic_callback=on_topic_callback,
- empty_pipeline=empty_pipeline,
- )
- topics_list = [t.model_dump() for t in topics]
+ topics_list = [
+ TitleSummary(
+ title=chunk.title,
+ summary=chunk.summary,
+ timestamp=chunk.timestamp,
+ duration=chunk.duration,
+ transcript=TranscriptType(words=chunk.words),
+ )
+ for chunk in topic_chunks
+ ]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
return TopicsResult(topics=topics_list)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
+ parents=[detect_topics],
+ execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+ retries=3,
)
@with_error_handling("generate_title")
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
@@ -601,8 +680,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
transcripts_controller,
)
- topic_objects = [TitleSummary(**t) for t in topics]
- ctx.log(f"generate_title: created {len(topic_objects)} TitleSummary objects")
+ ctx.log(f"generate_title: received {len(topics)} TitleSummary objects")
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
title_result = None
@@ -634,7 +712,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
ctx.log("generate_title: calling topic_processing.generate_title (LLM call)...") ctx.log("generate_title: calling topic_processing.generate_title (LLM call)...")
await topic_processing.generate_title( await topic_processing.generate_title(
topic_objects, topics,
on_title_callback=on_title_callback, on_title_callback=on_title_callback,
empty_pipeline=empty_pipeline, empty_pipeline=empty_pipeline,
logger=logger, logger=logger,
@@ -646,98 +724,278 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
return TitleResult(title=title_result)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
+ parents=[detect_topics],
+ execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+ retries=3,
)
- @with_error_handling("generate_summary")
+ @with_error_handling("extract_subjects")
- async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
+ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult:
- """Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
+ """Extract main subjects/topics from transcript for parallel processing."""
- ctx.log(f"generate_summary: starting for transcript_id={input.transcript_id}")
+ ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
- ctx.log(f"generate_summary: received {len(topics)} topics from detect_topics")
+ if not topics:
+ ctx.log("extract_subjects: no topics, returning empty subjects")
+ return SubjectsResult(
+ subjects=[],
+ transcript_text="",
+ participant_names=[],
+ participant_name_to_id={},
+ )
+ # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+ # sharing DB connections and LLM HTTP pools across forks
+ from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+ from reflector.llm import LLM  # noqa: PLC0415
+ from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+ SummaryBuilder,
+ )
+ async with fresh_db_connection():
+ transcript = await transcripts_controller.get_by_id(input.transcript_id)
+ # Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
+ speakermap = {}
+ if transcript and transcript.participants:
+ speakermap = {
+ p.speaker: p.name
+ for p in transcript.participants
+ if p.speaker is not None and p.name
+ }
+ text_lines = []
+ for topic in topics:
+ for segment in topic.transcript.as_segments():
+ name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
+ text_lines.append(f"{name}: {segment.text}")
+ transcript_text = "\n".join(text_lines)
+ participant_names = []
+ participant_name_to_id = {}
+ if transcript and transcript.participants:
+ participant_names = [p.name for p in transcript.participants if p.name]
+ participant_name_to_id = {
+ p.name: p.id for p in transcript.participants if p.name and p.id
+ }
+ # TODO: refactor SummaryBuilder methods into standalone functions
+ llm = LLM(settings=settings)
+ builder = SummaryBuilder(llm, logger=logger)
+ builder.set_transcript(transcript_text)
+ if participant_names:
+ builder.set_known_participants(
+ participant_names, participant_name_to_id=participant_name_to_id
+ )
+ ctx.log("extract_subjects: calling LLM to extract subjects")
+ await builder.extract_subjects()
+ ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")
+ return SubjectsResult(
+ subjects=builder.subjects,
+ transcript_text=transcript_text,
+ participant_names=participant_names,
+ participant_name_to_id=participant_name_to_id,
+ )
+ @daily_multitrack_pipeline.task(
+ parents=[extract_subjects],
+ execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+ retries=3,
+ )
+ @with_error_handling("process_subjects")
+ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubjectsResult:
+ """Spawn child workflows for each subject (dynamic fan-out, parallel LLM calls)."""
+ subjects_result = ctx.task_output(extract_subjects)
+ subjects = subjects_result.subjects
+ if not subjects:
+ ctx.log("process_subjects: no subjects to process")
+ return ProcessSubjectsResult(subject_summaries=[])
+ ctx.log(f"process_subjects: spawning {len(subjects)} subject workflows in parallel")
+ bulk_runs = [
+ subject_workflow.create_bulk_run_item(
+ input=SubjectInput(
+ subject=subject,
+ subject_index=i,
+ transcript_text=subjects_result.transcript_text,
+ participant_names=subjects_result.participant_names,
+ participant_name_to_id=subjects_result.participant_name_to_id,
+ )
+ )
+ for i, subject in enumerate(subjects)
+ ]
+ results = await subject_workflow.aio_run_many(bulk_runs)
+ subject_summaries = [
+ SubjectSummaryResult(**result["generate_detailed_summary"])
+ for result in results
+ ]
+ ctx.log(f"process_subjects complete: {len(subject_summaries)} summaries")
+ return ProcessSubjectsResult(subject_summaries=subject_summaries)
+ @daily_multitrack_pipeline.task(
+ parents=[process_subjects],
+ execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+ retries=3,
+ )
+ @with_error_handling("generate_recap")
+ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
+ """Generate recap and long summary from subject summaries, save to database."""
+ ctx.log(f"generate_recap: starting for transcript_id={input.transcript_id}")
+ subjects_result = ctx.task_output(extract_subjects)
+ process_result = ctx.task_output(process_subjects)
+ # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+ # sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import (  # noqa: PLC0415
- TranscriptActionItems,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
+ from reflector.llm import LLM  # noqa: PLC0415
+ from reflector.processors.summary.prompts import (  # noqa: PLC0415
+ RECAP_PROMPT,
+ build_participant_instructions,
+ build_summary_markdown,
+ )
- topic_objects = [TitleSummary(**t) for t in topics]
- ctx.log(f"generate_summary: created {len(topic_objects)} TitleSummary objects")
- empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
- summary_result = None
- short_summary_result = None
- action_items_result = None
+ subject_summaries = process_result.subject_summaries
+ if not subject_summaries:
+ ctx.log("generate_recap: no subject summaries, returning empty")
+ return RecapResult(short_summary="", long_summary="")
+ summaries = [
+ {"subject": s.subject, "summary": s.paragraph_summary}
+ for s in subject_summaries
+ ]
+ summaries_text = "\n\n".join([f"{s['subject']}: {s['summary']}" for s in summaries])
+ llm = LLM(settings=settings)
+ participant_instructions = build_participant_instructions(
+ subjects_result.participant_names
+ )
+ recap_prompt = RECAP_PROMPT
+ if participant_instructions:
+ recap_prompt = f"{recap_prompt}\n\n{participant_instructions}"
+ ctx.log("generate_recap: calling LLM for recap")
+ recap_response = await llm.get_response(
+ recap_prompt,
+ [summaries_text],
+ tone_name="Recap summarizer",
+ )
+ short_summary = str(recap_response)
+ long_summary = build_summary_markdown(short_summary, summaries)
async with fresh_db_connection():
- ctx.log("generate_summary: DB connection established")
transcript = await transcripts_controller.get_by_id(input.transcript_id)
- ctx.log(
- f"generate_summary: fetched transcript, exists={transcript is not None}"
- )
- async def on_long_summary_callback(data):
- nonlocal summary_result
- ctx.log(
- f"generate_summary: on_long_summary_callback received ({len(data.long_summary)} chars)"
- )
- summary_result = data.long_summary
- final_long_summary = TranscriptFinalLongSummary(
- long_summary=data.long_summary
- )
+ if transcript:
await transcripts_controller.update(
transcript,
- {"long_summary": final_long_summary.long_summary},
+ {
+ "short_summary": short_summary,
+ "long_summary": long_summary,
+ },
)
- ctx.log("generate_summary: saved long_summary to DB")
- await append_event_and_broadcast(
- input.transcript_id,
- transcript,
- "FINAL_LONG_SUMMARY",
- final_long_summary,
- logger=logger,
- )
- ctx.log("generate_summary: broadcasted FINAL_LONG_SUMMARY event")
- async def on_short_summary_callback(data):
- nonlocal short_summary_result
- ctx.log(
- f"generate_summary: on_short_summary_callback received ({len(data.short_summary)} chars)"
- )
- short_summary_result = data.short_summary
- final_short_summary = TranscriptFinalShortSummary(
- short_summary=data.short_summary
- )
- await transcripts_controller.update(
- transcript,
- {"short_summary": final_short_summary.short_summary},
- )
- ctx.log("generate_summary: saved short_summary to DB")
+ final_short = TranscriptFinalShortSummary(short_summary=short_summary)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_SHORT_SUMMARY",
- final_short_summary,
+ final_short,
logger=logger,
)
- ctx.log("generate_summary: broadcasted FINAL_SHORT_SUMMARY event")
- async def on_action_items_callback(data):
- nonlocal action_items_result
- ctx.log(
- f"generate_summary: on_action_items_callback received ({len(data.action_items)} items)"
- )
- action_items_result = data.action_items
- action_items = TranscriptActionItems(action_items=data.action_items)
- await transcripts_controller.update(
- transcript,
- {"action_items": action_items.action_items},
- )
+ final_long = TranscriptFinalLongSummary(long_summary=long_summary)
+ await append_event_and_broadcast(
+ input.transcript_id,
+ transcript,
+ "FINAL_LONG_SUMMARY",
+ final_long,
+ logger=logger,
+ )
+ ctx.log("generate_recap complete")
+ return RecapResult(short_summary=short_summary, long_summary=long_summary)
+ @daily_multitrack_pipeline.task(
+ parents=[extract_subjects],
+ execution_timeout=timedelta(seconds=TIMEOUT_LONG),
+ retries=3,
+ )
+ @with_error_handling("identify_action_items")
+ async def identify_action_items(
+ input: PipelineInput, ctx: Context
+ ) -> ActionItemsResult:
+ """Identify action items from transcript (parallel with subject processing)."""
+ ctx.log(f"identify_action_items: starting for transcript_id={input.transcript_id}")
+ subjects_result = ctx.task_output(extract_subjects)
+ if not subjects_result.transcript_text:
+ ctx.log("identify_action_items: no transcript text, returning empty")
+ return ActionItemsResult(action_items={"decisions": [], "next_steps": []})
+ # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+ # sharing DB connections and LLM HTTP pools across forks
+ from reflector.db.transcripts import (  # noqa: PLC0415
+ TranscriptActionItems,
+ transcripts_controller,
+ )
+ from reflector.llm import LLM  # noqa: PLC0415
+ from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+ SummaryBuilder,
+ )
+ # TODO: refactor SummaryBuilder methods into standalone functions
+ llm = LLM(settings=settings)
+ builder = SummaryBuilder(llm, logger=logger)
+ builder.set_transcript(subjects_result.transcript_text)
+ if subjects_result.participant_names:
+ builder.set_known_participants(
+ subjects_result.participant_names,
+ participant_name_to_id=subjects_result.participant_name_to_id,
+ )
+ ctx.log("identify_action_items: calling LLM")
+ action_items_response = await builder.identify_action_items()
+ if action_items_response is None:
+ raise RuntimeError("Failed to identify action items - LLM call failed")
+ action_items_dict = action_items_response.model_dump()
+ async with fresh_db_connection():
+ transcript = await transcripts_controller.get_by_id(input.transcript_id)
+ if transcript:
+ action_items = TranscriptActionItems(action_items=action_items_dict)
+ await transcripts_controller.update(
+ transcript, {"action_items": action_items.action_items}
+ )
- ctx.log("generate_summary: saved action_items to DB")
await append_event_and_broadcast(
input.transcript_id,
transcript,
@@ -745,34 +1003,18 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
action_items,
logger=logger,
)
- ctx.log("generate_summary: broadcasted ACTION_ITEMS event")
ctx.log(
- "generate_summary: calling topic_processing.generate_summaries (LLM calls)..."
+ f"identify_action_items complete: {len(action_items_dict.get('decisions', []))} decisions, "
+ f"{len(action_items_dict.get('next_steps', []))} next steps"
)
- await topic_processing.generate_summaries(
- topic_objects,
- transcript,
- on_long_summary_callback=on_long_summary_callback,
- on_short_summary_callback=on_short_summary_callback,
- on_action_items_callback=on_action_items_callback,
- empty_pipeline=empty_pipeline,
- logger=logger,
- )
- ctx.log("generate_summary: topic_processing.generate_summaries returned")
- ctx.log("generate_summary complete")
- return SummaryResult(
- summary=summary_result,
- short_summary=short_summary_result,
- action_items=action_items_result,
- )
+ return ActionItemsResult(action_items=action_items_dict)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[generate_waveform, generate_title, generate_summary],
+ parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
- execution_timeout=timedelta(seconds=60),
+ execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@with_error_handling("finalize")
@@ -818,8 +1060,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
if transcript is None:
raise ValueError(f"Transcript {input.transcript_id} not found in database")
- word_objects = [Word(**w) for w in all_words]
- merged_transcript = TranscriptType(words=word_objects, translation=None)
+ merged_transcript = TranscriptType(words=all_words, translation=None)
await append_event_and_broadcast(
input.transcript_id,
@@ -856,8 +1097,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
return FinalizeResult(status="COMPLETED")
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3
+ parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling("cleanup_consent", set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
@@ -956,8 +1197,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
return ConsentResult()
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5
+ parents=[cleanup_consent],
+ execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+ retries=5,
)
@with_error_handling("post_zulip", set_error_status=False)
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
@@ -981,12 +1224,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
return ZulipResult(zulip_message_id=message_id)
- @diarization_pipeline.task(
+ @daily_multitrack_pipeline.task(
- parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30
+ parents=[post_zulip],
+ execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+ retries=30,
)
@with_error_handling("send_webhook", set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
- """Send completion webhook to external service."""
+ """Send completion webhook to external service with full payload and HMAC signature."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
@@ -995,27 +1240,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller  # noqa: PLC0415
- from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+ from reflector.utils.webhook import (  # noqa: PLC0415
+ build_transcript_webhook_payload,
+ send_webhook_request,
+ )
room = await rooms_controller.get_by_id(input.room_id)
- transcript = await transcripts_controller.get_by_id(input.transcript_id)
- if room and room.webhook_url and transcript:
- webhook_payload = {
- "event": "transcript.completed",
- "transcript_id": input.transcript_id,
- "title": transcript.title,
- "duration": transcript.duration,
- }
- async with httpx.AsyncClient() as client:
- response = await client.post(
- room.webhook_url, json=webhook_payload, timeout=30
- )
- response.raise_for_status()
+ if not room or not room.webhook_url:
+ ctx.log("send_webhook skipped (no webhook_url configured)")
+ return WebhookResult(webhook_sent=False, skipped=True)
+ payload = await build_transcript_webhook_payload(
+ transcript_id=input.transcript_id,
+ room_id=input.room_id,
+ )
+ if not payload:
+ ctx.log("send_webhook skipped (could not build payload)")
+ return WebhookResult(webhook_sent=False, skipped=True)
+ ctx.log(
+ f"send_webhook: sending to {room.webhook_url} "
+ f"(topics={len(payload.transcript.topics)}, "
+ f"participants={len(payload.transcript.participants)})"
+ )
+ response = await send_webhook_request(
+ url=room.webhook_url,
+ payload=payload,
+ event_type="transcript.completed",
+ webhook_secret=room.webhook_secret,
+ timeout=30.0,
+ )
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
- return WebhookResult(webhook_sent=False, skipped=True)
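
The new `send_webhook_request` helper is not shown in this compare. For readers unfamiliar with signed webhooks, a hedged stand-in of what an HMAC-signed delivery of this shape typically looks like; the header names and signature scheme here are assumptions, not the helper's confirmed behavior:

```python
import hashlib
import hmac
import json

import httpx


async def send_signed_webhook(
    url: str,
    payload: dict,
    event_type: str,
    webhook_secret: str | None,
    timeout: float = 30.0,
) -> httpx.Response:
    """Hypothetical sketch of send_webhook_request: POST JSON with an
    HMAC-SHA256 signature so the receiver can verify authenticity."""
    body = json.dumps(payload).encode()
    headers = {"content-type": "application/json", "x-reflector-event": event_type}
    if webhook_secret:
        # Sign the exact bytes sent on the wire (assumed header name).
        digest = hmac.new(webhook_secret.encode(), body, hashlib.sha256).hexdigest()
        headers["x-reflector-signature"] = digest
    async with httpx.AsyncClient() as client:
        response = await client.post(url, content=body, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
```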

View File

@@ -5,13 +5,20 @@ Provides static typing for all task outputs, enabling type checking
and better IDE support.
"""
- from typing import Any
from pydantic import BaseModel
+ from reflector.processors.types import TitleSummary, Word
from reflector.utils.string import NonEmptyString
+ class ParticipantInfo(BaseModel):
+ """Participant info with speaker index for workflow result."""
+ participant_id: NonEmptyString
+ user_name: NonEmptyString
+ speaker: int
class PadTrackResult(BaseModel):
"""Result from pad_track task."""
@@ -26,7 +33,7 @@ class PadTrackResult(BaseModel):
class TranscribeTrackResult(BaseModel):
"""Result from transcribe_track task."""
- words: list[dict[str, Any]]
+ words: list[Word]
track_index: int
@@ -41,7 +48,7 @@ class RecordingResult(BaseModel):
class ParticipantsResult(BaseModel):
"""Result from get_participants task."""
- participants: list[dict[str, Any]]
+ participants: list[ParticipantInfo]
num_tracks: int
source_language: NonEmptyString
target_language: NonEmptyString
@@ -57,7 +64,7 @@ class PaddedTrackInfo(BaseModel):
class ProcessTracksResult(BaseModel):
"""Result from process_tracks task."""
- all_words: list[dict[str, Any]]
+ all_words: list[Word]
padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
word_count: int
num_tracks: int
@@ -79,10 +86,21 @@ class WaveformResult(BaseModel):
waveform_generated: bool
+ class TopicChunkResult(BaseModel):
+ """Result from topic chunk child workflow."""
+ chunk_index: int
+ title: str
+ summary: str
+ timestamp: float
+ duration: float
+ words: list[Word]
class TopicsResult(BaseModel):
"""Result from detect_topics task."""
- topics: list[dict[str, Any]]
+ topics: list[TitleSummary]
class TitleResult(BaseModel):
@@ -91,12 +109,41 @@ class TitleResult(BaseModel):
title: str | None
- class SummaryResult(BaseModel):
- """Result from generate_summary task."""
- summary: str | None
- short_summary: str | None
- action_items: dict | None = None
+ class SubjectsResult(BaseModel):
+ """Result from extract_subjects task."""
+ subjects: list[str]
+ transcript_text: str  # Formatted transcript for LLM consumption
+ participant_names: list[str]
+ participant_name_to_id: dict[str, str]
+ class SubjectSummaryResult(BaseModel):
+ """Result from subject summary child workflow."""
+ subject: str
+ subject_index: int
+ detailed_summary: str
+ paragraph_summary: str
+ class ProcessSubjectsResult(BaseModel):
+ """Result from process_subjects fan-out task."""
+ subject_summaries: list[SubjectSummaryResult]
+ class RecapResult(BaseModel):
+ """Result from generate_recap task."""
+ short_summary: str  # Recap paragraph
+ long_summary: str  # Full markdown summary
+ class ActionItemsResult(BaseModel):
+ """Result from identify_action_items task."""
+ action_items: dict  # ActionItemsResponse as dict (may have empty lists)
class FinalizeResult(BaseModel):
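
The practical effect of these typed models: Hatchet hands child workflow results back as plain dicts keyed by task name, and the parent re-validates them into models. A minimal illustration with an invented payload; the `PadTrackResult` field names are taken from their usage in `process_tracks`, and the real model may declare more fields:

```python
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult

# Shape of one entry of `results` from track_workflow.aio_run_many(...):
# {"pad_track": {...}, "transcribe_track": {...}} (keys are task names).
result = {
    "pad_track": {"padded_key": "tracks/padded_0.webm", "bucket_name": "b",
                  "track_index": 0, "size": 1024},
    "transcribe_track": {"words": [], "track_index": 0},
}

# Pydantic validates the dict back into a typed model; any schema drift
# between child and parent now fails loudly here, instead of being papered
# over by the old .get(..., {}) defaults.
pad = PadTrackResult(**result["pad_track"])
words = TranscribeTrackResult(**result["transcribe_track"]).words
```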

View File

@@ -0,0 +1,107 @@
"""
Hatchet child workflow: SubjectProcessing
Handles individual subject/topic summary generation.
Spawned dynamically by the main diarization pipeline for each extracted subject
via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.logger import logger
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
build_participant_instructions,
)
class SubjectInput(BaseModel):
"""Input for individual subject processing."""
subject: str
subject_index: int
transcript_text: str
participant_names: list[str]
participant_name_to_id: dict[str, str]
hatchet = HatchetClientManager.get_client()
subject_workflow = hatchet.workflow(
name="SubjectProcessing", input_validator=SubjectInput
)
@subject_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=2)],
)
async def generate_detailed_summary(
input: SubjectInput, ctx: Context
) -> SubjectSummaryResult:
"""Generate detailed analysis for a single subject, then condense to paragraph."""
ctx.log(
f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
)
logger.info(
"[Hatchet] generate_detailed_summary",
subject=input.subject,
subject_index=input.subject_index,
)
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
# LLM HTTP connection pools aren't shared across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.settings import settings # noqa: PLC0415
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(input.participant_names)
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
if participant_instructions:
detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
detailed_response = await llm.get_response(
detailed_prompt,
[input.transcript_text],
tone_name="Topic assistant",
)
detailed_summary = str(detailed_response)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
if participant_instructions:
paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
paragraph_response = await llm.get_response(
paragraph_prompt,
[detailed_summary],
tone_name="Topic summarizer",
)
paragraph_summary = str(paragraph_response)
ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
logger.info(
"[Hatchet] generate_detailed_summary complete",
subject=input.subject,
subject_index=input.subject_index,
detailed_len=len(detailed_summary),
paragraph_len=len(paragraph_summary),
)
return SubjectSummaryResult(
subject=input.subject,
subject_index=input.subject_index,
detailed_summary=detailed_summary,
paragraph_summary=paragraph_summary,
)
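
Because this child workflow is registered on the worker independently, it can also be exercised on its own, which is handy when iterating on the two prompts above. A hedged sketch with invented inputs, assuming a running Hatchet worker that has `subject_workflow` registered:

```python
from reflector.hatchet.workflows.subject_processing import SubjectInput, subject_workflow


async def debug_one_subject() -> None:
    result = await subject_workflow.aio_run(
        SubjectInput(
            subject="Quarterly roadmap",
            subject_index=0,
            transcript_text="Alice: we should ship the roadmap in Q3...",
            participant_names=["Alice"],
            participant_name_to_id={"Alice": "p1"},
        )
    )
    # Result is keyed by task name, matching how process_subjects unpacks it.
    print(result["generate_detailed_summary"]["paragraph_summary"])
```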

View File

@@ -0,0 +1,91 @@
"""
Hatchet child workflow: TopicChunkProcessing
Handles topic detection for individual transcript chunks.
Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_SHORT
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import Word
class TopicChunkInput(BaseModel):
"""Input for individual topic chunk processing."""
chunk_index: int
chunk_text: str
timestamp: float
duration: float
words: list[Word]
hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing", input_validator=TopicChunkInput
)
@topic_chunk_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)
async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
"""Detect topic for a single transcript chunk."""
ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
logger.info(
"[Hatchet] detect_chunk_topic",
chunk_index=input.chunk_index,
text_length=len(input.chunk_text),
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing LLM HTTP connection pools across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
TopicResponse,
)
from reflector.settings import settings # noqa: PLC0415
from reflector.utils.text import clean_title # noqa: PLC0415
llm = LLM(settings=settings, temperature=0.9, max_tokens=500)
prompt = TOPIC_PROMPT.format(text=input.chunk_text)
response = await llm.get_structured_response(
prompt,
[input.chunk_text],
TopicResponse,
tone_name="Topic analyzer",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
title = clean_title(response.title)
ctx.log(
f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
)
logger.info(
"[Hatchet] detect_chunk_topic complete",
chunk_index=input.chunk_index,
title=title[:50],
)
return TopicChunkResult(
chunk_index=input.chunk_index,
title=title,
summary=response.summary,
timestamp=input.timestamp,
duration=input.duration,
words=input.words,
)
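
For reference, the parent-side chunking that feeds this workflow (shown in `detect_topics` above) boils down to fixed-size word windows. A condensed, self-contained version of that logic; the chunk size is assumed to match the previously hard-coded 300 in topic_processing, and `W` stands in for `reflector.processors.types.Word`:

```python
from dataclasses import dataclass

TOPIC_CHUNK_WORD_COUNT = 300  # assumed value; the real constant lives in
                              # reflector.utils.transcript_constants


@dataclass
class W:
    text: str
    start: float
    end: float


def chunk_words(words: list[W], size: int = TOPIC_CHUNK_WORD_COUNT) -> list[dict]:
    chunks: list[dict] = []
    for i in range(0, len(words), size):
        group = words[i : i + size]
        chunks.append({
            "index": len(chunks),
            "text": " ".join(w.text for w in group),
            "timestamp": group[0].start,                  # chunk starts at first word
            "duration": group[-1].end - group[0].start,   # spans to the last word
        })
    return chunks
```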

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
- Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+ Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
- standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+ standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
@@ -23,6 +23,7 @@ from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
+ from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
@@ -47,7 +48,7 @@ hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
- @track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
+ @track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
"""Pad single audio track with silence for alignment.
@@ -153,7 +154,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
@track_workflow.task(
- parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
+ parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
@@ -197,23 +198,20 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index
- words = []
for word in transcript.words:
- word_dict = word.model_dump()
- word_dict["speaker"] = input.track_index
- words.append(word_dict)
+ word.speaker = input.track_index
ctx.log(
- f"transcribe_track complete: track {input.track_index}, {len(words)} words"
+ f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
- word_count=len(words),
+ word_count=len(transcript.words),
)
return TranscribeTrackResult(
- words=words,
+ words=transcript.words,
track_index=input.track_index,
)

View File

@@ -18,6 +18,7 @@ from reflector.processors import (
)
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
+ from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
class EmptyPipeline:
@@ -38,7 +39,7 @@ async def detect_topics(
on_topic_callback: Callable,
empty_pipeline: EmptyPipeline,
) -> list[TitleSummary]:
- chunk_size = 300
+ chunk_size = TOPIC_CHUNK_WORD_COUNT
topics: list[TitleSummary] = []
async def on_topic(topic: TitleSummary):

View File

@@ -0,0 +1,30 @@
"""
LLM prompts for transcript processing.
Extracted to a separate module to avoid circular imports when importing
from processor modules (which import LLM/settings at module level).
"""
from textwrap import dedent
TOPIC_PROMPT = dedent(
"""
Analyze the following transcript segment and extract the main topic being discussed.
Focus on the substantive content and ignore small talk or administrative chatter.
Create a title that:
- Captures the specific subject matter being discussed
- Is descriptive and self-explanatory
- Uses professional language
- Is specific rather than generic
For the summary:
- Summarize the key points in maximum two sentences
- Focus on what was discussed, decided, or accomplished
- Be concise but informative
<transcript>
{text}
</transcript>
"""
).strip()

View File

@@ -0,0 +1,91 @@
"""
LLM prompts for summary generation.
Extracted to a separate module to avoid circular imports when importing
from summary_builder.py (which imports LLM/settings at module level).
"""
from textwrap import dedent
def build_participant_instructions(participant_names: list[str]) -> str:
"""Build participant context instructions for LLM prompts."""
if not participant_names:
return ""
participants_list = ", ".join(participant_names)
return dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()
def build_summary_markdown(recap: str, summaries: list[dict[str, str]]) -> str:
"""Build markdown summary from recap and subject summaries."""
lines: list[str] = []
if recap:
lines.append("# Quick recap")
lines.append("")
lines.append(recap)
lines.append("")
if summaries:
lines.append("# Summary")
lines.append("")
for summary in summaries:
lines.append(f"**{summary['subject']}**")
lines.append(summary["summary"])
lines.append("")
return "\n".join(lines)
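
A short sketch of how these helpers compose (subject, recap, and summary values are illustrative):

from reflector.processors.summary.prompts import (
    DETAILED_SUBJECT_PROMPT_TEMPLATE,
    build_participant_instructions,
    build_summary_markdown,
)

instructions = build_participant_instructions(["Ada", "Grace"])
prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject="Release planning")

markdown = build_summary_markdown(
    recap="The team agreed on the 0.26.0 release scope.",
    summaries=[{"subject": "Release planning", "summary": "Ada cuts the branch Friday."}],
)
print(markdown)  # "# Quick recap" section, then a "# Summary" section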

View File

@@ -15,6 +15,13 @@ import structlog
from pydantic import BaseModel, Field

from reflector.llm import LLM
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    RECAP_PROMPT,
+    build_participant_instructions,
+    build_summary_markdown,
+)
from reflector.settings import settings

T = TypeVar("T", bound=BaseModel)
@@ -52,50 +59,6 @@ SUBJECTS_PROMPT = dedent(
""" """
).strip() ).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()
ACTION_ITEMS_PROMPT = dedent( ACTION_ITEMS_PROMPT = dedent(
""" """
Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next. Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -331,17 +294,7 @@ class SummaryBuilder:
        participants_md = self.format_list_md(participants)
        self.transcript += f"\n\n# Participants\n\n{participants_md}"
-        participants_list = ", ".join(participants)
-        self.participant_instructions = dedent(
-            f"""
-            # IMPORTANT: Participant Names
-            The following participants are identified in this conversation: {participants_list}
-            You MUST use these specific participant names when referring to people in your response.
-            Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-            Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-            """
-        ).strip()
+        self.participant_instructions = build_participant_instructions(participants)

    async def identify_participants(self) -> None:
        """
@@ -377,18 +330,9 @@ class SummaryBuilder:
            participants_md = self.format_list_md(unique_participants)
            self.transcript += f"\n\n# Participants\n\n{participants_md}"
-            # Set instructions that will be automatically added to all prompts
-            participants_list = ", ".join(unique_participants)
-            self.participant_instructions = dedent(
-                f"""
-                # IMPORTANT: Participant Names
-                The following participants are identified in this conversation: {participants_list}
-                You MUST use these specific participant names when referring to people in your response.
-                Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-                Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-                """
-            ).strip()
+            self.participant_instructions = build_participant_instructions(
+                unique_participants
+            )
        else:
            self.logger.warning("No participants identified in the transcript")
@@ -613,22 +557,7 @@ class SummaryBuilder:
    # ----------------------------------------------------------------------------

    def as_markdown(self) -> str:
-        lines: list[str] = []
-        if self.recap:
-            lines.append("# Quick recap")
-            lines.append("")
-            lines.append(self.recap)
-            lines.append("")
-        if self.summaries:
-            lines.append("# Summary")
-            lines.append("")
-            for summary in self.summaries:
-                lines.append(f"**{summary['subject']}**")
-                lines.append(summary["summary"])
-                lines.append("")
-        return "\n".join(lines)
+        return build_summary_markdown(self.recap, self.summaries)

    def format_list_md(self, data: list[str]) -> str:
        return "\n".join([f"- {item}" for item in data])

View File

@@ -1,35 +1,12 @@
-from textwrap import dedent
-
from pydantic import AliasChoices, BaseModel, Field

from reflector.llm import LLM
from reflector.processors.base import Processor
+from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import TitleSummary, Transcript
from reflector.settings import settings
from reflector.utils.text import clean_title

-TOPIC_PROMPT = dedent(
-    """
-    Analyze the following transcript segment and extract the main topic being discussed.
-    Focus on the substantive content and ignore small talk or administrative chatter.
-    Create a title that:
-    - Captures the specific subject matter being discussed
-    - Is descriptive and self-explanatory
-    - Uses professional language
-    - Is specific rather than generic
-    For the summary:
-    - Summarize the key points in maximum two sentences
-    - Focus on what was discussed, decided, or accomplished
-    - Be concise but informative
-    <transcript>
-    {text}
-    </transcript>
-    """
-).strip()
-
class TopicResponse(BaseModel):
    """Structured response for topic detection"""

View File

@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
    if transcript.locked:
        return ValidationLocked(detail="Recording is locked")

-    if transcript.status == "idle":
+    # hatchet is idempotent anyways + if it wasn't dispatched successfully
+    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
        return ValidationNotReady(detail="Recording is not ready for processing")

    # Check Celery tasks
@@ -115,13 +116,11 @@ async def validate_transcript_for_processing(
    ):
        return ValidationAlreadyScheduled(detail="already running")

-    # Check Hatchet workflows (if enabled)
    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
        try:
            status = await HatchetClientManager.get_workflow_run_status(
                transcript.workflow_run_id
            )
-            # If workflow is running or queued, don't allow new processing
            if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                return ValidationAlreadyScheduled(
                    detail="Hatchet workflow already running"

View File

@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np

+from reflector.utils.audio_constants import WAVEFORM_SEGMENTS

-def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
+def get_audio_waveform(
+    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
+) -> list[int]:
    if isinstance(path, Path):
        path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("path", type=Path)
-    parser.add_argument("--segments-count", type=int, default=256)
+    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
    args = parser.parse_args()
    print(get_audio_waveform(args.path, args.segments_count))
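
Behavior is unchanged apart from the default now living in one place. A usage sketch (import path assumed, since the compare view hides file names; WAVEFORM_SEGMENTS presumably keeps the old literal 256):

from pathlib import Path

from reflector.utils.audio_waveform import get_audio_waveform  # path assumed

# One int per segment bucket; pass segments_count to override the default.
waveform = get_audio_waveform(Path("meeting.wav"), segments_count=128)
print(len(waveform), waveform[:8])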

View File

@@ -0,0 +1,8 @@
"""
Shared transcript processing constants.
Used by both Hatchet workflows and Celery pipelines for consistent processing.
"""
# Topic detection: number of words per chunk for topic extraction
TOPIC_CHUNK_WORD_COUNT = 300
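
The chunking this constant drives (see chunk_size = TOPIC_CHUNK_WORD_COUNT earlier in this diff) amounts to a word-window split; a minimal sketch of the idea, though the pipeline's real chunker may use smarter boundaries:

TOPIC_CHUNK_WORD_COUNT = 300  # mirrors the constant above

def chunk_words(text: str, size: int = TOPIC_CHUNK_WORD_COUNT) -> list[str]:
    """Split text into consecutive ~size-word windows for topic detection."""
    words = text.split()
    return [" ".join(words[i : i + size]) for i in range(0, len(words), size)]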

View File

@@ -0,0 +1,224 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
__all__ = [
"WebhookCalendarEventPayload",
"WebhookParticipantPayload",
"WebhookPayload",
"WebhookRoomPayload",
"WebhookTestPayload",
"WebhookTopicPayload",
"WebhookTranscriptPayload",
"_serialize_payload",
"build_transcript_webhook_payload",
"build_test_webhook_payload",
"build_webhook_headers",
"generate_webhook_signature",
"send_webhook_request",
]
def _serialize_payload(payload: BaseModel) -> bytes:
"""Serialize Pydantic model to compact JSON bytes."""
return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC-SHA256 signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
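
Because the signed string is "{timestamp}.{raw_body}" and the header carries both parts as t=<ts>,v1=<hex>, receivers can verify deliveries without any Reflector code. A minimal consumer-side sketch (function name mine; replay-window checks on the timestamp are left to the consumer):

import hashlib
import hmac

def verify_webhook_signature(body: bytes, header: str, secret: str) -> bool:
    """Validate an X-Webhook-Signature header of the form "t=<ts>,v1=<hex>"."""
    parts = dict(item.split("=", 1) for item in header.split(","))
    timestamp, received = parts["t"], parts["v1"]
    expected = hmac.new(
        secret.encode("utf-8"),
        f"{timestamp}.{body.decode('utf-8')}".encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    # compare_digest avoids leaking the signature through timing differences
    return hmac.compare_digest(expected, received)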
async def send_webhook_request(
url: str,
payload: BaseModel,
event_type: str,
webhook_secret: str | None = None,
retry_count: int = 0,
timeout: float = 30.0,
) -> httpx.Response:
"""Send webhook request with proper headers and signature.
Raises:
httpx.HTTPStatusError: On non-2xx response
httpx.ConnectError: On connection failure
httpx.TimeoutException: On timeout
"""
payload_bytes = _serialize_payload(payload)
headers = build_webhook_headers(
event_type=event_type,
payload_bytes=payload_bytes,
webhook_secret=webhook_secret,
retry_count=retry_count,
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, content=payload_bytes, headers=headers)
response.raise_for_status()
return response
async def build_transcript_webhook_payload(
transcript_id: str,
room_id: str,
) -> WebhookPayload | None:
"""Build webhook payload by fetching transcript and room data from database."""
# Inline imports required: this utils module would create circular imports
# if db modules were imported at top level (utils -> db -> ... -> utils).
# This pattern is consistent with Hatchet task files.
from reflector.db.calendar_events import calendar_events_controller # noqa: PLC0415
from reflector.db.meetings import meetings_controller # noqa: PLC0415
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.utils.webvtt import topics_to_webvtt # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
return None
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
topics_data = [
WebhookTopicPayload(
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
webvtt=topics_to_webvtt([topic]) if topic.words else "",
)
for topic in (transcript.topics or [])
]
participants_data = [
WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
for p in (transcript.participants or [])
]
calendar_event_data: WebhookCalendarEventPayload | None = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
if calendar_event:
calendar_event_data = WebhookCalendarEventPayload(
id=calendar_event.id,
ics_uid=calendar_event.ics_uid,
title=calendar_event.title,
start_time=calendar_event.start_time,
end_time=calendar_event.end_time,
description=calendar_event.description or None,
location=calendar_event.location or None,
attendees=calendar_event.attendees or None,
)
except Exception as e:
logger.warning(
"Failed to fetch calendar event for webhook",
transcript_id=transcript_id,
meeting_id=transcript.meeting_id,
error=str(e),
)
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
return WebhookPayload(
event="transcript.completed",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
transcript=WebhookTranscriptPayload(
id=transcript.id,
room_id=transcript.room_id,
created_at=transcript.created_at,
duration=transcript.duration,
title=transcript.title,
short_summary=transcript.short_summary,
long_summary=transcript.long_summary,
webvtt=transcript.webvtt,
topics=topics_data,
participants=participants_data,
source_language=transcript.source_language,
target_language=transcript.target_language,
status=transcript.status,
frontend_url=frontend_url,
action_items=transcript.action_items,
),
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
calendar_event=calendar_event_data,
)
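
Combined with send_webhook_request above, a complete delivery from either pipeline reduces to a few lines; a sketch (helper name mine; room is any object exposing id, webhook_url, and webhook_secret):

async def deliver_transcript_webhook(room, transcript_id: str) -> None:
    payload = await build_transcript_webhook_payload(
        transcript_id=transcript_id,
        room_id=room.id,
    )
    if not payload:
        return  # transcript or room no longer exists
    await send_webhook_request(
        url=room.webhook_url,
        payload=payload,
        event_type="transcript.completed",
        webhook_secret=room.webhook_secret,
    )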
async def build_test_webhook_payload(room_id: str) -> WebhookTestPayload | None:
"""Build test webhook payload."""
# Inline import: avoid circular dependency (utils -> db -> utils)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
return WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
title: NonEmptyString
summary: NonEmptyString
timestamp: float
duration: float | None
webvtt: str # can be empty when no words
class WebhookParticipantPayload(BaseModel):
id: NonEmptyString
name: str | None
speaker: int | None
class WebhookRoomPayload(BaseModel):
id: NonEmptyString
name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
id: NonEmptyString
ics_uid: str | None = None
title: str | None = None
start_time: datetime | None = None
end_time: datetime | None = None
description: str | None = None
location: str | None = None
attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
id: NonEmptyString
room_id: NonEmptyString | None
created_at: datetime
duration: float | None
title: str | None
short_summary: str | None
long_summary: str | None
webvtt: str | None
topics: list[WebhookTopicPayload]
participants: list[WebhookParticipantPayload]
source_language: NonEmptyString
target_language: NonEmptyString
status: NonEmptyString
frontend_url: NonEmptyString
action_items: dict | None
class WebhookPayload(BaseModel):
event: WebhookTranscriptEventType
event_id: NonEmptyString
timestamp: datetime
transcript: WebhookTranscriptPayload
room: WebhookRoomPayload
calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
event: WebhookTestEventType
event_id: NonEmptyString
timestamp: datetime
message: NonEmptyString
room: WebhookRoomPayload
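
Note that _serialize_payload dumps these models with exclude_none=False, so absent optional fields arrive as explicit nulls and consumers always see a stable key set. A quick sketch of the smallest payload (values illustrative; models as defined above):

from datetime import datetime, timezone

payload = WebhookTestPayload(
    event="test",
    event_id="abc123",
    timestamp=datetime.now(timezone.utc),
    message="This is a test webhook from Reflector",
    room=WebhookRoomPayload(id="room_456", name="Standup"),
)
print(payload.model_dump_json(by_alias=True, exclude_none=False))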

View File

@@ -300,7 +300,7 @@ async def _process_multitrack_recording_inner(
    if use_hatchet:
        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
            input_data={
                "recording_id": recording_id,
                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -848,7 +848,7 @@ async def reprocess_failed_daily_recordings():
            continue

        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
            input_data={
                "recording_id": recording.id,
                "tracks": [

View File

@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications.""" """Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger

-from reflector.db.calendar_events import calendar_events_controller
-from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
-from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.settings import settings
-from reflector.utils.webvtt import topics_to_webvtt
+from reflector.utils.webhook import (
+    WebhookRoomPayload,
+    WebhookTestPayload,
+    _serialize_payload,
+    build_transcript_webhook_payload,
+    build_webhook_headers,
+    send_webhook_request,
+)

logger = structlog.wrap_logger(get_task_logger(__name__))

-def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
-    """Generate HMAC signature for webhook payload."""
-    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
-    hmac_obj = hmac.new(
-        secret.encode("utf-8"),
-        signed_payload.encode("utf-8"),
-        hashlib.sha256,
-    )
-    return hmac_obj.hexdigest()

@shared_task(
    bind=True,
    max_retries=30,
@@ -47,6 +36,11 @@ async def send_transcript_webhook(
    room_id: str,
    event_id: str,
):
+    """Send webhook notification for completed transcript.
+
+    Uses shared Pydantic models and signature generation from utils/webhook.py
+    to ensure consistency with Hatchet pipeline.
+    """
    log = logger.bind(
        transcript_id=transcript_id,
        room_id=room_id,
@@ -54,12 +48,6 @@ async def send_transcript_webhook(
    )

    try:
-        # Fetch transcript and room
-        transcript = await transcripts_controller.get_by_id(transcript_id)
-        if not transcript:
-            log.error("Transcript not found, skipping webhook")
-            return
-
        room = await rooms_controller.get_by_id(room_id)
        if not room:
            log.error("Room not found, skipping webhook")
@@ -69,130 +57,32 @@ async def send_transcript_webhook(
log.info("No webhook URL configured for room, skipping") log.info("No webhook URL configured for room, skipping")
return return
# Generate WebVTT content from topics # Build payload using shared function
topics_data = [] payload = await build_transcript_webhook_payload(
transcript_id=transcript_id,
if transcript.topics: room_id=room_id,
# Build topics data with diarized content per topic
for topic in transcript.topics:
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
) )
# Fetch meeting and calendar event if they exist if not payload:
calendar_event = None log.error("Could not build webhook payload, skipping")
try: return
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
except Exception as e:
logger.error("Error fetching meeting or calendar event", error=str(e))
# Build webhook payload
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
participants = [
{"id": p.id, "name": p.name, "speaker": p.speaker}
for p in (transcript.participants or [])
]
payload_data = {
"event": "transcript.completed",
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,
"name": room.name,
},
}
# Always include calendar_event field, even if no event is present
payload_data["calendar_event"] = {}
# Add calendar event data if present
if calendar_event:
calendar_data = {
"id": calendar_event.id,
"ics_uid": calendar_event.ics_uid,
"title": calendar_event.title,
"start_time": calendar_event.start_time.isoformat()
if calendar_event.start_time
else None,
"end_time": calendar_event.end_time.isoformat()
if calendar_event.end_time
else None,
}
# Add optional fields only if they exist
if calendar_event.description:
calendar_data["description"] = calendar_event.description
if calendar_event.location:
calendar_data["location"] = calendar_event.location
if calendar_event.attendees:
calendar_data["attendees"] = calendar_event.attendees
payload_data["calendar_event"] = calendar_data
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info( log.info(
"Sending webhook", "Sending webhook",
url=room.webhook_url, url=room.webhook_url,
payload_size=len(payload_bytes), topics=len(payload.transcript.topics),
participants=len(payload.transcript.participants),
) )
response = await client.post( response = await send_webhook_request(
room.webhook_url, url=room.webhook_url,
content=payload_bytes, payload=payload,
headers=headers, event_type="transcript.completed",
webhook_secret=room.webhook_secret,
retry_count=self.request.retries,
timeout=30.0,
) )
response.raise_for_status()
log.info( log.info(
"Webhook sent successfully", "Webhook sent successfully",
status_code=response.status_code, status_code=response.status_code,
@@ -226,8 +116,8 @@ async def send_transcript_webhook(
async def test_webhook(room_id: str) -> dict:
-    """
-    Test webhook configuration by sending a sample payload.
+    """Test webhook configuration by sending a sample payload.

    Returns immediately with success/failure status.
    This is the shared implementation used by both the API endpoint and Celery task.
    """
@@ -239,34 +129,25 @@ async def test_webhook(room_id: str) -> dict:
    if not room.webhook_url:
        return {"success": False, "error": "No webhook URL configured"}

-    now = (datetime.now(timezone.utc).isoformat(),)
-    payload_data = {
-        "event": "test",
-        "event_id": uuid.uuid4().hex,
-        "timestamp": now,
-        "message": "This is a test webhook from Reflector",
-        "room": {
-            "id": room.id,
-            "name": room.name,
-        },
-    }
-
-    payload_json = json.dumps(payload_data, separators=(",", ":"))
-    payload_bytes = payload_json.encode("utf-8")
-
-    # Generate headers with signature
-    headers = {
-        "Content-Type": "application/json",
-        "User-Agent": "Reflector-Webhook/1.0",
-        "X-Webhook-Event": "test",
-    }
-    if room.webhook_secret:
-        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-        signature = generate_webhook_signature(
-            payload_bytes, room.webhook_secret, timestamp
-        )
-        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+    payload = WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )
+
+    payload_bytes = _serialize_payload(payload)
+
+    # Build headers with signature
+    headers = build_webhook_headers(
+        event_type="test",
+        payload_bytes=payload_bytes,
+        webhook_secret=room.webhook_secret,
+    )

    # Send test webhook with short timeout
    async with httpx.AsyncClient(timeout=10.0) as client: