fix: webhook parity, pipeline rename, waveform constant fix (#806)

* pipeline fixes: whereby Hatchet preparation

* send_webhook fixes

* cleanup

* self-review

* comment

* webhook util functions: fewer dependencies

* remove comment

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
2025-12-26 18:00:32 -05:00
committed by GitHub
parent 2d0df48767
commit 5f7b1ff1a6
9 changed files with 418 additions and 231 deletions

View File

@@ -1,5 +1,5 @@
"""
Run Hatchet workers for the diarization pipeline.
Run Hatchet workers for the multitrack pipeline.
Runs as a separate process, just like Celery workers.
Usage:
@@ -39,7 +39,7 @@ def main() -> None:
# Can't use lazy init: decorators need the client object when function is defined.
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
from reflector.hatchet.workflows import ( # noqa: PLC0415
diarization_pipeline,
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,
@@ -54,7 +54,7 @@ def main() -> None:
worker = hatchet.worker(
"reflector-pipeline-worker",
workflows=[
diarization_pipeline,
daily_multitrack_pipeline,
subject_workflow,
topic_chunk_workflow,
track_workflow,

View File

@@ -1,8 +1,8 @@
"""Hatchet workflow definitions."""
from reflector.hatchet.workflows.diarization_pipeline import (
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
PipelineInput,
diarization_pipeline,
daily_multitrack_pipeline,
)
from reflector.hatchet.workflows.subject_processing import (
SubjectInput,
@@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
__all__ = [
"diarization_pipeline",
"daily_multitrack_pipeline",
"subject_workflow",
"topic_chunk_workflow",
"track_workflow",

View File

@@ -1,9 +1,12 @@
"""
Hatchet main workflow: DiarizationPipeline
Hatchet main workflow: DailyMultitrackPipeline
Multitrack diarization pipeline for Daily.co recordings.
Multitrack processing pipeline for Daily.co recordings.
Orchestrates the full processing flow from recording metadata to final transcript.
Daily.co recordings don't require ML diarization - speaker identification comes from
track index (each participant's audio is a separate track).
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
@@ -102,7 +105,7 @@ from reflector.zulip import post_transcript_notification
class PipelineInput(BaseModel):
"""Input to trigger the diarization pipeline."""
"""Input to trigger the Daily.co multitrack pipeline."""
recording_id: NonEmptyString
tracks: list[dict] # List of {"s3_key": str}
@@ -113,7 +116,7 @@ class PipelineInput(BaseModel):
hatchet = HatchetClientManager.get_client()
diarization_pipeline = hatchet.workflow(
daily_multitrack_pipeline = hatchet.workflow(
name="DiarizationPipeline", input_validator=PipelineInput
)
@@ -172,8 +175,6 @@ def _spawn_storage():
class Loggable(Protocol):
"""Protocol for objects with a log method."""
def log(self, message: str) -> None: ...
@@ -249,7 +250,7 @@ def with_error_handling(
return decorator
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling(TaskName.GET_RECORDING)
@@ -302,7 +303,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_recording],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
@@ -393,7 +394,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[get_participants],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -462,7 +463,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
retries=3,
@@ -559,7 +560,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[mixdown_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -627,7 +628,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
return WaveformResult(waveform_generated=True)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_tracks],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -732,7 +733,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
return TopicsResult(topics=topics_list)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -797,7 +798,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
return TitleResult(title=title_result)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -875,7 +876,7 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
@@ -917,7 +918,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
return ProcessSubjectsResult(subject_summaries=subject_summaries)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[process_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
@@ -1006,7 +1007,7 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
return RecapResult(short_summary=short_summary, long_summary=long_summary)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[extract_subjects],
execution_timeout=timedelta(seconds=TIMEOUT_LONG),
retries=3,
@@ -1074,7 +1075,7 @@ async def identify_action_items(
return ActionItemsResult(action_items=action_items_response)
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
@@ -1159,7 +1160,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
return FinalizeResult(status="COMPLETED")
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
)
@with_error_handling(TaskName.CLEANUP_CONSENT, set_error_status=False)
@@ -1259,7 +1260,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
return ConsentResult()
@diarization_pipeline.task(
@daily_multitrack_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=5,
@@ -1286,14 +1287,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
return ZulipResult(zulip_message_id=message_id)
@diarization_pipeline.task(
parents=[post_zulip],
@daily_multitrack_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=30,
retries=5,
)
@with_error_handling(TaskName.SEND_WEBHOOK, set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
"""Send completion webhook to external service with full payload and HMAC signature."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
@@ -1302,27 +1303,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
fetch_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not room or not room.webhook_url:
ctx.log("send_webhook skipped (no webhook_url configured)")
return WebhookResult(webhook_sent=False, skipped=True)
if room and room.webhook_url and transcript:
webhook_payload = {
"event": "transcript.completed",
"transcript_id": input.transcript_id,
"title": transcript.title,
"duration": transcript.duration,
}
payload = await fetch_transcript_webhook_payload(
transcript_id=input.transcript_id,
room_id=input.room_id,
)
async with httpx.AsyncClient() as client:
response = await client.post(
room.webhook_url, json=webhook_payload, timeout=30
)
response.raise_for_status()
if isinstance(payload, str):
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
ctx.log(
f"send_webhook: sending to {room.webhook_url} "
f"(topics={len(payload.transcript.topics)}, "
f"participants={len(payload.transcript.participants)})"
)
return WebhookResult(webhook_sent=True, response_code=response.status_code)
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
return WebhookResult(webhook_sent=False, skipped=True)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)

View File

@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.
Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure

View File

@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
def get_audio_waveform(
path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
) -> list[int]:
if isinstance(path, Path):
path = path.as_posix()
@@ -70,7 +74,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("path", type=Path)
parser.add_argument("--segments-count", type=int, default=256)
parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
args = parser.parse_args()
print(get_audio_waveform(args.path, args.segments_count))

View File

@@ -0,0 +1,216 @@
"""Webhook utilities.
Shared webhook functionality for both Hatchet and Celery pipelines.
"""
import hashlib
import hmac
import uuid
from datetime import datetime, timezone
from typing import Union
import httpx
from pydantic import BaseModel
from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.webhook_outgoing_models import (
WebhookCalendarEventPayload,
WebhookParticipantPayload,
WebhookPayload,
WebhookRoomPayload,
WebhookTestPayload,
WebhookTopicPayload,
WebhookTranscriptPayload,
)
__all__ = [
"fetch_transcript_webhook_payload",
"fetch_test_webhook_payload",
"build_webhook_headers",
"generate_webhook_signature",
"send_webhook_request",
]
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.utils.webvtt import topics_to_webvtt
def _serialize_payload(payload: BaseModel) -> bytes:
    """Encode a Pydantic model as compact JSON, returned as UTF-8 bytes.

    Aliased field names are used and ``None`` values are kept, so the wire
    format always contains every declared field.
    """
    json_text = payload.model_dump_json(by_alias=True, exclude_none=False)
    return json_text.encode("utf-8")
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
    """Return the hex HMAC-SHA256 signature for a webhook payload.

    The signed message is ``"{timestamp}.{payload}"`` (payload decoded as
    UTF-8), matching the ``t=...,v1=...`` signature header scheme.
    """
    message = ".".join((timestamp, payload.decode("utf-8")))
    digest = hmac.new(secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256)
    return digest.hexdigest()
def build_webhook_headers(
event_type: str,
payload_bytes: bytes,
webhook_secret: str | None = None,
retry_count: int = 0,
) -> dict[str, str]:
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": event_type,
"X-Webhook-Retry": str(retry_count),
}
if webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
return headers
async def send_webhook_request(
    url: str,
    payload: BaseModel,
    event_type: str,
    webhook_secret: str | None = None,
    retry_count: int = 0,
    timeout: float = 30.0,
) -> httpx.Response:
    """POST a serialized, signed webhook payload and return the response.

    Serializes the payload to compact JSON bytes, builds the standard
    webhook headers (with HMAC signature when a secret is given), and
    raises on any non-2xx status.

    Raises:
        httpx.HTTPStatusError: On non-2xx response
        httpx.ConnectError: On connection failure
        httpx.TimeoutException: On timeout
    """
    body = _serialize_payload(payload)
    request_headers = build_webhook_headers(
        event_type=event_type,
        payload_bytes=body,
        webhook_secret=webhook_secret,
        retry_count=retry_count,
    )
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(url, content=body, headers=request_headers)
        response.raise_for_status()
        return response
async def fetch_transcript_webhook_payload(
    transcript_id: NonEmptyString,
    room_id: NonEmptyString,
) -> Union[WebhookPayload, str]:
    """Build the full ``transcript.completed`` webhook payload from the database.

    Fetches the transcript and room, renders per-topic WebVTT, collects
    participants, and attempts (best-effort) to attach the linked calendar
    event.

    Returns:
        A ``WebhookPayload`` on success, or a human-readable error string when
        the transcript or room cannot be found — callers treat a ``str``
        result as "skip sending".
    """
    transcript = await transcripts_controller.get_by_id(transcript_id)
    if not transcript:
        return f"Transcript {transcript_id} not found"
    room = await rooms_controller.get_by_id(room_id)
    if not room:
        return f"Room {room_id} not found"
    # WebVTT is rendered per topic, and only for topics that have words.
    topics_data = [
        WebhookTopicPayload(
            title=topic.title,
            summary=topic.summary,
            timestamp=topic.timestamp,
            duration=topic.duration,
            webvtt=topics_to_webvtt([topic]) if topic.words else "",
        )
        for topic in (transcript.topics or [])
    ]
    participants_data = [
        WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
        for p in (transcript.participants or [])
    ]
    calendar_event_data: WebhookCalendarEventPayload | None = None
    # Calendar lookup is best-effort: any failure is logged and the webhook
    # is still sent with calendar_event=None.
    try:
        if transcript.meeting_id:
            meeting = await meetings_controller.get_by_id(transcript.meeting_id)
            if meeting and meeting.calendar_event_id:
                calendar_event = await calendar_events_controller.get_by_id(
                    meeting.calendar_event_id
                )
                if calendar_event:
                    calendar_event_data = WebhookCalendarEventPayload(
                        id=calendar_event.id,
                        ics_uid=calendar_event.ics_uid,
                        title=calendar_event.title,
                        start_time=calendar_event.start_time,
                        end_time=calendar_event.end_time,
                        # `or None` collapses empty strings/lists to None so
                        # absent optional fields serialize as null.
                        description=calendar_event.description or None,
                        location=calendar_event.location or None,
                        attendees=calendar_event.attendees or None,
                    )
    except Exception as e:
        logger.warning(
            "Failed to fetch calendar event for webhook",
            transcript_id=transcript_id,
            meeting_id=transcript.meeting_id,
            error=str(e),
        )
    frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
    return WebhookPayload(
        event="transcript.completed",
        event_id=uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        transcript=WebhookTranscriptPayload(
            id=transcript.id,
            room_id=transcript.room_id,
            created_at=transcript.created_at,
            duration=transcript.duration,
            title=transcript.title,
            short_summary=transcript.short_summary,
            long_summary=transcript.long_summary,
            webvtt=transcript.webvtt,
            topics=topics_data,
            participants=participants_data,
            source_language=transcript.source_language,
            target_language=transcript.target_language,
            status=transcript.status,
            frontend_url=frontend_url,
            action_items=transcript.action_items,
        ),
        room=WebhookRoomPayload(
            id=room.id,
            name=room.name,
        ),
        calendar_event=calendar_event_data,
    )
async def fetch_test_webhook_payload(
    room_id: NonEmptyString,
) -> WebhookTestPayload | None:
    """Build the payload for a "test" webhook, or None when the room is missing."""
    room = await rooms_controller.get_by_id(room_id)
    if not room:
        return None
    room_payload = WebhookRoomPayload(
        id=room.id,
        name=room.name,
    )
    return WebhookTestPayload(
        event="test",
        event_id=uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        message="This is a test webhook from Reflector",
        room=room_payload,
    )

View File

@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.
These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""
from datetime import datetime
from typing import Literal
from pydantic import BaseModel
from reflector.utils.string import NonEmptyString
# Closed sets of event names each outgoing payload type may carry.
WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]
class WebhookTopicPayload(BaseModel):
    """One topic segment of a transcript, as delivered in webhooks."""

    title: NonEmptyString
    summary: NonEmptyString
    timestamp: float  # topic start offset — presumably seconds; confirm against producer
    duration: float | None
    webvtt: str  # can be empty when no words
class WebhookParticipantPayload(BaseModel):
    """A transcript participant; name/speaker may be unknown."""

    id: NonEmptyString
    name: str | None
    speaker: int | None
class WebhookRoomPayload(BaseModel):
    """Minimal room identification included in every webhook."""

    id: NonEmptyString
    name: NonEmptyString
class WebhookCalendarEventPayload(BaseModel):
    """Calendar event linked to the transcript's meeting; all detail fields optional."""

    id: NonEmptyString
    ics_uid: str | None = None
    title: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    description: str | None = None
    location: str | None = None
    attendees: list[str] | None = None
class WebhookTranscriptPayload(BaseModel):
    """Full transcript data included in ``transcript.completed`` webhooks."""

    id: NonEmptyString
    room_id: NonEmptyString | None
    created_at: datetime
    duration: float | None
    title: str | None
    short_summary: str | None
    long_summary: str | None
    webvtt: str | None  # full-transcript WebVTT, distinct from per-topic webvtt
    topics: list[WebhookTopicPayload]
    participants: list[WebhookParticipantPayload]
    source_language: NonEmptyString
    target_language: NonEmptyString
    status: NonEmptyString
    frontend_url: NonEmptyString  # deep link into the Reflector UI for this transcript
    action_items: dict | None
class WebhookPayload(BaseModel):
    """Top-level body of a ``transcript.completed`` webhook delivery."""

    event: WebhookTranscriptEventType
    event_id: NonEmptyString  # unique per delivery, for receiver-side dedup
    timestamp: datetime
    transcript: WebhookTranscriptPayload
    room: WebhookRoomPayload
    calendar_event: WebhookCalendarEventPayload | None = None
class WebhookTestPayload(BaseModel):
    """Body of a ``test`` webhook used to verify a room's webhook configuration."""

    event: WebhookTestEventType
    event_id: NonEmptyString
    timestamp: datetime
    message: NonEmptyString
    room: WebhookRoomPayload

View File

@@ -298,7 +298,7 @@ async def _process_multitrack_recording_inner(
if use_hatchet:
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording_id,
"tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -846,7 +846,7 @@ async def reprocess_failed_daily_recordings():
continue
workflow_id = await HatchetClientManager.start_workflow(
workflow_name="DiarizationPipeline",
workflow_name="DailyMultitrackPipeline",
input_data={
"recording_id": recording.id,
"tracks": [

View File

@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications."""
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.settings import settings
from reflector.utils.webvtt import topics_to_webvtt
from reflector.utils.webhook import (
WebhookRoomPayload,
WebhookTestPayload,
_serialize_payload,
build_webhook_headers,
fetch_transcript_webhook_payload,
send_webhook_request,
)
logger = structlog.wrap_logger(get_task_logger(__name__))
def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
"""Generate HMAC signature for webhook payload."""
signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
hmac_obj = hmac.new(
secret.encode("utf-8"),
signed_payload.encode("utf-8"),
hashlib.sha256,
)
return hmac_obj.hexdigest()
@shared_task(
bind=True,
max_retries=30,
@@ -54,12 +43,6 @@ async def send_transcript_webhook(
)
try:
# Fetch transcript and room
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
log.error("Transcript not found, skipping webhook")
return
room = await rooms_controller.get_by_id(room_id)
if not room:
log.error("Room not found, skipping webhook")
@@ -69,135 +52,36 @@ async def send_transcript_webhook(
log.info("No webhook URL configured for room, skipping")
return
# Generate WebVTT content from topics
topics_data = []
payload = await fetch_transcript_webhook_payload(
transcript_id=transcript_id,
room_id=room_id,
)
if transcript.topics:
# Build topics data with diarized content per topic
for topic in transcript.topics:
topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
topics_data.append(
{
"title": topic.title,
"summary": topic.summary,
"timestamp": topic.timestamp,
"duration": topic.duration,
"webvtt": topic_webvtt,
}
)
if isinstance(payload, str):
log.error(f"Could not build webhook payload, skipping: {payload}")
return
# Fetch meeting and calendar event if they exist
calendar_event = None
try:
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting and meeting.calendar_event_id:
calendar_event = await calendar_events_controller.get_by_id(
meeting.calendar_event_id
)
except Exception as e:
logger.error("Error fetching meeting or calendar event", error=str(e))
log.info(
"Sending webhook",
url=room.webhook_url,
topics=len(payload.transcript.topics),
participants=len(payload.transcript.participants),
)
# Build webhook payload
frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
participants = [
{"id": p.id, "name": p.name, "speaker": p.speaker}
for p in (transcript.participants or [])
]
payload_data = {
"event": "transcript.completed",
"event_id": event_id,
"timestamp": datetime.now(timezone.utc).isoformat(),
"transcript": {
"id": transcript.id,
"room_id": transcript.room_id,
"created_at": transcript.created_at.isoformat(),
"duration": transcript.duration,
"title": transcript.title,
"short_summary": transcript.short_summary,
"long_summary": transcript.long_summary,
"webvtt": transcript.webvtt,
"topics": topics_data,
"participants": participants,
"source_language": transcript.source_language,
"target_language": transcript.target_language,
"status": transcript.status,
"frontend_url": frontend_url,
"action_items": transcript.action_items,
},
"room": {
"id": room.id,
"name": room.name,
},
}
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
retry_count=self.request.retries,
timeout=30.0,
)
# Always include calendar_event field, even if no event is present
payload_data["calendar_event"] = {}
# Add calendar event data if present
if calendar_event:
calendar_data = {
"id": calendar_event.id,
"ics_uid": calendar_event.ics_uid,
"title": calendar_event.title,
"start_time": calendar_event.start_time.isoformat()
if calendar_event.start_time
else None,
"end_time": calendar_event.end_time.isoformat()
if calendar_event.end_time
else None,
}
# Add optional fields only if they exist
if calendar_event.description:
calendar_data["description"] = calendar_event.description
if calendar_event.location:
calendar_data["location"] = calendar_event.location
if calendar_event.attendees:
calendar_data["attendees"] = calendar_event.attendees
payload_data["calendar_event"] = calendar_data
# Convert to JSON
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
# Generate signature if secret is configured
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "transcript.completed",
"X-Webhook-Retry": str(self.request.retries),
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
# Send webhook with timeout
async with httpx.AsyncClient(timeout=30.0) as client:
log.info(
"Sending webhook",
url=room.webhook_url,
payload_size=len(payload_bytes),
)
response = await client.post(
room.webhook_url,
content=payload_bytes,
headers=headers,
)
response.raise_for_status()
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
log.info(
"Webhook sent successfully",
status_code=response.status_code,
response_size=len(response.content),
)
except httpx.HTTPStatusError as e:
log.error(
@@ -226,8 +110,8 @@ async def send_transcript_webhook(
async def test_webhook(room_id: str) -> dict:
"""
Test webhook configuration by sending a sample payload.
"""Test webhook configuration by sending a sample payload.
Returns immediately with success/failure status.
This is the shared implementation used by both the API endpoint and Celery task.
"""
@@ -239,34 +123,24 @@ async def test_webhook(room_id: str) -> dict:
if not room.webhook_url:
return {"success": False, "error": "No webhook URL configured"}
now = (datetime.now(timezone.utc).isoformat(),)
payload_data = {
"event": "test",
"event_id": uuid.uuid4().hex,
"timestamp": now,
"message": "This is a test webhook from Reflector",
"room": {
"id": room.id,
"name": room.name,
},
}
payload = WebhookTestPayload(
event="test",
event_id=uuid.uuid4().hex,
timestamp=datetime.now(timezone.utc),
message="This is a test webhook from Reflector",
room=WebhookRoomPayload(
id=room.id,
name=room.name,
),
)
payload_json = json.dumps(payload_data, separators=(",", ":"))
payload_bytes = payload_json.encode("utf-8")
payload_bytes = _serialize_payload(payload)
# Generate headers with signature
headers = {
"Content-Type": "application/json",
"User-Agent": "Reflector-Webhook/1.0",
"X-Webhook-Event": "test",
}
if room.webhook_secret:
timestamp = str(int(datetime.now(timezone.utc).timestamp()))
signature = generate_webhook_signature(
payload_bytes, room.webhook_secret, timestamp
)
headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
headers = build_webhook_headers(
event_type="test",
payload_bytes=payload_bytes,
webhook_secret=room.webhook_secret,
)
# Send test webhook with short timeout
async with httpx.AsyncClient(timeout=10.0) as client: