Compare commits

..

8 Commits

Author SHA1 Message Date
Igor Loskutov
b72403fda7 merge 2025-12-23 17:56:48 -05:00
Igor Loskutov
abaf819c3e self-review 2025-12-23 17:55:42 -05:00
Igor Loskutov
f8c4f542c1 cleanup 2025-12-23 17:44:45 -05:00
Igor Loskutov
e369ed66ca Merge branch 'main' into fix/pipeline-fixes-whereby-preparation 2025-12-23 17:40:42 -05:00
Igor Monadical
3cf9757ac2 diarization flow - pralellelize better (#808) (Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>) 2025-12-23 17:35:43 -05:00
Igor Monadical
d9d3938192 better hatchet concurrency limits (#807) (Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>) 2025-12-23 17:26:23 -05:00
Igor Loskutov
10c17d7086 send_webhook fixes 2025-12-23 16:56:43 -05:00
Igor Loskutov
7a1d1dc08d pipeline fixes: whereby Hatchet preparation 2025-12-23 13:47:04 -05:00
10 changed files with 437 additions and 234 deletions

View File

@@ -1,5 +1,5 @@
 """
-Run Hatchet workers for the diarization pipeline.
+Run Hatchet workers for the multitrack pipeline.
 Runs as a separate process, just like Celery workers.

 Usage:
@@ -39,7 +39,7 @@ def main() -> None:
     # Can't use lazy init: decorators need the client object when function is defined.
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
-        diarization_pipeline,
+        daily_multitrack_pipeline,
         subject_workflow,
         topic_chunk_workflow,
         track_workflow,
@@ -54,7 +54,7 @@ def main() -> None:
     worker = hatchet.worker(
         "reflector-pipeline-worker",
         workflows=[
-            diarization_pipeline,
+            daily_multitrack_pipeline,
             subject_workflow,
             topic_chunk_workflow,
             track_workflow,

View File

@@ -1,8 +1,8 @@
 """Hatchet workflow definitions."""

-from reflector.hatchet.workflows.diarization_pipeline import (
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
-    diarization_pipeline,
+    daily_multitrack_pipeline,
 )
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
@@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow

 __all__ = [
-    "diarization_pipeline",
+    "daily_multitrack_pipeline",
     "subject_workflow",
     "topic_chunk_workflow",
     "track_workflow",

View File

@@ -1,9 +1,12 @@
 """
-Hatchet main workflow: DiarizationPipeline
+Hatchet main workflow: DailyMultitrackPipeline

-Multitrack diarization pipeline for Daily.co recordings.
+Multitrack processing pipeline for Daily.co recordings.
 Orchestrates the full processing flow from recording metadata to final transcript.

+Daily.co recordings don't require ML diarization - speaker identification comes from
+track index (each participant's audio is a separate track).
+
 Note: This file uses deferred imports (inside functions/tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure DB connections
 are not shared across forks, avoiding connection pooling issues.
@@ -93,7 +96,7 @@ from reflector.zulip import post_transcript_notification
 class PipelineInput(BaseModel):
-    """Input to trigger the diarization pipeline."""
+    """Input to trigger the Daily.co multitrack pipeline."""

     recording_id: NonEmptyString
     tracks: list[dict]  # List of {"s3_key": str}
@@ -104,7 +107,7 @@ class PipelineInput(BaseModel):
 hatchet = HatchetClientManager.get_client()

-diarization_pipeline = hatchet.workflow(
+daily_multitrack_pipeline = hatchet.workflow(
     name="DiarizationPipeline", input_validator=PipelineInput
 )
@@ -191,7 +194,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
     return decorator

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("get_recording")
@@ -244,7 +247,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     )

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_recording],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
@@ -335,7 +338,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     )

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_participants],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -404,7 +407,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     )

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
@@ -487,7 +490,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     )

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[mixdown_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -555,8 +558,8 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     return WaveformResult(waveform_generated=True)

-@diarization_pipeline.task(
-    parents=[mixdown_tracks],
+@daily_multitrack_pipeline.task(
+    parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
 )
@@ -658,7 +661,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     return TopicsResult(topics=topics_list)

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -721,7 +724,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     return TitleResult(title=title_result)

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -802,7 +805,7 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
     )

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -844,7 +847,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
     return ProcessSubjectsResult(subject_summaries=subject_summaries)

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -938,7 +941,7 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
     return RecapResult(short_summary=short_summary, long_summary=long_summary)

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_LONG),
     retries=3,
@@ -1009,7 +1012,7 @@ async def identify_action_items(
     return ActionItemsResult(action_items=action_items_dict)

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
@@ -1094,7 +1097,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     return FinalizeResult(status="COMPLETED")

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("cleanup_consent", set_error_status=False)
@@ -1194,7 +1197,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
     return ConsentResult()

-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[cleanup_consent],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=5,
@@ -1221,14 +1224,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
     return ZulipResult(zulip_message_id=message_id)

-@diarization_pipeline.task(
-    parents=[post_zulip],
+@daily_multitrack_pipeline.task(
+    parents=[cleanup_consent],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
-    retries=30,
+    retries=5,
 )
 @with_error_handling("send_webhook", set_error_status=False)
 async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
-    """Send completion webhook to external service."""
+    """Send completion webhook to external service with full payload and HMAC signature."""
     ctx.log(f"send_webhook: transcript_id={input.transcript_id}")

     if not input.room_id:
@@ -1237,27 +1240,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.utils.webhook import (  # noqa: PLC0415
+            build_transcript_webhook_payload,
+            send_webhook_request,
+        )

         room = await rooms_controller.get_by_id(input.room_id)
-        transcript = await transcripts_controller.get_by_id(input.transcript_id)
-
-        if room and room.webhook_url and transcript:
-            webhook_payload = {
-                "event": "transcript.completed",
-                "transcript_id": input.transcript_id,
-                "title": transcript.title,
-                "duration": transcript.duration,
-            }
-
-            async with httpx.AsyncClient() as client:
-                response = await client.post(
-                    room.webhook_url, json=webhook_payload, timeout=30
-                )
-                response.raise_for_status()
+        if not room or not room.webhook_url:
+            ctx.log("send_webhook skipped (no webhook_url configured)")
+            return WebhookResult(webhook_sent=False, skipped=True)
+
+        payload = await build_transcript_webhook_payload(
+            transcript_id=input.transcript_id,
+            room_id=input.room_id,
+        )
+
+        if not payload:
+            ctx.log("send_webhook skipped (could not build payload)")
+            return WebhookResult(webhook_sent=False, skipped=True)
+
+        ctx.log(
+            f"send_webhook: sending to {room.webhook_url} "
+            f"(topics={len(payload.transcript.topics)}, "
+            f"participants={len(payload.transcript.participants)})"
+        )
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            timeout=30.0,
+        )

         ctx.log(f"send_webhook complete: status_code={response.status_code}")
         return WebhookResult(webhook_sent=True, response_code=response.status_code)
-
-        return WebhookResult(webhook_sent=False, skipped=True)

View File

@@ -7,12 +7,12 @@ Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
 from datetime import timedelta

-from hatchet_sdk import Context
+from hatchet_sdk import ConcurrencyExpression, Context
 from hatchet_sdk.rate_limit import RateLimit
 from pydantic import BaseModel

 from reflector.hatchet.client import HatchetClientManager
-from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_SHORT
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
 from reflector.hatchet.workflows.models import TopicChunkResult
 from reflector.logger import logger
 from reflector.processors.prompts import TOPIC_PROMPT
@@ -32,12 +32,17 @@ class TopicChunkInput(BaseModel):
 hatchet = HatchetClientManager.get_client()

 topic_chunk_workflow = hatchet.workflow(
-    name="TopicChunkProcessing", input_validator=TopicChunkInput
+    name="TopicChunkProcessing",
+    input_validator=TopicChunkInput,
+    concurrency=ConcurrencyExpression(
+        expression="true",  # constant CEL expression = global limit
+        max_runs=20,
+    ),
 )

 @topic_chunk_workflow.task(
-    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
     rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
 )
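As the inline comment says, a constant CEL expression puts every run into one concurrency group, so max_runs=20 acts as a global cap on TopicChunkProcessing runs. For contrast, a hedged sketch of what a per-transcript cap might look like instead; it assumes the SDK evaluates CEL over the workflow input and that the input exposes a transcript_id field, which is illustrative and not taken from this diff:

from hatchet_sdk import ConcurrencyExpression

# Illustrative alternative, not part of this change: key the concurrency group on an
# input field so each transcript gets its own limit instead of one global pool.
per_transcript_concurrency = ConcurrencyExpression(
    expression="input.transcript_id",  # one group per transcript (assumed CEL over input)
    max_runs=5,
)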

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
 Handles individual audio track processing: padding and transcription.
 Spawned dynamically by the main diarization pipeline for each track.

-Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
 because Hatchet workflow DAGs are defined statically, but the number of tracks varies
 at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
-standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.

 Note: This file uses deferred imports (inside tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
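The dynamic fan-out this docstring describes, one TrackProcessing child run per track awaited together, looks roughly like the sketch below. This is illustrative only: `track_workflow.aio_run()` and `asyncio.gather()` are the calls named in the docstring, but TrackInput's exact fields (here s3_key and track_index) are assumptions, not taken from the diff.

import asyncio

from reflector.hatchet.workflows import TrackInput, track_workflow


async def fan_out_tracks(s3_keys: list[str]) -> list:
    """Sketch of the fan-out pattern: spawn one child workflow per track, gather results.

    Assumes TrackInput takes s3_key/track_index (illustrative field names); the real
    spawning code is `process_tracks` in daily_multitrack_pipeline.py.
    """
    runs = [
        track_workflow.aio_run(TrackInput(s3_key=key, track_index=index))
        for index, key in enumerate(s3_keys)
    ]
    # gather() preserves order, so results line up with the input track list.
    return await asyncio.gather(*runs)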

View File

@@ -3,8 +3,12 @@ from pathlib import Path
 import av
 import numpy as np

+from reflector.utils.audio_constants import WAVEFORM_SEGMENTS

-def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
+
+def get_audio_waveform(
+    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
+) -> list[int]:
     if isinstance(path, Path):
         path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("path", type=Path)
-    parser.add_argument("--segments-count", type=int, default=256)
+    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
     args = parser.parse_args()

     print(get_audio_waveform(args.path, args.segments_count))

View File

@@ -0,0 +1,219 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
__all__ = [
"build_transcript_webhook_payload",
"build_test_webhook_payload",
"build_webhook_headers",
"generate_webhook_signature",
"send_webhook_request",
]
def _serialize_payload(payload: BaseModel) -> bytes:
"""Serialize Pydantic model to compact JSON bytes."""
return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC-SHA256 signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
async def send_webhook_request(
url: str,
payload: BaseModel,
event_type: str,
webhook_secret: str | None = None,
retry_count: int = 0,
timeout: float = 30.0,
) -> httpx.Response:
"""Send webhook request with proper headers and signature.
Raises:
httpx.HTTPStatusError: On non-2xx response
httpx.ConnectError: On connection failure
httpx.TimeoutException: On timeout
"""
payload_bytes = _serialize_payload(payload)
headers = build_webhook_headers(
event_type=event_type,
payload_bytes=payload_bytes,
webhook_secret=webhook_secret,
retry_count=retry_count,
)
async with httpx.AsyncClient(timeout=timeout) as client:
response = await client.post(url, content=payload_bytes, headers=headers)
response.raise_for_status()
return response
async def build_transcript_webhook_payload(
transcript_id: NonEmptyString,
room_id: NonEmptyString,
) -> WebhookPayload | None:
"""Build webhook payload by fetching transcript and room data from database."""
# Inline imports required: this utils module would create circular imports
# if db modules were imported at top level (utils -> db -> ... -> utils).
# This pattern is consistent with Hatchet task files.
from reflector.db.calendar_events import calendar_events_controller # noqa: PLC0415
from reflector.db.meetings import meetings_controller # noqa: PLC0415
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.utils.webvtt import topics_to_webvtt # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
return None
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
topics_data = [
WebhookTopicPayload(
title=topic.title,
summary=topic.summary,
timestamp=topic.timestamp,
duration=topic.duration,
webvtt=topics_to_webvtt([topic]) if topic.words else "",
)
for topic in (transcript.topics or [])
]
participants_data = [
WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
for p in (transcript.participants or [])
]
calendar_event_data: WebhookCalendarEventPayload | None = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
if calendar_event:
calendar_event_data = WebhookCalendarEventPayload(
id=calendar_event.id,
ics_uid=calendar_event.ics_uid,
title=calendar_event.title,
start_time=calendar_event.start_time,
end_time=calendar_event.end_time,
description=calendar_event.description or None,
location=calendar_event.location or None,
attendees=calendar_event.attendees or None,
)
except Exception as e:
logger.warning(
"Failed to fetch calendar event for webhook",
transcript_id=transcript_id,
meeting_id=transcript.meeting_id,
error=str(e),
)
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
return WebhookPayload(
event="transcript.completed",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
transcript=WebhookTranscriptPayload(
id=transcript.id,
room_id=transcript.room_id,
created_at=transcript.created_at,
duration=transcript.duration,
title=transcript.title,
short_summary=transcript.short_summary,
long_summary=transcript.long_summary,
webvtt=transcript.webvtt,
topics=topics_data,
participants=participants_data,
source_language=transcript.source_language,
target_language=transcript.target_language,
status=transcript.status,
frontend_url=frontend_url,
action_items=transcript.action_items,
),
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
calendar_event=calendar_event_data,
)
async def build_test_webhook_payload(
room_id: NonEmptyString,
) -> WebhookTestPayload | None:
"""Build test webhook payload."""
# Inline import: avoid circular dependency (utils -> db -> utils)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
room = await rooms_controller.get_by_id(room_id)
if not room:
return None
return WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)
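For receivers of these webhooks, verifying the X-Webhook-Signature header mirrors generate_webhook_signature() above: recompute HMAC-SHA256 over "<timestamp>.<raw body>" with the shared secret and compare. A minimal receiver-side sketch (not part of this change; the 5-minute replay window is an illustrative choice):

import hashlib
import hmac
import time


def verify_webhook_signature(
    body: bytes, signature_header: str, secret: str, tolerance_seconds: int = 300
) -> bool:
    """Verify an incoming Reflector webhook against its X-Webhook-Signature header.

    Mirrors generate_webhook_signature() above: the signed string is
    "<timestamp>.<raw body>" and v1 is its hex HMAC-SHA256 digest.
    """
    try:
        parts = dict(item.split("=", 1) for item in signature_header.split(","))
        timestamp, received = parts["t"], parts["v1"]
    except (ValueError, KeyError):
        return False

    # Reject stale timestamps to limit replay (tolerance is an illustrative choice).
    if abs(time.time() - int(timestamp)) > tolerance_seconds:
        return False

    expected = hmac.new(
        secret.encode("utf-8"),
        f"{timestamp}.{body.decode('utf-8')}".encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    # Constant-time comparison avoids timing side channels.
    return hmac.compare_digest(expected, received)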

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
title: NonEmptyString
summary: NonEmptyString
timestamp: float
duration: float | None
webvtt: str # can be empty when no words
class WebhookParticipantPayload(BaseModel):
id: NonEmptyString
name: str | None
speaker: int | None
class WebhookRoomPayload(BaseModel):
id: NonEmptyString
name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
id: NonEmptyString
ics_uid: str | None = None
title: str | None = None
start_time: datetime | None = None
end_time: datetime | None = None
description: str | None = None
location: str | None = None
attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
id: NonEmptyString
room_id: NonEmptyString | None
created_at: datetime
duration: float | None
title: str | None
short_summary: str | None
long_summary: str | None
webvtt: str | None
topics: list[WebhookTopicPayload]
participants: list[WebhookParticipantPayload]
source_language: NonEmptyString
target_language: NonEmptyString
status: NonEmptyString
frontend_url: NonEmptyString
action_items: dict | None
class WebhookPayload(BaseModel):
event: WebhookTranscriptEventType
event_id: NonEmptyString
timestamp: datetime
transcript: WebhookTranscriptPayload
room: WebhookRoomPayload
calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
event: WebhookTestEventType
event_id: NonEmptyString
timestamp: datetime
message: NonEmptyString
room: WebhookRoomPayload
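For reference, serializing one of these models with the compact model_dump_json(by_alias=True) call used by _serialize_payload yields the JSON that receivers see on the wire. A small illustrative sketch (the room id, name, and event_id values are placeholders, not real data):

from datetime import datetime, timezone

from reflector.utils.webhook_outgoing_models import WebhookRoomPayload, WebhookTestPayload

# Illustrative only: "room-123" / "Weekly sync" are placeholder values.
payload = WebhookTestPayload(
    event="test",
    event_id="0123456789abcdef0123456789abcdef",
    timestamp=datetime.now(timezone.utc),
    message="This is a test webhook from Reflector",
    room=WebhookRoomPayload(id="room-123", name="Weekly sync"),
)

# Compact JSON, the same shape send_webhook_request() posts for the "test" event:
# {"event":"test","event_id":"...","timestamp":"...","message":"...","room":{"id":"room-123","name":"Weekly sync"}}
print(payload.model_dump_json(by_alias=True, exclude_none=False))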

View File

@@ -300,7 +300,7 @@ async def _process_multitrack_recording_inner(
     if use_hatchet:
         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording_id,
                 "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -848,7 +848,7 @@ async def reprocess_failed_daily_recordings():
             continue

         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording.id,
                 "tracks": [

View File

@@ -1,8 +1,5 @@
 """Webhook task for sending transcript notifications."""

-import hashlib
-import hmac
-import json
 import uuid
 from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
 from celery import shared_task
 from celery.utils.log import get_task_logger

-from reflector.db.calendar_events import calendar_events_controller
-from reflector.db.meetings import meetings_controller
 from reflector.db.rooms import rooms_controller
-from reflector.db.transcripts import transcripts_controller
 from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.settings import settings
-from reflector.utils.webvtt import topics_to_webvtt
+from reflector.utils.webhook import (
+    WebhookRoomPayload,
+    WebhookTestPayload,
+    _serialize_payload,
+    build_transcript_webhook_payload,
+    build_webhook_headers,
+    send_webhook_request,
+)

 logger = structlog.wrap_logger(get_task_logger(__name__))

-
-def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
-    """Generate HMAC signature for webhook payload."""
-    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
-    hmac_obj = hmac.new(
-        secret.encode("utf-8"),
-        signed_payload.encode("utf-8"),
-        hashlib.sha256,
-    )
-    return hmac_obj.hexdigest()
-
-
 @shared_task(
     bind=True,
     max_retries=30,
@@ -47,6 +36,11 @@ async def send_transcript_webhook(
     room_id: str,
     event_id: str,
 ):
+    """Send webhook notification for completed transcript.
+
+    Uses shared Pydantic models and signature generation from utils/webhook.py
+    to ensure consistency with Hatchet pipeline.
+    """
     log = logger.bind(
         transcript_id=transcript_id,
         room_id=room_id,
@@ -54,12 +48,6 @@ async def send_transcript_webhook(
     )

     try:
-        # Fetch transcript and room
-        transcript = await transcripts_controller.get_by_id(transcript_id)
-        if not transcript:
-            log.error("Transcript not found, skipping webhook")
-            return
-
         room = await rooms_controller.get_by_id(room_id)
         if not room:
             log.error("Room not found, skipping webhook")
@@ -69,130 +57,32 @@ async def send_transcript_webhook(
             log.info("No webhook URL configured for room, skipping")
             return

-        # Generate WebVTT content from topics
-        topics_data = []
-
-        if transcript.topics:
-            # Build topics data with diarized content per topic
-            for topic in transcript.topics:
-                topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
-                topics_data.append(
-                    {
-                        "title": topic.title,
-                        "summary": topic.summary,
-                        "timestamp": topic.timestamp,
-                        "duration": topic.duration,
-                        "webvtt": topic_webvtt,
-                    }
-                )
-
-        # Fetch meeting and calendar event if they exist
-        calendar_event = None
-        try:
-            if transcript.meeting_id:
-                meeting = await meetings_controller.get_by_id(transcript.meeting_id)
-                if meeting and meeting.calendar_event_id:
-                    calendar_event = await calendar_events_controller.get_by_id(
-                        meeting.calendar_event_id
-                    )
-        except Exception as e:
-            logger.error("Error fetching meeting or calendar event", error=str(e))
-
-        # Build webhook payload
-        frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
-
-        participants = [
-            {"id": p.id, "name": p.name, "speaker": p.speaker}
-            for p in (transcript.participants or [])
-        ]
-
-        payload_data = {
-            "event": "transcript.completed",
-            "event_id": event_id,
-            "timestamp": datetime.now(timezone.utc).isoformat(),
-            "transcript": {
-                "id": transcript.id,
-                "room_id": transcript.room_id,
-                "created_at": transcript.created_at.isoformat(),
-                "duration": transcript.duration,
-                "title": transcript.title,
-                "short_summary": transcript.short_summary,
-                "long_summary": transcript.long_summary,
-                "webvtt": transcript.webvtt,
-                "topics": topics_data,
-                "participants": participants,
-                "source_language": transcript.source_language,
-                "target_language": transcript.target_language,
-                "status": transcript.status,
-                "frontend_url": frontend_url,
-                "action_items": transcript.action_items,
-            },
-            "room": {
-                "id": room.id,
-                "name": room.name,
-            },
-        }
-
-        # Always include calendar_event field, even if no event is present
-        payload_data["calendar_event"] = {}
-
-        # Add calendar event data if present
-        if calendar_event:
-            calendar_data = {
-                "id": calendar_event.id,
-                "ics_uid": calendar_event.ics_uid,
-                "title": calendar_event.title,
-                "start_time": calendar_event.start_time.isoformat()
-                if calendar_event.start_time
-                else None,
-                "end_time": calendar_event.end_time.isoformat()
-                if calendar_event.end_time
-                else None,
-            }
-            # Add optional fields only if they exist
-            if calendar_event.description:
-                calendar_data["description"] = calendar_event.description
-            if calendar_event.location:
-                calendar_data["location"] = calendar_event.location
-            if calendar_event.attendees:
-                calendar_data["attendees"] = calendar_event.attendees
-            payload_data["calendar_event"] = calendar_data
-
-        # Convert to JSON
-        payload_json = json.dumps(payload_data, separators=(",", ":"))
-        payload_bytes = payload_json.encode("utf-8")
-
-        # Generate signature if secret is configured
-        headers = {
-            "Content-Type": "application/json",
-            "User-Agent": "Reflector-Webhook/1.0",
-            "X-Webhook-Event": "transcript.completed",
-            "X-Webhook-Retry": str(self.request.retries),
-        }
-
-        if room.webhook_secret:
-            timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-            signature = generate_webhook_signature(
-                payload_bytes, room.webhook_secret, timestamp
-            )
-            headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+        # Build payload using shared function
+        payload = await build_transcript_webhook_payload(
+            transcript_id=transcript_id,
+            room_id=room_id,
+        )
+
+        if not payload:
+            log.error("Could not build webhook payload, skipping")
+            return

-        # Send webhook with timeout
-        async with httpx.AsyncClient(timeout=30.0) as client:
         log.info(
             "Sending webhook",
             url=room.webhook_url,
-            payload_size=len(payload_bytes),
+            topics=len(payload.transcript.topics),
+            participants=len(payload.transcript.participants),
         )
-        response = await client.post(
-            room.webhook_url,
-            content=payload_bytes,
-            headers=headers,
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            retry_count=self.request.retries,
+            timeout=30.0,
         )
-        response.raise_for_status()

         log.info(
             "Webhook sent successfully",
             status_code=response.status_code,
@@ -226,8 +116,8 @@ async def send_transcript_webhook(
 async def test_webhook(room_id: str) -> dict:
-    """
-    Test webhook configuration by sending a sample payload.
+    """Test webhook configuration by sending a sample payload.

     Returns immediately with success/failure status.
     This is the shared implementation used by both the API endpoint and Celery task.
     """
@@ -239,34 +129,24 @@ async def test_webhook(room_id: str) -> dict:
     if not room.webhook_url:
         return {"success": False, "error": "No webhook URL configured"}

-    now = (datetime.now(timezone.utc).isoformat(),)
-    payload_data = {
-        "event": "test",
-        "event_id": uuid.uuid4().hex,
-        "timestamp": now,
-        "message": "This is a test webhook from Reflector",
-        "room": {
-            "id": room.id,
-            "name": room.name,
-        },
-    }
-
-    payload_json = json.dumps(payload_data, separators=(",", ":"))
-    payload_bytes = payload_json.encode("utf-8")
-
-    # Generate headers with signature
-    headers = {
-        "Content-Type": "application/json",
-        "User-Agent": "Reflector-Webhook/1.0",
-        "X-Webhook-Event": "test",
-    }
-    if room.webhook_secret:
-        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-        signature = generate_webhook_signature(
-            payload_bytes, room.webhook_secret, timestamp
-        )
-        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+    payload = WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )
+
+    payload_bytes = _serialize_payload(payload)
+
+    headers = build_webhook_headers(
+        event_type="test",
+        payload_bytes=payload_bytes,
+        webhook_secret=room.webhook_secret,
+    )

     # Send test webhook with short timeout
     async with httpx.AsyncClient(timeout=10.0) as client: