processors: customize logger and auto describe

This commit is contained in:
2023-07-29 01:32:37 +02:00
parent 6f61863136
commit 3908c1ca53

View File

@@ -9,6 +9,7 @@ from reflector.logger import logger
import httpx
import asyncio
import json
from uuid import uuid4
from concurrent.futures import ThreadPoolExecutor
@@ -73,11 +74,17 @@ class Processor:
INPUT_TYPE: type = None
OUTPUT_TYPE: type = None
def __init__(self, callback=None):
def __init__(self, callback=None, custom_logger=None):
self._processors = []
self._callbacks = []
if callback:
self.on(callback)
self.uid = uuid4().hex
self.logger = (custom_logger or logger).bind(
processor=self.__class__.__name__)
def set_pipeline(self, pipeline: "Pipeline"):
self.logger = self.logger.bind(pipeline=pipeline.uid)
def connect(self, processor: "Processor"):
"""
@@ -129,6 +136,9 @@ class Processor:
# logger.debug(f"{self.__class__.__name__} flush")
return await self._flush()
def describe(self, level=0):
logger.info(" " * level + self.__class__.__name__)
async def _push(self, data):
raise NotImplementedError
@@ -189,6 +199,10 @@ class ThreadedProcessor(Processor):
def on(self, callback):
self.processor.on(callback)
def describe(self, level=0):
super().describe(level)
self.processor.describe(level + 1)
class AudioChunkerProcessor(Processor):
"""
@@ -312,6 +326,35 @@ class AudioWhisperTranscriptProcessor(AudioTranscriptProcessor):
return transcript
class AudioAutoTranscriptProcessor(AudioTranscriptProcessor):
BACKENDS = {
"whisper": AudioWhisperTranscriptProcessor,
}
BACKEND_DEFAULT = "whisper"
def __init__(self, backend=None, **kwargs):
super().__init__(**kwargs)
self.processor = self.BACKENDS[backend or self.BACKEND_DEFAULT]()
def connect(self, processor: Processor):
self.processor.connect(processor)
def disconnect(self, processor: Processor):
self.processor.disconnect(processor)
def on(self, callback):
self.processor.on(callback)
def off(self, callback):
self.processor.off(callback)
async def _push(self, data: AudioFile):
return await self.processor._push(data)
async def _flush(self):
return await self.processor._flush()
class TranscriptLineProcessor(Processor):
"""
Based on stream of transcript, assemble lines, remove duplicated words
@@ -379,10 +422,10 @@ class TitleSummaryProcessor(Processor):
await self.emit(summary)
except httpx.ConnectError as e:
logger.error(f"Failed to call llm: {e}")
self.logger.error(f"Failed to call llm: {e}")
except Exception:
logger.exception("Failed to call llm")
self.logger.exception("Failed to call llm")
class Pipeline(Processor):
@@ -393,10 +436,13 @@ class Pipeline(Processor):
INPUT_TYPE = None
OUTPUT_TYPE = None
def __init__(self, *processors):
def __init__(self, *processors: Processor):
super().__init__()
self.processors = processors
for processor in processors:
processor.set_pipeline(self)
for i in range(len(processors) - 1):
processors[i].connect(processors[i + 1])
@@ -410,6 +456,13 @@ class Pipeline(Processor):
for processor in self.processors:
await processor.flush()
def describe(self, level=0):
logger.info(" " * level + "Pipeline:")
for processor in self.processors:
processor.describe(level + 1)
logger.info("")
class FinalSummaryProcessor(Processor):
"""
@@ -422,13 +475,18 @@ class FinalSummaryProcessor(Processor):
def __init__(self, filename: Path, **kwargs):
super().__init__(**kwargs)
self.filename = filename
self.chunkcount = 0
async def _push(self, data: TitleSummary):
with open(self.filename, "a", encoding="utf8") as fd:
fd.write(json.dumps(data))
self.chunkcount += 1
async def _flush(self):
logger.info(f"Writing to {self.filename}")
if self.chunkcount == 0:
self.logger.warning("No summary to write")
return
self.logger.info(f"Writing to {self.filename}")
await self.emit(self.filename)
@@ -455,16 +513,17 @@ if __name__ == "__main__":
pipeline = Pipeline(
AudioChunkerProcessor(),
AudioMergeProcessor(),
AudioWhisperTranscriptProcessor().as_threaded(),
AudioAutoTranscriptProcessor.as_threaded(),
TranscriptLineProcessor(callback=on_transcript),
TitleSummaryProcessor.as_threaded(callback=on_summary),
FinalSummaryProcessor.as_threaded(
filename=result_fn, callback=on_final_summary
),
)
pipeline.describe()
# start processing audio
logger.info(f"Opening{args.source}")
logger.info(f"Opening {args.source}")
container = av.open(args.source)
try:
logger.info("Start pushing audio into the pipeline")