Compare commits

..

2 Commits

Author SHA1 Message Date
Igor Loskutov
10c17d7086 send_webhook fixes 2025-12-23 16:56:43 -05:00
Igor Loskutov
7a1d1dc08d pipeline fixes: whereby Hatchet preparation 2025-12-23 13:47:04 -05:00
7 changed files with 426 additions and 222 deletions

View File

@@ -1,9 +1,12 @@
"""
Hatchet main workflow: DiarizationPipeline
Hatchet main workflow: DailyMultitrackPipeline
Multitrack diarization pipeline for Daily.co recordings.
Multitrack processing pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript.
Daily.co recordings don't require ML diarization - speaker identification comes from
track index (each participant's audio is a separate track).
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
@@ -93,7 +96,7 @@ from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
"""Input to trigger the diarization pipeline."""
"""Input to trigger the Daily.co multitrack pipeline."""
recording_id: NonEmptyString
tracks: list[dict] # List of {"s3_key": str}
@@ -104,8 +107,8 @@ class PipelineInput(BaseModel):
hatchet = HatchetClientManager.get_client()
diarization_pipeline = hatchet.workflow(
name="DiarizationPipeline", input_validator=PipelineInput
daily_multitrack_pipeline = hatchet.workflow(
name="DailyMultitrackPipeline", input_validator=PipelineInput
)
@@ -191,7 +194,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
return decorator
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling("get_recording")
@@ -244,7 +247,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_recording],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
@@ -335,7 +338,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_participants],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -404,7 +407,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
retries=3,
@@ -487,7 +490,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[mixdown_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -555,7 +558,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
return WaveformResult(waveform_generated=True)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[mixdown_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -658,7 +661,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
return TopicsResult(topics=topics_list)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -721,7 +724,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
return TitleResult(title=title_result)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -802,7 +805,7 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -844,7 +847,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
return ProcessSubjectsResult(subject_summaries=subject_summaries)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -938,7 +941,7 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
return RecapResult(short_summary=short_summary, long_summary=long_summary)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_LONG),
retries=3,
@@ -1009,7 +1012,7 @@ async def identify_action_items(
return ActionItemsResult(action_items=action_items_dict)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
@@ -1094,7 +1097,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
return FinalizeResult(status="COMPLETED")
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling("cleanup_consent", set_error_status=False)
@@ -1194,7 +1197,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
return ConsentResult()
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=5,
@@ -1221,14 +1224,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
return ZulipResult(zulip_message_id=message_id)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[post_zulip],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=30,
)
@with_error_handling("send_webhook", set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
"""Send completion webhook to external service with full payload and HMAC signature."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
@@ -1237,27 +1240,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
build_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not room or not room.webhook_url:
ctx.log("send_webhook skipped (no webhook_url configured)")
return WebhookResult(webhook_sent=False, skipped=True)
if room and room.webhook_url and transcript:
webhook_payload = {
"event": "transcript.completed",
"transcript_id": input.transcript_id,
"title": transcript.title,
"duration": transcript.duration,
}
payload = await build_transcript_webhook_payload(
transcript_id=input.transcript_id,
room_id=input.room_id,
)
async with httpx.AsyncClient() as client:
response = await client.post(
room.webhook_url, json=webhook_payload, timeout=30
)
response.raise_for_status()
if not payload:
ctx.log("send_webhook skipped (could not build payload)")
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
ctx.log(
f"send_webhook: sending to {room.webhook_url} "
f"(topics={len(payload.transcript.topics)}, "
f"participants={len(payload.transcript.participants)})"
)
return WebhookResult(webhook_sent=True, response_code=response.status_code)
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure

View File

@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
def get_audio_waveform(
path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
) -> list[int]:
if isinstance(path, Path):
path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("path", type=Path)
parser.add_argument("--segments-count", type=int, default=256)
parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
args = parser.parse_args()
print(get_audio_waveform(args.path, args.segments_count))

View File

@@ -0,0 +1,224 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
# Names exported by ``from reflector.utils.webhook import *``: the payload
# models imported from ``webhook_outgoing_models`` plus the helpers defined in
# this module. ``_serialize_payload`` is listed despite its leading underscore.
__all__ = [
    "WebhookCalendarEventPayload",
    "WebhookParticipantPayload",
    "WebhookPayload",
    "WebhookRoomPayload",
    "WebhookTestPayload",
    "WebhookTopicPayload",
    "WebhookTranscriptPayload",
    "_serialize_payload",
    "build_transcript_webhook_payload",
    "build_test_webhook_payload",
    "build_webhook_headers",
    "generate_webhook_signature",
    "send_webhook_request",
]
def _serialize_payload(payload: BaseModel) -> bytes:
    """Return the UTF-8 encoded JSON form of a Pydantic payload model.

    Field aliases are used as keys, and fields whose value is ``None`` are
    kept in the output rather than dropped.
    """
    json_text = payload.model_dump_json(by_alias=True, exclude_none=False)
    return json_text.encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
    """Generate the HMAC-SHA256 signature for a webhook payload.

    The signed message is ``<timestamp>.<payload>``, keyed with ``secret``.

    Args:
        payload: Exact request body bytes that will be sent.
        secret: Shared webhook secret; encoded as UTF-8 to form the HMAC key.
        timestamp: Timestamp string placed before the payload in the signed
            message.

    Returns:
        The hex-encoded HMAC-SHA256 digest.
    """
    # Sign the raw bytes directly. Decoding the payload to ``str`` only to
    # re-encode it yields the same bytes for valid UTF-8, but would raise
    # UnicodeDecodeError for any other byte sequence.
    signed_payload = timestamp.encode("utf-8") + b"." + payload
    return hmac.new(
        secret.encode("utf-8"),
        signed_payload,
        hashlib.sha256,
    ).hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
async def send_webhook_request(
    url: str,
    payload: BaseModel,
    event_type: str,
    webhook_secret: str | None = None,
    retry_count: int = 0,
    timeout: float = 30.0,
) -> httpx.Response:
    """POST a webhook payload with the standard headers and optional signature.

    The payload is serialized once; the same bytes are used to build the
    headers (including the signature, when a secret is given) and as the
    request body.

    Returns:
        The response object, after its status has been checked.

    Raises:
        httpx.HTTPStatusError: On non-2xx response
        httpx.ConnectError: On connection failure
        httpx.TimeoutException: On timeout
    """
    body = _serialize_payload(payload)
    request_headers = build_webhook_headers(
        event_type=event_type,
        payload_bytes=body,
        webhook_secret=webhook_secret,
        retry_count=retry_count,
    )

    async with httpx.AsyncClient(timeout=timeout) as http_client:
        result = await http_client.post(url, content=body, headers=request_headers)
        result.raise_for_status()
        return result
async def build_transcript_webhook_payload(
    transcript_id: str,
    room_id: str,
    event_id: str | None = None,
) -> WebhookPayload | None:
    """Build the ``transcript.completed`` webhook payload from database records.

    Args:
        transcript_id: ID of the transcript to describe.
        room_id: ID of the room whose details go in the payload's ``room``
            section.
        event_id: Identifier to place in the payload. Pass a stable value when
            the same event may be delivered more than once (for example on
            retries) so that every attempt carries the same id. When omitted
            or empty, a random hex id is generated.

    Returns:
        The populated payload, or ``None`` when either the transcript or the
        room cannot be found.
    """
    # Inline imports required: this utils module would create circular imports
    # if db modules were imported at top level (utils -> db -> ... -> utils).
    # This pattern is consistent with Hatchet task files.
    from reflector.db.calendar_events import calendar_events_controller  # noqa: PLC0415
    from reflector.db.meetings import meetings_controller  # noqa: PLC0415
    from reflector.db.rooms import rooms_controller  # noqa: PLC0415
    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
    from reflector.utils.webvtt import topics_to_webvtt  # noqa: PLC0415

    transcript = await transcripts_controller.get_by_id(transcript_id)
    if not transcript:
        return None

    room = await rooms_controller.get_by_id(room_id)
    if not room:
        return None

    topics_data = [
        WebhookTopicPayload(
            title=topic.title,
            summary=topic.summary,
            timestamp=topic.timestamp,
            duration=topic.duration,
            # A topic without words gets an empty WebVTT string.
            webvtt=topics_to_webvtt([topic]) if topic.words else "",
        )
        for topic in (transcript.topics or [])
    ]

    participants_data = [
        WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
        for p in (transcript.participants or [])
    ]

    # Calendar data is optional enrichment: a failed lookup is logged and the
    # payload is still built, with ``calendar_event`` left as None.
    calendar_event_data: WebhookCalendarEventPayload | None = None
    try:
        if transcript.meeting_id:
            meeting = await meetings_controller.get_by_id(transcript.meeting_id)
            if meeting and meeting.calendar_event_id:
                calendar_event = await calendar_events_controller.get_by_id(
                    meeting.calendar_event_id
                )
                if calendar_event:
                    calendar_event_data = WebhookCalendarEventPayload(
                        id=calendar_event.id,
                        ics_uid=calendar_event.ics_uid,
                        title=calendar_event.title,
                        start_time=calendar_event.start_time,
                        end_time=calendar_event.end_time,
                        # Falsy optional values are normalized to None.
                        description=calendar_event.description or None,
                        location=calendar_event.location or None,
                        attendees=calendar_event.attendees or None,
                    )
    except Exception as e:
        logger.warning(
            "Failed to fetch calendar event for webhook",
            transcript_id=transcript_id,
            meeting_id=transcript.meeting_id,
            error=str(e),
        )

    frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

    return WebhookPayload(
        event="transcript.completed",
        # Prefer the caller's id so repeated deliveries of one event share it.
        event_id=event_id or uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        transcript=WebhookTranscriptPayload(
            id=transcript.id,
            room_id=transcript.room_id,
            created_at=transcript.created_at,
            duration=transcript.duration,
            title=transcript.title,
            short_summary=transcript.short_summary,
            long_summary=transcript.long_summary,
            webvtt=transcript.webvtt,
            topics=topics_data,
            participants=participants_data,
            source_language=transcript.source_language,
            target_language=transcript.target_language,
            status=transcript.status,
            frontend_url=frontend_url,
            action_items=transcript.action_items,
        ),
        room=WebhookRoomPayload(
            id=room.id,
            name=room.name,
        ),
        calendar_event=calendar_event_data,
    )
