diff --git a/server/reflector/processors.py b/server/reflector/processors.py index 8d26fcb7..02f003bd 100644 --- a/server/reflector/processors.py +++ b/server/reflector/processors.py @@ -9,6 +9,7 @@ from reflector.logger import logger import httpx import asyncio import json +from uuid import uuid4 from concurrent.futures import ThreadPoolExecutor @@ -73,11 +74,17 @@ class Processor: INPUT_TYPE: type = None OUTPUT_TYPE: type = None - def __init__(self, callback=None): + def __init__(self, callback=None, custom_logger=None): self._processors = [] self._callbacks = [] if callback: self.on(callback) + self.uid = uuid4().hex + self.logger = (custom_logger or logger).bind( + processor=self.__class__.__name__) + + def set_pipeline(self, pipeline: "Pipeline"): + self.logger = self.logger.bind(pipeline=pipeline.uid) def connect(self, processor: "Processor"): """ @@ -129,6 +136,9 @@ class Processor: # logger.debug(f"{self.__class__.__name__} flush") return await self._flush() + def describe(self, level=0): + logger.info(" " * level + self.__class__.__name__) + async def _push(self, data): raise NotImplementedError @@ -189,6 +199,10 @@ class ThreadedProcessor(Processor): def on(self, callback): self.processor.on(callback) + def describe(self, level=0): + super().describe(level) + self.processor.describe(level + 1) + class AudioChunkerProcessor(Processor): """ @@ -312,6 +326,35 @@ class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor): return transcript +class AudioAutoTranscriptProcessor(AudioTranscriptProcessor): + BACKENDS = { + "whisper": AudioWhisperTranscriptProcessor, + } + BACKEND_DEFAULT = "whisper" + + def __init__(self, backend=None, **kwargs): + super().__init__(**kwargs) + self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]() + + def connect(self, processor: Processor): + self.processor.connect(processor) + + def disconnect(self, processor: Processor): + self.processor.disconnect(processor) + + def on(self, callback): + self.processor.on(callback) + + def off(self, callback): + self.processor.off(callback) + + async def _push(self, data: AudioFile): + return await self.processor._push(data) + + async def _flush(self): + return await self.processor._flush() + + class TranscriptLineProcessor(Processor): """ Based on stream of transcript, assemble lines, remove duplicated words @@ -379,10 +422,10 @@ class TitleSummaryProcessor(Processor): await self.emit(summary) except httpx.ConnectError as e: - logger.error(f"Failed to call llm: {e}") + self.logger.error(f"Failed to call llm: {e}") except Exception: - logger.exception("Failed to call llm") + self.logger.exception("Failed to call llm") class Pipeline(Processor): @@ -393,10 +436,13 @@ class Pipeline(Processor): INPUT_TYPE = None OUTPUT_TYPE = None - def __init__(self, *processors): + def __init__(self, *processors: Processor): super().__init__() self.processors = processors + for processor in processors: + processor.set_pipeline(self) + for i in range(len(processors) - 1): processors[i].connect(processors[i + 1]) @@ -410,6 +456,13 @@ class Pipeline(Processor): for processor in self.processors: await processor.flush() + def describe(self, level=0): + logger.info(" " * level + "Pipeline:") + for processor in self.processors: + processor.describe(level + 1) + logger.info("") + + class FinalSummaryProcessor(Processor): """ @@ -422,13 +475,18 @@ class FinalSummaryProcessor(Processor): def __init__(self, filename: Path, **kwargs): super().__init__(**kwargs) self.filename = filename + self.chunkcount = 0 async def _push(self, data: TitleSummary): with open(self.filename, "a", encoding="utf8") as fd: fd.write(json.dumps(data)) + self.chunkcount += 1 async def _flush(self): - logger.info(f"Writing to {self.filename}") + if self.chunkcount == 0: + self.logger.warning("No summary to write") + return + self.logger.info(f"Writing to {self.filename}") await self.emit(self.filename) @@ -455,16 +513,17 @@ if __name__ == "__main__": pipeline = Pipeline( AudioChunkerProcessor(), AudioMergeProcessor(), - AudioWhisperTranscriptProcessor().as_threaded(), + AudioAutoTranscriptProcessor.as_threaded(), TranscriptLineProcessor(callback=on_transcript), TitleSummaryProcessor.as_threaded(callback=on_summary), FinalSummaryProcessor.as_threaded( filename=result_fn, callback=on_final_summary ), ) + pipeline.describe() # start processing audio - logger.info(f"Opening{args.source}") + logger.info(f"Opening {args.source}") container = av.open(args.source) try: logger.info("Start pushing audio into the pipeline")