diff --git a/server/poetry.lock b/server/poetry.lock
index 868ce4fc..24823ed7 100644
--- a/server/poetry.lock
+++ b/server/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.

 [[package]]
 name = "aioboto3"
@@ -2061,6 +2061,35 @@ files = [
 dev = ["pre-commit", "tox"]
 testing = ["pytest", "pytest-benchmark"]

+[[package]]
+name = "prometheus-client"
+version = "0.17.1"
+description = "Python client for the Prometheus monitoring system."
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "prometheus_client-0.17.1-py3-none-any.whl", hash = "sha256:e537f37160f6807b8202a6fc4764cdd19bac5480ddd3e0d463c3002b34462101"},
+    {file = "prometheus_client-0.17.1.tar.gz", hash = "sha256:21e674f39831ae3f8acde238afd9a27a37d0d2fb5a28ea094f0ce25d2cbf2091"},
+]
+
+[package.extras]
+twisted = ["twisted"]
+
+[[package]]
+name = "prometheus-fastapi-instrumentator"
+version = "6.1.0"
+description = "Instrument your FastAPI with Prometheus metrics."
+optional = false
+python-versions = ">=3.7.0,<4.0.0"
+files = [
+    {file = "prometheus_fastapi_instrumentator-6.1.0-py3-none-any.whl", hash = "sha256:2279ac1cf5b9566a4c3a07f78c9c5ee19648ed90976ab87d73d672abc1bfa017"},
+    {file = "prometheus_fastapi_instrumentator-6.1.0.tar.gz", hash = "sha256:1820d7a90389ce100f7d1285495ead388818ae0882e761c1f3e6e62a410bdf13"},
+]
+
+[package.dependencies]
+fastapi = ">=0.38.1,<1.0.0"
+prometheus-client = ">=0.8.0,<1.0.0"
+
 [[package]]
 name = "protobuf"
 version = "4.23.4"
@@ -2805,7 +2834,7 @@ files = [
 ]

 [package.dependencies]
-greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"}
+greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"}

 [package.extras]
 aiomysql = ["aiomysql", "greenlet (!=0.4.17)"]
@@ -3395,4 +3424,4 @@ multidict = ">=4.0"

 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "912184d26e5c8916fad9e969697ead9043a8688559063dfca4674ffcd5e2230d"
+content-hash = "1f6c44eb5498ef1bfe3cc661746baf997ddd4888c83921fab74eea3c5f04dc51"
diff --git a/server/pyproject.toml b/server/pyproject.toml
index c0538789..edced410 100644
--- a/server/pyproject.toml
+++ b/server/pyproject.toml
@@ -27,6 +27,7 @@ databases = {extras = ["aiosqlite", "asyncpg"], version = "^0.7.0"}
 sqlalchemy = "<1.5"
 fief-client = {extras = ["fastapi"], version = "^0.17.0"}
 alembic = "^1.11.3"
+prometheus-fastapi-instrumentator = "^6.1.0"


 [tool.poetry.group.dev.dependencies]
diff --git a/server/reflector/app.py b/server/reflector/app.py
index 14d55d7a..136199be 100644
--- a/server/reflector/app.py
+++ b/server/reflector/app.py
@@ -6,8 +6,10 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.routing import APIRoute
 from fastapi_pagination import add_pagination
+from prometheus_fastapi_instrumentator import Instrumentator
 from reflector.events import subscribers_shutdown, subscribers_startup
 from reflector.logger import logger
+from reflector.metrics import metrics_init
 from reflector.settings import settings
 from reflector.views.rtc_offer import router as rtc_offer_router
 from reflector.views.transcripts import router as transcripts_router
@@ -23,10 +25,10 @@ except ImportError:

 @asynccontextmanager
 async def lifespan(app: FastAPI):
     for func in subscribers_startup:
-        await func()
+        await func(app)
     yield
     for func in subscribers_shutdown:
-        await func()
+        await func(app)


 # use sentry if available
@@ -49,6 +51,12 @@ app.add_middleware(
     allow_headers=["*"],
 )

+# metrics
+instrumentator = Instrumentator(
+    excluded_handlers=["/docs", "/metrics"],
+)
+metrics_init(app, instrumentator)
+
 # register views
 app.include_router(rtc_offer_router)
 app.include_router(transcripts_router, prefix="/v1")
diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py
index 2ac68029..b445e907 100644
--- a/server/reflector/db/__init__.py
+++ b/server/reflector/db/__init__.py
@@ -31,13 +31,11 @@ engine = sqlalchemy.create_engine(
 metadata.create_all(engine)


-async def database_connect():
+@subscribers_startup.append
+async def database_connect(_):
     await database.connect()


-async def database_disconnect():
+@subscribers_shutdown.append
+async def database_disconnect(_):
     await database.disconnect()
-
-
-subscribers_startup.append(database_connect)
-subscribers_shutdown.append(database_disconnect)
diff --git a/server/reflector/llm/base.py b/server/reflector/llm/base.py
index d046ffe7..e729148e 100644
--- a/server/reflector/llm/base.py
+++ b/server/reflector/llm/base.py
@@ -3,6 +3,7 @@ import json
 import re
 from time import monotonic

+from prometheus_client import Counter, Histogram
 from reflector.logger import logger as reflector_logger
 from reflector.settings import settings
 from reflector.utils.retry import retry
@@ -10,6 +11,26 @@ from reflector.utils.retry import retry

 class LLM:
     _registry = {}
+    m_generate = Histogram(
+        "llm_generate",
+        "Time spent in LLM.generate",
+        ["backend"],
+    )
+    m_generate_call = Counter(
+        "llm_generate_call",
+        "Number of calls to LLM.generate",
+        ["backend"],
+    )
+    m_generate_success = Counter(
+        "llm_generate_success",
+        "Number of successful calls to LLM.generate",
+        ["backend"],
+    )
+    m_generate_failure = Counter(
+        "llm_generate_failure",
+        "Number of failed calls to LLM.generate",
+        ["backend"],
+    )

     @classmethod
     def register(cls, name, klass):
@@ -31,6 +52,13 @@
         importlib.import_module(module_name)
         return cls._registry[name]()

+    def __init__(self):
+        name = self.__class__.__name__
+        self.m_generate = self.m_generate.labels(name)
+        self.m_generate_call = self.m_generate_call.labels(name)
+        self.m_generate_success = self.m_generate_success.labels(name)
+        self.m_generate_failure = self.m_generate_failure.labels(name)
+
     async def warmup(self, logger: reflector_logger):
         start = monotonic()
         name = self.__class__.__name__
@@ -53,10 +81,16 @@
         **kwargs,
     ) -> dict:
         logger.info("LLM generate", prompt=repr(prompt))
+        self.m_generate_call.inc()
         try:
-            result = await retry(self._generate)(prompt=prompt, schema=schema, **kwargs)
+            with self.m_generate.time():
+                result = await retry(self._generate)(
+                    prompt=prompt, schema=schema, **kwargs
+                )
+            self.m_generate_success.inc()
         except Exception:
             logger.exception("Failed to call llm after retrying")
+            self.m_generate_failure.inc()
             raise

         logger.debug("LLM result [raw]", result=repr(result))
diff --git a/server/reflector/metrics.py b/server/reflector/metrics.py
new file mode 100644
index 00000000..0f883952
--- /dev/null
+++ b/server/reflector/metrics.py
@@ -0,0 +1,3 @@
+def metrics_init(app, instrumentator):
+    instrumentator.instrument(app)
+    instrumentator.expose(app)
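Note on the wiring above: `metrics_init` both instruments the app and mounts the `/metrics` endpoint (`expose` adds the route; `/docs` and `/metrics` themselves are excluded from measurement), so `app.py` only constructs the `Instrumentator` and hands it over. A minimal smoke test, as a sketch only — the test name and the `TestClient` usage are illustrative, not part of this patch:

```python
# Hypothetical check that /metrics is exposed and serves Prometheus
# text format once metrics_init() has run.
from fastapi.testclient import TestClient

from reflector.app import app


def test_metrics_endpoint():
    # Entering the context manager runs the lifespan (startup subscribers).
    with TestClient(app) as client:
        response = client.get("/metrics")
        assert response.status_code == 200
        # The Prometheus exposition format always contains HELP/TYPE comments.
        assert "# HELP" in response.text
```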
diff --git a/server/reflector/processors/audio_transcript.py b/server/reflector/processors/audio_transcript.py
index 708b959e..3f9dc85b 100644
--- a/server/reflector/processors/audio_transcript.py
+++ b/server/reflector/processors/audio_transcript.py
@@ -1,3 +1,4 @@
+from prometheus_client import Counter, Histogram
 from reflector.processors.base import Processor
 from reflector.processors.types import AudioFile, Transcript

@@ -10,11 +11,46 @@ class AudioTranscriptProcessor(Processor):
     INPUT_TYPE = AudioFile
     OUTPUT_TYPE = Transcript

+    m_transcript = Histogram(
+        "audio_transcript",
+        "Time spent in AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_call = Counter(
+        "audio_transcript_call",
+        "Number of calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_success = Counter(
+        "audio_transcript_success",
+        "Number of successful calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_failure = Counter(
+        "audio_transcript_failure",
+        "Number of failed calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+
+    def __init__(self, *args, **kwargs):
+        name = self.__class__.__name__
+        self.m_transcript = self.m_transcript.labels(name)
+        self.m_transcript_call = self.m_transcript_call.labels(name)
+        self.m_transcript_success = self.m_transcript_success.labels(name)
+        self.m_transcript_failure = self.m_transcript_failure.labels(name)
+        super().__init__(*args, **kwargs)
+
     async def _push(self, data: AudioFile):
         try:
-            result = await self._transcript(data)
+            self.m_transcript_call.inc()
+            with self.m_transcript.time():
+                result = await self._transcript(data)
+            self.m_transcript_success.inc()
             if result:
                 await self.emit(result)
+        except Exception:
+            self.m_transcript_failure.inc()
+            raise
         finally:
             data.release()

diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py
index 0f93b14b..4c9757a0 100644
--- a/server/reflector/processors/base.py
+++ b/server/reflector/processors/base.py
@@ -3,6 +3,7 @@ from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Union
 from uuid import uuid4

+from prometheus_client import Counter, Gauge, Histogram
 from pydantic import BaseModel
 from reflector.logger import logger

@@ -18,8 +19,57 @@ class Processor:
     OUTPUT_TYPE: type = None
     WARMUP_EVENT: str = "WARMUP_EVENT"

+    m_processor = Histogram(
+        "processor",
+        "Time spent in Processor.process",
+        ["processor"],
+    )
+    m_processor_call = Counter(
+        "processor_call",
+        "Number of calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_success = Counter(
+        "processor_success",
+        "Number of successful calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_failure = Counter(
+        "processor_failure",
+        "Number of failed calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_flush = Histogram(
+        "processor_flush",
+        "Time spent in Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_call = Counter(
+        "processor_flush_call",
+        "Number of calls to Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_success = Counter(
+        "processor_flush_success",
+        "Number of successful calls to Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_failure = Counter(
+        "processor_flush_failure",
+        "Number of failed calls to Processor.flush",
+        ["processor"],
+    )
+
     def __init__(self, callback=None, custom_logger=None):
-        self.name = self.__class__.__name__
+        self.name = name = self.__class__.__name__
+        self.m_processor = self.m_processor.labels(name)
+        self.m_processor_call = self.m_processor_call.labels(name)
+        self.m_processor_success = self.m_processor_success.labels(name)
+        self.m_processor_failure = self.m_processor_failure.labels(name)
+        self.m_processor_flush = self.m_processor_flush.labels(name)
+        self.m_processor_flush_call = self.m_processor_flush_call.labels(name)
+        self.m_processor_flush_success = self.m_processor_flush_success.labels(name)
+        self.m_processor_flush_failure = self.m_processor_flush_failure.labels(name)
         self._processors = []
         self._callbacks = []
         if callback:
@@ -89,11 +139,15 @@
         Push data to this processor. `data` must be of type `INPUT_TYPE`
         The function returns the output of type `OUTPUT_TYPE`
         """
-        # logger.debug(f"{self.__class__.__name__} push")
+        self.m_processor_call.inc()
         try:
             self.flushed = False
-            return await self._push(data)
+            with self.m_processor.time():
+                ret = await self._push(data)
+            self.m_processor_success.inc()
+            return ret
         except Exception:
+            self.m_processor_failure.inc()
             self.logger.exception("Error in push")

     async def flush(self):
@@ -103,9 +157,16 @@
         """
         if self.flushed:
             return
-        # logger.debug(f"{self.__class__.__name__} flush")
+        self.m_processor_flush_call.inc()
         self.flushed = True
-        return await self._flush()
+        try:
+            with self.m_processor_flush.time():
+                ret = await self._flush()
+            self.m_processor_flush_success.inc()
+            return ret
+        except Exception:
+            self.m_processor_flush_failure.inc()
+            raise

     def describe(self, level=0):
         logger.info(" " * level + self.__class__.__name__)
@@ -139,12 +200,27 @@ class ThreadedProcessor(Processor):
     A processor that runs in a separate thread
     """

+    m_processor_queue = Gauge(
+        "processor_queue",
+        "Number of items in the processor queue",
+        ["processor", "processor_uid"],
+    )
+    m_processor_queue_in_progress = Gauge(
+        "processor_queue_in_progress",
+        "Number of items in the processor queue in progress (global)",
+        ["processor"],
+    )
+
    def __init__(self, processor: Processor, max_workers=1):
        super().__init__()
        # FIXME: This is a hack to make sure that the processor is single threaded
        # but if it is more than 1, then we need to make sure that the processor
        # is emitting data in order
        assert max_workers == 1
+        self.m_processor_queue = self.m_processor_queue.labels(processor.name, self.uid)
+        self.m_processor_queue_in_progress = self.m_processor_queue_in_progress.labels(
+            processor.name
+        )
        self.processor = processor
        self.INPUT_TYPE = processor.INPUT_TYPE
        self.OUTPUT_TYPE = processor.OUTPUT_TYPE
@@ -159,22 +235,27 @@
     async def loop(self):
         while True:
             data = await self.queue.get()
-            try:
-                if data is None:
-                    await self.processor.flush()
-                    break
-                if data == self.WARMUP_EVENT:
-                    self.logger.debug(f"Warming up {self.processor.__class__.__name__}")
-                    await self.processor.warmup()
-                    continue
+            self.m_processor_queue.set(self.queue.qsize())
+            with self.m_processor_queue_in_progress.track_inprogress():
                 try:
-                    await self.processor.push(data)
-                except Exception:
-                    self.logger.error(
-                        f"Error in push {self.processor.__class__.__name__}, continue"
-                    )
-                finally:
-                    self.queue.task_done()
+                    if data is None:
+                        await self.processor.flush()
+                        break
+                    if data == self.WARMUP_EVENT:
+                        self.logger.debug(
+                            f"Warming up {self.processor.__class__.__name__}"
+                        )
+                        await self.processor.warmup()
+                        continue
+                    try:
+                        await self.processor.push(data)
+                    except Exception:
+                        self.logger.error(
+                            f"Error in push {self.processor.__class__.__name__}"
+                            ", continue"
+                        )
+                    finally:
+                        self.queue.task_done()

     async def _warmup(self):
         await self.queue.put(self.WARMUP_EVENT)
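The same instrumentation pattern appears in `LLM`, `AudioTranscriptProcessor`, and `Processor` above: metrics are declared once at class level (`prometheus_client` registers each metric name only once per process), and every instance then shadows the class attribute with a child bound to its own label value. A standalone sketch of the pattern — the `Worker` class and its metric names are made up for illustration:

```python
from prometheus_client import Counter, Histogram


class Worker:
    # Declared once for all instances; the "worker" label separates them.
    m_work = Histogram("worker_work", "Time spent in Worker.work", ["worker"])
    m_work_failure = Counter("worker_failure", "Failed Worker.work calls", ["worker"])

    def __init__(self):
        name = self.__class__.__name__
        # labels() returns a child metric bound to this label value; shadowing
        # the class attribute keeps call sites short.
        self.m_work = self.m_work.labels(name)
        self.m_work_failure = self.m_work_failure.labels(name)

    async def work(self):
        try:
            with self.m_work.time():  # observes elapsed seconds on exit
                ...
        except Exception:
            self.m_work_failure.inc()
            raise
```

`Gauge.track_inprogress()` used in `ThreadedProcessor.loop` is the same idea for in-flight counts: it increments on entry and decrements on exit of the `with` block.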
diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py
index 90f44434..5f0a2ab2 100644
--- a/server/reflector/views/rtc_offer.py
+++ b/server/reflector/views/rtc_offer.py
@@ -6,6 +6,7 @@ from pathlib import Path
 import av
 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
 from fastapi import APIRouter, Request
+from prometheus_client import Gauge
 from pydantic import BaseModel
 from reflector.events import subscribers_shutdown
 from reflector.logger import logger
@@ -25,6 +26,7 @@ from reflector.processors import (

 sessions = []
 router = APIRouter()
+m_rtc_sessions = Gauge("rtc_sessions", "Number of active RTC sessions")


 class TranscriptionContext(object):
@@ -188,12 +190,21 @@ async def rtc_offer_base(
     pc = RTCPeerConnection()

     async def flush_pipeline_and_quit(close=True):
+        # May be called twice:
+        # 1. the client asks to stop the meeting
+        #    - we flush and close
+        #    - when the close event arrives later, we do nothing
+        # 2. the client closes the connection
+        #    - nothing left to do, the connection is already closed
         await update_status("processing")
         await ctx.pipeline.flush()
         if close:
             ctx.logger.debug("Closing peer connection")
             await pc.close()
         await update_status("ended")
+        if pc in sessions:
+            sessions.remove(pc)
+            m_rtc_sessions.dec()

     @pc.on("datachannel")
     def on_datachannel(channel):
@@ -235,7 +246,7 @@


 @subscribers_shutdown.append
-async def rtc_clean_sessions():
+async def rtc_clean_sessions(_):
     logger.info("Closing all RTC sessions")
     for pc in sessions:
         logger.debug(f"Closing session {pc}")
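The `@subscribers_startup.append` / `@subscribers_shutdown.append` decorators used in `db/__init__.py` and `rtc_offer.py` rely on a plain `list.append` being a valid decorator. A minimal sketch of the idiom, reusing the names from the patch, with one caveat worth knowing:

```python
subscribers_startup = []


# append() is called with the function object and stores it in the list.
# Caveat: append() returns None, so the module-level name database_connect
# ends up bound to None; only the subscriber list keeps the function.
@subscribers_startup.append
async def database_connect(app):
    ...  # receives the FastAPI app, since lifespan() now calls func(app)


# Dispatch at startup, mirroring lifespan() in app.py:
#     for func in subscribers_startup:
#         await func(app)
```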