From 10c17d7086bebfa3f51295eafc942a841df87d97 Mon Sep 17 00:00:00 2001 From: Igor Loskutov Date: Tue, 23 Dec 2025 16:56:43 -0500 Subject: [PATCH] send_webhook fixes --- server/reflector/hatchet/run_workers.py | 6 +- .../reflector/hatchet/workflows/__init__.py | 6 +- .../workflows/daily_multitrack_pipeline.py | 1 - .../reflector/services/transcript_process.py | 2 +- server/reflector/utils/webhook.py | 90 ++++++------------- .../utils/webhook_outgoing_models.py | 80 +++++++++++++++++ 6 files changed, 112 insertions(+), 73 deletions(-) create mode 100644 server/reflector/utils/webhook_outgoing_models.py diff --git a/server/reflector/hatchet/run_workers.py b/server/reflector/hatchet/run_workers.py index ae941a2d..243df7bf 100644 --- a/server/reflector/hatchet/run_workers.py +++ b/server/reflector/hatchet/run_workers.py @@ -1,5 +1,5 @@ """ -Run Hatchet workers for processing pipelines. +Run Hatchet workers for the diarization pipeline. Runs as a separate process, just like Celery workers. Usage: @@ -39,7 +39,7 @@ def main() -> None: # Can't use lazy init: decorators need the client object when function is defined. from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 from reflector.hatchet.workflows import ( # noqa: PLC0415 - daily_multitrack_pipeline, + diarization_pipeline, subject_workflow, topic_chunk_workflow, track_workflow, @@ -54,7 +54,7 @@ def main() -> None: worker = hatchet.worker( "reflector-pipeline-worker", workflows=[ - daily_multitrack_pipeline, + diarization_pipeline, subject_workflow, topic_chunk_workflow, track_workflow, diff --git a/server/reflector/hatchet/workflows/__init__.py b/server/reflector/hatchet/workflows/__init__.py index ea242ad6..df780872 100644 --- a/server/reflector/hatchet/workflows/__init__.py +++ b/server/reflector/hatchet/workflows/__init__.py @@ -1,8 +1,8 @@ """Hatchet workflow definitions.""" -from reflector.hatchet.workflows.daily_multitrack_pipeline import ( +from reflector.hatchet.workflows.diarization_pipeline import ( PipelineInput, - daily_multitrack_pipeline, + diarization_pipeline, ) from reflector.hatchet.workflows.subject_processing import ( SubjectInput, @@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import ( from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow __all__ = [ - "daily_multitrack_pipeline", + "diarization_pipeline", "subject_workflow", "topic_chunk_workflow", "track_workflow", diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index d9676a59..167facc5 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -4,7 +4,6 @@ Hatchet main workflow: DailyMultitrackPipeline Multitrack processing pipeline for Daily.co recordings. Orchestrates the full processing flow from recording metadata to final transcript. -Note: This workflow was previously named DiarizationPipeline, which was misleading. Daily.co recordings don't require ML diarization - speaker identification comes from track index (each participant's audio is a separate track). diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 2a9e7d7e..1c36d0e9 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -258,7 +258,7 @@ async def dispatch_transcript_processing( pass workflow_id = await HatchetClientManager.start_workflow( - workflow_name="DailyMultitrackPipeline", + workflow_name="DiarizationPipeline", input_data={ "recording_id": config.recording_id, "tracks": [{"s3_key": k} for k in config.track_keys], diff --git a/server/reflector/utils/webhook.py b/server/reflector/utils/webhook.py index 2accaad9..1957dbb3 100644 --- a/server/reflector/utils/webhook.py +++ b/server/reflector/utils/webhook.py @@ -1,4 +1,4 @@ -"""Webhook utilities and Pydantic models. +"""Webhook utilities. Shared webhook functionality for both Hatchet and Celery pipelines. """ @@ -13,71 +13,31 @@ from pydantic import BaseModel from reflector.logger import logger from reflector.settings import settings +from reflector.utils.webhook_outgoing_models import ( + WebhookCalendarEventPayload, + WebhookParticipantPayload, + WebhookPayload, + WebhookRoomPayload, + WebhookTestPayload, + WebhookTopicPayload, + WebhookTranscriptPayload, +) - -class WebhookTopicPayload(BaseModel): - title: str - summary: str - timestamp: float - duration: float | None - webvtt: str - - -class WebhookParticipantPayload(BaseModel): - id: str - name: str | None - speaker: int | None - - -class WebhookRoomPayload(BaseModel): - id: str - name: str - - -class WebhookCalendarEventPayload(BaseModel): - id: str - ics_uid: str | None = None - title: str | None = None - start_time: datetime | None = None - end_time: datetime | None = None - description: str | None = None - location: str | None = None - attendees: list[str] | None = None - - -class WebhookTranscriptPayload(BaseModel): - id: str - room_id: str | None - created_at: datetime - duration: float | None - title: str | None - short_summary: str | None - long_summary: str | None - webvtt: str | None - topics: list[WebhookTopicPayload] - participants: list[WebhookParticipantPayload] - source_language: str - target_language: str - status: str - frontend_url: str - action_items: dict | None - - -class WebhookPayload(BaseModel): - event: str - event_id: str - timestamp: datetime - transcript: WebhookTranscriptPayload - room: WebhookRoomPayload - calendar_event: WebhookCalendarEventPayload | None = None - - -class WebhookTestPayload(BaseModel): - event: str = "test" - event_id: str - timestamp: datetime - message: str - room: WebhookRoomPayload +__all__ = [ + "WebhookCalendarEventPayload", + "WebhookParticipantPayload", + "WebhookPayload", + "WebhookRoomPayload", + "WebhookTestPayload", + "WebhookTopicPayload", + "WebhookTranscriptPayload", + "_serialize_payload", + "build_transcript_webhook_payload", + "build_test_webhook_payload", + "build_webhook_headers", + "generate_webhook_signature", + "send_webhook_request", +] def _serialize_payload(payload: BaseModel) -> bytes: diff --git a/server/reflector/utils/webhook_outgoing_models.py b/server/reflector/utils/webhook_outgoing_models.py new file mode 100644 index 00000000..3790eec1 --- /dev/null +++ b/server/reflector/utils/webhook_outgoing_models.py @@ -0,0 +1,80 @@ +"""Pydantic models for outgoing webhook payloads. + +These models define the structure of webhook payloads sent by Reflector +to external services when transcript processing completes. +""" + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel + +from reflector.utils.string import NonEmptyString + +WebhookTranscriptEventType = Literal["transcript.completed"] +WebhookTestEventType = Literal["test"] + + +class WebhookTopicPayload(BaseModel): + title: NonEmptyString + summary: NonEmptyString + timestamp: float + duration: float | None + webvtt: str # can be empty when no words + + +class WebhookParticipantPayload(BaseModel): + id: NonEmptyString + name: str | None + speaker: int | None + + +class WebhookRoomPayload(BaseModel): + id: NonEmptyString + name: NonEmptyString + + +class WebhookCalendarEventPayload(BaseModel): + id: NonEmptyString + ics_uid: str | None = None + title: str | None = None + start_time: datetime | None = None + end_time: datetime | None = None + description: str | None = None + location: str | None = None + attendees: list[str] | None = None + + +class WebhookTranscriptPayload(BaseModel): + id: NonEmptyString + room_id: NonEmptyString | None + created_at: datetime + duration: float | None + title: str | None + short_summary: str | None + long_summary: str | None + webvtt: str | None + topics: list[WebhookTopicPayload] + participants: list[WebhookParticipantPayload] + source_language: NonEmptyString + target_language: NonEmptyString + status: NonEmptyString + frontend_url: NonEmptyString + action_items: dict | None + + +class WebhookPayload(BaseModel): + event: WebhookTranscriptEventType + event_id: NonEmptyString + timestamp: datetime + transcript: WebhookTranscriptPayload + room: WebhookRoomPayload + calendar_event: WebhookCalendarEventPayload | None = None + + +class WebhookTestPayload(BaseModel): + event: WebhookTestEventType + event_id: NonEmptyString + timestamp: datetime + message: NonEmptyString + room: WebhookRoomPayload