mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
* feat: improve pipeline threading, and transcriber (parakeet and silero vad) * refactor: remove whisperx, implement parakeet * refactor: make audio_chunker more smart and wait for speech, instead of fixed frame * refactor: make audio merge to always downscale the audio to 16k for transcription * refactor: make the audio transcript modal accepting batches * refactor: improve type safety and remove prometheus metrics - Add DiarizationSegment TypedDict for proper diarization typing - Replace List/Optional with modern Python list/| None syntax - Remove all Prometheus metrics from TranscriptDiarizationAssemblerProcessor - Add comprehensive file processing pipeline with parallel execution - Update processor imports and type annotations throughout - Implement optimized file pipeline as default in process.py tool * refactor: convert FileDiarizationProcessor I/O types to BaseModel Update FileDiarizationInput and FileDiarizationOutput to inherit from BaseModel instead of plain classes, following the standard pattern used by other processors in the codebase. 
* test: add tests for file transcript and diarization with pytest-recording * build: add pytest-recording * feat: add local pyannote for testing * fix: replace PyAV AudioResampler with torchaudio for reliable audio processing - Replace problematic PyAV AudioResampler that was causing ValueError: [Errno 22] Invalid argument - Use torchaudio.functional.resample for robust sample rate conversion - Optimize processing: skip conversion for already 16kHz mono audio - Add direct WAV writing with Python wave module for better performance - Consolidate duplicate downsample checks for cleaner code - Maintain list[av.AudioFrame] input interface - Required for Silero VAD which needs 16kHz mono audio * fix: replace PyAV AudioResampler with torchaudio solution - Resolves ValueError: [Errno 22] Invalid argument in AudioMergeProcessor - Replaces problematic PyAV AudioResampler with torchaudio.functional.resample - Optimizes processing to skip unnecessary conversions when audio is already 16kHz mono - Uses direct WAV writing with Python's wave module for better performance - Fixes test_basic_process to disable diarization (pyannote dependency not installed) - Updates test expectations to match actual processor behavior - Removes unused pydub dependency from pyproject.toml - Adds comprehensive TEST_ANALYSIS.md documenting test suite status * feat: add parameterized test for both diarization modes - Adds @pytest.mark.parametrize to test_basic_process with enable_diarization=[False, True] - Test with diarization=False always passes (tests core AudioMergeProcessor functionality) - Test with diarization=True gracefully skips when pyannote.audio is not installed - Provides comprehensive test coverage for both pipeline configurations * fix: resolve pipeline property naming conflict in AudioDiarizationPyannoteProcessor - Renames 'pipeline' property to 'diarization_pipeline' to avoid conflict with base Processor.pipeline attribute - Fixes AttributeError: 'property 'pipeline' object has no 
setter' when set_pipeline() is called - Updates property usage in _diarize method to use new name - Now correctly supports pipeline initialization for diarization processing * fix: add local for pyannote * test: add diarization test * fix: resample on audio merge now working * fix: correctly restore timestamp * fix: display exception in a threaded processor if that happen * Update pyproject.toml * ci: remove option * ci: update astral-sh/setup-uv * test: add monadical url for pytest-recording * refactor: remove previous version * build: move faster whisper to local dep * test: fix missing import * refactor: improve main_file_pipeline organization and error handling - Move all imports to the top of the file - Create unified EmptyPipeline class to replace duplicate mock pipeline code - Remove timeout and fallback logic - let processors handle their own retries - Fix error handling to raise any exception from parallel tasks - Add proper type hints and validation for captured results * fix: wrong function * fix: remove task_done * feat: add configurable file processing timeouts for modal processors - Add TRANSCRIPT_FILE_TIMEOUT setting (default: 600s) for file transcription - Add DIARIZATION_FILE_TIMEOUT setting (default: 600s) for file diarization - Replace hardcoded timeout=600 with configurable settings in modal processors - Allows customization of timeout values via environment variables * fix: use logger * fix: worker process meetings now use file pipeline * fix: topic not gathered * refactor: remove prepare(), pipeline now work * refactor: implement many review from Igor * test: add test for test_pipeline_main_file * refactor: remove doc * doc: add doc * ci: update build to use native arm64 builder * fix: merge fixes * refactor: changes from Igor review + add test (not by default) to test gpu modal part * ci: update to our own runner linux-amd64 * ci: try using suggested mode=min * fix: update diarizer for latest modal, and use volume * fix: modal file extension 
detection * fix: put the diarizer as A100
194 lines
7.0 KiB
Python
194 lines
7.0 KiB
Python
from reflector.processors.base import Processor
|
|
from reflector.processors.types import (
|
|
AudioDiarizationInput,
|
|
DiarizationSegment,
|
|
TitleSummary,
|
|
Word,
|
|
)
|
|
|
|
|
|
class AudioDiarizationProcessor(Processor):
    """Base processor that applies speaker diarization to transcript topics.

    Receives an ``AudioDiarizationInput`` (an audio URL plus the topics
    already produced by transcription), runs a diarization backend
    (``_diarize``, implemented by subclasses), then mutates the ``Word``
    objects inside each topic to carry a ``speaker`` id before re-emitting
    the topics downstream.

    The diarization result is a ``list[DiarizationSegment]`` where each
    segment is a mapping with ``"start"``, ``"end"`` and ``"speaker"`` keys,
    as used throughout the helpers below.
    """

    INPUT_TYPE = AudioDiarizationInput
    OUTPUT_TYPE = TitleSummary

    async def _push(self, data: AudioDiarizationInput):
        """Diarize the audio, assign speakers to topic words, emit topics.

        Any exception from the backend is logged and re-raised; no topics
        are emitted in that case.
        """
        try:
            self.logger.info("Diarization started", audio_file_url=data.audio_url)
            diarization = await self._diarize(data)
            self.logger.info("Diarization finished")
        except Exception:
            self.logger.exception("Diarization failed after retrying")
            raise

        # now reapply speaker to topics (if any)
        # topics is a list[BaseModel] with an attribute words
        # words is a list[BaseModel] with text, start and speaker attribute

        # create a view of words based on topics
        # the current algorithm is using words index, we cannot use a generator
        words = list(self.iter_words_from_topics(data.topics))

        # assign speaker to words (mutate the words list)
        self.assign_speaker(words, diarization)

        # emit them
        # NOTE: the Word objects in `words` are the same objects referenced by
        # the topics, so the speaker assignment above is visible here.
        for topic in data.topics:
            await self.emit(topic)

    async def _diarize(self, data: AudioDiarizationInput) -> list[DiarizationSegment]:
        """Run the diarization backend. Must be implemented by subclasses.

        Expected to return segments ordered by time (the helpers below
        compare each segment only with its successor).
        """
        raise NotImplementedError

    @classmethod
    def assign_speaker(cls, words: list[Word], diarization: list[DiarizationSegment]):
        """Assign a speaker to every word, in place.

        Pipeline: drop overlaps, drop segments containing no words, merge
        adjacent same-speaker segments, then map speakers onto words.
        Both ``words`` and ``diarization`` are mutated.
        """
        cls._diarization_remove_overlap(diarization)
        cls._diarization_remove_segment_without_words(words, diarization)
        cls._diarization_merge_same_speaker(diarization)
        cls._diarization_assign_speaker(words, diarization)

    @staticmethod
    def iter_words_from_topics(topics: list[TitleSummary]):
        """Yield every word of every topic, in topic order."""
        for topic in topics:
            for word in topic.transcript.words:
                yield word

    @staticmethod
    def is_word_continuation(word_prev: Word, word: Word):
        """
        Return True if the word is a continuation of the previous word
        by checking if the previous word is ending with a punctuation
        or if the current word is starting with a capital letter
        """
        # is word_prev ending with a punctuation ?
        if word_prev.text and word_prev.text[-1] in ".?!":
            return False
        elif word.text and word.text[0].isupper():
            return False
        return True

    @staticmethod
    def _diarization_remove_overlap(diarization: list[DiarizationSegment]):
        """
        Remove overlap in diarization results

        When using a diarization algorithm, it's possible to have overlapping segments
        This function remove the overlap by keeping the longest segment

        Warning: this function mutate the diarization list
        """
        # remove overlap by keeping the longest segment
        # NOTE: only adjacent pairs are compared, which assumes segments are
        # sorted by start time — TODO confirm the backend guarantees ordering.
        diarization_idx = 0
        while diarization_idx < len(diarization) - 1:
            d = diarization[diarization_idx]
            dnext = diarization[diarization_idx + 1]
            if d["end"] > dnext["start"]:
                # remove the shortest segment
                if d["end"] - d["start"] > dnext["end"] - dnext["start"]:
                    # remove next segment
                    diarization.pop(diarization_idx + 1)
                else:
                    # remove current segment
                    # (index not advanced: the new current segment may still
                    # overlap its next neighbour)
                    diarization.pop(diarization_idx)
            else:
                diarization_idx += 1

    @staticmethod
    def _diarization_remove_segment_without_words(
        words: list[Word], diarization: list[DiarizationSegment]
    ):
        """
        Remove diarization segments without words

        A word counts toward a segment when it starts inside [start, end)
        or ends inside (start, end].

        Warning: this function mutate the diarization list
        """
        # count the number of words for each diarization segment
        diarization_count = []
        for d in diarization:
            start = d["start"]
            end = d["end"]
            count = 0
            for word in words:
                if start <= word.start < end:
                    count += 1
                elif start < word.end <= end:
                    count += 1
            diarization_count.append(count)

        # remove diarization segments with no words
        # (both lists popped in lockstep to keep indexes aligned)
        diarization_idx = 0
        while diarization_idx < len(diarization):
            if diarization_count[diarization_idx] == 0:
                diarization.pop(diarization_idx)
                diarization_count.pop(diarization_idx)
            else:
                diarization_idx += 1

    @staticmethod
    def _diarization_merge_same_speaker(diarization: list[DiarizationSegment]):
        """
        Merge diarization contiguous segments with the same speaker

        Warning: this function mutate the diarization list
        """
        # merge segment with same speaker
        diarization_idx = 0
        while diarization_idx < len(diarization) - 1:
            d = diarization[diarization_idx]
            dnext = diarization[diarization_idx + 1]
            if d["speaker"] == dnext["speaker"]:
                # extend the current segment to cover the next one, then drop it;
                # index not advanced so chains of same-speaker segments collapse
                diarization[diarization_idx]["end"] = dnext["end"]
                diarization.pop(diarization_idx + 1)
            else:
                diarization_idx += 1

    @classmethod
    def _diarization_assign_speaker(
        cls, words: list[Word], diarization: list[DiarizationSegment]
    ):
        """
        Assign speaker to words based on diarization

        Walks the words once with a shared cursor (``word_idx``) across all
        segments; assumes both ``words`` and ``diarization`` are sorted by
        start time — TODO confirm.

        Warning: this function mutate the words list
        """

        word_idx = 0
        # default speaker used for words that precede the first segment
        last_speaker = 0
        for d in diarization:
            start = d["start"]
            end = d["end"]
            speaker = d["speaker"]

            # diarization may start after the first set of words
            # in this case, we assign the last speaker
            for word in words[word_idx:]:
                if word.start < start:
                    # speaker change, but what make sense for assigning the word ?
                    # If it's a new sentence, assign with the new speaker
                    # If it's a continuation, assign with the last speaker
                    is_continuation = False
                    # NOTE(review): the upper bound excludes the last word from
                    # the continuation check — presumably intentional, confirm.
                    if word_idx > 0 and word_idx < len(words) - 1:
                        is_continuation = cls.is_word_continuation(
                            # slice yields (previous word, current word)
                            *words[word_idx - 1 : word_idx + 1]
                        )
                    if is_continuation:
                        word.speaker = last_speaker
                    else:
                        word.speaker = speaker
                        last_speaker = speaker
                    word_idx += 1
                else:
                    break

            # now continue to assign speaker until the word starts after the end
            # NOTE(review): a word with word.start == end matches neither branch,
            # so it is left for the next segment's gap-handling loop above.
            for word in words[word_idx:]:
                if start <= word.start < end:
                    last_speaker = speaker
                    word.speaker = speaker
                    word_idx += 1
                elif word.start > end:
                    break

        # no more diarization available,
        # assign last speaker to all words without speaker
        for word in words[word_idx:]:
            word.speaker = last_speaker