async def build_test_webhook_payload(room_id: str) -> WebhookTestPayload | None:
    """Build the sample ``test`` event payload for a room.

    Returns:
        The test payload carrying the room's id and name, or ``None`` when no
        room with ``room_id`` is found.
    """
    # Inline import: avoid circular dependency (utils -> db -> utils)
    from reflector.db.rooms import rooms_controller  # noqa: PLC0415

    room = await rooms_controller.get_by_id(room_id)
    if room:
        room_section = WebhookRoomPayload(id=room.id, name=room.name)
        return WebhookTestPayload(
            event="test",
            event_id=uuid.uuid4().hex,
            timestamp=datetime.now(timezone.utc),
            message="This is a test webhook from Reflector",
            room=room_section,
        )
    return None

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
# Allowed values of the ``event`` field for each outgoing payload type:
# ``WebhookPayload.event`` and ``WebhookTestPayload.event`` respectively.
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
    """A single transcript topic as it appears in the outgoing payload."""

    title: NonEmptyString
    summary: NonEmptyString
    timestamp: float
    duration: float | None
    # WebVTT for this topic only; can be empty when the topic has no words.
    webvtt: str
class WebhookParticipantPayload(BaseModel):
    """A transcript participant as it appears in the outgoing payload."""

    id: NonEmptyString
    name: str | None
    speaker: int | None
class WebhookRoomPayload(BaseModel):
    """Room section shared by the transcript and test payloads."""

    id: NonEmptyString
    name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
    """Calendar event section of the transcript payload.

    Only ``id`` is required; every other field defaults to ``None``.
    """

    id: NonEmptyString
    ics_uid: str | None = None
    title: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    description: str | None = None
    location: str | None = None
    attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
    """Transcript section of the ``transcript.completed`` payload."""

    # Identity and timing
    id: NonEmptyString
    room_id: NonEmptyString | None
    created_at: datetime
    duration: float | None

    # Generated text content
    title: str | None
    short_summary: str | None
    long_summary: str | None
    webvtt: str | None
    topics: list[WebhookTopicPayload]
    participants: list[WebhookParticipantPayload]

    # Languages, processing status and link to the transcript page
    source_language: NonEmptyString
    target_language: NonEmptyString
    status: NonEmptyString
    frontend_url: NonEmptyString
    action_items: dict | None
class WebhookPayload(BaseModel):
    """Top-level body of the ``transcript.completed`` webhook."""

    event: WebhookTranscriptEventType
    event_id: NonEmptyString
    timestamp: datetime
    transcript: WebhookTranscriptPayload
    room: WebhookRoomPayload
    # Left as None when no calendar event is attached.
    calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
    """Top-level body of the ``test`` webhook."""

    event: WebhookTestEventType
    event_id: NonEmptyString
    timestamp: datetime
    message: NonEmptyString
    room: WebhookRoomPayload

