mirror of https://github.com/Monadical-SAS/reflector.git
pipeline fixes: whereby Hatchet preparation
@@ -1,5 +1,5 @@
 """
-Run Hatchet workers for the diarization pipeline.
+Run Hatchet workers for processing pipelines.
 Runs as a separate process, just like Celery workers.

 Usage:
@@ -39,7 +39,7 @@ def main() -> None:
     # Can't use lazy init: decorators need the client object when function is defined.
     from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
     from reflector.hatchet.workflows import (  # noqa: PLC0415
-        diarization_pipeline,
+        daily_multitrack_pipeline,
         subject_workflow,
         topic_chunk_workflow,
         track_workflow,
@@ -54,7 +54,7 @@ def main() -> None:
     worker = hatchet.worker(
         "reflector-pipeline-worker",
         workflows=[
-            diarization_pipeline,
+            daily_multitrack_pipeline,
             subject_workflow,
             topic_chunk_workflow,
             track_workflow,
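For orientation, a registration like the one above normally sits in a main() that blocks on the worker. A minimal sketch under that assumption (worker.start() is the Hatchet SDK's blocking entry point; the rest of the file is not shown in this diff):

    def main() -> None:
        from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
        from reflector.hatchet.workflows import (  # noqa: PLC0415
            daily_multitrack_pipeline,
            subject_workflow,
            topic_chunk_workflow,
            track_workflow,
        )

        hatchet = HatchetClientManager.get_client()
        worker = hatchet.worker(
            "reflector-pipeline-worker",
            workflows=[
                daily_multitrack_pipeline,
                subject_workflow,
                topic_chunk_workflow,
                track_workflow,
            ],
        )
        # Blocks until shutdown, like a Celery worker process.
        worker.start()


    if __name__ == "__main__":
        main()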
@@ -1,8 +1,8 @@
 """Hatchet workflow definitions."""

-from reflector.hatchet.workflows.diarization_pipeline import (
+from reflector.hatchet.workflows.daily_multitrack_pipeline import (
     PipelineInput,
-    diarization_pipeline,
+    daily_multitrack_pipeline,
 )
 from reflector.hatchet.workflows.subject_processing import (
     SubjectInput,
@@ -15,7 +15,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow

 __all__ = [
-    "diarization_pipeline",
+    "daily_multitrack_pipeline",
     "subject_workflow",
     "topic_chunk_workflow",
     "track_workflow",
@@ -1,9 +1,13 @@
 """
-Hatchet main workflow: DiarizationPipeline
+Hatchet main workflow: DailyMultitrackPipeline

-Multitrack diarization pipeline for Daily.co recordings.
+Multitrack processing pipeline for Daily.co recordings.
 Orchestrates the full processing flow from recording metadata to final transcript.

+Note: This workflow was previously named DiarizationPipeline, which was misleading.
+Daily.co recordings don't require ML diarization - speaker identification comes from
+track index (each participant's audio is a separate track).
+
 Note: This file uses deferred imports (inside functions/tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure DB connections
 are not shared across forks, avoiding connection pooling issues.
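The deferred-import rule in that note looks roughly like this in practice. A sketch only: the task name and body are illustrative, not code from this file; PipelineInput, Context, fresh_db_connection, and the timeout constant are names taken from elsewhere in this diff.

    @daily_multitrack_pipeline.task(
        execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
    )
    async def example_step(input: PipelineInput, ctx: Context) -> None:
        # Imported inside the task body: each forked worker process creates its
        # own DB connection rather than inheriting a pooled one from the parent.
        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415

        async with fresh_db_connection():
            transcript = await transcripts_controller.get_by_id(input.transcript_id)
            ctx.log(f"example_step: title={transcript.title}")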
@@ -93,7 +97,7 @@ from reflector.zulip import post_transcript_notification


 class PipelineInput(BaseModel):
-    """Input to trigger the diarization pipeline."""
+    """Input to trigger the Daily.co multitrack pipeline."""

     recording_id: NonEmptyString
     tracks: list[dict]  # List of {"s3_key": str}
@@ -104,8 +108,8 @@ class PipelineInput(BaseModel):

 hatchet = HatchetClientManager.get_client()

-diarization_pipeline = hatchet.workflow(
-    name="DiarizationPipeline", input_validator=PipelineInput
+daily_multitrack_pipeline = hatchet.workflow(
+    name="DailyMultitrackPipeline", input_validator=PipelineInput
 )

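Apart from the name, triggering the workflow is unchanged. A hedged sketch, assuming the aio_run() entry point this diff mentions for child workflows also triggers the parent, and using placeholder ids (PipelineInput has more fields than the two shown in the hunk above):

    run_result = await daily_multitrack_pipeline.aio_run(
        PipelineInput(
            recording_id="rec-0000",  # placeholder id
            tracks=[
                {"s3_key": "recordings/rec-0000/track-0.webm"},  # one entry per participant
                {"s3_key": "recordings/rec-0000/track-1.webm"},
            ],
        )
    )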
@@ -191,7 +195,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
     return decorator


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("get_recording")
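Only the signature of with_error_handling appears in this diff (a step name plus a set_error_status flag). A plausible shape for such a decorator, offered as a guess rather than the project's code; the _mark_transcript_errored helper is hypothetical:

    from collections.abc import Callable
    from functools import wraps


    def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
        def decorator(fn: Callable) -> Callable:
            @wraps(fn)
            async def wrapper(input, ctx):
                try:
                    return await fn(input, ctx)
                except Exception as exc:
                    ctx.log(f"{step_name} failed: {exc}")
                    if set_error_status:
                        await _mark_transcript_errored(input.transcript_id)  # hypothetical helper
                    raise  # let Hatchet retry per the task's retries= setting

            return wrapper

        return decorator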
@@ -244,7 +248,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_recording],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
@@ -335,7 +339,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[get_participants],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -404,7 +408,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_AUDIO),
     retries=3,
@@ -487,7 +491,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[mixdown_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -555,7 +559,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     return WaveformResult(waveform_generated=True)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[mixdown_tracks],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -658,7 +662,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     return TopicsResult(topics=topics_list)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -721,7 +725,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     return TitleResult(title=title_result)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[detect_topics],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -802,7 +806,7 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
     )


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
     retries=3,
@@ -844,7 +848,7 @@ async def process_subjects(input: PipelineInput, ctx: Context) -> ProcessSubject
     return ProcessSubjectsResult(subject_summaries=subject_summaries)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[process_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=3,
@@ -938,7 +942,7 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
     return RecapResult(short_summary=short_summary, long_summary=long_summary)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[extract_subjects],
     execution_timeout=timedelta(seconds=TIMEOUT_LONG),
     retries=3,
@@ -1009,7 +1013,7 @@ async def identify_action_items(
     return ActionItemsResult(action_items=action_items_dict)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=3,
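This hunk is the DAG's join point: the task below lists four parents, so it runs only after waveform, title, recap, and action items have all finished. In the Hatchet Python SDK a join task can read each parent's output; a sketch assuming the ctx.task_output() accessor (the task name and result handling are illustrative):

    @daily_multitrack_pipeline.task(
        parents=[generate_waveform, generate_title, generate_recap, identify_action_items],
        execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
        retries=3,
    )
    async def finalize_sketch(input: PipelineInput, ctx: Context) -> None:
        # Fan-in: parent outputs become available once all four tasks complete.
        title = ctx.task_output(generate_title)
        recap = ctx.task_output(generate_recap)
        ctx.log(f"finalize: title={title!r} recap={recap!r}")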
@@ -1094,7 +1098,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     return FinalizeResult(status="COMPLETED")


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[finalize], execution_timeout=timedelta(seconds=TIMEOUT_SHORT), retries=3
 )
 @with_error_handling("cleanup_consent", set_error_status=False)
@@ -1194,7 +1198,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
     return ConsentResult()


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[cleanup_consent],
     execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
     retries=5,
@@ -1221,14 +1225,14 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
     return ZulipResult(zulip_message_id=message_id)


-@diarization_pipeline.task(
+@daily_multitrack_pipeline.task(
     parents=[post_zulip],
     execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
     retries=30,
 )
 @with_error_handling("send_webhook", set_error_status=False)
 async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
-    """Send completion webhook to external service."""
+    """Send completion webhook to external service with full payload and HMAC signature."""
     ctx.log(f"send_webhook: transcript_id={input.transcript_id}")

     if not input.room_id:
@@ -1237,27 +1241,39 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:

     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.utils.webhook import (  # noqa: PLC0415
+            build_transcript_webhook_payload,
+            send_webhook_request,
+        )

         room = await rooms_controller.get_by_id(input.room_id)
-        transcript = await transcripts_controller.get_by_id(input.transcript_id)
+        if not room or not room.webhook_url:
+            ctx.log("send_webhook skipped (no webhook_url configured)")
+            return WebhookResult(webhook_sent=False, skipped=True)

-    if room and room.webhook_url and transcript:
-        webhook_payload = {
-            "event": "transcript.completed",
-            "transcript_id": input.transcript_id,
-            "title": transcript.title,
-            "duration": transcript.duration,
-        }
+        payload = await build_transcript_webhook_payload(
+            transcript_id=input.transcript_id,
+            room_id=input.room_id,
+        )

-        async with httpx.AsyncClient() as client:
-            response = await client.post(
-                room.webhook_url, json=webhook_payload, timeout=30
-            )
-            response.raise_for_status()
+        if not payload:
+            ctx.log("send_webhook skipped (could not build payload)")
+            return WebhookResult(webhook_sent=False, skipped=True)

-        ctx.log(f"send_webhook complete: status_code={response.status_code}")
+        ctx.log(
+            f"send_webhook: sending to {room.webhook_url} "
+            f"(topics={len(payload.transcript.topics)}, "
+            f"participants={len(payload.transcript.participants)})"
+        )

-        return WebhookResult(webhook_sent=True, response_code=response.status_code)
+        response = await send_webhook_request(
+            url=room.webhook_url,
+            payload=payload,
+            event_type="transcript.completed",
+            webhook_secret=room.webhook_secret,
+            timeout=30.0,
+        )

-    return WebhookResult(webhook_sent=False, skipped=True)
+    ctx.log(f"send_webhook complete: status_code={response.status_code}")
+
+    return WebhookResult(webhook_sent=True, response_code=response.status_code)
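send_webhook_request itself is not part of this diff; only its call site is. For readers unfamiliar with HMAC-signed webhooks, a minimal sketch of the general technique; the header name and signature format here are assumptions, not necessarily the scheme implemented in reflector.utils.webhook:

    import hashlib
    import hmac
    import json

    import httpx


    async def send_signed_webhook(
        url: str, payload: dict, secret: str, timeout: float = 30.0
    ) -> httpx.Response:
        # Serialize once and sign the exact bytes sent, so the receiver can
        # recompute the HMAC over the raw request body.
        body = json.dumps(payload, separators=(",", ":")).encode()
        signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url,
                content=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Webhook-Signature": f"sha256={signature}",  # assumed header name
                },
                timeout=timeout,
            )
            response.raise_for_status()
            return response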
@@ -4,10 +4,10 @@ Hatchet child workflow: TrackProcessing
 Handles individual audio track processing: padding and transcription.
 Spawned dynamically by the main diarization pipeline for each track.

-Architecture note: This is a separate workflow (not inline tasks in DiarizationPipeline)
+Architecture note: This is a separate workflow (not inline tasks in DailyMultitrackPipeline)
 because Hatchet workflow DAGs are defined statically, but the number of tracks varies
 at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
-standard pattern for dynamic fan-out. See `process_tracks` in diarization_pipeline.py.
+standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.

 Note: This file uses deferred imports (inside tasks) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
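The dynamic fan-out that architecture note describes would look roughly like this at the call site. A sketch: TrackInput's fields are an assumption (the tracks list carries s3_key entries per the PipelineInput comment earlier in this diff), and the real logic lives in process_tracks in daily_multitrack_pipeline.py:

    import asyncio

    from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow


    async def fan_out_tracks(tracks: list[dict]) -> list:
        # The parent DAG stays static; per-track work is spawned as child
        # workflow runs and awaited together.
        runs = [
            track_workflow.aio_run(TrackInput(s3_key=track["s3_key"]))  # fields assumed
            for track in tracks
        ]
        return await asyncio.gather(*runs)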