Compare commits

6 Commits

Author SHA1 Message Date
aabf2c2572 chore(main): release 0.7.3 (#565) 2025-08-22 16:35:52 -06:00
6a7b08f016 doc: change readme intro 2025-08-22 16:26:25 -06:00
e2736563d9 doc: update readme with new images 2025-08-22 16:15:54 -06:00
0f54b7782d chore: ignore www/.env.[development,production] 2025-08-22 14:41:09 -06:00
359280dd34 fix: cleaned repo, and get git-leaks clean 2025-08-22 11:51:34 -06:00
9265d201b5 fix: restore previous behavior on live pipeline + audio downscaler (#561)
This commit restores the original frame-cutting behavior. Silero is used
on our GPU for files, but it does not seem to work well on the live
pipeline. This remains to be investigated; for now, what we keep is:

- the downscale step, extracted into its own processor for further
  processing in the pipeline
- no downscale implementation in audio_chunker or audio_merge
- no batching in audio_merge, for now
2025-08-22 10:49:26 -06:00
19 changed files with 569 additions and 631 deletions

4
.gitignore vendored

@@ -14,4 +14,6 @@ data/
www/REFACTOR.md
www/reload-frontend
server/test.sqlite
CLAUDE.local.md
CLAUDE.local.md
www/.env.development
www/.env.production

1
.gitleaksignore Normal file

@@ -0,0 +1 @@
b9d891d3424f371642cb032ecfd0e2564470a72c:server/tests/test_transcripts_recording_deletion.py:generic-api-key:15

@@ -27,3 +27,8 @@ repos:
files: ^server/
- id: ruff-format
files: ^server/
- repo: https://github.com/gitleaks/gitleaks
rev: v8.28.0
hooks:
- id: gitleaks

@@ -1,5 +1,13 @@
# Changelog
## [0.7.3](https://github.com/Monadical-SAS/reflector/compare/v0.7.2...v0.7.3) (2025-08-22)
### Bug Fixes
* cleaned repo, and get git-leaks clean ([359280d](https://github.com/Monadical-SAS/reflector/commit/359280dd340433ba4402ed69034094884c825e67))
* restore previous behavior on live pipeline + audio downscaler ([#561](https://github.com/Monadical-SAS/reflector/issues/561)) ([9265d20](https://github.com/Monadical-SAS/reflector/commit/9265d201b590d23c628c5f19251b70f473859043))
## [0.7.2](https://github.com/Monadical-SAS/reflector/compare/v0.7.1...v0.7.2) (2025-08-21)

@@ -1,43 +1,60 @@
<div align="center">
<img width="100" alt="image" src="https://github.com/user-attachments/assets/66fb367b-2c89-4516-9912-f47ac59c6a7f"/>
# Reflector
Reflector Audio Management and Analysis is a cutting-edge web application under development by Monadical. It utilizes AI to record meetings, providing a permanent record with transcripts, translations, and automated summaries.
Reflector is an AI-powered audio transcription and meeting analysis platform that provides real-time transcription, speaker diarization, translation and summarization for audio content and live meetings. It works 100% with local models (whisper/parakeet, pyannote, seamless-m4t, and your local llm like phi-4).
[![Tests](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/reflector/actions/workflows/pytests.yml)
[![Tests](https://github.com/monadical-sas/reflector/actions/workflows/test_server.yml/badge.svg?branch=main&event=push)](https://github.com/monadical-sas/reflector/actions/workflows/test_server.yml)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
</div>
## Screenshots
</div>
<table>
<tr>
<td>
<a href="https://github.com/user-attachments/assets/3a976930-56c1-47ef-8c76-55d3864309e3">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/3a976930-56c1-47ef-8c76-55d3864309e3" />
<a href="https://github.com/user-attachments/assets/21f5597c-2930-4899-a154-f7bd61a59e97">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/21f5597c-2930-4899-a154-f7bd61a59e97" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/bfe3bde3-08af-4426-a9a1-11ad5cd63b33">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/bfe3bde3-08af-4426-a9a1-11ad5cd63b33" />
<a href="https://github.com/user-attachments/assets/f6b9399a-5e51-4bae-b807-59128d0a940c">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/f6b9399a-5e51-4bae-b807-59128d0a940c" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/7b60c9d0-efe4-474f-a27b-ea13bd0fabdc">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/7b60c9d0-efe4-474f-a27b-ea13bd0fabdc" />
<a href="https://github.com/user-attachments/assets/a42ce460-c1fd-4489-a995-270516193897">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/a42ce460-c1fd-4489-a995-270516193897" />
</a>
</td>
<td>
<a href="https://github.com/user-attachments/assets/21929f6d-c309-42fe-9c11-f1299e50fbd4">
<img width="700" alt="image" src="https://github.com/user-attachments/assets/21929f6d-c309-42fe-9c11-f1299e50fbd4" />
</a>
</td>
</tr>
</table>
## What is Reflector?
Reflector is a web application that utilizes AI to process audio content, providing:
- **Real-time Transcription**: Convert speech to text using [Whisper](https://github.com/openai/whisper) (multi-language) or [Parakeet](https://huggingface.co/nvidia/parakeet-tdt-0.6b-v2) (English) models
- **Speaker Diarization**: Identify and label different speakers using [Pyannote](https://github.com/pyannote/pyannote-audio) 3.1
- **Live Translation**: Translate audio content in real-time to many languages with [Facebook Seamless-M4T](https://github.com/facebookresearch/seamless_communication)
- **Topic Detection & Summarization**: Extract key topics and generate concise summaries using LLMs
- **Meeting Recording**: Create permanent records of meetings with searchable transcripts
Currently we provide [modal.com](https://modal.com/) gpu template to deploy.
## Background
The project architecture consists of three primary components:
- **Front-End**: NextJS React project hosted on Vercel, located in `www/`.
- **Back-End**: Python server that offers an API and data persistence, found in `server/`.
- **GPU implementation**: Providing services such as speech-to-text transcription, topic generation, automated summaries, and translations. Most reliable option is Modal deployment
- **Front-End**: NextJS React project hosted on Vercel, located in `www/`.
- **GPU implementation**: Providing services such as speech-to-text transcription, topic generation, automated summaries, and translations.
It also uses authentik for authentication if activated, and Vercel for deployment and configuration of the front-end.
It also uses authentik for authentication if activated.
## Contribution Guidelines

@@ -172,7 +172,7 @@ class TranscriberParakeetLive:
text = output.text.strip()
words = [
{
"word": word_info["word"],
"word": word_info["word"] + " ",
"start": round(word_info["start"], 2),
"end": round(word_info["end"], 2),
}
@@ -213,7 +213,7 @@ class TranscriberParakeetLive:
words = [
{
"word": word_info["word"],
"word": word_info["word"] + " ",
"start": round(word_info["start"], 2),
"end": round(word_info["end"], 2),
}
@@ -386,7 +386,7 @@ class TranscriberParakeetFile:
text = output.text.strip()
words = [
{
"word": word_info["word"],
"word": word_info["word"] + " ",
"start": round(
word_info["start"] + start_time + timestamp_offset, 2
),
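The three hunks above append a trailing space to every word emitted by the Parakeet transcribers. The diff does not say why; one plausible reading, assumed here, is that a downstream consumer rebuilds text by concatenating the `word` fields directly. The sketch below uses a hypothetical `join_words` helper (not part of the repository) to show how that kind of consumer behaves with and without the trailing space.

```python
# Hypothetical consumer: rebuilds text by plain concatenation of word strings.
# `join_words` is an illustrative name, not a function from the repository.
def join_words(words: list) -> str:
    return "".join(w["word"] for w in words).strip()


raw = [
    {"word": "hello", "start": 0.0, "end": 0.4},
    {"word": "world", "start": 0.5, "end": 0.9},
]

# Before the change: words carry no separator, so they run together.
without_space = join_words(raw)

# After the change: each word ends with a space, as in the diff.
with_space = join_words([{**w, "word": w["word"] + " "} for w in raw])

print(repr(without_space))
print(repr(with_space))
```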

@@ -40,8 +40,9 @@ from reflector.db.transcripts import (
from reflector.logger import logger
from reflector.pipelines.runner import PipelineMessage, PipelineRunner
from reflector.processors import (
AudioChunkerProcessor,
AudioChunkerAutoProcessor,
AudioDiarizationAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
@@ -365,7 +366,8 @@ class PipelineMainLive(PipelineMainBase):
path=transcript.audio_wav_filename,
on_duration=self.on_duration,
),
AudioChunkerProcessor(),
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
TranscriptLinerProcessor(),

@@ -1,5 +1,7 @@
from .audio_chunker import AudioChunkerProcessor # noqa: F401
from .audio_chunker_auto import AudioChunkerAutoProcessor # noqa: F401
from .audio_diarization_auto import AudioDiarizationAutoProcessor # noqa: F401
from .audio_downscale import AudioDownscaleProcessor # noqa: F401
from .audio_file_writer import AudioFileWriterProcessor # noqa: F401
from .audio_merge import AudioMergeProcessor # noqa: F401
from .audio_transcript import AudioTranscriptProcessor # noqa: F401

@@ -1,340 +1,78 @@
from typing import Optional
import av
import numpy as np
import torch
from silero_vad import VADIterator, load_silero_vad
from prometheus_client import Counter, Histogram
from reflector.processors.base import Processor
class AudioChunkerProcessor(Processor):
"""
Assemble audio frames into chunks with VAD-based speech detection
Base class for assembling audio frames into chunks
"""
INPUT_TYPE = av.AudioFrame
OUTPUT_TYPE = list[av.AudioFrame]
def __init__(
self,
block_frames=256,
max_frames=1024,
vad_threshold=0.5,
use_onnx=False,
min_frames=2,
):
super().__init__()
m_chunk = Histogram(
"audio_chunker",
"Time spent in AudioChunker.chunk",
["backend"],
)
m_chunk_call = Counter(
"audio_chunker_call",
"Number of calls to AudioChunker.chunk",
["backend"],
)
m_chunk_success = Counter(
"audio_chunker_success",
"Number of successful calls to AudioChunker.chunk",
["backend"],
)
m_chunk_failure = Counter(
"audio_chunker_failure",
"Number of failed calls to AudioChunker.chunk",
["backend"],
)
def __init__(self, *args, **kwargs):
name = self.__class__.__name__
self.m_chunk = self.m_chunk.labels(name)
self.m_chunk_call = self.m_chunk_call.labels(name)
self.m_chunk_success = self.m_chunk_success.labels(name)
self.m_chunk_failure = self.m_chunk_failure.labels(name)
super().__init__(*args, **kwargs)
self.frames: list[av.AudioFrame] = []
self.block_frames = block_frames
self.max_frames = max_frames
self.vad_threshold = vad_threshold
self.min_frames = min_frames
# Initialize Silero VAD
self._init_vad(use_onnx)
def _init_vad(self, use_onnx=False):
"""Initialize Silero VAD model"""
try:
torch.set_num_threads(1)
self.vad_model = load_silero_vad(onnx=use_onnx)
self.vad_iterator = VADIterator(self.vad_model, sampling_rate=16000)
self.logger.info("Silero VAD initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Silero VAD: {e}")
self.vad_model = None
self.vad_iterator = None
async def _push(self, data: av.AudioFrame):
self.frames.append(data)
# print("timestamp", data.pts * data.time_base * 1000)
# Check for speech segments every 32 frames (~1 second)
if len(self.frames) >= 32 and len(self.frames) % 32 == 0:
await self._process_block()
# Safety fallback - emit if we hit max frames
elif len(self.frames) >= self.max_frames:
self.logger.warning(
f"AudioChunkerProcessor: Reached max frames ({self.max_frames}), "
f"emitting first {self.max_frames // 2} frames"
)
frames_to_emit = self.frames[: self.max_frames // 2]
self.frames = self.frames[self.max_frames // 2 :]
if len(frames_to_emit) >= self.min_frames:
await self.emit(frames_to_emit)
else:
self.logger.debug(
f"Ignoring fallback segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
"""Process incoming audio frame"""
# Validate audio format on first frame
if len(self.frames) == 0:
if data.sample_rate != 16000 or len(data.layout.channels) != 1:
raise ValueError(
f"AudioChunkerProcessor expects 16kHz mono audio, got {data.sample_rate}Hz "
f"with {len(data.layout.channels)} channel(s). "
f"Use AudioDownscaleProcessor before this processor."
)
async def _process_block(self):
# Need at least 32 frames for VAD detection (~1 second)
if len(self.frames) < 32 or self.vad_iterator is None:
return
# Processing block with current buffer size
# print(f"Processing block: {len(self.frames)} frames in buffer")
try:
# Convert frames to numpy array for VAD
audio_array = self._frames_to_numpy(self.frames)
self.m_chunk_call.inc()
with self.m_chunk.time():
result = await self._chunk(data)
self.m_chunk_success.inc()
if result:
await self.emit(result)
except Exception:
self.m_chunk_failure.inc()
raise
if audio_array is None:
# Fallback: emit all frames if conversion failed
frames_to_emit = self.frames[:]
self.frames = []
if len(frames_to_emit) >= self.min_frames:
await self.emit(frames_to_emit)
else:
self.logger.debug(
f"Ignoring conversion-failed segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
return
# Find complete speech segments in the buffer
speech_end_frame = self._find_speech_segment_end(audio_array)
if speech_end_frame is None or speech_end_frame <= 0:
# No speech found but buffer is getting large
if len(self.frames) > 512:
# Check if it's all silence and can be discarded
# No speech segment found, buffer at {len(self.frames)} frames
# Could emit silence or discard old frames here
# For now, keep first 256 frames and discard older silence
if len(self.frames) > 768:
self.logger.debug(
f"Discarding {len(self.frames) - 256} old frames (likely silence)"
)
self.frames = self.frames[-256:]
return
# Calculate segment timing information
frames_to_emit = self.frames[:speech_end_frame]
# Get timing from av.AudioFrame
if frames_to_emit:
first_frame = frames_to_emit[0]
last_frame = frames_to_emit[-1]
sample_rate = first_frame.sample_rate
# Calculate duration
total_samples = sum(f.samples for f in frames_to_emit)
duration_seconds = total_samples / sample_rate if sample_rate > 0 else 0
# Get timestamps if available
start_time = (
first_frame.pts * first_frame.time_base if first_frame.pts else 0
)
end_time = (
last_frame.pts * last_frame.time_base if last_frame.pts else 0
)
# Convert to HH:MM:SS format for logging
def format_time(seconds):
if not seconds:
return "00:00:00"
total_seconds = int(float(seconds))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
start_formatted = format_time(start_time)
end_formatted = format_time(end_time)
# Keep remaining frames for next processing
remaining_after = len(self.frames) - speech_end_frame
# Single structured log line
self.logger.info(
"Speech segment found",
start=start_formatted,
end=end_formatted,
frames=speech_end_frame,
duration=round(duration_seconds, 2),
buffer_before=len(self.frames),
remaining=remaining_after,
)
# Keep remaining frames for next processing
self.frames = self.frames[speech_end_frame:]
# Filter out segments with too few frames
if len(frames_to_emit) >= self.min_frames:
await self.emit(frames_to_emit)
else:
self.logger.debug(
f"Ignoring segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
except Exception as e:
self.logger.error(f"Error in VAD processing: {e}")
# Fallback to simple chunking
if len(self.frames) >= self.block_frames:
frames_to_emit = self.frames[: self.block_frames]
self.frames = self.frames[self.block_frames :]
if len(frames_to_emit) >= self.min_frames:
await self.emit(frames_to_emit)
else:
self.logger.debug(
f"Ignoring exception-fallback segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
def _frames_to_numpy(self, frames: list[av.AudioFrame]) -> Optional[np.ndarray]:
"""Convert av.AudioFrame list to numpy array for VAD processing"""
if not frames:
return None
try:
first_frame = frames[0]
original_sample_rate = first_frame.sample_rate
audio_data = []
for frame in frames:
frame_array = frame.to_ndarray()
# Handle stereo -> mono conversion
if len(frame_array.shape) == 2 and frame_array.shape[0] > 1:
frame_array = np.mean(frame_array, axis=0)
elif len(frame_array.shape) == 2:
frame_array = frame_array.flatten()
audio_data.append(frame_array)
if not audio_data:
return None
combined_audio = np.concatenate(audio_data)
# Resample from 48kHz to 16kHz if needed
if original_sample_rate != 16000:
combined_audio = self._resample_audio(
combined_audio, original_sample_rate, 16000
)
# Ensure float32 format
if combined_audio.dtype == np.int16:
# Normalize int16 audio to float32 in range [-1.0, 1.0]
combined_audio = combined_audio.astype(np.float32) / 32768.0
elif combined_audio.dtype != np.float32:
combined_audio = combined_audio.astype(np.float32)
return combined_audio
except Exception as e:
self.logger.error(f"Error converting frames to numpy: {e}")
return None
def _resample_audio(
self, audio: np.ndarray, from_sr: int, to_sr: int
) -> np.ndarray:
"""Simple linear resampling from from_sr to to_sr"""
if from_sr == to_sr:
return audio
try:
# Simple linear interpolation resampling
ratio = to_sr / from_sr
new_length = int(len(audio) * ratio)
# Create indices for interpolation
old_indices = np.linspace(0, len(audio) - 1, new_length)
resampled = np.interp(old_indices, np.arange(len(audio)), audio)
return resampled.astype(np.float32)
except Exception as e:
self.logger.error("Resampling error", exc_info=e)
# Fallback: simple decimation/repetition
if from_sr > to_sr:
# Downsample by taking every nth sample
step = from_sr // to_sr
return audio[::step]
else:
# Upsample by repeating samples
repeat = to_sr // from_sr
return np.repeat(audio, repeat)
def _find_speech_segment_end(self, audio_array: np.ndarray) -> Optional[int]:
"""Find complete speech segments and return frame index at segment end"""
if self.vad_iterator is None or len(audio_array) == 0:
return None
try:
# Process audio in 512-sample windows for VAD
window_size = 512
min_silence_windows = 3 # Require 3 windows of silence after speech
# Track speech state
in_speech = False
speech_start = None
speech_end = None
silence_count = 0
for i in range(0, len(audio_array), window_size):
chunk = audio_array[i : i + window_size]
if len(chunk) < window_size:
chunk = np.pad(chunk, (0, window_size - len(chunk)))
# Detect if this window has speech
speech_dict = self.vad_iterator(chunk, return_seconds=True)
# VADIterator returns dict with 'start' and 'end' when speech segments are detected
if speech_dict:
if not in_speech:
# Speech started
speech_start = i
in_speech = True
# Debug: print(f"Speech START at sample {i}, VAD: {speech_dict}")
silence_count = 0 # Reset silence counter
continue
if not in_speech:
continue
# We're in speech but found silence
silence_count += 1
if silence_count < min_silence_windows:
continue
# Found end of speech segment
speech_end = i - (min_silence_windows - 1) * window_size
# Debug: print(f"Speech END at sample {speech_end}")
# Convert sample position to frame index
samples_per_frame = self.frames[0].samples if self.frames else 1024
# Account for resampling: we process at 16kHz but frames might be 48kHz
resample_ratio = 48000 / 16000 # 3x
actual_sample_pos = int(speech_end * resample_ratio)
frame_index = actual_sample_pos // samples_per_frame
# Ensure we don't exceed buffer
frame_index = min(frame_index, len(self.frames))
return frame_index
return None
except Exception as e:
self.logger.error(f"Error finding speech segment: {e}")
return None
async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]:
"""
Process audio frame and return chunk when ready.
Subclasses should implement their chunking logic here.
"""
raise NotImplementedError
async def _flush(self):
frames = self.frames[:]
self.frames = []
if frames:
if len(frames) >= self.min_frames:
await self.emit(frames)
else:
self.logger.debug(
f"Ignoring flush segment with {len(frames)} frames "
f"(< {self.min_frames} minimum)"
)
"""Flush any remaining frames when processing ends"""
raise NotImplementedError

@@ -0,0 +1,32 @@
import importlib

from reflector.processors.audio_chunker import AudioChunkerProcessor
from reflector.settings import settings


class AudioChunkerAutoProcessor(AudioChunkerProcessor):
    _registry = {}

    @classmethod
    def register(cls, name, kclass):
        cls._registry[name] = kclass

    def __new__(cls, name: str | None = None, **kwargs):
        if name is None:
            name = settings.AUDIO_CHUNKER_BACKEND
        if name not in cls._registry:
            module_name = f"reflector.processors.audio_chunker_{name}"
            importlib.import_module(module_name)

        # gather specific configuration for the processor
        # search `AUDIO_CHUNKER_BACKEND_XXX_YYY`, push to constructor as `backend_xxx_yyy`
        config = {}
        name_upper = name.upper()
        settings_prefix = "AUDIO_CHUNKER_"
        config_prefix = f"{settings_prefix}{name_upper}_"
        for key, value in settings:
            if key.startswith(config_prefix):
                config_name = key[len(settings_prefix) :].lower()
                config[config_name] = value

        return cls._registry[name](**config | kwargs)
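The file above selects a chunker backend by name and collects backend-specific settings by prefix. The following is a minimal, self-contained sketch of that pattern. `SETTINGS`, `Chunker`, `AutoChunker` and `FramesChunker` are stand-in names invented for illustration (the real code reads `reflector.settings` and imports backend modules on demand, which is omitted here).

```python
from typing import Optional

# Stand-in for the application settings object (an assumption for this sketch).
SETTINGS = {
    "AUDIO_CHUNKER_BACKEND": "frames",
    "AUDIO_CHUNKER_FRAMES_MAX_FRAMES": 128,
    "UNRELATED_SETTING": "ignored",
}


class Chunker:
    """Hypothetical base class that just records its keyword options."""

    def __init__(self, **kwargs):
        self.options = kwargs


class AutoChunker(Chunker):
    _registry: dict = {}

    @classmethod
    def register(cls, name, kclass):
        cls._registry[name] = kclass

    def __new__(cls, name: Optional[str] = None, **kwargs):
        if name is None:
            name = SETTINGS["AUDIO_CHUNKER_BACKEND"]
        settings_prefix = "AUDIO_CHUNKER_"
        config_prefix = f"{settings_prefix}{name.upper()}_"
        # Keep only keys for this backend; strip the common prefix and lowercase.
        config = {
            key[len(settings_prefix):].lower(): value
            for key, value in SETTINGS.items()
            if key.startswith(config_prefix)
        }
        # Explicit keyword arguments override values taken from settings.
        return cls._registry[name](**{**config, **kwargs})


class FramesChunker(Chunker):
    pass


AutoChunker.register("frames", FramesChunker)

chunker = AutoChunker()
print(type(chunker).__name__, chunker.options)
```

Because the object returned from `__new__` is not an instance of `AutoChunker`, Python does not call `AutoChunker.__init__` on it afterwards. Note also that, following the same slicing as the diff, the stripped key keeps the backend name, so the constructor receives `frames_max_frames` rather than `max_frames`.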

@@ -0,0 +1,34 @@
from typing import Optional

import av

from reflector.processors.audio_chunker import AudioChunkerProcessor
from reflector.processors.audio_chunker_auto import AudioChunkerAutoProcessor


class AudioChunkerFramesProcessor(AudioChunkerProcessor):
    """
    Simple frame-based audio chunker that emits chunks after a fixed number of frames
    """

    def __init__(self, max_frames=256, **kwargs):
        super().__init__(**kwargs)
        self.max_frames = max_frames

    async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]:
        self.frames.append(data)
        if len(self.frames) >= self.max_frames:
            frames_to_emit = self.frames[:]
            self.frames = []
            return frames_to_emit

        return None

    async def _flush(self):
        frames = self.frames[:]
        self.frames = []
        if frames:
            await self.emit(frames)


AudioChunkerAutoProcessor.register("frames", AudioChunkerFramesProcessor)
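The frame-based chunker above buffers frames and emits a chunk once a fixed count is reached, with the remainder emitted on flush. A rough stand-alone sketch of that behavior follows; it uses plain integers in place of `av.AudioFrame` objects and a hypothetical `FixedSizeChunker` class that collects output in a list instead of calling the processor's `emit`.

```python
import asyncio


class FixedSizeChunker:
    """Illustrative stand-in: groups pushed items into chunks of max_frames."""

    def __init__(self, max_frames: int = 4):
        self.max_frames = max_frames
        self.frames = []
        self.emitted = []  # collected chunks, in place of a real emit()

    async def push(self, frame) -> None:
        self.frames.append(frame)
        if len(self.frames) >= self.max_frames:
            self.emitted.append(self.frames)
            self.frames = []

    async def flush(self) -> None:
        # Emit whatever is left, even if it is shorter than max_frames.
        if self.frames:
            self.emitted.append(self.frames)
            self.frames = []


async def demo():
    chunker = FixedSizeChunker(max_frames=4)
    for frame in range(10):
        await chunker.push(frame)
    await chunker.flush()
    return chunker.emitted


chunks = asyncio.run(demo())
print(chunks)
```

Ten frames with `max_frames=4` yield two full chunks and one short trailing chunk from the flush.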

@@ -0,0 +1,298 @@
from typing import Optional
import av
import numpy as np
import torch
from silero_vad import VADIterator, load_silero_vad
from reflector.processors.audio_chunker import AudioChunkerProcessor
from reflector.processors.audio_chunker_auto import AudioChunkerAutoProcessor
class AudioChunkerSileroProcessor(AudioChunkerProcessor):
"""
Assemble audio frames into chunks with VAD-based speech detection using Silero VAD
"""
def __init__(
self,
block_frames=256,
max_frames=1024,
use_onnx=True,
min_frames=2,
**kwargs,
):
super().__init__(**kwargs)
self.block_frames = block_frames
self.max_frames = max_frames
self.min_frames = min_frames
# Initialize Silero VAD
self._init_vad(use_onnx)
def _init_vad(self, use_onnx=False):
"""Initialize Silero VAD model"""
try:
torch.set_num_threads(1)
self.vad_model = load_silero_vad(onnx=use_onnx)
self.vad_iterator = VADIterator(self.vad_model, sampling_rate=16000)
self.logger.info("Silero VAD initialized successfully")
except Exception as e:
self.logger.error(f"Failed to initialize Silero VAD: {e}")
self.vad_model = None
self.vad_iterator = None
async def _chunk(self, data: av.AudioFrame) -> Optional[list[av.AudioFrame]]:
"""Process audio frame and return chunk when ready"""
self.frames.append(data)
# Check for speech segments every 32 frames (~1 second)
if len(self.frames) >= 32 and len(self.frames) % 32 == 0:
return await self._process_block()
# Safety fallback - emit if we hit max frames
elif len(self.frames) >= self.max_frames:
self.logger.warning(
f"AudioChunkerSileroProcessor: Reached max frames ({self.max_frames}), "
f"emitting first {self.max_frames // 2} frames"
)
frames_to_emit = self.frames[: self.max_frames // 2]
self.frames = self.frames[self.max_frames // 2 :]
if len(frames_to_emit) >= self.min_frames:
return frames_to_emit
else:
self.logger.debug(
f"Ignoring fallback segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
return None
async def _process_block(self) -> Optional[list[av.AudioFrame]]:
# Need at least 32 frames for VAD detection (~1 second)
if len(self.frames) < 32 or self.vad_iterator is None:
return None
# Processing block with current buffer size
print(f"Processing block: {len(self.frames)} frames in buffer")
try:
# Convert frames to numpy array for VAD
audio_array = self._frames_to_numpy(self.frames)
if audio_array is None:
# Fallback: emit all frames if conversion failed
frames_to_emit = self.frames[:]
self.frames = []
if len(frames_to_emit) >= self.min_frames:
return frames_to_emit
else:
self.logger.debug(
f"Ignoring conversion-failed segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
return None
# Find complete speech segments in the buffer
speech_end_frame = self._find_speech_segment_end(audio_array)
if speech_end_frame is None or speech_end_frame <= 0:
# No speech found but buffer is getting large
if len(self.frames) > 512:
# Check if it's all silence and can be discarded
# No speech segment found, buffer at {len(self.frames)} frames
# Could emit silence or discard old frames here
# For now, keep first 256 frames and discard older silence
if len(self.frames) > 768:
self.logger.debug(
f"Discarding {len(self.frames) - 256} old frames (likely silence)"
)
self.frames = self.frames[-256:]
return None
# Calculate segment timing information
frames_to_emit = self.frames[:speech_end_frame]
# Get timing from av.AudioFrame
if frames_to_emit:
first_frame = frames_to_emit[0]
last_frame = frames_to_emit[-1]
sample_rate = first_frame.sample_rate
# Calculate duration
total_samples = sum(f.samples for f in frames_to_emit)
duration_seconds = total_samples / sample_rate if sample_rate > 0 else 0
# Get timestamps if available
start_time = (
first_frame.pts * first_frame.time_base if first_frame.pts else 0
)
end_time = (
last_frame.pts * last_frame.time_base if last_frame.pts else 0
)
# Convert to HH:MM:SS format for logging
def format_time(seconds):
if not seconds:
return "00:00:00"
total_seconds = int(float(seconds))
hours = total_seconds // 3600
minutes = (total_seconds % 3600) // 60
secs = total_seconds % 60
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
start_formatted = format_time(start_time)
end_formatted = format_time(end_time)
# Keep remaining frames for next processing
remaining_after = len(self.frames) - speech_end_frame
# Single structured log line
self.logger.info(
"Speech segment found",
start=start_formatted,
end=end_formatted,
frames=speech_end_frame,
duration=round(duration_seconds, 2),
buffer_before=len(self.frames),
remaining=remaining_after,
)
# Keep remaining frames for next processing
self.frames = self.frames[speech_end_frame:]
# Filter out segments with too few frames
if len(frames_to_emit) >= self.min_frames:
return frames_to_emit
else:
self.logger.debug(
f"Ignoring segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
except Exception as e:
self.logger.error(f"Error in VAD processing: {e}")
# Fallback to simple chunking
if len(self.frames) >= self.block_frames:
frames_to_emit = self.frames[: self.block_frames]
self.frames = self.frames[self.block_frames :]
if len(frames_to_emit) >= self.min_frames:
return frames_to_emit
else:
self.logger.debug(
f"Ignoring exception-fallback segment with {len(frames_to_emit)} frames "
f"(< {self.min_frames} minimum)"
)
return None
def _frames_to_numpy(self, frames: list[av.AudioFrame]) -> Optional[np.ndarray]:
"""Convert av.AudioFrame list to numpy array for VAD processing"""
if not frames:
return None
try:
audio_data = []
for frame in frames:
frame_array = frame.to_ndarray()
if len(frame_array.shape) == 2:
frame_array = frame_array.flatten()
audio_data.append(frame_array)
if not audio_data:
return None
combined_audio = np.concatenate(audio_data)
# Ensure float32 format
if combined_audio.dtype == np.int16:
# Normalize int16 audio to float32 in range [-1.0, 1.0]
combined_audio = combined_audio.astype(np.float32) / 32768.0
elif combined_audio.dtype != np.float32:
combined_audio = combined_audio.astype(np.float32)
return combined_audio
except Exception as e:
self.logger.error(f"Error converting frames to numpy: {e}")
return None
def _find_speech_segment_end(self, audio_array: np.ndarray) -> Optional[int]:
"""Find complete speech segments and return frame index at segment end"""
if self.vad_iterator is None or len(audio_array) == 0:
return None
try:
# Process audio in 512-sample windows for VAD
window_size = 512
min_silence_windows = 3 # Require 3 windows of silence after speech
# Track speech state
in_speech = False
speech_start = None
speech_end = None
silence_count = 0
for i in range(0, len(audio_array), window_size):
chunk = audio_array[i : i + window_size]
if len(chunk) < window_size:
chunk = np.pad(chunk, (0, window_size - len(chunk)))
# Detect if this window has speech
speech_dict = self.vad_iterator(chunk, return_seconds=True)
# VADIterator returns dict with 'start' and 'end' when speech segments are detected
if speech_dict:
if not in_speech:
# Speech started
speech_start = i
in_speech = True
# Debug: print(f"Speech START at sample {i}, VAD: {speech_dict}")
silence_count = 0 # Reset silence counter
continue
if not in_speech:
continue
# We're in speech but found silence
silence_count += 1
if silence_count < min_silence_windows:
continue
# Found end of speech segment
speech_end = i - (min_silence_windows - 1) * window_size
# Debug: print(f"Speech END at sample {speech_end}")
# Convert sample position to frame index
samples_per_frame = self.frames[0].samples if self.frames else 1024
frame_index = speech_end // samples_per_frame
# Ensure we don't exceed buffer
frame_index = min(frame_index, len(self.frames))
return frame_index
return None
except Exception as e:
self.logger.error(f"Error finding speech segment: {e}")
return None
async def _flush(self):
frames = self.frames[:]
self.frames = []
if frames:
if len(frames) >= self.min_frames:
await self.emit(frames)
else:
self.logger.debug(
f"Ignoring flush segment with {len(frames)} frames "
f"(< {self.min_frames} minimum)"
)
AudioChunkerAutoProcessor.register("silero", AudioChunkerSileroProcessor)
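Compared with the removed code in `audio_chunker.py`, the Silero chunker above no longer multiplies the speech-end position by `48000 / 16000` before converting it to a frame index, since it now expects 16 kHz input. The arithmetic can be sketched as below. The 20 ms frame duration (320 samples at 16 kHz, 960 at 48 kHz) and the helper name `sample_to_frame_index` are assumptions made for illustration only.

```python
def sample_to_frame_index(
    sample_pos: int, samples_per_frame: int, buffered_frames: int
) -> int:
    """Map a sample offset to a frame index, clamped to the buffer length."""
    return min(sample_pos // samples_per_frame, buffered_frames)


# Hypothetical case: speech ends 0.5 s into the buffer, i.e. sample 8000 at 16 kHz.
speech_end_16k = 8000

# New behavior: frames are already 16 kHz, so no rate conversion is needed.
new_index = sample_to_frame_index(
    speech_end_16k, samples_per_frame=320, buffered_frames=64
)

# Old behavior: VAD ran at 16 kHz while buffered frames were 48 kHz,
# so the position was scaled by 3 before dividing by the frame size.
old_index = sample_to_frame_index(
    int(speech_end_16k * 48000 / 16000), samples_per_frame=960, buffered_frames=64
)

print(new_index, old_index)
```

Under these assumed frame sizes both computations point at the same frame, which is consistent with the conversion having moved upstream rather than the chunking rule having changed.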

@@ -0,0 +1,60 @@
from typing import Optional

import av
from av.audio.resampler import AudioResampler

from reflector.processors.base import Processor


def copy_frame(frame: av.AudioFrame) -> av.AudioFrame:
    frame_copy = frame.from_ndarray(
        frame.to_ndarray(),
        format=frame.format.name,
        layout=frame.layout.name,
    )
    frame_copy.sample_rate = frame.sample_rate
    frame_copy.pts = frame.pts
    frame_copy.time_base = frame.time_base
    return frame_copy


class AudioDownscaleProcessor(Processor):
    """
    Downscale audio frames to 16kHz mono format
    """

    INPUT_TYPE = av.AudioFrame
    OUTPUT_TYPE = av.AudioFrame

    def __init__(self, target_rate: int = 16000, target_layout: str = "mono", **kwargs):
        super().__init__(**kwargs)
        self.target_rate = target_rate
        self.target_layout = target_layout
        self.resampler: Optional[AudioResampler] = None
        self.needs_resampling: Optional[bool] = None

    async def _push(self, data: av.AudioFrame):
        if self.needs_resampling is None:
            self.needs_resampling = (
                data.sample_rate != self.target_rate
                or data.layout.name != self.target_layout
            )

            if self.needs_resampling:
                self.resampler = AudioResampler(
                    format="s16", layout=self.target_layout, rate=self.target_rate
                )

        if not self.needs_resampling or not self.resampler:
            await self.emit(data)
            return

        resampled_frames = self.resampler.resample(copy_frame(data))
        for resampled_frame in resampled_frames:
            await self.emit(resampled_frame)

    async def _flush(self):
        if self.needs_resampling and self.resampler:
            final_frames = self.resampler.resample(None)
            for frame in final_frames:
                await self.emit(frame)
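The downscale processor above decides once, on the first frame it sees, whether resampling is needed, and passes frames through untouched otherwise. The sketch below isolates that decide-once logic without PyAV. `Frame` and `LazyDownscaler` are hypothetical stand-ins, and the "conversion" only rewrites metadata; it does not resample any audio.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Frame:
    """Stand-in for an audio frame: format metadata only, no samples."""

    sample_rate: int
    layout: str


class LazyDownscaler:
    def __init__(self, target_rate: int = 16000, target_layout: str = "mono"):
        self.target_rate = target_rate
        self.target_layout = target_layout
        # None means "not decided yet"; set from the first frame only.
        self.needs_resampling: Optional[bool] = None

    def push(self, frame: Frame) -> Frame:
        if self.needs_resampling is None:
            self.needs_resampling = (
                frame.sample_rate != self.target_rate
                or frame.layout != self.target_layout
            )
        if not self.needs_resampling:
            return frame  # passthrough: same object, no copy
        # Placeholder for real resampling: only the metadata changes here.
        return replace(
            frame, sample_rate=self.target_rate, layout=self.target_layout
        )


already_ok = Frame(16000, "mono")
passthrough = LazyDownscaler().push(already_ok)
converted = LazyDownscaler().push(Frame(48000, "stereo"))
print(passthrough, converted)
```

One consequence of this design, visible in the diff as well, is that the decision is never revisited: if the input format changed mid-stream, later frames would still be handled according to the first frame.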

@@ -3,24 +3,11 @@ from time import monotonic_ns
from uuid import uuid4
import av
from av.audio.resampler import AudioResampler
from reflector.processors.base import Processor
from reflector.processors.types import AudioFile
def copy_frame(frame: av.AudioFrame) -> av.AudioFrame:
frame_copy = frame.from_ndarray(
frame.to_ndarray(),
format=frame.format.name,
layout=frame.layout.name,
)
frame_copy.sample_rate = frame.sample_rate
frame_copy.pts = frame.pts
frame_copy.time_base = frame.time_base
return frame_copy
class AudioMergeProcessor(Processor):
"""
Merge audio frame into a single file
@@ -29,9 +16,8 @@ class AudioMergeProcessor(Processor):
INPUT_TYPE = list[av.AudioFrame]
OUTPUT_TYPE = AudioFile
def __init__(self, downsample_to_16k_mono: bool = True, **kwargs):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.downsample_to_16k_mono = downsample_to_16k_mono
async def _push(self, data: list[av.AudioFrame]):
if not data:
@@ -39,72 +25,27 @@ class AudioMergeProcessor(Processor):
# get audio information from first frame
frame = data[0]
original_channels = len(frame.layout.channels)
original_sample_rate = frame.sample_rate
original_sample_width = frame.format.bytes
# determine if we need processing
needs_processing = self.downsample_to_16k_mono and (
original_sample_rate != 16000 or original_channels != 1
)
# determine output parameters
if self.downsample_to_16k_mono:
output_sample_rate = 16000
output_channels = 1
output_sample_width = 2 # 16-bit = 2 bytes
else:
output_sample_rate = original_sample_rate
output_channels = original_channels
output_sample_width = original_sample_width
output_channels = len(frame.layout.channels)
output_sample_rate = frame.sample_rate
output_sample_width = frame.format.bytes
# create audio file
uu = uuid4().hex
fd = io.BytesIO()
if needs_processing:
# Process with PyAV resampler
out_container = av.open(fd, "w", format="wav")
out_stream = out_container.add_stream("pcm_s16le", rate=16000)
out_stream.layout = "mono"
# Use PyAV to write frames
out_container = av.open(fd, "w", format="wav")
out_stream = out_container.add_stream("pcm_s16le", rate=output_sample_rate)
out_stream.layout = frame.layout.name
# Create resampler if needed
resampler = None
if original_sample_rate != 16000 or original_channels != 1:
resampler = AudioResampler(format="s16", layout="mono", rate=16000)
for frame in data:
if resampler:
# Resample and convert to mono
# XXX for an unknown reason, if we don't use a copy of the frame, we get
# Invalid Argument from resample. Debugging indicates that when a previous processor
# has already used the frame (like AudioFileWriter), it triggers the invalid argument here.
resampled_frames = resampler.resample(copy_frame(frame))
for resampled_frame in resampled_frames:
for packet in out_stream.encode(resampled_frame):
out_container.mux(packet)
else:
# Direct encoding without resampling
for packet in out_stream.encode(frame):
out_container.mux(packet)
# Flush the encoder
for packet in out_stream.encode(None):
for frame in data:
for packet in out_stream.encode(frame):
out_container.mux(packet)
out_container.close()
else:
# Use PyAV for original frames (no processing needed)
out_container = av.open(fd, "w", format="wav")
out_stream = out_container.add_stream("pcm_s16le", rate=output_sample_rate)
out_stream.layout = "mono" if output_channels == 1 else frame.layout
for frame in data:
for packet in out_stream.encode(frame):
out_container.mux(packet)
for packet in out_stream.encode(None):
out_container.mux(packet)
out_container.close()
# Flush the encoder
for packet in out_stream.encode(None):
out_container.mux(packet)
out_container.close()
fd.seek(0)


@@ -12,9 +12,6 @@ API will be a POST request to TRANSCRIPT_URL:
"""
from typing import List
import aiohttp
from openai import AsyncOpenAI
from reflector.processors.audio_transcript import AudioTranscriptProcessor
@@ -25,7 +22,9 @@ from reflector.settings import settings
class AudioTranscriptModalProcessor(AudioTranscriptProcessor):
def __init__(
self, modal_api_key: str | None = None, batch_enabled: bool = True, **kwargs
self,
modal_api_key: str | None = None,
**kwargs,
):
super().__init__()
if not settings.TRANSCRIPT_URL:
@@ -35,126 +34,6 @@ class AudioTranscriptModalProcessor(AudioTranscriptProcessor):
self.transcript_url = settings.TRANSCRIPT_URL + "/v1"
self.timeout = settings.TRANSCRIPT_TIMEOUT
self.modal_api_key = modal_api_key
self.max_batch_duration = 10.0
self.max_batch_files = 15
self.batch_enabled = batch_enabled
self.pending_files: List[AudioFile] = [] # Files waiting to be processed
@classmethod
def _calculate_duration(cls, audio_file: AudioFile) -> float:
"""Calculate audio duration in seconds from AudioFile metadata"""
# Duration = total_samples / sample_rate
# We need to estimate total samples from the file data
import wave
try:
# Try to read as WAV file to get duration
audio_file.fd.seek(0)
with wave.open(audio_file.fd, "rb") as wav_file:
frames = wav_file.getnframes()
sample_rate = wav_file.getframerate()
duration = frames / sample_rate
return duration
except Exception:
# Fallback: estimate from file size and audio parameters
audio_file.fd.seek(0, 2) # Seek to end
file_size = audio_file.fd.tell()
audio_file.fd.seek(0) # Reset to beginning
# Estimate: file_size / (sample_rate * channels * sample_width)
bytes_per_second = (
audio_file.sample_rate
* audio_file.channels
* (audio_file.sample_width // 8)
)
estimated_duration = (
file_size / bytes_per_second if bytes_per_second > 0 else 0
)
return max(0, estimated_duration)
def _create_batches(self, audio_files: List[AudioFile]) -> List[List[AudioFile]]:
"""Group audio files into batches of at most max_batch_duration seconds in total"""
batches = []
current_batch = []
current_duration = 0.0
for audio_file in audio_files:
duration = self._calculate_duration(audio_file)
# If adding this file exceeds max duration, start a new batch
if current_duration + duration > self.max_batch_duration and current_batch:
batches.append(current_batch)
current_batch = [audio_file]
current_duration = duration
else:
current_batch.append(audio_file)
current_duration += duration
# Add the last batch if not empty
if current_batch:
batches.append(current_batch)
return batches
async def _transcript_batch(self, audio_files: List[AudioFile]) -> List[Transcript]:
"""Transcribe a batch of audio files using the parakeet backend"""
if not audio_files:
return []
self.logger.debug(f"Batch transcribing {len(audio_files)} files")
# Prepare form data for batch request
data = aiohttp.FormData()
data.add_field("language", self.get_pref("audio:source_language", "en"))
data.add_field("batch", "true")
for i, audio_file in enumerate(audio_files):
audio_file.fd.seek(0)
data.add_field(
"files",
audio_file.fd,
filename=f"{audio_file.name}",
content_type="audio/wav",
)
# Make batch request
headers = {"Authorization": f"Bearer {self.modal_api_key}"}
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as session:
async with session.post(
f"{self.transcript_url}/audio/transcriptions",
data=data,
headers=headers,
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(
f"Batch transcription failed: {response.status} {error_text}"
)
result = await response.json()
# Process batch results
transcripts = []
results = result.get("results", [])
for i, (audio_file, file_result) in enumerate(zip(audio_files, results)):
transcript = Transcript(
words=[
Word(
text=word_info["word"],
start=word_info["start"],
end=word_info["end"],
)
for word_info in file_result.get("words", [])
]
)
transcript.add_offset(audio_file.timestamp)
transcripts.append(transcript)
return transcripts
async def _transcript(self, data: AudioFile):
async with AsyncOpenAI(
@@ -187,96 +66,5 @@ class AudioTranscriptModalProcessor(AudioTranscriptProcessor):
return transcript
async def transcript_multiple(
self, audio_files: List[AudioFile]
) -> List[Transcript]:
"""Transcribe multiple audio files using batching"""
if len(audio_files) == 1:
# Single file, use existing method
return [await self._transcript(audio_files[0])]
# Create batches capped at max_batch_duration seconds each
batches = self._create_batches(audio_files)
self.logger.debug(
f"Processing {len(audio_files)} files in {len(batches)} batches"
)
# Process batches one after another (each batch is awaited in turn)
all_transcripts = []
for batch in batches:
batch_transcripts = await self._transcript_batch(batch)
all_transcripts.extend(batch_transcripts)
return all_transcripts
async def _push(self, data: AudioFile):
"""Override _push to support batching"""
if not self.batch_enabled:
# Use parent implementation for single file processing
return await super()._push(data)
# Add file to pending batch
self.pending_files.append(data)
self.logger.debug(
f"Added file to batch: {data.name}, batch size: {len(self.pending_files)}"
)
# Calculate total duration of pending files
total_duration = sum(self._calculate_duration(f) for f in self.pending_files)
# Process the batch once it reaches max duration or the pending file count reaches max_batch_files
should_process_batch = (
total_duration >= self.max_batch_duration
or len(self.pending_files) >= self.max_batch_files
)
if should_process_batch:
await self._process_pending_batch()
async def _process_pending_batch(self):
"""Process all pending files as batches"""
if not self.pending_files:
return
self.logger.debug(f"Processing batch of {len(self.pending_files)} files")
try:
# Create batches respecting duration limit
batches = self._create_batches(self.pending_files)
# Process each batch
for batch in batches:
self.m_transcript_call.inc()
try:
with self.m_transcript.time():
# Use batch transcription
transcripts = await self._transcript_batch(batch)
self.m_transcript_success.inc()
# Emit each transcript
for transcript in transcripts:
if transcript:
await self.emit(transcript)
except Exception:
self.m_transcript_failure.inc()
raise
finally:
# Release audio files
for audio_file in batch:
audio_file.release()
finally:
# Clear pending files
self.pending_files.clear()
async def _flush(self):
"""Process any remaining files when flushing"""
await self._process_pending_batch()
await super()._flush()
AudioTranscriptAutoProcessor.register("modal", AudioTranscriptModalProcessor)
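
Review note: the batching code removed from this file grouped files greedily by total duration. For anyone revisiting it later, here is a minimal standalone sketch of that grouping rule, working on plain durations in seconds instead of `AudioFile` objects. `group_by_duration` is a hypothetical helper name, not a function in the codebase.

```python
from typing import List


def group_by_duration(durations: List[float], max_total: float) -> List[List[float]]:
    """Greedily pack durations into batches whose sum stays within max_total.

    An item longer than max_total still gets a batch of its own, which mirrors
    the `and current_batch` guard in the removed _create_batches method.
    """
    batches: List[List[float]] = []
    current: List[float] = []
    total = 0.0
    for d in durations:
        if current and total + d > max_total:
            # Adding d would overflow the current batch: close it, start anew.
            batches.append(current)
            current, total = [d], d
        else:
            current.append(d)
            total += d
    if current:
        batches.append(current)
    return batches


batches = group_by_duration([4.0, 4.0, 4.0, 12.0, 1.0], max_total=10.0)
# batches == [[4.0, 4.0], [4.0], [12.0], [1.0]]
```

The grouping is order-preserving and single-pass, so it never reorders chunks; it does not try to find the tightest packing.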


@@ -21,6 +21,10 @@ class Settings(BaseSettings):
# local data directory
DATA_DIR: str = "./data"
# Audio Chunking
# backends: silero, frames
AUDIO_CHUNKER_BACKEND: str = "frames"
# Audio Transcription
# backends: whisper, modal
TRANSCRIPT_BACKEND: str = "whisper"
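
Review note: the new `AUDIO_CHUNKER_BACKEND` setting selects between the `silero` and `frames` chunkers by name. How `AudioChunkerAutoProcessor` resolves that name is not shown in this diff, so the following is only a sketch of a name-to-class registry, in the spirit of the `AudioTranscriptAutoProcessor.register("modal", ...)` call visible earlier. All class names below are hypothetical.

```python
from typing import Callable, Dict


class BackendRegistry:
    """Hypothetical registry mapping a backend name to a factory."""

    _backends: Dict[str, Callable[[], object]] = {}

    @classmethod
    def register(cls, name: str, factory: Callable[[], object]) -> None:
        cls._backends[name] = factory

    @classmethod
    def create(cls, name: str) -> object:
        try:
            factory = cls._backends[name]
        except KeyError:
            known = ", ".join(sorted(cls._backends))
            raise ValueError(f"unknown backend {name!r}; known: {known}") from None
        return factory()


class FramesChunker:
    """Placeholder for a fixed-frame-count chunker."""


class SileroChunker:
    """Placeholder for a VAD-based chunker."""


BackendRegistry.register("frames", FramesChunker)
BackendRegistry.register("silero", SileroChunker)

# Stands in for settings.AUDIO_CHUNKER_BACKEND, whose default is "frames".
chunker = BackendRegistry.create("frames")
```

Failing loudly on an unknown name keeps a typo in the environment configuration from silently falling back to a different chunker.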


@@ -16,7 +16,8 @@ import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerProcessor,
AudioChunkerAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
@@ -95,7 +96,8 @@ async def process_audio_file(
# Add the rest of the processors
processors += [
AudioChunkerProcessor(),
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
TranscriptLinerProcessor(),
@@ -322,7 +324,8 @@ if __name__ == "__main__":
# Ignore internal processors
if processor in (
"AudioChunkerProcessor",
"AudioDownscaleProcessor",
"AudioChunkerAutoProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",


@@ -17,7 +17,8 @@ import av
from reflector.logger import logger
from reflector.processors import (
AudioChunkerProcessor,
AudioChunkerAutoProcessor,
AudioDownscaleProcessor,
AudioFileWriterProcessor,
AudioMergeProcessor,
AudioTranscriptAutoProcessor,
@@ -96,7 +97,8 @@ async def process_audio_file_with_diarization(
# Add the rest of the processors
processors += [
AudioChunkerProcessor(),
AudioDownscaleProcessor(),
AudioChunkerAutoProcessor(),
AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(),
]
@@ -276,7 +278,8 @@ if __name__ == "__main__":
# Ignore internal processors
if processor in (
"AudioChunkerProcessor",
"AudioDownscaleProcessor",
"AudioChunkerAutoProcessor",
"AudioMergeProcessor",
"AudioFileWriterProcessor",
"TopicCollectorProcessor",


@@ -53,7 +53,7 @@ async def run_single_processor(args):
async def event_callback(event: PipelineEvent):
processor = event.processor
# ignore some processors
if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
if processor in ("AudioChunkerAutoProcessor", "AudioMergeProcessor"):
return
print(f"Event: {event}")
if output_fd: