diff --git a/server/reflector/hatchet/run_workers.py b/server/reflector/hatchet/run_workers.py
index 243df7bf..ae941a2d 100644
--- a/server/reflector/hatchet/run_workers.py
+++ b/server/reflector/hatchet/run_workers.py
@@ -1,5 +1,5 @@
 """
-Run Hatchet workers for the diarization pipeline.
+Run Hatchet workers for processing pipelines.

 Runs as a separate process, just like Celery workers.
 Usage:
@@ -39,7 +39,7 @@ def main() -> None:
     # Can't use lazy init: decorators need the client object when function is defined.
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
-        diarization_pipeline,
+        daily_multitrack_pipeline,
         subject_workflow,
         topic_chunk_workflow,
         track_workflow,
@@ -54,7 +54,7 @@ def main() -> None:
     worker = hatchet.worker(
         "reflector-pipeline-worker",
         workflows=[
-            diarization_pipeline,
+            daily_multitrack_pipeline,
             subject_workflow,
             topic_chunk_workflow,
             track_workflow,
diff --git a/server/reflector/hatchet/workflows/__init__.py b/server/reflector/hatchet/workflows/__init__.py
index df780872..ea242ad6 100644
--- a/server/reflector/hatchet/workflows/__init__.py
+++ b/server/reflector/hatchet/workflows/__init__.py
@@ -1,8 +1,8 @@
 """Hatchet workflow definitions."""

-from reflector.hatchet.workflows.diarization_pipeline import (
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
-    diarization_pipeline,
+    daily_multitrack_pipeline,
 )
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
@@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow

 __all__ = [
-    "diarization_pipeline",
+    "daily_multitrack_pipeline",
     "subject_workflow",
     "topic_chunk_workflow",
     "track_workflow",
diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
similarity index 94%
rename from server/reflector/hatchet/workflows/diarization_pipeline.py
rename to server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
index 1557e2c6..d9676a59 100644
--- a/server/reflector/hatchet/workflows/diarization_pipeline.py
+++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
@@ -1,9 +1,13 @@
 """
-Hatchet main workflow: DiarizationPipeline
+Hatchet main workflow: DailyMultitrackPipeline

-Multitrack diarization pipeline for Daily.co recordings.
+Multitrack processing pipeline for Daily.co recordings.
 Orchestrates the full processing flow from recording metadata to final transcript.

+Note: This workflow was previously named DiarizationPipeline, which was misleading.
+Daily.co recordings don't require ML diarization: speaker identification comes from
+the track index (each participant's audio is a separate track).
+
 Note: This file uses deferred imports (inside functions/tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
 DB connections are not shared across forks, avoiding connection pooling issues.
@@ -93,7 +97,7 @@ from reflector.zulip import post_transcript_notification


 class PipelineInput(BaseModel):
-    """Input to trigger the diarization pipeline."""
+    """Input to trigger the Daily.co multitrack pipeline."""

     recording_id: NonEmptyString
     tracks: list[dict]  # List of {"s3_key": str}
@@ -104,8 +108,8 @@ class PipelineInput(BaseModel):

 hatchet = HatchetClientManager.get_client()

-diarization_pipeline = hatchet.workflow(
-    name="DiarizationPipeline", input_validator=PipelineInput
+daily_multitrack_pipeline = hatchet.workflow(
+    name="DailyMultitrackPipeline", input_validator=PipelineInput
 )


@@ -191,7 +195,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
     return decorator


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("get_recording")
@@ -244,7 +248,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_recording],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
@@ -335,7 +339,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_participants],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -404,7 +408,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
@@ -487,7 +491,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[mixdown_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -555,7 +559,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     return WaveformResult(waveform_generated=True)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[mixdown_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -658,7 +662,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     return TopicsResult(topics=topics_list)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -721,7 +725,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     return TitleResult(title=title_result)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -802,7 +806,7 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -844,7 +848,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
     return ProcessSubjectsResult(subject_summaries=subject_summaries)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -938,7 +942,9 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
     return RecapResult(short_summary=short_summary, long_summary=long_summary)

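+# Branches from extract_subjects; runs in parallel with the
+# process_subjects -> generate_recap summary chain.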
-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_LONG),
     retries=3,
@@ -1009,7 +1015,7 @@ async def identify_action_items(
     return ActionItemsResult(action_items=action_items_dict)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
@@ -1094,7 +1100,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     return FinalizeResult(status="COMPLETED")


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("cleanup_consent", set_error_status=False)
@@ -1194,7 +1200,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
     return ConsentResult()


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[cleanup_consent],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=5,
@@ -1221,14 +1227,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
     return ZulipResult(zulip_message_id=message_id)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[post_zulip],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=30,
 )
 @with_error_handling("send_webhook", set_error_status=False)
 async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
-    """Send completion webhook to external service."""
+    """Send completion webhook to external service with full payload and HMAC signature."""
     ctx.log(f"send_webhook: transcript_id={input.transcript_id}")

     if not input.room_id:
@@ -1237,27 +1243,40 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.utils.webhook import (  # noqa: PLC0415
+            build_transcript_webhook_payload,
+            send_webhook_request,
+        )

         room = await rooms_controller.get_by_id(input.room_id)
-        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+        if not room or not room.webhook_url:
+            ctx.log("send_webhook skipped (no webhook_url configured)")
+            return WebhookResult(webhook_sent=False, skipped=True)

-        if room and room.webhook_url and transcript:
-            webhook_payload = {
-                "event": "transcript.completed",
-                "transcript_id": input.transcript_id,
-                "title": transcript.title,
-                "duration": transcript.duration,
-            }
+        payload = await build_transcript_webhook_payload(
+            transcript_id=input.transcript_id,
+            room_id=input.room_id,
+        )

-            async with httpx.AsyncClient() as client:
-                response = await client.post(
-                    room.webhook_url, json=webhook_payload, timeout=30
-                )
-                response.raise_for_status()
+        if not payload:
+            ctx.log("send_webhook skipped (could not build payload)")
+            return WebhookResult(webhook_sent=False, skipped=True)

-            ctx.log(f"send_webhook complete: status_code={response.status_code}")
+        ctx.log(
+            f"send_webhook: sending to {room.webhook_url} "
+            f"(topics={len(payload.transcript.topics)}, "
+            f"participants={len(payload.transcript.participants)})"
+        )

-            return WebhookResult(webhook_sent=True, response_code=response.status_code)
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
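+            # signed with the room's webhook_secret when one is configured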
+            webhook_secret=room.webhook_secret,
+            timeout=30.0,
+        )

-        return WebhookResult(webhook_sent=False, skipped=True)
+        ctx.log(f"send_webhook complete: status_code={response.status_code}")
+
+        return WebhookResult(webhook_sent=True, response_code=response.status_code)

diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py
index 8a401b19..72b10e9a 100644
--- a/server/reflector/hatchet/workflows/track_processing.py
+++ b/server/reflector/hatchet/workflows/track_processing.py
@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
 Handles individual audio track processing: padding and transcription.
-Spawned dynamically by the main diarization pipeline for each track.
+Spawned dynamically by the main multitrack pipeline for each track.

-Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
 because Hatchet workflow DAGs are defined statically, but the number of tracks
 varies at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
-standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.

 Note: This file uses deferred imports (inside tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py
index 1c36d0e9..2a9e7d7e 100644
--- a/server/reflector/services/transcript_process.py
+++ b/server/reflector/services/transcript_process.py
@@ -258,7 +258,7 @@ async def dispatch_transcript_processing(
             pass

         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": config.recording_id,
                 "tracks": [{"s3_key": k} for k in config.track_keys],
diff --git a/server/reflector/utils/audio_waveform.py b/server/reflector/utils/audio_waveform.py
index b4412e05..c135b21e 100644
--- a/server/reflector/utils/audio_waveform.py
+++ b/server/reflector/utils/audio_waveform.py
@@ -3,8 +3,12 @@ from pathlib import Path
 import av
 import numpy as np

+from reflector.utils.audio_constants import WAVEFORM_SEGMENTS

-def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
+
+def get_audio_waveform(
+    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
+) -> list[int]:
     if isinstance(path, Path):
         path = path.as_posix()

@@ -70,7 +74,7 @@ if __name__ == "__main__":

     parser = argparse.ArgumentParser()
     parser.add_argument("path", type=Path)
-    parser.add_argument("--segments-count", type=int, default=256)
+    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
     args = parser.parse_args()

     print(get_audio_waveform(args.path, args.segments_count))
diff --git a/server/reflector/utils/webhook.py b/server/reflector/utils/webhook.py
new file mode 100644
index 00000000..2accaad9
--- /dev/null
+++ b/server/reflector/utils/webhook.py
@@ -0,0 +1,277 @@
+"""Webhook utilities and Pydantic models.
+
+Shared webhook functionality for both the Hatchet and Celery pipelines.
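+
+Outgoing requests carry an X-Webhook-Signature header ("t=<ts>,v1=<hex>") when
+the room has a webhook secret. A receiver can verify it roughly like this
+(a sketch; assumes `secret: str`, the decoded request `body: str`, `sig_header`):
+
+    ts, v1 = (part.split("=", 1)[1] for part in sig_header.split(","))
+    expected = hmac.new(secret.encode(), f"{ts}.{body}".encode(), hashlib.sha256)
+    assert hmac.compare_digest(expected.hexdigest(), v1)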
+""" + +import hashlib +import hmac +import uuid +from datetime import datetime, timezone + +import httpx +from pydantic import BaseModel + +from reflector.logger import logger +from reflector.settings import settings + + +class WebhookTopicPayload(BaseModel): + title: str + summary: str + timestamp: float + duration: float | None + webvtt: str + + +class WebhookParticipantPayload(BaseModel): + id: str + name: str | None + speaker: int | None + + +class WebhookRoomPayload(BaseModel): + id: str + name: str + + +class WebhookCalendarEventPayload(BaseModel): + id: str + ics_uid: str | None = None + title: str | None = None + start_time: datetime | None = None + end_time: datetime | None = None + description: str | None = None + location: str | None = None + attendees: list[str] | None = None + + +class WebhookTranscriptPayload(BaseModel): + id: str + room_id: str | None + created_at: datetime + duration: float | None + title: str | None + short_summary: str | None + long_summary: str | None + webvtt: str | None + topics: list[WebhookTopicPayload] + participants: list[WebhookParticipantPayload] + source_language: str + target_language: str + status: str + frontend_url: str + action_items: dict | None + + +class WebhookPayload(BaseModel): + event: str + event_id: str + timestamp: datetime + transcript: WebhookTranscriptPayload + room: WebhookRoomPayload + calendar_event: WebhookCalendarEventPayload | None = None + + +class WebhookTestPayload(BaseModel): + event: str = "test" + event_id: str + timestamp: datetime + message: str + room: WebhookRoomPayload + + +def _serialize_payload(payload: BaseModel) -> bytes: + """Serialize Pydantic model to compact JSON bytes.""" + return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8") + + +def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str: + """Generate HMAC-SHA256 signature for webhook payload.""" + signed_payload = f"{timestamp}.{payload.decode('utf-8')}" + hmac_obj = hmac.new( + secret.encode("utf-8"), + signed_payload.encode("utf-8"), + hashlib.sha256, + ) + return hmac_obj.hexdigest() + + +def build_webhook_headers( + event_type: str, + payload_bytes: bytes, + webhook_secret: str | None = None, + retry_count: int = 0, +) -> dict[str, str]: + headers = { + "Content-Type": "application/json", + "User-Agent": "Reflector-Webhook/1.0", + "X-Webhook-Event": event_type, + "X-Webhook-Retry": str(retry_count), + } + + if webhook_secret: + timestamp = str(int(datetime.now(timezone.utc).timestamp())) + signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp) + headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}" + + return headers + + +async def send_webhook_request( + url: str, + payload: BaseModel, + event_type: str, + webhook_secret: str | None = None, + retry_count: int = 0, + timeout: float = 30.0, +) -> httpx.Response: + """Send webhook request with proper headers and signature. 
+
+    Raises:
+        httpx.HTTPStatusError: On non-2xx response
+        httpx.ConnectError: On connection failure
+        httpx.TimeoutException: On timeout
+    """
+    payload_bytes = _serialize_payload(payload)
+
+    headers = build_webhook_headers(
+        event_type=event_type,
+        payload_bytes=payload_bytes,
+        webhook_secret=webhook_secret,
+        retry_count=retry_count,
+    )
+
+    async with httpx.AsyncClient(timeout=timeout) as client:
+        response = await client.post(url, content=payload_bytes, headers=headers)
+        response.raise_for_status()
+        return response
+
+
+async def build_transcript_webhook_payload(
+    transcript_id: str,
+    room_id: str,
+    event_id: str | None = None,
+) -> WebhookPayload | None:
+    """Build webhook payload by fetching transcript and room data from database."""
+    # Inline imports required: this utils module would create circular imports
+    # if db modules were imported at top level (utils -> db -> ... -> utils).
+    # This pattern is consistent with Hatchet task files.
+    from reflector.db.calendar_events import calendar_events_controller  # noqa: PLC0415
+    from reflector.db.meetings import meetings_controller  # noqa: PLC0415
+    from reflector.db.rooms import rooms_controller  # noqa: PLC0415
+    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+    from reflector.utils.webvtt import topics_to_webvtt  # noqa: PLC0415
+
+    transcript = await transcripts_controller.get_by_id(transcript_id)
+    if not transcript:
+        return None
+
+    room = await rooms_controller.get_by_id(room_id)
+    if not room:
+        return None
+
+    topics_data = [
+        WebhookTopicPayload(
+            title=topic.title,
+            summary=topic.summary,
+            timestamp=topic.timestamp,
+            duration=topic.duration,
+            webvtt=topics_to_webvtt([topic]) if topic.words else "",
+        )
+        for topic in (transcript.topics or [])
+    ]
+
+    participants_data = [
+        WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
+        for p in (transcript.participants or [])
+    ]
+
+    calendar_event_data: WebhookCalendarEventPayload | None = None
+    try:
+        if transcript.meeting_id:
+            meeting = await meetings_controller.get_by_id(transcript.meeting_id)
+            if meeting and meeting.calendar_event_id:
+                calendar_event = await calendar_events_controller.get_by_id(
+                    meeting.calendar_event_id
+                )
+                if calendar_event:
+                    calendar_event_data = WebhookCalendarEventPayload(
+                        id=calendar_event.id,
+                        ics_uid=calendar_event.ics_uid,
+                        title=calendar_event.title,
+                        start_time=calendar_event.start_time,
+                        end_time=calendar_event.end_time,
+                        description=calendar_event.description or None,
+                        location=calendar_event.location or None,
+                        attendees=calendar_event.attendees or None,
+                    )
+    except Exception as e:
+        logger.warning(
+            "Failed to fetch calendar event for webhook",
+            transcript_id=transcript_id,
+            meeting_id=transcript.meeting_id,
+            error=str(e),
+        )
+
+    frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
+
+    return WebhookPayload(
+        event="transcript.completed",
+        event_id=event_id or uuid.uuid4().hex,  # reuse caller-supplied id across retries
+        timestamp=datetime.now(timezone.utc),
+        transcript=WebhookTranscriptPayload(
+            id=transcript.id,
+            room_id=transcript.room_id,
+            created_at=transcript.created_at,
+            duration=transcript.duration,
+            title=transcript.title,
+            short_summary=transcript.short_summary,
+            long_summary=transcript.long_summary,
+            webvtt=transcript.webvtt,
+            topics=topics_data,
+            participants=participants_data,
+            source_language=transcript.source_language,
+            target_language=transcript.target_language,
+            status=transcript.status,
+            frontend_url=frontend_url,
+            action_items=transcript.action_items,
+        ),
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+        calendar_event=calendar_event_data,
+    )
+
+
+async def build_test_webhook_payload(room_id: str) -> WebhookTestPayload | None:
+    """Build test webhook payload."""
+    # Inline import: avoid circular dependency (utils -> db -> utils)
+    from reflector.db.rooms import rooms_controller  # noqa: PLC0415
+
+    room = await rooms_controller.get_by_id(room_id)
+    if not room:
+        return None
+
+    return WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index 95a75464..bf26ea6c 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -300,7 +300,7 @@ async def _process_multitrack_recording_inner(

     if use_hatchet:
         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording_id,
                 "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -848,7 +848,7 @@ async def reprocess_failed_daily_recordings():
             continue

         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording.id,
                 "tracks": [
diff --git a/server/reflector/worker/webhook.py b/server/reflector/worker/webhook.py
index d58cc7b3..98af3e2f 100644
--- a/server/reflector/worker/webhook.py
+++ b/server/reflector/worker/webhook.py
@@ -1,8 +1,5 @@
 """Webhook task for sending transcript notifications."""

-import hashlib
-import hmac
-import json
 import uuid
 from datetime import datetime, timezone

@@ -11,28 +8,20 @@ import structlog
 from celery import shared_task
 from celery.utils.log import get_task_logger

-from reflector.db.calendar_events import calendar_events_controller
-from reflector.db.meetings import meetings_controller
 from reflector.db.rooms import rooms_controller
-from reflector.db.transcripts import transcripts_controller
 from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.settings import settings
-from reflector.utils.webvtt import topics_to_webvtt
+from reflector.utils.webhook import (
+    WebhookRoomPayload,
+    WebhookTestPayload,
+    _serialize_payload,
+    build_transcript_webhook_payload,
+    build_webhook_headers,
+    send_webhook_request,
+)

 logger = structlog.wrap_logger(get_task_logger(__name__))


-def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
-    """Generate HMAC signature for webhook payload."""
-    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
-    hmac_obj = hmac.new(
-        secret.encode("utf-8"),
-        signed_payload.encode("utf-8"),
-        hashlib.sha256,
-    )
-    return hmac_obj.hexdigest()
-
-
 @shared_task(
     bind=True,
     max_retries=30,
@@ -47,6 +36,14 @@ async def send_transcript_webhook(
     room_id: str,
     event_id: str,
 ):
+    """Send webhook notification for completed transcript.
+
+    Uses shared Pydantic models and signature generation from utils/webhook.py
+    to ensure consistency with the Hatchet pipeline.
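+
+    The dispatch-time event_id is reused for every retry, so receivers can
+    deduplicate redelivered events.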
+    """
     log = logger.bind(
         transcript_id=transcript_id,
         room_id=room_id,
@@ -54,12 +51,6 @@ async def send_transcript_webhook(
         event_id=event_id,
     )

     try:
-        # Fetch transcript and room
-        transcript = await transcripts_controller.get_by_id(transcript_id)
-        if not transcript:
-            log.error("Transcript not found, skipping webhook")
-            return
-
         room = await rooms_controller.get_by_id(room_id)
         if not room:
             log.error("Room not found, skipping webhook")
@@ -69,135 +60,38 @@ async def send_transcript_webhook(
             log.info("No webhook URL configured for room, skipping")
             return

-        # Generate WebVTT content from topics
-        topics_data = []
+        # Build payload using the shared builder (also used by the Hatchet pipeline)
+        payload = await build_transcript_webhook_payload(
+            transcript_id=transcript_id,
+            room_id=room_id,
+            event_id=event_id,
+        )

-        if transcript.topics:
-            # Build topics data with diarized content per topic
-            for topic in transcript.topics:
-                topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
-                topics_data.append(
-                    {
-                        "title": topic.title,
-                        "summary": topic.summary,
-                        "timestamp": topic.timestamp,
-                        "duration": topic.duration,
-                        "webvtt": topic_webvtt,
-                    }
-                )
+        if not payload:
+            log.error("Could not build webhook payload, skipping")
+            return

-        # Fetch meeting and calendar event if they exist
-        calendar_event = None
-        try:
-            if transcript.meeting_id:
-                meeting = await meetings_controller.get_by_id(transcript.meeting_id)
-                if meeting and meeting.calendar_event_id:
-                    calendar_event = await calendar_events_controller.get_by_id(
-                        meeting.calendar_event_id
-                    )
-        except Exception as e:
-            logger.error("Error fetching meeting or calendar event", error=str(e))
+        log.info(
+            "Sending webhook",
+            url=room.webhook_url,
+            topics=len(payload.transcript.topics),
+            participants=len(payload.transcript.participants),
+        )

-        # Build webhook payload
-        frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
-        participants = [
-            {"id": p.id, "name": p.name, "speaker": p.speaker}
-            for p in (transcript.participants or [])
-        ]
-        payload_data = {
-            "event": "transcript.completed",
-            "event_id": event_id,
-            "timestamp": datetime.now(timezone.utc).isoformat(),
-            "transcript": {
-                "id": transcript.id,
-                "room_id": transcript.room_id,
-                "created_at": transcript.created_at.isoformat(),
-                "duration": transcript.duration,
-                "title": transcript.title,
-                "short_summary": transcript.short_summary,
-                "long_summary": transcript.long_summary,
-                "webvtt": transcript.webvtt,
-                "topics": topics_data,
-                "participants": participants,
-                "source_language": transcript.source_language,
-                "target_language": transcript.target_language,
-                "status": transcript.status,
-                "frontend_url": frontend_url,
-                "action_items": transcript.action_items,
-            },
-            "room": {
-                "id": room.id,
-                "name": room.name,
-            },
-        }
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            retry_count=self.request.retries,
+            timeout=30.0,
+        )

-        # Always include calendar_event field, even if no event is present
-        payload_data["calendar_event"] = {}
-
-        # Add calendar event data if present
-        if calendar_event:
-            calendar_data = {
-                "id": calendar_event.id,
-                "ics_uid": calendar_event.ics_uid,
-                "title": calendar_event.title,
-                "start_time": calendar_event.start_time.isoformat()
-                if calendar_event.start_time
-                else None,
-                "end_time": calendar_event.end_time.isoformat()
-                if calendar_event.end_time
-                else None,
-            }
-
-            # Add optional fields only if they exist
-            if calendar_event.description:
-                calendar_data["description"] = calendar_event.description
-            if calendar_event.location:
-                calendar_data["location"] = calendar_event.location
-            if calendar_event.attendees:
-                calendar_data["attendees"] = calendar_event.attendees
-
-            payload_data["calendar_event"] = calendar_data
-
-        # Convert to JSON
-        payload_json = json.dumps(payload_data, separators=(",", ":"))
-        payload_bytes = payload_json.encode("utf-8")
-
-        # Generate signature if secret is configured
-        headers = {
-            "Content-Type": "application/json",
-            "User-Agent": "Reflector-Webhook/1.0",
-            "X-Webhook-Event": "transcript.completed",
-            "X-Webhook-Retry": str(self.request.retries),
-        }
-
-        if room.webhook_secret:
-            timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-            signature = generate_webhook_signature(
-                payload_bytes, room.webhook_secret, timestamp
-            )
-            headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
-
-        # Send webhook with timeout
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            log.info(
-                "Sending webhook",
-                url=room.webhook_url,
-                payload_size=len(payload_bytes),
-            )
-
-            response = await client.post(
-                room.webhook_url,
-                content=payload_bytes,
-                headers=headers,
-            )
-
-            response.raise_for_status()
-
-            log.info(
-                "Webhook sent successfully",
-                status_code=response.status_code,
-                response_size=len(response.content),
-            )
+        log.info(
+            "Webhook sent successfully",
+            status_code=response.status_code,
+            response_size=len(response.content),
+        )

     except httpx.HTTPStatusError as e:
         log.error(
@@ -226,8 +120,8 @@ async def send_transcript_webhook(


 async def test_webhook(room_id: str) -> dict:
-    """
-    Test webhook configuration by sending a sample payload.
+    """Test webhook configuration by sending a sample payload.
+
     Returns immediately with success/failure status.
     This is the shared implementation used by both the API endpoint and Celery task.
     """
@@ -239,34 +133,25 @@ async def test_webhook(room_id: str) -> dict:
     if not room.webhook_url:
         return {"success": False, "error": "No webhook URL configured"}

-    now = (datetime.now(timezone.utc).isoformat(),)
-    payload_data = {
-        "event": "test",
-        "event_id": uuid.uuid4().hex,
-        "timestamp": now,
-        "message": "This is a test webhook from Reflector",
-        "room": {
-            "id": room.id,
-            "name": room.name,
-        },
-    }
+    payload = WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )

-    payload_json = json.dumps(payload_data, separators=(",", ":"))
-    payload_bytes = payload_json.encode("utf-8")
+    payload_bytes = _serialize_payload(payload)

-    # Generate headers with signature
-    headers = {
-        "Content-Type": "application/json",
-        "User-Agent": "Reflector-Webhook/1.0",
-        "X-Webhook-Event": "test",
-    }
-
-    if room.webhook_secret:
-        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-        signature = generate_webhook_signature(
-            payload_bytes, room.webhook_secret, timestamp
-        )
-        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+    # Build headers with signature
+    headers = build_webhook_headers(
+        event_type="test",
+        payload_bytes=payload_bytes,
+        webhook_secret=room.webhook_secret,
+    )

     # Send test webhook with short timeout
     async with httpx.AsyncClient(timeout=10.0) as client: