From 509840cb4ceeb7997f155f86fa8eaaf2aa729f32 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Fri, 28 Jul 2023 20:08:17 +0200
Subject: [PATCH 01/12] processors: Introduce processors implementation

Each processor is standalone, with defined INPUT/OUTPUT types.
A processor can be threaded or not (this can be made extensible later).

TODO: Pipeline that automatically connects all processors, then flushes
and cleans up data.

To test:

    python -m reflector.processors tests/records/test_mathieu_hello.wav

```
Transcript: [00:00.500]: Hi there, everyone.
Transcript: [00:02.700]: Today, I want to share my incredible experience.
Transcript: [00:05.461]: with Reflector, a cutineage product that revolutionizes audio processing.
Transcript: [00:10.922]: With Refector, I can easily convert any audio into accurate transcription.
Transcript: [00:16.493]: serving me hours of tedious manual work.
```

This is not a good transcript, but transcript quality is not the point here.
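For review, a minimal sketch of the contract this patch introduces. Only
`Processor` and its `connect`/`on`/`push`/`flush`/`emit` API come from this
patch; `UpperProcessor` and the callback are hypothetical:

```python
import asyncio

from reflector.processors import Processor


class UpperProcessor(Processor):
    # INPUT_TYPE/OUTPUT_TYPE drive the type check done by connect()
    INPUT_TYPE = str
    OUTPUT_TYPE = str

    async def _push(self, data: str):
        # emit() forwards to registered callbacks and connected processors
        await self.emit(data.upper())


async def main():
    processor = UpperProcessor()

    async def on_data(data: str):
        print(data)

    processor.on(on_data)
    await processor.push("hello")  # prints "HELLO"
    await processor.flush()


asyncio.run(main())
```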
---
 server/.gitignore              |   1 +
 server/reflector/processors.py | 396 +++++++++++++++++++++++++++++++++
 2 files changed, 397 insertions(+)
 create mode 100644 server/reflector/processors.py

diff --git a/server/.gitignore b/server/.gitignore
index 06f526b4..6bf35f5e 100644
--- a/server/.gitignore
+++ b/server/.gitignore
@@ -174,3 +174,4 @@ test_samples/
 .DS_Store
 .vscode/
 artefacts/
+audio_*.wav
diff --git a/server/reflector/processors.py b/server/reflector/processors.py
new file mode 100644
index 00000000..a1f9cc4d
--- /dev/null
+++ b/server/reflector/processors.py
@@ -0,0 +1,396 @@
+from pathlib import Path
import av
import wave
from dataclasses import dataclass
from faster_whisper import WhisperModel
from reflector.models import TitleSummaryInput, ParseLLMResult
from reflector.settings import settings
from reflector.logger import logger
import httpx
import asyncio
from concurrent.futures import ThreadPoolExecutor


@dataclass
class AudioFile:
    path: Path
    sample_rate: int
    channels: int
    sample_width: int
    timestamp: float = 0.0


@dataclass
class Word:
    text: str
    start: float
    end: float


@dataclass
class TitleSummary:
    title: str
    summary: str


@dataclass
class Transcript:
    text: str = ""
    words: list[Word] = None

    @property
    def human_timestamp(self):
        minutes = int(self.timestamp / 60)
        seconds = int(self.timestamp % 60)
        milliseconds = int((self.timestamp % 1) * 1000)
        return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"

    @property
    def timestamp(self):
        if not self.words:
            raise ValueError("No words in transcript")
        return self.words[0].start

    @property
    def duration(self):
        if not self.words:
            raise ValueError("No words in transcript")
        return self.words[-1].end - self.words[0].start

    def merge(self, other: "Transcript"):
        if not self.words:
            self.words = other.words
        else:
            self.words.extend(other.words)
        self.text += other.text


class Processor:
    INPUT_TYPE: type = None
    OUTPUT_TYPE: type = None

    def __init__(self):
        self._processors = []
        self._callbacks = []

    def connect(self, processor: "Processor"):
        if processor.INPUT_TYPE != self.OUTPUT_TYPE:
            raise ValueError(
                f"Processor {processor} input type {processor.INPUT_TYPE} "
                f"does not match {self.OUTPUT_TYPE}"
            )
        self._processors.append(processor)

    def disconnect(self, processor: "Processor"):
        self._processors.remove(processor)

    def on(self, callback):
        self._callbacks.append(callback)

    def off(self, callback):
        self._callbacks.remove(callback)

    async def emit(self, data):
        for callback in self._callbacks:
            if isinstance(data, AudioFile):
                import pdb; pdb.set_trace()
            await callback(data)
        for processor in self._processors:
            await processor.push(data)

    async def push(self, data):
        # logger.debug(f"{self.__class__.__name__} push")
        return await self._push(data)

    async def flush(self):
        # logger.debug(f"{self.__class__.__name__} flush")
        return await self._flush()

    async def _push(self, data):
        raise NotImplementedError

    async def _flush(self):
        pass

    @classmethod
    def as_threaded(cls, *args, **kwargs):
        return ThreadedProcessor(cls(*args, **kwargs), max_workers=1)


class ThreadedProcessor(Processor):
    def __init__(self, processor: Processor, max_workers=1):
        super().__init__()
        self.processor = processor
        self.INPUT_TYPE = processor.INPUT_TYPE
        self.OUTPUT_TYPE = processor.OUTPUT_TYPE
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        self.queue = asyncio.Queue()
        self.task = asyncio.get_running_loop().create_task(self.loop())

    async def loop(self):
        while True:
            data = await self.queue.get()
            try:
                if data is None:
                    await self.processor.flush()
                    break
                await self.processor.push(data)
            finally:
                self.queue.task_done()

    async def _push(self, data):
        await self.queue.put(data)

    async def _flush(self):
        await self.queue.put(None)
        await self.queue.join()

    def connect(self, processor: Processor):
        self.processor.connect(processor)

    def disconnect(self, processor: Processor):
        self.processor.disconnect(processor)

    def on(self, callback):
        self.processor.on(callback)


class AudioChunkerProcessor(Processor):
    """
    Assemble audio frames into chunks
    """

    INPUT_TYPE = av.AudioFrame
    OUTPUT_TYPE = list[av.AudioFrame]

    def __init__(self, max_frames=256):
        super().__init__()
        self.frames: list[av.AudioFrame] = []
        self.max_frames = max_frames

    async def _push(self, data: av.AudioFrame):
        self.frames.append(data)
        if len(self.frames) >= self.max_frames:
            await self.flush()

    async def _flush(self):
        frames = self.frames[:]
        self.frames = []
        if frames:
            await self.emit(frames)


class AudioMergeProcessor(Processor):
    """
    Merge audio frames into a single file
    """

    INPUT_TYPE = list[av.AudioFrame]
    OUTPUT_TYPE = AudioFile

    async def _push(self, data: list[av.AudioFrame]):
        if not data:
            return

        # get audio information from the first frame
        frame = data[0]
        channels = len(frame.layout.channels)
        sample_rate = frame.sample_rate
        sample_width = frame.format.bytes

        # create audio file
        from time import monotonic_ns
        from uuid import uuid4

        uu = uuid4().hex
        path = Path(f"audio_{monotonic_ns()}_{uu}.wav")
        with wave.open(path.as_posix(), "wb") as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(sample_width)
            wf.setframerate(sample_rate)
            for frame in data:
                wf.writeframes(frame.to_ndarray().tobytes())

        # emit audio file
        audiofile = AudioFile(
            path=path,
            sample_rate=sample_rate,
            channels=channels,
            sample_width=sample_width,
            timestamp=data[0].pts * data[0].time_base,
        )
        await self.emit(audiofile)


class AudioTranscriptProcessor(Processor):
    """
    Transcribe an audio file
    """

    INPUT_TYPE = AudioFile
    OUTPUT_TYPE = Transcript

    async def _push(self, data: AudioFile):
        result = await self._transcript(data)
        if result:
            await self.emit(result)

    async def _transcript(self, data: AudioFile):
        raise NotImplementedError


class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor):
    def __init__(self):
        super().__init__()
        self.model = WhisperModel(
            "tiny", device="cpu", compute_type="float32", num_workers=12
        )
    async def _transcript(self, data: AudioFile):
        segments, _ = self.model.transcribe(
            data.path.as_posix(),
            language="en",
            beam_size=5,
            # condition_on_previous_text=True,
            word_timestamps=True,
            vad_filter=True,
            vad_parameters={"min_silence_duration_ms": 500},
        )

        if not segments:
            return

        transcript = Transcript(words=[])
        segments = list(segments)
        ts = data.timestamp

        for segment in segments:
            transcript.text += segment.text
            for word in segment.words:
                transcript.words.append(
                    Word(text=word.word, start=ts + word.start, end=ts + word.end)
                )

        return transcript


class TranscriptLineProcessor(Processor):
    """
    Based on a stream of transcripts, assemble lines and remove duplicated words
    """

    INPUT_TYPE = Transcript
    OUTPUT_TYPE = Transcript

    def __init__(self, max_text=1000):
        super().__init__()
        self.transcript = Transcript(words=[])
        self.max_text = max_text

    async def _push(self, data: Transcript):
        # merge both transcripts
        self.transcript.merge(data)

        # check if a line is complete
        if "." not in self.transcript.text:
            # if the transcription text is still not too long, wait for more
            if len(self.transcript.text) < self.max_text:
                return

        # cut at the next "."
        partial = Transcript(words=[])
        for word in self.transcript.words[:]:
            partial.text += word.text
            partial.words.append(word)
            if "." not in word.text:
                continue

            # emit line
            await self.emit(partial)

            # create new transcript
            partial = Transcript(words=[])

        self.transcript = partial

    async def _flush(self):
        if self.transcript.words:
            await self.emit(self.transcript)


class TitleSummaryProcessor(Processor):
    """
    Detect the topic and summary from the transcript
    """

    INPUT_TYPE = Transcript
    OUTPUT_TYPE = TitleSummary

    async def _push(self, data: Transcript):
        param = TitleSummaryInput(transcribed_time=data.timestamp, input_text=data.text)

        try:
            # TODO: abstract LLM implementation and parsing
            response = httpx.post(
                settings.LLM_URL, headers=param.headers, json=param.data
            )
            response.raise_for_status()

            result = ParseLLMResult(param=param, output=response.json())
            summary = TitleSummary(title=result.title, summary=result.description)
            await self.emit(summary)

        except Exception:
            logger.exception("Failed to call llm")
            return


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
    args = parser.parse_args()

    async def main():
        chunker = AudioChunkerProcessor()

        # merge audio
        merger = AudioMergeProcessor.as_threaded()
        chunker.connect(merger)

        # transcript audio
        transcripter = AudioWhisperTranscriptProcessor()
        merger.connect(transcripter)

        # merge transcript and output lines
        line_processor = TranscriptLineProcessor()
        transcripter.connect(line_processor)

        async def on_transcript(transcript):
            print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")

        line_processor.on(on_transcript)

        # # title and summary
        # title_summary = TitleSummaryProcessor.as_threaded()
        # line_processor.connect(title_summary)
        #
        # async def on_summary(summary):
        #     print(f"Summary: title={summary.title} summary={summary.summary}")
        #
        # title_summary.on(on_summary)

        # start processing audio
        container = av.open(args.source)
        for frame in container.decode(audio=0):
            await chunker.push(frame)
        # audio done, flush everything
        await chunker.flush()
        await merger.flush()
        await transcripter.flush()
        await line_processor.flush()
        # await title_summary.flush()

    asyncio.run(main())

From 6f6186313611af5a372cec8b62a8b2ed0c01b008 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Sat, 29 Jul 2023 00:59:09 +0200
Subject: [PATCH 02/12] processors: implement Pipeline, simplify usage
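The resulting usage, condensed from the `__main__` block in this patch
(everything here is defined in the patch; the title/summary stages are
omitted for brevity):

```python
import asyncio

import av

from reflector.processors import (
    AudioChunkerProcessor,
    AudioMergeProcessor,
    AudioWhisperTranscriptProcessor,
    Pipeline,
    TranscriptLineProcessor,
)


async def main(source: str):
    async def on_transcript(transcript):
        print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")

    # Pipeline connects each stage to the next and checks INPUT/OUTPUT types
    pipeline = Pipeline(
        AudioChunkerProcessor(),
        AudioMergeProcessor(),
        AudioWhisperTranscriptProcessor().as_threaded(),
        TranscriptLineProcessor(callback=on_transcript),
    )

    # push() feeds the first processor, flush() drains every stage in order
    for frame in av.open(source).decode(audio=0):
        await pipeline.push(frame)
    await pipeline.flush()


asyncio.run(main("tests/records/test_mathieu_hello.wav"))
```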
---
 server/reflector/processors.py | 163 +++++++++++++++++++++++++--------
 1 file changed, 123 insertions(+), 40 deletions(-)

diff --git a/server/reflector/processors.py b/server/reflector/processors.py
index a1f9cc4d..8d26fcb7 100644
--- a/server/reflector/processors.py
+++ b/server/reflector/processors.py
@@ -8,6 +8,7 @@ from reflector.settings import settings
 from reflector.logger import logger
 import httpx
 import asyncio
+import json
 from concurrent.futures import ThreadPoolExecutor
@@ -19,6 +20,9 @@ class AudioFile:
     sample_width: int
     timestamp: float = 0.0
 
+    def release(self):
+        self.path.unlink()
+
 
 @dataclass
 class Word:
@@ -69,11 +73,16 @@ class Processor:
     INPUT_TYPE: type = None
     OUTPUT_TYPE: type = None
 
-    def __init__(self):
+    def __init__(self, callback=None):
         self._processors = []
         self._callbacks = []
+        if callback:
+            self.on(callback)
 
     def connect(self, processor: "Processor"):
+        """
+        Connect this processor's output to another processor
+        """
         if processor.INPUT_TYPE != self.OUTPUT_TYPE:
             raise ValueError(
                 f"Processor {processor} input type {processor.INPUT_TYPE} "
@@ -82,27 +91,41 @@ class Processor:
         self._processors.append(processor)
 
     def disconnect(self, processor: "Processor"):
+        """
+        Disconnect another processor from this processor's output
+        """
         self._processors.remove(processor)
 
     def on(self, callback):
+        """
+        Register a callback to be called when data is emitted
+        """
         self._callbacks.append(callback)
 
     def off(self, callback):
+        """
+        Unregister a callback to be called when data is emitted
+        """
         self._callbacks.remove(callback)
 
     async def emit(self, data):
         for callback in self._callbacks:
-            if isinstance(data, AudioFile):
-                import pdb; pdb.set_trace()
             await callback(data)
         for processor in self._processors:
             await processor.push(data)
 
     async def push(self, data):
+        """
+        Push data to this processor. `data` must be of type `INPUT_TYPE`
+        The function returns the output of type `OUTPUT_TYPE`
+        """
         # logger.debug(f"{self.__class__.__name__} push")
         return await self._push(data)
 
     async def flush(self):
+        """
+        Flush data to this processor
+        """
         # logger.debug(f"{self.__class__.__name__} flush")
         return await self._flush()
@@ -114,12 +137,24 @@
     @classmethod
     def as_threaded(cls, *args, **kwargs):
+        """
+        Return a single-threaded processor where output is guaranteed
+        to be in order
+        """
         return ThreadedProcessor(cls(*args, **kwargs), max_workers=1)
 
 
 class ThreadedProcessor(Processor):
+    """
+    A processor that runs in a separate thread
+    """
+
     def __init__(self, processor: Processor, max_workers=1):
         super().__init__()
+        # FIXME: This is a hack to make sure that the processor is
+        # single-threaded; if it is more than 1, then we need to make sure
+        # that the processor is emitting data in order
+        assert max_workers == 1
         self.processor = processor
         self.INPUT_TYPE = processor.INPUT_TYPE
         self.OUTPUT_TYPE = processor.OUTPUT_TYPE
@@ -231,9 +266,12 @@
     OUTPUT_TYPE = Transcript
 
     async def _push(self, data: AudioFile):
-        result = await self._transcript(data)
-        if result:
-            await self.emit(result)
+        try:
+            result = await self._transcript(data)
+            if result:
+                await self.emit(result)
+        finally:
+            data.release()
 
     async def _transcript(self, data: AudioFile):
         raise NotImplementedError
@@ -282,8 +320,8 @@
     INPUT_TYPE = Transcript
     OUTPUT_TYPE = Transcript
 
-    def __init__(self, max_text=1000):
-        super().__init__()
+    def __init__(self, max_text=1000, **kwargs):
+        super().__init__(**kwargs)
         self.transcript = Transcript(words=[])
         self.max_text = max_text
@@ -340,9 +378,58 @@
             summary = TitleSummary(title=result.title, summary=result.description)
             await self.emit(summary)
 
+        except httpx.ConnectError as e:
+            logger.error(f"Failed to call llm: {e}")
+
         except Exception:
             logger.exception("Failed to call llm")
-            return
+
+
+class Pipeline(Processor):
+    """
+    A pipeline of processors
+    """
+
+    INPUT_TYPE = None
+    OUTPUT_TYPE = None
+
+    def __init__(self, *processors):
+        super().__init__()
+        self.processors = processors
+
+        for i in range(len(processors) - 1):
+            processors[i].connect(processors[i + 1])
+
+        self.INPUT_TYPE = processors[0].INPUT_TYPE
+        self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE
+
+    async def _push(self, data):
+        await self.processors[0].push(data)
+
+    async def _flush(self):
+        for processor in self.processors:
+            await processor.flush()
+
+
+class FinalSummaryProcessor(Processor):
+    """
+    Assemble all summaries into a line-based JSON file
+    """
+
+    INPUT_TYPE = TitleSummary
+    OUTPUT_TYPE = Path
+
+    def __init__(self, filename: Path, **kwargs):
+        super().__init__(**kwargs)
+        self.filename = filename
+
+    async def _push(self, data: TitleSummary):
+        with open(self.filename, "a", encoding="utf8") as fd:
+            fd.write(json.dumps(data.__dict__) + "\n")
+
+    async def _flush(self):
+        logger.info(f"Writing to {self.filename}")
+        await self.emit(self.filename)
 
 
 if __name__ == "__main__":
@@ -353,44 +440,40 @@
     args = parser.parse_args()
 
     async def main():
-        chunker = AudioChunkerProcessor()
-
-        # merge audio
-        merger = AudioMergeProcessor.as_threaded()
-        chunker.connect(merger)
-
-        # transcript audio
-        transcripter = AudioWhisperTranscriptProcessor()
-        merger.connect(transcripter)
-
-        # merge transcript and output lines
-        line_processor = TranscriptLineProcessor()
-        transcripter.connect(line_processor)
-
         async def on_transcript(transcript):
             print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}")
 
-        line_processor.on(on_transcript)
+        async def on_summary(summary):
+            print(f"Summary: {summary.title} - {summary.summary}")
 
-        # # title and summary
-        # title_summary = TitleSummaryProcessor.as_threaded()
-        # line_processor.connect(title_summary)
-        #
-        # async def on_summary(summary):
-        #     print(f"Summary: title={summary.title} summary={summary.summary}")
-        #
-        # title_summary.on(on_summary)
+        async def on_final_summary(path):
+            print(f"Final Summary: {path}")
+
+        # transcription output
+        result_fn = Path(args.source).with_suffix(".jsonl")
+
+        pipeline = Pipeline(
+            AudioChunkerProcessor(),
+            AudioMergeProcessor(),
+            AudioWhisperTranscriptProcessor().as_threaded(),
+            TranscriptLineProcessor(callback=on_transcript),
+            TitleSummaryProcessor.as_threaded(callback=on_summary),
+            FinalSummaryProcessor.as_threaded(
+                filename=result_fn, callback=on_final_summary
+            ),
+        )
 
         # start processing audio
+        logger.info(f"Opening{args.source}")
         container = av.open(args.source)
-        for frame in container.decode(audio=0):
-            await chunker.push(frame)
+        try:
+            logger.info("Start pushing audio into the pipeline")
+            for frame in container.decode(audio=0):
+                await pipeline.push(frame)
+        finally:
+            logger.info("Flushing the pipeline")
+            await pipeline.flush()
 
-        # audio done, flush everything
-        await chunker.flush()
-        await merger.flush()
-        await transcripter.flush()
-        await line_processor.flush()
-        # await title_summary.flush()
+        logger.info("All done!")
 
     asyncio.run(main())

From 3908c1ca53515858df13fbaf79574d694c654ac3 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Sat, 29 Jul 2023 01:32:37 +0200
Subject: [PATCH 03/12] processors: customize logger and auto describe
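Every processor now gets a `uid` and a bound logger carrying the processor
name and, once attached to a pipeline, the pipeline uid. A sketch of the new
introspection, reusing the pipeline from the previous patch; the logged
topology shown in comments is illustrative, not verbatim output:

```python
import asyncio

from reflector.processors import (
    AudioAutoTranscriptProcessor,
    AudioChunkerProcessor,
    AudioMergeProcessor,
    Pipeline,
)


async def main():
    # threaded processors need a running event loop, hence the async context
    pipeline = Pipeline(
        AudioChunkerProcessor(),
        AudioMergeProcessor(),
        AudioAutoTranscriptProcessor.as_threaded(),
    )
    pipeline.describe()
    # Logs an indented topology, roughly:
    #   Pipeline:
    #    AudioChunkerProcessor
    #    AudioMergeProcessor
    #    ThreadedProcessor
    #     AudioAutoTranscriptProcessor


asyncio.run(main())
```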
---
 server/reflector/processors.py | 73 ++++++++++++++++++++++++++++++----
 1 file changed, 66 insertions(+), 7 deletions(-)

diff --git a/server/reflector/processors.py b/server/reflector/processors.py
index 8d26fcb7..02f003bd 100644
--- a/server/reflector/processors.py
+++ b/server/reflector/processors.py
@@ -9,6 +9,7 @@ from reflector.logger import logger
 import httpx
 import asyncio
 import json
+from uuid import uuid4
 from concurrent.futures import ThreadPoolExecutor
@@ -73,11 +74,17 @@ class Processor:
     INPUT_TYPE: type = None
     OUTPUT_TYPE: type = None
 
-    def __init__(self, callback=None):
+    def __init__(self, callback=None, custom_logger=None):
         self._processors = []
         self._callbacks = []
         if callback:
             self.on(callback)
+        self.uid = uuid4().hex
+        self.logger = (custom_logger or logger).bind(
+            processor=self.__class__.__name__)
+
+    def set_pipeline(self, pipeline: "Pipeline"):
+        self.logger = self.logger.bind(pipeline=pipeline.uid)
 
     def connect(self, processor: "Processor"):
         """
         Connect this processor's output to another processor
@@ -129,6 +136,9 @@
         # logger.debug(f"{self.__class__.__name__} flush")
         return await self._flush()
 
+    def describe(self, level=0):
+        logger.info(" " * level + self.__class__.__name__)
+
     async def _push(self, data):
         raise NotImplementedError
@@ -189,6 +199,10 @@
     def on(self, callback):
         self.processor.on(callback)
 
+    def describe(self, level=0):
+        super().describe(level)
+        self.processor.describe(level + 1)
+
 
 class AudioChunkerProcessor(Processor):
     """
@@ -312,6 +326,35 @@
         return transcript
 
 
+class AudioAutoTranscriptProcessor(AudioTranscriptProcessor):
+    BACKENDS = {
+        "whisper": AudioWhisperTranscriptProcessor,
+    }
+    BACKEND_DEFAULT = "whisper"
+
+    def __init__(self, backend=None, **kwargs):
+        super().__init__(**kwargs)
+        self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]()
+
+    def connect(self, processor: Processor):
+        self.processor.connect(processor)
+
+    def disconnect(self, processor: Processor):
+        self.processor.disconnect(processor)
+
+    def on(self, callback):
+        self.processor.on(callback)
+
+    def off(self, callback):
+        self.processor.off(callback)
+
+    async def _push(self, data: AudioFile):
+        return await self.processor._push(data)
+
+    async def _flush(self):
+        return await self.processor._flush()
+
+
 class TranscriptLineProcessor(Processor):
     """
     Based on a stream of transcripts, assemble lines and remove duplicated words
@@ -379,10 +422,10 @@
             await self.emit(summary)
 
         except httpx.ConnectError as e:
-            logger.error(f"Failed to call llm: {e}")
+            self.logger.error(f"Failed to call llm: {e}")
 
         except Exception:
-            logger.exception("Failed to call llm")
+            self.logger.exception("Failed to call llm")
 
 
 class Pipeline(Processor):
@@ -393,10 +436,13 @@
     INPUT_TYPE = None
     OUTPUT_TYPE = None
 
-    def __init__(self, *processors):
+    def __init__(self, *processors: Processor):
         super().__init__()
         self.processors = processors
 
+        for processor in processors:
+            processor.set_pipeline(self)
+
         for i in range(len(processors) - 1):
             processors[i].connect(processors[i + 1])
@@ -410,6 +456,13 @@
         for processor in self.processors:
             await processor.flush()
 
+    def describe(self, level=0):
+        logger.info(" " * level + "Pipeline:")
+        for processor in self.processors:
+            processor.describe(level + 1)
+        logger.info("")
+
+
 class FinalSummaryProcessor(Processor):
     """
     Assemble all summaries into a line-based JSON file
@@ -422,13 +475,18 @@
     def __init__(self, filename: Path, **kwargs):
         super().__init__(**kwargs)
         self.filename = filename
+        self.chunkcount = 0
 
     async def _push(self, data: TitleSummary):
         with open(self.filename, "a", encoding="utf8") as fd:
             fd.write(json.dumps(data.__dict__) + "\n")
+            self.chunkcount += 1
 
     async def _flush(self):
-        logger.info(f"Writing to {self.filename}")
+        if self.chunkcount == 0:
+            self.logger.warning("No summary to write")
+            return
+        self.logger.info(f"Writing to {self.filename}")
         await self.emit(self.filename)
@@ -455,16 +513,17 @@
         pipeline = Pipeline(
             AudioChunkerProcessor(),
             AudioMergeProcessor(),
-            AudioWhisperTranscriptProcessor().as_threaded(),
+            AudioAutoTranscriptProcessor.as_threaded(),
             TranscriptLineProcessor(callback=on_transcript),
             TitleSummaryProcessor.as_threaded(callback=on_summary),
             FinalSummaryProcessor.as_threaded(
                 filename=result_fn, callback=on_final_summary
             ),
         )
+        pipeline.describe()
 
         # start processing audio
-        logger.info(f"Opening{args.source}")
+        logger.info(f"Opening {args.source}")
         container = av.open(args.source)
         try:
             logger.info("Start pushing audio into the pipeline")

From 224afc6f282c7b7e84707e968010f038f2492f82 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Sat, 29 Jul 2023 15:59:25 +0200
Subject: [PATCH 04/12] fastapi: implement server with the same backward
 compatibility as before

---
 server/poetry.lock                  | 285 ++++++++++++++++++++++++++--
 server/pyproject.toml               |   2 +
 server/reflector/app.py             |  15 ++
 server/reflector/processors.py      |  12 +-
 server/reflector/views/rtc_offer.py | 121 ++++++++++++
 5 files changed, 419 insertions(+), 16 deletions(-)
 create mode 100644 server/reflector/app.py
 create mode 100644 server/reflector/views/rtc_offer.py

diff --git a/server/poetry.lock
b/server/poetry.lock index 1d871c03..8af4d656 100644 --- a/server/poetry.lock +++ b/server/poetry.lock @@ -677,27 +677,42 @@ pyyaml = ">=5.3,<7" [[package]] name = "dnspython" -version = "2.4.0" +version = "2.4.1" description = "DNS toolkit" optional = false python-versions = ">=3.8,<4.0" files = [ - {file = "dnspython-2.4.0-py3-none-any.whl", hash = "sha256:46b4052a55b56beea3a3bdd7b30295c292bd6827dd442348bc116f2d35b17f0a"}, - {file = "dnspython-2.4.0.tar.gz", hash = "sha256:758e691dbb454d5ccf4e1b154a19e52847f79e21a42fef17b969144af29a4e6c"}, + {file = "dnspython-2.4.1-py3-none-any.whl", hash = "sha256:5b7488477388b8c0b70a8ce93b227c5603bc7b77f1565afe8e729c36c51447d7"}, + {file = "dnspython-2.4.1.tar.gz", hash = "sha256:c33971c79af5be968bb897e95c2448e11a645ee84d93b265ce0b7aabe5dfdca8"}, ] -[package.dependencies] -httpcore = {version = ">=0.17.3", markers = "python_version >= \"3.8\""} -sniffio = ">=1.1,<2.0" - [package.extras] dnssec = ["cryptography (>=2.6,<42.0)"] -doh = ["h2 (>=4.1.0)", "httpx (>=0.24.1)"] +doh = ["h2 (>=4.1.0)", "httpcore (>=0.17.3)", "httpx (>=0.24.1)"] doq = ["aioquic (>=0.9.20)"] idna = ["idna (>=2.1,<4.0)"] trio = ["trio (>=0.14,<0.23)"] wmi = ["wmi (>=1.5.1,<2.0.0)"] +[[package]] +name = "fastapi" +version = "0.100.1" +description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production" +optional = false +python-versions = ">=3.7" +files = [ + {file = "fastapi-0.100.1-py3-none-any.whl", hash = "sha256:ec6dd52bfc4eff3063cfcd0713b43c87640fefb2687bbbe3d8a08d94049cdf32"}, + {file = "fastapi-0.100.1.tar.gz", hash = "sha256:522700d7a469e4a973d92321ab93312448fbe20fca9c8da97effc7e7bc56df23"}, +] + +[package.dependencies] +pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<3.0.0" +starlette = ">=0.27.0,<0.28.0" +typing-extensions = ">=4.5.0" + +[package.extras] +all = ["email-validator (>=2.0.0)", "httpx (>=0.23.0)", "itsdangerous (>=1.1.0)", "jinja2 (>=2.11.2)", "orjson (>=3.2.1)", "pydantic-extra-types (>=2.0.0)", "pydantic-settings (>=2.0.0)", "python-multipart (>=0.0.5)", "pyyaml (>=5.3.1)", "ujson (>=4.0.1,!=4.0.2,!=4.1.0,!=4.2.0,!=4.3.0,!=5.0.0,!=5.1.0)", "uvicorn[standard] (>=0.12.0)"] + [[package]] name = "faster-whisper" version = "0.7.1" @@ -963,6 +978,53 @@ sniffio = "==1.*" http2 = ["h2 (>=3,<5)"] socks = ["socksio (==1.*)"] +[[package]] +name = "httptools" +version = "0.6.0" +description = "A collection of framework independent HTTP protocol utils." 
+optional = false +python-versions = ">=3.5.0" +files = [ + {file = "httptools-0.6.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:818325afee467d483bfab1647a72054246d29f9053fd17cc4b86cda09cc60339"}, + {file = "httptools-0.6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:72205730bf1be875003692ca54a4a7c35fac77b4746008966061d9d41a61b0f5"}, + {file = "httptools-0.6.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:33eb1d4e609c835966e969a31b1dedf5ba16b38cab356c2ce4f3e33ffa94cad3"}, + {file = "httptools-0.6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bdc6675ec6cb79d27e0575750ac6e2b47032742e24eed011b8db73f2da9ed40"}, + {file = "httptools-0.6.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:463c3bc5ef64b9cf091be9ac0e0556199503f6e80456b790a917774a616aff6e"}, + {file = "httptools-0.6.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:82f228b88b0e8c6099a9c4757ce9fdbb8b45548074f8d0b1f0fc071e35655d1c"}, + {file = "httptools-0.6.0-cp310-cp310-win_amd64.whl", hash = "sha256:0781fedc610293a2716bc7fa142d4c85e6776bc59d617a807ff91246a95dea35"}, + {file = "httptools-0.6.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:721e503245d591527cddd0f6fd771d156c509e831caa7a57929b55ac91ee2b51"}, + {file = "httptools-0.6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:274bf20eeb41b0956e34f6a81f84d26ed57c84dd9253f13dcb7174b27ccd8aaf"}, + {file = "httptools-0.6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:259920bbae18740a40236807915def554132ad70af5067e562f4660b62c59b90"}, + {file = "httptools-0.6.0-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:03bfd2ae8a2d532952ac54445a2fb2504c804135ed28b53fefaf03d3a93eb1fd"}, + {file = "httptools-0.6.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:f959e4770b3fc8ee4dbc3578fd910fab9003e093f20ac8c621452c4d62e517cb"}, + {file = "httptools-0.6.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6e22896b42b95b3237eccc42278cd72c0df6f23247d886b7ded3163452481e38"}, + {file = "httptools-0.6.0-cp311-cp311-win_amd64.whl", hash = "sha256:38f3cafedd6aa20ae05f81f2e616ea6f92116c8a0f8dcb79dc798df3356836e2"}, + {file = "httptools-0.6.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:47043a6e0ea753f006a9d0dd076a8f8c99bc0ecae86a0888448eb3076c43d717"}, + {file = "httptools-0.6.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:35a541579bed0270d1ac10245a3e71e5beeb1903b5fbbc8d8b4d4e728d48ff1d"}, + {file = "httptools-0.6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:65d802e7b2538a9756df5acc062300c160907b02e15ed15ba035b02bce43e89c"}, + {file = "httptools-0.6.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:26326e0a8fe56829f3af483200d914a7cd16d8d398d14e36888b56de30bec81a"}, + {file = "httptools-0.6.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:e41ccac9e77cd045f3e4ee0fc62cbf3d54d7d4b375431eb855561f26ee7a9ec4"}, + {file = "httptools-0.6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:4e748fc0d5c4a629988ef50ac1aef99dfb5e8996583a73a717fc2cac4ab89932"}, + {file = "httptools-0.6.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:cf8169e839a0d740f3d3c9c4fa630ac1a5aaf81641a34575ca6773ed7ce041a1"}, + {file = "httptools-0.6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:5dcc14c090ab57b35908d4a4585ec5c0715439df07be2913405991dbb37e049d"}, + 
{file = "httptools-0.6.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0d0b0571806a5168013b8c3d180d9f9d6997365a4212cb18ea20df18b938aa0b"}, + {file = "httptools-0.6.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0fb4a608c631f7dcbdf986f40af7a030521a10ba6bc3d36b28c1dc9e9035a3c0"}, + {file = "httptools-0.6.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:93f89975465133619aea8b1952bc6fa0e6bad22a447c6d982fc338fbb4c89649"}, + {file = "httptools-0.6.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:73e9d66a5a28b2d5d9fbd9e197a31edd02be310186db423b28e6052472dc8201"}, + {file = "httptools-0.6.0-cp38-cp38-win_amd64.whl", hash = "sha256:22c01fcd53648162730a71c42842f73b50f989daae36534c818b3f5050b54589"}, + {file = "httptools-0.6.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:3f96d2a351b5625a9fd9133c95744e8ca06f7a4f8f0b8231e4bbaae2c485046a"}, + {file = "httptools-0.6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:72ec7c70bd9f95ef1083d14a755f321d181f046ca685b6358676737a5fecd26a"}, + {file = "httptools-0.6.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b703d15dbe082cc23266bf5d9448e764c7cb3fcfe7cb358d79d3fd8248673ef9"}, + {file = "httptools-0.6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:82c723ed5982f8ead00f8e7605c53e55ffe47c47465d878305ebe0082b6a1755"}, + {file = "httptools-0.6.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:b0a816bb425c116a160fbc6f34cece097fd22ece15059d68932af686520966bd"}, + {file = "httptools-0.6.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:dea66d94e5a3f68c5e9d86e0894653b87d952e624845e0b0e3ad1c733c6cc75d"}, + {file = "httptools-0.6.0-cp39-cp39-win_amd64.whl", hash = "sha256:23b09537086a5a611fad5696fc8963d67c7e7f98cb329d38ee114d588b0b74cd"}, + {file = "httptools-0.6.0.tar.gz", hash = "sha256:9fc6e409ad38cbd68b177cd5158fc4042c796b82ca88d99ec78f07bed6c6b796"}, +] + +[package.extras] +test = ["Cython (>=0.29.24,<0.30.0)"] + [[package]] name = "httpx" version = "0.24.1" @@ -1282,13 +1344,13 @@ files = [ [[package]] name = "pathspec" -version = "0.11.1" +version = "0.11.2" description = "Utility library for gitignore style pattern matching of file paths." optional = false python-versions = ">=3.7" files = [ - {file = "pathspec-0.11.1-py3-none-any.whl", hash = "sha256:d8af70af76652554bd134c22b3e8a1cc46ed7d91edcdd721ef1a0c51a84a5293"}, - {file = "pathspec-0.11.1.tar.gz", hash = "sha256:2798de800fa92780e33acca925945e9a19a133b715067cf165b8866c15a31687"}, + {file = "pathspec-0.11.2-py3-none-any.whl", hash = "sha256:1d6ed233af05e679efb96b1851550ea95bbb64b7c490b0f5aa52996c11e92a20"}, + {file = "pathspec-0.11.2.tar.gz", hash = "sha256:e0d8d0ac2f12da61956eb2306b69f9469b42f4deb0f3cb6ed47b9cce9996ced3"}, ] [[package]] @@ -1815,6 +1877,23 @@ docs = ["furo", "myst-parser", "prometheus-client", "sphinx", "sphinx-notfound-p tests = ["pytest", "pytest-asyncio"] typing = ["mypy (>=1.4)"] +[[package]] +name = "starlette" +version = "0.27.0" +description = "The little ASGI library that shines." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "starlette-0.27.0-py3-none-any.whl", hash = "sha256:918416370e846586541235ccd38a474c08b80443ed31c578a418e2209b3eef91"}, + {file = "starlette-0.27.0.tar.gz", hash = "sha256:6a6b0d042acb8d469a01eba54e9cda6cbd24ac602c4cd016723117d6a7e73b75"}, +] + +[package.dependencies] +anyio = ">=3.4.0,<5" + +[package.extras] +full = ["httpx (>=0.22.0)", "itsdangerous", "jinja2", "python-multipart", "pyyaml"] + [[package]] name = "structlog" version = "23.1.0" @@ -1962,6 +2041,188 @@ secure = ["certifi", "cryptography (>=1.9)", "idna (>=2.0.0)", "pyopenssl (>=17. socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] +[[package]] +name = "uvicorn" +version = "0.23.1" +description = "The lightning-fast ASGI server." +optional = false +python-versions = ">=3.8" +files = [ + {file = "uvicorn-0.23.1-py3-none-any.whl", hash = "sha256:1d55d46b83ee4ce82b4e82f621f2050adb3eb7b5481c13f9af1744951cae2f1f"}, + {file = "uvicorn-0.23.1.tar.gz", hash = "sha256:da9b0c8443b2d7ee9db00a345f1eee6db7317432c9d4400f5049cc8d358383be"}, +] + +[package.dependencies] +click = ">=7.0" +colorama = {version = ">=0.4", optional = true, markers = "sys_platform == \"win32\" and extra == \"standard\""} +h11 = ">=0.8" +httptools = {version = ">=0.5.0", optional = true, markers = "extra == \"standard\""} +python-dotenv = {version = ">=0.13", optional = true, markers = "extra == \"standard\""} +pyyaml = {version = ">=5.1", optional = true, markers = "extra == \"standard\""} +uvloop = {version = ">=0.14.0,<0.15.0 || >0.15.0,<0.15.1 || >0.15.1", optional = true, markers = "(sys_platform != \"win32\" and sys_platform != \"cygwin\") and platform_python_implementation != \"PyPy\" and extra == \"standard\""} +watchfiles = {version = ">=0.13", optional = true, markers = "extra == \"standard\""} +websockets = {version = ">=10.4", optional = true, markers = "extra == \"standard\""} + +[package.extras] +standard = ["colorama (>=0.4)", "httptools (>=0.5.0)", "python-dotenv (>=0.13)", "pyyaml (>=5.1)", "uvloop (>=0.14.0,!=0.15.0,!=0.15.1)", "watchfiles (>=0.13)", "websockets (>=10.4)"] + +[[package]] +name = "uvloop" +version = "0.17.0" +description = "Fast implementation of asyncio event loop on top of libuv" +optional = false +python-versions = ">=3.7" +files = [ + {file = "uvloop-0.17.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:ce9f61938d7155f79d3cb2ffa663147d4a76d16e08f65e2c66b77bd41b356718"}, + {file = "uvloop-0.17.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:68532f4349fd3900b839f588972b3392ee56042e440dd5873dfbbcd2cc67617c"}, + {file = "uvloop-0.17.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0949caf774b9fcefc7c5756bacbbbd3fc4c05a6b7eebc7c7ad6f825b23998d6d"}, + {file = "uvloop-0.17.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ff3d00b70ce95adce264462c930fbaecb29718ba6563db354608f37e49e09024"}, + {file = "uvloop-0.17.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:a5abddb3558d3f0a78949c750644a67be31e47936042d4f6c888dd6f3c95f4aa"}, + {file = "uvloop-0.17.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8efcadc5a0003d3a6e887ccc1fb44dec25594f117a94e3127954c05cf144d811"}, + {file = "uvloop-0.17.0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:3378eb62c63bf336ae2070599e49089005771cc651c8769aaad72d1bd9385a7c"}, + {file = "uvloop-0.17.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6aafa5a78b9e62493539456f8b646f85abc7093dd997f4976bb105537cf2635e"}, + 
{file = "uvloop-0.17.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c686a47d57ca910a2572fddfe9912819880b8765e2f01dc0dd12a9bf8573e539"}, + {file = "uvloop-0.17.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:864e1197139d651a76c81757db5eb199db8866e13acb0dfe96e6fc5d1cf45fc4"}, + {file = "uvloop-0.17.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:2a6149e1defac0faf505406259561bc14b034cdf1d4711a3ddcdfbaa8d825a05"}, + {file = "uvloop-0.17.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6708f30db9117f115eadc4f125c2a10c1a50d711461699a0cbfaa45b9a78e376"}, + {file = "uvloop-0.17.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:23609ca361a7fc587031429fa25ad2ed7242941adec948f9d10c045bfecab06b"}, + {file = "uvloop-0.17.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2deae0b0fb00a6af41fe60a675cec079615b01d68beb4cc7b722424406b126a8"}, + {file = "uvloop-0.17.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:45cea33b208971e87a31c17622e4b440cac231766ec11e5d22c76fab3bf9df62"}, + {file = "uvloop-0.17.0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:9b09e0f0ac29eee0451d71798878eae5a4e6a91aa275e114037b27f7db72702d"}, + {file = "uvloop-0.17.0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:dbbaf9da2ee98ee2531e0c780455f2841e4675ff580ecf93fe5c48fe733b5667"}, + {file = "uvloop-0.17.0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:a4aee22ece20958888eedbad20e4dbb03c37533e010fb824161b4f05e641f738"}, + {file = "uvloop-0.17.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:307958f9fc5c8bb01fad752d1345168c0abc5d62c1b72a4a8c6c06f042b45b20"}, + {file = "uvloop-0.17.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3ebeeec6a6641d0adb2ea71dcfb76017602ee2bfd8213e3fcc18d8f699c5104f"}, + {file = "uvloop-0.17.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1436c8673c1563422213ac6907789ecb2b070f5939b9cbff9ef7113f2b531595"}, + {file = "uvloop-0.17.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:8887d675a64cfc59f4ecd34382e5b4f0ef4ae1da37ed665adba0c2badf0d6578"}, + {file = "uvloop-0.17.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:3db8de10ed684995a7f34a001f15b374c230f7655ae840964d51496e2f8a8474"}, + {file = "uvloop-0.17.0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:7d37dccc7ae63e61f7b96ee2e19c40f153ba6ce730d8ba4d3b4e9738c1dccc1b"}, + {file = "uvloop-0.17.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:cbbe908fda687e39afd6ea2a2f14c2c3e43f2ca88e3a11964b297822358d0e6c"}, + {file = "uvloop-0.17.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3d97672dc709fa4447ab83276f344a165075fd9f366a97b712bdd3fee05efae8"}, + {file = "uvloop-0.17.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f1e507c9ee39c61bfddd79714e4f85900656db1aec4d40c6de55648e85c2799c"}, + {file = "uvloop-0.17.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:c092a2c1e736086d59ac8e41f9c98f26bbf9b9222a76f21af9dfe949b99b2eb9"}, + {file = "uvloop-0.17.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:30babd84706115626ea78ea5dbc7dd8d0d01a2e9f9b306d24ca4ed5796c66ded"}, + {file = "uvloop-0.17.0.tar.gz", hash = "sha256:0ddf6baf9cf11a1a22c71487f39f15b2cf78eb5bde7e5b45fbb99e8a9d91b9e1"}, +] + +[package.extras] +dev = ["Cython (>=0.29.32,<0.30.0)", "Sphinx (>=4.1.2,<4.2.0)", "aiohttp", "flake8 (>=3.9.2,<3.10.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=22.0.0,<22.1.0)", 
"pycodestyle (>=2.7.0,<2.8.0)", "pytest (>=3.6.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] +docs = ["Sphinx (>=4.1.2,<4.2.0)", "sphinx-rtd-theme (>=0.5.2,<0.6.0)", "sphinxcontrib-asyncio (>=0.3.0,<0.4.0)"] +test = ["Cython (>=0.29.32,<0.30.0)", "aiohttp", "flake8 (>=3.9.2,<3.10.0)", "mypy (>=0.800)", "psutil", "pyOpenSSL (>=22.0.0,<22.1.0)", "pycodestyle (>=2.7.0,<2.8.0)"] + +[[package]] +name = "watchfiles" +version = "0.19.0" +description = "Simple, modern and high performance file watching and code reload in python." +optional = false +python-versions = ">=3.7" +files = [ + {file = "watchfiles-0.19.0-cp37-abi3-macosx_10_7_x86_64.whl", hash = "sha256:91633e64712df3051ca454ca7d1b976baf842d7a3640b87622b323c55f3345e7"}, + {file = "watchfiles-0.19.0-cp37-abi3-macosx_11_0_arm64.whl", hash = "sha256:b6577b8c6c8701ba8642ea9335a129836347894b666dd1ec2226830e263909d3"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_12_i686.manylinux2010_i686.whl", hash = "sha256:18b28f6ad871b82df9542ff958d0c86bb0d8310bb09eb8e87d97318a3b5273af"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fac19dc9cbc34052394dbe81e149411a62e71999c0a19e1e09ce537867f95ae0"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:09ea3397aecbc81c19ed7f025e051a7387feefdb789cf768ff994c1228182fda"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:c0376deac92377817e4fb8f347bf559b7d44ff556d9bc6f6208dd3f79f104aaf"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9c75eff897786ee262c9f17a48886f4e98e6cfd335e011c591c305e5d083c056"}, + {file = "watchfiles-0.19.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cb5d45c4143c1dd60f98a16187fd123eda7248f84ef22244818c18d531a249d1"}, + {file = "watchfiles-0.19.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:79c533ff593db861ae23436541f481ec896ee3da4e5db8962429b441bbaae16e"}, + {file = "watchfiles-0.19.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:3d7d267d27aceeeaa3de0dd161a0d64f0a282264d592e335fff7958cc0cbae7c"}, + {file = "watchfiles-0.19.0-cp37-abi3-win32.whl", hash = "sha256:176a9a7641ec2c97b24455135d58012a5be5c6217fc4d5fef0b2b9f75dbf5154"}, + {file = "watchfiles-0.19.0-cp37-abi3-win_amd64.whl", hash = "sha256:945be0baa3e2440151eb3718fd8846751e8b51d8de7b884c90b17d271d34cae8"}, + {file = "watchfiles-0.19.0-cp37-abi3-win_arm64.whl", hash = "sha256:0089c6dc24d436b373c3c57657bf4f9a453b13767150d17284fc6162b2791911"}, + {file = "watchfiles-0.19.0-pp38-pypy38_pp73-macosx_10_7_x86_64.whl", hash = "sha256:cae3dde0b4b2078f31527acff6f486e23abed307ba4d3932466ba7cdd5ecec79"}, + {file = "watchfiles-0.19.0-pp38-pypy38_pp73-macosx_11_0_arm64.whl", hash = "sha256:7f3920b1285a7d3ce898e303d84791b7bf40d57b7695ad549dc04e6a44c9f120"}, + {file = "watchfiles-0.19.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9afd0d69429172c796164fd7fe8e821ade9be983f51c659a38da3faaaaac44dc"}, + {file = "watchfiles-0.19.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:68dce92b29575dda0f8d30c11742a8e2b9b8ec768ae414b54f7453f27bdf9545"}, + {file = "watchfiles-0.19.0-pp39-pypy39_pp73-macosx_10_7_x86_64.whl", hash = "sha256:5569fc7f967429d4bc87e355cdfdcee6aabe4b620801e2cf5805ea245c06097c"}, + {file = "watchfiles-0.19.0-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = 
"sha256:5471582658ea56fca122c0f0d0116a36807c63fefd6fdc92c71ca9a4491b6b48"}, + {file = "watchfiles-0.19.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b538014a87f94d92f98f34d3e6d2635478e6be6423a9ea53e4dd96210065e193"}, + {file = "watchfiles-0.19.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:20b44221764955b1e703f012c74015306fb7e79a00c15370785f309b1ed9aa8d"}, + {file = "watchfiles-0.19.0.tar.gz", hash = "sha256:d9b073073e048081e502b6c6b0b88714c026a1a4c890569238d04aca5f9ca74b"}, +] + +[package.dependencies] +anyio = ">=3.0.0" + +[[package]] +name = "websockets" +version = "11.0.3" +description = "An implementation of the WebSocket Protocol (RFC 6455 & 7692)" +optional = false +python-versions = ">=3.7" +files = [ + {file = "websockets-11.0.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:3ccc8a0c387629aec40f2fc9fdcb4b9d5431954f934da3eaf16cdc94f67dbfac"}, + {file = "websockets-11.0.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d67ac60a307f760c6e65dad586f556dde58e683fab03323221a4e530ead6f74d"}, + {file = "websockets-11.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:84d27a4832cc1a0ee07cdcf2b0629a8a72db73f4cf6de6f0904f6661227f256f"}, + {file = "websockets-11.0.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffd7dcaf744f25f82190856bc26ed81721508fc5cbf2a330751e135ff1283564"}, + {file = "websockets-11.0.3-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7622a89d696fc87af8e8d280d9b421db5133ef5b29d3f7a1ce9f1a7bf7fcfa11"}, + {file = "websockets-11.0.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bceab846bac555aff6427d060f2fcfff71042dba6f5fca7dc4f75cac815e57ca"}, + {file = "websockets-11.0.3-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:54c6e5b3d3a8936a4ab6870d46bdd6ec500ad62bde9e44462c32d18f1e9a8e54"}, + {file = "websockets-11.0.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:41f696ba95cd92dc047e46b41b26dd24518384749ed0d99bea0a941ca87404c4"}, + {file = "websockets-11.0.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:86d2a77fd490ae3ff6fae1c6ceaecad063d3cc2320b44377efdde79880e11526"}, + {file = "websockets-11.0.3-cp310-cp310-win32.whl", hash = "sha256:2d903ad4419f5b472de90cd2d40384573b25da71e33519a67797de17ef849b69"}, + {file = "websockets-11.0.3-cp310-cp310-win_amd64.whl", hash = "sha256:1d2256283fa4b7f4c7d7d3e84dc2ece74d341bce57d5b9bf385df109c2a1a82f"}, + {file = "websockets-11.0.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:e848f46a58b9fcf3d06061d17be388caf70ea5b8cc3466251963c8345e13f7eb"}, + {file = "websockets-11.0.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:aa5003845cdd21ac0dc6c9bf661c5beddd01116f6eb9eb3c8e272353d45b3288"}, + {file = "websockets-11.0.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:b58cbf0697721120866820b89f93659abc31c1e876bf20d0b3d03cef14faf84d"}, + {file = "websockets-11.0.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:660e2d9068d2bedc0912af508f30bbeb505bbbf9774d98def45f68278cea20d3"}, + {file = "websockets-11.0.3-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c1f0524f203e3bd35149f12157438f406eff2e4fb30f71221c8a5eceb3617b6b"}, + {file = "websockets-11.0.3-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = 
"sha256:def07915168ac8f7853812cc593c71185a16216e9e4fa886358a17ed0fd9fcf6"}, + {file = "websockets-11.0.3-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:b30c6590146e53149f04e85a6e4fcae068df4289e31e4aee1fdf56a0dead8f97"}, + {file = "websockets-11.0.3-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:619d9f06372b3a42bc29d0cd0354c9bb9fb39c2cbc1a9c5025b4538738dbffaf"}, + {file = "websockets-11.0.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:01f5567d9cf6f502d655151645d4e8b72b453413d3819d2b6f1185abc23e82dd"}, + {file = "websockets-11.0.3-cp311-cp311-win32.whl", hash = "sha256:e1459677e5d12be8bbc7584c35b992eea142911a6236a3278b9b5ce3326f282c"}, + {file = "websockets-11.0.3-cp311-cp311-win_amd64.whl", hash = "sha256:e7837cb169eca3b3ae94cc5787c4fed99eef74c0ab9506756eea335e0d6f3ed8"}, + {file = "websockets-11.0.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:9f59a3c656fef341a99e3d63189852be7084c0e54b75734cde571182c087b152"}, + {file = "websockets-11.0.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2529338a6ff0eb0b50c7be33dc3d0e456381157a31eefc561771ee431134a97f"}, + {file = "websockets-11.0.3-cp37-cp37m-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:34fd59a4ac42dff6d4681d8843217137f6bc85ed29722f2f7222bd619d15e95b"}, + {file = "websockets-11.0.3-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:332d126167ddddec94597c2365537baf9ff62dfcc9db4266f263d455f2f031cb"}, + {file = "websockets-11.0.3-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:6505c1b31274723ccaf5f515c1824a4ad2f0d191cec942666b3d0f3aa4cb4007"}, + {file = "websockets-11.0.3-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:f467ba0050b7de85016b43f5a22b46383ef004c4f672148a8abf32bc999a87f0"}, + {file = "websockets-11.0.3-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:9d9acd80072abcc98bd2c86c3c9cd4ac2347b5a5a0cae7ed5c0ee5675f86d9af"}, + {file = "websockets-11.0.3-cp37-cp37m-win32.whl", hash = "sha256:e590228200fcfc7e9109509e4d9125eace2042fd52b595dd22bbc34bb282307f"}, + {file = "websockets-11.0.3-cp37-cp37m-win_amd64.whl", hash = "sha256:b16fff62b45eccb9c7abb18e60e7e446998093cdcb50fed33134b9b6878836de"}, + {file = "websockets-11.0.3-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:fb06eea71a00a7af0ae6aefbb932fb8a7df3cb390cc217d51a9ad7343de1b8d0"}, + {file = "websockets-11.0.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8a34e13a62a59c871064dfd8ffb150867e54291e46d4a7cf11d02c94a5275bae"}, + {file = "websockets-11.0.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:4841ed00f1026dfbced6fca7d963c4e7043aa832648671b5138008dc5a8f6d99"}, + {file = "websockets-11.0.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1a073fc9ab1c8aff37c99f11f1641e16da517770e31a37265d2755282a5d28aa"}, + {file = "websockets-11.0.3-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:68b977f21ce443d6d378dbd5ca38621755f2063d6fdb3335bda981d552cfff86"}, + {file = "websockets-11.0.3-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e1a99a7a71631f0efe727c10edfba09ea6bee4166a6f9c19aafb6c0b5917d09c"}, + {file = "websockets-11.0.3-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:bee9fcb41db2a23bed96c6b6ead6489702c12334ea20a297aa095ce6d31370d0"}, + {file = "websockets-11.0.3-cp38-cp38-musllinux_1_1_i686.whl", hash = 
"sha256:4b253869ea05a5a073ebfdcb5cb3b0266a57c3764cf6fe114e4cd90f4bfa5f5e"}, + {file = "websockets-11.0.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:1553cb82942b2a74dd9b15a018dce645d4e68674de2ca31ff13ebc2d9f283788"}, + {file = "websockets-11.0.3-cp38-cp38-win32.whl", hash = "sha256:f61bdb1df43dc9c131791fbc2355535f9024b9a04398d3bd0684fc16ab07df74"}, + {file = "websockets-11.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:03aae4edc0b1c68498f41a6772d80ac7c1e33c06c6ffa2ac1c27a07653e79d6f"}, + {file = "websockets-11.0.3-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:777354ee16f02f643a4c7f2b3eff8027a33c9861edc691a2003531f5da4f6bc8"}, + {file = "websockets-11.0.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:8c82f11964f010053e13daafdc7154ce7385ecc538989a354ccc7067fd7028fd"}, + {file = "websockets-11.0.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3580dd9c1ad0701169e4d6fc41e878ffe05e6bdcaf3c412f9d559389d0c9e016"}, + {file = "websockets-11.0.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6f1a3f10f836fab6ca6efa97bb952300b20ae56b409414ca85bff2ad241d2a61"}, + {file = "websockets-11.0.3-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:df41b9bc27c2c25b486bae7cf42fccdc52ff181c8c387bfd026624a491c2671b"}, + {file = "websockets-11.0.3-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:279e5de4671e79a9ac877427f4ac4ce93751b8823f276b681d04b2156713b9dd"}, + {file = "websockets-11.0.3-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:1fdf26fa8a6a592f8f9235285b8affa72748dc12e964a5518c6c5e8f916716f7"}, + {file = "websockets-11.0.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:69269f3a0b472e91125b503d3c0b3566bda26da0a3261c49f0027eb6075086d1"}, + {file = "websockets-11.0.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:97b52894d948d2f6ea480171a27122d77af14ced35f62e5c892ca2fae9344311"}, + {file = "websockets-11.0.3-cp39-cp39-win32.whl", hash = "sha256:c7f3cb904cce8e1be667c7e6fef4516b98d1a6a0635a58a57528d577ac18a128"}, + {file = "websockets-11.0.3-cp39-cp39-win_amd64.whl", hash = "sha256:c792ea4eabc0159535608fc5658a74d1a81020eb35195dd63214dcf07556f67e"}, + {file = "websockets-11.0.3-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:f2e58f2c36cc52d41f2659e4c0cbf7353e28c8c9e63e30d8c6d3494dc9fdedcf"}, + {file = "websockets-11.0.3-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:de36fe9c02995c7e6ae6efe2e205816f5f00c22fd1fbf343d4d18c3d5ceac2f5"}, + {file = "websockets-11.0.3-pp37-pypy37_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0ac56b661e60edd453585f4bd68eb6a29ae25b5184fd5ba51e97652580458998"}, + {file = "websockets-11.0.3-pp37-pypy37_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e052b8467dd07d4943936009f46ae5ce7b908ddcac3fda581656b1b19c083d9b"}, + {file = "websockets-11.0.3-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:42cc5452a54a8e46a032521d7365da775823e21bfba2895fb7b77633cce031bb"}, + {file = "websockets-11.0.3-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:e6316827e3e79b7b8e7d8e3b08f4e331af91a48e794d5d8b099928b6f0b85f20"}, + {file = "websockets-11.0.3-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8531fdcad636d82c517b26a448dcfe62f720e1922b33c81ce695d0edb91eb931"}, + {file = 
"websockets-11.0.3-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c114e8da9b475739dde229fd3bc6b05a6537a88a578358bc8eb29b4030fac9c9"}, + {file = "websockets-11.0.3-pp38-pypy38_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e063b1865974611313a3849d43f2c3f5368093691349cf3c7c8f8f75ad7cb280"}, + {file = "websockets-11.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:92b2065d642bf8c0a82d59e59053dd2fdde64d4ed44efe4870fa816c1232647b"}, + {file = "websockets-11.0.3-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:0ee68fe502f9031f19d495dae2c268830df2760c0524cbac5d759921ba8c8e82"}, + {file = "websockets-11.0.3-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dcacf2c7a6c3a84e720d1bb2b543c675bf6c40e460300b628bab1b1efc7c034c"}, + {file = "websockets-11.0.3-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b67c6f5e5a401fc56394f191f00f9b3811fe843ee93f4a70df3c389d1adf857d"}, + {file = "websockets-11.0.3-pp39-pypy39_pp73-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d5023a4b6a5b183dc838808087033ec5df77580485fc533e7dab2567851b0a4"}, + {file = "websockets-11.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:ed058398f55163a79bb9f06a90ef9ccc063b204bb346c4de78efc5d15abfe602"}, + {file = "websockets-11.0.3-py3-none-any.whl", hash = "sha256:6681ba9e7f8f3b19440921e99efbb40fc89f26cd71bf539e45d8c8a25c976dc6"}, + {file = "websockets-11.0.3.tar.gz", hash = "sha256:88fc51d9a26b10fc331be344f1781224a375b78488fc343620184e95a4b27016"}, +] + [[package]] name = "win32-setctime" version = "1.1.0" @@ -2066,4 +2327,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "d2b64390d1ea9038b6703b12060cdde1970b680a0ad891f24405323ff2ca0a60" +content-hash = "d696a09d54edbfc0ab52bd4e7b1ba09f3930ac5d3156df511cc3094ddb7d6ac5" diff --git a/server/pyproject.toml b/server/pyproject.toml index 13dddaf9..d2c5e8fa 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -18,6 +18,8 @@ sortedcontainers = "^2.4.0" loguru = "^0.7.0" pydantic-settings = "^2.0.2" structlog = "^23.1.0" +uvicorn = {extras = ["standard"], version = "^0.23.1"} +fastapi = "^0.100.1" [tool.poetry.group.dev.dependencies] diff --git a/server/reflector/app.py b/server/reflector/app.py new file mode 100644 index 00000000..31509ce9 --- /dev/null +++ b/server/reflector/app.py @@ -0,0 +1,15 @@ +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from reflector.views.rtc_offer import router as rtc_offer_router + +# build app +app = FastAPI() +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_methods=["*"], + allow_headers=["*"], +) + +# register views +app.include_router(rtc_offer_router) diff --git a/server/reflector/processors.py b/server/reflector/processors.py index 02f003bd..3827f068 100644 --- a/server/reflector/processors.py +++ b/server/reflector/processors.py @@ -80,8 +80,7 @@ class Processor: if callback: self.on(callback) self.uid = uuid4().hex - self.logger = (custom_logger or logger).bind( - processor=self.__class__.__name__) + self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__) def set_pipeline(self, pipeline: "Pipeline"): self.logger = self.logger.bind(pipeline=pipeline.uid) @@ -107,6 +106,9 @@ class Processor: """ Register a callback to be called 
when data is emitted """ + # ensure callback is asynchronous + if not asyncio.iscoroutinefunction(callback): + raise ValueError("Callback must be a coroutine function") self._callbacks.append(callback) def off(self, callback): @@ -127,7 +129,10 @@ class Processor: The function returns the output of type `OUTPUT_TYPE` """ # logger.debug(f"{self.__class__.__name__} push") - return await self._push(data) + try: + return await self._push(data) + except Exception: + self.logger.exception("Error in push") async def flush(self): """ @@ -463,7 +468,6 @@ class Pipeline(Processor): logger.info("") - class FinalSummaryProcessor(Processor): """ Assemble all summary into a line-based json diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py new file mode 100644 index 00000000..3976d3e2 --- /dev/null +++ b/server/reflector/views/rtc_offer.py @@ -0,0 +1,121 @@ +from fastapi import Request, APIRouter +from pydantic import BaseModel +from reflector.models import TranscriptionContext, TranscriptionOutput +from reflector.logger import logger +from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack +from json import loads, dumps +import av +from reflector.processors import ( + Pipeline, + AudioChunkerProcessor, + AudioMergeProcessor, + AudioAutoTranscriptProcessor, + TranscriptLineProcessor, + TitleSummaryProcessor, + # FinalSummaryProcessor, + Transcript, + TitleSummary, +) + + +class AudioStreamTrack(MediaStreamTrack): + """ + An audio stream track. + """ + + kind = "audio" + + def __init__(self, ctx: TranscriptionContext, track): + super().__init__() + self.ctx = ctx + self.track = track + + async def recv(self) -> av.audio.frame.AudioFrame: + ctx = self.ctx + frame = await self.track.recv() + try: + await ctx.pipeline.push(frame) + except Exception as e: + ctx.logger.error("Pipeline error", error=e) + return frame + + +class RtcOffer(BaseModel): + sdp: str + type: str + + +sessions = [] +router = APIRouter() + + +@router.post("/offer") +async def rtc_offer(params: RtcOffer, request: Request): + # build an rtc session + offer = RTCSessionDescription(sdp=params.sdp, type=params.type) + + # client identification + peername = request.client + clientid = f"{peername[0]}:{peername[1]}" + ctx = TranscriptionContext(logger=logger.bind(client=clientid)) + + # build pipeline callback + async def on_transcript(transcript: Transcript): + ctx.logger.info("Transcript", transcript=transcript) + cmd = TranscriptionOutput(transcript.text) + # FIXME: send the result to the client async way + ctx.data_channel.send(dumps(cmd.get_result())) + + async def on_summary(summary: TitleSummary): + ctx.logger.info("Summary", summary=summary) + + # create a context for the whole rtc transaction + # add a customised logger to the context + ctx.pipeline = Pipeline( + AudioChunkerProcessor(), + AudioMergeProcessor(), + AudioAutoTranscriptProcessor.as_threaded(), + TranscriptLineProcessor(callback=on_transcript), + TitleSummaryProcessor.as_threaded(callback=on_summary), + # FinalSummaryProcessor.as_threaded( + # filename=result_fn, callback=on_final_summary + # ), + ) + + # handle RTC peer connection + pc = RTCPeerConnection() + + @pc.on("datachannel") + def on_datachannel(channel): + ctx.data_channel = channel + ctx.logger = ctx.logger.bind(channel=channel.label) + ctx.logger.info("Channel created by remote party") + + @channel.on("message") + def on_message(message: str): + ctx.logger.info(f"Message: {message}") + if loads(message)["cmd"] == "STOP": + # FIXME: flush the pipeline + 
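# a minimal sketch of what that flush could look like (hypothetical, + # not part of this patch; on_message is synchronous, so the coroutine + # would have to be scheduled rather than awaited): + #     asyncio.ensure_future(ctx.pipeline.flush()) + 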
pass + + if isinstance(message, str) and message.startswith("ping"): + channel.send("pong" + message[4:]) + + @pc.on("connectionstatechange") + async def on_connectionstatechange(): + ctx.logger.info(f"Connection state: {pc.connectionState}") + if pc.connectionState == "failed": + await pc.close() + + @pc.on("track") + def on_track(track): + ctx.logger.info(f"Track {track.kind} received") + pc.addTrack(AudioStreamTrack(ctx, track)) + + await pc.setRemoteDescription(offer) + + answer = await pc.createAnswer() + await pc.setLocalDescription(answer) + sessions.append(pc) + + return RtcOffer(sdp=pc.localDescription.sdp, type=pc.localDescription.type) From 42f1442e56e255a92371ec962c086dffccf6ab8d Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 14:23:34 +0200 Subject: [PATCH 05/12] server: introduce LLM backends --- server/reflector/llm/__init__.py | 3 ++ server/reflector/llm/base.py | 58 +++++++++++++++++++++++++++ server/reflector/llm/llm_oobagooda.py | 18 +++++++++ server/reflector/llm/llm_openai.py | 44 ++++++++++++++++++++ server/reflector/settings.py | 9 +++-- 5 files changed, 129 insertions(+), 3 deletions(-) create mode 100644 server/reflector/llm/__init__.py create mode 100644 server/reflector/llm/base.py create mode 100644 server/reflector/llm/llm_oobagooda.py create mode 100644 server/reflector/llm/llm_openai.py diff --git a/server/reflector/llm/__init__.py b/server/reflector/llm/__init__.py new file mode 100644 index 00000000..fddf3919 --- /dev/null +++ b/server/reflector/llm/__init__.py @@ -0,0 +1,3 @@ +from .base import LLM # noqa: F401 +from . import llm_oobagooda # noqa: F401 +from . import llm_openai # noqa: F401 diff --git a/server/reflector/llm/base.py b/server/reflector/llm/base.py new file mode 100644 index 00000000..55c0de5f --- /dev/null +++ b/server/reflector/llm/base.py @@ -0,0 +1,58 @@ +from reflector.logger import logger +from reflector.settings import settings +import asyncio +import json + + +class LLM: + _registry = {} + + @classmethod + def register(cls, name, klass): + cls._registry[name] = klass + + @classmethod + def instance(cls): + """ + Return an instance depending on the settings. 
+ Settings used: + + - `LLM_BACKEND`: key of the backend, defaults to `oobagooda` + - `LLM_URL`: url of the backend + """ + return cls._registry[settings.LLM_BACKEND]() + + async def generate( + self, prompt: str, retry_count: int = 5, retry_interval: int = 1, **kwargs + ) -> dict: + while retry_count > 0: + try: + result = await self._generate(prompt=prompt, **kwargs) + break + except Exception: + logger.exception("Failed to call llm") + retry_count -= 1 + await asyncio.sleep(retry_interval) + + if retry_count == 0: + raise Exception("Failed to call llm after retrying") + + if isinstance(result, str): + result = self._parse_json(result) + + return result + + async def _generate(self, prompt: str, **kwargs) -> str: + raise NotImplementedError + + def _parse_json(self, result: str) -> dict: + result = result.strip() + # try detecting code block if exist + if result.startswith("```json\n") and result.endswith("```"): + result = result[8:-3] + elif result.startswith("```\n") and result.endswith("```"): + result = result[4:-3] + print(">>>", result) + return json.loads(result.strip()) + + diff --git a/server/reflector/llm/llm_oobagooda.py b/server/reflector/llm/llm_oobagooda.py new file mode 100644 index 00000000..be7d8133 --- /dev/null +++ b/server/reflector/llm/llm_oobagooda.py @@ -0,0 +1,18 @@ +from reflector.llm.base import LLM +from reflector.settings import settings +import httpx + + +class OobagoodaLLM(LLM): + async def _generate(self, prompt: str, **kwargs): + async with httpx.AsyncClient() as client: + response = await client.post( + settings.LLM_URL, + headers={"Content-Type": "application/json"}, + json={"prompt": prompt}, + ) + response.raise_for_status() + return response.json() + + +LLM.register("oobagooda", OobagoodaLLM) diff --git a/server/reflector/llm/llm_openai.py b/server/reflector/llm/llm_openai.py new file mode 100644 index 00000000..d4c565d6 --- /dev/null +++ b/server/reflector/llm/llm_openai.py @@ -0,0 +1,44 @@ +from reflector.llm.base import LLM +from reflector.logger import logger +from reflector.settings import settings +import json +import httpx + + +class OpenAILLM(LLM): + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.openai_key = settings.LLM_OPENAI_KEY + self.openai_url = settings.LLM_URL + self.openai_model = settings.LLM_OPENAI_MODEL + self.openai_temperature = settings.LLM_OPENAI_TEMPERATURE + self.timeout = settings.LLM_TIMEOUT + self.max_tokens = settings.LLM_MAX_TOKENS + logger.info(f"LLM use openai backend at {self.openai_url}") + + async def _generate(self, prompt: str, **kwargs) -> str: + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {self.openai_key}", + } + + logger.debug(f"LLM openai prompt: {prompt}") + + async with httpx.AsyncClient(timeout=self.timeout) as client: + response = await client.post( + self.openai_url, + headers=headers, + json={ + "model": self.openai_model, + "prompt": prompt, + "max_tokens": self.max_tokens, + "temperature": self.openai_temperature, + }, + ) + response.raise_for_status() + result = response.json() + logger.info(f"LLM openai result: {result}") + return result["choices"][0]["text"] + + +LLM.register("openai", OpenAILLM) diff --git a/server/reflector/settings.py b/server/reflector/settings.py index 6bad2697..0b6f6df5 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -31,6 +31,12 @@ class Settings(BaseSettings): LLM_URL: str | None = None LLM_HOST: str = "localhost" LLM_PORT: int = 7860 + LLM_OPENAI_KEY: str | None = None + 
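# example environment for the OpenAI backend (values illustrative, + # any OpenAI-compatible completion endpoint should work): + #   LLM_BACKEND=openai + #   LLM_URL=https://api.openai.com/v1/completions + #   LLM_OPENAI_KEY=<your-api-key> + 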
LLM_OPENAI_MODEL: str = "gpt-3.5-turbo" + LLM_OPENAI_TEMPERATURE: float = 0.7 + LLM_TIMEOUT: int = 90 + LLM_MAX_TOKENS: int = 1024 + LLM_TEMPERATURE: float = 0.7 # Storage STORAGE_BACKEND: str = "aws" @@ -38,8 +44,5 @@ class Settings(BaseSettings): STORAGE_AWS_SECRET_KEY: str = "" STORAGE_AWS_BUCKET: str = "" - # OpenAI - OPENAI_API_KEY: str = "" - settings = Settings() From bc55cfdea38fe9ba4976ddc5bc50f20f6da6047b Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 14:24:01 +0200 Subject: [PATCH 06/12] processors: split processors into their own files --- server/reflector/processors.py | 542 ------------------ server/reflector/processors/__init__.py | 9 + server/reflector/processors/audio_chunker.py | 27 + server/reflector/processors/audio_merge.py | 47 ++ .../reflector/processors/audio_transcript.py | 22 + .../processors/audio_transcript_auto.py | 35 ++ .../processors/audio_transcript_whisper.py | 38 ++ server/reflector/processors/base.py | 178 ++++++ .../reflector/processors/transcript_liner.py | 47 ++ .../processors/transcript_summarizer.py | 31 + .../processors/transcript_topic_detector.py | 48 ++ server/reflector/processors/types.py | 65 +++ server/reflector/tools/process.py | 61 ++ server/reflector/views/rtc_offer.py | 13 +- 14 files changed, 614 insertions(+), 549 deletions(-) delete mode 100644 server/reflector/processors.py create mode 100644 server/reflector/processors/__init__.py create mode 100644 server/reflector/processors/audio_chunker.py create mode 100644 server/reflector/processors/audio_merge.py create mode 100644 server/reflector/processors/audio_transcript.py create mode 100644 server/reflector/processors/audio_transcript_auto.py create mode 100644 server/reflector/processors/audio_transcript_whisper.py create mode 100644 server/reflector/processors/base.py create mode 100644 server/reflector/processors/transcript_liner.py create mode 100644 server/reflector/processors/transcript_summarizer.py create mode 100644 server/reflector/processors/transcript_topic_detector.py create mode 100644 server/reflector/processors/types.py create mode 100644 server/reflector/tools/process.py diff --git a/server/reflector/processors.py b/server/reflector/processors.py deleted file mode 100644 index 3827f068..00000000 --- a/server/reflector/processors.py +++ /dev/null @@ -1,542 +0,0 @@ -from pathlib import Path -import av -import wave -from dataclasses import dataclass -from faster_whisper import WhisperModel -from reflector.models import TitleSummaryInput, ParseLLMResult -from reflector.settings import settings -from reflector.logger import logger -import httpx -import asyncio -import json -from uuid import uuid4 -from concurrent.futures import ThreadPoolExecutor - - -@dataclass -class AudioFile: - path: Path - sample_rate: int - channels: int - sample_width: int - timestamp: float = 0.0 - - def release(self): - self.path.unlink() - - -@dataclass -class Word: - text: str - start: float - end: float - - -@dataclass -class TitleSummary: - title: str - summary: str - - -@dataclass -class Transcript: - text: str = "" - words: list[Word] = None - - @property - def human_timestamp(self): - minutes = int(self.timestamp / 60) - seconds = int(self.timestamp % 60) - milliseconds = int((self.timestamp % 1) * 1000) - return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}" - - @property - def timestamp(self): - if not self.words: - raise ValueError("No words in transcript") - return self.words[0].start - - @property - def duration(self): - if not self.words: - raise ValueError("No 
words in transcript") - return self.words[-1].end - self.words[0].start - - def merge(self, other: "Transcript"): - if not self.words: - self.words = other.words - else: - self.words.extend(other.words) - self.text += other.text - - -class Processor: - INPUT_TYPE: type = None - OUTPUT_TYPE: type = None - - def __init__(self, callback=None, custom_logger=None): - self._processors = [] - self._callbacks = [] - if callback: - self.on(callback) - self.uid = uuid4().hex - self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__) - - def set_pipeline(self, pipeline: "Pipeline"): - self.logger = self.logger.bind(pipeline=pipeline.uid) - - def connect(self, processor: "Processor"): - """ - Connect this processor output to another processor - """ - if processor.INPUT_TYPE != self.OUTPUT_TYPE: - raise ValueError( - f"Processor {processor} input type {processor.INPUT_TYPE} " - f"does not match {self.OUTPUT_TYPE}" - ) - self._processors.append(processor) - - def disconnect(self, processor: "Processor"): - """ - Disconnect this processor data from another processor - """ - self._processors.remove(processor) - - def on(self, callback): - """ - Register a callback to be called when data is emitted - """ - # ensure callback is asynchronous - if not asyncio.iscoroutinefunction(callback): - raise ValueError("Callback must be a coroutine function") - self._callbacks.append(callback) - - def off(self, callback): - """ - Unregister a callback to be called when data is emitted - """ - self._callbacks.remove(callback) - - async def emit(self, data): - for callback in self._callbacks: - await callback(data) - for processor in self._processors: - await processor.push(data) - - async def push(self, data): - """ - Push data to this processor. `data` must be of type `INPUT_TYPE` - The function returns the output of type `OUTPUT_TYPE` - """ - # logger.debug(f"{self.__class__.__name__} push") - try: - return await self._push(data) - except Exception: - self.logger.exception("Error in push") - - async def flush(self): - """ - Flush data to this processor - """ - # logger.debug(f"{self.__class__.__name__} flush") - return await self._flush() - - def describe(self, level=0): - logger.info(" " * level + self.__class__.__name__) - - async def _push(self, data): - raise NotImplementedError - - async def _flush(self): - pass - - @classmethod - def as_threaded(cls, *args, **kwargs): - """ - Return a single threaded processor where output is guaranteed - to be in order - """ - return ThreadedProcessor(cls(*args, **kwargs), max_workers=1) - - -class ThreadedProcessor(Processor): - """ - A processor that runs in a separate thread - """ - - def __init__(self, processor: Processor, max_workers=1): - super().__init__() - # FIXME: This is a hack to make sure that the processor is single threaded - # but if it is more than 1, then we need to make sure that the processor - # is emiting data in order - assert max_workers == 1 - self.processor = processor - self.INPUT_TYPE = processor.INPUT_TYPE - self.OUTPUT_TYPE = processor.OUTPUT_TYPE - self.executor = ThreadPoolExecutor(max_workers=max_workers) - self.queue = asyncio.Queue() - self.task = asyncio.get_running_loop().create_task(self.loop()) - - async def loop(self): - while True: - data = await self.queue.get() - try: - if data is None: - await self.processor.flush() - break - await self.processor.push(data) - finally: - self.queue.task_done() - - async def _push(self, data): - await self.queue.put(data) - - async def _flush(self): - await self.queue.put(None) - 
await self.queue.join() - - def connect(self, processor: Processor): - self.processor.connect(processor) - - def disconnect(self, processor: Processor): - self.processor.disconnect(processor) - - def on(self, callback): - self.processor.on(callback) - - def describe(self, level=0): - super().describe(level) - self.processor.describe(level + 1) - - -class AudioChunkerProcessor(Processor): - """ - Assemble audio frames into chunks - """ - - INPUT_TYPE = av.AudioFrame - OUTPUT_TYPE = list[av.AudioFrame] - - def __init__(self, max_frames=256): - super().__init__() - self.frames: list[av.AudioFrame] = [] - self.max_frames = max_frames - - async def _push(self, data: av.AudioFrame): - self.frames.append(data) - if len(self.frames) >= self.max_frames: - await self.flush() - - async def _flush(self): - frames = self.frames[:] - self.frames = [] - if frames: - await self.emit(frames) - - -class AudioMergeProcessor(Processor): - """ - Merge audio frame into a single file - """ - - INPUT_TYPE = list[av.AudioFrame] - OUTPUT_TYPE = AudioFile - - async def _push(self, data: list[av.AudioFrame]): - if not data: - return - - # get audio information from first frame - frame = data[0] - channels = len(frame.layout.channels) - sample_rate = frame.sample_rate - sample_width = frame.format.bytes - - # create audio file - from time import monotonic_ns - from uuid import uuid4 - - uu = uuid4().hex - path = Path(f"audio_{monotonic_ns()}_{uu}.wav") - with wave.open(path.as_posix(), "wb") as wf: - wf.setnchannels(channels) - wf.setsampwidth(sample_width) - wf.setframerate(sample_rate) - for frame in data: - wf.writeframes(frame.to_ndarray().tobytes()) - - # emit audio file - audiofile = AudioFile( - path=path, - sample_rate=sample_rate, - channels=channels, - sample_width=sample_width, - timestamp=data[0].pts * data[0].time_base, - ) - await self.emit(audiofile) - - -class AudioTranscriptProcessor(Processor): - """ - Transcript audio file - """ - - INPUT_TYPE = AudioFile - OUTPUT_TYPE = Transcript - - async def _push(self, data: AudioFile): - try: - result = await self._transcript(data) - if result: - await self.emit(result) - finally: - data.release() - - async def _transcript(self, data: AudioFile): - raise NotImplementedError - - -class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor): - def __init__(self): - super().__init__() - self.model = WhisperModel( - "tiny", device="cpu", compute_type="float32", num_workers=12 - ) - - async def _transcript(self, data: AudioFile): - segments, _ = self.model.transcribe( - data.path.as_posix(), - language="en", - beam_size=5, - # condition_on_previous_text=True, - word_timestamps=True, - vad_filter=True, - vad_parameters={"min_silence_duration_ms": 500}, - ) - - if not segments: - return - - transcript = Transcript(words=[]) - segments = list(segments) - ts = data.timestamp - - for segment in segments: - transcript.text += segment.text - for word in segment.words: - transcript.words.append( - Word(text=word.word, start=ts + word.start, end=ts + word.end) - ) - - return transcript - - -class AudioAutoTranscriptProcessor(AudioTranscriptProcessor): - BACKENDS = { - "whisper": AudioWhisperTranscriptProcessor, - } - BACKEND_DEFAULT = "whisper" - - def __init__(self, backend=None, **kwargs): - super().__init__(**kwargs) - self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]() - - def connect(self, processor: Processor): - self.processor.connect(processor) - - def disconnect(self, processor: Processor): - self.processor.disconnect(processor) - - def on(self, 
callback): - self.processor.on(callback) - - def off(self, callback): - self.processor.off(callback) - - async def _push(self, data: AudioFile): - return await self.processor._push(data) - - async def _flush(self): - return await self.processor._flush() - - -class TranscriptLineProcessor(Processor): - """ - Based on stream of transcript, assemble lines, remove duplicated words - """ - - INPUT_TYPE = Transcript - OUTPUT_TYPE = Transcript - - def __init__(self, max_text=1000, **kwargs): - super().__init__(**kwargs) - self.transcript = Transcript(words=[]) - self.max_text = max_text - - async def _push(self, data: Transcript): - # merge both transcript - self.transcript.merge(data) - - # check if a line is complete - if "." not in self.transcript.text: - # if the transcription text is still not too long, wait for more - if len(self.transcript.text) < self.max_text: - return - - # cut to the next . - partial = Transcript(words=[]) - for word in self.transcript.words[:]: - partial.text += word.text - partial.words.append(word) - if "." not in word.text: - continue - - # emit line - await self.emit(partial) - - # create new transcript - partial = Transcript(words=[]) - - self.transcript = partial - - async def _flush(self): - if self.transcript.words: - await self.emit(self.transcript) - - -class TitleSummaryProcessor(Processor): - """ - Detect topic and summary from the transcript - """ - - INPUT_TYPE = Transcript - OUTPUT_TYPE = TitleSummary - - async def _push(self, data: Transcript): - param = TitleSummaryInput(transcribed_time=data.timestamp, input_text=data.text) - - try: - # TODO: abstract LLM implementation and parsing - response = httpx.post( - settings.LLM_URL, headers=param.headers, json=param.data - ) - response.raise_for_status() - - result = ParseLLMResult(param=param, output=response.json()) - summary = TitleSummary(title=result.title, summary=result.description) - await self.emit(summary) - - except httpx.ConnectError as e: - self.logger.error(f"Failed to call llm: {e}") - - except Exception: - self.logger.exception("Failed to call llm") - - -class Pipeline(Processor): - """ - A pipeline of processors - """ - - INPUT_TYPE = None - OUTPUT_TYPE = None - - def __init__(self, *processors: Processor): - super().__init__() - self.processors = processors - - for processor in processors: - processor.set_pipeline(self) - - for i in range(len(processors) - 1): - processors[i].connect(processors[i + 1]) - - self.INPUT_TYPE = processors[0].INPUT_TYPE - self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE - - async def _push(self, data): - await self.processors[0].push(data) - - async def _flush(self): - for processor in self.processors: - await processor.flush() - - def describe(self, level=0): - logger.info(" " * level + "Pipeline:") - for processor in self.processors: - processor.describe(level + 1) - logger.info("") - - -class FinalSummaryProcessor(Processor): - """ - Assemble all summary into a line-based json - """ - - INPUT_TYPE = TitleSummary - OUTPUT_TYPE = Path - - def __init__(self, filename: Path, **kwargs): - super().__init__(**kwargs) - self.filename = filename - self.chunkcount = 0 - - async def _push(self, data: TitleSummary): - with open(self.filename, "a", encoding="utf8") as fd: - fd.write(json.dumps(data)) - self.chunkcount += 1 - - async def _flush(self): - if self.chunkcount == 0: - self.logger.warning("No summary to write") - return - self.logger.info(f"Writing to {self.filename}") - await self.emit(self.filename) - - -if __name__ == "__main__": - import argparse - - parser = 
argparse.ArgumentParser() - parser.add_argument("source", help="Source file (mp3, wav, mp4...)") - args = parser.parse_args() - - async def main(): - async def on_transcript(transcript): - print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}") - - async def on_summary(summary): - print(f"Summary: {summary.title} - {summary.summary}") - - async def on_final_summary(path): - print(f"Final Summary: {path}") - - # transcription output - result_fn = Path(args.source).with_suffix(".jsonl") - - pipeline = Pipeline( - AudioChunkerProcessor(), - AudioMergeProcessor(), - AudioAutoTranscriptProcessor.as_threaded(), - TranscriptLineProcessor(callback=on_transcript), - TitleSummaryProcessor.as_threaded(callback=on_summary), - FinalSummaryProcessor.as_threaded( - filename=result_fn, callback=on_final_summary - ), - ) - pipeline.describe() - - # start processing audio - logger.info(f"Opening {args.source}") - container = av.open(args.source) - try: - logger.info("Start pushing audio into the pipeline") - for frame in container.decode(audio=0): - await pipeline.push(frame) - finally: - logger.info("Flushing the pipeline") - await pipeline.flush() - - logger.info("All done !") - - asyncio.run(main()) diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py new file mode 100644 index 00000000..847db231 --- /dev/null +++ b/server/reflector/processors/__init__.py @@ -0,0 +1,9 @@ +from .base import Processor, ThreadedProcessor, Pipeline # noqa: F401 +from .types import AudioFile, Transcript, Word, TitleSummary # noqa: F401 +from .audio_chunker import AudioChunkerProcessor # noqa: F401 +from .audio_merge import AudioMergeProcessor # noqa: F401 +from .audio_transcript import AudioTranscriptProcessor # noqa: F401 +from .audio_transcript_auto import AudioTranscriptAutoProcessor # noqa: F401 +from .transcript_liner import TranscriptLinerProcessor # noqa: F401 +from .transcript_summarizer import TranscriptSummarizerProcessor # noqa: F401 +from .transcript_topic_detector import TranscriptTopicDetectorProcessor # noqa: F401 diff --git a/server/reflector/processors/audio_chunker.py b/server/reflector/processors/audio_chunker.py new file mode 100644 index 00000000..8ca132a0 --- /dev/null +++ b/server/reflector/processors/audio_chunker.py @@ -0,0 +1,27 @@ +from reflector.processors.base import Processor +import av + + +class AudioChunkerProcessor(Processor): + """ + Assemble audio frames into chunks + """ + + INPUT_TYPE = av.AudioFrame + OUTPUT_TYPE = list[av.AudioFrame] + + def __init__(self, max_frames=256): + super().__init__() + self.frames: list[av.AudioFrame] = [] + self.max_frames = max_frames + + async def _push(self, data: av.AudioFrame): + self.frames.append(data) + if len(self.frames) >= self.max_frames: + await self.flush() + + async def _flush(self): + frames = self.frames[:] + self.frames = [] + if frames: + await self.emit(frames) diff --git a/server/reflector/processors/audio_merge.py b/server/reflector/processors/audio_merge.py new file mode 100644 index 00000000..ac16676d --- /dev/null +++ b/server/reflector/processors/audio_merge.py @@ -0,0 +1,47 @@ +from reflector.processors.base import Processor +from reflector.processors.types import AudioFile +from pathlib import Path +import wave +import av + + +class AudioMergeProcessor(Processor): + """ + Merge audio frame into a single file + """ + + INPUT_TYPE = list[av.AudioFrame] + OUTPUT_TYPE = AudioFile + + async def _push(self, data: list[av.AudioFrame]): + if not data: + return + + # get audio 
information from first frame + frame = data[0] + channels = len(frame.layout.channels) + sample_rate = frame.sample_rate + sample_width = frame.format.bytes + + # create audio file + from time import monotonic_ns + from uuid import uuid4 + + uu = uuid4().hex + path = Path(f"audio_{monotonic_ns()}_{uu}.wav") + with wave.open(path.as_posix(), "wb") as wf: + wf.setnchannels(channels) + wf.setsampwidth(sample_width) + wf.setframerate(sample_rate) + for frame in data: + wf.writeframes(frame.to_ndarray().tobytes()) + + # emit audio file + audiofile = AudioFile( + path=path, + sample_rate=sample_rate, + channels=channels, + sample_width=sample_width, + timestamp=data[0].pts * data[0].time_base, + ) + await self.emit(audiofile) diff --git a/server/reflector/processors/audio_transcript.py b/server/reflector/processors/audio_transcript.py new file mode 100644 index 00000000..708b959e --- /dev/null +++ b/server/reflector/processors/audio_transcript.py @@ -0,0 +1,22 @@ +from reflector.processors.base import Processor +from reflector.processors.types import AudioFile, Transcript + + +class AudioTranscriptProcessor(Processor): + """ + Transcript audio file + """ + + INPUT_TYPE = AudioFile + OUTPUT_TYPE = Transcript + + async def _push(self, data: AudioFile): + try: + result = await self._transcript(data) + if result: + await self.emit(result) + finally: + data.release() + + async def _transcript(self, data: AudioFile): + raise NotImplementedError diff --git a/server/reflector/processors/audio_transcript_auto.py b/server/reflector/processors/audio_transcript_auto.py new file mode 100644 index 00000000..0ece84f3 --- /dev/null +++ b/server/reflector/processors/audio_transcript_auto.py @@ -0,0 +1,35 @@ +from reflector.processors.base import Processor +from reflector.processors.audio_transcript import AudioTranscriptProcessor +from reflector.processors.audio_transcript_whisper import ( + AudioTranscriptWhisperProcessor, +) +from reflector.processors.types import AudioFile + + +class AudioTranscriptAutoProcessor(AudioTranscriptProcessor): + BACKENDS = { + "whisper": AudioTranscriptWhisperProcessor, + } + BACKEND_DEFAULT = "whisper" + + def __init__(self, backend=None, **kwargs): + super().__init__(**kwargs) + self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]() + + def connect(self, processor: Processor): + self.processor.connect(processor) + + def disconnect(self, processor: Processor): + self.processor.disconnect(processor) + + def on(self, callback): + self.processor.on(callback) + + def off(self, callback): + self.processor.off(callback) + + async def _push(self, data: AudioFile): + return await self.processor._push(data) + + async def _flush(self): + return await self.processor._flush() diff --git a/server/reflector/processors/audio_transcript_whisper.py b/server/reflector/processors/audio_transcript_whisper.py new file mode 100644 index 00000000..9a85e9bf --- /dev/null +++ b/server/reflector/processors/audio_transcript_whisper.py @@ -0,0 +1,38 @@ +from reflector.processors.audio_transcript import AudioTranscriptProcessor +from reflector.processors.types import AudioFile, Transcript, Word +from faster_whisper import WhisperModel + + +class AudioTranscriptWhisperProcessor(AudioTranscriptProcessor): + def __init__(self): + super().__init__() + self.model = WhisperModel( + "tiny", device="cpu", compute_type="float32", num_workers=12 + ) + + async def _transcript(self, data: AudioFile): + segments, _ = self.model.transcribe( + data.path.as_posix(), + language="en", + beam_size=5, + # 
condition_on_previous_text=True, + word_timestamps=True, + vad_filter=True, + vad_parameters={"min_silence_duration_ms": 500}, + ) + + if not segments: + return + + transcript = Transcript(words=[]) + segments = list(segments) + ts = data.timestamp + + for segment in segments: + transcript.text += segment.text + for word in segment.words: + transcript.words.append( + Word(text=word.word, start=ts + word.start, end=ts + word.end) + ) + + return transcript diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py new file mode 100644 index 00000000..8e616364 --- /dev/null +++ b/server/reflector/processors/base.py @@ -0,0 +1,178 @@ +from reflector.logger import logger +from uuid import uuid4 +from concurrent.futures import ThreadPoolExecutor +import asyncio + + +class Processor: + INPUT_TYPE: type = None + OUTPUT_TYPE: type = None + + def __init__(self, callback=None, custom_logger=None): + self._processors = [] + self._callbacks = [] + if callback: + self.on(callback) + self.uid = uuid4().hex + self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__) + + def set_pipeline(self, pipeline: "Pipeline"): + self.logger = self.logger.bind(pipeline=pipeline.uid) + + def connect(self, processor: "Processor"): + """ + Connect this processor output to another processor + """ + if processor.INPUT_TYPE != self.OUTPUT_TYPE: + raise ValueError( + f"Processor {processor} input type {processor.INPUT_TYPE} " + f"does not match {self.OUTPUT_TYPE}" + ) + self._processors.append(processor) + + def disconnect(self, processor: "Processor"): + """ + Disconnect this processor data from another processor + """ + self._processors.remove(processor) + + def on(self, callback): + """ + Register a callback to be called when data is emitted + """ + # ensure callback is asynchronous + if not asyncio.iscoroutinefunction(callback): + raise ValueError("Callback must be a coroutine function") + self._callbacks.append(callback) + + def off(self, callback): + """ + Unregister a callback to be called when data is emitted + """ + self._callbacks.remove(callback) + + async def emit(self, data): + for callback in self._callbacks: + await callback(data) + for processor in self._processors: + await processor.push(data) + + async def push(self, data): + """ + Push data to this processor. 
`data` must be of type `INPUT_TYPE` + The function returns the output of type `OUTPUT_TYPE` + """ + # logger.debug(f"{self.__class__.__name__} push") + try: + return await self._push(data) + except Exception: + self.logger.exception("Error in push") + + async def flush(self): + """ + Flush data to this processor + """ + # logger.debug(f"{self.__class__.__name__} flush") + return await self._flush() + + def describe(self, level=0): + logger.info(" " * level + self.__class__.__name__) + + async def _push(self, data): + raise NotImplementedError + + async def _flush(self): + pass + + @classmethod + def as_threaded(cls, *args, **kwargs): + """ + Return a single threaded processor where output is guaranteed + to be in order + """ + return ThreadedProcessor(cls(*args, **kwargs), max_workers=1) + + +class ThreadedProcessor(Processor): + """ + A processor that runs in a separate thread + """ + + def __init__(self, processor: Processor, max_workers=1): + super().__init__() + # FIXME: This is a hack to make sure that the processor is single threaded + # but if it is more than 1, then we need to make sure that the processor + # is emiting data in order + assert max_workers == 1 + self.processor = processor + self.INPUT_TYPE = processor.INPUT_TYPE + self.OUTPUT_TYPE = processor.OUTPUT_TYPE + self.executor = ThreadPoolExecutor(max_workers=max_workers) + self.queue = asyncio.Queue() + self.task = asyncio.get_running_loop().create_task(self.loop()) + + async def loop(self): + while True: + data = await self.queue.get() + try: + if data is None: + await self.processor.flush() + break + await self.processor.push(data) + finally: + self.queue.task_done() + + async def _push(self, data): + await self.queue.put(data) + + async def _flush(self): + await self.queue.put(None) + await self.queue.join() + + def connect(self, processor: Processor): + self.processor.connect(processor) + + def disconnect(self, processor: Processor): + self.processor.disconnect(processor) + + def on(self, callback): + self.processor.on(callback) + + def describe(self, level=0): + super().describe(level) + self.processor.describe(level + 1) + + +class Pipeline(Processor): + """ + A pipeline of processors + """ + + INPUT_TYPE = None + OUTPUT_TYPE = None + + def __init__(self, *processors: Processor): + super().__init__() + self.processors = processors + + for processor in processors: + processor.set_pipeline(self) + + for i in range(len(processors) - 1): + processors[i].connect(processors[i + 1]) + + self.INPUT_TYPE = processors[0].INPUT_TYPE + self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE + + async def _push(self, data): + await self.processors[0].push(data) + + async def _flush(self): + for processor in self.processors: + await processor.flush() + + def describe(self, level=0): + logger.info(" " * level + "Pipeline:") + for processor in self.processors: + processor.describe(level + 1) + logger.info("") diff --git a/server/reflector/processors/transcript_liner.py b/server/reflector/processors/transcript_liner.py new file mode 100644 index 00000000..cca5e6a2 --- /dev/null +++ b/server/reflector/processors/transcript_liner.py @@ -0,0 +1,47 @@ +from reflector.processors.base import Processor +from reflector.processors.types import Transcript + + +class TranscriptLinerProcessor(Processor): + """ + Based on stream of transcript, assemble and remove duplicated words + then cut per lines. 
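+    e.g. pushing " Hi there." then " How are you" emits " Hi there." immediately and buffers " How are you" until the next period or flush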
+ """ + + INPUT_TYPE = Transcript + OUTPUT_TYPE = Transcript + + def __init__(self, max_text=1000, **kwargs): + super().__init__(**kwargs) + self.transcript = Transcript(words=[]) + self.max_text = max_text + + async def _push(self, data: Transcript): + # merge both transcript + self.transcript.merge(data) + + # check if a line is complete + if "." not in self.transcript.text: + # if the transcription text is still not too long, wait for more + if len(self.transcript.text) < self.max_text: + return + + # cut to the next . + partial = Transcript(words=[]) + for word in self.transcript.words[:]: + partial.text += word.text + partial.words.append(word) + if "." not in word.text: + continue + + # emit line + await self.emit(partial) + + # create new transcript + partial = Transcript(words=[]) + + self.transcript = partial + + async def _flush(self): + if self.transcript.words: + await self.emit(self.transcript) diff --git a/server/reflector/processors/transcript_summarizer.py b/server/reflector/processors/transcript_summarizer.py new file mode 100644 index 00000000..4e149602 --- /dev/null +++ b/server/reflector/processors/transcript_summarizer.py @@ -0,0 +1,31 @@ +from reflector.processors.base import Processor +from reflector.processors.types import TitleSummary +from pathlib import Path +import json + + +class TranscriptSummarizerProcessor(Processor): + """ + Assemble all summary into a line-based json + """ + + INPUT_TYPE = TitleSummary + OUTPUT_TYPE = Path + + def __init__(self, filename: Path, **kwargs): + super().__init__(**kwargs) + self.filename = filename + self.chunkcount = 0 + + async def _push(self, data: TitleSummary): + with open(self.filename, "a", encoding="utf8") as fd: + fd.write(json.dumps(data)) + self.chunkcount += 1 + + async def _flush(self): + if self.chunkcount == 0: + self.logger.warning("No summary to write") + return + self.logger.info(f"Writing to {self.filename}") + await self.emit(self.filename) + diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py new file mode 100644 index 00000000..31a88882 --- /dev/null +++ b/server/reflector/processors/transcript_topic_detector.py @@ -0,0 +1,48 @@ +from reflector.processors.base import Processor +from reflector.processors.types import Transcript, TitleSummary +from reflector.llm import LLM + + +class TranscriptTopicDetectorProcessor(Processor): + """ + Detect topic and summary from the transcript + """ + + INPUT_TYPE = Transcript + OUTPUT_TYPE = TitleSummary + + PROMPT = """ + ### Human: + Create a JSON object as response.The JSON object must have 2 fields: + i) title and ii) summary.For the title field,generate a short title + for the given text. For the summary field, summarize the given text + in three sentences. 
+ + {input_text} + + ### Assistant: + """ + + def __init__(self, min_transcript_length=25, **kwargs): + super().__init__(**kwargs) + self.transcript = None + self.min_transcript_length = min_transcript_length + self.llm = LLM.instance() + + async def _push(self, data: Transcript): + if self.transcript is None: + self.transcript = data + else: + self.transcript.merge(data) + if len(self.transcript.text) < self.min_transcript_length: + return + await self.flush() + + async def _flush(self): + if not self.transcript: + return + prompt = self.PROMPT.format(input_text=self.transcript.text) + result = await self.llm.generate(prompt=prompt) + summary = TitleSummary(title=result["title"], summary=result["summary"]) + self.transcript = None + await self.emit(summary) diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py new file mode 100644 index 00000000..1a89c127 --- /dev/null +++ b/server/reflector/processors/types.py @@ -0,0 +1,65 @@ +from dataclasses import dataclass +from pathlib import Path + + +@dataclass +class AudioFile: + path: Path + sample_rate: int + channels: int + sample_width: int + timestamp: float = 0.0 + + def release(self): + self.path.unlink() + + +@dataclass +class Word: + text: str + start: float + end: float + + +@dataclass +class TitleSummary: + title: str + summary: str + + +@dataclass +class Transcript: + text: str = "" + words: list[Word] = None + + @property + def human_timestamp(self): + minutes = int(self.timestamp / 60) + seconds = int(self.timestamp % 60) + milliseconds = int((self.timestamp % 1) * 1000) + return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}" + + @property + def timestamp(self): + if not self.words: + raise ValueError("No words in transcript") + return self.words[0].start + + @property + def duration(self): + if not self.words: + raise ValueError("No words in transcript") + return self.words[-1].end - self.words[0].start + + def merge(self, other: "Transcript"): + if not self.words: + self.words = other.words + else: + self.words.extend(other.words) + self.text += other.text + + def clone(self): + words = [ + Word(text=word.text, start=word.start, end=word.end) for word in self.words + ] + return Transcript(text=self.text, words=words) diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py new file mode 100644 index 00000000..aefbc153 --- /dev/null +++ b/server/reflector/tools/process.py @@ -0,0 +1,61 @@ +from pathlib import Path +import av +from reflector.logger import logger +from reflector.processors import ( + Pipeline, + AudioChunkerProcessor, + AudioMergeProcessor, + AudioTranscriptAutoProcessor, + TranscriptLinerProcessor, + TranscriptTopicDetectorProcessor, + TranscriptSummarizerProcessor, +) +import asyncio + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("source", help="Source file (mp3, wav, mp4...)") + args = parser.parse_args() + + async def main(): + async def on_transcript(transcript): + print(f"Transcript: [{transcript.human_timestamp}]: {transcript.text}") + + async def on_summary(summary): + print(f"Summary: {summary.title} - {summary.summary}") + + async def on_final_summary(path): + print(f"Final Summary: {path}") + + # transcription output + result_fn = Path(args.source).with_suffix(".jsonl") + + pipeline = Pipeline( + AudioChunkerProcessor(), + AudioMergeProcessor(), + AudioTranscriptAutoProcessor.as_threaded(), + TranscriptLinerProcessor(callback=on_transcript), + 
TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary), + TranscriptSummarizerProcessor.as_threaded( + filename=result_fn, callback=on_final_summary + ), + ) + pipeline.describe() + + # start processing audio + logger.info(f"Opening {args.source}") + container = av.open(args.source) + try: + logger.info("Start pushing audio into the pipeline") + for frame in container.decode(audio=0): + await pipeline.push(frame) + finally: + logger.info("Flushing the pipeline") + await pipeline.flush() + + logger.info("All done !") + + asyncio.run(main()) diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py index 3976d3e2..03f9decf 100644 --- a/server/reflector/views/rtc_offer.py +++ b/server/reflector/views/rtc_offer.py @@ -9,10 +9,9 @@ from reflector.processors import ( Pipeline, AudioChunkerProcessor, AudioMergeProcessor, - AudioAutoTranscriptProcessor, - TranscriptLineProcessor, - TitleSummaryProcessor, - # FinalSummaryProcessor, + AudioTranscriptAutoProcessor, + TranscriptLinerProcessor, + TranscriptTopicDetectorProcessor, Transcript, TitleSummary, ) @@ -74,9 +73,9 @@ async def rtc_offer(params: RtcOffer, request: Request): ctx.pipeline = Pipeline( AudioChunkerProcessor(), AudioMergeProcessor(), - AudioAutoTranscriptProcessor.as_threaded(), - TranscriptLineProcessor(callback=on_transcript), - TitleSummaryProcessor.as_threaded(callback=on_summary), + AudioTranscriptAutoProcessor.as_threaded(), + TranscriptLinerProcessor(callback=on_transcript), + TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary), # FinalSummaryProcessor.as_threaded( # filename=result_fn, callback=on_final_summary # ), From 1f8e4200fda10dbd3b2452f3af41739ead71dcd5 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 16:05:48 +0200 Subject: [PATCH 07/12] tests: rework tests and fix bugs along the way --- .github/workflows/test_server.yml | 16 +++- server/reflector/llm/base.py | 19 ++-- server/reflector/llm/llm_openai.py | 1 - .../processors/audio_transcript_auto.py | 2 +- .../processors/audio_transcript_whisper.py | 6 +- .../processors/transcript_summarizer.py | 1 - server/reflector/tools/process.py | 86 ++++++++++--------- server/reflector/views/rtc_offer.py | 4 +- server/tests/test_processors_pipeline.py | 45 ++++++++++ 9 files changed, 126 insertions(+), 54 deletions(-) create mode 100644 server/tests/test_processors_pipeline.py diff --git a/.github/workflows/test_server.yml b/.github/workflows/test_server.yml index 2029a77c..2651f798 100644 --- a/.github/workflows/test_server.yml +++ b/.github/workflows/test_server.yml @@ -39,7 +39,7 @@ jobs: cd server poetry run python -m pytest -v tests - pep8: + formatting: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 @@ -53,6 +53,20 @@ jobs: cd server black --check reflector tests + linting: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Set up Python 3.x + uses: actions/setup-python@v4 + with: + python-version: 3.11 + - name: Validate linting + run: | + pip install ruff + cd server + ruff reflector tests + docker: runs-on: ubuntu-latest steps: diff --git a/server/reflector/llm/base.py b/server/reflector/llm/base.py index 55c0de5f..6b5dbdc9 100644 --- a/server/reflector/llm/base.py +++ b/server/reflector/llm/base.py @@ -2,6 +2,7 @@ from reflector.logger import logger from reflector.settings import settings import asyncio import json +import re class LLM: @@ -48,11 +49,15 @@ class LLM: def _parse_json(self, result: str) -> dict: result = result.strip() # try detecting code 
block if exist - if result.startswith("```json\n") and result.endswith("```"): - result = result[8:-3] - elif result.startswith("```\n") and result.endswith("```"): - result = result[4:-3] - print(">>>", result) + # starts with ```json\n, ends with ``` + # or starts with ```\n, ends with ``` + # or starts with \n```javascript\n, ends with ``` + + regex = r"```(json|javascript|)?(.*)```" + matches = re.findall(regex, result.strip(), re.MULTILINE | re.DOTALL) + if not matches: + return result + + # we have a match, try to parse it + result = matches[0][1] return json.loads(result.strip()) - - diff --git a/server/reflector/llm/llm_openai.py b/server/reflector/llm/llm_openai.py index d4c565d6..517902e9 100644 --- a/server/reflector/llm/llm_openai.py +++ b/server/reflector/llm/llm_openai.py @@ -1,7 +1,6 @@ from reflector.llm.base import LLM from reflector.logger import logger from reflector.settings import settings -import json import httpx diff --git a/server/reflector/processors/audio_transcript_auto.py b/server/reflector/processors/audio_transcript_auto.py index 0ece84f3..9b792009 100644 --- a/server/reflector/processors/audio_transcript_auto.py +++ b/server/reflector/processors/audio_transcript_auto.py @@ -13,8 +13,8 @@ class AudioTranscriptAutoProcessor(AudioTranscriptProcessor): BACKEND_DEFAULT = "whisper" def __init__(self, backend=None, **kwargs): - super().__init__(**kwargs) self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]() + super().__init__(**kwargs) def connect(self, processor: Processor): self.processor.connect(processor) diff --git a/server/reflector/processors/audio_transcript_whisper.py b/server/reflector/processors/audio_transcript_whisper.py index 9a85e9bf..0b768543 100644 --- a/server/reflector/processors/audio_transcript_whisper.py +++ b/server/reflector/processors/audio_transcript_whisper.py @@ -32,7 +32,11 @@ class AudioTranscriptWhisperProcessor(AudioTranscriptProcessor): transcript.text += segment.text for word in segment.words: transcript.words.append( - Word(text=word.word, start=ts + word.start, end=ts + word.end) + Word( + text=word.word, + start=round(ts + word.start, 3), + end=round(ts + word.end, 3), + ) ) return transcript diff --git a/server/reflector/processors/transcript_summarizer.py b/server/reflector/processors/transcript_summarizer.py index 4e149602..e4e55e9e 100644 --- a/server/reflector/processors/transcript_summarizer.py +++ b/server/reflector/processors/transcript_summarizer.py @@ -28,4 +28,3 @@ class TranscriptSummarizerProcessor(Processor): return self.logger.info(f"Writing to {self.filename}") await self.emit(self.filename) - diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py index aefbc153..0c8611d8 100644 --- a/server/reflector/tools/process.py +++ b/server/reflector/tools/process.py @@ -1,4 +1,3 @@ -from pathlib import Path import av from reflector.logger import logger from reflector.processors import ( @@ -8,11 +7,48 @@ from reflector.processors import ( AudioTranscriptAutoProcessor, TranscriptLinerProcessor, TranscriptTopicDetectorProcessor, - TranscriptSummarizerProcessor, + # TranscriptSummarizerProcessor, ) import asyncio +async def process_audio_file(filename, event_callback): + async def on_transcript(data): + await event_callback("transcript", data) + + async def on_topic(data): + await event_callback("topic", data) + + async def on_summary(data): + await event_callback("summary", data) + + # transcription output + pipeline = Pipeline( + AudioChunkerProcessor(), + AudioMergeProcessor(), + 
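# as_threaded() wraps the processor in a queue-fed background task: + # push() returns as soon as the chunk is enqueued, and results are + # still emitted in arrival order (see ThreadedProcessor in base.py) + 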
AudioTranscriptAutoProcessor.as_threaded(), + TranscriptLinerProcessor(callback=on_transcript), + TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic), + # TranscriptSummarizerProcessor.as_threaded( + # callback=on_summary + # ), + ) + pipeline.describe() + + # start processing audio + logger.info(f"Opening {filename}") + container = av.open(filename) + try: + logger.info("Start pushing audio into the pipeline") + for frame in container.decode(audio=0): + await pipeline.push(frame) + finally: + logger.info("Flushing the pipeline") + await pipeline.flush() + + logger.info("All done !") + + if __name__ == "__main__": import argparse @@ -20,42 +56,12 @@ if __name__ == "__main__": parser.add_argument("source", help="Source file (mp3, wav, mp4...)") args = parser.parse_args() - async def main(): - async def on_transcript(transcript): + async def event_callback(event, data): + if event == "transcript": + print(f"Transcript[{data.human_timestamp}]: {data.text}") + elif event == "topic": + print(f"Topic: {data}") + elif event == "summary": + print(f"Summary: {data}") - async def on_summary(summary): - print(f"Summary: {summary.title} - {summary.summary}") - - async def on_final_summary(path): - print(f"Final Summary: {path}") - - # transcription output - result_fn = Path(args.source).with_suffix(".jsonl") - - pipeline = Pipeline( - AudioChunkerProcessor(), - AudioMergeProcessor(), - AudioTranscriptAutoProcessor.as_threaded(), - TranscriptLinerProcessor(callback=on_transcript), - TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary), - TranscriptSummarizerProcessor.as_threaded( - filename=result_fn, callback=on_final_summary - ), - ) - pipeline.describe() - - # start processing audio - logger.info(f"Opening {args.source}") - container = av.open(args.source) - try: - logger.info("Start pushing audio into the pipeline") - for frame in container.decode(audio=0): - await pipeline.push(frame) - finally: - logger.info("Flushing the pipeline") - await pipeline.flush() - - logger.info("All done !") - - asyncio.run(main()) + asyncio.run(process_audio_file(args.source, event_callback)) diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py index 03f9decf..f462a37a 100644 --- a/server/reflector/views/rtc_offer.py +++ b/server/reflector/views/rtc_offer.py @@ -73,8 +73,8 @@ async def rtc_offer(params: RtcOffer, request: Request): ctx.pipeline = Pipeline( AudioChunkerProcessor(), AudioMergeProcessor(), - AudioTranscriptAutoProcessor.as_threaded(), - TranscriptLinerProcessor(callback=on_transcript), + AudioTranscriptAutoProcessor.as_threaded(callback=on_transcript), + TranscriptLinerProcessor(), TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary), # FinalSummaryProcessor.as_threaded( # filename=result_fn, callback=on_final_summary # ), diff --git a/server/tests/test_processors_pipeline.py b/server/tests/test_processors_pipeline.py new file mode 100644 index 00000000..312f76d6 --- /dev/null +++ b/server/tests/test_processors_pipeline.py @@ -0,0 +1,45 @@ +import pytest +from unittest.mock import patch + + +@pytest.mark.asyncio +async def test_basic_process(event_loop): + # goal is to run an audio file through the whole pipeline and + # validate the events received + from reflector.tools.process import process_audio_file + from reflector.settings import settings + from reflector.llm.base import LLM + from pathlib import Path + + # use an LLM test backend + settings.LLM_BACKEND = "test" 
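+ # note: LLM.instance() resolves settings.LLM_BACKEND against the + # class registry, so registering "test" below swaps in the stub + # backend without any network call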
+ + class LLMTest(LLM): + async def _generate(self, prompt: str, **kwargs) -> str: + return { + "title": "TITLE", + "summary": "SUMMARY", + } + + LLM.register("test", LLMTest) + + # event callback + marks = { + "transcript": 0, + "topic": 0, + # "summary": 0, + } + + async def event_callback(event, data): + print(f"{event}: {data}") + marks[event] += 1 + + # invoke the process and capture events + path = Path(__file__).parent / "records" / "test_mathieu_hello.wav" + await process_audio_file(path.as_posix(), event_callback) + print(marks) + + # validate the events + assert marks["transcript"] == 5 + assert marks["topic"] == 4 + # assert marks["summary"] == 1 From 99c9ba3e6be8b70b1fdfa22664fcbe1bc455579d Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 16:08:15 +0200 Subject: [PATCH 08/12] tests: remove unused mock --- server/tests/test_processors_pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/server/tests/test_processors_pipeline.py b/server/tests/test_processors_pipeline.py index 312f76d6..a807b6fd 100644 --- a/server/tests/test_processors_pipeline.py +++ b/server/tests/test_processors_pipeline.py @@ -1,5 +1,4 @@ import pytest -from unittest.mock import patch @pytest.mark.asyncio From d320558cc9bfcdcb392b51b6cc53b51337dce3c8 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 19:12:51 +0200 Subject: [PATCH 09/12] server/rtc: fix topic output --- server/reflector/llm/base.py | 12 ++++--- .../processors/transcript_topic_detector.py | 11 +++++-- server/reflector/processors/types.py | 15 +++++---- server/reflector/views/rtc_offer.py | 32 ++++++++++++++++++- 4 files changed, 57 insertions(+), 13 deletions(-) diff --git a/server/reflector/llm/base.py b/server/reflector/llm/base.py index 6b5dbdc9..031e38aa 100644 --- a/server/reflector/llm/base.py +++ b/server/reflector/llm/base.py @@ -55,9 +55,13 @@ class LLM: regex = r"```(json|javascript|)?(.*)```" matches = re.findall(regex, result.strip(), re.MULTILINE | re.DOTALL) - if not matches: - return result + if matches: + result = matches[0][1] + + else: + # maybe the prompt has been started with ```json + # so if text ends with ```, just remove it and use it as json + if result.endswith("```"): + result = result[:-3] - # we have a match, try to parse it - result = matches[0][1] return json.loads(result.strip()) diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py index 31a88882..b602d61e 100644 --- a/server/reflector/processors/transcript_topic_detector.py +++ b/server/reflector/processors/transcript_topic_detector.py @@ -21,9 +21,10 @@ class TranscriptTopicDetectorProcessor(Processor): {input_text} ### Assistant: + """ - def __init__(self, min_transcript_length=25, **kwargs): + def __init__(self, min_transcript_length=100, **kwargs): super().__init__(**kwargs) self.transcript = None self.min_transcript_length = min_transcript_length @@ -43,6 +44,12 @@ class TranscriptTopicDetectorProcessor(Processor): return prompt = self.PROMPT.format(input_text=self.transcript.text) result = await self.llm.generate(prompt=prompt) - summary = TitleSummary(title=result["title"], summary=result["summary"]) + summary = TitleSummary( + title=result["title"], + summary=result["summary"], + timestamp=self.transcript.timestamp, + duration=self.transcript.duration, + transcript=self.transcript, + ) self.transcript = None await self.emit(summary) diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py index 
1a89c127..c4c840dd 100644 --- a/server/reflector/processors/types.py +++ b/server/reflector/processors/types.py @@ -21,12 +21,6 @@ class Word: end: float -@dataclass -class TitleSummary: - title: str - summary: str - - @dataclass class Transcript: text: str = "" @@ -63,3 +57,12 @@ class Transcript: Word(text=word.text, start=word.start, end=word.end) for word in self.words ] return Transcript(text=self.text, words=words) + + +@dataclass +class TitleSummary: + title: str + summary: str + timestamp: float + duration: float + transcript: Transcript diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py index f462a37a..77007035 100644 --- a/server/reflector/views/rtc_offer.py +++ b/server/reflector/views/rtc_offer.py @@ -1,6 +1,11 @@ from fastapi import Request, APIRouter from pydantic import BaseModel -from reflector.models import TranscriptionContext, TranscriptionOutput +from reflector.models import ( + TranscriptionContext, + TranscriptionOutput, + TitleSummaryOutput, + IncrementalResult, +) from reflector.logger import logger from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack from json import loads, dumps @@ -67,6 +72,31 @@ async def rtc_offer(params: RtcOffer, request: Request): async def on_summary(summary: TitleSummary): ctx.logger.info("Summary", summary=summary) + # XXX doesnt work as expected, IncrementalResult is not serializable + # and previous implementation assume output of oobagooda + # result = TitleSummaryOutput( + # [ + # IncrementalResult( + # title=summary.title, + # desc=summary.summary, + # transcript=summary.transcript.text, + # timestamp=summary.timestamp, + # ) + # ] + # ) + result = { + "cmd": "UPDATE_TOPICS", + "topics": [ + { + "title": summary.title, + "timestamp": summary.timestamp, + "transcript": summary.transcript.text, + "desc": summary.summary, + } + ], + } + + ctx.data_channel.send(dumps(result)) # create a context for the whole rtc transaction # add a customised logger to the context From 74d2974ed285212d2541d52d5033851002cc8a20 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Tue, 1 Aug 2023 20:09:05 +0200 Subject: [PATCH 10/12] server: fixes latest implementation details on rtc offer and fastapi --- server/reflector/app.py | 15 +++- server/reflector/events.py | 2 + server/reflector/processors/__init__.py | 4 +- .../processors/transcript_final_summary.py | 30 +++++++ .../processors/transcript_summarizer.py | 30 ------- server/reflector/processors/types.py | 6 ++ server/reflector/views/rtc_offer.py | 90 +++++++++++-------- 7 files changed, 105 insertions(+), 72 deletions(-) create mode 100644 server/reflector/events.py create mode 100644 server/reflector/processors/transcript_final_summary.py delete mode 100644 server/reflector/processors/transcript_summarizer.py diff --git a/server/reflector/app.py b/server/reflector/app.py index 31509ce9..f40af489 100644 --- a/server/reflector/app.py +++ b/server/reflector/app.py @@ -1,9 +1,22 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from reflector.views.rtc_offer import router as rtc_offer_router +from reflector.events import subscribers_startup, subscribers_shutdown +from contextlib import asynccontextmanager + + +# lifespan events +@asynccontextmanager +async def lifespan(app: FastAPI): + for func in subscribers_startup: + await func() + yield + for func in subscribers_shutdown: + await func() + # build app -app = FastAPI() +app = FastAPI(lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], 
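The lifespan handler above simply awaits every coroutine collected in `reflector.events` (the two-line module added just below). A minimal sketch of how another part of the server could hook into it; the two subscriber lists come from the patch, the handler names are illustrative:

```python
from reflector.events import subscribers_startup, subscribers_shutdown


async def open_resources():
    # awaited once by the lifespan context before the app serves traffic
    ...


async def close_resources():
    # awaited once after the app stops serving
    ...


subscribers_startup.append(open_resources)
subscribers_shutdown.append(close_resources)
```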
From 74d2974ed285212d2541d52d5033851002cc8a20 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Tue, 1 Aug 2023 20:09:05 +0200
Subject: [PATCH 10/12] server: fix latest implementation details on rtc offer and fastapi

---
 server/reflector/app.py                        | 15 +++-
 server/reflector/events.py                     |  2 +
 server/reflector/processors/__init__.py        |  4 +-
 .../processors/transcript_final_summary.py     | 30 +++++++
 .../processors/transcript_summarizer.py        | 30 -------
 server/reflector/processors/types.py           |  6 ++
 server/reflector/views/rtc_offer.py            | 90 +++++++++++--------
 7 files changed, 105 insertions(+), 72 deletions(-)
 create mode 100644 server/reflector/events.py
 create mode 100644 server/reflector/processors/transcript_final_summary.py
 delete mode 100644 server/reflector/processors/transcript_summarizer.py

diff --git a/server/reflector/app.py b/server/reflector/app.py
index 31509ce9..f40af489 100644
--- a/server/reflector/app.py
+++ b/server/reflector/app.py
@@ -1,9 +1,22 @@
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from reflector.views.rtc_offer import router as rtc_offer_router
+from reflector.events import subscribers_startup, subscribers_shutdown
+from contextlib import asynccontextmanager
+
+
+# lifespan events
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    for func in subscribers_startup:
+        await func()
+    yield
+    for func in subscribers_shutdown:
+        await func()
+
 
 # build app
-app = FastAPI()
+app = FastAPI(lifespan=lifespan)
 app.add_middleware(
     CORSMiddleware,
     allow_origins=["*"],

diff --git a/server/reflector/events.py b/server/reflector/events.py
new file mode 100644
index 00000000..221ab4e5
--- /dev/null
+++ b/server/reflector/events.py
@@ -0,0 +1,2 @@
+subscribers_startup = []
+subscribers_shutdown = []
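The two lists in `events.py` are the whole event API: modules append coroutine functions, and the lifespan handler awaits them in order. A minimal sketch of registering a shutdown hook, using the same decorator trick as `rtc_clean_sessions` below:

```python
from reflector.events import subscribers_shutdown


# list.append works as a decorator: the coroutine function is stored in the
# list, and the module-level name is rebound to None (append's return value)
@subscribers_shutdown.append
async def close_resources():
    print("shutting down")
```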
diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py
index 847db231..da890513 100644
--- a/server/reflector/processors/__init__.py
+++ b/server/reflector/processors/__init__.py
@@ -1,9 +1,9 @@
 from .base import Processor, ThreadedProcessor, Pipeline  # noqa: F401
-from .types import AudioFile, Transcript, Word, TitleSummary  # noqa: F401
+from .types import AudioFile, Transcript, Word, TitleSummary, FinalSummary  # noqa: F401
 from .audio_chunker import AudioChunkerProcessor  # noqa: F401
 from .audio_merge import AudioMergeProcessor  # noqa: F401
 from .audio_transcript import AudioTranscriptProcessor  # noqa: F401
 from .audio_transcript_auto import AudioTranscriptAutoProcessor  # noqa: F401
 from .transcript_liner import TranscriptLinerProcessor  # noqa: F401
-from .transcript_summarizer import TranscriptSummarizerProcessor  # noqa: F401
 from .transcript_topic_detector import TranscriptTopicDetectorProcessor  # noqa: F401
+from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401

diff --git a/server/reflector/processors/transcript_final_summary.py b/server/reflector/processors/transcript_final_summary.py
new file mode 100644
index 00000000..208548f5
--- /dev/null
+++ b/server/reflector/processors/transcript_final_summary.py
@@ -0,0 +1,30 @@
+from reflector.processors.base import Processor
+from reflector.processors.types import TitleSummary, FinalSummary
+
+
+class TranscriptFinalSummaryProcessor(Processor):
+    """
+    Assemble all topic summaries into a final summary
+    """
+
+    INPUT_TYPE = TitleSummary
+    OUTPUT_TYPE = FinalSummary
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.chunks: list[TitleSummary] = []
+
+    async def _push(self, data: TitleSummary):
+        self.chunks.append(data)
+
+    async def _flush(self):
+        if not self.chunks:
+            self.logger.warning("No summary to output")
+            return
+
+        # FIXME improve final summary
+        result = "\n".join([chunk.summary for chunk in self.chunks])
+        last_chunk = self.chunks[-1]
+        duration = last_chunk.timestamp + last_chunk.duration
+
+        await self.emit(FinalSummary(summary=result, duration=duration))

diff --git a/server/reflector/processors/transcript_summarizer.py b/server/reflector/processors/transcript_summarizer.py
deleted file mode 100644
index e4e55e9e..00000000
--- a/server/reflector/processors/transcript_summarizer.py
+++ /dev/null
@@ -1,30 +0,0 @@
-from reflector.processors.base import Processor
-from reflector.processors.types import TitleSummary
-from pathlib import Path
-import json
-
-
-class TranscriptSummarizerProcessor(Processor):
-    """
-    Assemble all summary into a line-based json
-    """
-
-    INPUT_TYPE = TitleSummary
-    OUTPUT_TYPE = Path
-
-    def __init__(self, filename: Path, **kwargs):
-        super().__init__(**kwargs)
-        self.filename = filename
-        self.chunkcount = 0
-
-    async def _push(self, data: TitleSummary):
-        with open(self.filename, "a", encoding="utf8") as fd:
-            fd.write(json.dumps(data))
-        self.chunkcount += 1
-
-    async def _flush(self):
-        if self.chunkcount == 0:
-            self.logger.warning("No summary to write")
-            return
-        self.logger.info(f"Writing to {self.filename}")
-        await self.emit(self.filename)

diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py
index c4c840dd..d762c708 100644
--- a/server/reflector/processors/types.py
+++ b/server/reflector/processors/types.py
@@ -66,3 +66,9 @@ class TitleSummary:
     timestamp: float
     duration: float
     transcript: Transcript
+
+
+@dataclass
+class FinalSummary:
+    summary: str
+    duration: float

diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py
index 77007035..c4eaddd8 100644
--- a/server/reflector/views/rtc_offer.py
+++ b/server/reflector/views/rtc_offer.py
@@ -1,10 +1,10 @@
+import asyncio
 from fastapi import Request, APIRouter
+from reflector.events import subscribers_shutdown
 from pydantic import BaseModel
 from reflector.models import (
     TranscriptionContext,
     TranscriptionOutput,
-    TitleSummaryOutput,
-    IncrementalResult,
 )
 from reflector.logger import logger
 from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack
@@ -17,10 +17,15 @@ from reflector.processors import (
     AudioTranscriptAutoProcessor,
     TranscriptLinerProcessor,
     TranscriptTopicDetectorProcessor,
+    TranscriptFinalSummaryProcessor,
     Transcript,
     TitleSummary,
+    FinalSummary,
 )
 
+sessions = []
+router = APIRouter()
+
 
 class AudioStreamTrack(MediaStreamTrack):
     """
@@ -49,10 +54,6 @@ class RtcOffer(BaseModel):
     type: str
 
 
-sessions = []
-router = APIRouter()
-
-
 @router.post("/offer")
 async def rtc_offer(params: RtcOffer, request: Request):
     # build an rtc session
@@ -62,40 +63,38 @@ async def rtc_offer(params: RtcOffer, request: Request):
     peername = request.client
     clientid = f"{peername[0]}:{peername[1]}"
     ctx = TranscriptionContext(logger=logger.bind(client=clientid))
+    ctx.topics = []
 
     # build pipeline callback
     async def on_transcript(transcript: Transcript):
         ctx.logger.info("Transcript", transcript=transcript)
-        cmd = TranscriptionOutput(transcript.text)
-        # FIXME: send the result to the client async way
-        ctx.data_channel.send(dumps(cmd.get_result()))
-
-    async def on_summary(summary: TitleSummary):
-        ctx.logger.info("Summary", summary=summary)
-        # XXX doesn't work as expected: IncrementalResult is not serializable,
-        # and the previous implementation assumed oobabooga output
-        # result = TitleSummaryOutput(
-        #     [
-        #         IncrementalResult(
-        #             title=summary.title,
-        #             desc=summary.summary,
-        #             transcript=summary.transcript.text,
-        #             timestamp=summary.timestamp,
-        #         )
-        #     ]
-        # )
         result = {
-            "cmd": "UPDATE_TOPICS",
-            "topics": [
-                {
-                    "title": summary.title,
-                    "timestamp": summary.timestamp,
-                    "transcript": summary.transcript.text,
-                    "desc": summary.summary,
-                }
-            ],
+            "cmd": "SHOW_TRANSCRIPTION",
+            "text": transcript.text,
         }
+        ctx.data_channel.send(dumps(result))
 
+    async def on_topic(summary: TitleSummary):
+        # FIXME: make it incremental with the frontend, don't send everything
+        ctx.logger.info("Summary", summary=summary)
+        ctx.topics.append(
+            {
+                "title": summary.title,
+                "timestamp": summary.timestamp,
+                "transcript": summary.transcript.text,
+                "desc": summary.summary,
+            }
+        )
+        result = {"cmd": "UPDATE_TOPICS", "topics": ctx.topics}
+        ctx.data_channel.send(dumps(result))
+
+    async def on_final_summary(summary: FinalSummary):
+        ctx.logger.info("FinalSummary", final_summary=summary)
+        result = {
+            "cmd": "DISPLAY_FINAL_SUMMARY",
+            "summary": summary.summary,
+            "duration": summary.duration,
+        }
         ctx.data_channel.send(dumps(result))
 
     # create a context for the whole rtc transaction
@@ -105,15 +104,19 @@ async def rtc_offer(params: RtcOffer, request: Request):
         AudioMergeProcessor(),
         AudioTranscriptAutoProcessor.as_threaded(callback=on_transcript),
         TranscriptLinerProcessor(),
-        TranscriptTopicDetectorProcessor.as_threaded(callback=on_summary),
-        # FinalSummaryProcessor.as_threaded(
-        #     filename=result_fn, callback=on_final_summary
-        # ),
+        TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
+        TranscriptFinalSummaryProcessor.as_threaded(callback=on_final_summary),
     )
 
     # handle RTC peer connection
     pc = RTCPeerConnection()
 
+    async def flush_pipeline_and_quit():
+        ctx.logger.info("Flushing pipeline")
+        await ctx.pipeline.flush()
+        ctx.logger.debug("Closing peer connection")
+        await pc.close()
+
     @pc.on("datachannel")
     def on_datachannel(channel):
         ctx.data_channel = channel
@@ -124,8 +127,8 @@ async def rtc_offer(params: RtcOffer, request: Request):
         def on_message(message: str):
             ctx.logger.info(f"Message: {message}")
             if loads(message)["cmd"] == "STOP":
-                # FIXME: flush the pipeline
-                pass
+                ctx.logger.debug("STOP command received")
+                asyncio.get_event_loop().create_task(flush_pipeline_and_quit())
             if isinstance(message, str) and message.startswith("ping"):
                 channel.send("pong" + message[4:])
@@ -148,3 +151,12 @@ async def rtc_offer(params: RtcOffer, request: Request):
 
     sessions.append(pc)
     return RtcOffer(sdp=pc.localDescription.sdp, type=pc.localDescription.type)
+
+
+@subscribers_shutdown.append
+async def rtc_clean_sessions():
+    logger.info("Closing all RTC sessions")
+    for pc in sessions:
+        logger.debug(f"Closing session {pc}")
+        await pc.close()
+    sessions.clear()
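On `STOP`, the pipeline is flushed rather than dropped, so buffering processors still get a chance to emit. A sketch of what that flush triggers in the final summary stage, assuming the `callback=` registration used throughout this series:

```python
import asyncio

from reflector.processors import (
    FinalSummary,
    TitleSummary,
    Transcript,
    TranscriptFinalSummaryProcessor,
)


async def main():
    async def on_summary(summary: FinalSummary):
        print(summary.summary)   # joined topic summaries
        print(summary.duration)  # 16.4 = 0.5 + 15.9

    processor = TranscriptFinalSummaryProcessor(callback=on_summary)
    await processor.push(
        TitleSummary(
            title="Intro",
            summary="The speaker introduces Reflector.",
            timestamp=0.5,
            duration=15.9,
            transcript=Transcript(text="..."),
        )
    )
    # _push only buffers the topic; the FinalSummary is emitted on flush
    await processor.flush()


asyncio.run(main())
```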
From cb198927b0fc03105c73d6399e7b7584d8028363 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Tue, 1 Aug 2023 20:13:16 +0200
Subject: [PATCH 11/12] server: add default uvicorn server + update readme

---
 server/README.md        | 10 ++++++++--
 server/reflector/app.py |  4 ++++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/server/README.md b/server/README.md
index c3004bcb..a70fb7e7 100644
--- a/server/README.md
+++ b/server/README.md
@@ -33,15 +33,21 @@ Then run the server:
 
 ```
 # With a config.ini
-$ poetry run python -m reflector.server
+$ poetry run python -m reflector.app
 
 # Within a poetry env
 $ poetry shell
-$ LLM_URL=http://.../api/v1/generate python -m reflector.server
+$ LLM_URL=http://.../api/v1/generate python -m reflector.app
 ```
 
+### Using local GPT4All
+
+- Start GPT4All with any model you want
+- Ensure the API server is activated in GPT4All
+- Run with: `LLM_BACKEND=openai LLM_URL=http://localhost:4891/v1/completions LLM_OPENAI_MODEL="GPT4All Falcon" python -m reflector.app`
+
 # Old documentation
 
 This is the code base for the Reflector demo (formerly called agenda-talk-diff) for the leads : Troy Web Consulting

diff --git a/server/reflector/app.py b/server/reflector/app.py
index f40af489..4a10b685 100644
--- a/server/reflector/app.py
+++ b/server/reflector/app.py
@@ -26,3 +26,7 @@ app.add_middleware(
 
 # register views
 app.include_router(rtc_offer_router)
+
+if __name__ == "__main__":
+    import uvicorn
+    uvicorn.run("reflector.app:app", host="0.0.0.0", port=1250, reload=True)
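With the `__main__` block above, the server can be started directly; expect uvicorn to come up on port 1250 (output abbreviated and illustrative):

```
$ poetry run python -m reflector.app
INFO:     Uvicorn running on http://0.0.0.0:1250 (Press CTRL+C to quit)
```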
From e4f2b785cae8493270502b14f5d6f48a7fd47b56 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Tue, 1 Aug 2023 20:16:54 +0200
Subject: [PATCH 12/12] server: update process tools and tests

---
 server/README.md                         | 7 +++++++
 server/reflector/app.py                  | 1 +
 server/reflector/tools/process.py        | 6 ++----
 server/reflector/views/rtc_offer.py      | 5 +----
 server/tests/test_processors_pipeline.py | 6 +++---
 5 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/server/README.md b/server/README.md
index a70fb7e7..b0ca34b7 100644
--- a/server/README.md
+++ b/server/README.md
@@ -48,6 +48,13 @@ $ LLM_URL=http://.../api/v1/generate python -m reflector.app
 - Ensure the API server is activated in GPT4All
 - Run with: `LLM_BACKEND=openai LLM_URL=http://localhost:4891/v1/completions LLM_OPENAI_MODEL="GPT4All Falcon" python -m reflector.app`
 
+### Using local files
+
+```
+poetry run python -m reflector.tools.process path/to/audio.wav
+```
+
 # Old documentation
 
 This is the code base for the Reflector demo (formerly called agenda-talk-diff) for the leads : Troy Web Consulting

diff --git a/server/reflector/app.py b/server/reflector/app.py
index 4a10b685..b07ef54e 100644
--- a/server/reflector/app.py
+++ b/server/reflector/app.py
@@ -29,4 +29,5 @@ app.include_router(rtc_offer_router)
 
 if __name__ == "__main__":
     import uvicorn
+
     uvicorn.run("reflector.app:app", host="0.0.0.0", port=1250, reload=True)

diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py
index 0c8611d8..071907ea 100644
--- a/server/reflector/tools/process.py
+++ b/server/reflector/tools/process.py
@@ -7,7 +7,7 @@ from reflector.processors import (
     AudioTranscriptAutoProcessor,
     TranscriptLinerProcessor,
     TranscriptTopicDetectorProcessor,
-    # TranscriptSummarizerProcessor,
+    TranscriptFinalSummaryProcessor,
 )
 import asyncio
@@ -29,9 +29,7 @@ async def process_audio_file(filename, event_callback):
         AudioTranscriptAutoProcessor.as_threaded(),
         TranscriptLinerProcessor(callback=on_transcript),
         TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
-        # TranscriptSummarizerProcessor.as_threaded(
-        #     callback=on_summary
-        # ),
+        TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary),
     )
 
     pipeline.describe()

diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py
index c4eaddd8..11c98009 100644
--- a/server/reflector/views/rtc_offer.py
+++ b/server/reflector/views/rtc_offer.py
@@ -2,10 +2,7 @@ import asyncio
 from fastapi import Request, APIRouter
 from reflector.events import subscribers_shutdown
 from pydantic import BaseModel
-from reflector.models import (
-    TranscriptionContext,
-    TranscriptionOutput,
-)
+from reflector.models import TranscriptionContext
 from reflector.logger import logger
 from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack

diff --git a/server/tests/test_processors_pipeline.py b/server/tests/test_processors_pipeline.py
index a807b6fd..ab836550 100644
--- a/server/tests/test_processors_pipeline.py
+++ b/server/tests/test_processors_pipeline.py
@@ -26,7 +26,7 @@ async def test_basic_process(event_loop):
     marks = {
         "transcript": 0,
         "topic": 0,
-        # "summary": 0,
+        "summary": 0,
     }
 
     async def event_callback(event, data):
@@ -40,5 +40,5 @@ async def test_basic_process(event_loop):
 
     # validate the events
     assert marks["transcript"] == 5
-    assert marks["topic"] == 4
-    # assert marks["summary"] == 1
+    assert marks["topic"] == 2
+    assert marks["summary"] == 1
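End to end, the series can be exercised from `server/` with the file-processing tool and the updated test (paths taken from the patches above):

```
$ poetry run python -m reflector.tools.process tests/records/test_mathieu_hello.wav
$ poetry run pytest tests/test_processors_pipeline.py
```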