From bc55cfdea38fe9ba4976ddc5bc50f20f6da6047b Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 14:24:01 +0200 Subject: [PATCH] processors: split processors into their own files --- server/reflector/processors.py | 542 ------------------ server/reflector/processors/__init__.py | 9 + server/reflector/processors/audio_chunker.py | 27 + server/reflector/processors/audio_merge.py | 47 ++ .../reflector/processors/audio_transcript.py | 22 + .../processors/audio_transcript_auto.py | 35 ++ .../processors/audio_transcript_whisper.py | 38 ++ server/reflector/processors/base.py | 178 ++++++ .../reflector/processors/transcript_liner.py | 47 ++ .../processors/transcript_summarizer.py | 31 + .../processors/transcript_topic_detector.py | 48 ++ server/reflector/processors/types.py | 65 +++ server/reflector/tools/process.py | 61 ++ server/reflector/views/rtc_offer.py | 13 +- 14 files changed, 614 insertions(+), 549 deletions(-) delete mode 100644 server/reflector/processors.py create mode 100644 server/reflector/processors/__init__.py create mode 100644 server/reflector/processors/audio_chunker.py create mode 100644 server/reflector/processors/audio_merge.py create mode 100644 server/reflector/processors/audio_transcript.py create mode 100644 server/reflector/processors/audio_transcript_auto.py create mode 100644 server/reflector/processors/audio_transcript_whisper.py create mode 100644 server/reflector/processors/base.py create mode 100644 server/reflector/processors/transcript_liner.py create mode 100644 server/reflector/processors/transcript_summarizer.py create mode 100644 server/reflector/processors/transcript_topic_detector.py create mode 100644 server/reflector/processors/types.py create mode 100644 server/reflector/tools/process.py diff --git a/server/reflector/processors.py b/server/reflector/processors.py deleted file mode 100644 index 3827f068..00000000 --- a/server/reflector/processors.py +++ /dev/null @@ -1,542 +0,0 @@ -from pathlib import Path -import av -import wave -from dataclasses import dataclass -from faster_whisper import WhisperModel -from reflector.models import TitleSummaryInput, ParseLLMResult -from reflector.settings import settings -from reflector.logger import logger -import httpx -import asyncio -import json -from uuid import uuid4 -from concurrent.futures import ThreadPoolExecutor - - -@dataclass -class AudioFile: - path: Path - sample_rate: int - channels: int - sample_width: int - timestamp: float = 0.0 - - def release(self): - self.path.unlink() - - -@dataclass -class Word: - text: str - start: float - end: float - - -@dataclass -class TitleSummary: - title: str - summary: str - - -@dataclass -class Transcript: - text: str = "" - words: list[Word] = None - - @property - def human_timestamp(self): - minutes = int(self.timestamp / 60) - seconds = int(self.timestamp % 60) - milliseconds = int((self.timestamp % 1) * 1000) - return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}" - - @property - def timestamp(self): - if not self.words: - raise ValueError("No words in transcript") - return self.words[0].start - - @property - def duration(self): - if not self.words: - raise ValueError("No words in transcript") - return self.words[-1].end - self.words[0].start - - def merge(self, other: "Transcript"): - if not self.words: - self.words = other.words - else: - self.words.extend(other.words) - self.text += other.text - - -class Processor: - INPUT_TYPE: type = None - OUTPUT_TYPE: type = None - - def __init__(self, callback=None, custom_logger=None): - 
self._processors = [] - self._callbacks = [] - if callback: - self.on(callback) - self.uid = uuid4().hex - self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__) - - def set_pipeline(self, pipeline: "Pipeline"): - self.logger = self.logger.bind(pipeline=pipeline.uid) - - def connect(self, processor: "Processor"): - """ - Connect this processor output to another processor - """ - if processor.INPUT_TYPE != self.OUTPUT_TYPE: - raise ValueError( - f"Processor {processor} input type {processor.INPUT_TYPE} " - f"does not match {self.OUTPUT_TYPE}" - ) - self._processors.append(processor) - - def disconnect(self, processor: "Processor"): - """ - Disconnect this processor data from another processor - """ - self._processors.remove(processor) - - def on(self, callback): - """ - Register a callback to be called when data is emitted - """ - # ensure callback is asynchronous - if not asyncio.iscoroutinefunction(callback): - raise ValueError("Callback must be a coroutine function") - self._callbacks.append(callback) - - def off(self, callback): - """ - Unregister a callback to be called when data is emitted - """ - self._callbacks.remove(callback) - - async def emit(self, data): - for callback in self._callbacks: - await callback(data) - for processor in self._processors: - await processor.push(data) - - async def push(self, data): - """ - Push data to this processor. `data` must be of type `INPUT_TYPE` - The function returns the output of type `OUTPUT_TYPE` - """ - # logger.debug(f"{self.__class__.__name__} push") - try: - return await self._push(data) - except Exception: - self.logger.exception("Error in push") - - async def flush(self): - """ - Flush data to this processor - """ - # logger.debug(f"{self.__class__.__name__} flush") - return await self._flush() - - def describe(self, level=0): - logger.info(" " * level + self.__class__.__name__) - - async def _push(self, data): - raise NotImplementedError - - async def _flush(self): - pass - - @classmethod - def as_threaded(cls, *args, **kwargs): - """ - Return a single threaded processor where output is guaranteed - to be in order - """ - return ThreadedProcessor(cls(*args, **kwargs), max_workers=1) - - -class ThreadedProcessor(Processor): - """ - A processor that runs in a separate thread - """ - - def __init__(self, processor: Processor, max_workers=1): - super().__init__() - # FIXME: This is a hack to make sure that the processor is single threaded - # but if it is more than 1, then we need to make sure that the processor - # is emiting data in order - assert max_workers == 1 - self.processor = processor - self.INPUT_TYPE = processor.INPUT_TYPE - self.OUTPUT_TYPE = processor.OUTPUT_TYPE - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.queue = asyncio.Queue() - self.task = asyncio.get_running_loop().create_task(self.loop()) - - async def loop(self): - while True: - data = await self.queue.get() - try: - if data is None: - await self.processor.flush() - break - await self.processor.push(data) - finally: - self.queue.task_done() - - async def _push(self, data): - await self.queue.put(data) - - async def _flush(self): - await self.queue.put(None) - await self.queue.join() - - def connect(self, processor: Processor): - self.processor.connect(processor) - - def disconnect(self, processor: Processor): - self.processor.disconnect(processor) - - def on(self, callback): - self.processor.on(callback) - - def describe(self, level=0): - super().describe(level) - self.processor.describe(level + 1) - - -class 
AudioChunkerProcessor(Processor): - """ - Assemble audio frames into chunks - """ - - INPUT_TYPE = av.AudioFrame - OUTPUT_TYPE = list[av.AudioFrame] - - def __init__(self, max_frames=256): - super().__init__() - self.frames: list[av.AudioFrame] = [] - self.max_frames = max_frames - - async def _push(self, data: av.AudioFrame): - self.frames.append(data) - if len(self.frames) >= self.max_frames: - await self.flush() - - async def _flush(self): - frames = self.frames[:] - self.frames = [] - if frames: - await self.emit(frames) - - -class AudioMergeProcessor(Processor): - """ - Merge audio frame into a single file - """ - - INPUT_TYPE = list[av.AudioFrame] - OUTPUT_TYPE = AudioFile - - async def _push(self, data: list[av.AudioFrame]): - if not data: - return - - # get audio information from first frame - frame = data[0] - channels = len(frame.layout.channels) - sample_rate = frame.sample_rate - sample_width = frame.format.bytes - - # create audio file - from time import monotonic_ns - from uuid import uuid4 - - uu = uuid4().hex - path = Path(f"audio_{monotonic_ns()}_{uu}.wav") - with wave.open(path.as_posix(), "wb") as wf: - wf.setnchannels(channels) - wf.setsampwidth(sample_width) - wf.setframerate(sample_rate) - for frame in data: - wf.writeframes(frame.to_ndarray().tobytes()) - - # emit audio file - audiofile = AudioFile( - path=path, - sample_rate=sample_rate, - channels=channels, - sample_width=sample_width, - timestamp=data[0].pts * data[0].time_base, - ) - await self.emit(audiofile) - - -class AudioTranscriptProcessor(Processor): - """ - Transcript audio file - """ - - INPUT_TYPE = AudioFile - OUTPUT_TYPE = Transcript - - async def _push(self, data: AudioFile): - try: - result = await self._transcript(data) - if result: - await self.emit(result) - finally: - data.release() - - async def _transcript(self, data: AudioFile): - raise NotImplementedError - - -class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor): - def __init__(self): - super().__init__() - self.model = WhisperModel( - "tiny", device="cpu", compute_type="float32", num_workers=12 - ) - - async def _transcript(self, data: AudioFile): - segments, _ = self.model.transcribe( - data.path.as_posix(), - language="en", - beam_size=5, - # condition_on_previous_text=True, - word_timestamps=True, - vad_filter=True, - vad_parameters={"min_silence_duration_ms": 500}, - ) - - if not segments: - return - - transcript = Transcript(words=[]) - segments = list(segments) - ts = data.timestamp - - for segment in segments: - transcript.text += segment.text - for word in segment.words: - transcript.words.append( - Word(text=word.word, start=ts + word.start, end=ts + word.end) - ) - - return transcript - - -class AudioAutoTranscriptProcessor(AudioTranscriptProcessor): - BACKENDS = { - "whisper": AudioWhisperTranscriptProcessor, - } - BACKEND_DEFAULT = "whisper" - - def __init__(self, backend=None, **kwargs): - super().__init__(**kwargs) - self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]() - - def connect(self, processor: Processor): - self.processor.connect(processor) - - def disconnect(self, processor: Processor): - self.processor.disconnect(processor) - - def on(self, callback): - self.processor.on(callback) - - def off(self, callback): - self.processor.off(callback) - - async def _push(self, data: AudioFile): - return await self.processor._push(data) - - async def _flush(self): - return await self.processor._flush() - - -class TranscriptLineProcessor(Processor): - """ - Based on stream of transcript, assemble lines, 
remove duplicated words - """ - - INPUT_TYPE = Transcript - OUTPUT_TYPE = Transcript - - def __init__(self, max_text=1000, **kwargs): - super().__init__(**kwargs) - self.transcript = Transcript(words=[]) - self.max_text = max_text - - async def _push(self, data: Transcript): - # merge both transcript - self.transcript.merge(data) - - # check if a line is complete - if "." not in self.transcript.text: - # if the transcription text is still not too long, wait for more - if len(self.transcript.text) < self.max_text: - return - - # cut to the next . - partial = Transcript(words=[]) - for word in self.transcript.words[:]: - partial.text += word.text - partial.words.append(word) - if "." not in word.text: - continue - - # emit line - await self.emit(partial) - - # create new transcript - partial = Transcript(words=[]) - - self.transcript = partial - - async def _flush(self): - if self.transcript.words: - await self.emit(self.transcript) - - -class TitleSummaryProcessor(Processor): - """ - Detect topic and summary from the transcript - """ - - INPUT_TYPE = Transcript - OUTPUT_TYPE = TitleSummary - - async def _push(self, data: Transcript): - param = TitleSummaryInput(transcribed_time=data.timestamp, input_text=data.text) - - try: - # TODO: abstract LLM implementation and parsing - response = httpx.post( - settings.LLM_URL, headers=param.headers, json=param.data - ) - response.raise_for_status() - - result = ParseLLMResult(param=param, output=response.json()) - summary = TitleSummary(title=result.title, summary=result.description) - await self.emit(summary) - - except httpx.ConnectError as e: - self.logger.error(f"Failed to call llm: {e}") - - except Exception: - self.logger.exception("Failed to call llm") - - -class Pipeline(Processor): - """ - A pipeline of processors - """ - - INPUT_TYPE = None - OUTPUT_TYPE = None - - def __init__(self, *processors: Processor): - super().__init__() - self.processors = processors - - for processor in processors: - processor.set_pipeline(self) - - for i in range(len(processors) - 1): - processors[i].connect(processors[i + 1]) - - self.INPUT_TYPE = processors[0].INPUT_TYPE - self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE - - async def _push(self, data): - await self.processors[0].push(data) - - async def _flush(self): - for processor in self.processors: - await processor.flush() - - def describe(self, level=0): - logger.info(" " * level + "Pipeline:") - for processor in self.processors: - processor.describe(level + 1) - logger.info("") - - -class FinalSummaryProcessor(Processor): - """ - Assemble all summary into a line-based json - """ - - INPUT_TYPE = TitleSummary - OUTPUT_TYPE = Path - - def __init__(self, filename: Path, **kwargs): - super().__init__(**kwargs) - self.filename = filename - self.chunkcount = 0 - - async def _push(self, data: TitleSummary): - with open(self.filename, "a", encoding="utf8") as fd: - fd.write(json.dumps(data)) - self.chunkcount += 1 - - async def _flush(self): - if self.chunkcount == 0: - self.logger.warning("No summary to write") - return - self.logger.info(f"Writing to {self.filename}") - await self.emit(self.filename) - - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser() - parser.add_argument("source", help="Source file (mp3, wav, mp4...)") - args = parser.parse_args() - - async def main(): - async def on_transcript(transcript): - print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}") - - async def on_summary(summary): - print(f"Summary: {summary.title} - {summary.summary}") - - 
async def on_final_summary(path):
-            print(f"Final Summary: {path}")
-
-        # transcription output
-        result_fn = Path(args.source).with_suffix(".jsonl")
-
-        pipeline = Pipeline(
-            AudioChunkerProcessor(),
-            AudioMergeProcessor(),
-            AudioAutoTranscriptProcessor.as_threaded(),
-            TranscriptLineProcessor(callback=on_transcript),
-            TitleSummaryProcessor.as_threaded(callback=on_summary),
-            FinalSummaryProcessor.as_threaded(
-                filename=result_fn, callback=on_final_summary
-            ),
-        )
-        pipeline.describe()
-
-        # start processing audio
-        logger.info(f"Opening {args.source}")
-        container = av.open(args.source)
-        try:
-            logger.info("Start pushing audio into the pipeline")
-            for frame in container.decode(audio=0):
-                await pipeline.push(frame)
-        finally:
-            logger.info("Flushing the pipeline")
-            await pipeline.flush()
-
-        logger.info("All done !")
-
-    asyncio.run(main())
diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py
new file mode 100644
index 00000000..847db231
--- /dev/null
+++ b/server/reflector/processors/__init__.py
@@ -0,0 +1,9 @@
+from .base import Processor, ThreadedProcessor, Pipeline  # noqa: F401
+from .types import AudioFile, Transcript, Word, TitleSummary  # noqa: F401
+from .audio_chunker import AudioChunkerProcessor  # noqa: F401
+from .audio_merge import AudioMergeProcessor  # noqa: F401
+from .audio_transcript import AudioTranscriptProcessor  # noqa: F401
+from .audio_transcript_auto import AudioTranscriptAutoProcessor  # noqa: F401
+from .transcript_liner import TranscriptLinerProcessor  # noqa: F401
+from .transcript_summarizer import TranscriptSummarizerProcessor  # noqa: F401
+from .transcript_topic_detector import TranscriptTopicDetectorProcessor  # noqa: F401
diff --git a/server/reflector/processors/audio_chunker.py b/server/reflector/processors/audio_chunker.py
new file mode 100644
index 00000000..8ca132a0
--- /dev/null
+++ b/server/reflector/processors/audio_chunker.py
@@ -0,0 +1,27 @@
+from reflector.processors.base import Processor
+import av
+
+
+class AudioChunkerProcessor(Processor):
+    """
+    Assemble audio frames into chunks
+    """
+
+    INPUT_TYPE = av.AudioFrame
+    OUTPUT_TYPE = list[av.AudioFrame]
+
+    def __init__(self, max_frames=256):
+        super().__init__()
+        self.frames: list[av.AudioFrame] = []
+        self.max_frames = max_frames
+
+    async def _push(self, data: av.AudioFrame):
+        self.frames.append(data)
+        if len(self.frames) >= self.max_frames:
+            await self.flush()
+
+    async def _flush(self):
+        frames = self.frames[:]
+        self.frames = []
+        if frames:
+            await self.emit(frames)
diff --git a/server/reflector/processors/audio_merge.py b/server/reflector/processors/audio_merge.py
new file mode 100644
index 00000000..ac16676d
--- /dev/null
+++ b/server/reflector/processors/audio_merge.py
@@ -0,0 +1,47 @@
+from reflector.processors.base import Processor
+from reflector.processors.types import AudioFile
+from pathlib import Path
+import wave
+import av
+
+
+class AudioMergeProcessor(Processor):
+    """
+    Merge audio frames into a single file
+    """
+
+    INPUT_TYPE = list[av.AudioFrame]
+    OUTPUT_TYPE = AudioFile
+
+    async def _push(self, data: list[av.AudioFrame]):
+        if not data:
+            return
+
+        # get audio information from first frame
+        frame = data[0]
+        channels = len(frame.layout.channels)
+        sample_rate = frame.sample_rate
+        sample_width = frame.format.bytes
+
+        # create audio file
+        from time import monotonic_ns
+        from uuid import uuid4
+
+        uu = uuid4().hex
+        path = Path(f"audio_{monotonic_ns()}_{uu}.wav")
+        with wave.open(path.as_posix(), "wb") as wf:
+            wf.setnchannels(channels)
+            wf.setsampwidth(sample_width)
+            wf.setframerate(sample_rate)
+            for frame in data:
+                wf.writeframes(frame.to_ndarray().tobytes())
+
+        # emit audio file
+        audiofile = AudioFile(
+            path=path,
+            sample_rate=sample_rate,
+            channels=channels,
+            sample_width=sample_width,
+            timestamp=data[0].pts * data[0].time_base,
+        )
+        await self.emit(audiofile)
diff --git a/server/reflector/processors/audio_transcript.py b/server/reflector/processors/audio_transcript.py
new file mode 100644
index 00000000..708b959e
--- /dev/null
+++ b/server/reflector/processors/audio_transcript.py
@@ -0,0 +1,22 @@
+from reflector.processors.base import Processor
+from reflector.processors.types import AudioFile, Transcript
+
+
+class AudioTranscriptProcessor(Processor):
+    """
+    Transcribe an audio file into a Transcript
+    """
+
+    INPUT_TYPE = AudioFile
+    OUTPUT_TYPE = Transcript
+
+    async def _push(self, data: AudioFile):
+        try:
+            result = await self._transcript(data)
+            if result:
+                await self.emit(result)
+        finally:
+            data.release()
+
+    async def _transcript(self, data: AudioFile):
+        raise NotImplementedError
diff --git a/server/reflector/processors/audio_transcript_auto.py b/server/reflector/processors/audio_transcript_auto.py
new file mode 100644
index 00000000..0ece84f3
--- /dev/null
+++ b/server/reflector/processors/audio_transcript_auto.py
@@ -0,0 +1,35 @@
+from reflector.processors.base import Processor
+from reflector.processors.audio_transcript import AudioTranscriptProcessor
+from reflector.processors.audio_transcript_whisper import (
+    AudioTranscriptWhisperProcessor,
+)
+from reflector.processors.types import AudioFile
+
+
+class AudioTranscriptAutoProcessor(AudioTranscriptProcessor):
+    BACKENDS = {
+        "whisper": AudioTranscriptWhisperProcessor,
+    }
+    BACKEND_DEFAULT = "whisper"
+
+    def __init__(self, backend=None, **kwargs):
+        super().__init__(**kwargs)
+        self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]()
+
+    def connect(self, processor: Processor):
+        self.processor.connect(processor)
+
+    def disconnect(self, processor: Processor):
+        self.processor.disconnect(processor)
+
+    def on(self, callback):
+        self.processor.on(callback)
+
+    def off(self, callback):
+        self.processor.off(callback)
+
+    async def _push(self, data: AudioFile):
+        return await self.processor._push(data)
+
+    async def _flush(self):
+        return await self.processor._flush()
diff --git a/server/reflector/processors/audio_transcript_whisper.py b/server/reflector/processors/audio_transcript_whisper.py
new file mode 100644
index 00000000..9a85e9bf
--- /dev/null
+++ b/server/reflector/processors/audio_transcript_whisper.py
@@ -0,0 +1,40 @@
+from reflector.processors.audio_transcript import AudioTranscriptProcessor
+from reflector.processors.types import AudioFile, Transcript, Word
+from faster_whisper import WhisperModel
+
+
+class AudioTranscriptWhisperProcessor(AudioTranscriptProcessor):
+    def __init__(self):
+        super().__init__()
+        self.model = WhisperModel(
+            "tiny", device="cpu", compute_type="float32", num_workers=12
+        )
+
+    async def _transcript(self, data: AudioFile):
+        segments, _ = self.model.transcribe(
+            data.path.as_posix(),
+            language="en",
+            beam_size=5,
+            # condition_on_previous_text=True,
+            word_timestamps=True,
+            vad_filter=True,
+            vad_parameters={"min_silence_duration_ms": 500},
+        )
+
+        # transcribe() returns a lazy generator; materialize it before
+        # checking for emptiness, otherwise the check never triggers
+        segments = list(segments)
+        if not segments:
+            return
+
+        transcript = Transcript(words=[])
+        ts = data.timestamp
+
+        for segment in segments:
+            transcript.text += segment.text
+            for word in segment.words:
+                transcript.words.append(
+                    Word(text=word.word, start=ts + word.start, end=ts + word.end)
+                )
+
+        return transcript
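The BACKENDS mapping above makes the transcription backend pluggable. A minimal sketch of how an alternative backend could be registered; EchoTranscriptProcessor and its canned output are hypothetical, purely for illustration:

    from reflector.processors import AudioTranscriptAutoProcessor, AudioTranscriptProcessor
    from reflector.processors.types import AudioFile, Transcript, Word


    class EchoTranscriptProcessor(AudioTranscriptProcessor):
        # hypothetical backend returning a fixed transcript, handy for tests
        async def _transcript(self, data: AudioFile) -> Transcript:
            ts = data.timestamp
            return Transcript(
                text=" hello.",
                words=[Word(text=" hello.", start=ts, end=ts + 1.0)],
            )


    # register under a new name, then select it explicitly
    AudioTranscriptAutoProcessor.BACKENDS["echo"] = EchoTranscriptProcessor
    processor = AudioTranscriptAutoProcessor(backend="echo")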
diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py
new file mode 100644
index 00000000..8e616364
--- /dev/null
+++ b/server/reflector/processors/base.py
@@ -0,0 +1,178 @@
+from reflector.logger import logger
+from uuid import uuid4
+from concurrent.futures import ThreadPoolExecutor
+import asyncio
+
+
+class Processor:
+    INPUT_TYPE: type = None
+    OUTPUT_TYPE: type = None
+
+    def __init__(self, callback=None, custom_logger=None):
+        self._processors = []
+        self._callbacks = []
+        if callback:
+            self.on(callback)
+        self.uid = uuid4().hex
+        self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__)
+
+    def set_pipeline(self, pipeline: "Pipeline"):
+        self.logger = self.logger.bind(pipeline=pipeline.uid)
+
+    def connect(self, processor: "Processor"):
+        """
+        Connect this processor's output to another processor
+        """
+        if processor.INPUT_TYPE != self.OUTPUT_TYPE:
+            raise ValueError(
+                f"Processor {processor} input type {processor.INPUT_TYPE} "
+                f"does not match {self.OUTPUT_TYPE}"
+            )
+        self._processors.append(processor)
+
+    def disconnect(self, processor: "Processor"):
+        """
+        Disconnect a previously connected processor
+        """
+        self._processors.remove(processor)
+
+    def on(self, callback):
+        """
+        Register a callback to be called when data is emitted
+        """
+        # ensure callback is asynchronous
+        if not asyncio.iscoroutinefunction(callback):
+            raise ValueError("Callback must be a coroutine function")
+        self._callbacks.append(callback)
+
+    def off(self, callback):
+        """
+        Unregister a callback previously registered with `on`
+        """
+        self._callbacks.remove(callback)
+
+    async def emit(self, data):
+        for callback in self._callbacks:
+            await callback(data)
+        for processor in self._processors:
+            await processor.push(data)
+
+    async def push(self, data):
+        """
+        Push data to this processor. `data` must be of type `INPUT_TYPE`.
+        The function returns the output of type `OUTPUT_TYPE`
+        """
+        # logger.debug(f"{self.__class__.__name__} push")
+        try:
+            return await self._push(data)
+        except Exception:
+            self.logger.exception("Error in push")
+
+    async def flush(self):
+        """
+        Flush any data buffered by this processor
+        """
+        # logger.debug(f"{self.__class__.__name__} flush")
+        return await self._flush()
+
+    def describe(self, level=0):
+        logger.info(" " * level + self.__class__.__name__)
+
+    async def _push(self, data):
+        raise NotImplementedError
+
+    async def _flush(self):
+        pass
+
+    @classmethod
+    def as_threaded(cls, *args, **kwargs):
+        """
+        Return a single-threaded processor whose output is guaranteed
+        to be in order
+        """
+        return ThreadedProcessor(cls(*args, **kwargs), max_workers=1)
+
+
+class ThreadedProcessor(Processor):
+    """
+    A processor that runs in a separate thread
+    """
+
+    def __init__(self, processor: Processor, max_workers=1):
+        super().__init__()
+        # FIXME: This is a hack to make sure that the processor is single threaded
+        # but if it is more than 1, then we need to make sure that the processor
+        # is emitting data in order
+        assert max_workers == 1
+        self.processor = processor
+        self.INPUT_TYPE = processor.INPUT_TYPE
+        self.OUTPUT_TYPE = processor.OUTPUT_TYPE
+        self.executor = ThreadPoolExecutor(max_workers=max_workers)
+        self.queue = asyncio.Queue()
+        self.task = asyncio.get_running_loop().create_task(self.loop())
+
+    async def loop(self):
+        while True:
+            data = await self.queue.get()
+            try:
+                if data is None:
+                    await self.processor.flush()
+                    break
+                await self.processor.push(data)
+            finally:
+                self.queue.task_done()
+
+    async def _push(self, data):
+        await self.queue.put(data)
+
+    async def _flush(self):
+        await self.queue.put(None)
+        await self.queue.join()
+
+    def connect(self, processor: Processor):
+        self.processor.connect(processor)
+
+    def disconnect(self, processor: Processor):
+        self.processor.disconnect(processor)
+
+    def on(self, callback):
+        self.processor.on(callback)
+
+    def describe(self, level=0):
+        super().describe(level)
+        self.processor.describe(level + 1)
+
+
+class Pipeline(Processor):
+    """
+    A pipeline of processors
+    """
+
+    INPUT_TYPE = None
+    OUTPUT_TYPE = None
+
+    def __init__(self, *processors: Processor):
+        super().__init__()
+        self.processors = processors
+
+        for processor in processors:
+            processor.set_pipeline(self)
+
+        for i in range(len(processors) - 1):
+            processors[i].connect(processors[i + 1])
+
+        self.INPUT_TYPE = processors[0].INPUT_TYPE
+        self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE
+
+    async def _push(self, data):
+        await self.processors[0].push(data)
+
+    async def _flush(self):
+        for processor in self.processors:
+            await processor.flush()
+
+    def describe(self, level=0):
+        logger.info(" " * level + "Pipeline:")
+        for processor in self.processors:
+            processor.describe(level + 1)
+        logger.info("")
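To make the base contract concrete, a minimal sketch of two chained processors; UpperProcessor and ExclaimProcessor are invented for illustration and are not part of this patch:

    import asyncio

    from reflector.processors import Pipeline, Processor


    class UpperProcessor(Processor):
        INPUT_TYPE = str
        OUTPUT_TYPE = str

        async def _push(self, data: str):
            await self.emit(data.upper())


    class ExclaimProcessor(Processor):
        INPUT_TYPE = str
        OUTPUT_TYPE = str

        async def _push(self, data: str):
            await self.emit(data + "!")


    async def main():
        # callbacks must be coroutine functions, otherwise on() raises ValueError
        async def on_result(data):
            print(data)

        # Pipeline wires the processors together and verifies that each
        # OUTPUT_TYPE matches the next INPUT_TYPE
        pipeline = Pipeline(UpperProcessor(), ExclaimProcessor(callback=on_result))
        await pipeline.push("hello")  # prints "HELLO!"
        await pipeline.flush()


    asyncio.run(main())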
+ """ + + INPUT_TYPE = Transcript + OUTPUT_TYPE = Transcript + + def __init__(self, max_text=1000, **kwargs): + super().__init__(**kwargs) + self.transcript = Transcript(words=[]) + self.max_text = max_text + + async def _push(self, data: Transcript): + # merge both transcript + self.transcript.merge(data) + + # check if a line is complete + if "." not in self.transcript.text: + # if the transcription text is still not too long, wait for more + if len(self.transcript.text) < self.max_text: + return + + # cut to the next . + partial = Transcript(words=[]) + for word in self.transcript.words[:]: + partial.text += word.text + partial.words.append(word) + if "." not in word.text: + continue + + # emit line + await self.emit(partial) + + # create new transcript + partial = Transcript(words=[]) + + self.transcript = partial + + async def _flush(self): + if self.transcript.words: + await self.emit(self.transcript) diff --git a/server/reflector/processors/transcript_summarizer.py b/server/reflector/processors/transcript_summarizer.py new file mode 100644 index 00000000..4e149602 --- /dev/null +++ b/server/reflector/processors/transcript_summarizer.py @@ -0,0 +1,31 @@ +from reflector.processors.base import Processor +from reflector.processors.types import TitleSummary +from pathlib import Path +import json + + +class TranscriptSummarizerProcessor(Processor): + """ + Assemble all summary into a line-based json + """ + + INPUT_TYPE = TitleSummary + OUTPUT_TYPE = Path + + def __init__(self, filename: Path, **kwargs): + super().__init__(**kwargs) + self.filename = filename + self.chunkcount = 0 + + async def _push(self, data: TitleSummary): + with open(self.filename, "a", encoding="utf8") as fd: + fd.write(json.dumps(data)) + self.chunkcount += 1 + + async def _flush(self): + if self.chunkcount == 0: + self.logger.warning("No summary to write") + return + self.logger.info(f"Writing to {self.filename}") + await self.emit(self.filename) + diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py new file mode 100644 index 00000000..31a88882 --- /dev/null +++ b/server/reflector/processors/transcript_topic_detector.py @@ -0,0 +1,48 @@ +from reflector.processors.base import Processor +from reflector.processors.types import Transcript, TitleSummary +from reflector.llm import LLM + + +class TranscriptTopicDetectorProcessor(Processor): + """ + Detect topic and summary from the transcript + """ + + INPUT_TYPE = Transcript + OUTPUT_TYPE = TitleSummary + + PROMPT = """ + ### Human: + Create a JSON object as response.The JSON object must have 2 fields: + i) title and ii) summary.For the title field,generate a short title + for the given text. For the summary field, summarize the given text + in three sentences. 
diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py
new file mode 100644
index 00000000..31a88882
--- /dev/null
+++ b/server/reflector/processors/transcript_topic_detector.py
@@ -0,0 +1,48 @@
+from reflector.processors.base import Processor
+from reflector.processors.types import Transcript, TitleSummary
+from reflector.llm import LLM
+
+
+class TranscriptTopicDetectorProcessor(Processor):
+    """
+    Detect the topic and generate a summary from the transcript
+    """
+
+    INPUT_TYPE = Transcript
+    OUTPUT_TYPE = TitleSummary
+
+    PROMPT = """
+    ### Human:
+    Create a JSON object as response. The JSON object must have 2 fields:
+    i) title and ii) summary. For the title field, generate a short title
+    for the given text. For the summary field, summarize the given text
+    in three sentences.
+
+    {input_text}
+
+    ### Assistant:
+    """
+
+    def __init__(self, min_transcript_length=25, **kwargs):
+        super().__init__(**kwargs)
+        self.transcript = None
+        self.min_transcript_length = min_transcript_length
+        self.llm = LLM.instance()
+
+    async def _push(self, data: Transcript):
+        if self.transcript is None:
+            self.transcript = data
+        else:
+            self.transcript.merge(data)
+        if len(self.transcript.text) < self.min_transcript_length:
+            return
+        await self.flush()
+
+    async def _flush(self):
+        if not self.transcript:
+            return
+        prompt = self.PROMPT.format(input_text=self.transcript.text)
+        result = await self.llm.generate(prompt=prompt)
+        summary = TitleSummary(title=result["title"], summary=result["summary"])
+        self.transcript = None
+        await self.emit(summary)
diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py
new file mode 100644
index 00000000..1a89c127
--- /dev/null
+++ b/server/reflector/processors/types.py
@@ -0,0 +1,65 @@
+from dataclasses import dataclass
+from pathlib import Path
+
+
+@dataclass
+class AudioFile:
+    path: Path
+    sample_rate: int
+    channels: int
+    sample_width: int
+    timestamp: float = 0.0
+
+    def release(self):
+        self.path.unlink()
+
+
+@dataclass
+class Word:
+    text: str
+    start: float
+    end: float
+
+
+@dataclass
+class TitleSummary:
+    title: str
+    summary: str
+
+
+@dataclass
+class Transcript:
+    text: str = ""
+    words: list[Word] = None
+
+    @property
+    def human_timestamp(self):
+        minutes = int(self.timestamp / 60)
+        seconds = int(self.timestamp % 60)
+        milliseconds = int((self.timestamp % 1) * 1000)
+        return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
+
+    @property
+    def timestamp(self):
+        if not self.words:
+            raise ValueError("No words in transcript")
+        return self.words[0].start
+
+    @property
+    def duration(self):
+        if not self.words:
+            raise ValueError("No words in transcript")
+        return self.words[-1].end - self.words[0].start
+
+    def merge(self, other: "Transcript"):
+        if not self.words:
+            self.words = other.words
+        else:
+            self.words.extend(other.words)
+        self.text += other.text
+
+    def clone(self):
+        words = [
+            Word(text=word.text, start=word.start, end=word.end) for word in self.words
+        ]
+        return Transcript(text=self.text, words=words)
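A quick illustration of the Transcript helpers defined above (timings invented):

    from reflector.processors.types import Transcript, Word

    t = Transcript(
        text="Hello world.",
        words=[
            Word(text="Hello", start=61.0, end=61.4),
            Word(text=" world.", start=61.5, end=62.0),
        ],
    )
    print(t.timestamp)        # 61.0, start of the first word
    print(t.human_timestamp)  # 01:01.000
    print(t.duration)         # 1.0, last end minus first start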
diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py
new file mode 100644
index 00000000..aefbc153
--- /dev/null
+++ b/server/reflector/tools/process.py
@@ -0,0 +1,61 @@
+from pathlib import Path
+import av
+from reflector.logger import logger
+from reflector.processors import (
+    Pipeline,
+    AudioChunkerProcessor,
+    AudioMergeProcessor,
+    AudioTranscriptAutoProcessor,
+    TranscriptLinerProcessor,
+    TranscriptTopicDetectorProcessor,
+    TranscriptSummarizerProcessor,
+)
+import asyncio
+
+
+if __name__ == "__main__":
+    import argparse
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
+    args = parser.parse_args()
+
+    async def main():
+        async def on_transcript(transcript):
+            print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")
+
+        async def on_summary(summary):
+            print(f"Summary: {summary.title} - {summary.summary}")
+
+        async def on_final_summary(path):
+            print(f"Final Summary: {path}")
+
+        # transcription output
+        result_fn = Path(args.source).with_suffix(".jsonl")
+
+        pipeline = Pipeline(
+            AudioChunkerProcessor(),
+            AudioMergeProcessor(),
+            AudioTranscriptAutoProcessor.as_threaded(),
+            TranscriptLinerProcessor(callback=on_transcript),
+            TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary),
+            TranscriptSummarizerProcessor.as_threaded(
+                filename=result_fn, callback=on_final_summary
+            ),
+        )
+        pipeline.describe()
+
+        # start processing audio
+        logger.info(f"Opening {args.source}")
+        container = av.open(args.source)
+        try:
+            logger.info("Start pushing audio into the pipeline")
+            for frame in container.decode(audio=0):
+                await pipeline.push(frame)
+        finally:
+            logger.info("Flushing the pipeline")
+            await pipeline.flush()
+
+        logger.info("All done!")
+
+    asyncio.run(main())
diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py
index 3976d3e2..03f9decf 100644
--- a/server/reflector/views/rtc_offer.py
+++ b/server/reflector/views/rtc_offer.py
@@ -9,10 +9,9 @@ from reflector.processors import (
     Pipeline,
     AudioChunkerProcessor,
     AudioMergeProcessor,
-    AudioAutoTranscriptProcessor,
-    TranscriptLineProcessor,
-    TitleSummaryProcessor,
-    # FinalSummaryProcessor,
+    AudioTranscriptAutoProcessor,
+    TranscriptLinerProcessor,
+    TranscriptTopicDetectorProcessor,
     Transcript,
     TitleSummary,
 )
@@ -74,9 +73,9 @@ async def rtc_offer(params: RtcOffer, request: Request):
     ctx.pipeline = Pipeline(
         AudioChunkerProcessor(),
         AudioMergeProcessor(),
-        AudioAutoTranscriptProcessor.as_threaded(),
-        TranscriptLineProcessor(callback=on_transcript),
-        TitleSummaryProcessor.as_threaded(callback=on_summary),
+        AudioTranscriptAutoProcessor.as_threaded(),
+        TranscriptLinerProcessor(callback=on_transcript),
+        TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary),
         # FinalSummaryProcessor.as_threaded(
         #     filename=result_fn, callback=on_final_summary
         # ),
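For completeness, the front of the pipeline can be exercised without any transcription model by pushing synthetic PyAV frames. This is a rough sketch, not part of the patch; the frame construction details (from_ndarray shape, pts/time_base handling) are assumptions and may need adjustment for your PyAV version:

    import asyncio
    from fractions import Fraction

    import av
    import numpy as np

    from reflector.processors import (
        Pipeline,
        AudioChunkerProcessor,
        AudioMergeProcessor,
    )


    async def main():
        async def on_audiofile(audiofile):
            print(audiofile.path, audiofile.sample_rate, audiofile.timestamp)
            audiofile.release()  # unlink the temporary wav

        pipeline = Pipeline(
            AudioChunkerProcessor(max_frames=4),
            AudioMergeProcessor(callback=on_audiofile),
        )

        silence = np.zeros((1, 960), dtype=np.int16)  # 20ms of mono s16 at 48kHz
        for i in range(8):
            frame = av.AudioFrame.from_ndarray(silence, format="s16", layout="mono")
            frame.sample_rate = 48000
            frame.pts = i * 960
            frame.time_base = Fraction(1, 48000)
            await pipeline.push(frame)
        await pipeline.flush()  # merges one wav file per chunk of 4 frames


    asyncio.run(main())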