Files
reflector/server/reflector/hatchet/workflows/file_pipeline.py
2026-04-08 20:32:47 -05:00

971 lines
34 KiB
Python

"""
Hatchet workflow: FilePipeline
Processing pipeline for file uploads and Whereby recordings.
Orchestrates: extract audio → upload → transcribe/diarize/waveform (parallel)
→ assemble → detect topics → title/summaries (parallel) → finalize
→ cleanup consent → post zulip / send webhook.
Note: This file uses deferred imports (inside functions/tasks) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure DB connections
are not shared across forks, avoiding connection pooling issues.
"""
import json
from datetime import timedelta
from pathlib import Path
import av
import httpx
from hatchet_sdk import Context
from pydantic import BaseModel
from reflector.email import is_email_configured, send_transcript_email
from reflector.hatchet.broadcast import (
append_event_and_broadcast,
set_status_and_broadcast,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import (
TIMEOUT_HEAVY,
TIMEOUT_MEDIUM,
TIMEOUT_SHORT,
TIMEOUT_TITLE,
TaskName,
)
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
fresh_db_connection,
set_workflow_error_status,
with_error_handling,
)
from reflector.hatchet.workflows.models import (
ConsentResult,
EmailResult,
TitleResult,
TopicsResult,
WaveformResult,
WebhookResult,
ZulipResult,
)
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.file_diarization import FileDiarizationInput
from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor
from reflector.processors.transcript_diarization_assembler import (
TranscriptDiarizationAssemblerInput,
TranscriptDiarizationAssemblerProcessor,
)
from reflector.processors.types import (
DiarizationSegment,
Word,
)
from reflector.processors.types import (
Transcript as TranscriptType,
)
from reflector.settings import settings
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
from reflector.utils.audio_waveform import get_audio_waveform
from reflector.utils.webhook import (
fetch_transcript_webhook_payload,
send_webhook_request,
)
from reflector.zulip import post_transcript_notification
class FilePipelineInput(BaseModel):
transcript_id: str
room_id: str | None = None
# --- Result models specific to file pipeline ---
class ExtractAudioResult(BaseModel):
audio_path: str
duration_ms: float = 0.0
class UploadAudioResult(BaseModel):
audio_url: str
audio_path: str
class TranscribeResult(BaseModel):
words: list[dict]
translation: str | None = None
class DiarizeResult(BaseModel):
diarization: list[dict] | None = None
class AssembleTranscriptResult(BaseModel):
assembled: bool
class SummariesResult(BaseModel):
generated: bool
class FinalizeResult(BaseModel):
status: str
hatchet = HatchetClientManager.get_client()
file_pipeline = hatchet.workflow(name="FilePipeline", input_validator=FilePipelineInput)
@file_pipeline.task(
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=10,
)
@with_error_handling(TaskName.EXTRACT_AUDIO)
async def extract_audio(input: FilePipelineInput, ctx: Context) -> ExtractAudioResult:
"""Extract audio from upload file, convert to MP3."""
ctx.log(f"extract_audio: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
await set_status_and_broadcast(input.transcript_id, "processing", logger=logger)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
# Clear transcript as we're going to regenerate everything
await transcripts_controller.update(
transcript,
{
"events": [],
"topics": [],
},
)
# Find upload file
audio_file = next(transcript.data_path.glob("upload.*"), None)
if not audio_file:
audio_file = next(transcript.data_path.glob("audio.*"), None)
if not audio_file:
raise ValueError("No audio file found to process")
ctx.log(f"extract_audio: processing {audio_file}")
# Extract audio and write as MP3
duration_ms_container = [0.0]
async def capture_duration(d):
duration_ms_container[0] = d
mp3_writer = AudioFileWriterProcessor(
path=transcript.audio_mp3_filename,
on_duration=capture_duration,
)
input_container = av.open(str(audio_file))
for frame in input_container.decode(audio=0):
await mp3_writer.push(frame)
await mp3_writer.flush()
input_container.close()
duration_ms = duration_ms_container[0]
audio_path = str(transcript.audio_mp3_filename)
# Persist duration to database and broadcast to websocket clients
from reflector.db.transcripts import TranscriptDuration # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller as tc
await tc.update(transcript, {"duration": duration_ms})
await append_event_and_broadcast(
input.transcript_id,
transcript,
"DURATION",
TranscriptDuration(duration=duration_ms),
logger=logger,
)
ctx.log(f"extract_audio complete: {audio_path}, duration={duration_ms}ms")
return ExtractAudioResult(audio_path=audio_path, duration_ms=duration_ms)
@file_pipeline.task(
parents=[extract_audio],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=10,
)
@with_error_handling(TaskName.UPLOAD_AUDIO)
async def upload_audio(input: FilePipelineInput, ctx: Context) -> UploadAudioResult:
"""Upload audio to S3/storage, return audio_url."""
ctx.log(f"upload_audio: starting for transcript_id={input.transcript_id}")
extract_result = ctx.task_output(extract_audio)
audio_path = extract_result.audio_path
storage = get_transcripts_storage()
if not storage:
raise ValueError(
"Storage backend required for file processing. "
"Configure TRANSCRIPT_STORAGE_* settings."
)
with open(audio_path, "rb") as f:
audio_data = f.read()
storage_path = f"file_pipeline/{input.transcript_id}/audio.mp3"
await storage.put_file(storage_path, audio_data)
audio_url = await storage.get_file_url(storage_path)
ctx.log(f"upload_audio complete: {audio_url}")
return UploadAudioResult(audio_url=audio_url, audio_path=audio_path)
@file_pipeline.task(
parents=[upload_audio],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=30,
)
@with_error_handling(TaskName.TRANSCRIBE)
async def transcribe(input: FilePipelineInput, ctx: Context) -> TranscribeResult:
"""Transcribe the audio file using the configured backend."""
ctx.log(f"transcribe: starting for transcript_id={input.transcript_id}")
upload_result = ctx.task_output(upload_audio)
audio_url = upload_result.audio_url
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
source_language = transcript.source_language
result = await transcribe_file_with_processor(audio_url, source_language)
ctx.log(f"transcribe complete: {len(result.words)} words")
return TranscribeResult(
words=[w.model_dump() for w in result.words],
translation=result.translation,
)
@file_pipeline.task(
parents=[upload_audio],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=30,
)
@with_error_handling(TaskName.DIARIZE)
async def diarize(input: FilePipelineInput, ctx: Context) -> DiarizeResult:
"""Diarize the audio file (speaker identification)."""
ctx.log(f"diarize: starting for transcript_id={input.transcript_id}")
if not settings.DIARIZATION_BACKEND:
ctx.log("diarize: diarization disabled, skipping")
return DiarizeResult(diarization=None)
upload_result = ctx.task_output(upload_audio)
audio_url = upload_result.audio_url
processor = FileDiarizationAutoProcessor()
input_data = FileDiarizationInput(audio_url=audio_url)
result = None
async def capture_result(diarization_output):
nonlocal result
result = diarization_output.diarization
try:
processor.on(capture_result)
await processor.push(input_data)
await processor.flush()
except Exception as e:
logger.error(f"Diarization failed: {e}")
return DiarizeResult(diarization=None)
ctx.log(f"diarize complete: {len(result) if result else 0} segments")
return DiarizeResult(diarization=list(result) if result else None)
@file_pipeline.task(
parents=[upload_audio],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=10,
)
@with_error_handling(TaskName.GENERATE_WAVEFORM)
async def generate_waveform(input: FilePipelineInput, ctx: Context) -> WaveformResult:
"""Generate audio waveform visualization."""
ctx.log(f"generate_waveform: starting for transcript_id={input.transcript_id}")
upload_result = ctx.task_output(upload_audio)
audio_path = upload_result.audio_path
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptWaveform,
transcripts_controller,
)
waveform = get_audio_waveform(
path=Path(audio_path), segments_count=WAVEFORM_SEGMENTS
)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
transcript.data_path.mkdir(parents=True, exist_ok=True)
with open(transcript.audio_waveform_filename, "w") as f:
json.dump(waveform, f)
waveform_data = TranscriptWaveform(waveform=waveform)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"WAVEFORM",
waveform_data,
logger=logger,
)
ctx.log("generate_waveform complete")
return WaveformResult(waveform_generated=True)
@file_pipeline.task(
parents=[transcribe, diarize, generate_waveform],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=10,
)
@with_error_handling(TaskName.ASSEMBLE_TRANSCRIPT)
async def assemble_transcript(
input: FilePipelineInput, ctx: Context
) -> AssembleTranscriptResult:
"""Merge transcription + diarization results."""
ctx.log(f"assemble_transcript: starting for transcript_id={input.transcript_id}")
transcribe_result = ctx.task_output(transcribe)
diarize_result = ctx.task_output(diarize)
words = [Word(**w) for w in transcribe_result.words]
transcript_data = TranscriptType(
words=words, translation=transcribe_result.translation
)
diarization = None
if diarize_result.diarization:
diarization = [DiarizationSegment(**s) for s in diarize_result.diarization]
processor = TranscriptDiarizationAssemblerProcessor()
assembler_input = TranscriptDiarizationAssemblerInput(
transcript=transcript_data, diarization=diarization or []
)
diarized_transcript = None
async def capture_result(transcript):
nonlocal diarized_transcript
diarized_transcript = transcript
processor.on(capture_result)
await processor.push(assembler_input)
await processor.flush()
if not diarized_transcript:
raise ValueError("No diarized transcript captured")
# Save the assembled transcript events to the database
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptText,
transcripts_controller,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
assembled_text = diarized_transcript.text if diarized_transcript else ""
assembled_translation = (
diarized_transcript.translation if diarized_transcript else None
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"TRANSCRIPT",
TranscriptText(text=assembled_text, translation=assembled_translation),
logger=logger,
)
ctx.log("assemble_transcript complete")
return AssembleTranscriptResult(assembled=True)
@file_pipeline.task(
parents=[assemble_transcript],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=30,
)
@with_error_handling(TaskName.DETECT_TOPICS)
async def detect_topics(input: FilePipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics from the assembled transcript."""
ctx.log(f"detect_topics: starting for transcript_id={input.transcript_id}")
# Re-read the transcript to get the diarized words
transcribe_result = ctx.task_output(transcribe)
diarize_result = ctx.task_output(diarize)
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptTopic,
transcripts_controller,
)
words = [Word(**w) for w in transcribe_result.words]
transcript_data = TranscriptType(
words=words, translation=transcribe_result.translation
)
diarization = None
if diarize_result.diarization:
diarization = [DiarizationSegment(**s) for s in diarize_result.diarization]
# Re-assemble to get the diarized transcript for topic detection
processor = TranscriptDiarizationAssemblerProcessor()
assembler_input = TranscriptDiarizationAssemblerInput(
transcript=transcript_data, diarization=diarization or []
)
diarized_transcript = None
async def capture_result(transcript):
nonlocal diarized_transcript
diarized_transcript = transcript
processor.on(capture_result)
await processor.push(assembler_input)
await processor.flush()
if not diarized_transcript:
raise ValueError("No diarized transcript for topic detection")
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
target_language = transcript.target_language
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async def on_topic_callback(data):
topic = TranscriptTopic(
title=data.title,
summary=data.summary,
timestamp=data.timestamp,
transcript=data.transcript.text
if hasattr(data.transcript, "text")
else "",
words=data.transcript.words
if hasattr(data.transcript, "words")
else [],
)
await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic, logger=logger
)
topics = await topic_processing.detect_topics(
diarized_transcript,
target_language,
on_topic_callback=on_topic_callback,
empty_pipeline=empty_pipeline,
)
ctx.log(f"detect_topics complete: {len(topics)} topics")
return TopicsResult(topics=topics)
@file_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_TITLE),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=15,
)
@with_error_handling(TaskName.GENERATE_TITLE)
async def generate_title(input: FilePipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM."""
ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalTitle,
transcripts_controller,
)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
title_result = None
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
async def on_title_callback(data):
nonlocal title_result
title_result = data.title
final_title = TranscriptFinalTitle(title=data.title)
if not transcript.title:
await transcripts_controller.update(
transcript, {"title": final_title.title}
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_TITLE",
final_title,
logger=logger,
)
await topic_processing.generate_title(
topics,
on_title_callback=on_title_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log(f"generate_title complete: '{title_result}'")
return TitleResult(title=title_result)
@file_pipeline.task(
parents=[detect_topics],
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=30,
)
@with_error_handling(TaskName.GENERATE_SUMMARIES)
async def generate_summaries(input: FilePipelineInput, ctx: Context) -> SummariesResult:
"""Generate long/short summaries and action items."""
ctx.log(f"generate_summaries: starting for transcript_id={input.transcript_id}")
topics_result = ctx.task_output(detect_topics)
topics = topics_result.topics
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptActionItems,
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
empty_pipeline = topic_processing.EmptyPipeline(logger=logger)
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
raise ValueError(f"Transcript {input.transcript_id} not found")
async def on_long_summary_callback(data):
final_long = TranscriptFinalLongSummary(long_summary=data.long_summary)
await transcripts_controller.update(
transcript, {"long_summary": final_long.long_summary}
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_LONG_SUMMARY",
final_long,
logger=logger,
)
async def on_short_summary_callback(data):
final_short = TranscriptFinalShortSummary(short_summary=data.short_summary)
await transcripts_controller.update(
transcript, {"short_summary": final_short.short_summary}
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"FINAL_SHORT_SUMMARY",
final_short,
logger=logger,
)
async def on_action_items_callback(data):
action_items = TranscriptActionItems(action_items=data.action_items)
await transcripts_controller.update(
transcript, {"action_items": action_items.action_items}
)
await append_event_and_broadcast(
input.transcript_id,
transcript,
"ACTION_ITEMS",
action_items,
logger=logger,
)
await topic_processing.generate_summaries(
topics,
transcript,
on_long_summary_callback=on_long_summary_callback,
on_short_summary_callback=on_short_summary_callback,
on_action_items_callback=on_action_items_callback,
empty_pipeline=empty_pipeline,
logger=logger,
)
ctx.log("generate_summaries complete")
return SummariesResult(generated=True)
@file_pipeline.task(
parents=[generate_title, generate_summaries],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=5,
)
@with_error_handling(TaskName.FINALIZE)
async def finalize(input: FilePipelineInput, ctx: Context) -> FinalizeResult:
"""Set transcript status to 'ended' and broadcast."""
ctx.log("finalize: setting status to 'ended'")
async with fresh_db_connection():
await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log("finalize complete")
return FinalizeResult(status="COMPLETED")
@file_pipeline.task(
parents=[finalize],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=10,
)
@with_error_handling(TaskName.CLEANUP_CONSENT, set_error_status=False)
async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResult:
"""Check consent and delete audio files if any participant denied."""
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.meetings import ( # noqa: PLC0415
meeting_consent_controller,
meetings_controller,
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
ctx.log("cleanup_consent: transcript not found")
return ConsentResult()
consent_denied = False
recording = None
meeting = None
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if meeting:
consent_denied = await meeting_consent_controller.has_any_denial(
meeting.id
)
if not consent_denied:
ctx.log("cleanup_consent: consent approved, keeping all files")
return ConsentResult()
ctx.log("cleanup_consent: consent denied, deleting audio files")
deletion_errors = []
if recording and recording.bucket_name:
keys_to_delete = []
if recording.track_keys:
keys_to_delete = recording.track_keys
elif recording.object_key:
keys_to_delete = [recording.object_key]
master_storage = get_transcripts_storage()
for key in keys_to_delete:
try:
await master_storage.delete_file(key, bucket=recording.bucket_name)
ctx.log(f"Deleted recording file: {recording.bucket_name}/{key}")
except Exception as e:
error_msg = f"Failed to delete {key}: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
except Exception as e:
error_msg = f"Failed to delete processed audio: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
try:
if (
hasattr(transcript, "audio_mp3_filename")
and transcript.audio_mp3_filename
):
transcript.audio_mp3_filename.unlink(missing_ok=True)
if (
hasattr(transcript, "audio_wav_filename")
and transcript.audio_wav_filename
):
transcript.audio_wav_filename.unlink(missing_ok=True)
except Exception as e:
error_msg = f"Failed to delete local audio files: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
# Delete cloud video if present
if meeting and meeting.daily_composed_video_s3_key:
try:
source_storage = get_source_storage("daily")
await source_storage.delete_file(meeting.daily_composed_video_s3_key)
await meetings_controller.update_meeting(
meeting.id,
daily_composed_video_s3_key=None,
daily_composed_video_duration=None,
)
ctx.log(f"Deleted cloud video: {meeting.daily_composed_video_s3_key}")
except Exception as e:
error_msg = f"Failed to delete cloud video: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
transcript_id=input.transcript_id,
error_count=len(deletion_errors),
)
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio and video deleted successfully")
return ConsentResult()
@file_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=5,
backoff_factor=2.0,
backoff_max_seconds=15,
)
@with_error_handling(TaskName.POST_ZULIP, set_error_status=False)
async def post_zulip(input: FilePipelineInput, ctx: Context) -> ZulipResult:
"""Post notification to Zulip."""
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
if not settings.ZULIP_REALM:
ctx.log("post_zulip skipped (Zulip not configured)")
return ZulipResult(zulip_message_id=None, skipped=True)
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
message_id = await post_transcript_notification(transcript)
ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
else:
message_id = None
return ZulipResult(zulip_message_id=message_id)
@file_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_MEDIUM),
retries=5,
backoff_factor=2.0,
backoff_max_seconds=15,
)
@with_error_handling(TaskName.SEND_WEBHOOK, set_error_status=False)
async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
ctx.log("send_webhook skipped (no room_id)")
return WebhookResult(webhook_sent=False, skipped=True)
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
room = await rooms_controller.get_by_id(input.room_id)
if not room or not room.webhook_url:
ctx.log("send_webhook skipped (no webhook_url configured)")
return WebhookResult(webhook_sent=False, skipped=True)
payload = await fetch_transcript_webhook_payload(
transcript_id=input.transcript_id,
room_id=input.room_id,
)
if isinstance(payload, str):
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
return WebhookResult(webhook_sent=False, skipped=True)
try:
response = await send_webhook_request(
url=room.webhook_url,
payload=payload,
event_type="transcript.completed",
webhook_secret=room.webhook_secret,
timeout=30.0,
)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
except httpx.HTTPStatusError as e:
ctx.log(f"send_webhook failed (HTTP {e.response.status_code}), continuing")
return WebhookResult(
webhook_sent=False, response_code=e.response.status_code
)
except (httpx.ConnectError, httpx.TimeoutException) as e:
ctx.log(f"send_webhook failed ({e}), continuing")
return WebhookResult(webhook_sent=False)
except Exception as e:
ctx.log(f"send_webhook unexpected error: {e}")
return WebhookResult(webhook_sent=False)
@file_pipeline.task(
parents=[cleanup_consent],
execution_timeout=timedelta(seconds=TIMEOUT_SHORT),
retries=5,
backoff_factor=2.0,
backoff_max_seconds=15,
)
@with_error_handling(TaskName.SEND_EMAIL, set_error_status=False)
async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult:
"""Send transcript email to collected recipients."""
ctx.log(f"send_email: transcript_id={input.transcript_id}")
if not is_email_configured():
ctx.log("send_email skipped (SMTP not configured)")
return EmailResult(skipped=True)
async with fresh_db_connection():
from reflector.db.meetings import meetings_controller # noqa: PLC0415
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
ctx.log("send_email skipped (transcript not found)")
return EmailResult(skipped=True)
# Try transcript.meeting_id first, then fall back to recording.meeting_id
meeting = None
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if not meeting and transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
# Normalise meeting recipients (legacy strings → dicts)
meeting_recipients: list[dict] = (
[
entry
if isinstance(entry, dict)
else {"email": entry, "include_link": True}
for entry in (meeting.email_recipients or [])
]
if meeting and meeting.email_recipients
else []
)
# Room-level email always gets a link (room owner)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
room_email = None
if transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
if room and room.email_transcript_to:
room_email = room.email_transcript_to
# Build two groups: with link and without link
with_link = [
r["email"] for r in meeting_recipients if r.get("include_link", True)
]
without_link = [
r["email"] for r in meeting_recipients if not r.get("include_link", True)
]
if room_email:
if room_email not in with_link:
with_link.append(room_email)
without_link = [e for e in without_link if e != room_email]
if not with_link and not without_link:
ctx.log("send_email skipped (no email recipients)")
return EmailResult(skipped=True)
# For room-level emails, do NOT change share_mode (only set public if meeting had recipients)
if meeting and meeting.email_recipients:
await transcripts_controller.update(transcript, {"share_mode": "public"})
count = 0
if with_link:
count += await send_transcript_email(
with_link, transcript, include_link=True
)
if without_link:
count += await send_transcript_email(
without_link, transcript, include_link=False
)
ctx.log(f"send_email complete: sent {count} emails")
return EmailResult(emails_sent=count)
# --- On failure handler ---
async def on_workflow_failure(input: FilePipelineInput, ctx: Context) -> None:
"""Set transcript status to 'error' only if not already 'ended'."""
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript and transcript.status == "ended":
logger.info(
"[Hatchet] FilePipeline on_workflow_failure: transcript already ended, skipping error status",
transcript_id=input.transcript_id,
)
ctx.log(
"on_workflow_failure: transcript already ended, skipping error status"
)
return
await set_workflow_error_status(input.transcript_id)
@file_pipeline.on_failure_task()
async def _register_on_workflow_failure(input: FilePipelineInput, ctx: Context) -> None:
await on_workflow_failure(input, ctx)