View File

@@ -300,7 +300,7 @@ async def _process_multitrack_recording_inner(
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -848,7 +848,7 @@ async def reprocess_failed_daily_recordings():
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording.id,
"tracks": [

View File

@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.settings import settings
from reflector.utils.webvtt import topics_to_webvtt
from reflector.utils.webhook import (
WebhookRoomPayload,
WebhookTestPayload,
_serialize_payload,
build_transcript_webhook_payload,
build_webhook_headers,
send_webhook_request,
)
logger = structlog.wrap_logger(get_task_logger(__name__))
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
@shared_task(
bind=True,
max_retries=30,
@@ -47,6 +36,11 @@ async def send_transcript_webhook(
room_id: str,
event_id: str,
):
"""Send webhook notification for completed transcript.
Uses shared Pydantic models and signature generation from utils/webhook.py
to ensure consistency with Hatchet pipeline.
"""
log = logger.bind(
transcript_id=transcript_id,
room_id=room_id,
@@ -54,12 +48,6 @@ async def send_transcript_webhook(
)
try:
# Fetch transcript and room
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
log.error("Transcript not found, skipping webhook")
return
room = await rooms_controller.get_by_id(room_id)
if not room:
log.error("Room not found, skipping webhook")
@@ -69,135 +57,37 @@ async def send_transcript_webhook(
log.info("No webhook URL configured for room, skipping")
return
# Generate WebVTT content from topics
topics_data = []
# Build payload using shared function
payload = await build_transcript_webhook_payload(
transcript_id=transcript_id,
room_id=room_id,
)
if transcript.topics:
# Build topics data with diarized content per topic
for topic in transcript.topics:
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
)
if not payload:
log.error("Could not build webhook payload, skipping")
return
# Fetch meeting and calendar event if they exist
calendar_event = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
except Exception as e:
logger.error("Error fetching meeting or calendar event", error=str(e))
log.info(
"Sending webhook",
url=room.webhook_url,
topics=len(payload.transcript.topics),
participants=len(payload.transcript.participants),
)
# Build webhook payload
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
participants = [
{"id": p.id, "name": p.name, "speaker": p.speaker}
for p in (transcript.participants or [])
]
payload_data = {
"event": "transcript.completed",
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,
"name": room.name,
},
}
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
retry_count=self.request.retries,
timeout=30.0,
)
# Always include calendar_event field, even if no event is present
payload_data["calendar_event"] = {}
# Add calendar event data if present
if calendar_event:
calendar_data = {
"id": calendar_event.id,
"ics_uid": calendar_event.ics_uid,
"title": calendar_event.title,
"start_time": calendar_event.start_time.isoformat()
if calendar_event.start_time
else None,
"end_time": calendar_event.end_time.isoformat()
if calendar_event.end_time
else None,
}
# Add optional fields only if they exist
if calendar_event.description:
calendar_data["description"] = calendar_event.description
if calendar_event.location:
calendar_data["location"] = calendar_event.location
if calendar_event.attendees:
calendar_data["attendees"] = calendar_event.attendees
payload_data["calendar_event"] = calendar_data
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info(
"Sending webhook",
url=room.webhook_url,
payload_size=len(payload_bytes),
)
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
response.raise_for_status()
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
except httpx.HTTPStatusError as e:
log.error(
@@ -226,8 +116,8 @@ async def send_transcript_webhook(
async def test_webhook(room_id: str) -> dict:
"""
Test webhook configuration by sending a sample payload.
"""Test webhook configuration by sending a sample payload.
Returns immediately with success/failure status.
This is the shared implementation used by both the API endpoint and Celery task.
"""
@@ -239,34 +129,25 @@ async def test_webhook(room_id: str) -> dict:
if not room.webhook_url:
return {"success": False, "error": "No webhook URL configured"}
now = (datetime.now(timezone.utc).isoformat(),)
payload_data = {
"event": "test",
"event_id": uuid.uuid4().hex,
"timestamp": now,
"message": "This is a test webhook from Reflector",
"room": {
"id": room.id,
"name": room.name,
},
}
payload = WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
payload_bytes = _serialize_payload(payload)
# Generate headers with signature
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "test",
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Build headers with signature
headers = build_webhook_headers(
event_type="test",
payload_bytes=payload_bytes,
webhook_secret=room.webhook_secret,
)
# Send test webhook with short timeout
async with httpx.AsyncClient(timeout=10.0) as client: