diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py
index a137efa1..c9cf797f 100644
--- a/server/reflector/hatchet/workflows/diarization_pipeline.py
+++ b/server/reflector/hatchet/workflows/diarization_pipeline.py
@@ -122,10 +122,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
     try:
         async with fresh_db_connection():
             await set_status_and_broadcast(transcript_id, "error")
-            logger.info(
-                "[Hatchet] Set transcript status to error",
-                transcript_id=transcript_id,
-            )
         return True
     except Exception as e:
         logger.critical(
@@ -137,7 +133,7 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
     return False
 
 
-def _get_storage():
+def _spawn_storage():
     """Create fresh storage instance."""
     return AwsStorage(
         aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
@@ -181,7 +177,6 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
 async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     """Fetch recording metadata from Daily.co API."""
     ctx.log(f"get_recording: recording_id={input.recording_id}")
-    logger.info("[Hatchet] get_recording", recording_id=input.recording_id)
 
     # Set transcript status to "processing" at workflow start (broadcasts to WebSocket)
     async with fresh_db_connection():
@@ -190,10 +185,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
             await set_status_and_broadcast(input.transcript_id, "processing")
-            logger.info(
-                "[Hatchet] Set transcript status to processing",
-                transcript_id=input.transcript_id,
-            )
+            ctx.log(f"Set transcript status to processing: {input.transcript_id}")
 
     if not settings.DAILY_API_KEY:
         raise ValueError("DAILY_API_KEY not configured")
@@ -204,12 +196,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     ctx.log(
         f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
     )
-    logger.info(
-        "[Hatchet] get_recording complete",
-        recording_id=input.recording_id,
-        room_name=recording.room_name,
-        duration=recording.duration,
-    )
 
     return RecordingResult(
         id=recording.id,
@@ -225,7 +211,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
 async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
     """Fetch participant list from Daily.co API and update transcript in database."""
     ctx.log(f"get_participants: transcript_id={input.transcript_id}")
-    logger.info("[Hatchet] get_participants", transcript_id=input.transcript_id)
 
     recording_data = to_dict(ctx.task_output(get_recording))
     mtg_session_id = recording_data.get("mtg_session_id")
@@ -300,10 +285,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
         )
 
     ctx.log(f"get_participants complete: {len(participants_list)} participants")
-    logger.info(
-        "[Hatchet] get_participants complete",
-        participant_count=len(participants_list),
-    )
 
     return ParticipantsResult(
         participants=participants_list,
@@ -320,11 +301,6 @@
 async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
     """Spawn child workflows for each track (dynamic fan-out)."""
     ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
-    logger.info(
-        "[Hatchet] process_tracks",
-        num_tracks=len(input.tracks),
-        transcript_id=input.transcript_id,
-    )
 
     participants_data = to_dict(ctx.task_output(get_participants))
     source_language = participants_data.get("source_language", "en")
@@ -376,11 +352,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     ctx.log(
         f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
     )
-    logger.info(
-        "[Hatchet] process_tracks complete",
-        num_tracks=len(input.tracks),
-        total_words=len(all_words),
-    )
 
     return ProcessTracksResult(
         all_words=all_words,
@@ -399,7 +370,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
 async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     """Mix all padded tracks into single audio file using PyAV (same as Celery)."""
     ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
-    logger.info("[Hatchet] mixdown_tracks", transcript_id=input.transcript_id)
 
     track_data = to_dict(ctx.task_output(process_tracks))
     padded_tracks_data = track_data.get("padded_tracks", [])
@@ -407,7 +377,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     if not padded_tracks_data:
         raise ValueError("No padded tracks to mixdown")
 
-    storage = _get_storage()
+    storage = _spawn_storage()
 
     # Presign URLs on demand (avoids stale URLs on workflow replay)
     padded_urls = []
@@ -473,11 +443,6 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     )
 
     ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
-    logger.info(
-        "[Hatchet] mixdown_tracks uploaded",
-        key=storage_path,
-        size=file_size,
-    )
 
     return MixdownResult(
         audio_key=storage_path,
@@ -492,7 +457,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
 @with_error_handling("generate_waveform")
 async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
     """Generate audio waveform visualization using AudioWaveformProcessor (matches Celery)."""
-    logger.info("[Hatchet] generate_waveform", transcript_id=input.transcript_id)
+    ctx.log(f"generate_waveform: transcript_id={input.transcript_id}")
 
     from reflector.db.transcripts import (  # noqa: PLC0415
         TranscriptWaveform,
@@ -503,10 +468,8 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     track_data = to_dict(ctx.task_output(process_tracks))
     created_padded_files = track_data.get("created_padded_files", [])
     if created_padded_files:
-        logger.info(
-            f"[Hatchet] Cleaning up {len(created_padded_files)} temporary S3 files"
-        )
-        storage = _get_storage()
+        ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
+        storage = _spawn_storage()
         cleanup_tasks = []
         for storage_path in created_padded_files:
             cleanup_tasks.append(storage.delete_file(storage_path))
@@ -523,7 +486,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     mixdown_data = to_dict(ctx.task_output(mixdown_tracks))
     audio_key = mixdown_data.get("audio_key")
 
-    storage = _get_storage()
+    storage = _spawn_storage()
     audio_url = await storage.get_file_url(
         audio_key,
         operation="get_object",
@@ -556,7 +519,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
     finally:
         Path(temp_path).unlink(missing_ok=True)
 
-    logger.info("[Hatchet] generate_waveform complete")
+    ctx.log("generate_waveform complete")
 
     return WaveformResult(waveform_generated=True)
 
@@ -568,7 +531,6 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
 async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     """Detect topics using LLM and save to database (matches Celery on_topic callback)."""
     ctx.log("detect_topics: analyzing transcript for topics")
-    logger.info("[Hatchet] detect_topics", transcript_id=input.transcript_id)
 
     track_data = to_dict(ctx.task_output(process_tracks))
     words = track_data.get("all_words", [])
@@ -615,7 +577,6 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     topics_list = [t.model_dump() for t in topics]
 
     ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
-    logger.info("[Hatchet] detect_topics complete", topic_count=len(topics_list))
 
     return TopicsResult(topics=topics_list)
 
@@ -627,7 +588,6 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
 async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     """Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
     ctx.log("generate_title: generating title from topics")
-    logger.info("[Hatchet] generate_title", transcript_id=input.transcript_id)
 
     topics_data = to_dict(ctx.task_output(detect_topics))
     topics = topics_data.get("topics", [])
@@ -666,7 +626,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
         )
 
     ctx.log(f"generate_title complete: '{title_result}'")
-    logger.info("[Hatchet] generate_title complete", title=title_result)
 
     return TitleResult(title=title_result)
 
@@ -678,7 +637,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
 async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
     """Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
     ctx.log("generate_summary: generating long and short summaries")
-    logger.info("[Hatchet] generate_summary", transcript_id=input.transcript_id)
 
     topics_data = to_dict(ctx.task_output(detect_topics))
     topics = topics_data.get("topics", [])
@@ -742,7 +700,6 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
         )
 
     ctx.log("generate_summary complete")
-    logger.info("[Hatchet] generate_summary complete")
 
     return SummaryResult(summary=summary_result, short_summary=short_summary_result)
 
@@ -760,7 +717,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     Note: Title and summaries are already saved by their respective task callbacks.
     """
     ctx.log("finalize: saving transcript and setting status to 'ended'")
-    logger.info("[Hatchet] finalize", transcript_id=input.transcript_id)
 
     mixdown_data = to_dict(ctx.task_output(mixdown_tracks))
     track_data = to_dict(ctx.task_output(process_tracks))
@@ -816,7 +772,6 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     ctx.log(
         f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
     )
-    logger.info("[Hatchet] finalize complete", transcript_id=input.transcript_id)
 
     return FinalizeResult(status="COMPLETED")
 
@@ -827,7 +782,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
 @with_error_handling("cleanup_consent", set_error_status=False)
 async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
     """Check and handle consent requirements."""
-    logger.info("[Hatchet] cleanup_consent", transcript_id=input.transcript_id)
+    ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
         from reflector.db.meetings import meetings_controller  # noqa: PLC0415
@@ -841,11 +796,9 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
             # For now just mark as checked
             pass
 
-    logger.info(
-        "[Hatchet] cleanup_consent complete", transcript_id=input.transcript_id
-    )
+    ctx.log(f"cleanup_consent complete: transcript_id={input.transcript_id}")
 
-    return ConsentResult(consent_checked=True)
+    return ConsentResult()
 
 
 @diarization_pipeline.task(
@@ -854,10 +807,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
 @with_error_handling("post_zulip", set_error_status=False)
 async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
     """Post notification to Zulip."""
-    logger.info("[Hatchet] post_zulip", transcript_id=input.transcript_id)
+    ctx.log(f"post_zulip: transcript_id={input.transcript_id}")
 
     if not settings.ZULIP_REALM:
-        logger.info("[Hatchet] post_zulip skipped (Zulip not configured)")
+        ctx.log("post_zulip skipped (Zulip not configured)")
         return ZulipResult(zulip_message_id=None, skipped=True)
 
     async with fresh_db_connection():
@@ -866,7 +819,7 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
             message_id = await post_transcript_notification(transcript)
-            logger.info("[Hatchet] post_zulip complete", zulip_message_id=message_id)
+            ctx.log(f"post_zulip complete: zulip_message_id={message_id}")
         else:
             message_id = None
 
@@ -879,10 +832,10 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
 @with_error_handling("send_webhook", set_error_status=False)
 async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
     """Send completion webhook to external service."""
-    logger.info("[Hatchet] send_webhook", transcript_id=input.transcript_id)
+    ctx.log(f"send_webhook: transcript_id={input.transcript_id}")
 
     if not input.room_id:
-        logger.info("[Hatchet] send_webhook skipped (no room_id)")
+        ctx.log("send_webhook skipped (no room_id)")
         return WebhookResult(webhook_sent=False, skipped=True)
 
     async with fresh_db_connection():
@@ -906,9 +859,7 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
         )
         response.raise_for_status()
 
-    logger.info(
-        "[Hatchet] send_webhook complete", status_code=response.status_code
-    )
+    ctx.log(f"send_webhook complete: status_code={response.status_code}")
 
     return WebhookResult(webhook_sent=True, response_code=response.status_code)
 
diff --git a/server/reflector/hatchet/workflows/models.py b/server/reflector/hatchet/workflows/models.py
index 13b34e07..bc3577cf 100644
--- a/server/reflector/hatchet/workflows/models.py
+++ b/server/reflector/hatchet/workflows/models.py
@@ -107,8 +107,6 @@ class FinalizeResult(BaseModel):
 class ConsentResult(BaseModel):
     """Result from cleanup_consent task."""
 
-    consent_checked: bool
-
 
 class ZulipResult(BaseModel):
     """Result from post_zulip task."""