Compare commits

...

9 Commits

Author SHA1 Message Date
7f42ef6d17 chore(main): release 0.27.0 (#814) 2025-12-27 18:11:47 -05:00
5f7b1ff1a6 fix: webhook parity, pipeline rename, waveform constant fix (#806)
* pipeline fixes: whereby Hatchet preparation

* send_webhook fixes

* cleanup

* self-review

* comment

* webhook util functions: less dependencies

* remove comment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 18:00:32 -05:00
2d0df48767 feat: devex/hatchet log progress track (#813)
* progress track for some hatchet tasks

* remove inline imports / type fixes

* progress callback for mixdown - move to a function

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 14:10:21 -05:00
5baa6dd92e pipeline type fixes (#812)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 11:28:43 -05:00
bab1e2d537 dynamic mixdown hatchet (#811)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 19:48:16 -05:00
e886153ae1 fix hatchet parallel syntax (#810)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 18:45:06 -05:00
7b352f465e dont always enable hatchet (#809)
* dont always enable hatchet

* fix hatchet worker params

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 18:15:33 -05:00
3cf9757ac2 diarization flow - parallelize better (#808)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 17:35:43 -05:00
d9d3938192 better hatchet concurrency limits (#807)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 17:26:23 -05:00
19 changed files with 682 additions and 345 deletions

View File

@@ -1,5 +1,17 @@
# Changelog
## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)
### Features
* devex/hatchet log progress track ([#813](https://github.com/Monadical-SAS/reflector/issues/813)) ([2d0df48](https://github.com/Monadical-SAS/reflector/commit/2d0df487674e5486208cd599e3338ebff8b6e470))
### Bug Fixes
* webhook parity, pipeline rename, waveform constant fix ([#806](https://github.com/Monadical-SAS/reflector/issues/806)) ([5f7b1ff](https://github.com/Monadical-SAS/reflector/commit/5f7b1ff1a68ebbb907684c7c5f55c1f82dac8550))
## [0.26.0](https://github.com/Monadical-SAS/reflector/compare/v0.25.0...v0.26.0) (2025-12-23)

View File

@@ -5,7 +5,7 @@ import shutil
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Literal
from typing import Any, Literal, Sequence
import sqlalchemy
from fastapi import HTTPException
@@ -180,7 +180,7 @@ class TranscriptDuration(BaseModel):
class TranscriptWaveform(BaseModel):
waveform: list[float]
waveform: Sequence[float]
class TranscriptEvent(BaseModel):

View File

@@ -81,7 +81,8 @@ async def set_status_and_broadcast(
async def append_event_and_broadcast(
transcript_id: NonEmptyString,
transcript: Transcript,
event_name: str,
event_name: NonEmptyString,
# TODO proper dictionary event => type
data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent:

View File

@@ -2,6 +2,31 @@
Hatchet workflow constants.
"""
from enum import StrEnum
class TaskName(StrEnum):
GET_RECORDING = "get_recording"
GET_PARTICIPANTS = "get_participants"
PROCESS_TRACKS = "process_tracks"
MIXDOWN_TRACKS = "mixdown_tracks"
GENERATE_WAVEFORM = "generate_waveform"
DETECT_TOPICS = "detect_topics"
GENERATE_TITLE = "generate_title"
EXTRACT_SUBJECTS = "extract_subjects"
PROCESS_SUBJECTS = "process_subjects"
GENERATE_RECAP = "generate_recap"
IDENTIFY_ACTION_ITEMS = "identify_action_items"
FINALIZE = "finalize"
CLEANUP_CONSENT = "cleanup_consent"
POST_ZULIP = "post_zulip"
SEND_WEBHOOK = "send_webhook"
PAD_TRACK = "pad_track"
TRANSCRIBE_TRACK = "transcribe_track"
DETECT_CHUNK_TOPIC = "detect_chunk_topic"
GENERATE_DETAILED_SUMMARY = "generate_detailed_summary"
# Rate limit key for LLM API calls (shared across all LLM-calling tasks)
LLM_RATE_LIMIT_KEY = "llm"
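
Because TaskName is a StrEnum, its members are real str instances: child-workflow results keyed by plain task-name strings can be indexed with the enum member directly. A minimal sketch of that behavior (illustrative, not part of the diff):

from enum import StrEnum

class TaskName(StrEnum):
    PAD_TRACK = "pad_track"
    TRANSCRIBE_TRACK = "transcribe_track"

# Child-workflow results arrive keyed by plain task-name strings.
result = {"pad_track": {"padded_key": "track-0.webm"}, "transcribe_track": {"words": []}}

# StrEnum members compare and hash like their string values:
assert TaskName.PAD_TRACK == "pad_track"
assert result[TaskName.TRANSCRIBE_TRACK] == {"words": []}
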

View File

@@ -1,5 +1,5 @@
"""
Run Hatchet workers for the diarization pipeline.
Run Hatchet workers for the multitrack pipeline.
Runs as a separate process, just like Celery workers.
Usage:
@@ -39,7 +39,7 @@ def main() -> None:
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline,
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
@@ -54,7 +54,7 @@ def main() -> None:
worker = hatchet.worker(
"reflector-pipeline-worker",
workflows=[
diarization_pipeline,
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,

View File

@@ -1,8 +1,8 @@
"""Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import (
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
diarization_pipeline,
daily_multitrack_pipeline,
)
from reflector.hatchet.workflows.subject_processing import (
SubjectInput,
@@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
"daily_multitrack_pipeline",
"subject_workflow",
"topic_chunk_workflow",
"track_workflow",

View File

@@ -1,9 +1,12 @@
"""
Hatchet main workflow: DiarizationPipeline
Hatchet main workflow: DailyMultitrackPipeline
Multitrack diarization pipeline for Daily.co recordings.
Multitrack processing pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript.
Daily.co recordings don't require ML diarization - speaker identification comes from
track index (each participant's audio is a separate track).
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
@@ -13,10 +16,11 @@ import asyncio
import functools
import json
import tempfile
import time
from contextlib import asynccontextmanager
from datetime import timedelta
from pathlib import Path
from typing import Callable
from typing import Any, Callable, Coroutine, Protocol, TypeVar
import httpx
from hatchet_sdk import Context
@@ -34,6 +38,7 @@ from reflector.hatchet.constants import (
TIMEOUT_LONG,
TIMEOUT_MEDIUM,
TIMEOUT_SHORT,
TaskName,
)
from reflector.hatchet.workflows.models import (
ActionItemsResult,
@@ -70,6 +75,13 @@ from reflector.hatchet.workflows.track_processing import TrackInput, track_workf
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.summary.models import ActionItemsResponse
from reflector.processors.summary.prompts import (
RECAP_PROMPT,
build_participant_instructions,
build_summary_markdown,
)
from reflector.processors.summary.summary_builder import SummaryBuilder
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
@@ -93,7 +105,7 @@ from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
"""Input to trigger the diarization pipeline."""
"""Input to trigger the Daily.co multitrack pipeline."""
recording_id: NonEmptyString
tracks: list[dict] # List of {"s3_key": str}
@@ -104,7 +116,7 @@ class PipelineInput(BaseModel):
hatchet = HatchetClientManager.get_client()
diarization_pipeline = hatchet.workflow(
daily_multitrack_pipeline = hatchet.workflow(
name="DiarizationPipeline", input_validator=PipelineInput
)
@@ -162,7 +174,52 @@ def _spawn_storage():
)
def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
class Loggable(Protocol):
def log(self, message: str) -> None: ...
def make_audio_progress_logger(
ctx: Loggable, task_name: TaskName, interval: float = 5.0
) -> Callable[[float | None, float], None]:
"""Create a throttled progress logger callback for audio processing.
Args:
ctx: Object with .log() method (e.g., Hatchet Context).
task_name: Name to prefix in log messages.
interval: Minimum seconds between log messages.
Returns:
Callback(progress_pct, audio_position) that logs at most every `interval` seconds.
"""
start_time = time.monotonic()
last_log_time = [start_time]
def callback(progress_pct: float | None, audio_position: float) -> None:
now = time.monotonic()
if now - last_log_time[0] >= interval:
elapsed = now - start_time
if progress_pct is not None:
ctx.log(
f"{task_name} progress: {progress_pct:.1f}% @ {audio_position:.1f}s (elapsed: {elapsed:.1f}s)"
)
else:
ctx.log(
f"{task_name} progress: @ {audio_position:.1f}s (elapsed: {elapsed:.1f}s)"
)
last_log_time[0] = now
return callback
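
A usage sketch of the throttling behavior (FakeCtx is a hypothetical stand-in; the real caller passes the Hatchet Context): the callback may be invoked on every decoded frame, but it only emits a log line once at least `interval` seconds have elapsed since the previous log (or since the logger was created).

class FakeCtx:  # stand-in for hatchet_sdk Context, which also exposes .log()
    def log(self, message: str) -> None:
        print(message)

progress = make_audio_progress_logger(FakeCtx(), TaskName.MIXDOWN_TRACKS, interval=5.0)
progress(12.5, 45.0)   # emits a log line only if >= 5 s have elapsed since the last log (or creation)
progress(12.6, 45.5)   # immediately after: throttled, nothing is emitted
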
R = TypeVar("R")
def with_error_handling(
step_name: TaskName, set_error_status: bool = True
) -> Callable[
[Callable[[PipelineInput, Context], Coroutine[Any, Any, R]]],
Callable[[PipelineInput, Context], Coroutine[Any, Any, R]],
]:
"""Decorator that handles task failures uniformly.
Args:
@@ -170,9 +227,11 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
set_error_status: Whether to set transcript status to 'error' on failure.
"""
def decorator(func: Callable) -> Callable:
def decorator(
func: Callable[[PipelineInput, Context], Coroutine[Any, Any, R]],
) -> Callable[[PipelineInput, Context], Coroutine[Any, Any, R]]:
@functools.wraps(func)
async def wrapper(input: PipelineInput, ctx: Context):
async def wrapper(input: PipelineInput, ctx: Context) -> R:
try:
return await func(input, ctx)
except Exception as e:
@@ -186,15 +245,15 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
await set_workflow_error_status(input.transcript_id)
raise
return wrapper
return wrapper # type: ignore[return-value]
return decorator
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling("get_recording")
@with_error_handling(TaskName.GET_RECORDING)
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: starting for recording_id={input.recording_id}")
@@ -244,19 +303,18 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_recording],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@with_error_handling("get_participants")
@with_error_handling(TaskName.GET_PARTICIPANTS)
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
recording = ctx.task_output(get_recording)
mtg_session_id = recording.mtg_session_id
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptParticipant,
@@ -264,16 +322,17 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
# Note: title NOT cleared - preserves existing titles
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
},
)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Note: title NOT cleared - preserves existing titles
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
"participants": [],
},
)
mtg_session_id = assert_non_none_and_non_empty(
mtg_session_id, "mtg_session_id is required"
@@ -335,12 +394,12 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_participants],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
)
@with_error_handling("process_tracks")
@with_error_handling(TaskName.PROCESS_TRACKS)
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
@@ -370,10 +429,10 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
created_padded_files = set()
for result in results:
transcribe_result = TranscribeTrackResult(**result["transcribe_track"])
transcribe_result = TranscribeTrackResult(**result[TaskName.TRANSCRIBE_TRACK])
track_words.append(transcribe_result.words)
pad_result = PadTrackResult(**result["pad_track"])
pad_result = PadTrackResult(**result[TaskName.PAD_TRACK])
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
if pad_result.padded_key:
@@ -404,19 +463,31 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
retries=3,
)
@with_error_handling("mixdown_tracks")
@with_error_handling(TaskName.MIXDOWN_TRACKS)
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
track_result = ctx.task_output(process_tracks)
recording_result = ctx.task_output(get_recording)
padded_tracks = track_result.padded_tracks
# Dynamic timeout: scales with track count and recording duration
# Base 300s + 60s per track + 1s per 10s of recording
track_count = len(padded_tracks) if padded_tracks else 0
recording_duration = recording_result.duration or 0
timeout_estimate = 300 + (track_count * 60) + int(recording_duration / 10)
ctx.refresh_timeout(f"{timeout_estimate}s")
ctx.log(
f"mixdown_tracks: dynamic timeout set to {timeout_estimate}s "
f"(tracks={track_count}, duration={recording_duration:.0f}s)"
)
# TODO think of NonEmpty type to avoid those checks, e.g. sized.NonEmpty from https://github.com/antonagestam/phantom-types/
if not padded_tracks:
raise ValueError("No padded tracks to mixdown")
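
Worked example for the dynamic timeout above, with illustrative numbers: 6 padded tracks and a 3600 s recording give timeout_estimate = 300 + 6 * 60 + int(3600 / 10) = 300 + 360 + 360 = 1020, i.e. ctx.refresh_timeout("1020s") — roughly 17 minutes for the mixdown.
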
@@ -458,6 +529,8 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
target_sample_rate,
offsets_seconds=None,
logger=logger,
progress_callback=make_audio_progress_logger(ctx, TaskName.MIXDOWN_TRACKS),
expected_duration_sec=recording_duration if recording_duration > 0 else None,
)
await writer.flush()
@@ -487,12 +560,12 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[mixdown_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
)
@with_error_handling("generate_waveform")
@with_error_handling(TaskName.GENERATE_WAVEFORM)
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
"""Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
@@ -555,12 +628,12 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
return WaveformResult(waveform_generated=True)
@diarization_pipeline.task(
parents=[mixdown_tracks],
@daily_multitrack_pipeline.task(
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
)
@with_error_handling("detect_topics")
@with_error_handling(TaskName.DETECT_TOPICS)
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using parallel child workflows (one per chunk)."""
ctx.log("detect_topics: analyzing transcript for topics")
@@ -623,11 +696,13 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
results = await topic_chunk_workflow.aio_run_many(bulk_runs)
topic_chunks = [
TopicChunkResult(**result["detect_chunk_topic"]) for result in results
TopicChunkResult(**result[TaskName.DETECT_CHUNK_TOPIC]) for result in results
]
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
for chunk in topic_chunks:
topic = TranscriptTopic(
@@ -635,7 +710,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
summary=chunk.summary,
timestamp=chunk.timestamp,
transcript=" ".join(w.text for w in chunk.words),
words=[w.model_dump() for w in chunk.words],
words=chunk.words,
)
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
@@ -658,12 +733,12 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
return TopicsResult(topics=topics_list)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
)
@with_error_handling("generate_title")
@with_error_handling(TaskName.GENERATE_TITLE)
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}")
@@ -685,6 +760,8 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
async with fresh_db_connection():
ctx.log("generate_title: DB connection established")
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
ctx.log(f"generate_title: fetched transcript, exists={transcript is not None}")
async def on_title_callback(data):
@@ -721,12 +798,12 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
return TitleResult(title=title_result)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
)
@with_error_handling("extract_subjects")
@with_error_handling(TaskName.EXTRACT_SUBJECTS)
async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult:
"""Extract main subjects/topics from transcript for parallel processing."""
ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
@@ -747,9 +824,6 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
# sharing DB connections and LLM HTTP pools across forks
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.summary_builder import ( # noqa: PLC0415
SummaryBuilder,
)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
@@ -802,12 +876,12 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
)
@with_error_handling("process_subjects")
@with_error_handling(TaskName.PROCESS_SUBJECTS)
async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubjectsResult:
"""Spawn child workflows for each subject (dynamic fan-out, parallel LLM calls)."""
subjects_result = ctx.task_output(extract_subjects)
@@ -835,7 +909,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
results = await subject_workflow.aio_run_many(bulk_runs)
subject_summaries = [
SubjectSummaryResult(**result["generate_detailed_summary"])
SubjectSummaryResult(**result[TaskName.GENERATE_DETAILED_SUMMARY])
for result in results
]
@@ -844,12 +918,12 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
return ProcessSubjectsResult(subject_summaries=subject_summaries)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
)
@with_error_handling("generate_recap")
@with_error_handling(TaskName.GENERATE_RECAP)
async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
"""Generate recap and long summary from subject summaries, save to database."""
ctx.log(f"generate_recap: starting for transcript_id={input.transcript_id}")
@@ -865,11 +939,6 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.prompts import ( # noqa: PLC0415
RECAP_PROMPT,
build_participant_instructions,
build_summary_markdown,
)
subject_summaries = process_result.subject_summaries
@@ -938,12 +1007,12 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
return RecapResult(short_summary=short_summary, long_summary=long_summary)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_LONG),
retries=3,
)
@with_error_handling("identify_action_items")
@with_error_handling(TaskName.IDENTIFY_ACTION_ITEMS)
async def identify_action_items(
input: PipelineInput, ctx: Context
) -> ActionItemsResult:
@@ -954,7 +1023,7 @@ async def identify_action_items(
if not subjects_result.transcript_text:
ctx.log("identify_action_items: no transcript text, returning empty")
return ActionItemsResult(action_items={"decisions": [], "next_steps": []})
return ActionItemsResult(action_items=ActionItemsResponse())
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
@@ -963,9 +1032,6 @@ async def identify_action_items(
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.summary.summary_builder import ( # noqa: PLC0415
SummaryBuilder,
)
# TODO: refactor SummaryBuilder methods into standalone functions
llm = LLM(settings=settings)
@@ -984,11 +1050,11 @@ async def identify_action_items(
if action_items_response is None:
raise RuntimeError("Failed to identify action items - LLM call failed")
action_items_dict = action_items_response.model_dump()
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
# Serialize to dict for DB storage and WebSocket broadcast
action_items_dict = action_items_response.model_dump()
action_items = TranscriptActionItems(action_items=action_items_dict)
await transcripts_controller.update(
transcript, {"action_items": action_items.action_items}
@@ -1002,19 +1068,19 @@ async def identify_action_items(
)
ctx.log(
f"identify_action_items complete: {len(action_items_dict.get('decisions', []))} decisions, "
f"{len(action_items_dict.get('next_steps', []))} next steps"
f"identify_action_items complete: {len(action_items_response.decisions)} decisions, "
f"{len(action_items_response.next_steps)} next steps"
)
return ActionItemsResult(action_items=action_items_dict)
return ActionItemsResult(action_items=action_items_response)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
)
@with_error_handling("finalize")
@with_error_handling(TaskName.FINALIZE)
async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
"""Finalize transcript: save words, emit TRANSCRIPT event, set status to 'ended'.
@@ -1094,10 +1160,10 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
return FinalizeResult(status="COMPLETED")
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling("cleanup_consent", set_error_status=False)
@with_error_handling(TaskName.CLEANUP_CONSENT, set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
"""Check consent and delete audio files if any participant denied."""
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
@@ -1194,12 +1260,12 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
return ConsentResult()
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=5,
)
@with_error_handling("post_zulip", set_error_status=False)
@with_error_handling(TaskName.POST_ZULIP, set_error_status=False)
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
"""Post notification to Zulip."""
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
@@ -1221,14 +1287,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
return ZulipResult(zulip_message_id=message_id)
@diarization_pipeline.task(
parents=[post_zulip],
@daily_multitrack_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=30,
retries=5,
)
@with_error_handling("send_webhook", set_error_status=False)
@with_error_handling(TaskName.SEND_WEBHOOK, set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
"""Send completion webhook to external service with full payload and HMAC signature."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
@@ -1237,27 +1303,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
fetch_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not room or not room.webhook_url:
ctx.log("send_webhook skipped (no webhook_url configured)")
return WebhookResult(webhook_sent=False, skipped=True)
if room and room.webhook_url and transcript:
webhook_payload = {
"event": "transcript.completed",
"transcript_id": input.transcript_id,
"title": transcript.title,
"duration": transcript.duration,
}
payload = await fetch_transcript_webhook_payload(
transcript_id=input.transcript_id,
room_id=input.room_id,
)
async with httpx.AsyncClient() as client:
response = await client.post(
room.webhook_url, json=webhook_payload, timeout=30
)
response.raise_for_status()
if isinstance(payload, str):
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
ctx.log(
f"send_webhook: sending to {room.webhook_url} "
f"(topics={len(payload.transcript.topics)}, "
f"participants={len(payload.transcript.participants)})"
)
return WebhookResult(webhook_sent=True, response_code=response.status_code)
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)

View File

@@ -7,6 +7,7 @@ and better IDE support.
from pydantic import BaseModel
from reflector.processors.summary.models import ActionItemsResponse
from reflector.processors.types import TitleSummary, Word
from reflector.utils.string import NonEmptyString
@@ -42,7 +43,7 @@ class RecordingResult(BaseModel):
id: NonEmptyString | None
mtg_session_id: NonEmptyString | None
duration: float
duration: int | None
class ParticipantsResult(BaseModel):
@@ -143,7 +144,7 @@ class RecapResult(BaseModel):
class ActionItemsResult(BaseModel):
"""Result from identify_action_items task."""
action_items: dict # ActionItemsResponse as dict (may have empty lists)
action_items: ActionItemsResponse
class FinalizeResult(BaseModel):

View File

@@ -7,12 +7,12 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
from datetime import timedelta
from hatchet_sdk import Context
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_SHORT
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
@@ -32,12 +32,18 @@ class TopicChunkInput(BaseModel):
hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing", input_validator=TopicChunkInput
name="TopicChunkProcessing",
input_validator=TopicChunkInput,
concurrency=ConcurrencyExpression(
expression="'global'", # constant string = global limit across all runs
max_runs=20,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
)
@topic_chunk_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
@@ -83,6 +83,15 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
with av.open(source_url) as in_container:
if in_container.duration:
try:
duration = timedelta(seconds=in_container.duration // 1_000_000)
ctx.log(
f"pad_track: track {input.track_index}, duration={duration}"
)
except Exception:
ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
start_time_seconds = extract_stream_start_time_from_container(
in_container, input.track_index, logger=logger
)

View File

@@ -0,0 +1,50 @@
"""Pydantic models for summary processing."""
from pydantic import BaseModel, Field
class ActionItem(BaseModel):
"""A single action item from the meeting"""
task: str = Field(description="The task or action item to be completed")
assigned_to: str | None = Field(
default=None, description="Person or team assigned to this task (name)"
)
assigned_to_participant_id: str | None = Field(
default=None, description="Participant ID if assigned_to matches a participant"
)
deadline: str | None = Field(
default=None, description="Deadline or timeframe mentioned for this task"
)
context: str | None = Field(
default=None, description="Additional context or notes about this task"
)
class Decision(BaseModel):
"""A decision made during the meeting"""
decision: str = Field(description="What was decided")
rationale: str | None = Field(
default=None,
description="Reasoning or key factors that influenced this decision",
)
decided_by: str | None = Field(
default=None, description="Person or group who made the decision (name)"
)
decided_by_participant_id: str | None = Field(
default=None, description="Participant ID if decided_by matches a participant"
)
class ActionItemsResponse(BaseModel):
"""Pydantic model for identified action items"""
decisions: list[Decision] = Field(
default_factory=list,
description="List of decisions made during the meeting",
)
next_steps: list[ActionItem] = Field(
default_factory=list,
description="List of action items and next steps to be taken",
)
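A minimal construction sketch for these models (field names from the definitions above; values are made up), showing the shape identify_action_items now returns and stores:

response = ActionItemsResponse(
    decisions=[
        Decision(decision="Rename the pipeline to DailyMultitrackPipeline", decided_by="Igor"),
    ],
    next_steps=[
        ActionItem(task="Document the webhook HMAC signature", assigned_to="Igor", deadline="next release"),
    ],
)
response.model_dump()  # plain dict, e.g. {"decisions": [...], "next_steps": [...]}, for DB storage / broadcast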

View File

@@ -15,6 +15,7 @@ import structlog
from pydantic import BaseModel, Field
from reflector.llm import LLM
from reflector.processors.summary.models import ActionItemsResponse
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
@@ -148,53 +149,6 @@ class SubjectsResponse(BaseModel):
)
class ActionItem(BaseModel):
"""A single action item from the meeting"""
task: str = Field(description="The task or action item to be completed")
assigned_to: str | None = Field(
default=None, description="Person or team assigned to this task (name)"
)
assigned_to_participant_id: str | None = Field(
default=None, description="Participant ID if assigned_to matches a participant"
)
deadline: str | None = Field(
default=None, description="Deadline or timeframe mentioned for this task"
)
context: str | None = Field(
default=None, description="Additional context or notes about this task"
)
class Decision(BaseModel):
"""A decision made during the meeting"""
decision: str = Field(description="What was decided")
rationale: str | None = Field(
default=None,
description="Reasoning or key factors that influenced this decision",
)
decided_by: str | None = Field(
default=None, description="Person or group who made the decision (name)"
)
decided_by_participant_id: str | None = Field(
default=None, description="Participant ID if decided_by matches a participant"
)
class ActionItemsResponse(BaseModel):
"""Pydantic model for identified action items"""
decisions: list[Decision] = Field(
default_factory=list,
description="List of decisions made during the meeting",
)
next_steps: list[ActionItem] = Field(
default_factory=list,
description="List of action items and next steps to be taken",
)
class SummaryBuilder:
def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None:
self.transcript: str | None = None

View File

@@ -188,8 +188,8 @@ async def dispatch_transcript_processing(
room_forces_hatchet = room.use_hatchet if room else False
# Start durable workflow if enabled (Hatchet)
# or if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
# and if room has use_hatchet=True
use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
if room_forces_hatchet:
logger.info(

View File

@@ -43,6 +43,8 @@ async def mixdown_tracks_pyav(
target_sample_rate: int,
offsets_seconds: list[float] | None = None,
logger=None,
progress_callback=None,
expected_duration_sec: float | None = None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix).
@@ -57,6 +59,10 @@ async def mixdown_tracks_pyav(
If provided, must have same length as track_urls. Delays are relative
to the minimum offset (earliest track has delay=0).
logger: Optional logger instance
progress_callback: Optional callback(progress_pct: float | None, audio_position: float)
called on progress updates. progress_pct is 0-100 if duration known, None otherwise.
audio_position is current position in seconds.
expected_duration_sec: Optional fallback duration if container metadata unavailable.
Raises:
ValueError: If offsets_seconds length doesn't match track_urls,
@@ -171,6 +177,17 @@ async def mixdown_tracks_pyav(
logger.error("Mixdown failed - no valid containers opened")
raise ValueError("Mixdown failed: Could not open any track containers")
# Calculate total duration for progress reporting.
# Try container metadata first, fall back to expected_duration_sec if provided.
max_duration_sec = 0.0
for c in containers:
if c.duration is not None:
dur_sec = c.duration / av.time_base
max_duration_sec = max(max_duration_sec, dur_sec)
if max_duration_sec == 0.0 and expected_duration_sec:
max_duration_sec = expected_duration_sec
current_max_time = 0.0
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
@@ -192,6 +209,18 @@ async def mixdown_tracks_pyav(
if frame.sample_rate != target_sample_rate:
continue
# Update progress based on frame timestamp
if progress_callback and frame.time is not None:
current_max_time = max(current_max_time, frame.time)
if max_duration_sec > 0:
progress_pct = min(
100.0, (current_max_time / max_duration_sec) * 100
)
else:
progress_pct = None # Duration unavailable
progress_callback(progress_pct, current_max_time)
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate

View File

@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
def get_audio_waveform(
path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
) -> list[int]:
if isinstance(path, Path):
path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("path", type=Path)
parser.add_argument("--segments-count", type=int, default=256)
parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
args = parser.parse_args()
print(get_audio_waveform(args.path, args.segments_count))

View File

@@ -0,0 +1,216 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
from typing import Union
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
__all__ = [
"fetch_transcript_webhook_payload",
"fetch_test_webhook_payload",
"build_webhook_headers",
"generate_webhook_signature",
"send_webhook_request",
]
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.utils.webvtt import topics_to_webvtt
def _serialize_payload(payload: BaseModel) -> bytes:
"""Serialize Pydantic model to compact JSON bytes."""
return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC-SHA256 signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
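
Given the scheme above (HMAC-SHA256 over "{timestamp}.{body}", delivered as X-Webhook-Signature: t=<timestamp>,v1=<hex digest>), a receiver holding the shared secret can verify deliveries roughly like this (sketch, not part of the diff):

import hashlib
import hmac

def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
    # signature_header looks like "t=1735257600,v1=ab12..."
    parts = dict(item.split("=", 1) for item in signature_header.split(","))
    timestamp, received = parts["t"], parts["v1"]
    expected = hmac.new(
        secret.encode("utf-8"),
        f"{timestamp}.{body.decode('utf-8')}".encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, received)
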
async def send_webhook_request(
url: str,
payload: BaseModel,
event_type: str,
webhook_secret: str | None = None,
retry_count: int = 0,
timeout: float = 30.0,
) -> httpx.Response:
"""Send webhook request with proper headers and signature.
Raises:
httpx.HTTPStatusError: On non-2xx response
httpx.ConnectError: On connection failure
httpx.TimeoutException: On timeout
"""
payload_bytes = _serialize_payload(payload)
headers = build_webhook_headers(
event_type=event_type,
payload_bytes=payload_bytes,
webhook_secret=webhook_secret,
retry_count=retry_count,
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, content=payload_bytes, headers=headers)
response.raise_for_status()
return response
async def fetch_transcript_webhook_payload(
transcript_id: NonEmptyString,
room_id: NonEmptyString,
) -> Union[WebhookPayload, str]:
"""Build webhook payload by fetching transcript and room data from database."""
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
return f"Transcript {transcript_id} not found"
room = await rooms_controller.get_by_id(room_id)
if not room:
return f"Room {room_id} not found"
topics_data = [
WebhookTopicPayload(
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
webvtt=topics_to_webvtt([topic]) if topic.words else "",
)
for topic in (transcript.topics or [])
]
participants_data = [
WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
for p in (transcript.participants or [])
]
calendar_event_data: WebhookCalendarEventPayload | None = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
if calendar_event:
calendar_event_data = WebhookCalendarEventPayload(
id=calendar_event.id,
ics_uid=calendar_event.ics_uid,
title=calendar_event.title,
start_time=calendar_event.start_time,
end_time=calendar_event.end_time,
description=calendar_event.description or None,
location=calendar_event.location or None,
attendees=calendar_event.attendees or None,
)
except Exception as e:
logger.warning(
"Failed to fetch calendar event for webhook",
transcript_id=transcript_id,
meeting_id=transcript.meeting_id,
error=str(e),
)
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
return WebhookPayload(
event="transcript.completed",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
transcript=WebhookTranscriptPayload(
id=transcript.id,
room_id=transcript.room_id,
created_at=transcript.created_at,
duration=transcript.duration,
title=transcript.title,
short_summary=transcript.short_summary,
long_summary=transcript.long_summary,
webvtt=transcript.webvtt,
topics=topics_data,
participants=participants_data,
source_language=transcript.source_language,
target_language=transcript.target_language,
status=transcript.status,
frontend_url=frontend_url,
action_items=transcript.action_items,
),
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
calendar_event=calendar_event_data,
)
async def fetch_test_webhook_payload(
room_id: NonEmptyString,
) -> WebhookTestPayload | None:
"""Build test webhook payload."""
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
return WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
title: NonEmptyString
summary: NonEmptyString
timestamp: float
duration: float | None
webvtt: str # can be empty when no words
class WebhookParticipantPayload(BaseModel):
id: NonEmptyString
name: str | None
speaker: int | None
class WebhookRoomPayload(BaseModel):
id: NonEmptyString
name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
id: NonEmptyString
ics_uid: str | None = None
title: str | None = None
start_time: datetime | None = None
end_time: datetime | None = None
description: str | None = None
location: str | None = None
attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
id: NonEmptyString
room_id: NonEmptyString | None
created_at: datetime
duration: float | None
title: str | None
short_summary: str | None
long_summary: str | None
webvtt: str | None
topics: list[WebhookTopicPayload]
participants: list[WebhookParticipantPayload]
source_language: NonEmptyString
target_language: NonEmptyString
status: NonEmptyString
frontend_url: NonEmptyString
action_items: dict | None
class WebhookPayload(BaseModel):
event: WebhookTranscriptEventType
event_id: NonEmptyString
timestamp: datetime
transcript: WebhookTranscriptPayload
room: WebhookRoomPayload
calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
event: WebhookTestEventType
event_id: NonEmptyString
timestamp: datetime
message: NonEmptyString
room: WebhookRoomPayload
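Serialized with model_dump_json(by_alias=True, exclude_none=False) as in the webhook utilities, a transcript.completed delivery therefore carries roughly this JSON body (shown here as a Python dict literal; all values illustrative):

payload_body = {
    "event": "transcript.completed",
    "event_id": "9f2c0c7e6c2b4c6e8a1d3b4e5f607182",
    "timestamp": "2025-12-26T23:00:00+00:00",
    "transcript": {
        "id": "tr_123",
        "room_id": "room_1",
        "created_at": "2025-12-26T22:00:00+00:00",
        "duration": 1800.0,
        "title": "Weekly sync",
        "short_summary": "Short recap of the meeting.",
        "long_summary": "Detailed summary in markdown.",
        "webvtt": "WEBVTT",
        "topics": [
            {"title": "Release planning", "summary": "Discussed 0.27.0.", "timestamp": 0.0, "duration": 120.0, "webvtt": ""},
        ],
        "participants": [{"id": "p1", "name": "Igor", "speaker": 0}],
        "source_language": "en",
        "target_language": "en",
        "status": "ended",
        "frontend_url": "https://reflector.example.com/transcripts/tr_123",
        "action_items": {"decisions": [], "next_steps": []},
    },
    "room": {"id": "room_1", "name": "engineering"},
    "calendar_event": None,
}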

View File

@@ -287,9 +287,7 @@ async def _process_multitrack_recording_inner(
room_id=room.id,
)
# Start durable workflow if enabled (Hatchet) or room overrides it
durable_started = False
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
if room and room.use_hatchet and not settings.HATCHET_ENABLED:
logger.info(
@@ -300,7 +298,7 @@ async def _process_multitrack_recording_inner(
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -836,7 +834,7 @@ async def reprocess_failed_daily_recordings():
)
continue
use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
if use_hatchet:
# Hatchet requires a transcript for workflow_run_id tracking
@@ -848,7 +846,7 @@ async def reprocess_failed_daily_recordings():
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording.id,
"tracks": [

View File

@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.settings import settings
from reflector.utils.webvtt import topics_to_webvtt
from reflector.utils.webhook import (
WebhookRoomPayload,
WebhookTestPayload,
_serialize_payload,
build_webhook_headers,
fetch_transcript_webhook_payload,
send_webhook_request,
)
logger = structlog.wrap_logger(get_task_logger(__name__))
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
@shared_task(
bind=True,
max_retries=30,
@@ -54,12 +43,6 @@ async def send_transcript_webhook(
)
try:
# Fetch transcript and room
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
log.error("Transcript not found, skipping webhook")
return
room = await rooms_controller.get_by_id(room_id)
if not room:
log.error("Room not found, skipping webhook")
@@ -69,135 +52,36 @@ async def send_transcript_webhook(
log.info("No webhook URL configured for room, skipping")
return
# Generate WebVTT content from topics
topics_data = []
payload = await fetch_transcript_webhook_payload(
transcript_id=transcript_id,
room_id=room_id,
)
if transcript.topics:
# Build topics data with diarized content per topic
for topic in transcript.topics:
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
)
if isinstance(payload, str):
log.error(f"Could not build webhook payload, skipping: {payload}")
return
# Fetch meeting and calendar event if they exist
calendar_event = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
except Exception as e:
logger.error("Error fetching meeting or calendar event", error=str(e))
log.info(
"Sending webhook",
url=room.webhook_url,
topics=len(payload.transcript.topics),
participants=len(payload.transcript.participants),
)
# Build webhook payload
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
participants = [
{"id": p.id, "name": p.name, "speaker": p.speaker}
for p in (transcript.participants or [])
]
payload_data = {
"event": "transcript.completed",
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,
"name": room.name,
},
}
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
retry_count=self.request.retries,
timeout=30.0,
)
# Always include calendar_event field, even if no event is present
payload_data["calendar_event"] = {}
# Add calendar event data if present
if calendar_event:
calendar_data = {
"id": calendar_event.id,
"ics_uid": calendar_event.ics_uid,
"title": calendar_event.title,
"start_time": calendar_event.start_time.isoformat()
if calendar_event.start_time
else None,
"end_time": calendar_event.end_time.isoformat()
if calendar_event.end_time
else None,
}
# Add optional fields only if they exist
if calendar_event.description:
calendar_data["description"] = calendar_event.description
if calendar_event.location:
calendar_data["location"] = calendar_event.location
if calendar_event.attendees:
calendar_data["attendees"] = calendar_event.attendees
payload_data["calendar_event"] = calendar_data
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info(
"Sending webhook",
url=room.webhook_url,
payload_size=len(payload_bytes),
)
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
response.raise_for_status()
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
except httpx.HTTPStatusError as e:
log.error(
@@ -226,8 +110,8 @@ async def send_transcript_webhook(
async def test_webhook(room_id: str) -> dict:
"""
Test webhook configuration by sending a sample payload.
"""Test webhook configuration by sending a sample payload.
Returns immediately with success/failure status.
This is the shared implementation used by both the API endpoint and Celery task.
"""
@@ -239,34 +123,24 @@ async def test_webhook(room_id: str) -> dict:
if not room.webhook_url:
return {"success": False, "error": "No webhook URL configured"}
now = (datetime.now(timezone.utc).isoformat(),)
payload_data = {
"event": "test",
"event_id": uuid.uuid4().hex,
"timestamp": now,
"message": "This is a test webhook from Reflector",
"room": {
"id": room.id,
"name": room.name,
},
}
payload = WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
payload_bytes = _serialize_payload(payload)
# Generate headers with signature
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "test",
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
headers = build_webhook_headers(
event_type="test",
payload_bytes=payload_bytes,
webhook_secret=room.webhook_secret,
)
# Send test webhook with short timeout
async with httpx.AsyncClient(timeout=10.0) as client: