From 409c257889fb4e0ce8b16ca83ba71ec85e70860a Mon Sep 17 00:00:00 2001
From: Igor Loskutov
Date: Tue, 16 Dec 2025 16:31:29 -0500
Subject: [PATCH] hatched logs

---
 .../hatchet/workflows/diarization_pipeline.py | 60 +++++++++++++++++--
 server/reflector/hatchet/workflows/models.py  | 12 +++-
 .../hatchet/workflows/track_processing.py     | 50 +++++++++++-----
 3 files changed, 101 insertions(+), 21 deletions(-)

diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py
index bf15ae46..d30e2b7d 100644
--- a/server/reflector/hatchet/workflows/diarization_pipeline.py
+++ b/server/reflector/hatchet/workflows/diarization_pipeline.py
@@ -23,6 +23,7 @@ from reflector.hatchet.workflows.models import (
     ConsentResult,
     FinalizeResult,
     MixdownResult,
+    PaddedTrackInfo,
     ParticipantsResult,
     ProcessTracksResult,
     RecordingResult,
@@ -178,6 +179,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
 @with_error_handling("get_recording")
 async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     """Fetch recording metadata from Daily.co API."""
+    ctx.log(f"get_recording: recording_id={input.recording_id}")
     logger.info("[Hatchet] get_recording", recording_id=input.recording_id)
 
     await emit_progress_async(
@@ -217,6 +219,9 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
     async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
         recording = await client.get_recording(input.recording_id)
 
+        ctx.log(
+            f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
+        )
         logger.info(
             "[Hatchet] get_recording complete",
             recording_id=input.recording_id,
@@ -242,6 +247,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     """Fetch participant list from Daily.co API and update transcript in database."""
+    ctx.log(f"get_participants: transcript_id={input.transcript_id}")
     logger.info("[Hatchet] get_participants", transcript_id=input.transcript_id)
 
     await emit_progress_async(
         input.transcript_id, "get_participants", "in_progress", ctx.workflow_run_id
     )
@@ -338,6 +344,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
             }
         )
 
+    ctx.log(f"get_participants complete: {len(participants_list)} participants")
     logger.info(
         "[Hatchet] get_participants complete",
         participant_count=len(participants_list),
@@ -361,6 +368,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
 @with_error_handling("process_tracks")
 async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
     """Spawn child workflows for each track (dynamic fan-out)."""
+    ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
     logger.info(
         "[Hatchet] process_tracks",
         num_tracks=len(input.tracks),
@@ -392,7 +400,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
 
     # Collect results from each track (don't mutate lists while iterating)
     track_words = []
-    padded_urls = []
+    padded_tracks = []
     created_padded_files = set()
 
     for result in results:
@@ -400,7 +408,14 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
         transcribe_result = result.get("transcribe_track", {})
         track_words.append(transcribe_result.get("words", []))
         pad_result = result.get("pad_track", {})
-        padded_urls.append(pad_result.get("padded_url"))
+        padded_key = pad_result.get("padded_key")
+        bucket_name = pad_result.get("bucket_name")
+
+        # Store S3 key info (not presigned URL) - consumer tasks presign on demand
+        if padded_key:
+            padded_tracks.append(
+                PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
+            )
 
         # Track padded files for cleanup
         track_index = pad_result.get("track_index")
@@ -412,6 +427,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     all_words = [word for words in track_words for word in words]
     all_words.sort(key=lambda w: w.get("start", 0))
 
+    ctx.log(
+        f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
+    )
     logger.info(
         "[Hatchet] process_tracks complete",
         num_tracks=len(input.tracks),
@@ -420,7 +438,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
 
     return ProcessTracksResult(
         all_words=all_words,
-        padded_urls=padded_urls,
+        padded_tracks=padded_tracks,
         word_count=len(all_words),
         num_tracks=len(input.tracks),
         target_language=target_language,
@@ -434,6 +452,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
 @with_error_handling("mixdown_tracks")
 async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     """Mix all padded tracks into single audio file using PyAV (same as Celery)."""
+    ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
     logger.info("[Hatchet] mixdown_tracks", transcript_id=input.transcript_id)
 
     await emit_progress_async(
@@ -441,13 +460,33 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
     )
 
     track_data = _to_dict(ctx.task_output(process_tracks))
-    padded_urls = track_data.get("padded_urls", [])
+    padded_tracks_data = track_data.get("padded_tracks", [])
 
-    if not padded_urls:
+    if not padded_tracks_data:
         raise ValueError("No padded tracks to mixdown")
 
     storage = _get_storage()
 
+    # Presign URLs on demand (avoids stale URLs on workflow replay)
+    padded_urls = []
+    for track_info in padded_tracks_data:
+        # Handle both dict (from _to_dict) and PaddedTrackInfo
+        if isinstance(track_info, dict):
+            key = track_info.get("key")
+            bucket = track_info.get("bucket_name")
+        else:
+            key = track_info.key
+            bucket = track_info.bucket_name
+
+        if key:
+            url = await storage.get_file_url(
+                key,
+                operation="get_object",
+                expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
+                bucket=bucket,
+            )
+            padded_urls.append(url)
+
     # Use PipelineMainMultitrack.mixdown_tracks which uses PyAV filter graph
     from fractions import Fraction
 
@@ -616,6 +655,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
         transcript, {"audio_location": "storage"}
     )
 
+    ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
     logger.info(
         "[Hatchet] mixdown_tracks uploaded",
         key=storage_path,
@@ -724,6 +764,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
 @with_error_handling("detect_topics")
 async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     """Detect topics using LLM and save to database (matches Celery on_topic callback)."""
+    ctx.log("detect_topics: analyzing transcript for topics")
     logger.info("[Hatchet] detect_topics", transcript_id=input.transcript_id)
 
     await emit_progress_async(
@@ -776,6 +817,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
 
     topics_list = [t.model_dump() for t in topics]
 
+    ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
     logger.info("[Hatchet] detect_topics complete", topic_count=len(topics_list))
 
     await emit_progress_async(
@@ -791,6 +833,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
 @with_error_handling("generate_title")
 async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
     """Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
+    ctx.log("generate_title: generating title from topics")
     logger.info("[Hatchet] generate_title", transcript_id=input.transcript_id)
 
     await emit_progress_async(
@@ -836,6 +879,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
         logger=logger,
     )
 
+    ctx.log(f"generate_title complete: '{title_result}'")
     logger.info("[Hatchet] generate_title complete", title=title_result)
 
     await emit_progress_async(
@@ -851,6 +895,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
 @with_error_handling("generate_summary")
 async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
     """Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
+    ctx.log("generate_summary: generating long and short summaries")
     logger.info("[Hatchet] generate_summary", transcript_id=input.transcript_id)
 
     await emit_progress_async(
@@ -920,6 +965,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
         logger=logger,
     )
 
+    ctx.log("generate_summary complete")
     logger.info("[Hatchet] generate_summary complete")
 
     await emit_progress_async(
@@ -941,6 +987,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     Matches Celery's on_transcript + set_status behavior.
     Note: Title and summaries are already saved by their respective task callbacks.
     """
+    ctx.log("finalize: saving transcript and setting status to 'ended'")
     logger.info("[Hatchet] finalize", transcript_id=input.transcript_id)
 
     await emit_progress_async(
@@ -991,6 +1038,9 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     # Set status to "ended"
     await transcripts_controller.set_status(input.transcript_id, "ended")
 
+    ctx.log(
+        f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
+    )
     logger.info("[Hatchet] finalize complete", transcript_id=input.transcript_id)
 
     await emit_progress_async(
diff --git a/server/reflector/hatchet/workflows/models.py b/server/reflector/hatchet/workflows/models.py
index 9011bc86..7373e205 100644
--- a/server/reflector/hatchet/workflows/models.py
+++ b/server/reflector/hatchet/workflows/models.py
@@ -17,7 +17,8 @@ from pydantic import BaseModel
 class PadTrackResult(BaseModel):
     """Result from pad_track task."""
 
-    padded_url: str
+    padded_key: str  # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
+    bucket_name: str | None  # None means use default transcript storage bucket
     size: int
     track_index: int
 
@@ -52,11 +53,18 @@ class ParticipantsResult(BaseModel):
     target_language: str
 
 
+class PaddedTrackInfo(BaseModel):
+    """Info for a padded track - S3 key + bucket for on-demand presigning."""
+
+    key: str
+    bucket_name: str | None  # None = use default storage bucket
+
+
 class ProcessTracksResult(BaseModel):
     """Result from process_tracks task."""
 
     all_words: list[dict[str, Any]]
-    padded_urls: list[str | None]
+    padded_tracks: list[PaddedTrackInfo]  # S3 keys, not presigned URLs
     word_count: int
     num_tracks: int
     target_language: str
diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py
index c5f5ac4f..9a2ec709 100644
--- a/server/reflector/hatchet/workflows/track_processing.py
+++ b/server/reflector/hatchet/workflows/track_processing.py
@@ -176,6 +176,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     Extracts stream.start_time from WebM container metadata and applies
     silence padding using PyAV filter graph (adelay).
     """
+    ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
     logger.info(
         "[Hatchet] pad_track",
         track_index=input.track_index,
@@ -213,7 +214,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
             in_container, input.track_index
         )
 
-        # If no padding needed, return original URL
+        # If no padding needed, return original S3 key
        if start_time_seconds <= 0:
             logger.info(
                 f"Track {input.track_index} requires no padding",
@@ -223,7 +224,8 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
                 input.transcript_id, "pad_track", "completed", ctx.workflow_run_id
             )
             return PadTrackResult(
-                padded_url=source_url,
+                padded_key=input.s3_key,
+                bucket_name=input.bucket_name,
                 size=0,
                 track_index=input.track_index,
             )
@@ -257,25 +259,22 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     finally:
         Path(temp_path).unlink(missing_ok=True)
 
-    # Get presigned URL for padded file
-    padded_url = await storage.get_file_url(
-        storage_path,
-        operation="get_object",
-        expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
-    )
-
+    ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
     logger.info(
         "[Hatchet] pad_track complete",
         track_index=input.track_index,
-        padded_url=padded_url[:50] + "...",
+        padded_key=storage_path,
     )
 
     await emit_progress_async(
         input.transcript_id, "pad_track", "completed", ctx.workflow_run_id
     )
 
+    # Return S3 key (not presigned URL) - consumer tasks presign on demand
+    # This avoids stale URLs when workflow is replayed
     return PadTrackResult(
-        padded_url=padded_url,
+        padded_key=storage_path,
+        bucket_name=None,  # None = use default transcript storage bucket
         size=file_size,
         track_index=input.track_index,
     )
@@ -293,6 +292,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
 )
 async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
     """Transcribe audio track using GPU (Modal.com) or local Whisper."""
+    ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
     logger.info(
         "[Hatchet] transcribe_track",
         track_index=input.track_index,
@@ -305,10 +305,29 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
 
     try:
         pad_result = _to_dict(ctx.task_output(pad_track))
-        audio_url = pad_result.get("padded_url")
+        padded_key = pad_result.get("padded_key")
+        bucket_name = pad_result.get("bucket_name")
 
-        if not audio_url:
-            raise ValueError("Missing padded_url from pad_track")
+        if not padded_key:
+            raise ValueError("Missing padded_key from pad_track")
+
+        # Presign URL on demand (avoids stale URLs on workflow replay)
+        from reflector.settings import settings
+        from reflector.storage.storage_aws import AwsStorage
+
+        storage = AwsStorage(
+            aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
+            aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
+            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
+            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
+        )
+
+        audio_url = await storage.get_file_url(
+            padded_key,
+            operation="get_object",
+            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
+            bucket=bucket_name,
+        )
 
         from reflector.pipelines.transcription_helpers import (
             transcribe_file_with_processor,
@@ -323,6 +342,9 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
             word_dict["speaker"] = input.track_index
             words.append(word_dict)
 
+        ctx.log(
+            f"transcribe_track complete: track {input.track_index}, {len(words)} words"
+        )
         logger.info(
             "[Hatchet] transcribe_track complete",
             track_index=input.track_index,
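
The presign-on-demand pattern this patch threads through mixdown_tracks and
transcribe_track (rebuild the URL from key + bucket_name right before use, so
a replayed workflow never sees an expired URL) duplicates the dict/model
branching in both consumers; it could live in one shared helper. A minimal
sketch, assuming only the PaddedTrackInfo model and the storage.get_file_url
signature used in this patch; presign_padded_track is a hypothetical name,
not part of this change:

from reflector.hatchet.workflows.models import PaddedTrackInfo


async def presign_padded_track(
    storage,
    track_info: PaddedTrackInfo | dict,
    expires_in: int,  # callers would pass PRESIGNED_URL_EXPIRATION_SECONDS
) -> str | None:
    """Presign a padded track's S3 key on demand (hypothetical helper).

    Accepts either a PaddedTrackInfo or the dict form that _to_dict yields
    after serialization, mirroring the isinstance branch in mixdown_tracks.
    """
    if isinstance(track_info, dict):
        key = track_info.get("key")
        bucket = track_info.get("bucket_name")
    else:
        key = track_info.key
        bucket = track_info.bucket_name

    if not key:
        return None

    return await storage.get_file_url(
        key,
        operation="get_object",
        expires_in=expires_in,
        bucket=bucket,  # None = default transcript storage bucket
    )

With a helper like this, mixdown_tracks would collect the non-None results
over padded_tracks_data, and transcribe_track would raise its ValueError when
the helper returns None, matching the behavior in the diff above.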