Compare commits

11 Commits

Author SHA1 Message Date
7f42ef6d17 chore(main): release 0.27.0 (#814) 2025-12-27 18:11:47 -05:00
5f7b1ff1a6 fix: webhook parity, pipeline rename, waveform constant fix (#806)
* pipeline fixes: whereby Hatchet preparation

* send_webhook fixes

* cleanup

* self-review

* comment

* webhook util functions: less dependencies

* remove comment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 18:00:32 -05:00
2d0df48767 feat: devex/hatchet log progress track (#813)
* progress track for some hatchet tasks

* remove inline imports / type fixes

* progress callback for mixdown - move to a function

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 14:10:21 -05:00
5baa6dd92e pipeline type fixes (#812)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 11:28:43 -05:00
bab1e2d537 dynamic mixdown hatchet (#811)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 19:48:16 -05:00
e886153ae1 fix hatchet parallel syntax (#810)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 18:45:06 -05:00
7b352f465e don't always enable hatchet (#809)
* don't always enable hatchet

* fix hatchet worker params

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 18:15:33 -05:00
3cf9757ac2 diarization flow - parallelize better (#808)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 17:35:43 -05:00
d9d3938192 better hatchet concurrency limits (#807)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 17:26:23 -05:00
8598707c1c chore(main): release 0.26.0 (#805) 2025-12-23 11:05:43 -05:00
594bcc09e0 feat: parallelize hatchet (#804)
* parallelize hatchet (no-mistakes)

* dry (no-mistakes) (minimal)

* comments

* self-review

* self-review

* self-review

* self-review

* pr comments

* pr comments

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-23 11:03:36 -05:00
25 changed files with 1498 additions and 592 deletions

View File

@@ -1,5 +1,24 @@
# Changelog

+## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)
+
+### Features
+
+* devex/hatchet log progress track ([#813](https://github.com/Monadical-SAS/reflector/issues/813)) ([2d0df48](https://github.com/Monadical-SAS/reflector/commit/2d0df487674e5486208cd599e3338ebff8b6e470))
+
+### Bug Fixes
+
+* webhook parity, pipeline rename, waveform constant fix ([#806](https://github.com/Monadical-SAS/reflector/issues/806)) ([5f7b1ff](https://github.com/Monadical-SAS/reflector/commit/5f7b1ff1a68ebbb907684c7c5f55c1f82dac8550))
+
+## [0.26.0](https://github.com/Monadical-SAS/reflector/compare/v0.25.0...v0.26.0) (2025-12-23)
+
+### Features
+
+* parallelize hatchet ([#804](https://github.com/Monadical-SAS/reflector/issues/804)) ([594bcc0](https://github.com/Monadical-SAS/reflector/commit/594bcc09e0ca744163de2f1525ebbf7c52a68448))
+
## [0.25.0](https://github.com/Monadical-SAS/reflector/compare/v0.24.0...v0.25.0) (2025-12-22)

View File

@@ -5,7 +5,7 @@ import shutil
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
-from typing import Any, Literal
+from typing import Any, Literal, Sequence

import sqlalchemy
from fastapi import HTTPException
@@ -180,7 +180,7 @@ class TranscriptDuration(BaseModel):
class TranscriptWaveform(BaseModel):
-    waveform: list[float]
+    waveform: Sequence[float]


class TranscriptEvent(BaseModel):

View File

@@ -81,7 +81,8 @@ async def set_status_and_broadcast(
async def append_event_and_broadcast(
    transcript_id: NonEmptyString,
    transcript: Transcript,
-    event_name: str,
+    event_name: NonEmptyString,
+    # TODO proper dictionary event => type
    data: Any,
    logger: structlog.BoundLogger,
) -> TranscriptEvent:

View File

@@ -0,0 +1,41 @@
"""
Hatchet workflow constants.
"""
from enum import StrEnum
class TaskName(StrEnum):
GET_RECORDING = "get_recording"
GET_PARTICIPANTS = "get_participants"
PROCESS_TRACKS = "process_tracks"
MIXDOWN_TRACKS = "mixdown_tracks"
GENERATE_WAVEFORM = "generate_waveform"
DETECT_TOPICS = "detect_topics"
GENERATE_TITLE = "generate_title"
EXTRACT_SUBJECTS = "extract_subjects"
PROCESS_SUBJECTS = "process_subjects"
GENERATE_RECAP = "generate_recap"
IDENTIFY_ACTION_ITEMS = "identify_action_items"
FINALIZE = "finalize"
CLEANUP_CONSENT = "cleanup_consent"
POST_ZULIP = "post_zulip"
SEND_WEBHOOK = "send_webhook"
PAD_TRACK = "pad_track"
TRANSCRIBE_TRACK = "transcribe_track"
DETECT_CHUNK_TOPIC = "detect_chunk_topic"
GENERATE_DETAILED_SUMMARY = "generate_detailed_summary"
# Rate limit key for LLM API calls (shared across all LLM-calling tasks)
LLM_RATE_LIMIT_KEY = "llm"
# Max LLM calls per second across all tasks
LLM_RATE_LIMIT_PER_SECOND = 10
# Task execution timeouts (seconds)
TIMEOUT_SHORT = 60 # Quick operations: API calls, DB updates
TIMEOUT_MEDIUM = 120 # Single LLM calls, waveform generation
TIMEOUT_LONG = 180 # Action items (larger context LLM)
TIMEOUT_AUDIO = 300 # Audio processing: padding, mixdown
TIMEOUT_HEAVY = 600 # Transcription, fan-out LLM tasks

View File

@@ -1,5 +1,5 @@
""" """
Run Hatchet workers for the diarization pipeline. Run Hatchet workers for the multitrack pipeline.
Runs as a separate process, just like Celery workers. Runs as a separate process, just like Celery workers.
Usage: Usage:
@@ -12,6 +12,9 @@ Usage:
import signal import signal
import sys import sys
from hatchet_sdk.rate_limit import RateLimitDuration
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
from reflector.logger import logger from reflector.logger import logger
from reflector.settings import settings from reflector.settings import settings
@@ -36,15 +39,26 @@ def main() -> None:
# Can't use lazy init: decorators need the client object when function is defined. # Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415 from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline, daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow, track_workflow,
) )
hatchet = HatchetClientManager.get_client() hatchet = HatchetClientManager.get_client()
hatchet.rate_limits.put(
LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
)
worker = hatchet.worker( worker = hatchet.worker(
"reflector-diarization-worker", "reflector-pipeline-worker",
workflows=[diarization_pipeline, track_workflow], workflows=[
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
],
) )
def shutdown_handler(signum: int, frame) -> None: def shutdown_handler(signum: int, frame) -> None:

View File

@@ -1,14 +1,26 @@
"""Hatchet workflow definitions.""" """Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import ( from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput, PipelineInput,
diarization_pipeline, daily_multitrack_pipeline,
)
from reflector.hatchet.workflows.subject_processing import (
SubjectInput,
subject_workflow,
)
from reflector.hatchet.workflows.topic_chunk_processing import (
TopicChunkInput,
topic_chunk_workflow,
) )
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [ __all__ = [
"diarization_pipeline", "daily_multitrack_pipeline",
"subject_workflow",
"topic_chunk_workflow",
"track_workflow", "track_workflow",
"PipelineInput", "PipelineInput",
"SubjectInput",
"TopicChunkInput",
"TrackInput", "TrackInput",
] ]

View File

@@ -5,13 +5,21 @@ Provides static typing for all task outputs, enabling type checking
and better IDE support.
"""

-from typing import Any
-
from pydantic import BaseModel

+from reflector.processors.summary.models import ActionItemsResponse
+from reflector.processors.types import TitleSummary, Word
from reflector.utils.string import NonEmptyString


+class ParticipantInfo(BaseModel):
+    """Participant info with speaker index for workflow result."""
+
+    participant_id: NonEmptyString
+    user_name: NonEmptyString
+    speaker: int
+
+
class PadTrackResult(BaseModel):
    """Result from pad_track task."""
@@ -26,7 +34,7 @@ class PadTrackResult(BaseModel):
class TranscribeTrackResult(BaseModel):
    """Result from transcribe_track task."""

-    words: list[dict[str, Any]]
+    words: list[Word]
    track_index: int
@@ -35,13 +43,13 @@ class RecordingResult(BaseModel):
    id: NonEmptyString | None
    mtg_session_id: NonEmptyString | None
-    duration: float
+    duration: int | None


class ParticipantsResult(BaseModel):
    """Result from get_participants task."""

-    participants: list[dict[str, Any]]
+    participants: list[ParticipantInfo]
    num_tracks: int
    source_language: NonEmptyString
    target_language: NonEmptyString
@@ -57,7 +65,7 @@ class PaddedTrackInfo(BaseModel):
class ProcessTracksResult(BaseModel):
    """Result from process_tracks task."""

-    all_words: list[dict[str, Any]]
+    all_words: list[Word]
    padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
    word_count: int
    num_tracks: int
@@ -79,10 +87,21 @@ class WaveformResult(BaseModel):
    waveform_generated: bool


+class TopicChunkResult(BaseModel):
+    """Result from topic chunk child workflow."""
+
+    chunk_index: int
+    title: str
+    summary: str
+    timestamp: float
+    duration: float
+    words: list[Word]
+
+
class TopicsResult(BaseModel):
    """Result from detect_topics task."""

-    topics: list[dict[str, Any]]
+    topics: list[TitleSummary]


class TitleResult(BaseModel):
@@ -91,12 +110,41 @@ class TitleResult(BaseModel):
    title: str | None


-class SummaryResult(BaseModel):
-    """Result from generate_summary task."""
+class SubjectsResult(BaseModel):
+    """Result from extract_subjects task."""

-    summary: str | None
-    short_summary: str | None
-    action_items: dict | None = None
+    subjects: list[str]
+    transcript_text: str  # Formatted transcript for LLM consumption
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+
+
+class SubjectSummaryResult(BaseModel):
+    """Result from subject summary child workflow."""
+
+    subject: str
+    subject_index: int
+    detailed_summary: str
+    paragraph_summary: str
+
+
+class ProcessSubjectsResult(BaseModel):
+    """Result from process_subjects fan-out task."""
+
+    subject_summaries: list[SubjectSummaryResult]
+
+
+class RecapResult(BaseModel):
+    """Result from generate_recap task."""
+
+    short_summary: str  # Recap paragraph
+    long_summary: str  # Full markdown summary
+
+
+class ActionItemsResult(BaseModel):
+    """Result from identify_action_items task."""
+
+    action_items: ActionItemsResponse


class FinalizeResult(BaseModel):

View File

@@ -0,0 +1,107 @@
"""
Hatchet child workflow: SubjectProcessing
Handles individual subject/topic summary generation.
Spawned dynamically by the main diarization pipeline for each extracted subject
via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.logger import logger
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
build_participant_instructions,
)
class SubjectInput(BaseModel):
"""Input for individual subject processing."""
subject: str
subject_index: int
transcript_text: str
participant_names: list[str]
participant_name_to_id: dict[str, str]
hatchet = HatchetClientManager.get_client()
subject_workflow = hatchet.workflow(
name="SubjectProcessing", input_validator=SubjectInput
)
@subject_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=2)],
)
async def generate_detailed_summary(
input: SubjectInput, ctx: Context
) -> SubjectSummaryResult:
"""Generate detailed analysis for a single subject, then condense to paragraph."""
ctx.log(
f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
)
logger.info(
"[Hatchet] generate_detailed_summary",
subject=input.subject,
subject_index=input.subject_index,
)
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
# LLM HTTP connection pools aren't shared across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.settings import settings # noqa: PLC0415
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(input.participant_names)
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
if participant_instructions:
detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
detailed_response = await llm.get_response(
detailed_prompt,
[input.transcript_text],
tone_name="Topic assistant",
)
detailed_summary = str(detailed_response)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
if participant_instructions:
paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"
ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
paragraph_response = await llm.get_response(
paragraph_prompt,
[detailed_summary],
tone_name="Topic summarizer",
)
paragraph_summary = str(paragraph_response)
ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
logger.info(
"[Hatchet] generate_detailed_summary complete",
subject=input.subject,
subject_index=input.subject_index,
detailed_len=len(detailed_summary),
paragraph_len=len(paragraph_summary),
)
return SubjectSummaryResult(
subject=input.subject,
subject_index=input.subject_index,
detailed_summary=detailed_summary,
paragraph_summary=paragraph_summary,
)
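Note: a minimal sketch of how a parent pipeline might fan subjects out to this workflow, using the aio_run() + asyncio.gather() pattern the track_processing docstring calls standard (the module docstring above names aio_run_many() as the bulk equivalent). The function and variable names here are illustrative, not the actual process_subjects task; `extracted` stands for a SubjectsResult.

import asyncio

async def fan_out_subjects_sketch(extracted):
    # One SubjectProcessing run per extracted subject; gather preserves
    # order, so results align with subject_index.
    return await asyncio.gather(*[
        subject_workflow.aio_run(
            SubjectInput(
                subject=subject,
                subject_index=index,
                transcript_text=extracted.transcript_text,
                participant_names=extracted.participant_names,
                participant_name_to_id=extracted.participant_name_to_id,
            )
        )
        for index, subject in enumerate(extracted.subjects)
    ])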

View File

@@ -0,0 +1,97 @@
"""
Hatchet child workflow: TopicChunkProcessing
Handles topic detection for individual transcript chunks.
Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
"""
from datetime import timedelta
from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import Word
class TopicChunkInput(BaseModel):
"""Input for individual topic chunk processing."""
chunk_index: int
chunk_text: str
timestamp: float
duration: float
words: list[Word]
hatchet = HatchetClientManager.get_client()
topic_chunk_workflow = hatchet.workflow(
name="TopicChunkProcessing",
input_validator=TopicChunkInput,
concurrency=ConcurrencyExpression(
expression="'global'", # constant string = global limit across all runs
max_runs=20,
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
)
@topic_chunk_workflow.task(
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)
async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
"""Detect topic for a single transcript chunk."""
ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
logger.info(
"[Hatchet] detect_chunk_topic",
chunk_index=input.chunk_index,
text_length=len(input.chunk_text),
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing LLM HTTP connection pools across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
TopicResponse,
)
from reflector.settings import settings # noqa: PLC0415
from reflector.utils.text import clean_title # noqa: PLC0415
llm = LLM(settings=settings, temperature=0.9, max_tokens=500)
prompt = TOPIC_PROMPT.format(text=input.chunk_text)
response = await llm.get_structured_response(
prompt,
[input.chunk_text],
TopicResponse,
tone_name="Topic analyzer",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
title = clean_title(response.title)
ctx.log(
f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
)
logger.info(
"[Hatchet] detect_chunk_topic complete",
chunk_index=input.chunk_index,
title=title[:50],
)
return TopicChunkResult(
chunk_index=input.chunk_index,
title=title,
summary=response.summary,
timestamp=input.timestamp,
duration=input.duration,
words=input.words,
)
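Note: a sketch of the dispatch side, splitting the merged transcript into fixed-size word chunks before spawning one run per chunk, as the PR does elsewhere with TOPIC_CHUNK_WORD_COUNT. Helper names are illustrative, and Word is assumed to expose text/start/end.

import asyncio

async def fan_out_topic_chunks_sketch(words: list[Word]):
    chunk_size = 300  # TOPIC_CHUNK_WORD_COUNT elsewhere in this PR
    chunks = [words[i : i + chunk_size] for i in range(0, len(words), chunk_size)]
    # gather preserves chunk order, so results line up with chunk_index
    return await asyncio.gather(*[
        topic_chunk_workflow.aio_run(
            TopicChunkInput(
                chunk_index=index,
                chunk_text=" ".join(w.text for w in chunk),  # assumes Word.text
                timestamp=chunk[0].start,  # assumes Word.start/.end in seconds
                duration=chunk[-1].end - chunk[0].start,
                words=chunk,
            )
        )
        for index, chunk in enumerate(chunks)
    ])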

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.

-Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
-standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.

Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
@@ -23,6 +23,7 @@ from hatchet_sdk import Context
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
@@ -47,7 +48,7 @@ hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)

-@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
+@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
    """Pad single audio track with silence for alignment.
@@ -82,6 +83,15 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
    )

    with av.open(source_url) as in_container:
+        if in_container.duration:
+            try:
+                duration = timedelta(seconds=in_container.duration // 1_000_000)
+                ctx.log(
+                    f"pad_track: track {input.track_index}, duration={duration}"
+                )
+            except Exception:
+                ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
+
        start_time_seconds = extract_stream_start_time_from_container(
            in_container, input.track_index, logger=logger
        )
@@ -153,7 +163,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
@track_workflow.task(
-    parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
+    parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
    """Transcribe audio track using GPU (Modal.com) or local Whisper."""
@@ -197,23 +207,20 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
    transcript = await transcribe_file_with_processor(audio_url, input.language)

    # Tag all words with speaker index
-    words = []
    for word in transcript.words:
-        word_dict = word.model_dump()
-        word_dict["speaker"] = input.track_index
-        words.append(word_dict)
+        word.speaker = input.track_index

    ctx.log(
-        f"transcribe_track complete: track {input.track_index}, {len(words)} words"
+        f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
    )
    logger.info(
        "[Hatchet] transcribe_track complete",
        track_index=input.track_index,
-        word_count=len(words),
+        word_count=len(transcript.words),
    )

    return TranscribeTrackResult(
-        words=words,
+        words=transcript.words,
        track_index=input.track_index,
    )

View File

@@ -18,6 +18,7 @@ from reflector.processors import (
)
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
+from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT


class EmptyPipeline:
@@ -38,7 +39,7 @@ async def detect_topics(
    on_topic_callback: Callable,
    empty_pipeline: EmptyPipeline,
) -> list[TitleSummary]:
-    chunk_size = 300
+    chunk_size = TOPIC_CHUNK_WORD_COUNT
    topics: list[TitleSummary] = []

    async def on_topic(topic: TitleSummary):

View File

@@ -0,0 +1,30 @@
"""
LLM prompts for transcript processing.
Extracted to a separate module to avoid circular imports when importing
from processor modules (which import LLM/settings at module level).
"""
from textwrap import dedent
TOPIC_PROMPT = dedent(
"""
Analyze the following transcript segment and extract the main topic being discussed.
Focus on the substantive content and ignore small talk or administrative chatter.
Create a title that:
- Captures the specific subject matter being discussed
- Is descriptive and self-explanatory
- Uses professional language
- Is specific rather than generic
For the summary:
- Summarize the key points in maximum two sentences
- Focus on what was discussed, decided, or accomplished
- Be concise but informative
<transcript>
{text}
</transcript>
"""
).strip()

View File

@@ -0,0 +1,50 @@
"""Pydantic models for summary processing."""
from pydantic import BaseModel, Field
class ActionItem(BaseModel):
"""A single action item from the meeting"""
task: str = Field(description="The task or action item to be completed")
assigned_to: str | None = Field(
default=None, description="Person or team assigned to this task (name)"
)
assigned_to_participant_id: str | None = Field(
default=None, description="Participant ID if assigned_to matches a participant"
)
deadline: str | None = Field(
default=None, description="Deadline or timeframe mentioned for this task"
)
context: str | None = Field(
default=None, description="Additional context or notes about this task"
)
class Decision(BaseModel):
"""A decision made during the meeting"""
decision: str = Field(description="What was decided")
rationale: str | None = Field(
default=None,
description="Reasoning or key factors that influenced this decision",
)
decided_by: str | None = Field(
default=None, description="Person or group who made the decision (name)"
)
decided_by_participant_id: str | None = Field(
default=None, description="Participant ID if decided_by matches a participant"
)
class ActionItemsResponse(BaseModel):
"""Pydantic model for identified action items"""
decisions: list[Decision] = Field(
default_factory=list,
description="List of decisions made during the meeting",
)
next_steps: list[ActionItem] = Field(
default_factory=list,
description="List of action items and next steps to be taken",
)

View File

@@ -0,0 +1,91 @@
"""
LLM prompts for summary generation.
Extracted to a separate module to avoid circular imports when importing
from summary_builder.py (which imports LLM/settings at module level).
"""
from textwrap import dedent
def build_participant_instructions(participant_names: list[str]) -> str:
"""Build participant context instructions for LLM prompts."""
if not participant_names:
return ""
participants_list = ", ".join(participant_names)
return dedent(
f"""
# IMPORTANT: Participant Names
The following participants are identified in this conversation: {participants_list}
You MUST use these specific participant names when referring to people in your response.
Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
"""
).strip()
DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
"""
Get me information about the topic "{subject}"
# RESPONSE GUIDELINES
Follow this structured approach to create the topic summary:
- Highlight important arguments, insights, or data presented.
- Outline decisions made.
- Indicate any decisions reached, including any rationale or key factors
that influenced these decisions.
- Detail action items and responsibilities.
- For each decision or unresolved issue, list specific action items agreed
upon, along with assigned individuals or teams responsible for each task.
- Specify deadlines or timelines if mentioned. For each action item,
include any deadlines or timeframes discussed for completion or follow-up.
- Mention unresolved issues or topics needing further discussion, aiding in
planning future meetings or follow-up actions.
- Do not include topic unrelated to {subject}.
# OUTPUT
Your summary should be clear, concise, and structured, covering all major
points, decisions, and action items from the meeting. It should be easy to
understand for someone not present, providing a comprehensive understanding
of what transpired and what needs to be done next. The summary should not
exceed one page to ensure brevity and focus.
"""
).strip()
PARAGRAPH_SUMMARY_PROMPT = dedent(
"""
Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic.
"""
).strip()
RECAP_PROMPT = dedent(
"""
Provide a high-level quick recap of the following meeting, fitting in one paragraph.
Do not include decisions, action items or unresolved issue, just highlight the high moments.
Just dive into the meeting, be concise and do not include unnecessary details.
As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
"""
).strip()
def build_summary_markdown(recap: str, summaries: list[dict[str, str]]) -> str:
"""Build markdown summary from recap and subject summaries."""
lines: list[str] = []
if recap:
lines.append("# Quick recap")
lines.append("")
lines.append(recap)
lines.append("")
if summaries:
lines.append("# Summary")
lines.append("")
for summary in summaries:
lines.append(f"**{summary['subject']}**")
lines.append(summary["summary"])
lines.append("")
return "\n".join(lines)

View File

@@ -15,6 +15,14 @@ import structlog
from pydantic import BaseModel, Field

from reflector.llm import LLM
+from reflector.processors.summary.models import ActionItemsResponse
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    RECAP_PROMPT,
+    build_participant_instructions,
+    build_summary_markdown,
+)
from reflector.settings import settings

T = TypeVar("T", bound=BaseModel)
@@ -52,50 +60,6 @@ SUBJECTS_PROMPT = dedent(
    """
).strip()

-DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
-    """
-    Get me information about the topic "{subject}"
-
-    # RESPONSE GUIDELINES
-    Follow this structured approach to create the topic summary:
-    - Highlight important arguments, insights, or data presented.
-    - Outline decisions made.
-    - Indicate any decisions reached, including any rationale or key factors
-      that influenced these decisions.
-    - Detail action items and responsibilities.
-    - For each decision or unresolved issue, list specific action items agreed
-      upon, along with assigned individuals or teams responsible for each task.
-    - Specify deadlines or timelines if mentioned. For each action item,
-      include any deadlines or timeframes discussed for completion or follow-up.
-    - Mention unresolved issues or topics needing further discussion, aiding in
-      planning future meetings or follow-up actions.
-    - Do not include topic unrelated to {subject}.
-
-    # OUTPUT
-    Your summary should be clear, concise, and structured, covering all major
-    points, decisions, and action items from the meeting. It should be easy to
-    understand for someone not present, providing a comprehensive understanding
-    of what transpired and what needs to be done next. The summary should not
-    exceed one page to ensure brevity and focus.
-    """
-).strip()
-
-PARAGRAPH_SUMMARY_PROMPT = dedent(
-    """
-    Summarize the mentioned topic in 1 paragraph.
-    It will be integrated into the final summary, so just for this topic.
-    """
-).strip()
-
-RECAP_PROMPT = dedent(
-    """
-    Provide a high-level quick recap of the following meeting, fitting in one paragraph.
-    Do not include decisions, action items or unresolved issue, just highlight the high moments.
-    Just dive into the meeting, be concise and do not include unnecessary details.
-    As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
-    """
-).strip()
-
ACTION_ITEMS_PROMPT = dedent(
    """
    Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -185,53 +149,6 @@ class SubjectsResponse(BaseModel):
    )

-class ActionItem(BaseModel):
-    """A single action item from the meeting"""
-
-    task: str = Field(description="The task or action item to be completed")
-    assigned_to: str | None = Field(
-        default=None, description="Person or team assigned to this task (name)"
-    )
-    assigned_to_participant_id: str | None = Field(
-        default=None, description="Participant ID if assigned_to matches a participant"
-    )
-    deadline: str | None = Field(
-        default=None, description="Deadline or timeframe mentioned for this task"
-    )
-    context: str | None = Field(
-        default=None, description="Additional context or notes about this task"
-    )
-
-
-class Decision(BaseModel):
-    """A decision made during the meeting"""
-
-    decision: str = Field(description="What was decided")
-    rationale: str | None = Field(
-        default=None,
-        description="Reasoning or key factors that influenced this decision",
-    )
-    decided_by: str | None = Field(
-        default=None, description="Person or group who made the decision (name)"
-    )
-    decided_by_participant_id: str | None = Field(
-        default=None, description="Participant ID if decided_by matches a participant"
-    )
-
-
-class ActionItemsResponse(BaseModel):
-    """Pydantic model for identified action items"""
-
-    decisions: list[Decision] = Field(
-        default_factory=list,
-        description="List of decisions made during the meeting",
-    )
-    next_steps: list[ActionItem] = Field(
-        default_factory=list,
-        description="List of action items and next steps to be taken",
-    )
-
-
class SummaryBuilder:
    def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None:
        self.transcript: str | None = None
@@ -331,17 +248,7 @@ class SummaryBuilder:
        participants_md = self.format_list_md(participants)
        self.transcript += f"\n\n# Participants\n\n{participants_md}"

-        participants_list = ", ".join(participants)
-        self.participant_instructions = dedent(
-            f"""
-            # IMPORTANT: Participant Names
-            The following participants are identified in this conversation: {participants_list}
-
-            You MUST use these specific participant names when referring to people in your response.
-            Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-            Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-            """
-        ).strip()
+        self.participant_instructions = build_participant_instructions(participants)

    async def identify_participants(self) -> None:
        """
@@ -377,18 +284,9 @@ class SummaryBuilder:
            participants_md = self.format_list_md(unique_participants)
            self.transcript += f"\n\n# Participants\n\n{participants_md}"

-            # Set instructions that will be automatically added to all prompts
-            participants_list = ", ".join(unique_participants)
-            self.participant_instructions = dedent(
-                f"""
-                # IMPORTANT: Participant Names
-                The following participants are identified in this conversation: {participants_list}
-
-                You MUST use these specific participant names when referring to people in your response.
-                Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-                Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-                """
-            ).strip()
+            self.participant_instructions = build_participant_instructions(
+                unique_participants
+            )
        else:
            self.logger.warning("No participants identified in the transcript")
@@ -613,22 +511,7 @@ class SummaryBuilder:
    # ----------------------------------------------------------------------------

    def as_markdown(self) -> str:
-        lines: list[str] = []
-        if self.recap:
-            lines.append("# Quick recap")
-            lines.append("")
-            lines.append(self.recap)
-            lines.append("")
-        if self.summaries:
-            lines.append("# Summary")
-            lines.append("")
-            for summary in self.summaries:
-                lines.append(f"**{summary['subject']}**")
-                lines.append(summary["summary"])
-                lines.append("")
-        return "\n".join(lines)
+        return build_summary_markdown(self.recap, self.summaries)

    def format_list_md(self, data: list[str]) -> str:
        return "\n".join([f"- {item}" for item in data])

View File

@@ -1,35 +1,12 @@
-from textwrap import dedent
-
from pydantic import AliasChoices, BaseModel, Field

from reflector.llm import LLM
from reflector.processors.base import Processor
+from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import TitleSummary, Transcript
from reflector.settings import settings
from reflector.utils.text import clean_title

-TOPIC_PROMPT = dedent(
-    """
-    Analyze the following transcript segment and extract the main topic being discussed.
-    Focus on the substantive content and ignore small talk or administrative chatter.
-
-    Create a title that:
-    - Captures the specific subject matter being discussed
-    - Is descriptive and self-explanatory
-    - Uses professional language
-    - Is specific rather than generic
-
-    For the summary:
-    - Summarize the key points in maximum two sentences
-    - Focus on what was discussed, decided, or accomplished
-    - Be concise but informative
-
-    <transcript>
-    {text}
-    </transcript>
-    """
-).strip()
-

class TopicResponse(BaseModel):
    """Structured response for topic detection"""

View File

@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
    if transcript.locked:
        return ValidationLocked(detail="Recording is locked")

-    if transcript.status == "idle":
+    # hatchet is idempotent anyways + if it wasn't dispatched successfully
+    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
        return ValidationNotReady(detail="Recording is not ready for processing")

    # Check Celery tasks
@@ -115,13 +116,11 @@
    ):
        return ValidationAlreadyScheduled(detail="already running")

-    # Check Hatchet workflows (if enabled)
    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
        try:
            status = await HatchetClientManager.get_workflow_run_status(
                transcript.workflow_run_id
            )
-            # If workflow is running or queued, don't allow new processing
            if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                return ValidationAlreadyScheduled(
                    detail="Hatchet workflow already running"
@@ -189,8 +188,8 @@ async def dispatch_transcript_processing(
    room_forces_hatchet = room.use_hatchet if room else False

    # Start durable workflow if enabled (Hatchet)
-    # or if room has use_hatchet=True
-    use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
+    # and if room has use_hatchet=True
+    use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
    if room_forces_hatchet:
        logger.info(

View File

@@ -43,6 +43,8 @@ async def mixdown_tracks_pyav(
    target_sample_rate: int,
    offsets_seconds: list[float] | None = None,
    logger=None,
+    progress_callback=None,
+    expected_duration_sec: float | None = None,
) -> None:
    """Multi-track mixdown using PyAV filter graph (amix).
@@ -57,6 +59,10 @@ async def mixdown_tracks_pyav(
            If provided, must have same length as track_urls. Delays are relative
            to the minimum offset (earliest track has delay=0).
        logger: Optional logger instance
+        progress_callback: Optional callback(progress_pct: float | None, audio_position: float)
+            called on progress updates. progress_pct is 0-100 if duration known, None otherwise.
+            audio_position is current position in seconds.
+        expected_duration_sec: Optional fallback duration if container metadata unavailable.

    Raises:
        ValueError: If offsets_seconds length doesn't match track_urls,
@@ -171,6 +177,17 @@ async def mixdown_tracks_pyav(
        logger.error("Mixdown failed - no valid containers opened")
        raise ValueError("Mixdown failed: Could not open any track containers")

+    # Calculate total duration for progress reporting.
+    # Try container metadata first, fall back to expected_duration_sec if provided.
+    max_duration_sec = 0.0
+    for c in containers:
+        if c.duration is not None:
+            dur_sec = c.duration / av.time_base
+            max_duration_sec = max(max_duration_sec, dur_sec)
+    if max_duration_sec == 0.0 and expected_duration_sec:
+        max_duration_sec = expected_duration_sec
+    current_max_time = 0.0
+
    decoders = [c.decode(audio=0) for c in containers]
    active = [True] * len(decoders)
    resamplers = [
@@ -192,6 +209,18 @@ async def mixdown_tracks_pyav(
                if frame.sample_rate != target_sample_rate:
                    continue

+                # Update progress based on frame timestamp
+                if progress_callback and frame.time is not None:
+                    current_max_time = max(current_max_time, frame.time)
+                    if max_duration_sec > 0:
+                        progress_pct = min(
+                            100.0, (current_max_time / max_duration_sec) * 100
+                        )
+                    else:
+                        progress_pct = None  # Duration unavailable
+                    progress_callback(progress_pct, current_max_time)
+
                out_frames = resamplers[i].resample(frame) or []
                for rf in out_frames:
                    rf.sample_rate = target_sample_rate
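Note: a minimal sketch of a progress_callback consumer matching the signature documented above; where the numbers go (here, plain prints) is up to the caller, and recording_duration is a hypothetical value.

def log_mixdown_progress(progress_pct: float | None, audio_position: float) -> None:
    # progress_pct is None when no container exposes a duration and no
    # expected_duration_sec fallback was supplied.
    if progress_pct is not None:
        print(f"mixdown {progress_pct:.1f}% ({audio_position:.1f}s)")
    else:
        print(f"mixdown at {audio_position:.1f}s (total duration unknown)")

# await mixdown_tracks_pyav(..., progress_callback=log_mixdown_progress,
#                           expected_duration_sec=recording_duration)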

View File

@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np

+from reflector.utils.audio_constants import WAVEFORM_SEGMENTS

-def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
+
+def get_audio_waveform(
+    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
+) -> list[int]:
    if isinstance(path, Path):
        path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("path", type=Path)
-    parser.add_argument("--segments-count", type=int, default=256)
+    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
    args = parser.parse_args()
    print(get_audio_waveform(args.path, args.segments_count))

View File

@@ -0,0 +1,8 @@
"""
Shared transcript processing constants.
Used by both Hatchet workflows and Celery pipelines for consistent processing.
"""
# Topic detection: number of words per chunk for topic extraction
TOPIC_CHUNK_WORD_COUNT = 300

View File

@@ -0,0 +1,216 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
from typing import Union
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
__all__ = [
"fetch_transcript_webhook_payload",
"fetch_test_webhook_payload",
"build_webhook_headers",
"generate_webhook_signature",
"send_webhook_request",
]
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.utils.webvtt import topics_to_webvtt
def _serialize_payload(payload: BaseModel) -> bytes:
"""Serialize Pydantic model to compact JSON bytes."""
return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC-SHA256 signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
async def send_webhook_request(
url: str,
payload: BaseModel,
event_type: str,
webhook_secret: str | None = None,
retry_count: int = 0,
timeout: float = 30.0,
) -> httpx.Response:
"""Send webhook request with proper headers and signature.
Raises:
httpx.HTTPStatusError: On non-2xx response
httpx.ConnectError: On connection failure
httpx.TimeoutException: On timeout
"""
payload_bytes = _serialize_payload(payload)
headers = build_webhook_headers(
event_type=event_type,
payload_bytes=payload_bytes,
webhook_secret=webhook_secret,
retry_count=retry_count,
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, content=payload_bytes, headers=headers)
response.raise_for_status()
return response
async def fetch_transcript_webhook_payload(
transcript_id: NonEmptyString,
room_id: NonEmptyString,
) -> Union[WebhookPayload, str]:
"""Build webhook payload by fetching transcript and room data from database."""
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
return f"Transcript {transcript_id} not found"
room = await rooms_controller.get_by_id(room_id)
if not room:
return f"Room {room_id} not found"
topics_data = [
WebhookTopicPayload(
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
webvtt=topics_to_webvtt([topic]) if topic.words else "",
)
for topic in (transcript.topics or [])
]
participants_data = [
WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
for p in (transcript.participants or [])
]
calendar_event_data: WebhookCalendarEventPayload | None = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
if calendar_event:
calendar_event_data = WebhookCalendarEventPayload(
id=calendar_event.id,
ics_uid=calendar_event.ics_uid,
title=calendar_event.title,
start_time=calendar_event.start_time,
end_time=calendar_event.end_time,
description=calendar_event.description or None,
location=calendar_event.location or None,
attendees=calendar_event.attendees or None,
)
except Exception as e:
logger.warning(
"Failed to fetch calendar event for webhook",
transcript_id=transcript_id,
meeting_id=transcript.meeting_id,
error=str(e),
)
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
return WebhookPayload(
event="transcript.completed",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
transcript=WebhookTranscriptPayload(
id=transcript.id,
room_id=transcript.room_id,
created_at=transcript.created_at,
duration=transcript.duration,
title=transcript.title,
short_summary=transcript.short_summary,
long_summary=transcript.long_summary,
webvtt=transcript.webvtt,
topics=topics_data,
participants=participants_data,
source_language=transcript.source_language,
target_language=transcript.target_language,
status=transcript.status,
frontend_url=frontend_url,
action_items=transcript.action_items,
),
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
calendar_event=calendar_event_data,
)
async def fetch_test_webhook_payload(
room_id: NonEmptyString,
) -> WebhookTestPayload | None:
"""Build test webhook payload."""
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
return WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)
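Note: for webhook consumers, a receiver-side sketch that validates the X-Webhook-Signature header (format t=<unix timestamp>,v1=<hex digest>) against the signing scheme in generate_webhook_signature above; the function name is illustrative.

import hashlib
import hmac

def verify_webhook_signature(body: bytes, signature_header: str, secret: str) -> bool:
    # Parse "t=<timestamp>,v1=<hmac hex>" into its parts
    parts = dict(item.split("=", 1) for item in signature_header.split(","))
    signed_payload = f"{parts['t']}.{body.decode('utf-8')}"
    expected = hmac.new(
        secret.encode("utf-8"),
        signed_payload.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids leaking timing information
    return hmac.compare_digest(expected, parts["v1"])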

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
title: NonEmptyString
summary: NonEmptyString
timestamp: float
duration: float | None
webvtt: str # can be empty when no words
class WebhookParticipantPayload(BaseModel):
id: NonEmptyString
name: str | None
speaker: int | None
class WebhookRoomPayload(BaseModel):
id: NonEmptyString
name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
id: NonEmptyString
ics_uid: str | None = None
title: str | None = None
start_time: datetime | None = None
end_time: datetime | None = None
description: str | None = None
location: str | None = None
attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
id: NonEmptyString
room_id: NonEmptyString | None
created_at: datetime
duration: float | None
title: str | None
short_summary: str | None
long_summary: str | None
webvtt: str | None
topics: list[WebhookTopicPayload]
participants: list[WebhookParticipantPayload]
source_language: NonEmptyString
target_language: NonEmptyString
status: NonEmptyString
frontend_url: NonEmptyString
action_items: dict | None
class WebhookPayload(BaseModel):
event: WebhookTranscriptEventType
event_id: NonEmptyString
timestamp: datetime
transcript: WebhookTranscriptPayload
room: WebhookRoomPayload
calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
event: WebhookTestEventType
event_id: NonEmptyString
timestamp: datetime
message: NonEmptyString
room: WebhookRoomPayload

View File

@@ -287,9 +287,7 @@ async def _process_multitrack_recording_inner(
        room_id=room.id,
    )

-    # Start durable workflow if enabled (Hatchet) or room overrides it
-    durable_started = False
-    use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
+    use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet

    if room and room.use_hatchet and not settings.HATCHET_ENABLED:
        logger.info(
@@ -300,7 +298,7 @@
    if use_hatchet:
        workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
            input_data={
                "recording_id": recording_id,
                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -836,7 +834,7 @@ async def reprocess_failed_daily_recordings():
            )
            continue

-        use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
+        use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet

        if use_hatchet:
            # Hatchet requires a transcript for workflow_run_id tracking
@@ -848,7 +846,7 @@
                continue

            workflow_id = await HatchetClientManager.start_workflow(
-                workflow_name="DiarizationPipeline",
+                workflow_name="DailyMultitrackPipeline",
                input_data={
                    "recording_id": recording.id,
                    "tracks": [

View File

@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications.""" """Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid import uuid
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task from celery import shared_task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_live_pipeline import asynctask
from reflector.settings import settings from reflector.utils.webhook import (
from reflector.utils.webvtt import topics_to_webvtt WebhookRoomPayload,
WebhookTestPayload,
_serialize_payload,
build_webhook_headers,
fetch_transcript_webhook_payload,
send_webhook_request,
)
logger = structlog.wrap_logger(get_task_logger(__name__)) logger = structlog.wrap_logger(get_task_logger(__name__))
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
@shared_task( @shared_task(
bind=True, bind=True,
max_retries=30, max_retries=30,
@@ -54,12 +43,6 @@ async def send_transcript_webhook(
) )
try: try:
# Fetch transcript and room
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
log.error("Transcript not found, skipping webhook")
return
room = await rooms_controller.get_by_id(room_id) room = await rooms_controller.get_by_id(room_id)
if not room: if not room:
log.error("Room not found, skipping webhook") log.error("Room not found, skipping webhook")
@@ -69,135 +52,36 @@ async def send_transcript_webhook(
log.info("No webhook URL configured for room, skipping") log.info("No webhook URL configured for room, skipping")
return return
# Generate WebVTT content from topics payload = await fetch_transcript_webhook_payload(
topics_data = [] transcript_id=transcript_id,
room_id=room_id,
)
if transcript.topics: if isinstance(payload, str):
# Build topics data with diarized content per topic log.error(f"Could not build webhook payload, skipping: {payload}")
for topic in transcript.topics: return
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
)
# Fetch meeting and calendar event if they exist log.info(
calendar_event = None "Sending webhook",
try: url=room.webhook_url,
if transcript.meeting_id: topics=len(payload.transcript.topics),
meeting = await meetings_controller.get_by_id(transcript.meeting_id) participants=len(payload.transcript.participants),
if meeting and meeting.calendar_event_id: )
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
except Exception as e:
logger.error("Error fetching meeting or calendar event", error=str(e))
# Build webhook payload response = await send_webhook_request(
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}" url=room.webhook_url,
participants = [ payload=payload,
{"id": p.id, "name": p.name, "speaker": p.speaker} event_type="transcript.completed",
for p in (transcript.participants or []) webhook_secret=room.webhook_secret,
] retry_count=self.request.retries,
payload_data = { timeout=30.0,
"event": "transcript.completed", )
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,
"name": room.name,
},
}
# Always include calendar_event field, even if no event is present log.info(
payload_data["calendar_event"] = {} "Webhook sent successfully",
status_code=response.status_code,
# Add calendar event data if present response_size=len(response.content),
if calendar_event: )
calendar_data = {
"id": calendar_event.id,
"ics_uid": calendar_event.ics_uid,
"title": calendar_event.title,
"start_time": calendar_event.start_time.isoformat()
if calendar_event.start_time
else None,
"end_time": calendar_event.end_time.isoformat()
if calendar_event.end_time
else None,
}
# Add optional fields only if they exist
if calendar_event.description:
calendar_data["description"] = calendar_event.description
if calendar_event.location:
calendar_data["location"] = calendar_event.location
if calendar_event.attendees:
calendar_data["attendees"] = calendar_event.attendees
payload_data["calendar_event"] = calendar_data
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info(
"Sending webhook",
url=room.webhook_url,
payload_size=len(payload_bytes),
)
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
response.raise_for_status()
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
except httpx.HTTPStatusError as e: except httpx.HTTPStatusError as e:
log.error( log.error(
@@ -226,8 +110,8 @@ async def send_transcript_webhook(
async def test_webhook(room_id: str) -> dict: async def test_webhook(room_id: str) -> dict:
""" """Test webhook configuration by sending a sample payload.
Test webhook configuration by sending a sample payload.
Returns immediately with success/failure status. Returns immediately with success/failure status.
This is the shared implementation used by both the API endpoint and Celery task. This is the shared implementation used by both the API endpoint and Celery task.
""" """
@@ -239,34 +123,24 @@ async def test_webhook(room_id: str) -> dict:
if not room.webhook_url: if not room.webhook_url:
return {"success": False, "error": "No webhook URL configured"} return {"success": False, "error": "No webhook URL configured"}
now = (datetime.now(timezone.utc).isoformat(),) payload = WebhookTestPayload(
payload_data = { event="test",
"event": "test", event_id=uuid.uuid4().hex,
"event_id": uuid.uuid4().hex, timestamp=datetime.now(timezone.utc),
"timestamp": now, message="This is a test webhook from Reflector",
"message": "This is a test webhook from Reflector", room=WebhookRoomPayload(
"room": { id=room.id,
"id": room.id, name=room.name,
"name": room.name, ),
}, )
}
payload_json = json.dumps(payload_data, separators=(",", ":")) payload_bytes = _serialize_payload(payload)
payload_bytes = payload_json.encode("utf-8")
# Generate headers with signature headers = build_webhook_headers(
headers = { event_type="test",
"Content-Type": "application/json", payload_bytes=payload_bytes,
"User-Agent": "Reflector-Webhook/1.0", webhook_secret=room.webhook_secret,
"X-Webhook-Event": "test", )
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send test webhook with short timeout # Send test webhook with short timeout
async with httpx.AsyncClient(timeout=10.0) as client: async with httpx.AsyncClient(timeout=10.0) as client: