diff --git a/server/reflector/hatchet/utils.py b/server/reflector/hatchet/utils.py
new file mode 100644
index 00000000..d98bdf59
--- /dev/null
+++ b/server/reflector/hatchet/utils.py
@@ -0,0 +1,17 @@
+"""
+Hatchet workflow utilities.
+
+Shared helpers for Hatchet task implementations.
+"""
+
+
+def to_dict(output) -> dict:
+    """Convert task output to dict, handling both dict and Pydantic model returns.
+
+    Hatchet SDK can return task outputs as either raw dicts or Pydantic models
+    depending on serialization context. This normalizes the output for consistent
+    downstream processing.
+    """
+    if isinstance(output, dict):
+        return output
+    return output.model_dump()
diff --git a/server/reflector/processors/summary/summary_builder.py b/server/reflector/processors/summary/summary_builder.py
index eb76d94f..c81c7669 100644
--- a/server/reflector/processors/summary/summary_builder.py
+++ b/server/reflector/processors/summary/summary_builder.py
@@ -166,7 +166,6 @@ class SummaryBuilder:
         self.model_name: str = llm.model_name
         self.logger = logger or structlog.get_logger()
         self.participant_instructions: str | None = None
-        self._logged_participant_instructions: bool = False
 
         if filename:
             self.read_transcript_from_file(filename)
@@ -209,9 +208,7 @@ class SummaryBuilder:
     def _enhance_prompt_with_participants(self, prompt: str) -> str:
         """Add participant instructions to any prompt if participants are known."""
         if self.participant_instructions:
-            if not self._logged_participant_instructions:
-                self.logger.debug("Adding participant instructions to prompts")
-                self._logged_participant_instructions = True
+            self.logger.debug("Adding participant instructions to prompts")
             return f"{prompt}\n\n{self.participant_instructions}"
         return prompt
 
diff --git a/server/reflector/utils/audio_mixdown.py b/server/reflector/utils/audio_mixdown.py
new file mode 100644
index 00000000..b61fc15d
--- /dev/null
+++ b/server/reflector/utils/audio_mixdown.py
@@ -0,0 +1,221 @@
+"""
+Audio track mixdown utilities.
+
+Shared PyAV-based functions for mixing multiple audio tracks into a single output.
+Used by both Hatchet workflows and Celery pipelines.
+"""
+
+from fractions import Fraction
+
+import av
+from av.audio.resampler import AudioResampler
+
+
+def detect_sample_rate_from_tracks(track_urls: list[str], logger=None) -> int | None:
+    """Detect sample rate from first decodable audio frame.
+
+    Args:
+        track_urls: List of URLs to audio files (S3 presigned or local)
+        logger: Optional logger instance
+
+    Returns:
+        Sample rate in Hz, or None if no decodable frames found
+    """
+    for url in track_urls:
+        if not url:
+            continue
+        container = None
+        try:
+            container = av.open(url)
+            for frame in container.decode(audio=0):
+                return frame.sample_rate
+        except Exception:
+            continue
+        finally:
+            if container is not None:
+                container.close()
+    return None
+
+
+async def mixdown_tracks_pyav(
+    track_urls: list[str],
+    writer,
+    target_sample_rate: int,
+    offsets_seconds: list[float] | None = None,
+    logger=None,
+) -> None:
+    """Multi-track mixdown using PyAV filter graph (amix).
+
+    Builds a filter graph: N abuffer -> optional adelay -> amix -> aformat -> sink.
+    Reads from S3 presigned URLs or local files, pushes mixed frames to writer.
+
+    Args:
+        track_urls: List of URLs to audio tracks (S3 presigned or local)
+        writer: AudioFileWriterProcessor instance with async push() method
+        target_sample_rate: Sample rate for output (Hz)
+        offsets_seconds: Optional per-track delays in seconds for alignment.
+            If provided, must have same length as track_urls. Delays are relative
+            to the minimum offset (earliest track has delay=0).
+        logger: Optional logger instance
+
+    Raises:
+        ValueError: If no valid tracks or containers can be opened
+    """
+    valid_track_urls = [url for url in track_urls if url]
+    if not valid_track_urls:
+        if logger:
+            logger.error("Mixdown failed - no valid track URLs provided")
+        raise ValueError("Mixdown failed: No valid track URLs")
+
+    # Calculate per-input delays if offsets provided
+    input_offsets_seconds = None
+    if offsets_seconds is not None:
+        input_offsets_seconds = [
+            offsets_seconds[i] for i, url in enumerate(track_urls) if url
+        ]
+
+    # Build PyAV filter graph:
+    # N abuffer (s32/stereo)
+    # -> optional adelay per input (for alignment)
+    # -> amix (s32)
+    # -> aformat (s32)
+    # -> sink
+    graph = av.filter.Graph()
+    inputs = []
+
+    for idx, url in enumerate(valid_track_urls):
+        args = (
+            f"time_base=1/{target_sample_rate}:"
+            f"sample_rate={target_sample_rate}:"
+            f"sample_fmt=s32:"
+            f"channel_layout=stereo"
+        )
+        in_ctx = graph.add("abuffer", args=args, name=f"in{idx}")
+        inputs.append(in_ctx)
+
+    if not inputs:
+        if logger:
+            logger.error("Mixdown failed - no valid inputs for graph")
+        raise ValueError("Mixdown failed: No valid inputs for filter graph")
+
+    mixer = graph.add("amix", args=f"inputs={len(inputs)}:normalize=0", name="mix")
+
+    fmt = graph.add(
+        "aformat",
+        args=f"sample_fmts=s32:channel_layouts=stereo:sample_rates={target_sample_rate}",
+        name="fmt",
+    )
+
+    sink = graph.add("abuffersink", name="out")
+
+    # Optional per-input delay before mixing
+    delays_ms: list[int] = []
+    if input_offsets_seconds is not None:
+        base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
+        delays_ms = [
+            max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
+        ]
+    else:
+        delays_ms = [0 for _ in inputs]
+
+    for idx, in_ctx in enumerate(inputs):
+        delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
+        if delay_ms > 0:
+            # adelay requires one value per channel; use same for stereo
+            adelay = graph.add(
+                "adelay",
+                args=f"delays={delay_ms}|{delay_ms}:all=1",
+                name=f"delay{idx}",
+            )
+            in_ctx.link_to(adelay)
+            adelay.link_to(mixer, 0, idx)
+        else:
+            in_ctx.link_to(mixer, 0, idx)
+
+    mixer.link_to(fmt)
+    fmt.link_to(sink)
+    graph.configure()
+
+    containers = []
+    try:
+        # Open all containers with cleanup guaranteed
+        for i, url in enumerate(valid_track_urls):
+            try:
+                c = av.open(
+                    url,
+                    options={
+                        # S3 streaming options
+                        "reconnect": "1",
+                        "reconnect_streamed": "1",
+                        "reconnect_delay_max": "5",
+                    },
+                )
+                containers.append(c)
+            except Exception as e:
+                if logger:
+                    logger.warning(
+                        "Mixdown: failed to open container from URL",
+                        input=i,
+                        url=url,
+                        error=str(e),
+                    )
+
+        if not containers:
+            if logger:
+                logger.error("Mixdown failed - no valid containers opened")
+            raise ValueError("Mixdown failed: Could not open any track containers")
+
+        decoders = [c.decode(audio=0) for c in containers]
+        active = [True] * len(decoders)
+        resamplers = [
+            AudioResampler(format="s32", layout="stereo", rate=target_sample_rate)
+            for _ in decoders
+        ]
+
+        while any(active):
+            for i, (dec, is_active) in enumerate(zip(decoders, active)):
+                if not is_active:
+                    continue
+                try:
+                    frame = next(dec)
+                except StopIteration:
+                    active[i] = False
+                    # Signal end of stream to filter graph
+                    inputs[i].push(None)
+                    continue
+
+                if frame.sample_rate != target_sample_rate:
+                    continue
+                out_frames = resamplers[i].resample(frame) or []
+                for rf in out_frames:
+                    rf.sample_rate = target_sample_rate
+                    rf.time_base = Fraction(1, target_sample_rate)
+                    inputs[i].push(rf)
+
+            while True:
+                try:
+                    mixed = sink.pull()
+                except Exception:
+                    break
+                mixed.sample_rate = target_sample_rate
+                mixed.time_base = Fraction(1, target_sample_rate)
+                await writer.push(mixed)
+
+        # Flush remaining frames from filter graph
+        while True:
+            try:
+                mixed = sink.pull()
+            except Exception:
+                break
+            mixed.sample_rate = target_sample_rate
+            mixed.time_base = Fraction(1, target_sample_rate)
+            await writer.push(mixed)
+
+    finally:
+        # Cleanup all containers, even if processing failed
+        for c in containers:
+            if c is not None:
+                try:
+                    c.close()
+                except Exception:
+                    pass  # Best effort cleanup
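
For illustration only, not part of the patch: a minimal sketch of how the two mixdown helpers above might be driven together. The ListWriter stand-in and the local file names are assumptions; in the real pipelines the writer is an AudioFileWriterProcessor and the URLs are S3 presigned links.

    import asyncio

    from reflector.utils.audio_mixdown import (
        detect_sample_rate_from_tracks,
        mixdown_tracks_pyav,
    )

    class ListWriter:
        # Stand-in for AudioFileWriterProcessor: just collects mixed frames.
        def __init__(self):
            self.frames = []

        async def push(self, frame):
            self.frames.append(frame)

    async def main():
        urls = ["track_a.webm", "track_b.webm"]  # hypothetical local files
        offsets = [0.0, 1.25]  # second track started 1.25s after the first
        rate = detect_sample_rate_from_tracks(urls) or 48000
        writer = ListWriter()
        await mixdown_tracks_pyav(urls, writer, rate, offsets_seconds=offsets)

    asyncio.run(main())
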
diff --git a/server/reflector/utils/audio_padding.py b/server/reflector/utils/audio_padding.py
new file mode 100644
index 00000000..0fb6fabb
--- /dev/null
+++ b/server/reflector/utils/audio_padding.py
@@ -0,0 +1,186 @@
+"""
+Audio track padding utilities.
+
+Shared PyAV-based functions for extracting stream metadata and applying
+silence padding to audio tracks. Used by both Hatchet workflows and Celery pipelines.
+"""
+
+import math
+from fractions import Fraction
+
+import av
+from av.audio.resampler import AudioResampler
+
+from reflector.utils.audio_constants import (
+    OPUS_DEFAULT_BIT_RATE,
+    OPUS_STANDARD_SAMPLE_RATE,
+)
+
+
+def extract_stream_start_time_from_container(
+    container,
+    track_idx: int,
+    logger=None,
+) -> float:
+    """Extract meeting-relative start time from WebM stream metadata.
+
+    Uses PyAV to read stream.start_time from WebM container.
+    More accurate than filename timestamps by ~209ms due to network/encoding delays.
+
+    Args:
+        container: PyAV container opened from audio file/URL
+        track_idx: Track index for logging context
+        logger: Optional logger instance (structlog or stdlib compatible)
+
+    Returns:
+        Start time in seconds (0.0 if extraction fails)
+    """
+    start_time_seconds = 0.0
+    try:
+        audio_streams = [s for s in container.streams if s.type == "audio"]
+        stream = audio_streams[0] if audio_streams else container.streams[0]
+
+        # 1) Try stream-level start_time (most reliable for Daily.co tracks)
+        if stream.start_time is not None and stream.time_base is not None:
+            start_time_seconds = float(stream.start_time * stream.time_base)
+
+        # 2) Fallback to container-level start_time (in av.time_base units)
+        if (start_time_seconds <= 0) and (container.start_time is not None):
+            start_time_seconds = float(container.start_time * av.time_base)
+
+        # 3) Fallback to first packet DTS in stream.time_base
+        if start_time_seconds <= 0:
+            for packet in container.demux(stream):
+                if packet.dts is not None:
+                    start_time_seconds = float(packet.dts * stream.time_base)
+                    break
+    except Exception as e:
+        if logger:
+            logger.warning(
+                "PyAV metadata read failed; assuming 0 start_time",
+                track_idx=track_idx,
+                error=str(e),
+            )
+        start_time_seconds = 0.0
+
+    if logger:
+        logger.info(
+            f"Track {track_idx} stream metadata: start_time={start_time_seconds:.3f}s",
+            track_idx=track_idx,
+        )
+    return start_time_seconds
+
+
+def apply_audio_padding_to_file(
+    in_container,
+    output_path: str,
+    start_time_seconds: float,
+    track_idx: int,
+    logger=None,
+) -> None:
+    """Apply silence padding to audio track using PyAV filter graph.
+
+    Uses adelay filter to prepend silence, aligning track to meeting start time.
+    Output is WebM/Opus format.
+
+    Args:
+        in_container: PyAV container opened from source audio
+        output_path: Path for output WebM file
+        start_time_seconds: Amount of silence to prepend (in seconds)
+        track_idx: Track index for logging context
+        logger: Optional logger instance (structlog or stdlib compatible)
+
+    Raises:
+        Exception: If no audio stream found or PyAV processing fails
+    """
+    delay_ms = math.floor(start_time_seconds * 1000)
+
+    if logger:
+        logger.info(
+            f"Padding track {track_idx} with {delay_ms}ms delay using PyAV",
+            track_idx=track_idx,
+            delay_ms=delay_ms,
+        )
+
+    try:
+        with av.open(output_path, "w", format="webm") as out_container:
+            in_stream = next(
+                (s for s in in_container.streams if s.type == "audio"), None
+            )
+            if in_stream is None:
+                raise Exception("No audio stream in input")
+
+            out_stream = out_container.add_stream(
+                "libopus", rate=OPUS_STANDARD_SAMPLE_RATE
+            )
+            out_stream.bit_rate = OPUS_DEFAULT_BIT_RATE
+            graph = av.filter.Graph()
+
+            abuf_args = (
+                f"time_base=1/{OPUS_STANDARD_SAMPLE_RATE}:"
+                f"sample_rate={OPUS_STANDARD_SAMPLE_RATE}:"
+                f"sample_fmt=s16:"
+                f"channel_layout=stereo"
+            )
+            src = graph.add("abuffer", args=abuf_args, name="src")
+            aresample_f = graph.add("aresample", args="async=1", name="ares")
+            # adelay requires one delay value per channel separated by '|'
+            delays_arg = f"{delay_ms}|{delay_ms}"
+            adelay_f = graph.add(
+                "adelay", args=f"delays={delays_arg}:all=1", name="delay"
+            )
+            sink = graph.add("abuffersink", name="sink")
+
+            src.link_to(aresample_f)
+            aresample_f.link_to(adelay_f)
+            adelay_f.link_to(sink)
+            graph.configure()
+
+            resampler = AudioResampler(
+                format="s16", layout="stereo", rate=OPUS_STANDARD_SAMPLE_RATE
+            )
+
+            # Decode -> resample -> push through graph -> encode Opus
+            for frame in in_container.decode(in_stream):
+                out_frames = resampler.resample(frame) or []
+                for rframe in out_frames:
+                    rframe.sample_rate = OPUS_STANDARD_SAMPLE_RATE
+                    rframe.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
+                    src.push(rframe)
+
+                while True:
+                    try:
+                        f_out = sink.pull()
+                    except Exception:
+                        break
+                    f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
+                    f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
+                    for packet in out_stream.encode(f_out):
+                        out_container.mux(packet)
+
+            # Flush remaining frames from filter graph
+            src.push(None)
+            while True:
+                try:
+                    f_out = sink.pull()
+                except Exception:
+                    break
+                f_out.sample_rate = OPUS_STANDARD_SAMPLE_RATE
+                f_out.time_base = Fraction(1, OPUS_STANDARD_SAMPLE_RATE)
+                for packet in out_stream.encode(f_out):
+                    out_container.mux(packet)
+
+            # Flush encoder
+            for packet in out_stream.encode(None):
+                out_container.mux(packet)
+
+    except Exception as e:
+        if logger:
+            logger.error(
+                "PyAV padding failed for track",
+                track_idx=track_idx,
+                delay_ms=delay_ms,
+                error=str(e),
+                exc_info=True,
+            )
+        raise
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index 801d5bd5..fde24e0f 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -320,13 +320,11 @@ async def _process_multitrack_recording_inner(
             transcript_id=transcript.id,
         )
 
-        # Store workflow_run_id on transcript for replay/resume
         await transcripts_controller.update(
            transcript, {"workflow_run_id": workflow_id}
         )
         durable_started = True
 
-    # If durable workflow started, skip Celery
    if durable_started:
        return
 
diff --git a/server/reflector/zulip.py b/server/reflector/zulip.py
index 99a98627..2b83ddf0 100644
--- a/server/reflector/zulip.py
+++ b/server/reflector/zulip.py
@@ -119,6 +119,7 @@ async def post_transcript_notification(transcript: Transcript) -> int | None:
 
     Uses transcript.room_id directly (Hatchet flow).
     Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
+    DUPLICATION NOTE: This function stays once the Celery path is retired; the Celery counterpart will then be removed.
     """
     if not transcript.room_id:
        return None
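
For illustration only, not part of the patch: a sketch of how the new reflector.utils.audio_padding helpers might be combined, assuming local placeholder file paths. The stream's own start time is probed first, then the track is re-encoded with that much leading silence; the container is reopened before encoding because the probe may already have demuxed packets in its DTS fallback.

    import av

    from reflector.utils.audio_padding import (
        apply_audio_padding_to_file,
        extract_stream_start_time_from_container,
    )

    with av.open("raw_track_0.webm") as probe:
        start = extract_stream_start_time_from_container(probe, track_idx=0)

    with av.open("raw_track_0.webm") as src:
        apply_audio_padding_to_file(
            src,
            output_path="padded_track_0.webm",
            start_time_seconds=start,
            track_idx=0,
        )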