Mirror of https://github.com/Monadical-SAS/reflector.git
Synced 2026-02-04 18:06:48 +00:00

Compare commits
11 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 7f42ef6d17 |  |
|  | 5f7b1ff1a6 |  |
|  | 2d0df48767 |  |
|  | 5baa6dd92e |  |
|  | bab1e2d537 |  |
|  | e886153ae1 |  |
|  | 7b352f465e |  |
|  | 3cf9757ac2 |  |
|  | d9d3938192 |  |
|  | 8598707c1c |  |
|  | 594bcc09e0 |  |
CHANGELOG.md (+19)
@@ -1,5 +1,24 @@
 # Changelog
 
+## [0.27.0](https://github.com/Monadical-SAS/reflector/compare/v0.26.0...v0.27.0) (2025-12-26)
+
+
+### Features
+
+* devex/hatchet log progress track ([#813](https://github.com/Monadical-SAS/reflector/issues/813)) ([2d0df48](https://github.com/Monadical-SAS/reflector/commit/2d0df487674e5486208cd599e3338ebff8b6e470))
+
+
+### Bug Fixes
+
+* webhook parity, pipeline rename, waveform constant fix ([#806](https://github.com/Monadical-SAS/reflector/issues/806)) ([5f7b1ff](https://github.com/Monadical-SAS/reflector/commit/5f7b1ff1a68ebbb907684c7c5f55c1f82dac8550))
+
+## [0.26.0](https://github.com/Monadical-SAS/reflector/compare/v0.25.0...v0.26.0) (2025-12-23)
+
+
+### Features
+
+* parallelize hatchet ([#804](https://github.com/Monadical-SAS/reflector/issues/804)) ([594bcc0](https://github.com/Monadical-SAS/reflector/commit/594bcc09e0ca744163de2f1525ebbf7c52a68448))
+
 ## [0.25.0](https://github.com/Monadical-SAS/reflector/compare/v0.24.0...v0.25.0) (2025-12-22)
@@ -5,7 +5,7 @@ import shutil
 from contextlib import asynccontextmanager
 from datetime import datetime, timedelta, timezone
 from pathlib import Path
-from typing import Any, Literal
+from typing import Any, Literal, Sequence
 
 import sqlalchemy
 from fastapi import HTTPException
@@ -180,7 +180,7 @@ class TranscriptDuration(BaseModel):
 
 
 class TranscriptWaveform(BaseModel):
-    waveform: list[float]
+    waveform: Sequence[float]
 
 
 class TranscriptEvent(BaseModel):
@@ -81,7 +81,8 @@ async def set_status_and_broadcast(
 async def append_event_and_broadcast(
-    transcript_id: NonEmptyString,
+    transcript: Transcript,
-    event_name: str,
+    event_name: NonEmptyString,
+    # TODO proper dictionary event => type
     data: Any,
     logger: structlog.BoundLogger,
 ) -> TranscriptEvent:
server/reflector/hatchet/constants.py (new file, +41)
@@ -0,0 +1,41 @@
+"""
+Hatchet workflow constants.
+"""
+
+from enum import StrEnum
+
+
+class TaskName(StrEnum):
+    GET_RECORDING = "get_recording"
+    GET_PARTICIPANTS = "get_participants"
+    PROCESS_TRACKS = "process_tracks"
+    MIXDOWN_TRACKS = "mixdown_tracks"
+    GENERATE_WAVEFORM = "generate_waveform"
+    DETECT_TOPICS = "detect_topics"
+    GENERATE_TITLE = "generate_title"
+    EXTRACT_SUBJECTS = "extract_subjects"
+    PROCESS_SUBJECTS = "process_subjects"
+    GENERATE_RECAP = "generate_recap"
+    IDENTIFY_ACTION_ITEMS = "identify_action_items"
+    FINALIZE = "finalize"
+    CLEANUP_CONSENT = "cleanup_consent"
+    POST_ZULIP = "post_zulip"
+    SEND_WEBHOOK = "send_webhook"
+    PAD_TRACK = "pad_track"
+    TRANSCRIBE_TRACK = "transcribe_track"
+    DETECT_CHUNK_TOPIC = "detect_chunk_topic"
+    GENERATE_DETAILED_SUMMARY = "generate_detailed_summary"
+
+
+# Rate limit key for LLM API calls (shared across all LLM-calling tasks)
+LLM_RATE_LIMIT_KEY = "llm"
+
+# Max LLM calls per second across all tasks
+LLM_RATE_LIMIT_PER_SECOND = 10
+
+# Task execution timeouts (seconds)
+TIMEOUT_SHORT = 60  # Quick operations: API calls, DB updates
+TIMEOUT_MEDIUM = 120  # Single LLM calls, waveform generation
+TIMEOUT_LONG = 180  # Action items (larger context LLM)
+TIMEOUT_AUDIO = 300  # Audio processing: padding, mixdown
+TIMEOUT_HEAVY = 600  # Transcription, fan-out LLM tasks
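For orientation, here is a minimal sketch of how these constants are consumed by task decorators in the workflow modules below (the `Example` workflow and task are illustrative, not part of this changeset):

```python
from datetime import timedelta

from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM


class ExampleInput(BaseModel):
    text: str


hatchet = HatchetClientManager.get_client()
# "Example" is a hypothetical workflow name, used only for illustration
example_workflow = hatchet.workflow(name="Example", input_validator=ExampleInput)


@example_workflow.task(
    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),  # 120s budget for one LLM call
    retries=3,
    # Consumes 1 unit of the shared "llm" bucket; with LLM_RATE_LIMIT_PER_SECOND = 10,
    # at most ~10 unit-weight LLM task starts per second across all workers.
    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)
async def example_llm_task(input: ExampleInput, ctx: Context) -> None:
    ctx.log(f"processing {len(input.text)} chars")
```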
@@ -1,5 +1,5 @@
 """
-Run Hatchet workers for the diarization pipeline.
+Run Hatchet workers for the multitrack pipeline.
 Runs as a separate process, just like Celery workers.
 
 Usage:
@@ -12,6 +12,9 @@ Usage:
 import signal
 import sys
 
+from hatchet_sdk.rate_limit import RateLimitDuration
+
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
 from reflector.logger import logger
 from reflector.settings import settings
 
@@ -36,15 +39,26 @@ def main() -> None:
     # Can't use lazy init: decorators need the client object when function is defined.
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
-        diarization_pipeline,
+        daily_multitrack_pipeline,
+        subject_workflow,
+        topic_chunk_workflow,
         track_workflow,
     )
 
     hatchet = HatchetClientManager.get_client()
 
+    hatchet.rate_limits.put(
+        LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
+    )
+
     worker = hatchet.worker(
-        "reflector-diarization-worker",
-        workflows=[diarization_pipeline, track_workflow],
+        "reflector-pipeline-worker",
+        workflows=[
+            daily_multitrack_pipeline,
+            subject_workflow,
+            topic_chunk_workflow,
+            track_workflow,
+        ],
     )
 
     def shutdown_handler(signum: int, frame) -> None:
@@ -1,14 +1,26 @@
 """Hatchet workflow definitions."""
 
-from reflector.hatchet.workflows.diarization_pipeline import (
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
-    diarization_pipeline,
+    daily_multitrack_pipeline,
 )
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
+)
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 
 __all__ = [
-    "diarization_pipeline",
+    "daily_multitrack_pipeline",
+    "subject_workflow",
+    "topic_chunk_workflow",
     "track_workflow",
     "PipelineInput",
+    "SubjectInput",
+    "TopicChunkInput",
     "TrackInput",
 ]
File diff suppressed because it is too large
@@ -5,13 +5,21 @@ Provides static typing for all task outputs, enabling type checking
 and better IDE support.
 """
 
 from typing import Any
 
 from pydantic import BaseModel
 
+from reflector.processors.summary.models import ActionItemsResponse
+from reflector.processors.types import TitleSummary, Word
 from reflector.utils.string import NonEmptyString
 
 
+class ParticipantInfo(BaseModel):
+    """Participant info with speaker index for workflow result."""
+
+    participant_id: NonEmptyString
+    user_name: NonEmptyString
+    speaker: int
+
+
 class PadTrackResult(BaseModel):
     """Result from pad_track task."""
 
@@ -26,7 +34,7 @@ class PadTrackResult(BaseModel):
 class TranscribeTrackResult(BaseModel):
     """Result from transcribe_track task."""
 
-    words: list[dict[str, Any]]
+    words: list[Word]
     track_index: int
 
 
@@ -35,13 +43,13 @@ class RecordingResult(BaseModel):
 
     id: NonEmptyString | None
     mtg_session_id: NonEmptyString | None
-    duration: float
+    duration: int | None
 
 
 class ParticipantsResult(BaseModel):
     """Result from get_participants task."""
 
-    participants: list[dict[str, Any]]
+    participants: list[ParticipantInfo]
     num_tracks: int
     source_language: NonEmptyString
     target_language: NonEmptyString
@@ -57,7 +65,7 @@ class PaddedTrackInfo(BaseModel):
 class ProcessTracksResult(BaseModel):
     """Result from process_tracks task."""
 
-    all_words: list[dict[str, Any]]
+    all_words: list[Word]
     padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
     word_count: int
     num_tracks: int
@@ -79,10 +87,21 @@ class WaveformResult(BaseModel):
     waveform_generated: bool
 
 
+class TopicChunkResult(BaseModel):
+    """Result from topic chunk child workflow."""
+
+    chunk_index: int
+    title: str
+    summary: str
+    timestamp: float
+    duration: float
+    words: list[Word]
+
+
 class TopicsResult(BaseModel):
     """Result from detect_topics task."""
 
-    topics: list[dict[str, Any]]
+    topics: list[TitleSummary]
 
 
 class TitleResult(BaseModel):
@@ -91,12 +110,41 @@ class TitleResult(BaseModel):
 
     title: str | None
 
 
-class SummaryResult(BaseModel):
-    """Result from generate_summary task."""
+class SubjectsResult(BaseModel):
+    """Result from extract_subjects task."""
 
-    summary: str | None
-    short_summary: str | None
-    action_items: dict | None = None
+    subjects: list[str]
+    transcript_text: str  # Formatted transcript for LLM consumption
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+
+
+class SubjectSummaryResult(BaseModel):
+    """Result from subject summary child workflow."""
+
+    subject: str
+    subject_index: int
+    detailed_summary: str
+    paragraph_summary: str
+
+
+class ProcessSubjectsResult(BaseModel):
+    """Result from process_subjects fan-out task."""
+
+    subject_summaries: list[SubjectSummaryResult]
+
+
+class RecapResult(BaseModel):
+    """Result from generate_recap task."""
+
+    short_summary: str  # Recap paragraph
+    long_summary: str  # Full markdown summary
+
+
+class ActionItemsResult(BaseModel):
+    """Result from identify_action_items task."""
+
+    action_items: ActionItemsResponse
 
 
 class FinalizeResult(BaseModel):
server/reflector/hatchet/workflows/subject_processing.py (new file, +107)
@@ -0,0 +1,107 @@
+"""
+Hatchet child workflow: SubjectProcessing
+
+Handles individual subject/topic summary generation.
+Spawned dynamically by the main diarization pipeline for each extracted subject
+via aio_run_many() for parallel processing.
+"""
+
+from datetime import timedelta
+
+from hatchet_sdk import Context
+from hatchet_sdk.rate_limit import RateLimit
+from pydantic import BaseModel
+
+from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
+from reflector.hatchet.workflows.models import SubjectSummaryResult
+from reflector.logger import logger
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    build_participant_instructions,
+)
+
+
+class SubjectInput(BaseModel):
+    """Input for individual subject processing."""
+
+    subject: str
+    subject_index: int
+    transcript_text: str
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+
+
+hatchet = HatchetClientManager.get_client()
+
+subject_workflow = hatchet.workflow(
+    name="SubjectProcessing", input_validator=SubjectInput
+)
+
+
+@subject_workflow.task(
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+    retries=3,
+    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=2)],
+)
+async def generate_detailed_summary(
+    input: SubjectInput, ctx: Context
+) -> SubjectSummaryResult:
+    """Generate detailed analysis for a single subject, then condense to paragraph."""
+    ctx.log(
+        f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
+    )
+    logger.info(
+        "[Hatchet] generate_detailed_summary",
+        subject=input.subject,
+        subject_index=input.subject_index,
+    )
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports ensure
+    # LLM HTTP connection pools aren't shared across forks
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.settings import settings  # noqa: PLC0415
+
+    llm = LLM(settings=settings)
+
+    participant_instructions = build_participant_instructions(input.participant_names)
+    detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
+    if participant_instructions:
+        detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"
+
+    ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
+    detailed_response = await llm.get_response(
+        detailed_prompt,
+        [input.transcript_text],
+        tone_name="Topic assistant",
+    )
+    detailed_summary = str(detailed_response)
+
+    paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
+    if participant_instructions:
+        paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"
+
+    ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
+    paragraph_response = await llm.get_response(
+        paragraph_prompt,
+        [detailed_summary],
+        tone_name="Topic summarizer",
+    )
+    paragraph_summary = str(paragraph_response)
+
+    ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
+    logger.info(
+        "[Hatchet] generate_detailed_summary complete",
+        subject=input.subject,
+        subject_index=input.subject_index,
+        detailed_len=len(detailed_summary),
+        paragraph_len=len(paragraph_summary),
+    )
+
+    return SubjectSummaryResult(
+        subject=input.subject,
+        subject_index=input.subject_index,
+        detailed_summary=detailed_summary,
+        paragraph_summary=paragraph_summary,
+    )
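The module docstring references fan-out via `aio_run_many()`, and the track-processing notes below describe `aio_run()` plus `asyncio.gather()` as the standard dynamic fan-out pattern. A minimal parent-side sketch along those lines; the helper function is illustrative, and it assumes each child run resolves to a mapping of task name to task output:

```python
import asyncio

from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.hatchet.workflows.subject_processing import SubjectInput, subject_workflow


async def fan_out_subjects(
    subjects: list[str],
    transcript_text: str,
    participant_names: list[str],
    participant_name_to_id: dict[str, str],
) -> list[SubjectSummaryResult]:
    # One child workflow run per subject; Hatchet schedules them in parallel,
    # throttled by the shared "llm" rate limit on generate_detailed_summary.
    runs = [
        subject_workflow.aio_run(
            SubjectInput(
                subject=subject,
                subject_index=i,
                transcript_text=transcript_text,
                participant_names=participant_names,
                participant_name_to_id=participant_name_to_id,
            )
        )
        for i, subject in enumerate(subjects)
    ]
    results = await asyncio.gather(*runs)
    # Assumption: each run result is keyed by task name; re-order by subject_index.
    summaries = [
        SubjectSummaryResult.model_validate(r["generate_detailed_summary"])
        for r in results
    ]
    return sorted(summaries, key=lambda s: s.subject_index)
```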
server/reflector/hatchet/workflows/topic_chunk_processing.py (new file, +97)
@@ -0,0 +1,97 @@
+"""
+Hatchet child workflow: TopicChunkProcessing
+
+Handles topic detection for individual transcript chunks.
+Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
+"""
+
+from datetime import timedelta
+
+from hatchet_sdk import ConcurrencyExpression, ConcurrencyLimitStrategy, Context
+from hatchet_sdk.rate_limit import RateLimit
+from pydantic import BaseModel
+
+from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
+from reflector.hatchet.workflows.models import TopicChunkResult
+from reflector.logger import logger
+from reflector.processors.prompts import TOPIC_PROMPT
+from reflector.processors.types import Word
+
+
+class TopicChunkInput(BaseModel):
+    """Input for individual topic chunk processing."""
+
+    chunk_index: int
+    chunk_text: str
+    timestamp: float
+    duration: float
+    words: list[Word]
+
+
+hatchet = HatchetClientManager.get_client()
+
+topic_chunk_workflow = hatchet.workflow(
+    name="TopicChunkProcessing",
+    input_validator=TopicChunkInput,
+    concurrency=ConcurrencyExpression(
+        expression="'global'",  # constant string = global limit across all runs
+        max_runs=20,
+        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
+    ),
+)
+
+
+@topic_chunk_workflow.task(
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+    retries=3,
+    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
+)
+async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
+    """Detect topic for a single transcript chunk."""
+    ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
+    logger.info(
+        "[Hatchet] detect_chunk_topic",
+        chunk_index=input.chunk_index,
+        text_length=len(input.chunk_text),
+    )
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing LLM HTTP connection pools across forks
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.transcript_topic_detector import (  # noqa: PLC0415
+        TopicResponse,
+    )
+    from reflector.settings import settings  # noqa: PLC0415
+    from reflector.utils.text import clean_title  # noqa: PLC0415
+
+    llm = LLM(settings=settings, temperature=0.9, max_tokens=500)
+
+    prompt = TOPIC_PROMPT.format(text=input.chunk_text)
+    response = await llm.get_structured_response(
+        prompt,
+        [input.chunk_text],
+        TopicResponse,
+        tone_name="Topic analyzer",
+        timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
+    )
+
+    title = clean_title(response.title)
+
+    ctx.log(
+        f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
+    )
+    logger.info(
+        "[Hatchet] detect_chunk_topic complete",
+        chunk_index=input.chunk_index,
+        title=title[:50],
+    )
+
+    return TopicChunkResult(
+        chunk_index=input.chunk_index,
+        title=title,
+        summary=response.summary,
+        timestamp=input.timestamp,
+        duration=input.duration,
+        words=input.words,
+    )
@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
 Handles individual audio track processing: padding and transcription.
 Spawned dynamically by the main diarization pipeline for each track.
 
-Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
 because Hatchet workflow DAGs are defined statically, but the number of tracks varies
 at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
-standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
 
 Note: This file uses deferred imports (inside tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
@@ -23,6 +23,7 @@ from hatchet_sdk import Context
 from pydantic import BaseModel
 
 from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
 from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
 from reflector.logger import logger
 from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
@@ -47,7 +48,7 @@ hatchet = HatchetClientManager.get_client()
 track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)
 
 
-@track_workflow.task(execution_timeout=timedelta(seconds=300), retries=3)
+@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
 async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     """Pad single audio track with silence for alignment.
 
@@ -82,6 +83,15 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     )
 
     with av.open(source_url) as in_container:
+        if in_container.duration:
+            try:
+                duration = timedelta(seconds=in_container.duration // 1_000_000)
+                ctx.log(
+                    f"pad_track: track {input.track_index}, duration={duration}"
+                )
+            except Exception:
+                ctx.log(f"pad_track: track {input.track_index}, duration=ERROR")
+
         start_time_seconds = extract_stream_start_time_from_container(
             in_container, input.track_index, logger=logger
        )
@@ -153,7 +163,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
 
 
 @track_workflow.task(
-    parents=[pad_track], execution_timeout=timedelta(seconds=600), retries=3
+    parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
 )
 async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
     """Transcribe audio track using GPU (Modal.com) or local Whisper."""
@@ -197,23 +207,20 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
     transcript = await transcribe_file_with_processor(audio_url, input.language)
 
     # Tag all words with speaker index
-    words = []
     for word in transcript.words:
-        word_dict = word.model_dump()
-        word_dict["speaker"] = input.track_index
-        words.append(word_dict)
+        word.speaker = input.track_index
 
     ctx.log(
-        f"transcribe_track complete: track {input.track_index}, {len(words)} words"
+        f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
     )
     logger.info(
         "[Hatchet] transcribe_track complete",
         track_index=input.track_index,
-        word_count=len(words),
+        word_count=len(transcript.words),
     )
 
     return TranscribeTrackResult(
-        words=words,
+        words=transcript.words,
         track_index=input.track_index,
     )
@@ -18,6 +18,7 @@ from reflector.processors import (
|
||||
)
|
||||
from reflector.processors.types import TitleSummary
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
|
||||
|
||||
|
||||
class EmptyPipeline:
|
||||
@@ -38,7 +39,7 @@ async def detect_topics(
|
||||
on_topic_callback: Callable,
|
||||
empty_pipeline: EmptyPipeline,
|
||||
) -> list[TitleSummary]:
|
||||
chunk_size = 300
|
||||
chunk_size = TOPIC_CHUNK_WORD_COUNT
|
||||
topics: list[TitleSummary] = []
|
||||
|
||||
async def on_topic(topic: TitleSummary):
|
||||
|
||||
server/reflector/processors/prompts.py (new file, +30)
@@ -0,0 +1,30 @@
+"""
+LLM prompts for transcript processing.
+
+Extracted to a separate module to avoid circular imports when importing
+from processor modules (which import LLM/settings at module level).
+"""
+
+from textwrap import dedent
+
+TOPIC_PROMPT = dedent(
+    """
+    Analyze the following transcript segment and extract the main topic being discussed.
+    Focus on the substantive content and ignore small talk or administrative chatter.
+
+    Create a title that:
+    - Captures the specific subject matter being discussed
+    - Is descriptive and self-explanatory
+    - Uses professional language
+    - Is specific rather than generic
+
+    For the summary:
+    - Summarize the key points in maximum two sentences
+    - Focus on what was discussed, decided, or accomplished
+    - Be concise but informative
+
+    <transcript>
+    {text}
+    </transcript>
+    """
+).strip()
server/reflector/processors/summary/models.py (new file, +50)
@@ -0,0 +1,50 @@
+"""Pydantic models for summary processing."""
+
+from pydantic import BaseModel, Field
+
+
+class ActionItem(BaseModel):
+    """A single action item from the meeting"""
+
+    task: str = Field(description="The task or action item to be completed")
+    assigned_to: str | None = Field(
+        default=None, description="Person or team assigned to this task (name)"
+    )
+    assigned_to_participant_id: str | None = Field(
+        default=None, description="Participant ID if assigned_to matches a participant"
+    )
+    deadline: str | None = Field(
+        default=None, description="Deadline or timeframe mentioned for this task"
+    )
+    context: str | None = Field(
+        default=None, description="Additional context or notes about this task"
+    )
+
+
+class Decision(BaseModel):
+    """A decision made during the meeting"""
+
+    decision: str = Field(description="What was decided")
+    rationale: str | None = Field(
+        default=None,
+        description="Reasoning or key factors that influenced this decision",
+    )
+    decided_by: str | None = Field(
+        default=None, description="Person or group who made the decision (name)"
+    )
+    decided_by_participant_id: str | None = Field(
+        default=None, description="Participant ID if decided_by matches a participant"
+    )
+
+
+class ActionItemsResponse(BaseModel):
+    """Pydantic model for identified action items"""
+
+    decisions: list[Decision] = Field(
+        default_factory=list,
+        description="List of decisions made during the meeting",
+    )
+    next_steps: list[ActionItem] = Field(
+        default_factory=list,
+        description="List of action items and next steps to be taken",
+    )
server/reflector/processors/summary/prompts.py (new file, +91)
@@ -0,0 +1,91 @@
+"""
+LLM prompts for summary generation.
+
+Extracted to a separate module to avoid circular imports when importing
+from summary_builder.py (which imports LLM/settings at module level).
+"""
+
+from textwrap import dedent
+
+
+def build_participant_instructions(participant_names: list[str]) -> str:
+    """Build participant context instructions for LLM prompts."""
+    if not participant_names:
+        return ""
+
+    participants_list = ", ".join(participant_names)
+    return dedent(
+        f"""
+        # IMPORTANT: Participant Names
+        The following participants are identified in this conversation: {participants_list}
+
+        You MUST use these specific participant names when referring to people in your response.
+        Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
+        Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
+        """
+    ).strip()
+
+
+DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
+    """
+    Get me information about the topic "{subject}"
+
+    # RESPONSE GUIDELINES
+    Follow this structured approach to create the topic summary:
+    - Highlight important arguments, insights, or data presented.
+    - Outline decisions made.
+      - Indicate any decisions reached, including any rationale or key factors
+        that influenced these decisions.
+    - Detail action items and responsibilities.
+      - For each decision or unresolved issue, list specific action items agreed
+        upon, along with assigned individuals or teams responsible for each task.
+      - Specify deadlines or timelines if mentioned. For each action item,
+        include any deadlines or timeframes discussed for completion or follow-up.
+    - Mention unresolved issues or topics needing further discussion, aiding in
+      planning future meetings or follow-up actions.
+    - Do not include topic unrelated to {subject}.
+
+    # OUTPUT
+    Your summary should be clear, concise, and structured, covering all major
+    points, decisions, and action items from the meeting. It should be easy to
+    understand for someone not present, providing a comprehensive understanding
+    of what transpired and what needs to be done next. The summary should not
+    exceed one page to ensure brevity and focus.
+    """
+).strip()
+
+PARAGRAPH_SUMMARY_PROMPT = dedent(
+    """
+    Summarize the mentioned topic in 1 paragraph.
+    It will be integrated into the final summary, so just for this topic.
+    """
+).strip()
+
+RECAP_PROMPT = dedent(
+    """
+    Provide a high-level quick recap of the following meeting, fitting in one paragraph.
+    Do not include decisions, action items or unresolved issue, just highlight the high moments.
+    Just dive into the meeting, be concise and do not include unnecessary details.
+    As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
+    """
+).strip()
+
+
+def build_summary_markdown(recap: str, summaries: list[dict[str, str]]) -> str:
+    """Build markdown summary from recap and subject summaries."""
+    lines: list[str] = []
+    if recap:
+        lines.append("# Quick recap")
+        lines.append("")
+        lines.append(recap)
+        lines.append("")
+
+    if summaries:
+        lines.append("# Summary")
+        lines.append("")
+        for summary in summaries:
+            lines.append(f"**{summary['subject']}**")
+            lines.append(summary["summary"])
+            lines.append("")
+
+    return "\n".join(lines)
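A quick usage sketch of `build_summary_markdown` (the inputs are illustrative):

```python
from reflector.processors.summary.prompts import build_summary_markdown

md = build_summary_markdown(
    recap="The team reviewed the pipeline rename and webhook parity work.",
    summaries=[
        {"subject": "Webhook parity", "summary": "Celery and Hatchet now share one payload builder."},
        {"subject": "Pipeline rename", "summary": "DiarizationPipeline becomes DailyMultitrackPipeline."},
    ],
)
print(md)
# # Quick recap
#
# The team reviewed the pipeline rename and webhook parity work.
#
# # Summary
#
# **Webhook parity**
# Celery and Hatchet now share one payload builder.
# ...
```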
@@ -15,6 +15,14 @@ import structlog
 from pydantic import BaseModel, Field
 
 from reflector.llm import LLM
+from reflector.processors.summary.models import ActionItemsResponse
+from reflector.processors.summary.prompts import (
+    DETAILED_SUBJECT_PROMPT_TEMPLATE,
+    PARAGRAPH_SUMMARY_PROMPT,
+    RECAP_PROMPT,
+    build_participant_instructions,
+    build_summary_markdown,
+)
 from reflector.settings import settings
 
 T = TypeVar("T", bound=BaseModel)
@@ -52,50 +60,6 @@ SUBJECTS_PROMPT = dedent(
     """
 ).strip()
 
-DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
-    """
-    Get me information about the topic "{subject}"
-
-    # RESPONSE GUIDELINES
-    Follow this structured approach to create the topic summary:
-    - Highlight important arguments, insights, or data presented.
-    - Outline decisions made.
-      - Indicate any decisions reached, including any rationale or key factors
-        that influenced these decisions.
-    - Detail action items and responsibilities.
-      - For each decision or unresolved issue, list specific action items agreed
-        upon, along with assigned individuals or teams responsible for each task.
-      - Specify deadlines or timelines if mentioned. For each action item,
-        include any deadlines or timeframes discussed for completion or follow-up.
-    - Mention unresolved issues or topics needing further discussion, aiding in
-      planning future meetings or follow-up actions.
-    - Do not include topic unrelated to {subject}.
-
-    # OUTPUT
-    Your summary should be clear, concise, and structured, covering all major
-    points, decisions, and action items from the meeting. It should be easy to
-    understand for someone not present, providing a comprehensive understanding
-    of what transpired and what needs to be done next. The summary should not
-    exceed one page to ensure brevity and focus.
-    """
-).strip()
-
-PARAGRAPH_SUMMARY_PROMPT = dedent(
-    """
-    Summarize the mentioned topic in 1 paragraph.
-    It will be integrated into the final summary, so just for this topic.
-    """
-).strip()
-
-RECAP_PROMPT = dedent(
-    """
-    Provide a high-level quick recap of the following meeting, fitting in one paragraph.
-    Do not include decisions, action items or unresolved issue, just highlight the high moments.
-    Just dive into the meeting, be concise and do not include unnecessary details.
-    As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
-    """
-).strip()
-
 ACTION_ITEMS_PROMPT = dedent(
     """
     Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -185,53 +149,6 @@ class SubjectsResponse(BaseModel):
     )
 
 
-class ActionItem(BaseModel):
-    """A single action item from the meeting"""
-
-    task: str = Field(description="The task or action item to be completed")
-    assigned_to: str | None = Field(
-        default=None, description="Person or team assigned to this task (name)"
-    )
-    assigned_to_participant_id: str | None = Field(
-        default=None, description="Participant ID if assigned_to matches a participant"
-    )
-    deadline: str | None = Field(
-        default=None, description="Deadline or timeframe mentioned for this task"
-    )
-    context: str | None = Field(
-        default=None, description="Additional context or notes about this task"
-    )
-
-
-class Decision(BaseModel):
-    """A decision made during the meeting"""
-
-    decision: str = Field(description="What was decided")
-    rationale: str | None = Field(
-        default=None,
-        description="Reasoning or key factors that influenced this decision",
-    )
-    decided_by: str | None = Field(
-        default=None, description="Person or group who made the decision (name)"
-    )
-    decided_by_participant_id: str | None = Field(
-        default=None, description="Participant ID if decided_by matches a participant"
-    )
-
-
-class ActionItemsResponse(BaseModel):
-    """Pydantic model for identified action items"""
-
-    decisions: list[Decision] = Field(
-        default_factory=list,
-        description="List of decisions made during the meeting",
-    )
-    next_steps: list[ActionItem] = Field(
-        default_factory=list,
-        description="List of action items and next steps to be taken",
-    )
-
-
 class SummaryBuilder:
     def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None:
         self.transcript: str | None = None
@@ -331,17 +248,7 @@
         participants_md = self.format_list_md(participants)
         self.transcript += f"\n\n# Participants\n\n{participants_md}"
 
-        participants_list = ", ".join(participants)
-        self.participant_instructions = dedent(
-            f"""
-            # IMPORTANT: Participant Names
-            The following participants are identified in this conversation: {participants_list}
-
-            You MUST use these specific participant names when referring to people in your response.
-            Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-            Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-            """
-        ).strip()
+        self.participant_instructions = build_participant_instructions(participants)
 
     async def identify_participants(self) -> None:
         """
@@ -377,18 +284,9 @@
             participants_md = self.format_list_md(unique_participants)
             self.transcript += f"\n\n# Participants\n\n{participants_md}"
 
             # Set instructions that will be automatically added to all prompts
-            participants_list = ", ".join(unique_participants)
-            self.participant_instructions = dedent(
-                f"""
-                # IMPORTANT: Participant Names
-                The following participants are identified in this conversation: {participants_list}
-
-                You MUST use these specific participant names when referring to people in your response.
-                Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
-                Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
-                """
-            ).strip()
+            self.participant_instructions = build_participant_instructions(
+                unique_participants
+            )
         else:
             self.logger.warning("No participants identified in the transcript")
 
@@ -613,22 +511,7 @@
     # ----------------------------------------------------------------------------
 
     def as_markdown(self) -> str:
-        lines: list[str] = []
-        if self.recap:
-            lines.append("# Quick recap")
-            lines.append("")
-            lines.append(self.recap)
-            lines.append("")
-
-        if self.summaries:
-            lines.append("# Summary")
-            lines.append("")
-            for summary in self.summaries:
-                lines.append(f"**{summary['subject']}**")
-                lines.append(summary["summary"])
-                lines.append("")
-
-        return "\n".join(lines)
+        return build_summary_markdown(self.recap, self.summaries)
 
     def format_list_md(self, data: list[str]) -> str:
         return "\n".join([f"- {item}" for item in data])
@@ -1,35 +1,12 @@
-from textwrap import dedent
-
 from pydantic import AliasChoices, BaseModel, Field
 
 from reflector.llm import LLM
 from reflector.processors.base import Processor
+from reflector.processors.prompts import TOPIC_PROMPT
 from reflector.processors.types import TitleSummary, Transcript
 from reflector.settings import settings
 from reflector.utils.text import clean_title
 
-TOPIC_PROMPT = dedent(
-    """
-    Analyze the following transcript segment and extract the main topic being discussed.
-    Focus on the substantive content and ignore small talk or administrative chatter.
-
-    Create a title that:
-    - Captures the specific subject matter being discussed
-    - Is descriptive and self-explanatory
-    - Uses professional language
-    - Is specific rather than generic
-
-    For the summary:
-    - Summarize the key points in maximum two sentences
-    - Focus on what was discussed, decided, or accomplished
-    - Be concise but informative
-
-    <transcript>
-    {text}
-    </transcript>
-    """
-).strip()
-
 
 class TopicResponse(BaseModel):
     """Structured response for topic detection"""
@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
     if transcript.locked:
         return ValidationLocked(detail="Recording is locked")
 
-    if transcript.status == "idle":
+    # hatchet is idempotent anyways + if it wasn't dispatched successfully
+    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
         return ValidationNotReady(detail="Recording is not ready for processing")
 
     # Check Celery tasks
@@ -115,13 +116,11 @@
     ):
         return ValidationAlreadyScheduled(detail="already running")
 
     # Check Hatchet workflows (if enabled)
     if settings.HATCHET_ENABLED and transcript.workflow_run_id:
         try:
             status = await HatchetClientManager.get_workflow_run_status(
                 transcript.workflow_run_id
             )
             # If workflow is running or queued, don't allow new processing
             if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                 return ValidationAlreadyScheduled(
                     detail="Hatchet workflow already running"
@@ -189,8 +188,8 @@
     room_forces_hatchet = room.use_hatchet if room else False
 
     # Start durable workflow if enabled (Hatchet)
-    # or if room has use_hatchet=True
-    use_hatchet = settings.HATCHET_ENABLED or room_forces_hatchet
+    # and if room has use_hatchet=True
+    use_hatchet = settings.HATCHET_ENABLED and room_forces_hatchet
 
     if room_forces_hatchet:
         logger.info(
@@ -43,6 +43,8 @@ async def mixdown_tracks_pyav(
     target_sample_rate: int,
     offsets_seconds: list[float] | None = None,
     logger=None,
+    progress_callback=None,
+    expected_duration_sec: float | None = None,
 ) -> None:
     """Multi-track mixdown using PyAV filter graph (amix).
 
@@ -57,6 +59,10 @@
         If provided, must have same length as track_urls. Delays are relative
         to the minimum offset (earliest track has delay=0).
         logger: Optional logger instance
+        progress_callback: Optional callback(progress_pct: float | None, audio_position: float)
+            called on progress updates. progress_pct is 0-100 if duration known, None otherwise.
+            audio_position is current position in seconds.
+        expected_duration_sec: Optional fallback duration if container metadata unavailable.
 
     Raises:
         ValueError: If offsets_seconds length doesn't match track_urls,
@@ -171,6 +177,17 @@
         logger.error("Mixdown failed - no valid containers opened")
         raise ValueError("Mixdown failed: Could not open any track containers")
 
+    # Calculate total duration for progress reporting.
+    # Try container metadata first, fall back to expected_duration_sec if provided.
+    max_duration_sec = 0.0
+    for c in containers:
+        if c.duration is not None:
+            dur_sec = c.duration / av.time_base
+            max_duration_sec = max(max_duration_sec, dur_sec)
+    if max_duration_sec == 0.0 and expected_duration_sec:
+        max_duration_sec = expected_duration_sec
+    current_max_time = 0.0
+
     decoders = [c.decode(audio=0) for c in containers]
     active = [True] * len(decoders)
     resamplers = [
@@ -192,6 +209,18 @@
 
                 if frame.sample_rate != target_sample_rate:
                     continue
 
+                # Update progress based on frame timestamp
+                if progress_callback and frame.time is not None:
+                    current_max_time = max(current_max_time, frame.time)
+                    if max_duration_sec > 0:
+                        progress_pct = min(
+                            100.0, (current_max_time / max_duration_sec) * 100
+                        )
+                    else:
+                        progress_pct = None  # Duration unavailable
+                    progress_callback(progress_pct, current_max_time)
+
                 out_frames = resamplers[i].resample(frame) or []
                 for rf in out_frames:
                     rf.sample_rate = target_sample_rate
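The `progress_callback` contract above is easy to satisfy; a minimal sketch of a caller-side callback (the `recording_duration` value in the usage comment is illustrative):

```python
def log_progress(progress_pct: float | None, audio_position: float) -> None:
    # progress_pct is None when no container exposes a duration and no
    # expected_duration_sec fallback was supplied.
    if progress_pct is None:
        print(f"mixdown at {audio_position:.1f}s (total duration unknown)")
    else:
        print(f"mixdown {progress_pct:.0f}% ({audio_position:.1f}s)")


# passed as: mixdown_tracks_pyav(..., progress_callback=log_progress,
#                                expected_duration_sec=recording_duration)
```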
@@ -3,8 +3,12 @@ from pathlib import Path
 import av
 import numpy as np
 
+from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
 
-def get_audio_waveform(path: Path | str, segments_count: int = 256) -> list[int]:
+
+def get_audio_waveform(
+    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
+) -> list[int]:
     if isinstance(path, Path):
         path = path.as_posix()
 
@@ -70,7 +74,7 @@
 
     parser = argparse.ArgumentParser()
     parser.add_argument("path", type=Path)
-    parser.add_argument("--segments-count", type=int, default=256)
+    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
     args = parser.parse_args()
 
     print(get_audio_waveform(args.path, args.segments_count))
server/reflector/utils/transcript_constants.py (new file, +8)
@@ -0,0 +1,8 @@
+"""
+Shared transcript processing constants.
+
+Used by both Hatchet workflows and Celery pipelines for consistent processing.
+"""
+
+# Topic detection: number of words per chunk for topic extraction
+TOPIC_CHUNK_WORD_COUNT = 300
server/reflector/utils/webhook.py (new file, +216)
@@ -0,0 +1,216 @@
+"""Webhook utilities.
+
+Shared webhook functionality for both Hatchet and Celery pipelines.
+"""
+
+import hashlib
+import hmac
+import uuid
+from datetime import datetime, timezone
+from typing import Union
+
+import httpx
+from pydantic import BaseModel
+
+from reflector.logger import logger
+from reflector.settings import settings
+from reflector.utils.string import NonEmptyString
+from reflector.utils.webhook_outgoing_models import (
+    WebhookCalendarEventPayload,
+    WebhookParticipantPayload,
+    WebhookPayload,
+    WebhookRoomPayload,
+    WebhookTestPayload,
+    WebhookTopicPayload,
+    WebhookTranscriptPayload,
+)
+
+__all__ = [
+    "fetch_transcript_webhook_payload",
+    "fetch_test_webhook_payload",
+    "build_webhook_headers",
+    "generate_webhook_signature",
+    "send_webhook_request",
+]
+
+from reflector.db.calendar_events import calendar_events_controller
+from reflector.db.meetings import meetings_controller
+from reflector.db.rooms import rooms_controller
+from reflector.db.transcripts import transcripts_controller
+from reflector.utils.webvtt import topics_to_webvtt
+
+
+def _serialize_payload(payload: BaseModel) -> bytes:
+    """Serialize Pydantic model to compact JSON bytes."""
+    return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")
+
+
+def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
+    """Generate HMAC-SHA256 signature for webhook payload."""
+    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
+    hmac_obj = hmac.new(
+        secret.encode("utf-8"),
+        signed_payload.encode("utf-8"),
+        hashlib.sha256,
+    )
+    return hmac_obj.hexdigest()
+
+
+def build_webhook_headers(
+    event_type: str,
+    payload_bytes: bytes,
+    webhook_secret: str | None = None,
+    retry_count: int = 0,
+) -> dict[str, str]:
+    headers = {
+        "Content-Type": "application/json",
+        "User-Agent": "Reflector-Webhook/1.0",
+        "X-Webhook-Event": event_type,
+        "X-Webhook-Retry": str(retry_count),
+    }
+
+    if webhook_secret:
+        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
+        signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
+        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+
+    return headers
+
+
+async def send_webhook_request(
+    url: str,
+    payload: BaseModel,
+    event_type: str,
+    webhook_secret: str | None = None,
+    retry_count: int = 0,
+    timeout: float = 30.0,
+) -> httpx.Response:
+    """Send webhook request with proper headers and signature.
+
+    Raises:
+        httpx.HTTPStatusError: On non-2xx response
+        httpx.ConnectError: On connection failure
+        httpx.TimeoutException: On timeout
+    """
+    payload_bytes = _serialize_payload(payload)
+
+    headers = build_webhook_headers(
+        event_type=event_type,
+        payload_bytes=payload_bytes,
+        webhook_secret=webhook_secret,
+        retry_count=retry_count,
+    )
+
+    async with httpx.AsyncClient(timeout=timeout) as client:
+        response = await client.post(url, content=payload_bytes, headers=headers)
+        response.raise_for_status()
+        return response
+
+
+async def fetch_transcript_webhook_payload(
+    transcript_id: NonEmptyString,
+    room_id: NonEmptyString,
+) -> Union[WebhookPayload, str]:
+    """Build webhook payload by fetching transcript and room data from database."""
+
+    transcript = await transcripts_controller.get_by_id(transcript_id)
+    if not transcript:
+        return f"Transcript {transcript_id} not found"
+
+    room = await rooms_controller.get_by_id(room_id)
+    if not room:
+        return f"Room {room_id} not found"
+
+    topics_data = [
+        WebhookTopicPayload(
+            title=topic.title,
+            summary=topic.summary,
+            timestamp=topic.timestamp,
+            duration=topic.duration,
+            webvtt=topics_to_webvtt([topic]) if topic.words else "",
+        )
+        for topic in (transcript.topics or [])
+    ]
+
+    participants_data = [
+        WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
+        for p in (transcript.participants or [])
+    ]
+
+    calendar_event_data: WebhookCalendarEventPayload | None = None
+    try:
+        if transcript.meeting_id:
+            meeting = await meetings_controller.get_by_id(transcript.meeting_id)
+            if meeting and meeting.calendar_event_id:
+                calendar_event = await calendar_events_controller.get_by_id(
+                    meeting.calendar_event_id
+                )
+                if calendar_event:
+                    calendar_event_data = WebhookCalendarEventPayload(
+                        id=calendar_event.id,
+                        ics_uid=calendar_event.ics_uid,
+                        title=calendar_event.title,
+                        start_time=calendar_event.start_time,
+                        end_time=calendar_event.end_time,
+                        description=calendar_event.description or None,
+                        location=calendar_event.location or None,
+                        attendees=calendar_event.attendees or None,
+                    )
+    except Exception as e:
+        logger.warning(
+            "Failed to fetch calendar event for webhook",
+            transcript_id=transcript_id,
+            meeting_id=transcript.meeting_id,
+            error=str(e),
+        )
+
+    frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
+
+    return WebhookPayload(
+        event="transcript.completed",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        transcript=WebhookTranscriptPayload(
+            id=transcript.id,
+            room_id=transcript.room_id,
+            created_at=transcript.created_at,
+            duration=transcript.duration,
+            title=transcript.title,
+            short_summary=transcript.short_summary,
+            long_summary=transcript.long_summary,
+            webvtt=transcript.webvtt,
+            topics=topics_data,
+            participants=participants_data,
+            source_language=transcript.source_language,
+            target_language=transcript.target_language,
+            status=transcript.status,
+            frontend_url=frontend_url,
+            action_items=transcript.action_items,
+        ),
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+        calendar_event=calendar_event_data,
+    )
+
+
+async def fetch_test_webhook_payload(
+    room_id: NonEmptyString,
+) -> WebhookTestPayload | None:
+    """Build test webhook payload."""
+
+    room = await rooms_controller.get_by_id(room_id)
+    if not room:
+        return None
+
+    return WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )
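Grounded in the signing scheme above (`X-Webhook-Signature: t=<unix timestamp>,v1=<hex HMAC-SHA256 of "{timestamp}.{body}">`), here is a minimal sketch of receiver-side verification; the 300-second replay tolerance is a local choice, not part of this changeset:

```python
import hashlib
import hmac
import time


def verify_webhook_signature(
    body: bytes, signature_header: str, secret: str, tolerance_sec: int = 300
) -> bool:
    """Verify the X-Webhook-Signature header produced by build_webhook_headers."""
    try:
        parts = dict(kv.split("=", 1) for kv in signature_header.split(","))
        timestamp, received = parts["t"], parts["v1"]
    except (ValueError, KeyError):
        return False

    # Reject stale timestamps to limit replay attacks (tolerance is a local choice).
    if abs(time.time() - int(timestamp)) > tolerance_sec:
        return False

    # Recompute the signature exactly as generate_webhook_signature does.
    signed_payload = f"{timestamp}.{body.decode('utf-8')}"
    expected = hmac.new(
        secret.encode("utf-8"), signed_payload.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(expected, received)
```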
server/reflector/utils/webhook_outgoing_models.py (new file, +80)
@@ -0,0 +1,80 @@
+"""Pydantic models for outgoing webhook payloads.
+
+These models define the structure of webhook payloads sent by Reflector
+to external services when transcript processing completes.
+"""
+
+from datetime import datetime
+from typing import Literal
+
+from pydantic import BaseModel
+
+from reflector.utils.string import NonEmptyString
+
+WebhookTranscriptEventType = Literal["transcript.completed"]
+WebhookTestEventType = Literal["test"]
+
+
+class WebhookTopicPayload(BaseModel):
+    title: NonEmptyString
+    summary: NonEmptyString
+    timestamp: float
+    duration: float | None
+    webvtt: str  # can be empty when no words
+
+
+class WebhookParticipantPayload(BaseModel):
+    id: NonEmptyString
+    name: str | None
+    speaker: int | None
+
+
+class WebhookRoomPayload(BaseModel):
+    id: NonEmptyString
+    name: NonEmptyString
+
+
+class WebhookCalendarEventPayload(BaseModel):
+    id: NonEmptyString
+    ics_uid: str | None = None
+    title: str | None = None
+    start_time: datetime | None = None
+    end_time: datetime | None = None
+    description: str | None = None
+    location: str | None = None
+    attendees: list[str] | None = None
+
+
+class WebhookTranscriptPayload(BaseModel):
+    id: NonEmptyString
+    room_id: NonEmptyString | None
+    created_at: datetime
+    duration: float | None
+    title: str | None
+    short_summary: str | None
+    long_summary: str | None
+    webvtt: str | None
+    topics: list[WebhookTopicPayload]
+    participants: list[WebhookParticipantPayload]
+    source_language: NonEmptyString
+    target_language: NonEmptyString
+    status: NonEmptyString
+    frontend_url: NonEmptyString
+    action_items: dict | None
+
+
+class WebhookPayload(BaseModel):
+    event: WebhookTranscriptEventType
+    event_id: NonEmptyString
+    timestamp: datetime
+    transcript: WebhookTranscriptPayload
+    room: WebhookRoomPayload
+    calendar_event: WebhookCalendarEventPayload | None = None
+
+
+class WebhookTestPayload(BaseModel):
+    event: WebhookTestEventType
+    event_id: NonEmptyString
+    timestamp: datetime
+    message: NonEmptyString
+    room: WebhookRoomPayload
@@ -287,9 +287,7 @@
         room_id=room.id,
     )
 
-    # Start durable workflow if enabled (Hatchet) or room overrides it
-    durable_started = False
-    use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
+    use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
 
     if room and room.use_hatchet and not settings.HATCHET_ENABLED:
         logger.info(
@@ -300,7 +298,7 @@
 
     if use_hatchet:
         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording_id,
                 "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -836,7 +834,7 @@
             )
             continue
 
-        use_hatchet = settings.HATCHET_ENABLED or (room and room.use_hatchet)
+        use_hatchet = settings.HATCHET_ENABLED and room and room.use_hatchet
 
         if use_hatchet:
             # Hatchet requires a transcript for workflow_run_id tracking
@@ -848,7 +846,7 @@
             continue
 
         workflow_id = await HatchetClientManager.start_workflow(
-            workflow_name="DiarizationPipeline",
+            workflow_name="DailyMultitrackPipeline",
             input_data={
                 "recording_id": recording.id,
                 "tracks": [
@@ -1,8 +1,5 @@
|
||||
"""Webhook task for sending transcript notifications."""
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import uuid
|
||||
from datetime import datetime, timezone
|
||||
|
||||
@@ -11,28 +8,20 @@ import structlog
 from celery import shared_task
 from celery.utils.log import get_task_logger

-from reflector.db.calendar_events import calendar_events_controller
-from reflector.db.meetings import meetings_controller
 from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import transcripts_controller
 from reflector.pipelines.main_live_pipeline import asynctask
 from reflector.settings import settings
-from reflector.utils.webvtt import topics_to_webvtt
+from reflector.utils.webhook import (
+    WebhookRoomPayload,
+    WebhookTestPayload,
+    _serialize_payload,
+    build_webhook_headers,
+    fetch_transcript_webhook_payload,
+    send_webhook_request,
+)

 logger = structlog.wrap_logger(get_task_logger(__name__))


-def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
-    """Generate HMAC signature for webhook payload."""
-    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
-    hmac_obj = hmac.new(
-        secret.encode("utf-8"),
-        signed_payload.encode("utf-8"),
-        hashlib.sha256,
-    )
-    return hmac_obj.hexdigest()
-
-
 @shared_task(
     bind=True,
     max_retries=30,
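A receiver can check the X-Webhook-Signature header ("t=<unix-ts>,v1=<hex>", as built below) by recomputing the same HMAC over the timestamp and raw body. A minimal sketch mirroring generate_webhook_signature above; the function name is illustrative, not part of Reflector:

# Hypothetical receiver-side verification of "t=...,v1=..." signatures.
import hashlib
import hmac


def verify_webhook_signature(body: bytes, secret: str, header: str) -> bool:
    parts = dict(item.split("=", 1) for item in header.split(","))
    timestamp, received = parts["t"], parts["v1"]
    signed_payload = f"{timestamp}.{body.decode('utf-8')}"
    expected = hmac.new(
        secret.encode("utf-8"),
        signed_payload.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()
    # compare_digest avoids leaking timing information
    return hmac.compare_digest(expected, received)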
@@ -54,12 +43,6 @@ async def send_transcript_webhook(
     )

     try:
-        # Fetch transcript and room
-        transcript = await transcripts_controller.get_by_id(transcript_id)
-        if not transcript:
-            log.error("Transcript not found, skipping webhook")
-            return
-
         room = await rooms_controller.get_by_id(room_id)
         if not room:
             log.error("Room not found, skipping webhook")
@@ -69,135 +52,36 @@ async def send_transcript_webhook(
             log.info("No webhook URL configured for room, skipping")
             return

-        # Generate WebVTT content from topics
-        topics_data = []
+        payload = await fetch_transcript_webhook_payload(
+            transcript_id=transcript_id,
+            room_id=room_id,
+        )

-        if transcript.topics:
-            # Build topics data with diarized content per topic
-            for topic in transcript.topics:
-                topic_webvtt = topics_to_webvtt([topic]) if topic.words else ""
-                topics_data.append(
-                    {
-                        "title": topic.title,
-                        "summary": topic.summary,
-                        "timestamp": topic.timestamp,
-                        "duration": topic.duration,
-                        "webvtt": topic_webvtt,
-                    }
-                )
+        if isinstance(payload, str):
+            log.error(f"Could not build webhook payload, skipping: {payload}")
+            return

-        # Fetch meeting and calendar event if they exist
-        calendar_event = None
-        try:
-            if transcript.meeting_id:
-                meeting = await meetings_controller.get_by_id(transcript.meeting_id)
-                if meeting and meeting.calendar_event_id:
-                    calendar_event = await calendar_events_controller.get_by_id(
-                        meeting.calendar_event_id
-                    )
-        except Exception as e:
-            logger.error("Error fetching meeting or calendar event", error=str(e))
+        log.info(
+            "Sending webhook",
+            url=room.webhook_url,
+            topics=len(payload.transcript.topics),
+            participants=len(payload.transcript.participants),
+        )

-        # Build webhook payload
-        frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
-        participants = [
-            {"id": p.id, "name": p.name, "speaker": p.speaker}
-            for p in (transcript.participants or [])
-        ]
-        payload_data = {
-            "event": "transcript.completed",
-            "event_id": event_id,
-            "timestamp": datetime.now(timezone.utc).isoformat(),
-            "transcript": {
-                "id": transcript.id,
-                "room_id": transcript.room_id,
-                "created_at": transcript.created_at.isoformat(),
-                "duration": transcript.duration,
-                "title": transcript.title,
-                "short_summary": transcript.short_summary,
-                "long_summary": transcript.long_summary,
-                "webvtt": transcript.webvtt,
-                "topics": topics_data,
-                "participants": participants,
-                "source_language": transcript.source_language,
-                "target_language": transcript.target_language,
-                "status": transcript.status,
-                "frontend_url": frontend_url,
-                "action_items": transcript.action_items,
-            },
-            "room": {
-                "id": room.id,
-                "name": room.name,
-            },
-        }
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            retry_count=self.request.retries,
+            timeout=30.0,
+        )

-        # Always include calendar_event field, even if no event is present
-        payload_data["calendar_event"] = {}
-
-        # Add calendar event data if present
-        if calendar_event:
-            calendar_data = {
-                "id": calendar_event.id,
-                "ics_uid": calendar_event.ics_uid,
-                "title": calendar_event.title,
-                "start_time": calendar_event.start_time.isoformat()
-                if calendar_event.start_time
-                else None,
-                "end_time": calendar_event.end_time.isoformat()
-                if calendar_event.end_time
-                else None,
-            }
-
-            # Add optional fields only if they exist
-            if calendar_event.description:
-                calendar_data["description"] = calendar_event.description
-            if calendar_event.location:
-                calendar_data["location"] = calendar_event.location
-            if calendar_event.attendees:
-                calendar_data["attendees"] = calendar_event.attendees
-
-            payload_data["calendar_event"] = calendar_data
-
-        # Convert to JSON
-        payload_json = json.dumps(payload_data, separators=(",", ":"))
-        payload_bytes = payload_json.encode("utf-8")
-
-        # Generate signature if secret is configured
-        headers = {
-            "Content-Type": "application/json",
-            "User-Agent": "Reflector-Webhook/1.0",
-            "X-Webhook-Event": "transcript.completed",
-            "X-Webhook-Retry": str(self.request.retries),
-        }
-
-        if room.webhook_secret:
-            timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-            signature = generate_webhook_signature(
-                payload_bytes, room.webhook_secret, timestamp
-            )
-            headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
-
-        # Send webhook with timeout
-        async with httpx.AsyncClient(timeout=30.0) as client:
-            log.info(
-                "Sending webhook",
-                url=room.webhook_url,
-                payload_size=len(payload_bytes),
-            )
-
-            response = await client.post(
-                room.webhook_url,
-                content=payload_bytes,
-                headers=headers,
-            )
-
-            response.raise_for_status()
-
-            log.info(
-                "Webhook sent successfully",
-                status_code=response.status_code,
-                response_size=len(response.content),
-            )
+        log.info(
+            "Webhook sent successfully",
+            status_code=response.status_code,
+            response_size=len(response.content),
+        )

     except httpx.HTTPStatusError as e:
         log.error(
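For orientation, a plausible shape of the new send_webhook_request helper, inferred only from its call site above and the inline code it replaces; the real implementation in reflector.utils.webhook may differ:

# Hedged sketch: inferred from the call site, not the actual helper.
import httpx


async def send_webhook_request(
    url: str,
    payload,  # a pydantic model such as WebhookPayload
    event_type: str,
    webhook_secret: str | None,
    retry_count: int = 0,
    timeout: float = 30.0,
) -> httpx.Response:
    payload_bytes = _serialize_payload(payload)
    headers = build_webhook_headers(
        event_type=event_type,
        payload_bytes=payload_bytes,
        webhook_secret=webhook_secret,
    )
    # whether the retry header survives the refactor is an assumption
    headers["X-Webhook-Retry"] = str(retry_count)
    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(url, content=payload_bytes, headers=headers)
        response.raise_for_status()
        return response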
@@ -226,8 +110,8 @@ async def send_transcript_webhook(


 async def test_webhook(room_id: str) -> dict:
-    """
-    Test webhook configuration by sending a sample payload.
+    """Test webhook configuration by sending a sample payload.

     Returns immediately with success/failure status.
+    This is the shared implementation used by both the API endpoint and Celery task.
     """
@@ -239,34 +123,24 @@ async def test_webhook(room_id: str) -> dict:
     if not room.webhook_url:
         return {"success": False, "error": "No webhook URL configured"}

-    now = (datetime.now(timezone.utc).isoformat(),)
-    payload_data = {
-        "event": "test",
-        "event_id": uuid.uuid4().hex,
-        "timestamp": now,
-        "message": "This is a test webhook from Reflector",
-        "room": {
-            "id": room.id,
-            "name": room.name,
-        },
-    }
+    payload = WebhookTestPayload(
+        event="test",
+        event_id=uuid.uuid4().hex,
+        timestamp=datetime.now(timezone.utc),
+        message="This is a test webhook from Reflector",
+        room=WebhookRoomPayload(
+            id=room.id,
+            name=room.name,
+        ),
+    )

-    payload_json = json.dumps(payload_data, separators=(",", ":"))
-    payload_bytes = payload_json.encode("utf-8")
+    payload_bytes = _serialize_payload(payload)

-    # Generate headers with signature
-    headers = {
-        "Content-Type": "application/json",
-        "User-Agent": "Reflector-Webhook/1.0",
-        "X-Webhook-Event": "test",
-    }
-
-    if room.webhook_secret:
-        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
-        signature = generate_webhook_signature(
-            payload_bytes, room.webhook_secret, timestamp
-        )
-        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
+    headers = build_webhook_headers(
+        event_type="test",
+        payload_bytes=payload_bytes,
+        webhook_secret=room.webhook_secret,
+    )

     # Send test webhook with short timeout
     async with httpx.AsyncClient(timeout=10.0) as client:
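Likewise, a plausible shape of build_webhook_headers, mirroring the inline header construction this diff removes; again an inference, not the actual helper in reflector.utils.webhook:

# Hedged sketch: mirrors the removed inline header code; the real helper
# may differ in detail.
import hashlib
import hmac
from datetime import datetime, timezone


def build_webhook_headers(
    event_type: str,
    payload_bytes: bytes,
    webhook_secret: str | None,
) -> dict[str, str]:
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Reflector-Webhook/1.0",
        "X-Webhook-Event": event_type,
    }
    if webhook_secret:
        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
        signed = f"{timestamp}.{payload_bytes.decode('utf-8')}"
        signature = hmac.new(
            webhook_secret.encode("utf-8"),
            signed.encode("utf-8"),
            hashlib.sha256,
        ).hexdigest()
        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"
    return headers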