mirror of https://github.com/Monadical-SAS/reflector.git
synced 2025-12-24 06:09:07 +00:00

Compare commits: v0.25.0...fix/pipeli (10 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b72403fda7 | |
| | abaf819c3e | |
| | f8c4f542c1 | |
| | e369ed66ca | |
| | 3cf9757ac2 | |
| | d9d3938192 | |
| | 10c17d7086 | |
| | 7a1d1dc08d | |
| | 8598707c1c | |
| | 594bcc09e0 | |
@@ -1,5 +1,12 @@
 # Changelog
 
+## [0.26.0](https://github.com/Monadical-SAS/reflector/compare/v0.25.0...v0.26.0) (2025-12-23)
+
+
+### Features
+
+* parallelize hatchet ([#804](https://github.com/Monadical-SAS/reflector/issues/804)) ([594bcc0](https://github.com/Monadical-SAS/reflector/commit/594bcc09e0ca744163de2f1525ebbf7c52a68448))
+
 ## [0.25.0](https://github.com/Monadical-SAS/reflector/compare/v0.24.0...v0.25.0) (2025-12-22)
 
 
server/reflector/hatchet/constants.py (new file, 16 lines)
@@ -0,0 +1,16 @@
+"""
+Hatchet workflow constants.
+"""
+
+# Rate limit key for LLM API calls (shared across all LLM-calling tasks)
+LLM_RATE_LIMIT_KEY = "llm"
+
+# Max LLM calls per second across all tasks
+LLM_RATE_LIMIT_PER_SECOND = 10
+
+# Task execution timeouts (seconds)
+TIMEOUT_SHORT = 60  # Quick operations: API calls, DB updates
+TIMEOUT_MEDIUM = 120  # Single LLM calls, waveform generation
+TIMEOUT_LONG = 180  # Action items (larger context LLM)
+TIMEOUT_AUDIO = 300  # Audio processing: padding, mixdown
+TIMEOUT_HEAVY = 600  # Transcription, fan-out LLM tasks
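These constants replace the inline `timedelta(seconds=...)` literals in the task decorators later in this compare. A minimal sketch of the consumption pattern; `pipeline` is a stand-in for a workflow object such as `daily_multitrack_pipeline` defined further down:

```python
from datetime import timedelta

from reflector.hatchet.constants import TIMEOUT_SHORT

# `pipeline` is a placeholder for a Hatchet workflow object; the decorator
# arguments mirror the ones used throughout this diff.
@pipeline.task(execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3)
async def quick_db_update(input, ctx):
    ...  # short operation: API call or DB update
```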
@@ -1,5 +1,5 @@
 """
-Run Hatchet workers for the diarization pipeline.
+Run Hatchet workers for the multitrack pipeline.
 Runs as a separate process, just like Celery workers.
 
 Usage:
@@ -12,6 +12,9 @@ Usage:
 import signal
 import sys
 
+from hatchet_sdk.rate_limit import RateLimitDuration
+
+from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
 from reflector.logger import logger
 from reflector.settings import settings
 
@@ -36,15 +39,26 @@ def main() -> None:
     # Can't use lazy init: decorators need the client object when function is defined.
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
-        diarization_pipeline,
+        daily_multitrack_pipeline,
+        subject_workflow,
+        topic_chunk_workflow,
         track_workflow,
     )
 
     hatchet = HatchetClientManager.get_client()
 
+    hatchet.rate_limits.put(
+        LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND, RateLimitDuration.SECOND
+    )
+
     worker = hatchet.worker(
-        "reflector-diarization-worker",
-        workflows=[diarization_pipeline, track_workflow],
+        "reflector-pipeline-worker",
+        workflows=[
+            daily_multitrack_pipeline,
+            subject_workflow,
+            topic_chunk_workflow,
+            track_workflow,
+        ],
     )
 
     def shutdown_handler(signum: int, frame) -> None:
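The `rate_limits.put()` call registers a shared token bucket at worker startup; tasks then reserve units against it by static key. A sketch of the two halves as they appear across this diff (`hatchet` and `some_workflow` are illustrative names):

```python
from datetime import timedelta

from hatchet_sdk.rate_limit import RateLimit, RateLimitDuration

# Worker startup: declare the "llm" bucket, refilled at 10 units per second.
hatchet.rate_limits.put("llm", 10, RateLimitDuration.SECOND)

# Task definition: each run reserves units from that bucket before executing.
# In this change, subject_processing reserves 2 units (two LLM calls per run)
# and topic_chunk_processing reserves 1.
@some_workflow.task(
    execution_timeout=timedelta(seconds=120),
    rate_limits=[RateLimit(static_key="llm", units=1)],
)
async def llm_task(input, ctx):
    ...
```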
@@ -1,14 +1,26 @@
 """Hatchet workflow definitions."""
 
-from reflector.hatchet.workflows.diarization_pipeline import (
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
-    diarization_pipeline,
+    daily_multitrack_pipeline,
+)
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
 )
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 
 __all__ = [
-    "diarization_pipeline",
+    "daily_multitrack_pipeline",
+    "subject_workflow",
+    "topic_chunk_workflow",
     "track_workflow",
     "PipelineInput",
+    "SubjectInput",
+    "TopicChunkInput",
     "TrackInput",
 ]
@@ -1,9 +1,12 @@
 """
-Hatchet main workflow: DiarizationPipeline
+Hatchet main workflow: DailyMultitrackPipeline
 
-Multitrack diarization pipeline for Daily.co recordings.
+Multitrack processing pipeline for Daily.co recordings.
 Orchestrates the full processing flow from recording metadata to final transcript.
 
+Daily.co recordings don't require ML diarization - speaker identification comes from
+track index (each participant's audio is a separate track).
+
 Note: This file uses deferred imports (inside functions/tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure DB connections
 are not shared across forks, avoiding connection pooling issues.
@@ -28,33 +31,50 @@ from reflector.hatchet.broadcast import (
     set_status_and_broadcast,
 )
 from reflector.hatchet.client import HatchetClientManager
+from reflector.hatchet.constants import (
+    TIMEOUT_AUDIO,
+    TIMEOUT_HEAVY,
+    TIMEOUT_LONG,
+    TIMEOUT_MEDIUM,
+    TIMEOUT_SHORT,
+)
 from reflector.hatchet.workflows.models import (
+    ActionItemsResult,
     ConsentResult,
     FinalizeResult,
     MixdownResult,
     PaddedTrackInfo,
+    PadTrackResult,
+    ParticipantInfo,
     ParticipantsResult,
+    ProcessSubjectsResult,
     ProcessTracksResult,
+    RecapResult,
     RecordingResult,
-    SummaryResult,
+    SubjectsResult,
+    SubjectSummaryResult,
     TitleResult,
+    TopicChunkResult,
     TopicsResult,
+    TranscribeTrackResult,
     WaveformResult,
     WebhookResult,
     ZulipResult,
 )
+from reflector.hatchet.workflows.subject_processing import (
+    SubjectInput,
+    subject_workflow,
+)
+from reflector.hatchet.workflows.topic_chunk_processing import (
+    TopicChunkInput,
+    topic_chunk_workflow,
+)
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
 from reflector.processors import AudioFileWriterProcessor
-from reflector.processors.types import (
-    TitleSummary,
-    TitleSummaryWithId,
-    Word,
-)
-from reflector.processors.types import (
-    Transcript as TranscriptType,
-)
+from reflector.processors.types import TitleSummary, Word
+from reflector.processors.types import Transcript as TranscriptType
 from reflector.settings import settings
 from reflector.storage.storage_aws import AwsStorage
 from reflector.utils.audio_constants import (
@@ -71,11 +91,12 @@ from reflector.utils.daily import (
     parse_daily_recording_filename,
 )
 from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
+from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
 from reflector.zulip import post_transcript_notification
 
 
 class PipelineInput(BaseModel):
-    """Input to trigger the diarization pipeline."""
+    """Input to trigger the Daily.co multitrack pipeline."""
 
     recording_id: NonEmptyString
     tracks: list[dict]  # List of {"s3_key": str}
@@ -86,7 +107,7 @@ class PipelineInput(BaseModel):
 
 hatchet = HatchetClientManager.get_client()
 
-diarization_pipeline = hatchet.workflow(
+daily_multitrack_pipeline = hatchet.workflow(
     name="DiarizationPipeline", input_validator=PipelineInput
 )
 
@@ -173,7 +194,9 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
     return decorator
 
 
-@diarization_pipeline.task(execution_timeout=timedelta(seconds=60), retries=3)
+@daily_multitrack_pipeline.task(
+    execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
+)
 @with_error_handling("get_recording")
 async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     """Fetch recording metadata from Daily.co API."""
@@ -224,8 +247,10 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     )
 
 
-@diarization_pipeline.task(
-    parents=[get_recording], execution_timeout=timedelta(seconds=60), retries=3
+@daily_multitrack_pipeline.task(
+    parents=[get_recording],
+    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+    retries=3,
 )
 @with_error_handling("get_participants")
 async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
@@ -274,7 +299,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     track_keys = [t["s3_key"] for t in input.tracks]
     cam_audio_keys = filter_cam_audio_tracks(track_keys)
 
-    participants_list = []
+    participants_list: list[ParticipantInfo] = []
     for idx, key in enumerate(cam_audio_keys):
         try:
             parsed = parse_daily_recording_filename(key)
@@ -296,11 +321,11 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
             )
             await transcripts_controller.upsert_participant(transcript, participant)
             participants_list.append(
-                {
-                    "participant_id": participant_id,
-                    "user_name": name,
-                    "speaker": idx,
-                }
+                ParticipantInfo(
+                    participant_id=participant_id,
+                    user_name=name,
+                    speaker=idx,
+                )
             )
 
     ctx.log(f"get_participants complete: {len(participants_list)} participants")
@@ -313,8 +338,10 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     )
 
 
-@diarization_pipeline.task(
-    parents=[get_participants], execution_timeout=timedelta(seconds=600), retries=3
+@daily_multitrack_pipeline.task(
+    parents=[get_participants],
+    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+    retries=3,
 )
 @with_error_handling("process_tracks")
 async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
@@ -324,9 +351,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     participants_result = ctx.task_output(get_participants)
     source_language = participants_result.source_language
 
-    child_coroutines = [
-        track_workflow.aio_run(
-            TrackInput(
+    bulk_runs = [
+        track_workflow.create_bulk_run_item(
+            input=TrackInput(
                 track_index=i,
                 s3_key=track["s3_key"],
                 bucket_name=input.bucket_name,
@@ -337,35 +364,34 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
         for i, track in enumerate(input.tracks)
     ]
 
-    results = await asyncio.gather(*child_coroutines)
+    results = await track_workflow.aio_run_many(bulk_runs)
 
     target_language = participants_result.target_language
 
-    track_words = []
+    track_words: list[list[Word]] = []
     padded_tracks = []
     created_padded_files = set()
 
     for result in results:
-        transcribe_result = result.get("transcribe_track", {})
-        track_words.append(transcribe_result.get("words", []))
+        transcribe_result = TranscribeTrackResult(**result["transcribe_track"])
+        track_words.append(transcribe_result.words)
 
-        pad_result = result.get("pad_track", {})
-        padded_key = pad_result.get("padded_key")
-        bucket_name = pad_result.get("bucket_name")
+        pad_result = PadTrackResult(**result["pad_track"])
 
         # Store S3 key info (not presigned URL) - consumer tasks presign on demand
-        if padded_key:
+        if pad_result.padded_key:
             padded_tracks.append(
-                PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
+                PaddedTrackInfo(
+                    key=pad_result.padded_key, bucket_name=pad_result.bucket_name
+                )
             )
 
-        track_index = pad_result.get("track_index")
-        if pad_result.get("size", 0) > 0 and track_index is not None:
-            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{track_index}.webm"
+        if pad_result.size > 0:
+            storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{pad_result.track_index}.webm"
             created_padded_files.add(storage_path)
 
     all_words = [word for words in track_words for word in words]
-    all_words.sort(key=lambda w: w.get("start", 0))
+    all_words.sort(key=lambda w: w.start)
 
     ctx.log(
         f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
@daily_multitrack_pipeline.task(
|
||||||
parents=[process_tracks], execution_timeout=timedelta(seconds=300), retries=3
|
parents=[process_tracks],
|
||||||
|
execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
|
||||||
|
retries=3,
|
||||||
)
|
)
|
||||||
@with_error_handling("mixdown_tracks")
|
@with_error_handling("mixdown_tracks")
|
||||||
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
||||||
@@ -462,8 +490,10 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
@daily_multitrack_pipeline.task(
|
||||||
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=120), retries=3
|
parents=[mixdown_tracks],
|
||||||
|
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
|
||||||
|
retries=3,
|
||||||
)
|
)
|
||||||
@with_error_handling("generate_waveform")
|
@with_error_handling("generate_waveform")
|
||||||
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
|
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
|
||||||
@@ -528,64 +558,113 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
|
|||||||
return WaveformResult(waveform_generated=True)
|
return WaveformResult(waveform_generated=True)
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
@daily_multitrack_pipeline.task(
|
||||||
parents=[mixdown_tracks], execution_timeout=timedelta(seconds=300), retries=3
|
parents=[process_tracks],
|
||||||
|
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||||
|
retries=3,
|
||||||
)
|
)
|
||||||
@with_error_handling("detect_topics")
|
@with_error_handling("detect_topics")
|
||||||
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||||
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
|
"""Detect topics using parallel child workflows (one per chunk)."""
|
||||||
ctx.log("detect_topics: analyzing transcript for topics")
|
ctx.log("detect_topics: analyzing transcript for topics")
|
||||||
|
|
||||||
track_result = ctx.task_output(process_tracks)
|
track_result = ctx.task_output(process_tracks)
|
||||||
words = track_result.all_words
|
words = track_result.all_words
|
||||||
target_language = track_result.target_language
|
|
||||||
|
|
||||||
|
if not words:
|
||||||
|
ctx.log("detect_topics: no words, returning empty topics")
|
||||||
|
return TopicsResult(topics=[])
|
||||||
|
|
||||||
|
# Deferred imports: Hatchet workers fork processes
|
||||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
from reflector.db.transcripts import ( # noqa: PLC0415
|
||||||
TranscriptTopic,
|
TranscriptTopic,
|
||||||
transcripts_controller,
|
transcripts_controller,
|
||||||
)
|
)
|
||||||
|
|
||||||
word_objects = [Word(**w) for w in words]
|
chunk_size = TOPIC_CHUNK_WORD_COUNT
|
||||||
transcript_type = TranscriptType(words=word_objects)
|
chunks = []
|
||||||
|
for i in range(0, len(words), chunk_size):
|
||||||
|
chunk_words = words[i : i + chunk_size]
|
||||||
|
if not chunk_words:
|
||||||
|
continue
|
||||||
|
|
||||||
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
|
first_word = chunk_words[0]
|
||||||
|
last_word = chunk_words[-1]
|
||||||
|
timestamp = first_word.start
|
||||||
|
duration = last_word.end - timestamp
|
||||||
|
chunk_text = " ".join(w.text for w in chunk_words)
|
||||||
|
|
||||||
|
chunks.append(
|
||||||
|
{
|
||||||
|
"index": len(chunks),
|
||||||
|
"text": chunk_text,
|
||||||
|
"timestamp": timestamp,
|
||||||
|
"duration": duration,
|
||||||
|
"words": chunk_words,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if not chunks:
|
||||||
|
ctx.log("detect_topics: no chunks generated, returning empty topics")
|
||||||
|
return TopicsResult(topics=[])
|
||||||
|
|
||||||
|
ctx.log(f"detect_topics: spawning {len(chunks)} topic chunk workflows in parallel")
|
||||||
|
|
||||||
|
bulk_runs = [
|
||||||
|
topic_chunk_workflow.create_bulk_run_item(
|
||||||
|
input=TopicChunkInput(
|
||||||
|
chunk_index=chunk["index"],
|
||||||
|
chunk_text=chunk["text"],
|
||||||
|
timestamp=chunk["timestamp"],
|
||||||
|
duration=chunk["duration"],
|
||||||
|
words=chunk["words"],
|
||||||
|
)
|
||||||
|
)
|
||||||
|
for chunk in chunks
|
||||||
|
]
|
||||||
|
|
||||||
|
results = await topic_chunk_workflow.aio_run_many(bulk_runs)
|
||||||
|
|
||||||
|
topic_chunks = [
|
||||||
|
TopicChunkResult(**result["detect_chunk_topic"]) for result in results
|
||||||
|
]
|
||||||
|
|
||||||
async with fresh_db_connection():
|
async with fresh_db_connection():
|
||||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||||
|
|
||||||
async def on_topic_callback(data):
|
for chunk in topic_chunks:
|
||||||
topic = TranscriptTopic(
|
topic = TranscriptTopic(
|
||||||
title=data.title,
|
title=chunk.title,
|
||||||
summary=data.summary,
|
summary=chunk.summary,
|
||||||
timestamp=data.timestamp,
|
timestamp=chunk.timestamp,
|
||||||
transcript=data.transcript.text,
|
transcript=" ".join(w.text for w in chunk.words),
|
||||||
words=data.transcript.words,
|
words=[w.model_dump() for w in chunk.words],
|
||||||
)
|
)
|
||||||
if isinstance(
|
|
||||||
data, TitleSummaryWithId
|
|
||||||
): # Celery parity: main_live_pipeline.py
|
|
||||||
topic.id = data.id
|
|
||||||
await transcripts_controller.upsert_topic(transcript, topic)
|
await transcripts_controller.upsert_topic(transcript, topic)
|
||||||
await append_event_and_broadcast(
|
await append_event_and_broadcast(
|
||||||
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
input.transcript_id, transcript, "TOPIC", topic, logger=logger
|
||||||
)
|
)
|
||||||
|
|
||||||
topics = await topic_processing.detect_topics(
|
topics_list = [
|
||||||
transcript_type,
|
TitleSummary(
|
||||||
target_language,
|
title=chunk.title,
|
||||||
on_topic_callback=on_topic_callback,
|
summary=chunk.summary,
|
||||||
empty_pipeline=empty_pipeline,
|
timestamp=chunk.timestamp,
|
||||||
|
duration=chunk.duration,
|
||||||
|
transcript=TranscriptType(words=chunk.words),
|
||||||
)
|
)
|
||||||
|
for chunk in topic_chunks
|
||||||
topics_list = [t.model_dump() for t in topics]
|
]
|
||||||
|
|
||||||
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
|
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
|
||||||
|
|
||||||
return TopicsResult(topics=topics_list)
|
return TopicsResult(topics=topics_list)
|
||||||
|
|
||||||
|
|
||||||
@diarization_pipeline.task(
|
@daily_multitrack_pipeline.task(
|
||||||
parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
|
parents=[detect_topics],
|
||||||
|
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||||
|
retries=3,
|
||||||
)
|
)
|
||||||
@with_error_handling("generate_title")
|
@with_error_handling("generate_title")
|
||||||
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
|
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
|
||||||
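Each chunk's timestamp and duration come from its first and last word; a chunk whose words span start=12.0s through end=47.5s yields timestamp=12.0 and duration=35.5. A self-contained sketch of that arithmetic, modeling only the Word fields the loop touches:

```python
from dataclasses import dataclass


@dataclass
class Word:  # stand-in for reflector.processors.types.Word
    text: str
    start: float
    end: float


def chunk_bounds(chunk_words: list[Word]) -> tuple[float, float, str]:
    """Mirrors the arithmetic in detect_topics: (timestamp, duration, text)."""
    timestamp = chunk_words[0].start
    duration = chunk_words[-1].end - timestamp
    return timestamp, duration, " ".join(w.text for w in chunk_words)


words = [Word("hello", 12.0, 12.4), Word("world", 47.1, 47.5)]
assert chunk_bounds(words) == (12.0, 35.5, "hello world")
```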
@@ -601,8 +680,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
         transcripts_controller,
     )
 
-    topic_objects = [TitleSummary(**t) for t in topics]
-    ctx.log(f"generate_title: created {len(topic_objects)} TitleSummary objects")
+    ctx.log(f"generate_title: received {len(topics)} TitleSummary objects")
 
     empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
     title_result = None
@@ -634,7 +712,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
 
     ctx.log("generate_title: calling topic_processing.generate_title (LLM call)...")
     await topic_processing.generate_title(
-        topic_objects,
+        topics,
         on_title_callback=on_title_callback,
         empty_pipeline=empty_pipeline,
         logger=logger,
@@ -646,98 +724,278 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     return TitleResult(title=title_result)
 
 
-@diarization_pipeline.task(
-    parents=[detect_topics], execution_timeout=timedelta(seconds=600), retries=3
+@daily_multitrack_pipeline.task(
+    parents=[detect_topics],
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+    retries=3,
 )
-@with_error_handling("generate_summary")
-async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
-    """Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
-    ctx.log(f"generate_summary: starting for transcript_id={input.transcript_id}")
+@with_error_handling("extract_subjects")
+async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult:
+    """Extract main subjects/topics from transcript for parallel processing."""
+    ctx.log(f"extract_subjects: starting for transcript_id={input.transcript_id}")
 
     topics_result = ctx.task_output(detect_topics)
     topics = topics_result.topics
-    ctx.log(f"generate_summary: received {len(topics)} topics from detect_topics")
+
+    if not topics:
+        ctx.log("extract_subjects: no topics, returning empty subjects")
+        return SubjectsResult(
+            subjects=[],
+            transcript_text="",
+            participant_names=[],
+            participant_name_to_id={},
+        )
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
+    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+        SummaryBuilder,
+    )
+
+    async with fresh_db_connection():
+        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+
+        # Build transcript text from topics (same logic as TranscriptFinalSummaryProcessor)
+        speakermap = {}
+        if transcript and transcript.participants:
+            speakermap = {
+                p.speaker: p.name
+                for p in transcript.participants
+                if p.speaker is not None and p.name
+            }
+
+        text_lines = []
+        for topic in topics:
+            for segment in topic.transcript.as_segments():
+                name = speakermap.get(segment.speaker, f"Speaker {segment.speaker}")
+                text_lines.append(f"{name}: {segment.text}")
+
+        transcript_text = "\n".join(text_lines)
+
+        participant_names = []
+        participant_name_to_id = {}
+        if transcript and transcript.participants:
+            participant_names = [p.name for p in transcript.participants if p.name]
+            participant_name_to_id = {
+                p.name: p.id for p in transcript.participants if p.name and p.id
+            }
+
+        # TODO: refactor SummaryBuilder methods into standalone functions
+        llm = LLM(settings=settings)
+        builder = SummaryBuilder(llm, logger=logger)
+        builder.set_transcript(transcript_text)
+
+        if participant_names:
+            builder.set_known_participants(
+                participant_names, participant_name_to_id=participant_name_to_id
+            )
+
+        ctx.log("extract_subjects: calling LLM to extract subjects")
+        await builder.extract_subjects()
+
+        ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")
+
+        return SubjectsResult(
+            subjects=builder.subjects,
+            transcript_text=transcript_text,
+            participant_names=participant_names,
+            participant_name_to_id=participant_name_to_id,
+        )
+
+
+@daily_multitrack_pipeline.task(
+    parents=[extract_subjects],
+    execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+    retries=3,
+)
+@with_error_handling("process_subjects")
+async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubjectsResult:
+    """Spawn child workflows for each subject (dynamic fan-out, parallel LLM calls)."""
+    subjects_result = ctx.task_output(extract_subjects)
+    subjects = subjects_result.subjects
+
+    if not subjects:
+        ctx.log("process_subjects: no subjects to process")
+        return ProcessSubjectsResult(subject_summaries=[])
+
+    ctx.log(f"process_subjects: spawning {len(subjects)} subject workflows in parallel")
+
+    bulk_runs = [
+        subject_workflow.create_bulk_run_item(
+            input=SubjectInput(
+                subject=subject,
+                subject_index=i,
+                transcript_text=subjects_result.transcript_text,
+                participant_names=subjects_result.participant_names,
+                participant_name_to_id=subjects_result.participant_name_to_id,
+            )
+        )
+        for i, subject in enumerate(subjects)
+    ]
+
+    results = await subject_workflow.aio_run_many(bulk_runs)
+
+    subject_summaries = [
+        SubjectSummaryResult(**result["generate_detailed_summary"])
+        for result in results
+    ]
+
+    ctx.log(f"process_subjects complete: {len(subject_summaries)} summaries")
+
+    return ProcessSubjectsResult(subject_summaries=subject_summaries)
+
+
+@daily_multitrack_pipeline.task(
+    parents=[process_subjects],
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+    retries=3,
+)
+@with_error_handling("generate_recap")
+async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
+    """Generate recap and long summary from subject summaries, save to database."""
+    ctx.log(f"generate_recap: starting for transcript_id={input.transcript_id}")
+
+    subjects_result = ctx.task_output(extract_subjects)
+    process_result = ctx.task_output(process_subjects)
 
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
     from reflector.db.transcripts import (  # noqa: PLC0415
-        TranscriptActionItems,
         TranscriptFinalLongSummary,
         TranscriptFinalShortSummary,
         transcripts_controller,
     )
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.prompts import (  # noqa: PLC0415
+        RECAP_PROMPT,
+        build_participant_instructions,
+        build_summary_markdown,
+    )
 
-    topic_objects = [TitleSummary(**t) for t in topics]
-    ctx.log(f"generate_summary: created {len(topic_objects)} TitleSummary objects")
+    subject_summaries = process_result.subject_summaries
 
-    empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
-    summary_result = None
-    short_summary_result = None
-    action_items_result = None
+    if not subject_summaries:
+        ctx.log("generate_recap: no subject summaries, returning empty")
+        return RecapResult(short_summary="", long_summary="")
+
+    summaries = [
+        {"subject": s.subject, "summary": s.paragraph_summary}
+        for s in subject_summaries
+    ]
+
+    summaries_text = "\n\n".join([f"{s['subject']}: {s['summary']}" for s in summaries])
+
+    llm = LLM(settings=settings)
+
+    participant_instructions = build_participant_instructions(
+        subjects_result.participant_names
+    )
+
+    recap_prompt = RECAP_PROMPT
+    if participant_instructions:
+        recap_prompt = f"{recap_prompt}\n\n{participant_instructions}"
+
+    ctx.log("generate_recap: calling LLM for recap")
+    recap_response = await llm.get_response(
+        recap_prompt,
+        [summaries_text],
+        tone_name="Recap summarizer",
+    )
+    short_summary = str(recap_response)
+
+    long_summary = build_summary_markdown(short_summary, summaries)
 
     async with fresh_db_connection():
-        ctx.log("generate_summary: DB connection established")
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
-        ctx.log(
-            f"generate_summary: fetched transcript, exists={transcript is not None}"
-        )
-
-        async def on_long_summary_callback(data):
-            nonlocal summary_result
-            ctx.log(
-                f"generate_summary: on_long_summary_callback received ({len(data.long_summary)} chars)"
-            )
-            summary_result = data.long_summary
-            final_long_summary = TranscriptFinalLongSummary(
-                long_summary=data.long_summary
-            )
+        if transcript:
             await transcripts_controller.update(
                 transcript,
-                {"long_summary": final_long_summary.long_summary},
+                {
+                    "short_summary": short_summary,
+                    "long_summary": long_summary,
+                },
             )
-            ctx.log("generate_summary: saved long_summary to DB")
-            await append_event_and_broadcast(
-                input.transcript_id,
-                transcript,
-                "FINAL_LONG_SUMMARY",
-                final_long_summary,
-                logger=logger,
-            )
-            ctx.log("generate_summary: broadcasted FINAL_LONG_SUMMARY event")
 
-        async def on_short_summary_callback(data):
-            nonlocal short_summary_result
-            ctx.log(
-                f"generate_summary: on_short_summary_callback received ({len(data.short_summary)} chars)"
-            )
-            short_summary_result = data.short_summary
-            final_short_summary = TranscriptFinalShortSummary(
-                short_summary=data.short_summary
-            )
-            await transcripts_controller.update(
-                transcript,
-                {"short_summary": final_short_summary.short_summary},
-            )
-            ctx.log("generate_summary: saved short_summary to DB")
+            final_short = TranscriptFinalShortSummary(short_summary=short_summary)
             await append_event_and_broadcast(
                 input.transcript_id,
                 transcript,
                 "FINAL_SHORT_SUMMARY",
-                final_short_summary,
+                final_short,
                 logger=logger,
             )
-            ctx.log("generate_summary: broadcasted FINAL_SHORT_SUMMARY event")
 
-        async def on_action_items_callback(data):
-            nonlocal action_items_result
-            ctx.log(
-                f"generate_summary: on_action_items_callback received ({len(data.action_items)} items)"
-            )
-            action_items_result = data.action_items
-            action_items = TranscriptActionItems(action_items=data.action_items)
-            await transcripts_controller.update(
-                transcript,
-                {"action_items": action_items.action_items},
+            final_long = TranscriptFinalLongSummary(long_summary=long_summary)
+            await append_event_and_broadcast(
+                input.transcript_id,
+                transcript,
+                "FINAL_LONG_SUMMARY",
+                final_long,
+                logger=logger,
+            )
+
+    ctx.log("generate_recap complete")
+
+    return RecapResult(short_summary=short_summary, long_summary=long_summary)
+
+
+@daily_multitrack_pipeline.task(
+    parents=[extract_subjects],
+    execution_timeout=timedelta(seconds=TIMEOUT_LONG),
+    retries=3,
+)
+@with_error_handling("identify_action_items")
+async def identify_action_items(
+    input: PipelineInput, ctx: Context
+) -> ActionItemsResult:
+    """Identify action items from transcript (parallel with subject processing)."""
+    ctx.log(f"identify_action_items: starting for transcript_id={input.transcript_id}")
+
+    subjects_result = ctx.task_output(extract_subjects)
+
+    if not subjects_result.transcript_text:
+        ctx.log("identify_action_items: no transcript text, returning empty")
+        return ActionItemsResult(action_items={"decisions": [], "next_steps": []})
+
+    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections and LLM HTTP pools across forks
+    from reflector.db.transcripts import (  # noqa: PLC0415
+        TranscriptActionItems,
+        transcripts_controller,
+    )
+    from reflector.llm import LLM  # noqa: PLC0415
+    from reflector.processors.summary.summary_builder import (  # noqa: PLC0415
+        SummaryBuilder,
+    )
+
+    # TODO: refactor SummaryBuilder methods into standalone functions
+    llm = LLM(settings=settings)
+    builder = SummaryBuilder(llm, logger=logger)
+    builder.set_transcript(subjects_result.transcript_text)
+
+    if subjects_result.participant_names:
+        builder.set_known_participants(
+            subjects_result.participant_names,
+            participant_name_to_id=subjects_result.participant_name_to_id,
+        )
+
+    ctx.log("identify_action_items: calling LLM")
+    action_items_response = await builder.identify_action_items()
+
+    if action_items_response is None:
+        raise RuntimeError("Failed to identify action items - LLM call failed")
+
+    action_items_dict = action_items_response.model_dump()
+
+    async with fresh_db_connection():
+        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+        if transcript:
+            action_items = TranscriptActionItems(action_items=action_items_dict)
+            await transcripts_controller.update(
+                transcript, {"action_items": action_items.action_items}
             )
-            ctx.log("generate_summary: saved action_items to DB")
             await append_event_and_broadcast(
                 input.transcript_id,
                 transcript,
@@ -745,34 +1003,18 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
                 action_items,
                 logger=logger,
             )
-        ctx.log("generate_summary: broadcasted ACTION_ITEMS event")
 
     ctx.log(
-        "generate_summary: calling topic_processing.generate_summaries (LLM calls)..."
-    )
-    await topic_processing.generate_summaries(
-        topic_objects,
-        transcript,
-        on_long_summary_callback=on_long_summary_callback,
-        on_short_summary_callback=on_short_summary_callback,
-        on_action_items_callback=on_action_items_callback,
-        empty_pipeline=empty_pipeline,
-        logger=logger,
-    )
-    ctx.log("generate_summary: topic_processing.generate_summaries returned")
-
-    ctx.log("generate_summary complete")
-
-    return SummaryResult(
-        summary=summary_result,
-        short_summary=short_summary_result,
-        action_items=action_items_result,
+        f"identify_action_items complete: {len(action_items_dict.get('decisions', []))} decisions, "
+        f"{len(action_items_dict.get('next_steps', []))} next steps"
     )
 
+    return ActionItemsResult(action_items=action_items_dict)
 
-@diarization_pipeline.task(
-    parents=[generate_waveform, generate_title, generate_summary],
-    execution_timeout=timedelta(seconds=60),
+
+@daily_multitrack_pipeline.task(
+    parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
+    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
 )
 @with_error_handling("finalize")
@@ -818,8 +1060,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     if transcript is None:
         raise ValueError(f"Transcript {input.transcript_id} not found in database")
 
-    word_objects = [Word(**w) for w in all_words]
-    merged_transcript = TranscriptType(words=word_objects, translation=None)
+    merged_transcript = TranscriptType(words=all_words, translation=None)
 
     await append_event_and_broadcast(
         input.transcript_id,
@@ -856,8 +1097,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     return FinalizeResult(status="COMPLETED")
 
 
-@diarization_pipeline.task(
-    parents=[finalize], execution_timeout=timedelta(seconds=60), retries=3
+@daily_multitrack_pipeline.task(
+    parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("cleanup_consent", set_error_status=False)
 async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
@@ -956,8 +1197,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
     return ConsentResult()
 
 
-@diarization_pipeline.task(
-    parents=[cleanup_consent], execution_timeout=timedelta(seconds=60), retries=5
+@daily_multitrack_pipeline.task(
+    parents=[cleanup_consent],
+    execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
+    retries=5,
 )
 @with_error_handling("post_zulip", set_error_status=False)
 async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
@@ -981,12 +1224,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
     return ZulipResult(zulip_message_id=message_id)
 
 
-@diarization_pipeline.task(
-    parents=[post_zulip], execution_timeout=timedelta(seconds=120), retries=30
+@daily_multitrack_pipeline.task(
+    parents=[cleanup_consent],
+    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
+    retries=5,
 )
 @with_error_handling("send_webhook", set_error_status=False)
 async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
-    """Send completion webhook to external service."""
+    """Send completion webhook to external service with full payload and HMAC signature."""
    ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
 
     if not input.room_id:
@@ -995,27 +1240,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
 
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.utils.webhook import (  # noqa: PLC0415
+            build_transcript_webhook_payload,
+            send_webhook_request,
+        )
 
         room = await rooms_controller.get_by_id(input.room_id)
-        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+        if not room or not room.webhook_url:
+            ctx.log("send_webhook skipped (no webhook_url configured)")
+            return WebhookResult(webhook_sent=False, skipped=True)
 
-        if room and room.webhook_url and transcript:
-            webhook_payload = {
-                "event": "transcript.completed",
-                "transcript_id": input.transcript_id,
-                "title": transcript.title,
-                "duration": transcript.duration,
-            }
+        payload = await build_transcript_webhook_payload(
+            transcript_id=input.transcript_id,
+            room_id=input.room_id,
+        )
 
-            async with httpx.AsyncClient() as client:
-                response = await client.post(
-                    room.webhook_url, json=webhook_payload, timeout=30
-                )
-                response.raise_for_status()
+        if not payload:
+            ctx.log("send_webhook skipped (could not build payload)")
+            return WebhookResult(webhook_sent=False, skipped=True)
 
-            ctx.log(f"send_webhook complete: status_code={response.status_code}")
+        ctx.log(
+            f"send_webhook: sending to {room.webhook_url} "
+            f"(topics={len(payload.transcript.topics)}, "
+            f"participants={len(payload.transcript.participants)})"
+        )
 
-            return WebhookResult(webhook_sent=True, response_code=response.status_code)
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            timeout=30.0,
+        )
 
-        return WebhookResult(webhook_sent=False, skipped=True)
+        ctx.log(f"send_webhook complete: status_code={response.status_code}")
+
+        return WebhookResult(webhook_sent=True, response_code=response.status_code)
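`send_webhook_request` itself is outside this compare view; given the docstring's mention of an HMAC signature and the `webhook_secret` argument, the signing step presumably follows the common pattern below (an illustrative sketch, not the actual implementation):

```python
import hashlib
import hmac
import json


def sign_body(body: dict, secret: str) -> str:
    """Hex HMAC-SHA256 over the serialized JSON body (assumed scheme)."""
    raw = json.dumps(body, separators=(",", ":")).encode()
    return hmac.new(secret.encode(), raw, hashlib.sha256).hexdigest()

# A receiver would recompute the digest over the raw request body and compare
# it against the signature header using hmac.compare_digest().
```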
@@ -5,13 +5,20 @@ Provides static typing for all task outputs, enabling type checking
 and better IDE support.
 """
 
-from typing import Any
-
 from pydantic import BaseModel
 
+from reflector.processors.types import TitleSummary, Word
 from reflector.utils.string import NonEmptyString
 
 
+class ParticipantInfo(BaseModel):
+    """Participant info with speaker index for workflow result."""
+
+    participant_id: NonEmptyString
+    user_name: NonEmptyString
+    speaker: int
+
+
 class PadTrackResult(BaseModel):
     """Result from pad_track task."""
 
@@ -26,7 +33,7 @@ class PadTrackResult(BaseModel):
 class TranscribeTrackResult(BaseModel):
     """Result from transcribe_track task."""
 
-    words: list[dict[str, Any]]
+    words: list[Word]
     track_index: int
 
 
@@ -41,7 +48,7 @@ class RecordingResult(BaseModel):
 class ParticipantsResult(BaseModel):
     """Result from get_participants task."""
 
-    participants: list[dict[str, Any]]
+    participants: list[ParticipantInfo]
     num_tracks: int
     source_language: NonEmptyString
     target_language: NonEmptyString
@@ -57,7 +64,7 @@ class PaddedTrackInfo(BaseModel):
 class ProcessTracksResult(BaseModel):
     """Result from process_tracks task."""
 
-    all_words: list[dict[str, Any]]
+    all_words: list[Word]
     padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
     word_count: int
     num_tracks: int
@@ -79,10 +86,21 @@ class WaveformResult(BaseModel):
     waveform_generated: bool
 
 
+class TopicChunkResult(BaseModel):
+    """Result from topic chunk child workflow."""
+
+    chunk_index: int
+    title: str
+    summary: str
+    timestamp: float
+    duration: float
+    words: list[Word]
+
+
 class TopicsResult(BaseModel):
     """Result from detect_topics task."""
 
-    topics: list[dict[str, Any]]
+    topics: list[TitleSummary]
 
 
 class TitleResult(BaseModel):
@@ -91,12 +109,41 @@ class TitleResult(BaseModel):
     title: str | None
 
 
-class SummaryResult(BaseModel):
-    """Result from generate_summary task."""
+class SubjectsResult(BaseModel):
+    """Result from extract_subjects task."""
 
-    summary: str | None
-    short_summary: str | None
-    action_items: dict | None = None
+    subjects: list[str]
+    transcript_text: str  # Formatted transcript for LLM consumption
+    participant_names: list[str]
+    participant_name_to_id: dict[str, str]
+
+
+class SubjectSummaryResult(BaseModel):
+    """Result from subject summary child workflow."""
+
+    subject: str
+    subject_index: int
+    detailed_summary: str
+    paragraph_summary: str
+
+
+class ProcessSubjectsResult(BaseModel):
+    """Result from process_subjects fan-out task."""
+
+    subject_summaries: list[SubjectSummaryResult]
+
+
+class RecapResult(BaseModel):
+    """Result from generate_recap task."""
+
+    short_summary: str  # Recap paragraph
+    long_summary: str  # Full markdown summary
+
+
+class ActionItemsResult(BaseModel):
+    """Result from identify_action_items task."""
+
+    action_items: dict  # ActionItemsResponse as dict (may have empty lists)
+
+
 class FinalizeResult(BaseModel):
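These typed models matter because Hatchet hands child-workflow output back as a plain dict keyed by task name; the parent re-validates it on receipt. An illustrative round-trip with made-up values:

```python
raw = {
    "chunk_index": 0,
    "title": "Kickoff",
    "summary": "Agenda review.",
    "timestamp": 0.0,
    "duration": 42.0,
    "words": [],
}
chunk = TopicChunkResult(**raw)  # pydantic validates field types on construction
assert chunk.duration == 42.0
```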
server/reflector/hatchet/workflows/subject_processing.py (new file, 107 lines)
@@ -0,0 +1,107 @@
"""
Hatchet child workflow: SubjectProcessing

Handles individual subject/topic summary generation.
Spawned dynamically by the main diarization pipeline for each extracted subject
via aio_run_many() for parallel processing.
"""

from datetime import timedelta

from hatchet_sdk import Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.logger import logger
from reflector.processors.summary.prompts import (
    DETAILED_SUBJECT_PROMPT_TEMPLATE,
    PARAGRAPH_SUMMARY_PROMPT,
    build_participant_instructions,
)


class SubjectInput(BaseModel):
    """Input for individual subject processing."""

    subject: str
    subject_index: int
    transcript_text: str
    participant_names: list[str]
    participant_name_to_id: dict[str, str]


hatchet = HatchetClientManager.get_client()

subject_workflow = hatchet.workflow(
    name="SubjectProcessing", input_validator=SubjectInput
)


@subject_workflow.task(
    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
    retries=3,
    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=2)],
)
async def generate_detailed_summary(
    input: SubjectInput, ctx: Context
) -> SubjectSummaryResult:
    """Generate detailed analysis for a single subject, then condense to paragraph."""
    ctx.log(
        f"generate_detailed_summary: subject '{input.subject}' (index {input.subject_index})"
    )
    logger.info(
        "[Hatchet] generate_detailed_summary",
        subject=input.subject,
        subject_index=input.subject_index,
    )

    # Deferred imports: Hatchet workers fork processes, fresh imports ensure
    # LLM HTTP connection pools aren't shared across forks
    from reflector.llm import LLM  # noqa: PLC0415
    from reflector.settings import settings  # noqa: PLC0415

    llm = LLM(settings=settings)

    participant_instructions = build_participant_instructions(input.participant_names)
    detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=input.subject)
    if participant_instructions:
        detailed_prompt = f"{detailed_prompt}\n\n{participant_instructions}"

    ctx.log("generate_detailed_summary: calling LLM for detailed analysis")
    detailed_response = await llm.get_response(
        detailed_prompt,
        [input.transcript_text],
        tone_name="Topic assistant",
    )
    detailed_summary = str(detailed_response)

    paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
    if participant_instructions:
        paragraph_prompt = f"{paragraph_prompt}\n\n{participant_instructions}"

    ctx.log("generate_detailed_summary: calling LLM for paragraph summary")
    paragraph_response = await llm.get_response(
        paragraph_prompt,
        [detailed_summary],
        tone_name="Topic summarizer",
    )
    paragraph_summary = str(paragraph_response)

    ctx.log(f"generate_detailed_summary complete: subject '{input.subject}'")
    logger.info(
        "[Hatchet] generate_detailed_summary complete",
        subject=input.subject,
        subject_index=input.subject_index,
        detailed_len=len(detailed_summary),
        paragraph_len=len(paragraph_summary),
    )

    return SubjectSummaryResult(
        subject=input.subject,
        subject_index=input.subject_index,
        detailed_summary=detailed_summary,
        paragraph_summary=paragraph_summary,
    )
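A minimal sketch of the parent-side fan-out this docstring refers to. The
create_bulk_run_item/aio_run_many calls follow the Hatchet SDK's bulk-run
pattern, and names such as `subjects` and `transcript_text` are illustrative,
not the pipeline's actual variables:

# Hypothetical caller inside the parent pipeline (sketch only):
results = await subject_workflow.aio_run_many(
    [
        subject_workflow.create_bulk_run_item(
            input=SubjectInput(
                subject=subject,
                subject_index=i,
                transcript_text=transcript_text,
                participant_names=participant_names,
                participant_name_to_id=participant_name_to_id,
            )
        )
        for i, subject in enumerate(subjects)
    ]
)
# results: one SubjectSummaryResult-shaped output per subject, produced in parallel.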
96
server/reflector/hatchet/workflows/topic_chunk_processing.py
Normal file
@@ -0,0 +1,96 @@
"""
Hatchet child workflow: TopicChunkProcessing

Handles topic detection for individual transcript chunks.
Spawned dynamically by detect_topics via aio_run_many() for parallel processing.
"""

from datetime import timedelta

from hatchet_sdk import ConcurrencyExpression, Context
from hatchet_sdk.rate_limit import RateLimit
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import Word


class TopicChunkInput(BaseModel):
    """Input for individual topic chunk processing."""

    chunk_index: int
    chunk_text: str
    timestamp: float
    duration: float
    words: list[Word]


hatchet = HatchetClientManager.get_client()

topic_chunk_workflow = hatchet.workflow(
    name="TopicChunkProcessing",
    input_validator=TopicChunkInput,
    concurrency=ConcurrencyExpression(
        expression="true",  # constant CEL expression = global limit
        max_runs=20,
    ),
)


@topic_chunk_workflow.task(
    execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
    retries=3,
    rate_limits=[RateLimit(static_key=LLM_RATE_LIMIT_KEY, units=1)],
)
async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunkResult:
    """Detect topic for a single transcript chunk."""
    ctx.log(f"detect_chunk_topic: chunk {input.chunk_index}")
    logger.info(
        "[Hatchet] detect_chunk_topic",
        chunk_index=input.chunk_index,
        text_length=len(input.chunk_text),
    )

    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
    # sharing LLM HTTP connection pools across forks
    from reflector.llm import LLM  # noqa: PLC0415
    from reflector.processors.transcript_topic_detector import (  # noqa: PLC0415
        TopicResponse,
    )
    from reflector.settings import settings  # noqa: PLC0415
    from reflector.utils.text import clean_title  # noqa: PLC0415

    llm = LLM(settings=settings, temperature=0.9, max_tokens=500)

    prompt = TOPIC_PROMPT.format(text=input.chunk_text)
    response = await llm.get_structured_response(
        prompt,
        [input.chunk_text],
        TopicResponse,
        tone_name="Topic analyzer",
        timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
    )

    title = clean_title(response.title)

    ctx.log(
        f"detect_chunk_topic complete: chunk {input.chunk_index}, title='{title[:50]}'"
    )
    logger.info(
        "[Hatchet] detect_chunk_topic complete",
        chunk_index=input.chunk_index,
        title=title[:50],
    )

    return TopicChunkResult(
        chunk_index=input.chunk_index,
        title=title,
        summary=response.summary,
        timestamp=input.timestamp,
        duration=input.duration,
        words=input.words,
    )
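Note the concurrency trick above: because `expression="true"` evaluates to the
same key for every run, `max_runs=20` acts as a single global cap on
simultaneous chunk runs rather than a per-key limit. A hedged sketch of the
detect_topics fan-out the docstring mentions — the bulk-run API and the Word
attributes (`text`, `start`, `end`) are assumptions here:

chunks = [words[i : i + 300] for i in range(0, len(words), 300)]  # 300-word slices
results = await topic_chunk_workflow.aio_run_many(
    [
        topic_chunk_workflow.create_bulk_run_item(
            input=TopicChunkInput(
                chunk_index=idx,
                chunk_text=" ".join(w.text for w in chunk),  # Word.text assumed
                timestamp=chunk[0].start,                    # Word.start assumed
                duration=chunk[-1].end - chunk[0].start,     # Word.end assumed
                words=chunk,
            )
        )
        for idx, chunk in enumerate(chunks)
    ]
)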
@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
Handles individual audio track processing: padding and transcription.
Spawned dynamically by the main diarization pipeline for each track.

Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
because Hatchet workflow DAGs are defined statically, but the number of tracks varies
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.

Note: This file uses deferred imports (inside tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
@@ -23,6 +23,7 @@ from hatchet_sdk import Context
from pydantic import BaseModel

from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
@@ -47,7 +48,7 @@ hatchet = HatchetClientManager.get_client()
track_workflow = hatchet.workflow(name="TrackProcessing", input_validator=TrackInput)


@track_workflow.task(execution_timeout=timedelta(seconds=TIMEOUT_AUDIO), retries=3)
async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
    """Pad single audio track with silence for alignment.

@@ -153,7 +154,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:


@track_workflow.task(
    parents=[pad_track], execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), retries=3
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
    """Transcribe audio track using GPU (Modal.com) or local Whisper."""
@@ -197,23 +198,20 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
    transcript = await transcribe_file_with_processor(audio_url, input.language)

    # Tag all words with speaker index
    for word in transcript.words:
        word.speaker = input.track_index

    ctx.log(
        f"transcribe_track complete: track {input.track_index}, {len(transcript.words)} words"
    )
    logger.info(
        "[Hatchet] transcribe_track complete",
        track_index=input.track_index,
        word_count=len(transcript.words),
    )

    return TranscribeTrackResult(
        words=transcript.words,
        track_index=input.track_index,
    )
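The `aio_run()` + `asyncio.gather()` pattern named in the architecture note,
as a sketch; the TrackInput construction is elided because its fields are not
part of this diff:

import asyncio

# One child run per track. The track count is only known at runtime, which is
# why this fan-out cannot be expressed as a static task in the parent DAG.
results = await asyncio.gather(
    *(track_workflow.aio_run(track_input) for track_input in track_inputs)
)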
@@ -18,6 +18,7 @@ from reflector.processors import (
)
from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType
from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT


class EmptyPipeline:
@@ -38,7 +39,7 @@ async def detect_topics(
    on_topic_callback: Callable,
    empty_pipeline: EmptyPipeline,
) -> list[TitleSummary]:
    chunk_size = TOPIC_CHUNK_WORD_COUNT
    topics: list[TitleSummary] = []

    async def on_topic(topic: TitleSummary):
30
server/reflector/processors/prompts.py
Normal file
@@ -0,0 +1,30 @@
"""
LLM prompts for transcript processing.

Extracted to a separate module to avoid circular imports when importing
from processor modules (which import LLM/settings at module level).
"""

from textwrap import dedent

TOPIC_PROMPT = dedent(
    """
    Analyze the following transcript segment and extract the main topic being discussed.
    Focus on the substantive content and ignore small talk or administrative chatter.

    Create a title that:
    - Captures the specific subject matter being discussed
    - Is descriptive and self-explanatory
    - Uses professional language
    - Is specific rather than generic

    For the summary:
    - Summarize the key points in maximum two sentences
    - Focus on what was discussed, decided, or accomplished
    - Be concise but informative

    <transcript>
    {text}
    </transcript>
    """
).strip()
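Illustrative usage, mirroring detect_chunk_topic above (the transcript text is
made up):

prompt = TOPIC_PROMPT.format(text="Alice: let's finalize the Q3 roadmap before Friday...")
# The formatted prompt is then passed to the LLM together with the structured
# TopicResponse schema via llm.get_structured_response(...).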
91
server/reflector/processors/summary/prompts.py
Normal file
@@ -0,0 +1,91 @@
"""
LLM prompts for summary generation.

Extracted to a separate module to avoid circular imports when importing
from summary_builder.py (which imports LLM/settings at module level).
"""

from textwrap import dedent


def build_participant_instructions(participant_names: list[str]) -> str:
    """Build participant context instructions for LLM prompts."""
    if not participant_names:
        return ""

    participants_list = ", ".join(participant_names)
    return dedent(
        f"""
        # IMPORTANT: Participant Names
        The following participants are identified in this conversation: {participants_list}

        You MUST use these specific participant names when referring to people in your response.
        Do NOT use generic terms like "a participant", "someone", "attendee", "Speaker 1", "Speaker 2", etc.
        Always refer to people by their actual names (e.g., "John suggested..." not "A participant suggested...").
        """
    ).strip()


DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
    """
    Get me information about the topic "{subject}"

    # RESPONSE GUIDELINES
    Follow this structured approach to create the topic summary:
    - Highlight important arguments, insights, or data presented.
    - Outline decisions made.
    - Indicate any decisions reached, including any rationale or key factors
      that influenced these decisions.
    - Detail action items and responsibilities.
    - For each decision or unresolved issue, list specific action items agreed
      upon, along with assigned individuals or teams responsible for each task.
    - Specify deadlines or timelines if mentioned. For each action item,
      include any deadlines or timeframes discussed for completion or follow-up.
    - Mention unresolved issues or topics needing further discussion, aiding in
      planning future meetings or follow-up actions.
    - Do not include topic unrelated to {subject}.

    # OUTPUT
    Your summary should be clear, concise, and structured, covering all major
    points, decisions, and action items from the meeting. It should be easy to
    understand for someone not present, providing a comprehensive understanding
    of what transpired and what needs to be done next. The summary should not
    exceed one page to ensure brevity and focus.
    """
).strip()


PARAGRAPH_SUMMARY_PROMPT = dedent(
    """
    Summarize the mentioned topic in 1 paragraph.
    It will be integrated into the final summary, so just for this topic.
    """
).strip()


RECAP_PROMPT = dedent(
    """
    Provide a high-level quick recap of the following meeting, fitting in one paragraph.
    Do not include decisions, action items or unresolved issue, just highlight the high moments.
    Just dive into the meeting, be concise and do not include unnecessary details.
    As we already know it is a meeting, do not start with 'During the meeting' or equivalent.
    """
).strip()


def build_summary_markdown(recap: str, summaries: list[dict[str, str]]) -> str:
    """Build markdown summary from recap and subject summaries."""
    lines: list[str] = []
    if recap:
        lines.append("# Quick recap")
        lines.append("")
        lines.append(recap)
        lines.append("")

    if summaries:
        lines.append("# Summary")
        lines.append("")
        for summary in summaries:
            lines.append(f"**{summary['subject']}**")
            lines.append(summary["summary"])
            lines.append("")

    return "\n".join(lines)
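A quick illustration of the builder's output (values made up):

md = build_summary_markdown(
    recap="The team aligned on the Q3 launch plan.",
    summaries=[{"subject": "Launch timeline", "summary": "Ship date moved to July 15."}],
)
# md ==
# "# Quick recap\n\nThe team aligned on the Q3 launch plan.\n\n"
# "# Summary\n\n**Launch timeline**\nShip date moved to July 15.\n"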
@@ -15,6 +15,13 @@ import structlog
from pydantic import BaseModel, Field

from reflector.llm import LLM
from reflector.processors.summary.prompts import (
    DETAILED_SUBJECT_PROMPT_TEMPLATE,
    PARAGRAPH_SUMMARY_PROMPT,
    RECAP_PROMPT,
    build_participant_instructions,
    build_summary_markdown,
)
from reflector.settings import settings

T = TypeVar("T", bound=BaseModel)
@@ -52,50 +59,6 @@ SUBJECTS_PROMPT = dedent(
    """
).strip()


ACTION_ITEMS_PROMPT = dedent(
    """
    Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
@@ -331,17 +294,7 @@ class SummaryBuilder:
        participants_md = self.format_list_md(participants)
        self.transcript += f"\n\n# Participants\n\n{participants_md}"

        self.participant_instructions = build_participant_instructions(participants)

    async def identify_participants(self) -> None:
        """
@@ -377,18 +330,9 @@ class SummaryBuilder:
            participants_md = self.format_list_md(unique_participants)
            self.transcript += f"\n\n# Participants\n\n{participants_md}"

            self.participant_instructions = build_participant_instructions(
                unique_participants
            )
        else:
            self.logger.warning("No participants identified in the transcript")

@@ -613,22 +557,7 @@ class SummaryBuilder:
    # ----------------------------------------------------------------------------

    def as_markdown(self) -> str:
        return build_summary_markdown(self.recap, self.summaries)

    def format_list_md(self, data: list[str]) -> str:
        return "\n".join([f"- {item}" for item in data])
@@ -1,35 +1,12 @@
from pydantic import AliasChoices, BaseModel, Field

from reflector.llm import LLM
from reflector.processors.base import Processor
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.types import TitleSummary, Transcript
from reflector.settings import settings
from reflector.utils.text import clean_title


class TopicResponse(BaseModel):
    """Structured response for topic detection"""
@@ -102,7 +102,8 @@ async def validate_transcript_for_processing(
    if transcript.locked:
        return ValidationLocked(detail="Recording is locked")

    # hatchet is idempotent anyways + if it wasn't dispatched successfully
    if transcript.status == "idle" and not settings.HATCHET_ENABLED:
        return ValidationNotReady(detail="Recording is not ready for processing")

    # Check Celery tasks
@@ -115,13 +116,11 @@ async def validate_transcript_for_processing(
    ):
        return ValidationAlreadyScheduled(detail="already running")

    if settings.HATCHET_ENABLED and transcript.workflow_run_id:
        try:
            status = await HatchetClientManager.get_workflow_run_status(
                transcript.workflow_run_id
            )
            if status in (V1TaskStatus.RUNNING, V1TaskStatus.QUEUED):
                return ValidationAlreadyScheduled(
                    detail="Hatchet workflow already running"
@@ -3,8 +3,12 @@ from pathlib import Path
import av
import numpy as np

from reflector.utils.audio_constants import WAVEFORM_SEGMENTS


def get_audio_waveform(
    path: Path | str, segments_count: int = WAVEFORM_SEGMENTS
) -> list[int]:
    if isinstance(path, Path):
        path = path.as_posix()

@@ -70,7 +74,7 @@ if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("path", type=Path)
    parser.add_argument("--segments-count", type=int, default=WAVEFORM_SEGMENTS)
    args = parser.parse_args()

    print(get_audio_waveform(args.path, args.segments_count))
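Callers keep the same behavior, since WAVEFORM_SEGMENTS replaces the previously
hard-coded 256. A hedged usage example (the audio file is hypothetical):

waveform = get_audio_waveform(Path("meeting.wav"))
# len(waveform) == WAVEFORM_SEGMENTS; one integer per waveform segment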
8
server/reflector/utils/transcript_constants.py
Normal file
@@ -0,0 +1,8 @@
"""
Shared transcript processing constants.

Used by both Hatchet workflows and Celery pipelines for consistent processing.
"""

# Topic detection: number of words per chunk for topic extraction
TOPIC_CHUNK_WORD_COUNT = 300
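For scale (illustrative numbers): a 45-minute meeting at roughly 130 words per
minute yields about 5,850 words, i.e. 20 topic chunks:

words_total = 45 * 130  # ~5850 words
chunk_count = -(-words_total // TOPIC_CHUNK_WORD_COUNT)  # ceiling division -> 20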
219
server/reflector/utils/webhook.py
Normal file
@@ -0,0 +1,219 @@
"""Webhook utilities.

Shared webhook functionality for both Hatchet and Celery pipelines.
"""

import hashlib
import hmac
import uuid
from datetime import datetime, timezone

import httpx
from pydantic import BaseModel

from reflector.logger import logger
from reflector.settings import settings
from reflector.utils.string import NonEmptyString
from reflector.utils.webhook_outgoing_models import (
    WebhookCalendarEventPayload,
    WebhookParticipantPayload,
    WebhookPayload,
    WebhookRoomPayload,
    WebhookTestPayload,
    WebhookTopicPayload,
    WebhookTranscriptPayload,
)

__all__ = [
    "build_transcript_webhook_payload",
    "build_test_webhook_payload",
    "build_webhook_headers",
    "generate_webhook_signature",
    "send_webhook_request",
]


def _serialize_payload(payload: BaseModel) -> bytes:
    """Serialize Pydantic model to compact JSON bytes."""
    return payload.model_dump_json(by_alias=True, exclude_none=False).encode("utf-8")


def generate_webhook_signature(payload: bytes, secret: str, timestamp: str) -> str:
    """Generate HMAC-SHA256 signature for webhook payload."""
    signed_payload = f"{timestamp}.{payload.decode('utf-8')}"
    hmac_obj = hmac.new(
        secret.encode("utf-8"),
        signed_payload.encode("utf-8"),
        hashlib.sha256,
    )
    return hmac_obj.hexdigest()


def build_webhook_headers(
    event_type: str,
    payload_bytes: bytes,
    webhook_secret: str | None = None,
    retry_count: int = 0,
) -> dict[str, str]:
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Reflector-Webhook/1.0",
        "X-Webhook-Event": event_type,
        "X-Webhook-Retry": str(retry_count),
    }

    if webhook_secret:
        timestamp = str(int(datetime.now(timezone.utc).timestamp()))
        signature = generate_webhook_signature(payload_bytes, webhook_secret, timestamp)
        headers["X-Webhook-Signature"] = f"t={timestamp},v1={signature}"

    return headers


async def send_webhook_request(
    url: str,
    payload: BaseModel,
    event_type: str,
    webhook_secret: str | None = None,
    retry_count: int = 0,
    timeout: float = 30.0,
) -> httpx.Response:
    """Send webhook request with proper headers and signature.

    Raises:
        httpx.HTTPStatusError: On non-2xx response
        httpx.ConnectError: On connection failure
        httpx.TimeoutException: On timeout
    """
    payload_bytes = _serialize_payload(payload)

    headers = build_webhook_headers(
        event_type=event_type,
        payload_bytes=payload_bytes,
        webhook_secret=webhook_secret,
        retry_count=retry_count,
    )

    async with httpx.AsyncClient(timeout=timeout) as client:
        response = await client.post(url, content=payload_bytes, headers=headers)
        response.raise_for_status()
        return response


async def build_transcript_webhook_payload(
    transcript_id: NonEmptyString,
    room_id: NonEmptyString,
) -> WebhookPayload | None:
    """Build webhook payload by fetching transcript and room data from database."""
    # Inline imports required: this utils module would create circular imports
    # if db modules were imported at top level (utils -> db -> ... -> utils).
    # This pattern is consistent with Hatchet task files.
    from reflector.db.calendar_events import calendar_events_controller  # noqa: PLC0415
    from reflector.db.meetings import meetings_controller  # noqa: PLC0415
    from reflector.db.rooms import rooms_controller  # noqa: PLC0415
    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
    from reflector.utils.webvtt import topics_to_webvtt  # noqa: PLC0415

    transcript = await transcripts_controller.get_by_id(transcript_id)
    if not transcript:
        return None

    room = await rooms_controller.get_by_id(room_id)
    if not room:
        return None

    topics_data = [
        WebhookTopicPayload(
            title=topic.title,
            summary=topic.summary,
            timestamp=topic.timestamp,
            duration=topic.duration,
            webvtt=topics_to_webvtt([topic]) if topic.words else "",
        )
        for topic in (transcript.topics or [])
    ]

    participants_data = [
        WebhookParticipantPayload(id=p.id, name=p.name, speaker=p.speaker)
        for p in (transcript.participants or [])
    ]

    calendar_event_data: WebhookCalendarEventPayload | None = None
    try:
        if transcript.meeting_id:
            meeting = await meetings_controller.get_by_id(transcript.meeting_id)
            if meeting and meeting.calendar_event_id:
                calendar_event = await calendar_events_controller.get_by_id(
                    meeting.calendar_event_id
                )
                if calendar_event:
                    calendar_event_data = WebhookCalendarEventPayload(
                        id=calendar_event.id,
                        ics_uid=calendar_event.ics_uid,
                        title=calendar_event.title,
                        start_time=calendar_event.start_time,
                        end_time=calendar_event.end_time,
                        description=calendar_event.description or None,
                        location=calendar_event.location or None,
                        attendees=calendar_event.attendees or None,
                    )
    except Exception as e:
        logger.warning(
            "Failed to fetch calendar event for webhook",
            transcript_id=transcript_id,
            meeting_id=transcript.meeting_id,
            error=str(e),
        )

    frontend_url = f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"

    return WebhookPayload(
        event="transcript.completed",
        event_id=uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        transcript=WebhookTranscriptPayload(
            id=transcript.id,
            room_id=transcript.room_id,
            created_at=transcript.created_at,
            duration=transcript.duration,
            title=transcript.title,
            short_summary=transcript.short_summary,
            long_summary=transcript.long_summary,
            webvtt=transcript.webvtt,
            topics=topics_data,
            participants=participants_data,
            source_language=transcript.source_language,
            target_language=transcript.target_language,
            status=transcript.status,
            frontend_url=frontend_url,
            action_items=transcript.action_items,
        ),
        room=WebhookRoomPayload(
            id=room.id,
            name=room.name,
        ),
        calendar_event=calendar_event_data,
    )


async def build_test_webhook_payload(
    room_id: NonEmptyString,
) -> WebhookTestPayload | None:
    """Build test webhook payload."""
    # Inline import: avoid circular dependency (utils -> db -> utils)
    from reflector.db.rooms import rooms_controller  # noqa: PLC0415

    room = await rooms_controller.get_by_id(room_id)
    if not room:
        return None

    return WebhookTestPayload(
        event="test",
        event_id=uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        message="This is a test webhook from Reflector",
        room=WebhookRoomPayload(
            id=room.id,
            name=room.name,
        ),
    )
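For integrators, a sketch of receiver-side verification of the
X-Webhook-Signature header produced above; the `t=<ts>,v1=<hex>` format and the
HMAC-SHA256 over f"{timestamp}.{body}" mirror generate_webhook_signature:

import hashlib
import hmac


def verify_webhook_signature(body: bytes, header: str, secret: str) -> bool:
    """Verify a Reflector webhook signature of the form 't=<ts>,v1=<hex>'."""
    parts = dict(item.split("=", 1) for item in header.split(","))
    signed_payload = f"{parts['t']}.{body.decode('utf-8')}"
    expected = hmac.new(
        secret.encode("utf-8"), signed_payload.encode("utf-8"), hashlib.sha256
    ).hexdigest()
    # hmac.compare_digest gives a constant-time comparison
    return hmac.compare_digest(expected, parts["v1"])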
80
server/reflector/utils/webhook_outgoing_models.py
Normal file
@@ -0,0 +1,80 @@
"""Pydantic models for outgoing webhook payloads.

These models define the structure of webhook payloads sent by Reflector
to external services when transcript processing completes.
"""

from datetime import datetime
from typing import Literal

from pydantic import BaseModel

from reflector.utils.string import NonEmptyString

WebhookTranscriptEventType = Literal["transcript.completed"]
WebhookTestEventType = Literal["test"]


class WebhookTopicPayload(BaseModel):
    title: NonEmptyString
    summary: NonEmptyString
    timestamp: float
    duration: float | None
    webvtt: str  # can be empty when no words


class WebhookParticipantPayload(BaseModel):
    id: NonEmptyString
    name: str | None
    speaker: int | None


class WebhookRoomPayload(BaseModel):
    id: NonEmptyString
    name: NonEmptyString


class WebhookCalendarEventPayload(BaseModel):
    id: NonEmptyString
    ics_uid: str | None = None
    title: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    description: str | None = None
    location: str | None = None
    attendees: list[str] | None = None


class WebhookTranscriptPayload(BaseModel):
    id: NonEmptyString
    room_id: NonEmptyString | None
    created_at: datetime
    duration: float | None
    title: str | None
    short_summary: str | None
    long_summary: str | None
    webvtt: str | None
    topics: list[WebhookTopicPayload]
    participants: list[WebhookParticipantPayload]
    source_language: NonEmptyString
    target_language: NonEmptyString
    status: NonEmptyString
    frontend_url: NonEmptyString
    action_items: dict | None


class WebhookPayload(BaseModel):
    event: WebhookTranscriptEventType
    event_id: NonEmptyString
    timestamp: datetime
    transcript: WebhookTranscriptPayload
    room: WebhookRoomPayload
    calendar_event: WebhookCalendarEventPayload | None = None


class WebhookTestPayload(BaseModel):
    event: WebhookTestEventType
    event_id: NonEmptyString
    timestamp: datetime
    message: NonEmptyString
    room: WebhookRoomPayload
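For reference, what _serialize_payload produces for a test payload (field
values here are made up; pydantic v2 renders UTC datetimes with a trailing Z):

from datetime import datetime, timezone

payload = WebhookTestPayload(
    event="test",
    event_id="3f2a9c",  # illustrative
    timestamp=datetime(2025, 12, 23, 12, 0, tzinfo=timezone.utc),
    message="This is a test webhook from Reflector",
    room=WebhookRoomPayload(id="room_123", name="standup"),
)
print(payload.model_dump_json(by_alias=True, exclude_none=False))
# {"event":"test","event_id":"3f2a9c","timestamp":"2025-12-23T12:00:00Z",
#  "message":"This is a test webhook from Reflector",
#  "room":{"id":"room_123","name":"standup"}}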
@@ -300,7 +300,7 @@ async def _process_multitrack_recording_inner(

    if use_hatchet:
        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DailyMultitrackPipeline",
            input_data={
                "recording_id": recording_id,
                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
@@ -848,7 +848,7 @@ async def reprocess_failed_daily_recordings():
            continue

        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DailyMultitrackPipeline",
            input_data={
                "recording_id": recording.id,
                "tracks": [
@@ -1,8 +1,5 @@
"""Webhook task for sending transcript notifications."""

import uuid
from datetime import datetime, timezone
@@ -11,28 +8,20 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger

from reflector.db.rooms import rooms_controller
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.utils.webhook import (
    WebhookRoomPayload,
    WebhookTestPayload,
    _serialize_payload,
    build_transcript_webhook_payload,
    build_webhook_headers,
    send_webhook_request,
)

logger = structlog.wrap_logger(get_task_logger(__name__))
@shared_task(
    bind=True,
    max_retries=30,
@@ -47,6 +36,11 @@ async def send_transcript_webhook(
    room_id: str,
    event_id: str,
):
    """Send webhook notification for completed transcript.

    Uses shared Pydantic models and signature generation from utils/webhook.py
    to ensure consistency with Hatchet pipeline.
    """
    log = logger.bind(
        transcript_id=transcript_id,
        room_id=room_id,
@@ -54,12 +48,6 @@ async def send_transcript_webhook(
    )

    try:
        room = await rooms_controller.get_by_id(room_id)
        if not room:
            log.error("Room not found, skipping webhook")
@@ -69,135 +57,37 @@ async def send_transcript_webhook(
            log.info("No webhook URL configured for room, skipping")
            return

        # Build payload using shared function
        payload = await build_transcript_webhook_payload(
            transcript_id=transcript_id,
            room_id=room_id,
        )

        if not payload:
            log.error("Could not build webhook payload, skipping")
            return

        log.info(
            "Sending webhook",
            url=room.webhook_url,
            topics=len(payload.transcript.topics),
            participants=len(payload.transcript.participants),
        )

        response = await send_webhook_request(
            url=room.webhook_url,
            payload=payload,
            event_type="transcript.completed",
            webhook_secret=room.webhook_secret,
            retry_count=self.request.retries,
            timeout=30.0,
        )

        log.info(
            "Webhook sent successfully",
            status_code=response.status_code,
            response_size=len(response.content),
        )

    except httpx.HTTPStatusError as e:
        log.error(
@@ -226,8 +116,8 @@ async def send_transcript_webhook(


async def test_webhook(room_id: str) -> dict:
    """Test webhook configuration by sending a sample payload.

    Returns immediately with success/failure status.
    This is the shared implementation used by both the API endpoint and Celery task.
    """
@@ -239,34 +129,24 @@ async def test_webhook(room_id: str) -> dict:
    if not room.webhook_url:
        return {"success": False, "error": "No webhook URL configured"}

    payload = WebhookTestPayload(
        event="test",
        event_id=uuid.uuid4().hex,
        timestamp=datetime.now(timezone.utc),
        message="This is a test webhook from Reflector",
        room=WebhookRoomPayload(
            id=room.id,
            name=room.name,
        ),
    )

    payload_bytes = _serialize_payload(payload)

    headers = build_webhook_headers(
        event_type="test",
        payload_bytes=payload_bytes,
        webhook_secret=room.webhook_secret,
    )

    # Send test webhook with short timeout
    async with httpx.AsyncClient(timeout=10.0) as client: