hatched logs

Igor Loskutov
2025-12-16 16:31:29 -05:00
parent fce0945564
commit 409c257889
3 changed files with 101 additions and 21 deletions

View File

@@ -23,6 +23,7 @@ from reflector.hatchet.workflows.models import (
ConsentResult,
FinalizeResult,
MixdownResult,
PaddedTrackInfo,
ParticipantsResult,
ProcessTracksResult,
RecordingResult,
@@ -178,6 +179,7 @@ def with_error_handling(step_name: str, set_error_status: bool = True) -> Callab
@with_error_handling("get_recording")
async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
"""Fetch recording metadata from Daily.co API."""
ctx.log(f"get_recording: recording_id={input.recording_id}")
logger.info("[Hatchet] get_recording", recording_id=input.recording_id)
await emit_progress_async(
@@ -217,6 +219,9 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
recording = await client.get_recording(input.recording_id)
ctx.log(
f"get_recording complete: room={recording.room_name}, duration={recording.duration}s"
)
logger.info(
"[Hatchet] get_recording complete",
recording_id=input.recording_id,
@@ -242,6 +247,7 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
@with_error_handling("get_participants")
async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
"""Fetch participant list from Daily.co API and update transcript in database."""
ctx.log(f"get_participants: transcript_id={input.transcript_id}")
logger.info("[Hatchet] get_participants", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -338,6 +344,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
}
)
ctx.log(f"get_participants complete: {len(participants_list)} participants")
logger.info(
"[Hatchet] get_participants complete",
participant_count=len(participants_list),
@@ -361,6 +368,7 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsResult:
@with_error_handling("process_tracks")
async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
"""Spawn child workflows for each track (dynamic fan-out)."""
ctx.log(f"process_tracks: spawning {len(input.tracks)} track workflows")
logger.info(
"[Hatchet] process_tracks",
num_tracks=len(input.tracks),
@@ -392,7 +400,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
# Collect results from each track (don't mutate lists while iterating)
track_words = []
padded_urls = []
padded_tracks = []
created_padded_files = set()
for result in results:
@@ -400,7 +408,14 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
track_words.append(transcribe_result.get("words", []))
pad_result = result.get("pad_track", {})
padded_urls.append(pad_result.get("padded_url"))
padded_key = pad_result.get("padded_key")
bucket_name = pad_result.get("bucket_name")
# Store S3 key info (not presigned URL) - consumer tasks presign on demand
if padded_key:
padded_tracks.append(
PaddedTrackInfo(key=padded_key, bucket_name=bucket_name)
)
# Track padded files for cleanup
track_index = pad_result.get("track_index")
@@ -412,6 +427,9 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
all_words = [word for words in track_words for word in words]
all_words.sort(key=lambda w: w.get("start", 0))
ctx.log(
f"process_tracks complete: {len(all_words)} words from {len(input.tracks)} tracks"
)
logger.info(
"[Hatchet] process_tracks complete",
num_tracks=len(input.tracks),
@@ -420,7 +438,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
return ProcessTracksResult(
all_words=all_words,
padded_urls=padded_urls,
padded_tracks=padded_tracks,
word_count=len(all_words),
num_tracks=len(input.tracks),
target_language=target_language,
@@ -434,6 +452,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksResult:
@with_error_handling("mixdown_tracks")
async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
"""Mix all padded tracks into single audio file using PyAV (same as Celery)."""
ctx.log("mixdown_tracks: mixing padded tracks into single audio file")
logger.info("[Hatchet] mixdown_tracks", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -441,13 +460,33 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
)
track_data = _to_dict(ctx.task_output(process_tracks))
padded_urls = track_data.get("padded_urls", [])
padded_tracks_data = track_data.get("padded_tracks", [])
if not padded_urls:
if not padded_tracks_data:
raise ValueError("No padded tracks to mixdown")
storage = _get_storage()
# Presign URLs on demand (avoids stale URLs on workflow replay)
padded_urls = []
for track_info in padded_tracks_data:
# Handle both dict (from _to_dict) and PaddedTrackInfo
if isinstance(track_info, dict):
key = track_info.get("key")
bucket = track_info.get("bucket_name")
else:
key = track_info.key
bucket = track_info.bucket_name
if key:
url = await storage.get_file_url(
key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket,
)
padded_urls.append(url)
# Use PipelineMainMultitrack.mixdown_tracks which uses PyAV filter graph
from fractions import Fraction
@@ -616,6 +655,7 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
transcript, {"audio_location": "storage"}
)
ctx.log(f"mixdown_tracks complete: uploaded {file_size} bytes to {storage_path}")
logger.info(
"[Hatchet] mixdown_tracks uploaded",
key=storage_path,
@@ -724,6 +764,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult:
@with_error_handling("detect_topics")
async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
"""Detect topics using LLM and save to database (matches Celery on_topic callback)."""
ctx.log("detect_topics: analyzing transcript for topics")
logger.info("[Hatchet] detect_topics", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -776,6 +817,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
topics_list = [t.model_dump() for t in topics]
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
logger.info("[Hatchet] detect_topics complete", topic_count=len(topics_list))
await emit_progress_async(
@@ -791,6 +833,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
@with_error_handling("generate_title")
async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
"""Generate meeting title using LLM and save to database (matches Celery on_title callback)."""
ctx.log("generate_title: generating title from topics")
logger.info("[Hatchet] generate_title", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -836,6 +879,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
logger=logger,
)
ctx.log(f"generate_title complete: '{title_result}'")
logger.info("[Hatchet] generate_title complete", title=title_result)
await emit_progress_async(
@@ -851,6 +895,7 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
@with_error_handling("generate_summary")
async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
"""Generate meeting summary using LLM and save to database (matches Celery callbacks)."""
ctx.log("generate_summary: generating long and short summaries")
logger.info("[Hatchet] generate_summary", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -920,6 +965,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
logger=logger,
)
ctx.log("generate_summary complete")
logger.info("[Hatchet] generate_summary complete")
await emit_progress_async(
@@ -941,6 +987,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
Matches Celery's on_transcript + set_status behavior.
Note: Title and summaries are already saved by their respective task callbacks.
"""
ctx.log("finalize: saving transcript and setting status to 'ended'")
logger.info("[Hatchet] finalize", transcript_id=input.transcript_id)
await emit_progress_async(
@@ -991,6 +1038,9 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
# Set status to "ended"
await transcripts_controller.set_status(input.transcript_id, "ended")
ctx.log(
f"finalize complete: transcript {input.transcript_id} status set to 'ended'"
)
logger.info("[Hatchet] finalize complete", transcript_id=input.transcript_id)
await emit_progress_async(

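Aside, not part of this commit: the presign-on-demand logic that mixdown_tracks performs above (and transcribe_track performs in the track file below) could be factored into one small helper. The sketch below is hypothetical — the name presign_padded_tracks is made up — but it reuses the storage.get_file_url call and the PaddedTrackInfo shape from the diff, and it restates the idea: store only S3 keys between steps and generate a fresh URL at the point of use, so a replayed workflow never reads an expired presigned URL.

# Hypothetical helper (illustration only, not code from this commit)
async def presign_padded_tracks(storage, padded_tracks_data) -> list[str]:
    urls = []
    for track_info in padded_tracks_data:
        # Task outputs are read back through _to_dict, so entries may arrive
        # as plain dicts rather than PaddedTrackInfo instances.
        if isinstance(track_info, dict):
            key = track_info.get("key")
            bucket = track_info.get("bucket_name")
        else:
            key = track_info.key
            bucket = track_info.bucket_name
        if not key:
            continue
        url = await storage.get_file_url(
            key,
            operation="get_object",
            expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
            bucket=bucket,  # None = use default transcript storage bucket
        )
        urls.append(url)
    return urls
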
View File

@@ -17,7 +17,8 @@ from pydantic import BaseModel
class PadTrackResult(BaseModel):
"""Result from pad_track task."""
padded_url: str
padded_key: str # S3 key (not presigned URL) - presign on demand to avoid stale URLs on replay
bucket_name: str | None # None means use default transcript storage bucket
size: int
track_index: int
@@ -52,11 +53,18 @@ class ParticipantsResult(BaseModel):
target_language: str
class PaddedTrackInfo(BaseModel):
"""Info for a padded track - S3 key + bucket for on-demand presigning."""
key: str
bucket_name: str | None # None = use default storage bucket
class ProcessTracksResult(BaseModel):
"""Result from process_tracks task."""
all_words: list[dict[str, Any]]
padded_urls: list[str | None]
padded_tracks: list[PaddedTrackInfo] # S3 keys, not presigned URLs
word_count: int
num_tracks: int
target_language: str

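A quick note on PaddedTrackInfo: because consumer tasks read upstream output through _to_dict(ctx.task_output(...)), they may see either the Pydantic model or its dumped dict, which is why mixdown_tracks above handles both. A minimal sketch of the round trip; the key value here is made up for illustration:

info = PaddedTrackInfo(key="recordings/example/track_0_padded.webm", bucket_name=None)
payload = info.model_dump()            # {"key": "...", "bucket_name": None}
restored = PaddedTrackInfo(**payload)  # validates back into the model
assert restored.key == info.key and restored.bucket_name is None
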
View File

@@ -176,6 +176,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
Extracts stream.start_time from WebM container metadata and applies
silence padding using PyAV filter graph (adelay).
"""
ctx.log(f"pad_track: track {input.track_index}, s3_key={input.s3_key}")
logger.info(
"[Hatchet] pad_track",
track_index=input.track_index,
@@ -213,7 +214,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
in_container, input.track_index
)
# If no padding needed, return original URL
# If no padding needed, return original S3 key
if start_time_seconds <= 0:
logger.info(
f"Track {input.track_index} requires no padding",
@@ -223,7 +224,8 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
input.transcript_id, "pad_track", "completed", ctx.workflow_run_id
)
return PadTrackResult(
padded_url=source_url,
padded_key=input.s3_key,
bucket_name=input.bucket_name,
size=0,
track_index=input.track_index,
)
@@ -257,25 +259,22 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
finally:
Path(temp_path).unlink(missing_ok=True)
# Get presigned URL for padded file
padded_url = await storage.get_file_url(
storage_path,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
ctx.log(f"pad_track complete: track {input.track_index} -> {storage_path}")
logger.info(
"[Hatchet] pad_track complete",
track_index=input.track_index,
padded_url=padded_url[:50] + "...",
padded_key=storage_path,
)
await emit_progress_async(
input.transcript_id, "pad_track", "completed", ctx.workflow_run_id
)
# Return S3 key (not presigned URL) - consumer tasks presign on demand
# This avoids stale URLs when workflow is replayed
return PadTrackResult(
padded_url=padded_url,
padded_key=storage_path,
bucket_name=None, # None = use default transcript storage bucket
size=file_size,
track_index=input.track_index,
)
@@ -293,6 +292,7 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
"""Transcribe audio track using GPU (Modal.com) or local Whisper."""
ctx.log(f"transcribe_track: track {input.track_index}, language={input.language}")
logger.info(
"[Hatchet] transcribe_track",
track_index=input.track_index,
@@ -305,10 +305,29 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
try:
pad_result = _to_dict(ctx.task_output(pad_track))
audio_url = pad_result.get("padded_url")
padded_key = pad_result.get("padded_key")
bucket_name = pad_result.get("bucket_name")
if not audio_url:
raise ValueError("Missing padded_url from pad_track")
if not padded_key:
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
)
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import (
transcribe_file_with_processor,
@@ -323,6 +342,9 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackResult:
word_dict["speaker"] = input.track_index
words.append(word_dict)
ctx.log(
f"transcribe_track complete: track {input.track_index}, {len(words)} words"
)
logger.info(
"[Hatchet] transcribe_track complete",
track_index=input.track_index,
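
Finally, the pattern behind the commit title: each task now emits a short ctx.log line, surfaced with the workflow run in Hatchet, right before its existing structlog call. A minimal sketch of the convention using the decorator and types from the files above; example_step and its message are made-up placeholders:

@with_error_handling("example_step")
async def example_step(input: PipelineInput, ctx: Context) -> None:
    # ctx.log: human-readable progress line attached to this workflow run
    ctx.log(f"example_step: transcript_id={input.transcript_id}")
    # logger.info: structured application log, unchanged by this commit
    logger.info("[Hatchet] example_step", transcript_id=input.transcript_id)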