dry hatched with celery - 2

This commit is contained in:
Igor Loskutov
2025-12-17 15:11:33 -05:00
parent d683a83906
commit f7f2957fc9
6 changed files with 426 additions and 6 deletions

View File

@@ -0,0 +1,17 @@
"""
Hatchet workflow utilities.
Shared helpers for Hatchet task implementations.
"""
def to_dict(output) -> dict:
"""Convert task output to dict, handling both dict and Pydantic model returns.
Hatchet SDK can return task outputs as either raw dicts or Pydantic models
depending on serialization context. This normalizes the output for consistent
downstream processing.
"""
if isinstance(output, dict):
return output
return output.model_dump()

View File

@@ -166,7 +166,6 @@ class SummaryBuilder:
self.model_name: str = llm.model_name
self.logger = logger or structlog.get_logger()
self.participant_instructions: str | None = None
self._logged_participant_instructions: bool = False
if filename:
self.read_transcript_from_file(filename)
@@ -209,9 +208,7 @@ class SummaryBuilder:
def _enhance_prompt_with_participants(self, prompt: str) -> str:
"""Add participant instructions to any prompt if participants are known."""
if self.participant_instructions:
if not self._logged_participant_instructions:
self.logger.debug("Adding participant instructions to prompts")
self._logged_participant_instructions = True
self.logger.debug("Adding participant instructions to prompts")
return f"{prompt}\n\n{self.participant_instructions}"
return prompt

View File

