diff --git a/server/reflector/llm/base.py b/server/reflector/llm/base.py
index cc5b7245..7f65a4bc 100644
--- a/server/reflector/llm/base.py
+++ b/server/reflector/llm/base.py
@@ -1,6 +1,6 @@
-from reflector.logger import logger
 from reflector.settings import settings
 from reflector.utils.retry import retry
+from reflector.logger import logger as reflector_logger
 import importlib
 import json
 import re
@@ -29,15 +29,18 @@ class LLM:
         importlib.import_module(module_name)
         return cls._registry[name]()
 
-    async def generate(self, prompt: str, **kwargs) -> dict:
+    async def generate(self, prompt: str, logger=reflector_logger, **kwargs) -> dict:
+        logger.info("LLM generate", prompt=repr(prompt))
         try:
             result = await retry(self._generate)(prompt=prompt, **kwargs)
         except Exception:
             logger.exception("Failed to call llm after retrying")
             raise
+        logger.debug("LLM result [raw]", result=repr(result))
         if isinstance(result, str):
             result = self._parse_json(result)
+        logger.debug("LLM result [parsed]", result=repr(result))
         return result
diff --git a/server/reflector/llm/llm_openai.py b/server/reflector/llm/llm_openai.py
index 517902e9..03189afc 100644
--- a/server/reflector/llm/llm_openai.py
+++ b/server/reflector/llm/llm_openai.py
@@ -21,7 +21,6 @@ class OpenAILLM(LLM):
             "Authorization": f"Bearer {self.openai_key}",
         }
 
-        logger.debug(f"LLM openai prompt: {prompt}")
 
         async with httpx.AsyncClient(timeout=self.timeout) as client:
             response = await client.post(
@@ -36,7 +35,6 @@
             )
             response.raise_for_status()
             result = response.json()
-            logger.info(f"LLM openai result: {result}")
 
             return result["choices"][0]["text"]
diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py
index 8e616364..7d11590d 100644
--- a/server/reflector/processors/base.py
+++ b/server/reflector/processors/base.py
@@ -17,7 +17,8 @@ class Processor:
         self.logger = (custom_logger or logger).bind(processor=self.__class__.__name__)
 
     def set_pipeline(self, pipeline: "Pipeline"):
-        self.logger = self.logger.bind(pipeline=pipeline.uid)
+        # when attached to a pipeline, use the pipeline's logger instead
+        self.logger = pipeline.logger.bind(processor=self.__class__.__name__)
 
     def connect(self, processor: "Processor"):
         """
@@ -111,6 +112,10 @@ class ThreadedProcessor(Processor):
         self.queue = asyncio.Queue()
         self.task = asyncio.get_running_loop().create_task(self.loop())
 
+    def set_pipeline(self, pipeline: "Pipeline"):
+        super().set_pipeline(pipeline)
+        self.processor.set_pipeline(pipeline)
+
     async def loop(self):
         while True:
             data = await self.queue.get()
@@ -153,6 +158,9 @@ class Pipeline(Processor):
 
     def __init__(self, *processors: Processor):
         super().__init__()
+        self.logger = logger.bind(pipeline=self.uid)
+        self.logger.info("Pipeline created")
+
         self.processors = processors
 
         for processor in processors:
@@ -168,8 +176,10 @@
         await self.processors[0].push(data)
 
     async def _flush(self):
+        self.logger.debug("Pipeline flushing")
         for processor in self.processors:
             await processor.flush()
+        self.logger.info("Pipeline flushed")
 
     def describe(self, level=0):
         logger.info(" " * level + "Pipeline:")
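
Side note on the set_pipeline / logger rebinding above, since it is easy to misread: the sketch below is a minimal standalone model of the pattern, not reflector code. BoundLogger stands in for reflector's structlog-style logger (assumption: bind() returns a new logger carrying merged key/value context), and the Pipeline/Processor stubs only mirror the shapes in processors/base.py.

import uuid


class BoundLogger:
    """Stand-in for a structlog-style logger: bind() returns a new
    logger whose context is the merge of the old context and kwargs."""

    def __init__(self, context=None):
        self.context = context or {}

    def bind(self, **kwargs):
        return BoundLogger({**self.context, **kwargs})

    def info(self, msg, **kwargs):
        print(msg, {**self.context, **kwargs})


logger = BoundLogger()


class Processor:
    def __init__(self):
        self.logger = logger.bind(processor=self.__class__.__name__)

    def set_pipeline(self, pipeline):
        # same move as the diff: rebind onto the pipeline's logger so
        # every line carries both the pipeline uid and the processor name
        self.logger = pipeline.logger.bind(processor=self.__class__.__name__)


class Pipeline(Processor):
    def __init__(self, *processors):
        super().__init__()
        self.uid = uuid.uuid4().hex[:8]
        self.logger = logger.bind(pipeline=self.uid)
        self.processors = processors
        for processor in processors:
            processor.set_pipeline(self)


class ExampleProcessor(Processor):
    pass


pipeline = Pipeline(ExampleProcessor())
pipeline.processors[0].logger.info("hello")
# prints: hello {'pipeline': '<uid>', 'processor': 'ExampleProcessor'}

Rebinding onto pipeline.logger (instead of the previous self.logger.bind(pipeline=pipeline.uid)) makes the pipeline's logger the single source of the shared context, which is also why ThreadedProcessor now has to forward set_pipeline to its wrapped processor.
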
diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py
index f185831a..6bcc2497 100644
--- a/server/reflector/processors/transcript_topic_detector.py
+++ b/server/reflector/processors/transcript_topic_detector.py
@@ -1,5 +1,6 @@
 from reflector.processors.base import Processor
 from reflector.processors.types import Transcript, TitleSummary
+from reflector.utils.retry import retry
 from reflector.llm import LLM
 
 
@@ -42,8 +43,10 @@ class TranscriptTopicDetectorProcessor(Processor):
     async def _flush(self):
         if not self.transcript:
             return
-        prompt = self.PROMPT.format(input_text=self.transcript.text)
-        result = await self.llm.generate(prompt=prompt)
+        text = self.transcript.text
+        self.logger.info(f"Detecting topic on transcript of length {len(text)}")
+        prompt = self.PROMPT.format(input_text=text)
+        result = await retry(self.llm.generate)(prompt=prompt, logger=self.logger)
         summary = TitleSummary(
             title=result["title"],
             summary=result["summary"],
diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py
index d762c708..353c8aa4 100644
--- a/server/reflector/processors/types.py
+++ b/server/reflector/processors/types.py
@@ -67,6 +67,13 @@ class TitleSummary:
     duration: float
     transcript: Transcript
 
+    @property
+    def human_timestamp(self):
+        minutes = int(self.timestamp / 60)
+        seconds = int(self.timestamp % 60)
+        milliseconds = int((self.timestamp % 1) * 1000)
+        return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"
+
 
 @dataclass
 class FinalSummary:
diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py
index 85febaff..68d49b0c 100644
--- a/server/reflector/tools/process.py
+++ b/server/reflector/tools/process.py
@@ -65,9 +65,11 @@ if __name__ == "__main__":
         if event == "transcript":
             print(f"Transcript[{data.human_timestamp}]: {data.text}")
         elif event == "topic":
-            print(f"Topic: {data}")
+            print(f"Topic[{data.human_timestamp}]: title={data.title}")
+            print(f"Topic[{data.human_timestamp}]: summary={data.summary}")
         elif event == "summary":
-            print(f"Summary: {data}")
+            print(f"Summary: duration={data.duration}")
+            print(f"Summary: summary={data.summary}")
 
     asyncio.run(
         process_audio_file(
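
For reviewers who want to eyeball the human_timestamp math added to TitleSummary: a standalone re-statement of the property body as a free function, with exactly-representable float inputs so the assertions are not at the mercy of rounding.

# Re-statement of TitleSummary.human_timestamp as a free function,
# to sanity-check the MM:SS.mmm formatting.

def human_timestamp(timestamp: float) -> str:
    minutes = int(timestamp / 60)
    seconds = int(timestamp % 60)
    milliseconds = int((timestamp % 1) * 1000)
    return f"{minutes:02d}:{seconds:02d}.{milliseconds:03d}"


assert human_timestamp(0.0) == "00:00.000"
assert human_timestamp(65.25) == "01:05.250"
assert human_timestamp(125.5) == "02:05.500"

Inputs of an hour or more simply keep counting minutes (3660.0 formats as 61:00.000), which seems acceptable for the recording lengths this tool handles.
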