diff --git a/server/gpu/modal_deployments/reflector_transcriber_parakeet.py b/server/gpu/modal_deployments/reflector_transcriber_parakeet.py index df53a0ae..97e150e3 100644 --- a/server/gpu/modal_deployments/reflector_transcriber_parakeet.py +++ b/server/gpu/modal_deployments/reflector_transcriber_parakeet.py @@ -172,7 +172,7 @@ class TranscriberParakeetLive: text = output.text.strip() words = [ { - "word": word_info["word"], + "word": word_info["word"] + " ", "start": round(word_info["start"], 2), "end": round(word_info["end"], 2), } @@ -213,7 +213,7 @@ class TranscriberParakeetLive: words = [ { - "word": word_info["word"], + "word": word_info["word"] + " ", "start": round(word_info["start"], 2), "end": round(word_info["end"], 2), } @@ -386,7 +386,7 @@ class TranscriberParakeetFile: text = output.text.strip() words = [ { - "word": word_info["word"], + "word": word_info["word"] + " ", "start": round( word_info["start"] + start_time + timestamp_offset, 2 ), diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 03fdbd65..b15fcb05 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -40,8 +40,9 @@ from reflector.db.transcripts import ( from reflector.logger import logger from reflector.pipelines.runner import PipelineMessage, PipelineRunner from reflector.processors import ( - AudioChunkerProcessor, + AudioChunkerAutoProcessor, AudioDiarizationAutoProcessor, + AudioDownscaleProcessor, AudioFileWriterProcessor, AudioMergeProcessor, AudioTranscriptAutoProcessor, @@ -365,7 +366,8 @@ class PipelineMainLive(PipelineMainBase): path=transcript.audio_wav_filename, on_duration=self.on_duration, ), - AudioChunkerProcessor(), + AudioDownscaleProcessor(), + AudioChunkerAutoProcessor(), AudioMergeProcessor(), AudioTranscriptAutoProcessor.as_threaded(), TranscriptLinerProcessor(), diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py index e95d949e..ab0ad312 100644 --- a/server/reflector/processors/__init__.py +++ b/server/reflector/processors/__init__.py @@ -1,5 +1,7 @@ from .audio_chunker import AudioChunkerProcessor # noqa: F401 +from .audio_chunker_auto import AudioChunkerAutoProcessor # noqa: F401 from .audio_diarization_auto import AudioDiarizationAutoProcessor # noqa: F401 +from .audio_downscale import AudioDownscaleProcessor # noqa: F401 from .audio_file_writer import AudioFileWriterProcessor # noqa: F401 from .audio_merge import AudioMergeProcessor # noqa: F401 from .audio_transcript import AudioTranscriptProcessor # noqa: F401 diff --git a/server/reflector/processors/audio_chunker.py b/server/reflector/processors/audio_chunker.py index af12de89..fff64e5c 100644 --- a/server/reflector/processors/audio_chunker.py +++ b/server/reflector/processors/audio_chunker.py @@ -1,340 +1,78 @@ from typing import Optional import av -import numpy as np -import torch -from silero_vad import VADIterator, load_silero_vad +from prometheus_client import Counter, Histogram from reflector.processors.base import Processor class AudioChunkerProcessor(Processor): """ - Assemble audio frames into chunks with VAD-based speech detection + Base class for assembling audio frames into chunks """ INPUT_TYPE = av.AudioFrame OUTPUT_TYPE = list[av.AudioFrame] - def __init__( - self, - block_frames=256, - max_frames=1024, - vad_threshold=0.5, - use_onnx=False, - min_frames=2, - ): - super().__init__() + m_chunk = Histogram( + "audio_chunker", + "Time spent in AudioChunker.chunk", + ["backend"], + ) + m_chunk_call = Counter( + "audio_chunker_call", + "Number of calls to AudioChunker.chunk", + ["backend"], + ) + m_chunk_success = Counter( + "audio_chunker_success", + "Number of successful calls to AudioChunker.chunk", + ["backend"], + ) + m_chunk_failure = Counter( + "audio_chunker_failure", + "Number of failed calls to AudioChunker.chunk", + ["backend"], + ) + + def __init__(self, *args, **kwargs): + name = self.__class__.__name__ + self.m_chunk = self.m_chunk.labels(name) + self.m_chunk_call = self.m_chunk_call.labels(name) + self.m_chunk_success = self.m_chunk_success.labels(name) + self.m_chunk_failure = self.m_chunk_failure.labels(name) + super().__init__(*args, **kwargs) self.frames: list[av.AudioFrame] = [] - self.block_frames = block_frames - self.max_frames = max_frames - self.vad_threshold = vad_threshold - self.min_frames = min_frames - - # Initialize Silero VAD - self._init_vad(use_onnx) - - def _init_vad(self, use_onnx=False): - """Initialize Silero VAD model""" - try: - torch.set_num_threads(1) - self.vad_model = load_silero_vad(onnx=use_onnx) - self.vad_iterator = VADIterator(self.vad_model, sampling_rate=16000) - self.logger.info("Silero VAD initialized successfully") - - except Exception as e: - self.logger.error(f"Failed to initialize Silero VAD: {e}") - self.vad_model = None - self.vad_iterator = None async def _push(self, data: av.AudioFrame): - self.frames.append(data) - # print("timestamp", data.pts * data.time_base * 1000) - - # Check for speech segments every 32 frames (~1 second) - if len(self.frames) >= 32 and len(self.frames) % 32 == 0: - await self._process_block() - - # Safety fallback - emit if we hit max frames - elif len(self.frames) >= self.max_frames: - self.logger.warning( - f"AudioChunkerProcessor: Reached max frames ({self.max_frames}), " - f"emitting first {self.max_frames // 2} frames" - ) - frames_to_emit = self.frames[: self.max_frames // 2] - self.frames = self.frames[self.max_frames // 2 :] - if len(frames_to_emit) >= self.min_frames: - await self.emit(frames_to_emit) - else: - self.logger.debug( - f"Ignoring fallback segment with {len(frames_to_emit)} frames " - f"(< {self.min_frames} minimum)" + """Process incoming audio frame""" + # Validate audio format on first frame + if len(self.frames) == 0: + if data.sample_rate != 16000 or len(data.layout.channels) != 1: + raise ValueError( + f"AudioChunkerProcessor expects 16kHz mono audio, got {data.sample_rate}Hz " + f"with {len(data.layout.channels)} channel(s). " + f"Use AudioDownscaleProcessor before this processor." ) - async def _process_block(self): - # Need at least 32 frames for VAD detection (~1 second) - if len(self.frames) < 32 or self.vad_iterator is None: - return - - # Processing block with current buffer size - # print(f"Processing block: {len(self.frames)} frames in buffer") - try: - # Convert frames to numpy array for VAD - audio_array = self._frames_to_numpy(self.frames) + self.m_chunk_call.inc() + with self.m_chunk.time(): + result = await self._chunk(data) + self.m_chunk_success.inc() + if result: + await self.emit(result) + except Exception: + self.m_chunk_failure.inc() + raise - if audio_array is None: - # Fallback: emit all frames if conversion failed - frames_to_emit = self.frames[:] - self.frames = [] - if len(frames_to_emit) >= self.min_frames: - await self.emit(frames_to_emit) - else: - self.logger.debug( - f"Ignoring conversion-failed segment with {len(frames_to_emit)} frames " - f"(< {self.min_frames} minimum)" - ) - return - - # Find complete speech segments in the buffer - speech_end_frame = self._find_speech_segment_end(audio_array) - - if speech_end_frame is None or speech_end_frame <= 0: - # No speech found but buffer is getting large - if len(self.frames) > 512: - # Check if it's all silence and can be discarded - # No speech segment found, buffer at {len(self.frames)} frames - - # Could emit silence or discard old frames here - # For now, keep first 256 frames and discard older silence - if len(self.frames) > 768: - self.logger.debug( - f"Discarding {len(self.frames) - 256} old frames (likely silence)" - ) - self.frames = self.frames[-256:] - return - - # Calculate segment timing information - frames_to_emit = self.frames[:speech_end_frame] - - # Get timing from av.AudioFrame - if frames_to_emit: - first_frame = frames_to_emit[0] - last_frame = frames_to_emit[-1] - sample_rate = first_frame.sample_rate - - # Calculate duration - total_samples = sum(f.samples for f in frames_to_emit) - duration_seconds = total_samples / sample_rate if sample_rate > 0 else 0 - - # Get timestamps if available - start_time = ( - first_frame.pts * first_frame.time_base if first_frame.pts else 0 - ) - end_time = ( - last_frame.pts * last_frame.time_base if last_frame.pts else 0 - ) - - # Convert to HH:MM:SS format for logging - def format_time(seconds): - if not seconds: - return "00:00:00" - total_seconds = int(float(seconds)) - hours = total_seconds // 3600 - minutes = (total_seconds % 3600) // 60 - secs = total_seconds % 60 - return f"{hours:02d}:{minutes:02d}:{secs:02d}" - - start_formatted = format_time(start_time) - end_formatted = format_time(end_time) - - # Keep remaining frames for next processing - remaining_after = len(self.frames) - speech_end_frame - - # Single structured log line - self.logger.info( - "Speech segment found", - start=start_formatted, - end=end_formatted, - frames=speech_end_frame, - duration=round(duration_seconds, 2), - buffer_before=len(self.frames), - remaining=remaining_after, - ) - - # Keep remaining frames for next processing - self.frames = self.frames[speech_end_frame:] - - # Filter out segments with too few frames - if len(frames_to_emit) >= self.min_frames: - await self.emit(frames_to_emit) - else: - self.logger.debug( - f"Ignoring segment with {len(frames_to_emit)} frames " - f"(< {self.min_frames} minimum)" - ) - - except Exception as e: - self.logger.error(f"Error in VAD processing: {e}") - # Fallback to simple chunking - if len(self.frames) >= self.block_frames: - frames_to_emit = self.frames[: self.block_frames] - self.frames = self.frames[self.block_frames :] - if len(frames_to_emit) >= self.min_frames: - await self.emit(frames_to_emit) - else: - self.logger.debug( - f"Ignoring exception-fallback segment with {len(frames_to_emit)} frames " - f"(< {self.min_frames} minimum)" - ) - - def _frames_to_numpy(self, frames: list[av.AudioFrame]) -> Optional[np.ndarray]: - """Convert av.AudioFrame list to numpy array for VAD processing""" - if not frames: - return None - - try: - first_frame = frames[0] - original_sample_rate = first_frame.sample_rate - - audio_data = [] - for frame in frames: - frame_array = frame.to_ndarray() - - # Handle stereo -> mono conversion - if len(frame_array.shape) == 2 and frame_array.shape[0] > 1: - frame_array = np.mean(frame_array, axis=0) - elif len(frame_array.shape) == 2: - frame_array = frame_array.flatten() - - audio_data.append(frame_array) - - if not audio_data: - return None - - combined_audio = np.concatenate(audio_data) - - # Resample from 48kHz to 16kHz if needed - if original_sample_rate != 16000: - combined_audio = self._resample_audio( - combined_audio, original_sample_rate, 16000 - ) - - # Ensure float32 format - if combined_audio.dtype == np.int16: - # Normalize int16 audio to float32 in range [-1.0, 1.0] - combined_audio = combined_audio.astype(np.float32) / 32768.0 - elif combined_audio.dtype != np.float32: - combined_audio = combined_audio.astype(np.float32) - - return combined_audio - - except Exception as e: - self.logger.error(f"Error converting frames to numpy: {e}") - - return None - - def _resample_audio( - self, audio: np.ndarray, from_sr: int, to_sr: int - ) -> np.ndarray: - """Simple linear resampling from from_sr to to_sr""" - if from_sr == to_sr: - return audio - - try: - # Simple linear interpolation resampling - ratio = to_sr / from_sr - new_length = int(len(audio) * ratio) - - # Create indices for interpolation - old_indices = np.linspace(0, len(audio) - 1, new_length) - resampled = np.interp(old_indices, np.arange(len(audio)), audio) - - return resampled.astype(np.float32) - - except Exception as e: - self.logger.error("Resampling error", exc_info=e) - # Fallback: simple decimation/repetition - if from_sr > to_sr: - # Downsample by taking every nth sample - step = from_sr // to_sr - return audio[::step] - else: - # Upsample by repeating samples - repeat = to_sr // from_sr - return np.repeat(audio, repeat) - - def _find_speech_segment_end(self, audio_array: np.ndarray) -> Optional[int]: - """Find complete speech segments and return frame index at segment end""" - if self.vad_iterator is None or len(audio_array) == 0: - return None - - try: - # Process audio in 512-sample windows for VAD - window_size = 512 - min_silence_windows = 3 # Require 3 windows of silence after speech - - # Track speech state - in_speech = False - speech_start = None - speech_end = None - silence_count = 0 - - for i in range(0, len(audio_array), window_size): - chunk = audio_array[i : i + window_size] - if len(chunk) < window_size: - chunk = np.pad(chunk, (0, window_size - len(chunk))) - - # Detect if this window has speech - speech_dict = self.vad_iterator(chunk, return_seconds=True) - - # VADIterator returns dict with 'start' and 'end' when speech segments are detected - if speech_dict: - if not in_speech: - # Speech started - speech_start = i - in_speech = True - # Debug: print(f"Speech START at sample {i}, VAD: {speech_dict}") - silence_count = 0 # Reset silence counter - continue - - if not in_speech: - continue - - # We're in speech but found silence - silence_count += 1 - if silence_count < min_silence_windows: - continue - - # Found end of speech segment - speech_end = i - (min_silence_windows - 1) * window_size - # Debug: print(f"Speech END at sample {speech_end}") - - # Convert sample position to frame index - samples_per_frame = self.frames[0].samples if self.frames else 1024 - # Account for resampling: we process at 16kHz but frames might be 48kHz - resample_ratio = 48000 / 16000 # 3x - actual_sample_pos = int(speech_end * resample_ratio) - frame_index = actual_sample_pos // samples_per_frame - - # Ensure we don't exceed buffer - frame_index = min(frame_index, len(self.frames)) - return frame_index - - return None - - except Exception as e: - self.logger.error(f"Error finding speech segment: {e}") - return None + async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]: + """ + Process audio frame and return chunk when ready. + Subclasses should implement their chunking logic here. + """ + raise NotImplementedError async def _flush(self): - frames = self.frames[:] - self.frames = [] - if frames: - if len(frames) >= self.min_frames: - await self.emit(frames) - else: - self.logger.debug( - f"Ignoring flush segment with {len(frames)} frames " - f"(< {self.min_frames} minimum)" - ) + """Flush any remaining frames when processing ends""" + raise NotImplementedError diff --git a/server/reflector/processors/audio_chunker_auto.py b/server/reflector/processors/audio_chunker_auto.py new file mode 100644 index 00000000..1b9775a4 --- /dev/null +++ b/server/reflector/processors/audio_chunker_auto.py @@ -0,0 +1,32 @@ +import importlib + +from reflector.processors.audio_chunker import AudioChunkerProcessor +from reflector.settings import settings + + +class AudioChunkerAutoProcessor(AudioChunkerProcessor): + _registry = {} + + @classmethod + def register(cls, name, kclass): + cls._registry[name] = kclass + + def __new__(cls, name: str | None = None, **kwargs): + if name is None: + name = settings.AUDIO_CHUNKER_BACKEND + if name not in cls._registry: + module_name = f"reflector.processors.audio_chunker_{name}" + importlib.import_module(module_name) + + # gather specific configuration for the processor + # search `AUDIO_CHUNKER_BACKEND_XXX_YYY`, push to constructor as `backend_xxx_yyy` + config = {} + name_upper = name.upper() + settings_prefix = "AUDIO_CHUNKER_" + config_prefix = f"{settings_prefix}{name_upper}_" + for key, value in settings: + if key.startswith(config_prefix): + config_name = key[len(settings_prefix) :].lower() + config[config_name] = value + + return cls._registry[name](**config | kwargs) diff --git a/server/reflector/processors/audio_chunker_frames.py b/server/reflector/processors/audio_chunker_frames.py new file mode 100644 index 00000000..e2eb0941 --- /dev/null +++ b/server/reflector/processors/audio_chunker_frames.py @@ -0,0 +1,34 @@ +from typing import Optional + +import av + +from reflector.processors.audio_chunker import AudioChunkerProcessor +from reflector.processors.audio_chunker_auto import AudioChunkerAutoProcessor + + +class AudioChunkerFramesProcessor(AudioChunkerProcessor): + """ + Simple frame-based audio chunker that emits chunks after a fixed number of frames + """ + + def __init__(self, max_frames=256, **kwargs): + super().__init__(**kwargs) + self.max_frames = max_frames + + async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]: + self.frames.append(data) + if len(self.frames) >= self.max_frames: + frames_to_emit = self.frames[:] + self.frames = [] + return frames_to_emit + + return None + + async def _flush(self): + frames = self.frames[:] + self.frames = [] + if frames: + await self.emit(frames) + + +AudioChunkerAutoProcessor.register("frames", AudioChunkerFramesProcessor) diff --git a/server/reflector/processors/audio_chunker_silero.py b/server/reflector/processors/audio_chunker_silero.py new file mode 100644 index 00000000..c9719ed3 --- /dev/null +++ b/server/reflector/processors/audio_chunker_silero.py @@ -0,0 +1,298 @@ +from typing import Optional + +import av +import numpy as np +import torch +from silero_vad import VADIterator, load_silero_vad + +from reflector.processors.audio_chunker import AudioChunkerProcessor +from reflector.processors.audio_chunker_auto import AudioChunkerAutoProcessor + + +class AudioChunkerSileroProcessor(AudioChunkerProcessor): + """ + Assemble audio frames into chunks with VAD-based speech detection using Silero VAD + """ + + def __init__( + self, + block_frames=256, + max_frames=1024, + use_onnx=True, + min_frames=2, + **kwargs, + ): + super().__init__(**kwargs) + self.block_frames = block_frames + self.max_frames = max_frames + self.min_frames = min_frames + + # Initialize Silero VAD + self._init_vad(use_onnx) + + def _init_vad(self, use_onnx=False): + """Initialize Silero VAD model""" + try: + torch.set_num_threads(1) + self.vad_model = load_silero_vad(onnx=use_onnx) + self.vad_iterator = VADIterator(self.vad_model, sampling_rate=16000) + self.logger.info("Silero VAD initialized successfully") + + except Exception as e: + self.logger.error(f"Failed to initialize Silero VAD: {e}") + self.vad_model = None + self.vad_iterator = None + + async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]: + """Process audio frame and return chunk when ready""" + self.frames.append(data) + + # Check for speech segments every 32 frames (~1 second) + if len(self.frames) >= 32 and len(self.frames) % 32 == 0: + return await self._process_block() + + # Safety fallback - emit if we hit max frames + elif len(self.frames) >= self.max_frames: + self.logger.warning( + f"AudioChunkerSileroProcessor: Reached max frames ({self.max_frames}), " + f"emitting first {self.max_frames // 2} frames" + ) + frames_to_emit = self.frames[: self.max_frames // 2] + self.frames = self.frames[self.max_frames // 2 :] + if len(frames_to_emit) >= self.min_frames: + return frames_to_emit + else: + self.logger.debug( + f"Ignoring fallback segment with {len(frames_to_emit)} frames " + f"(< {self.min_frames} minimum)" + ) + + return None + + async def _process_block(self) -> Optional[list[av.AudioFrame]]: + # Need at least 32 frames for VAD detection (~1 second) + if len(self.frames) < 32 or self.vad_iterator is None: + return None + + # Processing block with current buffer size + print(f"Processing block: {len(self.frames)} frames in buffer") + + try: + # Convert frames to numpy array for VAD + audio_array = self._frames_to_numpy(self.frames) + + if audio_array is None: + # Fallback: emit all frames if conversion failed + frames_to_emit = self.frames[:] + self.frames = [] + if len(frames_to_emit) >= self.min_frames: + return frames_to_emit + else: + self.logger.debug( + f"Ignoring conversion-failed segment with {len(frames_to_emit)} frames " + f"(< {self.min_frames} minimum)" + ) + return None + + # Find complete speech segments in the buffer + speech_end_frame = self._find_speech_segment_end(audio_array) + + if speech_end_frame is None or speech_end_frame <= 0: + # No speech found but buffer is getting large + if len(self.frames) > 512: + # Check if it's all silence and can be discarded + # No speech segment found, buffer at {len(self.frames)} frames + + # Could emit silence or discard old frames here + # For now, keep first 256 frames and discard older silence + if len(self.frames) > 768: + self.logger.debug( + f"Discarding {len(self.frames) - 256} old frames (likely silence)" + ) + self.frames = self.frames[-256:] + return None + + # Calculate segment timing information + frames_to_emit = self.frames[:speech_end_frame] + + # Get timing from av.AudioFrame + if frames_to_emit: + first_frame = frames_to_emit[0] + last_frame = frames_to_emit[-1] + sample_rate = first_frame.sample_rate + + # Calculate duration + total_samples = sum(f.samples for f in frames_to_emit) + duration_seconds = total_samples / sample_rate if sample_rate > 0 else 0 + + # Get timestamps if available + start_time = ( + first_frame.pts * first_frame.time_base if first_frame.pts else 0 + ) + end_time = ( + last_frame.pts * last_frame.time_base if last_frame.pts else 0 + ) + + # Convert to HH:MM:SS format for logging + def format_time(seconds): + if not seconds: + return "00:00:00" + total_seconds = int(float(seconds)) + hours = total_seconds // 3600 + minutes = (total_seconds % 3600) // 60 + secs = total_seconds % 60 + return f"{hours:02d}:{minutes:02d}:{secs:02d}" + + start_formatted = format_time(start_time) + end_formatted = format_time(end_time) + + # Keep remaining frames for next processing + remaining_after = len(self.frames) - speech_end_frame + + # Single structured log line + self.logger.info( + "Speech segment found", + start=start_formatted, + end=end_formatted, + frames=speech_end_frame, + duration=round(duration_seconds, 2), + buffer_before=len(self.frames), + remaining=remaining_after, + ) + + # Keep remaining frames for next processing + self.frames = self.frames[speech_end_frame:] + + # Filter out segments with too few frames + if len(frames_to_emit) >= self.min_frames: + return frames_to_emit + else: + self.logger.debug( + f"Ignoring segment with {len(frames_to_emit)} frames " + f"(< {self.min_frames} minimum)" + ) + + except Exception as e: + self.logger.error(f"Error in VAD processing: {e}") + # Fallback to simple chunking + if len(self.frames) >= self.block_frames: + frames_to_emit = self.frames[: self.block_frames] + self.frames = self.frames[self.block_frames :] + if len(frames_to_emit) >= self.min_frames: + return frames_to_emit + else: + self.logger.debug( + f"Ignoring exception-fallback segment with {len(frames_to_emit)} frames " + f"(< {self.min_frames} minimum)" + ) + + return None + + def _frames_to_numpy(self, frames: list[av.AudioFrame]) -> Optional[np.ndarray]: + """Convert av.AudioFrame list to numpy array for VAD processing""" + if not frames: + return None + + try: + audio_data = [] + for frame in frames: + frame_array = frame.to_ndarray() + + if len(frame_array.shape) == 2: + frame_array = frame_array.flatten() + + audio_data.append(frame_array) + + if not audio_data: + return None + + combined_audio = np.concatenate(audio_data) + + # Ensure float32 format + if combined_audio.dtype == np.int16: + # Normalize int16 audio to float32 in range [-1.0, 1.0] + combined_audio = combined_audio.astype(np.float32) / 32768.0 + elif combined_audio.dtype != np.float32: + combined_audio = combined_audio.astype(np.float32) + + return combined_audio + + except Exception as e: + self.logger.error(f"Error converting frames to numpy: {e}") + + return None + + def _find_speech_segment_end(self, audio_array: np.ndarray) -> Optional[int]: + """Find complete speech segments and return frame index at segment end""" + if self.vad_iterator is None or len(audio_array) == 0: + return None + + try: + # Process audio in 512-sample windows for VAD + window_size = 512 + min_silence_windows = 3 # Require 3 windows of silence after speech + + # Track speech state + in_speech = False + speech_start = None + speech_end = None + silence_count = 0 + + for i in range(0, len(audio_array), window_size): + chunk = audio_array[i : i + window_size] + if len(chunk) < window_size: + chunk = np.pad(chunk, (0, window_size - len(chunk))) + + # Detect if this window has speech + speech_dict = self.vad_iterator(chunk, return_seconds=True) + + # VADIterator returns dict with 'start' and 'end' when speech segments are detected + if speech_dict: + if not in_speech: + # Speech started + speech_start = i + in_speech = True + # Debug: print(f"Speech START at sample {i}, VAD: {speech_dict}") + silence_count = 0 # Reset silence counter + continue + + if not in_speech: + continue + + # We're in speech but found silence + silence_count += 1 + if silence_count < min_silence_windows: + continue + + # Found end of speech segment + speech_end = i - (min_silence_windows - 1) * window_size + # Debug: print(f"Speech END at sample {speech_end}") + + # Convert sample position to frame index + samples_per_frame = self.frames[0].samples if self.frames else 1024 + frame_index = speech_end // samples_per_frame + + # Ensure we don't exceed buffer + frame_index = min(frame_index, len(self.frames)) + return frame_index + + return None + + except Exception as e: + self.logger.error(f"Error finding speech segment: {e}") + return None + + async def _flush(self): + frames = self.frames[:] + self.frames = [] + if frames: + if len(frames) >= self.min_frames: + await self.emit(frames) + else: + self.logger.debug( + f"Ignoring flush segment with {len(frames)} frames " + f"(< {self.min_frames} minimum)" + ) + + +AudioChunkerAutoProcessor.register("silero", AudioChunkerSileroProcessor) diff --git a/server/reflector/processors/audio_downscale.py b/server/reflector/processors/audio_downscale.py new file mode 100644 index 00000000..94eac67a --- /dev/null +++ b/server/reflector/processors/audio_downscale.py @@ -0,0 +1,60 @@ +from typing import Optional + +import av +from av.audio.resampler import AudioResampler + +from reflector.processors.base import Processor + + +def copy_frame(frame: av.AudioFrame) -> av.AudioFrame: + frame_copy = frame.from_ndarray( + frame.to_ndarray(), + format=frame.format.name, + layout=frame.layout.name, + ) + frame_copy.sample_rate = frame.sample_rate + frame_copy.pts = frame.pts + frame_copy.time_base = frame.time_base + return frame_copy + + +class AudioDownscaleProcessor(Processor): + """ + Downscale audio frames to 16kHz mono format + """ + + INPUT_TYPE = av.AudioFrame + OUTPUT_TYPE = av.AudioFrame + + def __init__(self, target_rate: int = 16000, target_layout: str = "mono", **kwargs): + super().__init__(**kwargs) + self.target_rate = target_rate + self.target_layout = target_layout + self.resampler: Optional[AudioResampler] = None + self.needs_resampling: Optional[bool] = None + + async def _push(self, data: av.AudioFrame): + if self.needs_resampling is None: + self.needs_resampling = ( + data.sample_rate != self.target_rate + or data.layout.name != self.target_layout + ) + + if self.needs_resampling: + self.resampler = AudioResampler( + format="s16", layout=self.target_layout, rate=self.target_rate + ) + + if not self.needs_resampling or not self.resampler: + await self.emit(data) + return + + resampled_frames = self.resampler.resample(copy_frame(data)) + for resampled_frame in resampled_frames: + await self.emit(resampled_frame) + + async def _flush(self): + if self.needs_resampling and self.resampler: + final_frames = self.resampler.resample(None) + for frame in final_frames: + await self.emit(frame) diff --git a/server/reflector/processors/audio_merge.py b/server/reflector/processors/audio_merge.py index 84d6e856..58f93ac4 100644 --- a/server/reflector/processors/audio_merge.py +++ b/server/reflector/processors/audio_merge.py @@ -3,24 +3,11 @@ from time import monotonic_ns from uuid import uuid4 import av -from av.audio.resampler import AudioResampler from reflector.processors.base import Processor from reflector.processors.types import AudioFile -def copy_frame(frame: av.AudioFrame) -> av.AudioFrame: - frame_copy = frame.from_ndarray( - frame.to_ndarray(), - format=frame.format.name, - layout=frame.layout.name, - ) - frame_copy.sample_rate = frame.sample_rate - frame_copy.pts = frame.pts - frame_copy.time_base = frame.time_base - return frame_copy - - class AudioMergeProcessor(Processor): """ Merge audio frame into a single file @@ -29,9 +16,8 @@ class AudioMergeProcessor(Processor): INPUT_TYPE = list[av.AudioFrame] OUTPUT_TYPE = AudioFile - def __init__(self, downsample_to_16k_mono: bool = True, **kwargs): + def __init__(self, **kwargs): super().__init__(**kwargs) - self.downsample_to_16k_mono = downsample_to_16k_mono async def _push(self, data: list[av.AudioFrame]): if not data: @@ -39,72 +25,27 @@ class AudioMergeProcessor(Processor): # get audio information from first frame frame = data[0] - original_channels = len(frame.layout.channels) - original_sample_rate = frame.sample_rate - original_sample_width = frame.format.bytes - - # determine if we need processing - needs_processing = self.downsample_to_16k_mono and ( - original_sample_rate != 16000 or original_channels != 1 - ) - - # determine output parameters - if self.downsample_to_16k_mono: - output_sample_rate = 16000 - output_channels = 1 - output_sample_width = 2 # 16-bit = 2 bytes - else: - output_sample_rate = original_sample_rate - output_channels = original_channels - output_sample_width = original_sample_width + output_channels = len(frame.layout.channels) + output_sample_rate = frame.sample_rate + output_sample_width = frame.format.bytes # create audio file uu = uuid4().hex fd = io.BytesIO() - if needs_processing: - # Process with PyAV resampler - out_container = av.open(fd, "w", format="wav") - out_stream = out_container.add_stream("pcm_s16le", rate=16000) - out_stream.layout = "mono" + # Use PyAV to write frames + out_container = av.open(fd, "w", format="wav") + out_stream = out_container.add_stream("pcm_s16le", rate=output_sample_rate) + out_stream.layout = frame.layout.name - # Create resampler if needed - resampler = None - if original_sample_rate != 16000 or original_channels != 1: - resampler = AudioResampler(format="s16", layout="mono", rate=16000) - - for frame in data: - if resampler: - # Resample and convert to mono - # XXX for an unknown reason, if we don't use a copy of the frame, we get - # Invalid Argumment from resample. Debugging indicate that when a previous processor - # already used the frame (like AudioFileWriter), it make it invalid argument here. - resampled_frames = resampler.resample(copy_frame(frame)) - for resampled_frame in resampled_frames: - for packet in out_stream.encode(resampled_frame): - out_container.mux(packet) - else: - # Direct encoding without resampling - for packet in out_stream.encode(frame): - out_container.mux(packet) - - # Flush the encoder - for packet in out_stream.encode(None): + for frame in data: + for packet in out_stream.encode(frame): out_container.mux(packet) - out_container.close() - else: - # Use PyAV for original frames (no processing needed) - out_container = av.open(fd, "w", format="wav") - out_stream = out_container.add_stream("pcm_s16le", rate=output_sample_rate) - out_stream.layout = "mono" if output_channels == 1 else frame.layout - for frame in data: - for packet in out_stream.encode(frame): - out_container.mux(packet) - - for packet in out_stream.encode(None): - out_container.mux(packet) - out_container.close() + # Flush the encoder + for packet in out_stream.encode(None): + out_container.mux(packet) + out_container.close() fd.seek(0) diff --git a/server/reflector/processors/audio_transcript_modal.py b/server/reflector/processors/audio_transcript_modal.py index efe0319f..6ce19ea1 100644 --- a/server/reflector/processors/audio_transcript_modal.py +++ b/server/reflector/processors/audio_transcript_modal.py @@ -12,9 +12,6 @@ API will be a POST request to TRANSCRIPT_URL: """ -from typing import List - -import aiohttp from openai import AsyncOpenAI from reflector.processors.audio_transcript import AudioTranscriptProcessor @@ -25,7 +22,9 @@ from reflector.settings import settings class AudioTranscriptModalProcessor(AudioTranscriptProcessor): def __init__( - self, modal_api_key: str | None = None, batch_enabled: bool = True, **kwargs + self, + modal_api_key: str | None = None, + **kwargs, ): super().__init__() if not settings.TRANSCRIPT_URL: @@ -35,126 +34,6 @@ class AudioTranscriptModalProcessor(AudioTranscriptProcessor): self.transcript_url = settings.TRANSCRIPT_URL + "/v1" self.timeout = settings.TRANSCRIPT_TIMEOUT self.modal_api_key = modal_api_key - self.max_batch_duration = 10.0 - self.max_batch_files = 15 - self.batch_enabled = batch_enabled - self.pending_files: List[AudioFile] = [] # Files waiting to be processed - - @classmethod - def _calculate_duration(cls, audio_file: AudioFile) -> float: - """Calculate audio duration in seconds from AudioFile metadata""" - # Duration = total_samples / sample_rate - # We need to estimate total samples from the file data - import wave - - try: - # Try to read as WAV file to get duration - audio_file.fd.seek(0) - with wave.open(audio_file.fd, "rb") as wav_file: - frames = wav_file.getnframes() - sample_rate = wav_file.getframerate() - duration = frames / sample_rate - return duration - except Exception: - # Fallback: estimate from file size and audio parameters - audio_file.fd.seek(0, 2) # Seek to end - file_size = audio_file.fd.tell() - audio_file.fd.seek(0) # Reset to beginning - - # Estimate: file_size / (sample_rate * channels * sample_width) - bytes_per_second = ( - audio_file.sample_rate - * audio_file.channels - * (audio_file.sample_width // 8) - ) - estimated_duration = ( - file_size / bytes_per_second if bytes_per_second > 0 else 0 - ) - return max(0, estimated_duration) - - def _create_batches(self, audio_files: List[AudioFile]) -> List[List[AudioFile]]: - """Group audio files into batches with maximum 30s total duration""" - batches = [] - current_batch = [] - current_duration = 0.0 - - for audio_file in audio_files: - duration = self._calculate_duration(audio_file) - - # If adding this file exceeds max duration, start a new batch - if current_duration + duration > self.max_batch_duration and current_batch: - batches.append(current_batch) - current_batch = [audio_file] - current_duration = duration - else: - current_batch.append(audio_file) - current_duration += duration - - # Add the last batch if not empty - if current_batch: - batches.append(current_batch) - - return batches - - async def _transcript_batch(self, audio_files: List[AudioFile]) -> List[Transcript]: - """Transcribe a batch of audio files using the parakeet backend""" - if not audio_files: - return [] - - self.logger.debug(f"Batch transcribing {len(audio_files)} files") - - # Prepare form data for batch request - data = aiohttp.FormData() - data.add_field("language", self.get_pref("audio:source_language", "en")) - data.add_field("batch", "true") - - for i, audio_file in enumerate(audio_files): - audio_file.fd.seek(0) - data.add_field( - "files", - audio_file.fd, - filename=f"{audio_file.name}", - content_type="audio/wav", - ) - - # Make batch request - headers = {"Authorization": f"Bearer {self.modal_api_key}"} - - async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=self.timeout) - ) as session: - async with session.post( - f"{self.transcript_url}/audio/transcriptions", - data=data, - headers=headers, - ) as response: - if response.status != 200: - error_text = await response.text() - raise Exception( - f"Batch transcription failed: {response.status} {error_text}" - ) - - result = await response.json() - - # Process batch results - transcripts = [] - results = result.get("results", []) - - for i, (audio_file, file_result) in enumerate(zip(audio_files, results)): - transcript = Transcript( - words=[ - Word( - text=word_info["word"], - start=word_info["start"], - end=word_info["end"], - ) - for word_info in file_result.get("words", []) - ] - ) - transcript.add_offset(audio_file.timestamp) - transcripts.append(transcript) - - return transcripts async def _transcript(self, data: AudioFile): async with AsyncOpenAI( @@ -187,96 +66,5 @@ class AudioTranscriptModalProcessor(AudioTranscriptProcessor): return transcript - async def transcript_multiple( - self, audio_files: List[AudioFile] - ) -> List[Transcript]: - """Transcribe multiple audio files using batching""" - if len(audio_files) == 1: - # Single file, use existing method - return [await self._transcript(audio_files[0])] - - # Create batches with max 30s duration each - batches = self._create_batches(audio_files) - - self.logger.debug( - f"Processing {len(audio_files)} files in {len(batches)} batches" - ) - - # Process all batches concurrently - all_transcripts = [] - - for batch in batches: - batch_transcripts = await self._transcript_batch(batch) - all_transcripts.extend(batch_transcripts) - - return all_transcripts - - async def _push(self, data: AudioFile): - """Override _push to support batching""" - if not self.batch_enabled: - # Use parent implementation for single file processing - return await super()._push(data) - - # Add file to pending batch - self.pending_files.append(data) - self.logger.debug( - f"Added file to batch: {data.name}, batch size: {len(self.pending_files)}" - ) - - # Calculate total duration of pending files - total_duration = sum(self._calculate_duration(f) for f in self.pending_files) - - # Process batch if it reaches max duration or has multiple files ready for optimization - should_process_batch = ( - total_duration >= self.max_batch_duration - or len(self.pending_files) >= self.max_batch_files - ) - - if should_process_batch: - await self._process_pending_batch() - - async def _process_pending_batch(self): - """Process all pending files as batches""" - if not self.pending_files: - return - - self.logger.debug(f"Processing batch of {len(self.pending_files)} files") - - try: - # Create batches respecting duration limit - batches = self._create_batches(self.pending_files) - - # Process each batch - for batch in batches: - self.m_transcript_call.inc() - try: - with self.m_transcript.time(): - # Use batch transcription - transcripts = await self._transcript_batch(batch) - - self.m_transcript_success.inc() - - # Emit each transcript - for transcript in transcripts: - if transcript: - await self.emit(transcript) - - except Exception: - self.m_transcript_failure.inc() - raise - finally: - # Release audio files - for audio_file in batch: - audio_file.release() - - finally: - # Clear pending files - self.pending_files.clear() - - async def _flush(self): - """Process any remaining files when flushing""" - await self._process_pending_batch() - await super()._flush() - AudioTranscriptAutoProcessor.register("modal", AudioTranscriptModalProcessor) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 7b50911b..bbc835cd 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -21,6 +21,10 @@ class Settings(BaseSettings): # local data directory DATA_DIR: str = "./data" + # Audio Chunking + # backends: silero, frames + AUDIO_CHUNKER_BACKEND: str = "frames" + # Audio Transcription # backends: whisper, modal TRANSCRIPT_BACKEND: str = "whisper" diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py index 43ec06ab..4f1cafdd 100644 --- a/server/reflector/tools/process.py +++ b/server/reflector/tools/process.py @@ -16,7 +16,8 @@ import av from reflector.logger import logger from reflector.processors import ( - AudioChunkerProcessor, + AudioChunkerAutoProcessor, + AudioDownscaleProcessor, AudioFileWriterProcessor, AudioMergeProcessor, AudioTranscriptAutoProcessor, @@ -95,7 +96,8 @@ async def process_audio_file( # Add the rest of the processors processors += [ - AudioChunkerProcessor(), + AudioDownscaleProcessor(), + AudioChunkerAutoProcessor(), AudioMergeProcessor(), AudioTranscriptAutoProcessor.as_threaded(), TranscriptLinerProcessor(), @@ -322,7 +324,8 @@ if __name__ == "__main__": # Ignore internal processors if processor in ( - "AudioChunkerProcessor", + "AudioDownscaleProcessor", + "AudioChunkerAutoProcessor", "AudioMergeProcessor", "AudioFileWriterProcessor", "TopicCollectorProcessor", diff --git a/server/reflector/tools/process_with_diarization.py b/server/reflector/tools/process_with_diarization.py index 11561df7..f1415e1a 100644 --- a/server/reflector/tools/process_with_diarization.py +++ b/server/reflector/tools/process_with_diarization.py @@ -17,7 +17,8 @@ import av from reflector.logger import logger from reflector.processors import ( - AudioChunkerProcessor, + AudioChunkerAutoProcessor, + AudioDownscaleProcessor, AudioFileWriterProcessor, AudioMergeProcessor, AudioTranscriptAutoProcessor, @@ -96,7 +97,8 @@ async def process_audio_file_with_diarization( # Add the rest of the processors processors += [ - AudioChunkerProcessor(), + AudioDownscaleProcessor(), + AudioChunkerAutoProcessor(), AudioMergeProcessor(), AudioTranscriptAutoProcessor.as_threaded(), ] @@ -276,7 +278,8 @@ if __name__ == "__main__": # Ignore internal processors if processor in ( - "AudioChunkerProcessor", + "AudioDownscaleProcessor", + "AudioChunkerAutoProcessor", "AudioMergeProcessor", "AudioFileWriterProcessor", "TopicCollectorProcessor", diff --git a/server/reflector/tools/runpipeline.py b/server/reflector/tools/runpipeline.py index eeb9647b..d723f7b4 100644 --- a/server/reflector/tools/runpipeline.py +++ b/server/reflector/tools/runpipeline.py @@ -53,7 +53,7 @@ async def run_single_processor(args): async def event_callback(event: PipelineEvent): processor = event.processor # ignore some processor - if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"): + if processor in ("AudioChunkerAutoProcessor", "AudioMergeProcessor"): return print(f"Event: {event}") if output_fd: