Mirror of https://github.com/Monadical-SAS/reflector.git, synced 2025-12-20 12:19:06 +00:00
* feat: improve pipeline threading, and transcriber (parakeet and silero vad)
* refactor: remove whisperx, implement parakeet
* refactor: make audio_chunker more smart and wait for speech, instead of fixed frame
* refactor: make audio merge to always downscale the audio to 16k for transcription
* refactor: make the audio transcript modal accepting batches
* refactor: improve type safety and remove prometheus metrics
  - Add DiarizationSegment TypedDict for proper diarization typing
  - Replace List/Optional with modern Python list / | None syntax
  - Remove all Prometheus metrics from TranscriptDiarizationAssemblerProcessor
  - Add comprehensive file processing pipeline with parallel execution
  - Update processor imports and type annotations throughout
  - Implement optimized file pipeline as default in process.py tool
* refactor: convert FileDiarizationProcessor I/O types to BaseModel
  Update FileDiarizationInput and FileDiarizationOutput to inherit from BaseModel instead of plain classes, following the standard pattern used by other processors in the codebase.
* test: add tests for file transcript and diarization with pytest-recording
* build: add pytest-recording
* feat: add local pyannote for testing
* fix: replace PyAV AudioResampler with torchaudio for reliable audio processing
  - Replace problematic PyAV AudioResampler that was causing ValueError: [Errno 22] Invalid argument
  - Use torchaudio.functional.resample for robust sample rate conversion
  - Optimize processing: skip conversion for already 16kHz mono audio
  - Add direct WAV writing with Python wave module for better performance
  - Consolidate duplicate downsample checks for cleaner code
  - Maintain list[av.AudioFrame] input interface
  - Required for Silero VAD which needs 16kHz mono audio
* fix: replace PyAV AudioResampler with torchaudio solution
  - Resolves ValueError: [Errno 22] Invalid argument in AudioMergeProcessor
  - Replaces problematic PyAV AudioResampler with torchaudio.functional.resample
  - Optimizes processing to skip unnecessary conversions when audio is already 16kHz mono
  - Uses direct WAV writing with Python's wave module for better performance
  - Fixes test_basic_process to disable diarization (pyannote dependency not installed)
  - Updates test expectations to match actual processor behavior
  - Removes unused pydub dependency from pyproject.toml
  - Adds comprehensive TEST_ANALYSIS.md documenting test suite status
* feat: add parameterized test for both diarization modes
  - Adds @pytest.mark.parametrize to test_basic_process with enable_diarization=[False, True]
  - Test with diarization=False always passes (tests core AudioMergeProcessor functionality)
  - Test with diarization=True gracefully skips when pyannote.audio is not installed
  - Provides comprehensive test coverage for both pipeline configurations
* fix: resolve pipeline property naming conflict in AudioDiarizationPyannoteProcessor
  - Renames 'pipeline' property to 'diarization_pipeline' to avoid conflict with base Processor.pipeline attribute
  - Fixes AttributeError: 'property 'pipeline' object has no setter' when set_pipeline() is called
  - Updates property usage in _diarize method to use new name
  - Now correctly supports pipeline initialization for diarization processing
* fix: add local for pyannote
* test: add diarization test
* fix: resample on audio merge now working
* fix: correctly restore timestamp
* fix: display exception in a threaded processor if that happen
* Update pyproject.toml
* ci: remove option
* ci: update astral-sh/setup-uv
* test: add monadical url for pytest-recording
* refactor: remove previous version
* build: move faster whisper to local dep
* test: fix missing import
* refactor: improve main_file_pipeline organization and error handling
  - Move all imports to the top of the file
  - Create unified EmptyPipeline class to replace duplicate mock pipeline code
  - Remove timeout and fallback logic - let processors handle their own retries
  - Fix error handling to raise any exception from parallel tasks
  - Add proper type hints and validation for captured results
* fix: wrong function
* fix: remove task_done
* feat: add configurable file processing timeouts for modal processors
  - Add TRANSCRIPT_FILE_TIMEOUT setting (default: 600s) for file transcription
  - Add DIARIZATION_FILE_TIMEOUT setting (default: 600s) for file diarization
  - Replace hardcoded timeout=600 with configurable settings in modal processors
  - Allows customization of timeout values via environment variables
* fix: use logger
* fix: worker process meetings now use file pipeline
* fix: topic not gathered
* refactor: remove prepare(), pipeline now work
* refactor: implement many review from Igor
* test: add test for test_pipeline_main_file
* refactor: remove doc
* doc: add doc
* ci: update build to use native arm64 builder
* fix: merge fixes
* refactor: changes from Igor review + add test (not by default) to test gpu modal part
* ci: update to our own runner linux-amd64
* ci: try using suggested mode=min
* fix: update diarizer for latest modal, and use volume
* fix: modal file extension detection
* fix: put the diarizer as A100
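A minimal sketch of the resampling approach described in the AudioMergeProcessor fixes above (the function name and frame layout are assumptions, not the project's actual code): downmix a list of av.AudioFrame to mono, resample to 16 kHz with torchaudio.functional.resample, and write the result with the stdlib wave module.

    import wave

    import av
    import numpy as np
    import torch
    import torchaudio


    def frames_to_16k_mono_wav(frames: list[av.AudioFrame], path: str, target_rate: int = 16000) -> None:
        """Sketch only: assumes float planar frames; integer formats would need scaling first."""
        # Each frame decodes to (channels, samples); concatenate along the time axis
        samples = np.concatenate([frame.to_ndarray() for frame in frames], axis=1)
        waveform = torch.from_numpy(samples).float().mean(dim=0, keepdim=True)  # downmix to mono
        source_rate = frames[0].sample_rate
        if source_rate != target_rate:
            # Skip conversion when the audio is already at the target rate
            waveform = torchaudio.functional.resample(waveform, source_rate, target_rate)
        pcm16 = (waveform.clamp(-1.0, 1.0) * 32767).to(torch.int16).numpy()
        with wave.open(path, "wb") as wav:
            wav.setnchannels(1)
            wav.setsampwidth(2)  # 16-bit PCM
            wav.setframerate(target_rate)
            wav.writeframes(pcm16.tobytes())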
266 lines · 9.4 KiB · Python
"""
|
|
Tests for Modal-based processors using pytest-recording for HTTP recording/playbook
|
|
|
|
Note: theses tests require full modal configuration to be able to record
|
|
vcr cassettes
|
|
|
|
Configuration required for the first recording:
|
|
- TRANSCRIPT_BACKEND=modal
|
|
- TRANSCRIPT_URL=https://xxxxx--reflector-transcriber-parakeet-web.modal.run
|
|
- TRANSCRIPT_MODAL_API_KEY=xxxxx
|
|
- DIARIZATION_BACKEND=modal
|
|
- DIARIZATION_URL=https://xxxxx--reflector-diarizer-web.modal.run
|
|
- DIARIZATION_MODAL_API_KEY=xxxxx
|
|
"""
|
|
|
|
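# Typical invocation with pytest-recording (illustrative commands; the test file
# path is a placeholder, not taken from this repository):
#   pytest --record-mode=once path/to/this_test_file.py   # first run, records cassettes
#   pytest --record-mode=none path/to/this_test_file.py   # later runs, replay only
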
from unittest.mock import patch

import pytest

from reflector.processors.file_diarization import FileDiarizationInput
from reflector.processors.file_diarization_modal import FileDiarizationModalProcessor
from reflector.processors.file_transcript import FileTranscriptInput
from reflector.processors.file_transcript_modal import FileTranscriptModalProcessor
from reflector.processors.transcript_diarization_assembler import (
    TranscriptDiarizationAssemblerInput,
    TranscriptDiarizationAssemblerProcessor,
)
from reflector.processors.types import DiarizationSegment, Transcript, Word

# Public test audio file hosted on S3 specifically for reflector pytests
TEST_AUDIO_URL = (
    "https://reflector-github-pytest.s3.us-east-1.amazonaws.com/test_mathieu_hello.mp3"
)


@pytest.mark.asyncio
async def test_file_transcript_modal_processor_missing_url():
    with patch("reflector.processors.file_transcript_modal.settings") as mock_settings:
        mock_settings.TRANSCRIPT_URL = None
        with pytest.raises(Exception, match="TRANSCRIPT_URL required"):
            FileTranscriptModalProcessor(modal_api_key="test-api-key")


@pytest.mark.asyncio
async def test_file_diarization_modal_processor_missing_url():
    with patch("reflector.processors.file_diarization_modal.settings") as mock_settings:
        mock_settings.DIARIZATION_URL = None
        with pytest.raises(Exception, match="DIARIZATION_URL required"):
            FileDiarizationModalProcessor(modal_api_key="test-api-key")


@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_file_diarization_modal_processor(vcr):
    """Test FileDiarizationModalProcessor using public audio URL and Modal API"""
    from reflector.settings import settings

    processor = FileDiarizationModalProcessor(
        modal_api_key=settings.DIARIZATION_MODAL_API_KEY
    )

    test_input = FileDiarizationInput(audio_url=TEST_AUDIO_URL)
    result = await processor._diarize(test_input)

    # Verify the result structure
    assert result is not None
    assert hasattr(result, "diarization")
    assert isinstance(result.diarization, list)

    # Check structure of each diarization segment
    for segment in result.diarization:
        assert "start" in segment
        assert "end" in segment
        assert "speaker" in segment
        assert isinstance(segment["start"], (int, float))
        assert isinstance(segment["end"], (int, float))
        assert isinstance(segment["speaker"], int)
        # Basic sanity check - start should be before end
        assert segment["start"] < segment["end"]


@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_file_transcript_modal_processor():
    """Test FileTranscriptModalProcessor using public audio URL and Modal API"""
    from reflector.settings import settings

    processor = FileTranscriptModalProcessor(
        modal_api_key=settings.TRANSCRIPT_MODAL_API_KEY
    )

    test_input = FileTranscriptInput(
        audio_url=TEST_AUDIO_URL,
        language="en",
    )

    # This will record the HTTP interaction on first run, replay on subsequent runs
    result = await processor._transcript(test_input)

    # Verify the result structure
    assert result is not None
    assert hasattr(result, "words")
    assert isinstance(result.words, list)

    # Check structure of each word if present
    for word in result.words:
        assert hasattr(word, "text")
        assert hasattr(word, "start")
        assert hasattr(word, "end")
        assert isinstance(word.start, (int, float))
        assert isinstance(word.end, (int, float))
        assert isinstance(word.text, str)
        # Basic sanity check - start should be before or equal to end
        assert word.start <= word.end


@pytest.mark.asyncio
async def test_transcript_diarization_assembler_processor():
    """Test TranscriptDiarizationAssemblerProcessor without VCR (no HTTP requests)"""
    # Create test transcript with words
    words = [
        Word(text="Hello", start=0.0, end=1.0, speaker=0),
        Word(text=" ", start=1.0, end=1.1, speaker=0),
        Word(text="world", start=1.1, end=2.0, speaker=0),
        Word(text=".", start=2.0, end=2.1, speaker=0),
        Word(text=" ", start=2.1, end=2.2, speaker=0),
        Word(text="How", start=2.2, end=2.8, speaker=0),
        Word(text=" ", start=2.8, end=2.9, speaker=0),
        Word(text="are", start=2.9, end=3.2, speaker=0),
        Word(text=" ", start=3.2, end=3.3, speaker=0),
        Word(text="you", start=3.3, end=3.8, speaker=0),
        Word(text="?", start=3.8, end=3.9, speaker=0),
    ]
    transcript = Transcript(words=words)

    # Create test diarization segments
    diarization = [
        DiarizationSegment(start=0.0, end=2.1, speaker=0),
        DiarizationSegment(start=2.1, end=3.9, speaker=1),
    ]

    # Create processor and test input
    processor = TranscriptDiarizationAssemblerProcessor()
    test_input = TranscriptDiarizationAssemblerInput(
        transcript=transcript, diarization=diarization
    )

    # Track emitted results
    emitted_results = []

    async def capture_result(result):
        emitted_results.append(result)

    processor.on(capture_result)

    # Process the input
    await processor.push(test_input)

    # Verify result was emitted
    assert len(emitted_results) == 1
    result = emitted_results[0]

    # Verify result structure
    assert isinstance(result, Transcript)
    assert len(result.words) == len(words)

    # Verify speaker assignments were applied
    # Words 0-3 (indices) should be speaker 0 (time 0.0-2.0)
    # Words 4-10 (indices) should be speaker 1 (time 2.1-3.9)
    for i in range(4):  # First 4 words (Hello, space, world, .)
        assert (
            result.words[i].speaker == 0
        ), f"Word {i} '{result.words[i].text}' should be speaker 0, got {result.words[i].speaker}"

    for i in range(4, 11):  # Remaining words (space, How, space, are, space, you, ?)
        assert (
            result.words[i].speaker == 1
        ), f"Word {i} '{result.words[i].text}' should be speaker 1, got {result.words[i].speaker}"


@pytest.mark.asyncio
async def test_transcript_diarization_assembler_no_diarization():
    """Test TranscriptDiarizationAssemblerProcessor with no diarization data"""
    # Create test transcript
    words = [Word(text="Hello", start=0.0, end=1.0, speaker=0)]
    transcript = Transcript(words=words)

    # Create processor and test input with empty diarization
    processor = TranscriptDiarizationAssemblerProcessor()
    test_input = TranscriptDiarizationAssemblerInput(
        transcript=transcript, diarization=[]
    )

    # Track emitted results
    emitted_results = []

    async def capture_result(result):
        emitted_results.append(result)

    processor.on(capture_result)

    # Process the input
    await processor.push(test_input)

    # Verify original transcript was returned unchanged
    assert len(emitted_results) == 1
    result = emitted_results[0]
    assert result is transcript  # Should be the same object
    assert result.words[0].speaker == 0  # Original speaker unchanged


@pytest.mark.vcr()
@pytest.mark.asyncio
async def test_full_modal_pipeline_integration(vcr):
    """Integration test: Transcription -> Diarization -> Assembly

    This test demonstrates the full pipeline:
    1. Run transcription via Modal
    2. Run diarization via Modal
    3. Assemble transcript with diarization
    """
    from reflector.settings import settings

    # Step 1: Transcription
    transcript_processor = FileTranscriptModalProcessor(
        modal_api_key=settings.TRANSCRIPT_MODAL_API_KEY
    )
    transcript_input = FileTranscriptInput(audio_url=TEST_AUDIO_URL, language="en")
    transcript = await transcript_processor._transcript(transcript_input)

    # Step 2: Diarization
    diarization_processor = FileDiarizationModalProcessor(
        modal_api_key=settings.DIARIZATION_MODAL_API_KEY
    )
    diarization_input = FileDiarizationInput(audio_url=TEST_AUDIO_URL)
    diarization_result = await diarization_processor._diarize(diarization_input)

    # Step 3: Assembly
    assembler = TranscriptDiarizationAssemblerProcessor()
    assembly_input = TranscriptDiarizationAssemblerInput(
        transcript=transcript, diarization=diarization_result.diarization
    )

    # Track assembled result
    assembled_results = []

    async def capture_result(result):
        assembled_results.append(result)

    assembler.on(capture_result)

    await assembler.push(assembly_input)

    # Verify the full pipeline worked
    assert len(assembled_results) == 1
    final_transcript = assembled_results[0]

    # Verify the final transcript has the original words with updated speaker info
    assert isinstance(final_transcript, Transcript)
    assert len(final_transcript.words) == len(transcript.words)
    assert len(final_transcript.words) > 0

    # Verify some words have been assigned speakers from diarization
    speakers_found = set(word.speaker for word in final_transcript.words)
    assert len(speakers_found) > 0  # At least some speaker assignments
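

# Companion conftest.py sketch (not part of this file; pytest-recording reads a
# ``vcr_config`` fixture, but this repository's actual conftest is not shown here).
# Filtering the auth headers keeps the Modal API keys out of recorded cassettes;
# the exact header names below are assumptions.
#
#     import pytest
#
#     @pytest.fixture(scope="module")
#     def vcr_config():
#         return {"filter_headers": ["authorization", "x-api-key"]}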