mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
processors: split processors into their own files
This commit is contained in:
@@ -1,542 +0,0 @@
|
||||
from pathlib import Path
|
||||
import av
|
||||
import wave
|
||||
from dataclasses import dataclass
|
||||
from faster_whisper import WhisperModel
|
||||
from reflector.models import TitleSummaryInput, ParseLLMResult
|
||||
from reflector.settings import settings
|
||||
from reflector.logger import logger
|
||||
import httpx
|
||||
import asyncio
|
||||
import json
|
||||
from uuid import uuid4
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
|
||||
@dataclass
|
||||
class AudioFile:
|
||||
path: Path
|
||||
sample_rate: int
|
||||
channels: int
|
||||
sample_width: int
|
||||
timestamp: float = 0.0
|
||||
|
||||
def release(self):
|
||||
self.path.unlink()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Word:
|
||||
text: str
|
||||
start: float
|
||||
end: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class TitleSummary:
|
||||
title: str
|
||||
summary: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Transcript:
|
||||
text: str = ""
|
||||
words: list[Word] = None
|
||||
|
||||
@property
|
||||
def human_timestamp(self):
|
||||
minutes = int(self.timestamp / 60)
|
||||
seconds = int(self.timestamp % 60)
|
||||
milliseconds = int((self.timestamp % 1) * 1000)
|
||||
return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
if not self.words:
|
||||
raise ValueError("No words in transcript")
|
||||
return self.words[0].start
|
||||
|
||||
@property
|
||||
def duration(self):
|
||||
if not self.words:
|
||||
raise ValueError("No words in transcript")
|
||||
return self.words[-1].end - self.words[0].start
|
||||
|
||||
def merge(self, other: "Transcript"):
|
||||
if not self.words:
|
||||
self.words = other.words
|
||||
else:
|
||||
self.words.extend(other.words)
|
||||
self.text += other.text
|
||||
|
||||
|
||||
class Processor:
|
||||
INPUT_TYPE: type = None
|
||||
OUTPUT_TYPE: type = None
|
||||
|
||||
def __init__(self, callback=None, custom_logger=None):
|
||||
self._processors = []
|
||||
self._callbacks = []
|
||||
if callback:
|
||||
self.on(callback)
|
||||
self.uid = uuid4().hex
|
||||
self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__)
|
||||
|
||||
def set_pipeline(self, pipeline: "Pipeline"):
|
||||
self.logger = self.logger.bind(pipeline=pipeline.uid)
|
||||
|
||||
def connect(self, processor: "Processor"):
|
||||
"""
|
||||
Connect this processor output to another processor
|
||||
"""
|
||||
if processor.INPUT_TYPE != self.OUTPUT_TYPE:
|
||||
raise ValueError(
|
||||
f"Processor {processor} input type {processor.INPUT_TYPE} "
|
||||
f"does not match {self.OUTPUT_TYPE}"
|
||||
)
|
||||
self._processors.append(processor)
|
||||
|
||||
def disconnect(self, processor: "Processor"):
|
||||
"""
|
||||
Disconnect this processor data from another processor
|
||||
"""
|
||||
self._processors.remove(processor)
|
||||
|
||||
def on(self, callback):
|
||||
"""
|
||||
Register a callback to be called when data is emitted
|
||||
"""
|
||||
# ensure callback is asynchronous
|
||||
if not asyncio.iscoroutinefunction(callback):
|
||||
raise ValueError("Callback must be a coroutine function")
|
||||
self._callbacks.append(callback)
|
||||
|
||||
def off(self, callback):
|
||||
"""
|
||||
Unregister a callback to be called when data is emitted
|
||||
"""
|
||||
self._callbacks.remove(callback)
|
||||
|
||||
async def emit(self, data):
|
||||
for callback in self._callbacks:
|
||||
await callback(data)
|
||||
for processor in self._processors:
|
||||
await processor.push(data)
|
||||
|
||||
async def push(self, data):
|
||||
"""
|
||||
Push data to this processor. `data` must be of type `INPUT_TYPE`
|
||||
The function returns the output of type `OUTPUT_TYPE`
|
||||
"""
|
||||
# logger.debug(f"{self.__class__.__name__} push")
|
||||
try:
|
||||
return await self._push(data)
|
||||
except Exception:
|
||||
self.logger.exception("Error in push")
|
||||
|
||||
async def flush(self):
|
||||
"""
|
||||
Flush data to this processor
|
||||
"""
|
||||
# logger.debug(f"{self.__class__.__name__} flush")
|
||||
return await self._flush()
|
||||
|
||||
def describe(self, level=0):
|
||||
logger.info(" " * level + self.__class__.__name__)
|
||||
|
||||
async def _push(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
async def _flush(self):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def as_threaded(cls, *args, **kwargs):
|
||||
"""
|
||||
Return a single threaded processor where output is guaranteed
|
||||
to be in order
|
||||
"""
|
||||
return ThreadedProcessor(cls(*args, **kwargs), max_workers=1)
|
||||
|
||||
|
||||
class ThreadedProcessor(Processor):
|
||||
"""
|
||||
A processor that runs in a separate thread
|
||||
"""
|
||||
|
||||
def __init__(self, processor: Processor, max_workers=1):
|
||||
super().__init__()
|
||||
# FIXME: This is a hack to make sure that the processor is single threaded
|
||||
# but if it is more than 1, then we need to make sure that the processor
|
||||
# is emiting data in order
|
||||
assert max_workers == 1
|
||||
self.processor = processor
|
||||
self.INPUT_TYPE = processor.INPUT_TYPE
|
||||
self.OUTPUT_TYPE = processor.OUTPUT_TYPE
|
||||
self.executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||
self.queue = asyncio.Queue()
|
||||
self.task = asyncio.get_running_loop().create_task(self.loop())
|
||||
|
||||
async def loop(self):
|
||||
while True:
|
||||
data = await self.queue.get()
|
||||
try:
|
||||
if data is None:
|
||||
await self.processor.flush()
|
||||
break
|
||||
await self.processor.push(data)
|
||||
finally:
|
||||
self.queue.task_done()
|
||||
|
||||
async def _push(self, data):
|
||||
await self.queue.put(data)
|
||||
|
||||
async def _flush(self):
|
||||
await self.queue.put(None)
|
||||
await self.queue.join()
|
||||
|
||||
def connect(self, processor: Processor):
|
||||
self.processor.connect(processor)
|
||||
|
||||
def disconnect(self, processor: Processor):
|
||||
self.processor.disconnect(processor)
|
||||
|
||||
def on(self, callback):
|
||||
self.processor.on(callback)
|
||||
|
||||
def describe(self, level=0):
|
||||
super().describe(level)
|
||||
self.processor.describe(level + 1)
|
||||
|
||||
|
||||
class AudioChunkerProcessor(Processor):
|
||||
"""
|
||||
Assemble audio frames into chunks
|
||||
"""
|
||||
|
||||
INPUT_TYPE = av.AudioFrame
|
||||
OUTPUT_TYPE = list[av.AudioFrame]
|
||||
|
||||
def __init__(self, max_frames=256):
|
||||
super().__init__()
|
||||
self.frames: list[av.AudioFrame] = []
|
||||
self.max_frames = max_frames
|
||||
|
||||
async def _push(self, data: av.AudioFrame):
|
||||
self.frames.append(data)
|
||||
if len(self.frames) >= self.max_frames:
|
||||
await self.flush()
|
||||
|
||||
async def _flush(self):
|
||||
frames = self.frames[:]
|
||||
self.frames = []
|
||||
if frames:
|
||||
await self.emit(frames)
|
||||
|
||||
|
||||
class AudioMergeProcessor(Processor):
|
||||
"""
|
||||
Merge audio frame into a single file
|
||||
"""
|
||||
|
||||
INPUT_TYPE = list[av.AudioFrame]
|
||||
OUTPUT_TYPE = AudioFile
|
||||
|
||||
async def _push(self, data: list[av.AudioFrame]):
|
||||
if not data:
|
||||
return
|
||||
|
||||
# get audio information from first frame
|
||||
frame = data[0]
|
||||
channels = len(frame.layout.channels)
|
||||
sample_rate = frame.sample_rate
|
||||
sample_width = frame.format.bytes
|
||||
|
||||
# create audio file
|
||||
from time import monotonic_ns
|
||||
from uuid import uuid4
|
||||
|
||||
uu = uuid4().hex
|
||||
path = Path(f"audio_{monotonic_ns()}_{uu}.wav")
|
||||
with wave.open(path.as_posix(), "wb") as wf:
|
||||
wf.setnchannels(channels)
|
||||
wf.setsampwidth(sample_width)
|
||||
wf.setframerate(sample_rate)
|
||||
for frame in data:
|
||||
wf.writeframes(frame.to_ndarray().tobytes())
|
||||
|
||||
# emit audio file
|
||||
audiofile = AudioFile(
|
||||
path=path,
|
||||
sample_rate=sample_rate,
|
||||
channels=channels,
|
||||
sample_width=sample_width,
|
||||
timestamp=data[0].pts * data[0].time_base,
|
||||
)
|
||||
await self.emit(audiofile)
|
||||
|
||||
|
||||
class AudioTranscriptProcessor(Processor):
|
||||
"""
|
||||
Transcript audio file
|
||||
"""
|
||||
|
||||
INPUT_TYPE = AudioFile
|
||||
OUTPUT_TYPE = Transcript
|
||||
|
||||
async def _push(self, data: AudioFile):
|
||||
try:
|
||||
result = await self._transcript(data)
|
||||
if result:
|
||||
await self.emit(result)
|
||||
finally:
|
||||
data.release()
|
||||
|
||||
async def _transcript(self, data: AudioFile):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.model = WhisperModel(
|
||||
"tiny", device="cpu", compute_type="float32", num_workers=12
|
||||
)
|
||||
|
||||
async def _transcript(self, data: AudioFile):
|
||||
segments, _ = self.model.transcribe(
|
||||
data.path.as_posix(),
|
||||
language="en",
|
||||
beam_size=5,
|
||||
# condition_on_previous_text=True,
|
||||
word_timestamps=True,
|
||||
vad_filter=True,
|
||||
vad_parameters={"min_silence_duration_ms": 500},
|
||||
)
|
||||
|
||||
if not segments:
|
||||
return
|
||||
|
||||
transcript = Transcript(words=[])
|
||||
segments = list(segments)
|
||||
ts = data.timestamp
|
||||
|
||||
for segment in segments:
|
||||
transcript.text += segment.text
|
||||
for word in segment.words:
|
||||
transcript.words.append(
|
||||
Word(text=word.word, start=ts + word.start, end=ts + word.end)
|
||||
)
|
||||
|
||||
return transcript
|
||||
|
||||
|
||||
class AudioAutoTranscriptProcessor(AudioTranscriptProcessor):
|
||||
BACKENDS = {
|
||||
"whisper": AudioWhisperTranscriptProcessor,
|
||||
}
|
||||
BACKEND_DEFAULT = "whisper"
|
||||
|
||||
def __init__(self, backend=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]()
|
||||
|
||||
def connect(self, processor: Processor):
|
||||
self.processor.connect(processor)
|
||||
|
||||
def disconnect(self, processor: Processor):
|
||||
self.processor.disconnect(processor)
|
||||
|
||||
def on(self, callback):
|
||||
self.processor.on(callback)
|
||||
|
||||
def off(self, callback):
|
||||
self.processor.off(callback)
|
||||
|
||||
async def _push(self, data: AudioFile):
|
||||
return await self.processor._push(data)
|
||||
|
||||
async def _flush(self):
|
||||
return await self.processor._flush()
|
||||
|
||||
|
||||
class TranscriptLineProcessor(Processor):
|
||||
"""
|
||||
Based on stream of transcript, assemble lines, remove duplicated words
|
||||
"""
|
||||
|
||||
INPUT_TYPE = Transcript
|
||||
OUTPUT_TYPE = Transcript
|
||||
|
||||
def __init__(self, max_text=1000, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.transcript = Transcript(words=[])
|
||||
self.max_text = max_text
|
||||
|
||||
async def _push(self, data: Transcript):
|
||||
# merge both transcript
|
||||
self.transcript.merge(data)
|
||||
|
||||
# check if a line is complete
|
||||
if "." not in self.transcript.text:
|
||||
# if the transcription text is still not too long, wait for more
|
||||
if len(self.transcript.text) < self.max_text:
|
||||
return
|
||||
|
||||
# cut to the next .
|
||||
partial = Transcript(words=[])
|
||||
for word in self.transcript.words[:]:
|
||||
partial.text += word.text
|
||||
partial.words.append(word)
|
||||
if "." not in word.text:
|
||||
continue
|
||||
|
||||
# emit line
|
||||
await self.emit(partial)
|
||||
|
||||
# create new transcript
|
||||
partial = Transcript(words=[])
|
||||
|
||||
self.transcript = partial
|
||||
|
||||
async def _flush(self):
|
||||
if self.transcript.words:
|
||||
await self.emit(self.transcript)
|
||||
|
||||
|
||||
class TitleSummaryProcessor(Processor):
|
||||
"""
|
||||
Detect topic and summary from the transcript
|
||||
"""
|
||||
|
||||
INPUT_TYPE = Transcript
|
||||
OUTPUT_TYPE = TitleSummary
|
||||
|
||||
async def _push(self, data: Transcript):
|
||||
param = TitleSummaryInput(transcribed_time=data.timestamp, input_text=data.text)
|
||||
|
||||
try:
|
||||
# TODO: abstract LLM implementation and parsing
|
||||
response = httpx.post(
|
||||
settings.LLM_URL, headers=param.headers, json=param.data
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
result = ParseLLMResult(param=param, output=response.json())
|
||||
summary = TitleSummary(title=result.title, summary=result.description)
|
||||
await self.emit(summary)
|
||||
|
||||
except httpx.ConnectError as e:
|
||||
self.logger.error(f"Failed to call llm: {e}")
|
||||
|
||||
except Exception:
|
||||
self.logger.exception("Failed to call llm")
|
||||
|
||||
|
||||
class Pipeline(Processor):
|
||||
"""
|
||||
A pipeline of processors
|
||||
"""
|
||||
|
||||
INPUT_TYPE = None
|
||||
OUTPUT_TYPE = None
|
||||
|
||||
def __init__(self, *processors: Processor):
|
||||
super().__init__()
|
||||
self.processors = processors
|
||||
|
||||
for processor in processors:
|
||||
processor.set_pipeline(self)
|
||||
|
||||
for i in range(len(processors) - 1):
|
||||
processors[i].connect(processors[i + 1])
|
||||
|
||||
self.INPUT_TYPE = processors[0].INPUT_TYPE
|
||||
self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE
|
||||
|
||||
async def _push(self, data):
|
||||
await self.processors[0].push(data)
|
||||
|
||||
async def _flush(self):
|
||||
for processor in self.processors:
|
||||
await processor.flush()
|
||||
|
||||
def describe(self, level=0):
|
||||
logger.info(" " * level + "Pipeline:")
|
||||
for processor in self.processors:
|
||||
processor.describe(level + 1)
|
||||
logger.info("")
|
||||
|
||||
|
||||
class FinalSummaryProcessor(Processor):
|
||||
"""
|
||||
Assemble all summary into a line-based json
|
||||
"""
|
||||
|
||||
INPUT_TYPE = TitleSummary
|
||||
OUTPUT_TYPE = Path
|
||||
|
||||
def __init__(self, filename: Path, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.filename = filename
|
||||
self.chunkcount = 0
|
||||
|
||||
async def _push(self, data: TitleSummary):
|
||||
with open(self.filename, "a", encoding="utf8") as fd:
|
||||
fd.write(json.dumps(data))
|
||||
self.chunkcount += 1
|
||||
|
||||
async def _flush(self):
|
||||
if self.chunkcount == 0:
|
||||
self.logger.warning("No summary to write")
|
||||
return
|
||||
self.logger.info(f"Writing to {self.filename}")
|
||||
await self.emit(self.filename)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
|
||||
args = parser.parse_args()
|
||||
|
||||
async def main():
|
||||
async def on_transcript(transcript):
|
||||
print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")
|
||||
|
||||
async def on_summary(summary):
|
||||
print(f"Summary: {summary.title} - {summary.summary}")
|
||||
|
||||
async def on_final_summary(path):
|
||||
print(f"Final Summary: {path}")
|
||||
|
||||
# transcription output
|
||||
result_fn = Path(args.source).with_suffix(".jsonl")
|
||||
|
||||
pipeline = Pipeline(
|
||||
AudioChunkerProcessor(),
|
||||
AudioMergeProcessor(),
|
||||
AudioAutoTranscriptProcessor.as_threaded(),
|
||||
TranscriptLineProcessor(callback=on_transcript),
|
||||
TitleSummaryProcessor.as_threaded(callback=on_summary),
|
||||
FinalSummaryProcessor.as_threaded(
|
||||
filename=result_fn, callback=on_final_summary
|
||||
),
|
||||
)
|
||||
pipeline.describe()
|
||||
|
||||
# start processing audio
|
||||
logger.info(f"Opening {args.source}")
|
||||
container = av.open(args.source)
|
||||
try:
|
||||
logger.info("Start pushing audio into the pipeline")
|
||||
for frame in container.decode(audio=0):
|
||||
await pipeline.push(frame)
|
||||
finally:
|
||||
logger.info("Flushing the pipeline")
|
||||
await pipeline.flush()
|
||||
|
||||
logger.info("All done !")
|
||||
|
||||
asyncio.run(main())
|
||||
9
server/reflector/processors/__init__.py
Normal file
9
server/reflector/processors/__init__.py
Normal file
@@ -0,0 +1,9 @@
|
||||
from .base import Processor, ThreadedProcessor, Pipeline # noqa: F401
|
||||
from .types import AudioFile, Transcript, Word, TitleSummary # noqa: F401
|
||||
from .audio_chunker import AudioChunkerProcessor # noqa: F401
|
||||
from .audio_merge import AudioMergeProcessor # noqa: F401
|
||||
from .audio_transcript import AudioTranscriptProcessor # noqa: F401
|
||||
from .audio_transcript_auto import AudioTranscriptAutoProcessor # noqa: F401
|
||||
from .transcript_liner import TranscriptLinerProcessor # noqa: F401
|
||||
from .transcript_summarizer import TranscriptSummarizerProcessor # noqa: F401
|
||||
from .transcript_topic_detector import TranscriptTopicDetectorProcessor # noqa: F401
|
||||
27
server/reflector/processors/audio_chunker.py
Normal file
27
server/reflector/processors/audio_chunker.py
Normal file
@@ -0,0 +1,27 @@
|
||||
from reflector.processors.base import Processor
|
||||
import av
|
||||
|
||||
|
||||
class AudioChunkerProcessor(Processor):
|
||||
"""
|
||||
Assemble audio frames into chunks
|
||||
"""
|
||||
|
||||
INPUT_TYPE = av.AudioFrame
|
||||
OUTPUT_TYPE = list[av.AudioFrame]
|
||||
|
||||
def __init__(self, max_frames=256):
|
||||
super().__init__()
|
||||
self.frames: list[av.AudioFrame] = []
|
||||
self.max_frames = max_frames
|
||||
|
||||
async def _push(self, data: av.AudioFrame):
|
||||
self.frames.append(data)
|
||||
if len(self.frames) >= self.max_frames:
|
||||
await self.flush()
|
||||
|
||||
async def _flush(self):
|
||||
frames = self.frames[:]
|
||||
self.frames = []
|
||||
if frames:
|
||||
await self.emit(frames)
|
||||
47
server/reflector/processors/audio_merge.py
Normal file
47
server/reflector/processors/audio_merge.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.types import AudioFile
|
||||
from pathlib import Path
|
||||
import wave
|
||||
import av
|
||||
|
||||
|
||||
class AudioMergeProcessor(Processor):
|
||||
"""
|
||||
Merge audio frame into a single file
|
||||
"""
|
||||
|
||||
INPUT_TYPE = list[av.AudioFrame]
|
||||
OUTPUT_TYPE = AudioFile
|
||||
|
||||
async def _push(self, data: list[av.AudioFrame]):
|
||||
if not data:
|
||||
return
|
||||
|
||||
# get audio information from first frame
|
||||
frame = data[0]
|
||||
channels = len(frame.layout.channels)
|
||||
sample_rate = frame.sample_rate
|
||||
sample_width = frame.format.bytes
|
||||
|
||||
# create audio file
|
||||
from time import monotonic_ns
|
||||
from uuid import uuid4
|
||||
|
||||
uu = uuid4().hex
|
||||
path = Path(f"audio_{monotonic_ns()}_{uu}.wav")
|
||||
with wave.open(path.as_posix(), "wb") as wf:
|
||||
wf.setnchannels(channels)
|
||||
wf.setsampwidth(sample_width)
|
||||
wf.setframerate(sample_rate)
|
||||
for frame in data:
|
||||
wf.writeframes(frame.to_ndarray().tobytes())
|
||||
|
||||
# emit audio file
|
||||
audiofile = AudioFile(
|
||||
path=path,
|
||||
sample_rate=sample_rate,
|
||||
channels=channels,
|
||||
sample_width=sample_width,
|
||||
timestamp=data[0].pts * data[0].time_base,
|
||||
)
|
||||
await self.emit(audiofile)
|
||||
22
server/reflector/processors/audio_transcript.py
Normal file
22
server/reflector/processors/audio_transcript.py
Normal file
@@ -0,0 +1,22 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.types import AudioFile, Transcript
|
||||
|
||||
|
||||
class AudioTranscriptProcessor(Processor):
|
||||
"""
|
||||
Transcript audio file
|
||||
"""
|
||||
|
||||
INPUT_TYPE = AudioFile
|
||||
OUTPUT_TYPE = Transcript
|
||||
|
||||
async def _push(self, data: AudioFile):
|
||||
try:
|
||||
result = await self._transcript(data)
|
||||
if result:
|
||||
await self.emit(result)
|
||||
finally:
|
||||
data.release()
|
||||
|
||||
async def _transcript(self, data: AudioFile):
|
||||
raise NotImplementedError
|
||||
35
server/reflector/processors/audio_transcript_auto.py
Normal file
35
server/reflector/processors/audio_transcript_auto.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.audio_transcript import AudioTranscriptProcessor
|
||||
from reflector.processors.audio_transcript_whisper import (
|
||||
AudioTranscriptWhisperProcessor,
|
||||
)
|
||||
from reflector.processors.types import AudioFile
|
||||
|
||||
|
||||
class AudioTranscriptAutoProcessor(AudioTranscriptProcessor):
|
||||
BACKENDS = {
|
||||
"whisper": AudioTranscriptWhisperProcessor,
|
||||
}
|
||||
BACKEND_DEFAULT = "whisper"
|
||||
|
||||
def __init__(self, backend=None, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]()
|
||||
|
||||
def connect(self, processor: Processor):
|
||||
self.processor.connect(processor)
|
||||
|
||||
def disconnect(self, processor: Processor):
|
||||
self.processor.disconnect(processor)
|
||||
|
||||
def on(self, callback):
|
||||
self.processor.on(callback)
|
||||
|
||||
def off(self, callback):
|
||||
self.processor.off(callback)
|
||||
|
||||
async def _push(self, data: AudioFile):
|
||||
return await self.processor._push(data)
|
||||
|
||||
async def _flush(self):
|
||||
return await self.processor._flush()
|
||||
38
server/reflector/processors/audio_transcript_whisper.py
Normal file
38
server/reflector/processors/audio_transcript_whisper.py
Normal file
@@ -0,0 +1,38 @@
|
||||
from reflector.processors.audio_transcript import AudioTranscriptProcessor
|
||||
from reflector.processors.types import AudioFile, Transcript, Word
|
||||
from faster_whisper import WhisperModel
|
||||
|
||||
|
||||
class AudioTranscriptWhisperProcessor(AudioTranscriptProcessor):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.model = WhisperModel(
|
||||
"tiny", device="cpu", compute_type="float32", num_workers=12
|
||||
)
|
||||
|
||||
async def _transcript(self, data: AudioFile):
|
||||
segments, _ = self.model.transcribe(
|
||||
data.path.as_posix(),
|
||||
language="en",
|
||||
beam_size=5,
|
||||
# condition_on_previous_text=True,
|
||||
word_timestamps=True,
|
||||
vad_filter=True,
|
||||
vad_parameters={"min_silence_duration_ms": 500},
|
||||
)
|
||||
|
||||
if not segments:
|
||||
return
|
||||
|
||||
transcript = Transcript(words=[])
|
||||
segments = list(segments)
|
||||
ts = data.timestamp
|
||||
|
||||
for segment in segments:
|
||||
transcript.text += segment.text
|
||||
for word in segment.words:
|
||||
transcript.words.append(
|
||||
Word(text=word.word, start=ts + word.start, end=ts + word.end)
|
||||
)
|
||||
|
||||
return transcript
|
||||
178
server/reflector/processors/base.py
Normal file
178
server/reflector/processors/base.py
Normal file
@@ -0,0 +1,178 @@
|
||||
from reflector.logger import logger
|
||||
from uuid import uuid4
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
import asyncio
|
||||
|
||||
|
||||
class Processor:
|
||||
INPUT_TYPE: type = None
|
||||
OUTPUT_TYPE: type = None
|
||||
|
||||
def __init__(self, callback=None, custom_logger=None):
|
||||
self._processors = []
|
||||
self._callbacks = []
|
||||
if callback:
|
||||
self.on(callback)
|
||||
self.uid = uuid4().hex
|
||||
self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__)
|
||||
|
||||
def set_pipeline(self, pipeline: "Pipeline"):
|
||||
self.logger = self.logger.bind(pipeline=pipeline.uid)
|
||||
|
||||
def connect(self, processor: "Processor"):
|
||||
"""
|
||||
Connect this processor output to another processor
|
||||
"""
|
||||
if processor.INPUT_TYPE != self.OUTPUT_TYPE:
|
||||
raise ValueError(
|
||||
f"Processor {processor} input type {processor.INPUT_TYPE} "
|
||||
f"does not match {self.OUTPUT_TYPE}"
|
||||
)
|
||||
self._processors.append(processor)
|
||||
|
||||
def disconnect(self, processor: "Processor"):
|
||||
"""
|
||||
Disconnect this processor data from another processor
|
||||
"""
|
||||
self._processors.remove(processor)
|
||||
|
||||
def on(self, callback):
|
||||
"""
|
||||
Register a callback to be called when data is emitted
|
||||
"""
|
||||
# ensure callback is asynchronous
|
||||
if not asyncio.iscoroutinefunction(callback):
|
||||
raise ValueError("Callback must be a coroutine function")
|
||||
self._callbacks.append(callback)
|
||||
|
||||
def off(self, callback):
|
||||
"""
|
||||
Unregister a callback to be called when data is emitted
|
||||
"""
|
||||
self._callbacks.remove(callback)
|
||||
|
||||
async def emit(self, data):
|
||||
for callback in self._callbacks:
|
||||
await callback(data)
|
||||
for processor in self._processors:
|
||||
await processor.push(data)
|
||||
|
||||
async def push(self, data):
|
||||
"""
|
||||
Push data to this processor. `data` must be of type `INPUT_TYPE`
|
||||
The function returns the output of type `OUTPUT_TYPE`
|
||||
"""
|
||||
# logger.debug(f"{self.__class__.__name__} push")
|
||||
try:
|
||||
return await self._push(data)
|
||||
except Exception:
|
||||
self.logger.exception("Error in push")
|
||||
|
||||
async def flush(self):
|
||||
"""
|
||||
Flush data to this processor
|
||||
"""
|
||||
# logger.debug(f"{self.__class__.__name__} flush")
|
||||
return await self._flush()
|
||||
|
||||
def describe(self, level=0):
|
||||
logger.info(" " * level + self.__class__.__name__)
|
||||
|
||||
async def _push(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
async def _flush(self):
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def as_threaded(cls, *args, **kwargs):
|
||||
"""
|
||||
Return a single threaded processor where output is guaranteed
|
||||
to be in order
|
||||
"""
|
||||
return ThreadedProcessor(cls(*args, **kwargs), max_workers=1)
|
||||
|
||||
|
||||
class ThreadedProcessor(Processor):
|
||||
"""
|
||||
A processor that runs in a separate thread
|
||||
"""
|
||||
|
||||
def __init__(self, processor: Processor, max_workers=1):
|
||||
super().__init__()
|
||||
# FIXME: This is a hack to make sure that the processor is single threaded
|
||||
# but if it is more than 1, then we need to make sure that the processor
|
||||
# is emiting data in order
|
||||
assert max_workers == 1
|
||||
self.processor = processor
|
||||
self.INPUT_TYPE = processor.INPUT_TYPE
|
||||
self.OUTPUT_TYPE = processor.OUTPUT_TYPE
|
||||
self.executor = ThreadPoolExecutor(max_workers=max_workers)
|
||||
self.queue = asyncio.Queue()
|
||||
self.task = asyncio.get_running_loop().create_task(self.loop())
|
||||
|
||||
async def loop(self):
|
||||
while True:
|
||||
data = await self.queue.get()
|
||||
try:
|
||||
if data is None:
|
||||
await self.processor.flush()
|
||||
break
|
||||
await self.processor.push(data)
|
||||
finally:
|
||||
self.queue.task_done()
|
||||
|
||||
async def _push(self, data):
|
||||
await self.queue.put(data)
|
||||
|
||||
async def _flush(self):
|
||||
await self.queue.put(None)
|
||||
await self.queue.join()
|
||||
|
||||
def connect(self, processor: Processor):
|
||||
self.processor.connect(processor)
|
||||
|
||||
def disconnect(self, processor: Processor):
|
||||
self.processor.disconnect(processor)
|
||||
|
||||
def on(self, callback):
|
||||
self.processor.on(callback)
|
||||
|
||||
def describe(self, level=0):
|
||||
super().describe(level)
|
||||
self.processor.describe(level + 1)
|
||||
|
||||
|
||||
class Pipeline(Processor):
|
||||
"""
|
||||
A pipeline of processors
|
||||
"""
|
||||
|
||||
INPUT_TYPE = None
|
||||
OUTPUT_TYPE = None
|
||||
|
||||
def __init__(self, *processors: Processor):
|
||||
super().__init__()
|
||||
self.processors = processors
|
||||
|
||||
for processor in processors:
|
||||
processor.set_pipeline(self)
|
||||
|
||||
for i in range(len(processors) - 1):
|
||||
processors[i].connect(processors[i + 1])
|
||||
|
||||
self.INPUT_TYPE = processors[0].INPUT_TYPE
|
||||
self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE
|
||||
|
||||
async def _push(self, data):
|
||||
await self.processors[0].push(data)
|
||||
|
||||
async def _flush(self):
|
||||
for processor in self.processors:
|
||||
await processor.flush()
|
||||
|
||||
def describe(self, level=0):
|
||||
logger.info(" " * level + "Pipeline:")
|
||||
for processor in self.processors:
|
||||
processor.describe(level + 1)
|
||||
logger.info("")
|
||||
47
server/reflector/processors/transcript_liner.py
Normal file
47
server/reflector/processors/transcript_liner.py
Normal file
@@ -0,0 +1,47 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.types import Transcript
|
||||
|
||||
|
||||
class TranscriptLinerProcessor(Processor):
|
||||
"""
|
||||
Based on stream of transcript, assemble and remove duplicated words
|
||||
then cut per lines.
|
||||
"""
|
||||
|
||||
INPUT_TYPE = Transcript
|
||||
OUTPUT_TYPE = Transcript
|
||||
|
||||
def __init__(self, max_text=1000, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.transcript = Transcript(words=[])
|
||||
self.max_text = max_text
|
||||
|
||||
async def _push(self, data: Transcript):
|
||||
# merge both transcript
|
||||
self.transcript.merge(data)
|
||||
|
||||
# check if a line is complete
|
||||
if "." not in self.transcript.text:
|
||||
# if the transcription text is still not too long, wait for more
|
||||
if len(self.transcript.text) < self.max_text:
|
||||
return
|
||||
|
||||
# cut to the next .
|
||||
partial = Transcript(words=[])
|
||||
for word in self.transcript.words[:]:
|
||||
partial.text += word.text
|
||||
partial.words.append(word)
|
||||
if "." not in word.text:
|
||||
continue
|
||||
|
||||
# emit line
|
||||
await self.emit(partial)
|
||||
|
||||
# create new transcript
|
||||
partial = Transcript(words=[])
|
||||
|
||||
self.transcript = partial
|
||||
|
||||
async def _flush(self):
|
||||
if self.transcript.words:
|
||||
await self.emit(self.transcript)
|
||||
31
server/reflector/processors/transcript_summarizer.py
Normal file
31
server/reflector/processors/transcript_summarizer.py
Normal file
@@ -0,0 +1,31 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.types import TitleSummary
|
||||
from pathlib import Path
|
||||
import json
|
||||
|
||||
|
||||
class TranscriptSummarizerProcessor(Processor):
|
||||
"""
|
||||
Assemble all summary into a line-based json
|
||||
"""
|
||||
|
||||
INPUT_TYPE = TitleSummary
|
||||
OUTPUT_TYPE = Path
|
||||
|
||||
def __init__(self, filename: Path, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.filename = filename
|
||||
self.chunkcount = 0
|
||||
|
||||
async def _push(self, data: TitleSummary):
|
||||
with open(self.filename, "a", encoding="utf8") as fd:
|
||||
fd.write(json.dumps(data))
|
||||
self.chunkcount += 1
|
||||
|
||||
async def _flush(self):
|
||||
if self.chunkcount == 0:
|
||||
self.logger.warning("No summary to write")
|
||||
return
|
||||
self.logger.info(f"Writing to {self.filename}")
|
||||
await self.emit(self.filename)
|
||||
|
||||
48
server/reflector/processors/transcript_topic_detector.py
Normal file
48
server/reflector/processors/transcript_topic_detector.py
Normal file
@@ -0,0 +1,48 @@
|
||||
from reflector.processors.base import Processor
|
||||
from reflector.processors.types import Transcript, TitleSummary
|
||||
from reflector.llm import LLM
|
||||
|
||||
|
||||
class TranscriptTopicDetectorProcessor(Processor):
|
||||
"""
|
||||
Detect topic and summary from the transcript
|
||||
"""
|
||||
|
||||
INPUT_TYPE = Transcript
|
||||
OUTPUT_TYPE = TitleSummary
|
||||
|
||||
PROMPT = """
|
||||
### Human:
|
||||
Create a JSON object as response.The JSON object must have 2 fields:
|
||||
i) title and ii) summary.For the title field,generate a short title
|
||||
for the given text. For the summary field, summarize the given text
|
||||
in three sentences.
|
||||
|
||||
{input_text}
|
||||
|
||||
### Assistant:
|
||||
"""
|
||||
|
||||
def __init__(self, min_transcript_length=25, **kwargs):
|
||||
super().__init__(**kwargs)
|
||||
self.transcript = None
|
||||
self.min_transcript_length = min_transcript_length
|
||||
self.llm = LLM.instance()
|
||||
|
||||
async def _push(self, data: Transcript):
|
||||
if self.transcript is None:
|
||||
self.transcript = data
|
||||
else:
|
||||
self.transcript.merge(data)
|
||||
if len(self.transcript.text) < self.min_transcript_length:
|
||||
return
|
||||
await self.flush()
|
||||
|
||||
async def _flush(self):
|
||||
if not self.transcript:
|
||||
return
|
||||
prompt = self.PROMPT.format(input_text=self.transcript.text)
|
||||
result = await self.llm.generate(prompt=prompt)
|
||||
summary = TitleSummary(title=result["title"], summary=result["summary"])
|
||||
self.transcript = None
|
||||
await self.emit(summary)
|
||||
65
server/reflector/processors/types.py
Normal file
65
server/reflector/processors/types.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass
|
||||
class AudioFile:
|
||||
path: Path
|
||||
sample_rate: int
|
||||
channels: int
|
||||
sample_width: int
|
||||
timestamp: float = 0.0
|
||||
|
||||
def release(self):
|
||||
self.path.unlink()
|
||||
|
||||
|
||||
@dataclass
|
||||
class Word:
|
||||
text: str
|
||||
start: float
|
||||
end: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class TitleSummary:
|
||||
title: str
|
||||
summary: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class Transcript:
|
||||
text: str = ""
|
||||
words: list[Word] = None
|
||||
|
||||
@property
|
||||
def human_timestamp(self):
|
||||
minutes = int(self.timestamp / 60)
|
||||
seconds = int(self.timestamp % 60)
|
||||
milliseconds = int((self.timestamp % 1) * 1000)
|
||||
return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
|
||||
|
||||
@property
|
||||
def timestamp(self):
|
||||
if not self.words:
|
||||
raise ValueError("No words in transcript")
|
||||
return self.words[0].start
|
||||
|
||||
@property
|
||||
def duration(self):
|
||||
if not self.words:
|
||||
raise ValueError("No words in transcript")
|
||||
return self.words[-1].end - self.words[0].start
|
||||
|
||||
def merge(self, other: "Transcript"):
|
||||
if not self.words:
|
||||
self.words = other.words
|
||||
else:
|
||||
self.words.extend(other.words)
|
||||
self.text += other.text
|
||||
|
||||
def clone(self):
|
||||
words = [
|
||||
Word(text=word.text, start=word.start, end=word.end) for word in self.words
|
||||
]
|
||||
return Transcript(text=self.text, words=words)
|
||||
61
server/reflector/tools/process.py
Normal file
61
server/reflector/tools/process.py
Normal file
@@ -0,0 +1,61 @@
|
||||
from pathlib import Path
|
||||
import av
|
||||
from reflector.logger import logger
|
||||
from reflector.processors import (
|
||||
Pipeline,
|
||||
AudioChunkerProcessor,
|
||||
AudioMergeProcessor,
|
||||
AudioTranscriptAutoProcessor,
|
||||
TranscriptLinerProcessor,
|
||||
TranscriptTopicDetectorProcessor,
|
||||
TranscriptSummarizerProcessor,
|
||||
)
|
||||
import asyncio
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
|
||||
args = parser.parse_args()
|
||||
|
||||
async def main():
|
||||
async def on_transcript(transcript):
|
||||
print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")
|
||||
|
||||
async def on_summary(summary):
|
||||
print(f"Summary: {summary.title} - {summary.summary}")
|
||||
|
||||
async def on_final_summary(path):
|
||||
print(f"Final Summary: {path}")
|
||||
|
||||
# transcription output
|
||||
result_fn = Path(args.source).with_suffix(".jsonl")
|
||||
|
||||
pipeline = Pipeline(
|
||||
AudioChunkerProcessor(),
|
||||
AudioMergeProcessor(),
|
||||
AudioTranscriptAutoProcessor.as_threaded(),
|
||||
TranscriptLinerProcessor(callback=on_transcript),
|
||||
TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary),
|
||||
TranscriptSummarizerProcessor.as_threaded(
|
||||
filename=result_fn, callback=on_final_summary
|
||||
),
|
||||
)
|
||||
pipeline.describe()
|
||||
|
||||
# start processing audio
|
||||
logger.info(f"Opening {args.source}")
|
||||
container = av.open(args.source)
|
||||
try:
|
||||
logger.info("Start pushing audio into the pipeline")
|
||||
for frame in container.decode(audio=0):
|
||||
await pipeline.push(frame)
|
||||
finally:
|
||||
logger.info("Flushing the pipeline")
|
||||
await pipeline.flush()
|
||||
|
||||
logger.info("All done !")
|
||||
|
||||
asyncio.run(main())
|
||||
@@ -9,10 +9,9 @@ from reflector.processors import (
|
||||
Pipeline,
|
||||
AudioChunkerProcessor,
|
||||
AudioMergeProcessor,
|
||||
AudioAutoTranscriptProcessor,
|
||||
TranscriptLineProcessor,
|
||||
TitleSummaryProcessor,
|
||||
# FinalSummaryProcessor,
|
||||
AudioTranscriptAutoProcessor,
|
||||
TranscriptLinerProcessor,
|
||||
TranscriptTopicDetectorProcessor,
|
||||
Transcript,
|
||||
TitleSummary,
|
||||
)
|
||||
@@ -74,9 +73,9 @@ async def rtc_offer(params: RtcOffer, request: Request):
|
||||
ctx.pipeline = Pipeline(
|
||||
AudioChunkerProcessor(),
|
||||
AudioMergeProcessor(),
|
||||
AudioAutoTranscriptProcessor.as_threaded(),
|
||||
TranscriptLineProcessor(callback=on_transcript),
|
||||
TitleSummaryProcessor.as_threaded(callback=on_summary),
|
||||
AudioTranscriptAutoProcessor.as_threaded(),
|
||||
TranscriptLinerProcessor(callback=on_transcript),
|
||||
TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary),
|
||||
# FinalSummaryProcessor.as_threaded(
|
||||
# filename=result_fn, callback=on_final_summary
|
||||
# ),
|
||||
|
||||
Reference in New Issue
Block a user