Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2025-12-20 20:29:06 +00:00)
* feat: improve pipeline threading, and transcriber (parakeet and silero vad)
* refactor: remove whisperx, implement parakeet
* refactor: make audio_chunker smarter and wait for speech instead of a fixed frame count
* refactor: make audio merge always downscale the audio to 16k for transcription
* refactor: make the audio transcript modal accept batches
* refactor: improve type safety and remove prometheus metrics
  - Add DiarizationSegment TypedDict for proper diarization typing
  - Replace List/Optional with modern Python list/| None syntax
  - Remove all Prometheus metrics from TranscriptDiarizationAssemblerProcessor
  - Add comprehensive file processing pipeline with parallel execution
  - Update processor imports and type annotations throughout
  - Implement optimized file pipeline as default in process.py tool
* refactor: convert FileDiarizationProcessor I/O types to BaseModel
  Update FileDiarizationInput and FileDiarizationOutput to inherit from BaseModel instead of plain classes, following the standard pattern used by other processors in the codebase.
* test: add tests for file transcript and diarization with pytest-recording
* build: add pytest-recording
* feat: add local pyannote for testing
* fix: replace PyAV AudioResampler with torchaudio for reliable audio processing (see the resampling sketch after this list)
  - Replace problematic PyAV AudioResampler that was causing ValueError: [Errno 22] Invalid argument
  - Use torchaudio.functional.resample for robust sample rate conversion
  - Optimize processing: skip conversion for already-16kHz mono audio
  - Add direct WAV writing with the Python wave module for better performance
  - Consolidate duplicate downsample checks for cleaner code
  - Maintain list[av.AudioFrame] input interface
  - Required for Silero VAD, which needs 16kHz mono audio
* fix: replace PyAV AudioResampler with torchaudio solution
  - Resolves ValueError: [Errno 22] Invalid argument in AudioMergeProcessor
  - Replaces problematic PyAV AudioResampler with torchaudio.functional.resample
  - Optimizes processing to skip unnecessary conversions when audio is already 16kHz mono
  - Uses direct WAV writing with Python's wave module for better performance
  - Fixes test_basic_process to disable diarization (pyannote dependency not installed)
  - Updates test expectations to match actual processor behavior
  - Removes unused pydub dependency from pyproject.toml
  - Adds comprehensive TEST_ANALYSIS.md documenting test suite status
* feat: add parameterized test for both diarization modes (sketched below)
  - Adds @pytest.mark.parametrize to test_basic_process with enable_diarization=[False, True]
  - Test with diarization=False always passes (tests core AudioMergeProcessor functionality)
  - Test with diarization=True gracefully skips when pyannote.audio is not installed
  - Provides comprehensive test coverage for both pipeline configurations
* fix: resolve pipeline property naming conflict in AudioDiarizationPyannoteProcessor (minimal reproduction below)
  - Renames the 'pipeline' property to 'diarization_pipeline' to avoid conflict with the base Processor.pipeline attribute
  - Fixes AttributeError: 'property 'pipeline' object has no setter' when set_pipeline() is called
  - Updates property usage in the _diarize method to use the new name
  - Now correctly supports pipeline initialization for diarization processing
* fix: add local for pyannote
* test: add diarization test
* fix: resample on audio merge now working
* fix: correctly restore timestamp
* fix: display exception in a threaded processor if that happens
* Update pyproject.toml
* ci: remove option
* ci: update astral-sh/setup-uv
* test: add monadical url for pytest-recording
* refactor: remove previous version
* build: move faster whisper to local dep
* test: fix missing import
* refactor: improve main_file_pipeline organization and error handling
  - Move all imports to the top of the file
  - Create unified EmptyPipeline class to replace duplicate mock pipeline code
  - Remove timeout and fallback logic; let processors handle their own retries
  - Fix error handling to raise any exception from parallel tasks
  - Add proper type hints and validation for captured results
* fix: wrong function
* fix: remove task_done
* feat: add configurable file processing timeouts for modal processors (sketched below)
  - Add TRANSCRIPT_FILE_TIMEOUT setting (default: 600s) for file transcription
  - Add DIARIZATION_FILE_TIMEOUT setting (default: 600s) for file diarization
  - Replace hardcoded timeout=600 with configurable settings in modal processors
  - Allows customization of timeout values via environment variables
* fix: use logger
* fix: worker process meetings now use the file pipeline
* fix: topic not gathered
* refactor: remove prepare(), pipeline now works
* refactor: implement many review suggestions from Igor
* test: add test for test_pipeline_main_file
* refactor: remove doc
* doc: add doc
* ci: update build to use native arm64 builder
* fix: merge fixes
* refactor: changes from Igor review + add test (not run by default) to exercise the GPU modal part
* ci: update to our own linux-amd64 runner
* ci: try using suggested mode=min
* fix: update diarizer for latest modal, and use volume
* fix: modal file extension detection
* fix: run the diarizer on an A100
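
A minimal sketch of the resampling approach described in the two AudioResampler commits, assuming nothing about the real AudioMergeProcessor beyond what the messages state: torchaudio.functional.resample handles the sample-rate conversion, already-16kHz-mono input passes through untouched, and the result is written as 16-bit PCM with Python's wave module. The helper names and the TARGET_RATE constant are illustrative, not taken from the repo.

import wave

import torch
import torchaudio

TARGET_RATE = 16000  # Silero VAD and the transcriber expect 16 kHz mono


def to_16k_mono(waveform: torch.Tensor, sample_rate: int) -> torch.Tensor:
    # waveform: (channels, samples) float tensor in [-1, 1]
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)  # downmix to mono
    if sample_rate != TARGET_RATE:  # skip conversion when already 16 kHz
        waveform = torchaudio.functional.resample(
            waveform, orig_freq=sample_rate, new_freq=TARGET_RATE
        )
    return waveform


def write_wav(path: str, waveform: torch.Tensor) -> None:
    # Write mono 16-bit PCM directly with the stdlib wave module.
    pcm = (waveform.clamp(-1, 1) * 32767).to(torch.int16)
    with wave.open(path, "wb") as f:
        f.setnchannels(1)
        f.setsampwidth(2)
        f.setframerate(TARGET_RATE)
        f.writeframes(pcm.numpy().tobytes())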
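
The parameterized diarization test can be sketched as follows; run_pipeline is a stand-in for the repo's actual pipeline invocation, so only the parametrize/skip pattern is taken from the commit message.

import pytest


def run_pipeline(enable_diarization: bool) -> dict:
    # Stand-in for the real AudioMergeProcessor pipeline run.
    return {"diarization": enable_diarization}


@pytest.mark.parametrize("enable_diarization", [False, True])
def test_basic_process(enable_diarization):
    if enable_diarization:
        # Gracefully skip when the optional pyannote.audio extra is absent.
        pytest.importorskip("pyannote.audio")
    result = run_pipeline(enable_diarization)
    assert result is not None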
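
The property naming conflict is easy to reproduce in isolation. In the sketch below (simplified classes, not the repo's real Processor), a read-only property shadows the base class's plain attribute, so the base class's assignment raises AttributeError; renaming the property resolves it.

class Processor:
    def set_pipeline(self, pipeline):
        self.pipeline = pipeline  # base class stores a plain attribute


class Broken(Processor):
    @property
    def pipeline(self):  # shadows the attribute and has no setter
        return "model"


class Fixed(Processor):
    @property
    def diarization_pipeline(self):  # distinct name, no conflict
        return "model"


Fixed().set_pipeline("upstream")  # works
try:
    Broken().set_pipeline("upstream")
except AttributeError as e:
    print(e)  # property 'pipeline' of 'Broken' object has no setter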
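
A hedged sketch of the configurable timeouts; the repo presumably exposes these through its settings object, so reading the environment directly here is only for illustration.

import os

TRANSCRIPT_FILE_TIMEOUT = int(os.environ.get("TRANSCRIPT_FILE_TIMEOUT", "600"))
DIARIZATION_FILE_TIMEOUT = int(os.environ.get("DIARIZATION_FILE_TIMEOUT", "600"))

# The modal processors then use these values in place of a hardcoded
# timeout=600, so both timeouts are tunable via environment variables.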
75 lines · 2.9 KiB · Python
import os

import torch
import torchaudio
from pyannote.audio import Pipeline

from reflector.processors.audio_diarization import AudioDiarizationProcessor
from reflector.processors.audio_diarization_auto import AudioDiarizationAutoProcessor
from reflector.processors.types import AudioDiarizationInput, DiarizationSegment


class AudioDiarizationPyannoteProcessor(AudioDiarizationProcessor):
    """Local diarization processor using pyannote.audio library"""

    def __init__(
        self,
        model_name: str = "pyannote/speaker-diarization-3.1",
        pyannote_auth_token: str | None = None,
        device: str | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.model_name = model_name
        self.auth_token = pyannote_auth_token or os.environ.get("HF_TOKEN")
        self.device = device

        if device is None:
            self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.logger.info(f"Loading pyannote diarization model: {self.model_name}")
        self.diarization_pipeline = Pipeline.from_pretrained(
            self.model_name, use_auth_token=self.auth_token
        )
        self.diarization_pipeline.to(torch.device(self.device))
        self.logger.info(f"Diarization model loaded on device: {self.device}")

    async def _diarize(self, data: AudioDiarizationInput) -> list[DiarizationSegment]:
        try:
            # Load audio file (audio_url is assumed to be a local file path)
            self.logger.info(f"Loading local audio file: {data.audio_url}")
            waveform, sample_rate = torchaudio.load(data.audio_url)
            audio_input = {"waveform": waveform, "sample_rate": sample_rate}
            self.logger.info("Running speaker diarization")
            diarization = self.diarization_pipeline(audio_input)

            # Convert pyannote diarization output to our format
            segments = []
            for segment, _, speaker in diarization.itertracks(yield_label=True):
                # Extract speaker number from label (e.g., "SPEAKER_00" -> 0)
                speaker_id = 0
                if speaker.startswith("SPEAKER_"):
                    try:
                        speaker_id = int(speaker.split("_")[-1])
                    except (ValueError, IndexError):
                        # Fallback to hash-based ID if parsing fails
                        speaker_id = hash(speaker) % 1000

                segments.append(
                    {
                        "start": round(segment.start, 3),
                        "end": round(segment.end, 3),
                        "speaker": speaker_id,
                    }
                )

            self.logger.info(f"Diarization completed with {len(segments)} segments")
            return segments

        except Exception as e:
            self.logger.exception(f"Diarization failed: {e}")
            raise


AudioDiarizationAutoProcessor.register("pyannote", AudioDiarizationPyannoteProcessor)
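
For completeness, a usage sketch. It assumes AudioDiarizationInput can be constructed with just audio_url (the only field the code above reads), that the base processor needs no extra constructor arguments, and that calling _diarize directly is acceptable for a quick local check; none of this is confirmed by the file itself.

import asyncio


async def main() -> None:
    # Constructing the processor loads the pyannote model (needs HF_TOKEN).
    processor = AudioDiarizationPyannoteProcessor(device="cpu")
    segments = await processor._diarize(
        AudioDiarizationInput(audio_url="meeting.wav")
    )
    for seg in segments:
        print(seg["start"], seg["end"], f"speaker {seg['speaker']}")


asyncio.run(main())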