@@ -0,0 +1,221 @@
"""
Audio track mixdown utilities.
Shared PyAV-based functions for mixing multiple audio tracks into a single output.
Used by both Hatchet workflows and Celery pipelines.
"""
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
def detect_sample_rate_from_tracks(track_urls: list[str], logger=None) -> int | None:
"""Detect sample rate from first decodable audio frame.
Args:
track_urls: List of URLs to audio files (S3 presigned or local)
logger: Optional logger instance
Returns:
Sample rate in Hz, or None if no decodable frames found
"""
for url in track_urls:
if not url:
continue
container = None
try:
container = av.open(url)
for frame in container.decode(audio=0):
return frame.sample_rate
except Exception:
continue
finally:
if container is not None:
container.close()
return None
async def mixdown_tracks_pyav(
track_urls: list[str],
writer,
target_sample_rate: int,
offsets_seconds: list[float] | None = None,
logger=None,
) -> None:
"""Multi-track mixdown using PyAV filter graph (amix).
Builds a filter graph: N abuffer -> optional adelay -> amix -> aformat -> sink
Reads from S3 presigned URLs or local files, pushes mixed frames to writer.
Args:
track_urls: List of URLs to audio tracks (S3 presigned or local)
writer: AudioFileWriterProcessor instance with async push() method
target_sample_rate: Sample rate for output (Hz)
offsets_seconds: Optional per-track delays in seconds for alignment.
If provided, must have same length as track_urls. Delays are relative
to the minimum offset (earliest track has delay=0).
logger: Optional logger instance
Raises:
ValueError: If no valid tracks or containers can be opened
"""
valid_track_urls = [url for url in track_urls if url]
if not valid_track_urls:
if logger:
logger.error("Mixdown failed - no valid track URLs provided")
raise ValueError("Mixdown failed: No valid track URLs")
# Calculate per-input delays if offsets provided
input_offsets_seconds = None
if offsets_seconds is not None:
input_offsets_seconds = [
offsets_seconds[i] for i, url in enumerate(track_urls) if url
]
# Build PyAV filter graph:
# N abuffer (s32/stereo)
# -> optional adelay per input (for alignment)
# -> amix (s32)
# -> aformat(s16)
# -> sink
graph = av.filter.Graph()
inputs = []
for idx, url in enumerate(valid_track_urls):
args = (
f"time_base=1/{target_sample_rate}:"
f"sample_rate={target_sample_rate}:"
f"sample_fmt=s32:"
f"channel_layout=stereo"
)
in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
inputs.append(in_ctx)
if not inputs:
if logger:
logger.error("Mixdown failed - no valid inputs for graph")
raise ValueError("Mixdown failed: No valid inputs for filter graph")
mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
fmt = graph.add(
"aformat",
args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
name="fmt",
)
sink = graph.add("abuffersink", name="out")
# Optional per-input delay before mixing
delays_ms: list[int] = []
if input_offsets_seconds is not None:
base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
delays_ms = [
max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
]
else:
delays_ms = [0 for _ in inputs]
for idx, in_ctx in enumerate(inputs):
delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
if delay_ms > 0:
# adelay requires one value per channel; use same for stereo
adelay = graph.add(
"adelay",
args=f"delays={delay_ms}|{delay_ms}:all=1",
name=f"delay{idx}",
)
in_ctx.link_to(adelay)
adelay.link_to(mixer, 0, idx)
else:
in_ctx.link_to(mixer, 0, idx)
mixer.link_to(fmt)
fmt.link_to(sink)
graph.configure()
containers = []
try:
# Open all containers with cleanup guaranteed
for i, url in enumerate(valid_track_urls):
try:
c = av.open(
url,
options={
# S3 streaming options
"reconnect": "1",
"reconnect_streamed": "1",
"reconnect_delay_max": "5",
},
)
containers.append(c)
except Exception as e:
if logger:
logger.warning(
"Mixdown: failed to open container from URL",
input=i,
url=url,
error=str(e),
)
if not containers:
if logger:
logger.error("Mixdown failed - no valid containers opened")
raise ValueError("Mixdown failed: Could not open any track containers")
decoders = [c.decode(audio=0) for c in containers]
active = [True] * len(decoders)
resamplers = [
AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
for _ in decoders
]
while any(active):
for i, (dec, is_active) in enumerate(zip(decoders, active)):
if not is_active:
continue
try:
frame = next(dec)
except StopIteration:
active[i] = False
# Signal end of stream to filter graph
inputs[i].push(None)
continue
if frame.sample_rate != target_sample_rate:
continue
out_frames = resamplers[i].resample(frame) or []
for rf in out_frames:
rf.sample_rate = target_sample_rate
rf.time_base = Fraction(1, target_sample_rate)
inputs[i].push(rf)
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
# Flush remaining frames from filter graph
while True:
try:
mixed = sink.pull()
except Exception:
break
mixed.sample_rate = target_sample_rate
mixed.time_base = Fraction(1, target_sample_rate)
await writer.push(mixed)
finally:
# Cleanup all containers, even if processing failed
for c in containers:
if c is not None:
try:
c.close()
except Exception:
pass # Best effort cleanup

View File

@@ -0,0 +1,186 @@
"""
Audio track padding utilities.
Shared PyAV-based functions for extracting stream metadata and applying
silence padding to audio tracks. Used by both Hatchet workflows and Celery pipelines.
"""
import math
from fractions import Fraction
import av
from av.audio.resampler import AudioResampler
from reflector.utils.audio_constants import (
OPUS_DEFAULT_BIT_RATE,
OPUS_STANDARD_SAMPLE_RATE,
)
def extract_stream_start_time_from_container(
container,
track_idx: int,
logger=None,
) -> float:
"""Extract meeting-relative start time from WebM stream metadata.
Uses PyAV to read stream.start_time from WebM container.
More accurate than filename timestamps by ~209ms due to network/encoding delays.
Args:
container: PyAV container opened from audio file/URL
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Returns:
Start time in seconds (0.0 if extraction fails)
"""
start_time_seconds = 0.0
try:
audio_streams = [s for s in container.streams if s.type == "audio"]
stream = audio_streams[0] if audio_streams else container.streams[0]
# 1) Try stream-level start_time (most reliable for Daily.co tracks)
if stream.start_time is not None and stream.time_base is not None:
start_time_seconds = float(stream.start_time * stream.time_base)
# 2) Fallback to container-level start_time (in av.time_base units)
if (start_time_seconds <= 0) and (container.start_time is not None):
start_time_seconds = float(container.start_time * av.time_base)
# 3) Fallback to first packet DTS in stream.time_base
if start_time_seconds <= 0:
for packet in container.demux(stream):
if packet.dts is not None:
start_time_seconds = float(packet.dts * stream.time_base)
break
except Exception as e:
if logger:
logger.warning(
"PyAV metadata read failed; assuming 0 start_time",
track_idx=track_idx,
error=str(e),
)
start_time_seconds = 0.0
if logger:
logger.info(
f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
track_idx=track_idx,
)
return start_time_seconds
def apply_audio_padding_to_file(
in_container,
output_path: str,
start_time_seconds: float,
track_idx: int,
logger=None,
) -> None:
"""Apply silence padding to audio track using PyAV filter graph.
Uses adelay filter to prepend silence, aligning track to meeting start time.
Output is WebM/Opus format.
Args:
in_container: PyAV container opened from source audio
output_path: Path for output WebM file
start_time_seconds: Amount of silence to prepend (in seconds)
track_idx: Track index for logging context
logger: Optional logger instance (structlog or stdlib compatible)
Raises:
Exception: If no audio stream found or PyAV processing fails
"""
delay_ms = math.floor(start_time_seconds * 1000)
if logger:
logger.info(
f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
track_idx=track_idx,
delay_ms=delay_ms,
)
try:
with av.open(output_path, "w", format="webm") as out_container:
in_stream = next(
(s for s in in_container.streams if s.type == "audio"), None
)
if in_stream is None:
raise Exception("No audio stream in input")
out_stream = out_container.add_stream(
"libopus", rate=OPUS_STANDARD_SAMPLE_RATE
)
out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
graph = av.filter.Graph()
abuf_args = (
f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
f"sample_fmt=s16:"
f"channel_layout=stereo"
)
src = graph.add("abuffer", args=abuf_args, name="src")
aresample_f = graph.add("aresample", args="async=1", name="ares")
# adelay requires one delay value per channel separated by '|'
delays_arg = f"{delay_ms}|{delay_ms}"
adelay_f = graph.add(
"adelay", args=f"delays={delays_arg}:all=1", name="delay"
)
sink = graph.add("abuffersink", name="sink")
src.link_to(aresample_f)
aresample_f.link_to(adelay_f)
adelay_f.link_to(sink)
graph.configure()
resampler = AudioResampler(
format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
)
# Decode -> resample -> push through graph -> encode Opus
for frame in in_container.decode(in_stream):
out_frames = resampler.resample(frame) or []
for rframe in out_frames:
rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
src.push(rframe)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush remaining frames from filter graph
src.push(None)
while True:
try:
f_out = sink.pull()
except Exception:
break
f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
for packet in out_stream.encode(f_out):
out_container.mux(packet)
# Flush encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
except Exception as e:
if logger:
logger.error(
"PyAV padding failed for track",
track_idx=track_idx,
delay_ms=delay_ms,
error=str(e),
exc_info=True,
)
raise

View File

@@ -320,13 +320,11 @@ async def _process_multitrack_recording_inner(
transcript_id=transcript.id,
)
# Store workflow_run_id on transcript for replay/resume
await transcripts_controller.update(
transcript, {"workflow_run_id": workflow_id}
)
durable_started = True
# If durable workflow started, skip Celery
if durable_started:
return

View File

@@ -119,6 +119,7 @@ async def post_transcript_notification(transcript: Transcript) -> int | None:
Uses transcript.room_id directly (Hatchet flow).
Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
DUPLICATION NOTE: This function will stay when we use Celery no more, and Celery one will be removed.
"""
if not transcript.room_id:
return None