self-review wip

Igor Loskutov
2025-12-18 11:42:32 -05:00
parent bf90bd076b
commit 61e2b3211e
2 changed files with 17 additions and 68 deletions

View File

@@ -122,10 +122,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
try:
async with fresh_db_connection():
await set_status_and_broadcast(transcript_id, "error")
logger.info(
"[Hatchet] Set transcript status to error",
transcript_id=transcript_id,
)
return True
except Exception as e:
logger.critical(
@@ -137,7 +133,7 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
return False
def _get_storage():
def _spawn_storage():
"""Create fresh storage instance."""
return AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
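
The rename from _get_storage to _spawn_storage matches what the function already did: it builds a fresh AwsStorage on every call instead of handing back a cached instance, so each task (and each workflow replay) gets its own client. A minimal sketch of the pattern, with AwsStorage stubbed as a hypothetical stand-in for the real class:

from dataclasses import dataclass

@dataclass
class AwsStorage:
    # Hypothetical stand-in; the real class takes the settings shown above.
    aws_bucket_name: str

def _spawn_storage() -> AwsStorage:
    """Create fresh storage instance -- no module-level caching."""
    return AwsStorage(aws_bucket_name="transcripts")  # bucket name is illustrative

# Every caller gets an independent instance:
assert _spawn_storage() is not _spawn_storage()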
@@ -181,7 +177,6 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: recording_id={input.recording_id}")
logger.info("[Hatchet] get_recording", recording_id=input.recording_id)
# Set transcript status to "processing" at workflow start (broadcasts to WebSocket)
async with fresh_db_connection():
@@ -190,10 +185,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
await set_status_and_broadcast(input.transcript_id, "processing")
logger.info(
"[Hatchet] Set transcript status to processing",
transcript_id=input.transcript_id,
)
ctx.log(f"Set transcript status to processing: {input.transcript_id}")
if not settings.DAILY_API_KEY:
raise ValueError("DAILY_API_KEY not configured")
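
The hunk headers reference a with_error_handling(step_name, set_error_status) decorator whose body sits outside this diff. As a hedged sketch only, such a decorator plausibly catches a step's exception, reports it, optionally marks the transcript as errored (the set_error_status flag), and re-raises so the workflow engine still sees the failure:

import functools
from collections.abc import Awaitable, Callable

def with_error_handling(step_name: str, set_error_status: bool = True) -> Callable:
    # Sketch under assumptions: the real decorator's internals are not shown here.
    def decorator(fn: Callable[..., Awaitable]) -> Callable[..., Awaitable]:
        @functools.wraps(fn)
        async def wrapper(input, ctx):
            try:
                return await fn(input, ctx)
            except Exception:
                ctx.log(f"{step_name} failed")
                if set_error_status:
                    pass  # e.g. await set_workflow_error_status(input.transcript_id)
                raise  # re-raise so the engine records the step failure
        return wrapper
    return decorator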
@@ -204,12 +196,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
ctx.log(
f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
)
logger.info(
"[Hatchet] get_recording complete",
recording_id=input.recording_id,
room_name=recording.room_name,
duration=recording.duration,
)
return RecordingResult(
id=recording.id,
@@ -225,7 +211,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
logger.info("[Hatchet] get_participants", transcript_id=input.transcript_id)
recording_data = to_dict(ctx.task_output(get_recording))
mtg_session_id = recording_data.get("mtg_session_id")
@@ -300,10 +285,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
logger.info(
"[Hatchet] get_participants complete",
participant_count=len(participants_list),
)
return ParticipantsResult(
participants=participants_list,
@@ -320,11 +301,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
logger.info(
"[Hatchet] process_tracks",
num_tracks=len(input.tracks),
transcript_id=input.transcript_id,
)
participants_data = to_dict(ctx.task_output(get_participants))
source_language = participants_data.get("source_language", "en")
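
process_tracks is documented as a dynamic fan-out: one child workflow per track, results gathered back into a single word list. The child-workflow spawn API isn't visible in this diff, so the sketch below uses a hypothetical spawn_track_workflow coroutine to show only the concurrency shape:

import asyncio

async def spawn_track_workflow(track: dict, source_language: str) -> list[dict]:
    # Hypothetical: the real code would call the workflow engine's
    # child-workflow API and await its word list.
    return []

async def fan_out_tracks(tracks: list[dict], source_language: str) -> list[dict]:
    # One child per track, all running concurrently.
    per_track_words = await asyncio.gather(
        *(spawn_track_workflow(t, source_language) for t in tracks)
    )
    # Flatten per-track results into one transcript-wide word list.
    return [word for words in per_track_words for word in words]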
@@ -376,11 +352,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
ctx.log(
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
)
logger.info(
"[Hatchet] process_tracks complete",
num_tracks=len(input.tracks),
total_words=len(all_words),
)
return ProcessTracksResult(
all_words=all_words,
@@ -399,7 +370,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
logger.info("[Hatchet] mixdown_tracks", transcript_id=input.transcript_id)
track_data = to_dict(ctx.task_output(process_tracks))
padded_tracks_data = track_data.get("padded_tracks", [])
@@ -407,7 +377,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
if not padded_tracks_data:
raise ValueError("No padded tracks to mixdown")
storage = _get_storage()
storage = _spawn_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
padded_urls = []
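
The comment above is the reason this task calls _spawn_storage and presigns inside the step rather than passing URLs between tasks: a presigned URL serialized into task output would expire if the workflow is replayed later. Assuming AwsStorage wraps S3, on-demand presigning looks roughly like this with boto3:

import boto3

def presign_get_url(bucket: str, key: str, expires_in: int = 3600) -> str:
    # Mint the short-lived URL at the moment of use; a replay simply
    # re-mints it instead of reusing a stale one from an earlier run.
    s3 = boto3.client("s3")
    return s3.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=expires_in,
    )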
@@ -473,11 +443,6 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
logger.info(
"[Hatchet] mixdown_tracks uploaded",
key=storage_path,
size=file_size,
)
return MixdownResult(
audio_key=storage_path,
@@ -492,7 +457,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
@with_error_handling("generate_waveform")
async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
"""Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
logger.info("[Hatchet] generate_waveform", transcript_id=input.transcript_id)
ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptWaveform,
@@ -503,10 +468,8 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
track_data = to_dict(ctx.task_output(process_tracks))
created_padded_files = track_data.get("created_padded_files", [])
if created_padded_files:
logger.info(
f"[Hatchet] Cleaning up {len(created_padded_files)} temporary S3 files"
)
storage = _get_storage()
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage()
cleanup_tasks = []
for storage_path in created_padded_files:
cleanup_tasks.append(storage.delete_file(storage_path))
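
The loop above only collects coroutines into cleanup_tasks; presumably they are awaited together right after. A sketch of that pattern, using return_exceptions=True so one missing temp file doesn't abort the whole cleanup:

import asyncio

async def delete_all(storage, keys: list[str]) -> None:
    # Run every delete concurrently and keep going past individual failures.
    results = await asyncio.gather(
        *(storage.delete_file(key) for key in keys),
        return_exceptions=True,
    )
    for key, result in zip(keys, results):
        if isinstance(result, Exception):
            print(f"cleanup failed for {key}: {result}")  # real code would log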
@@ -523,7 +486,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
mixdown_data = to_dict(ctx.task_output(mixdown_tracks))
audio_key = mixdown_data.get("audio_key")
storage = _get_storage()
storage = _spawn_storage()
audio_url = await storage.get_file_url(
audio_key,
operation="get_object",
@@ -556,7 +519,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
finally:
Path(temp_path).unlink(missing_ok=True)
logger.info("[Hatchet] generate_waveform complete")
ctx.log("generate_waveform complete")
return WaveformResult(waveform_generated=True)
@@ -568,7 +531,6 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
ctx.log("detect_topics: analyzing transcript for topics")
logger.info("[Hatchet] detect_topics", transcript_id=input.transcript_id)
track_data = to_dict(ctx.task_output(process_tracks))
words = track_data.get("all_words", [])
@@ -615,7 +577,6 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
topics_list = [t.model_dump() for t in topics]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
logger.info("[Hatchet] detect_topics complete", topic_count=len(topics_list))
return TopicsResult(topics=topics_list)
@@ -627,7 +588,6 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
ctx.log("generate_title: generating title from topics")
logger.info("[Hatchet] generate_title", transcript_id=input.transcript_id)
topics_data = to_dict(ctx.task_output(detect_topics))
topics = topics_data.get("topics", [])
@@ -666,7 +626,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
)
ctx.log(f"generate_title complete: '{title_result}'")
logger.info("[Hatchet] generate_title complete", title=title_result)
return TitleResult(title=title_result)
@@ -678,7 +637,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
ctx.log("generate_summary: generating long and short summaries")
logger.info("[Hatchet] generate_summary", transcript_id=input.transcript_id)
topics_data = to_dict(ctx.task_output(detect_topics))
topics = topics_data.get("topics", [])
@@ -742,7 +700,6 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
)
ctx.log("generate_summary complete")
logger.info("[Hatchet] generate_summary complete")
return SummaryResult(summary=summary_result, short_summary=short_summary_result)
@@ -760,7 +717,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
Note: Title and summaries are already saved by their respective task callbacks.
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
logger.info("[Hatchet] finalize", transcript_id=input.transcript_id)
mixdown_data = to_dict(ctx.task_output(mixdown_tracks))
track_data = to_dict(ctx.task_output(process_tracks))
@@ -816,7 +772,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
ctx.log(
f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
)
logger.info("[Hatchet] finalize complete", transcript_id=input.transcript_id)
return FinalizeResult(status="COMPLETED")
@@ -827,7 +782,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
@with_error_handling("cleanup_consent", set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
"""Check and handle consent requirements."""
logger.info("[Hatchet] cleanup_consent", transcript_id=input.transcript_id)
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.meetings import meetings_controller # noqa: PLC0415
@@ -841,11 +796,9 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
# For now just mark as checked
pass
logger.info(
"[Hatchet] cleanup_consent complete", transcript_id=input.transcript_id
)
ctx.log(f"cleanup_consent complete: transcript_id={input.transcript_id}")
return ConsentResult(consent_checked=True)
return ConsentResult()
@diarization_pipeline.task(
@@ -854,10 +807,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
@with_error_handling("post_zulip", set_error_status=False)
async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
"""Post notification to Zulip."""
logger.info("[Hatchet] post_zulip", transcript_id=input.transcript_id)
ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
if not settings.ZULIP_REALM:
logger.info("[Hatchet] post_zulip skipped (Zulip not configured)")
ctx.log("post_zulip skipped (Zulip not configured)")
return ZulipResult(zulip_message_id=None, skipped=True)
async with fresh_db_connection():
@@ -866,7 +819,7 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
message_id = await post_transcript_notification(transcript)
logger.info("[Hatchet] post_zulip complete", zulip_message_id=message_id)
ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
else:
message_id = None
@@ -879,10 +832,10 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
@with_error_handling("send_webhook", set_error_status=False)
async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
"""Send completion webhook to external service."""
logger.info("[Hatchet] send_webhook", transcript_id=input.transcript_id)
ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
if not input.room_id:
logger.info("[Hatchet] send_webhook skipped (no room_id)")
ctx.log("send_webhook skipped (no room_id)")
return WebhookResult(webhook_sent=False, skipped=True)
async with fresh_db_connection():
@@ -906,9 +859,7 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
)
response.raise_for_status()
logger.info(
"[Hatchet] send_webhook complete", status_code=response.status_code
)
ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)

View File

@@ -107,8 +107,6 @@ class FinalizeResult(BaseModel):
class ConsentResult(BaseModel):
"""Result from cleanup_consent task."""
consent_checked: bool
class ZulipResult(BaseModel):
"""Result from post_zulip task."""