Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2025-12-20 20:29:06 +00:00)
* feat: improve pipeline threading and the transcriber (parakeet and silero vad)
* refactor: remove whisperx, implement parakeet
* refactor: make audio_chunker smarter and wait for speech instead of a fixed frame
* refactor: make audio merge always downscale the audio to 16k for transcription
* refactor: make the audio transcript modal accept batches
* refactor: improve type safety and remove prometheus metrics
  - Add DiarizationSegment TypedDict for proper diarization typing
  - Replace List/Optional with modern Python list and | None syntax
  - Remove all Prometheus metrics from TranscriptDiarizationAssemblerProcessor
  - Add comprehensive file processing pipeline with parallel execution
  - Update processor imports and type annotations throughout
  - Implement optimized file pipeline as default in the process.py tool
* refactor: convert FileDiarizationProcessor I/O types to BaseModel
  - Update FileDiarizationInput and FileDiarizationOutput to inherit from BaseModel instead of plain classes, following the standard pattern used by other processors in the codebase.
* test: add tests for file transcript and diarization with pytest-recording
* build: add pytest-recording
* feat: add local pyannote for testing
* fix: replace PyAV AudioResampler with torchaudio for reliable audio processing (see the resampling sketch after this list)
  - Replace the problematic PyAV AudioResampler that was causing ValueError: [Errno 22] Invalid argument
  - Use torchaudio.functional.resample for robust sample rate conversion
  - Optimize processing: skip conversion for audio that is already 16kHz mono
  - Add direct WAV writing with the Python wave module for better performance
  - Consolidate duplicate downsample checks for cleaner code
  - Maintain the list[av.AudioFrame] input interface
  - Required for Silero VAD, which needs 16kHz mono audio
* fix: replace PyAV AudioResampler with torchaudio solution
  - Resolves ValueError: [Errno 22] Invalid argument in AudioMergeProcessor
  - Replaces the problematic PyAV AudioResampler with torchaudio.functional.resample
  - Optimizes processing to skip unnecessary conversions when audio is already 16kHz mono
  - Uses direct WAV writing with Python's wave module for better performance
  - Fixes test_basic_process to disable diarization (pyannote dependency not installed)
  - Updates test expectations to match actual processor behavior
  - Removes unused pydub dependency from pyproject.toml
  - Adds comprehensive TEST_ANALYSIS.md documenting test suite status
* feat: add parameterized test for both diarization modes (see the test sketch after this list)
  - Adds @pytest.mark.parametrize to test_basic_process with enable_diarization=[False, True]
  - Test with diarization=False always passes (tests core AudioMergeProcessor functionality)
  - Test with diarization=True gracefully skips when pyannote.audio is not installed
  - Provides comprehensive test coverage for both pipeline configurations
* fix: resolve pipeline property naming conflict in AudioDiarizationPyannoteProcessor (see the property sketch after this list)
  - Renames the 'pipeline' property to 'diarization_pipeline' to avoid a conflict with the base Processor.pipeline attribute
  - Fixes AttributeError: "property 'pipeline' object has no setter" when set_pipeline() is called
  - Updates property usage in the _diarize method to use the new name
  - Now correctly supports pipeline initialization for diarization processing
* fix: add local for pyannote
* test: add diarization test
* fix: resample on audio merge now working
* fix: correctly restore timestamp
* fix: display exception in a threaded processor if that happens
* Update pyproject.toml
* ci: remove option
* ci: update astral-sh/setup-uv
* test: add monadical url for pytest-recording
* refactor: remove previous version
* build: move faster-whisper to local dep
* test: fix missing import
* refactor: improve main_file_pipeline organization and error handling
  - Move all imports to the top of the file
  - Create a unified EmptyPipeline class to replace duplicate mock pipeline code
  - Remove timeout and fallback logic; let processors handle their own retries
  - Fix error handling to raise any exception from parallel tasks
  - Add proper type hints and validation for captured results
* fix: wrong function
* fix: remove task_done
* feat: add configurable file processing timeouts for modal processors (see the settings sketch after this list)
  - Add TRANSCRIPT_FILE_TIMEOUT setting (default: 600s) for file transcription
  - Add DIARIZATION_FILE_TIMEOUT setting (default: 600s) for file diarization
  - Replace hardcoded timeout=600 with configurable settings in modal processors
  - Allows customization of timeout values via environment variables
* fix: use logger
* fix: worker process meetings now use the file pipeline
* fix: topic not gathered
* refactor: remove prepare(), pipeline now works
* refactor: implement many review suggestions from Igor
* test: add test for test_pipeline_main_file
* refactor: remove doc
* doc: add doc
* ci: update build to use native arm64 builder
* fix: merge fixes
* refactor: changes from Igor review, plus a test (not run by default) for the GPU modal part
* ci: update to our own runner linux-amd64
* ci: try using suggested mode=min
* fix: update diarizer for latest modal, and use a volume
* fix: modal file extension detection
* fix: put the diarizer on A100
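A minimal sketch of the resampling approach described in the PyAV-to-torchaudio items above: downmix to mono, resample to 16 kHz only when needed, and write 16-bit PCM with the stdlib wave module. It assumes a (channels, samples) float tensor as input; to_16k_mono and write_wav are illustrative helper names, not Reflector APIs.

    import wave

    import torch
    import torchaudio.functional as AF

    TARGET_RATE = 16_000

    def to_16k_mono(waveform: torch.Tensor, sample_rate: int) -> torch.Tensor:
        # Downmix to mono, then resample only when the source rate differs.
        if waveform.shape[0] > 1:
            waveform = waveform.mean(dim=0, keepdim=True)
        if sample_rate != TARGET_RATE:
            waveform = AF.resample(waveform, orig_freq=sample_rate, new_freq=TARGET_RATE)
        return waveform

    def write_wav(path: str, waveform: torch.Tensor) -> None:
        # Direct 16-bit PCM WAV writing with the stdlib wave module.
        pcm = (waveform.clamp(-1, 1) * 32767).to(torch.int16).numpy().tobytes()
        with wave.open(path, "wb") as f:
            f.setnchannels(1)
            f.setsampwidth(2)
            f.setframerate(TARGET_RATE)
            f.writeframes(pcm)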
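The parameterized-test item above follows this general pytest shape (a sketch; the real test_basic_process arguments and fixtures may differ):

    import pytest

    @pytest.mark.parametrize("enable_diarization", [False, True])
    def test_basic_process(enable_diarization):
        if enable_diarization:
            # Skip gracefully when the optional dependency is not installed.
            pytest.importorskip("pyannote.audio")
        ...  # run the file pipeline with diarization toggled accordingly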
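The pipeline-property conflict above is a generic Python pitfall: a read-only property on a subclass shadows an attribute the base class assigns. A minimal, non-Reflector illustration:

    class Processor:
        def set_pipeline(self, pipeline):
            self.pipeline = pipeline  # plain attribute assignment

    class DiarizationProcessor(Processor):
        @property
        def pipeline(self):  # shadows the attribute assigned above
            return getattr(self, "_pipeline", None)

    # DiarizationProcessor().set_pipeline(object()) raises AttributeError
    # (e.g. "property 'pipeline' ... has no setter" on recent Pythons).
    # Renaming the property (e.g. diarization_pipeline) removes the clash.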
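The configurable-timeout items above would look roughly like this under a pydantic-settings style configuration layer (an assumption; Reflector's actual settings module and the processor call sites may be structured differently):

    import asyncio

    from pydantic_settings import BaseSettings

    class Settings(BaseSettings):
        TRANSCRIPT_FILE_TIMEOUT: int = 600   # seconds, overridable via env var
        DIARIZATION_FILE_TIMEOUT: int = 600  # seconds, overridable via env var

    settings = Settings()

    # In a modal processor, instead of a hardcoded timeout=600 (run_transcription is hypothetical):
    # await asyncio.wait_for(run_transcription(...), timeout=settings.TRANSCRIPT_FILE_TIMEOUT)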
172 lines · 5.0 KiB · Python
"""
|
|
Pipeline Runner
|
|
===============
|
|
|
|
Pipeline runner designed to be executed in a asyncio task.
|
|
|
|
It is meant to be subclassed, and implement a create() method
|
|
that expose/return a Pipeline instance.
|
|
|
|
During its lifecycle, it will emit the following status:
|
|
- started: the pipeline has been started
|
|
- push: the pipeline received at least one data
|
|
- flush: the pipeline is flushing
|
|
- ended: the pipeline has ended
|
|
- error: the pipeline has ended with an error
|
|
"""

import asyncio
from typing import Generic, TypeVar

from reflector.logger import logger
from reflector.processors import Pipeline

PipelineMessage = TypeVar("PipelineMessage")


class PipelineRunner(Generic[PipelineMessage]):
    def __init__(self):
        self._task = None
        self._q_cmd = asyncio.Queue(maxsize=4096)
        self._ev_done = asyncio.Event()
        self._is_first_push = True
        self._logger = logger.bind(
            runner=id(self),
            runner_cls=self.__class__.__name__,
        )
        self.status = "idle"
        self.pipeline: Pipeline | None = None

    async def create(self) -> Pipeline:
        """
        Create the pipeline if not specified earlier.
        Should be implemented in a subclass
        """
        raise NotImplementedError()

    def start(self):
        """
        Start the pipeline as a coroutine task
        """
        self._task = asyncio.get_event_loop().create_task(self.run())

    async def join(self):
        """
        Wait for the pipeline to finish
        """
        if self._task:
            await self._task

    def start_sync(self):
        """
        Start the pipeline synchronously (for non-asyncio apps)
        """
        coro = self.run()
        asyncio.run(coro)

    async def push(self, data: PipelineMessage):
        """
        Push data to the pipeline
        """
        await self._add_cmd("PUSH", data)

    async def flush(self):
        """
        Flush the pipeline
        """
        await self._add_cmd("FLUSH", None)

    async def on_status(self, status):
        """
        Called when the status of the pipeline changes
        """
        pass

    async def on_ended(self):
        """
        Called when the pipeline ends
        """
        pass

    async def _add_cmd(
        self,
        cmd: str,
        data: PipelineMessage,
        max_retries: int = 3,
        retry_time_limit: int = 3,
    ):
        """
        Enqueue a command to be executed in the runner.
        Currently supported commands: PUSH, FLUSH
        """
        for _ in range(max_retries):
            try:
                self._q_cmd.put_nowait([cmd, data])
                break  # break as soon as the put succeeds
            except asyncio.QueueFull:
                # Handle only the QueueFull exception, retry after a small delay
                self._logger.debug(
                    f"Encountered a full queue while trying to add [{cmd}, {data}]. "
                    f"Retrying in {retry_time_limit} seconds"
                )
                await asyncio.sleep(retry_time_limit)
        else:
            # for/else: reached only when every retry failed
            self._logger.error(
                f"Failed to add [{cmd}, {data}] after {max_retries} attempts."
            )

    async def _set_status(self, status):
        self._logger.debug("Runner status updated", status=status)
        self.status = status
        if self.on_status:
            try:
                await self.on_status(status)
            except Exception:
                self._logger.exception("Runner error while setting status")

    async def run(self):
        try:
            # create the pipeline if not yet done
            await self._set_status("init")
            self._is_first_push = True
            if not self.pipeline:
                self.pipeline = await self.create()

            if not self.pipeline:
                # no pipeline was created in create(), just finish here
                await self._set_status("ended")
                self._ev_done.set()
                if self.on_ended:
                    await self.on_ended()
                return

            # start the command loop
            await self._set_status("started")
            while not self._ev_done.is_set():
                cmd, data = await self._q_cmd.get()
                func = getattr(self, f"cmd_{cmd.lower()}", None)
                if func:
                    if cmd.upper() == "FLUSH":
                        await func()
                    else:
                        await func(data)
                else:
                    raise Exception(f"Unknown command {cmd}")
        except Exception:
            self._logger.exception("Runner error")
            await self._set_status("error")
            self._ev_done.set()
            raise

    async def cmd_push(self, data: PipelineMessage):
        if self._is_first_push:
            await self._set_status("push")
            self._is_first_push = False
        await self.pipeline.push(data)

    async def cmd_flush(self):
        await self._set_status("flush")
        await self.pipeline.flush()
        await self._set_status("ended")
        self._ev_done.set()
        if self.on_ended:
            await self.on_ended()