server: add prometheus instrumentation

2023-09-12 13:11:13 +02:00
parent 487eb1ae22
commit 60edca6366
9 changed files with 235 additions and 34 deletions
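
The instrumentation below follows one pattern throughout: metrics are declared once at class level with a label ("backend" or "processor"), each instance binds its own label value in __init__ via .labels(), and call sites pair Counter.inc() with a Histogram.time() context manager. A condensed sketch of that pattern, using only prometheus_client (class and metric names here are illustrative, not from the commit):

    from prometheus_client import Counter, Histogram

    class Backend:
        # declared once per process; labels() below returns a child
        # metric bound to the subclass's name
        m_calls = Counter("backend_call", "Calls to Backend.run", ["backend"])
        m_time = Histogram("backend", "Time spent in Backend.run", ["backend"])

        def __init__(self):
            name = self.__class__.__name__
            self.m_calls = self.m_calls.labels(name)
            self.m_time = self.m_time.labels(name)

        def run(self):
            self.m_calls.inc()
            with self.m_time.time():  # observes elapsed seconds on exit
                ...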

server/poetry.lock (generated)

@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand.
 
 [[package]]
 name = "aioboto3"
@@ -2061,6 +2061,35 @@ files = [
 dev = ["pre-commit", "tox"]
 testing = ["pytest", "pytest-benchmark"]
 
+[[package]]
+name = "prometheus-client"
+version = "0.17.1"
+description = "Python client for the Prometheus monitoring system."
+optional = false
+python-versions = ">=3.6"
+files = [
+    {file = "prometheus_client-0.17.1-py3-none-any.whl", hash = "sha256:e537f37160f6807b8202a6fc4764cdd19bac5480ddd3e0d463c3002b34462101"},
+    {file = "prometheus_client-0.17.1.tar.gz", hash = "sha256:21e674f39831ae3f8acde238afd9a27a37d0d2fb5a28ea094f0ce25d2cbf2091"},
+]
+
+[package.extras]
+twisted = ["twisted"]
+
+[[package]]
+name = "prometheus-fastapi-instrumentator"
+version = "6.1.0"
+description = "Instrument your FastAPI with Prometheus metrics."
+optional = false
+python-versions = ">=3.7.0,<4.0.0"
+files = [
+    {file = "prometheus_fastapi_instrumentator-6.1.0-py3-none-any.whl", hash = "sha256:2279ac1cf5b9566a4c3a07f78c9c5ee19648ed90976ab87d73d672abc1bfa017"},
+    {file = "prometheus_fastapi_instrumentator-6.1.0.tar.gz", hash = "sha256:1820d7a90389ce100f7d1285495ead388818ae0882e761c1f3e6e62a410bdf13"},
+]
+
+[package.dependencies]
+fastapi = ">=0.38.1,<1.0.0"
+prometheus-client = ">=0.8.0,<1.0.0"
+
 [[package]]
 name = "protobuf"
 version = "4.23.4"
@@ -2805,7 +2834,7 @@ files = [
 ]
 
 [package.dependencies]
-greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"}
+greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"}
 
 [package.extras]
 aiomysql = ["aiomysql", "greenlet (!=0.4.17)"]
@@ -3395,4 +3424,4 @@ multidict = ">=4.0"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.11"
-content-hash = "912184d26e5c8916fad9e969697ead9043a8688559063dfca4674ffcd5e2230d"
+content-hash = "1f6c44eb5498ef1bfe3cc661746baf997ddd4888c83921fab74eea3c5f04dc51"


@@ -27,6 +27,7 @@ databases = {extras = ["aiosqlite", "asyncpg"], version = "^0.7.0"}
 sqlalchemy = "<1.5"
 fief-client = {extras = ["fastapi"], version = "^0.17.0"}
 alembic = "^1.11.3"
+prometheus-fastapi-instrumentator = "^6.1.0"
 
 [tool.poetry.group.dev.dependencies]


@@ -6,8 +6,10 @@ from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.routing import APIRoute
 from fastapi_pagination import add_pagination
+from prometheus_fastapi_instrumentator import Instrumentator
 from reflector.events import subscribers_shutdown, subscribers_startup
 from reflector.logger import logger
+from reflector.metrics import metrics_init
 from reflector.settings import settings
 from reflector.views.rtc_offer import router as rtc_offer_router
 from reflector.views.transcripts import router as transcripts_router
@@ -23,10 +25,10 @@ except ImportError:
 @asynccontextmanager
 async def lifespan(app: FastAPI):
     for func in subscribers_startup:
-        await func()
+        await func(app)
     yield
     for func in subscribers_shutdown:
-        await func()
+        await func(app)
 
 # use sentry if available
@@ -49,6 +51,12 @@ app.add_middleware(
allow_headers=["*"], allow_headers=["*"],
) )
# metrics
instrumentator = Instrumentator(
excluded_handlers=["/docs", "/metrics"],
).instrument(app)
metrics_init(app, instrumentator)
# register views # register views
app.include_router(rtc_offer_router) app.include_router(rtc_offer_router)
app.include_router(transcripts_router, prefix="/v1") app.include_router(transcripts_router, prefix="/v1")


@@ -31,13 +31,11 @@ engine = sqlalchemy.create_engine(
 metadata.create_all(engine)
 
 
-async def database_connect():
+@subscribers_startup.append
+async def database_connect(_):
     await database.connect()
 
 
-async def database_disconnect():
+@subscribers_shutdown.append
+async def database_disconnect(_):
     await database.disconnect()
-
-
-subscribers_startup.append(database_connect)
-subscribers_shutdown.append(database_disconnect)
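
The @subscribers_startup.append decorator is a terse registration idiom with one caveat: list.append returns None, so the module-level name is rebound to None after registration, while the function object itself survives inside the list. A minimal sketch of the idiom, independent of this codebase:

    subscribers = []

    @subscribers.append
    async def on_startup(app):
        # runs at startup; registered purely by the decorator
        print("starting", app)

    # the decorator returned None, so the module-level name is gone...
    assert on_startup is None
    # ...but the callable is safely held by the list
    assert len(subscribers) == 1

This only works because the functions are never referenced by name again; they are invoked exclusively through the subscriber lists in lifespan().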


@@ -3,6 +3,7 @@ import json
 import re
 from time import monotonic
 
+from prometheus_client import Counter, Histogram
 from reflector.logger import logger as reflector_logger
 from reflector.settings import settings
 from reflector.utils.retry import retry
@@ -10,6 +11,26 @@ from reflector.utils.retry import retry
 class LLM:
     _registry = {}
 
+    m_generate = Histogram(
+        "llm_generate",
+        "Time spent in LLM.generate",
+        ["backend"],
+    )
+    m_generate_call = Counter(
+        "llm_generate_call",
+        "Number of calls to LLM.generate",
+        ["backend"],
+    )
+    m_generate_success = Counter(
+        "llm_generate_success",
+        "Number of successful calls to LLM.generate",
+        ["backend"],
+    )
+    m_generate_failure = Counter(
+        "llm_generate_failure",
+        "Number of failed calls to LLM.generate",
+        ["backend"],
+    )
+
     @classmethod
     def register(cls, name, klass):
@@ -31,6 +52,13 @@
         importlib.import_module(module_name)
         return cls._registry[name]()
 
+    def __init__(self):
+        name = self.__class__.__name__
+        self.m_generate = self.m_generate.labels(name)
+        self.m_generate_call = self.m_generate_call.labels(name)
+        self.m_generate_success = self.m_generate_success.labels(name)
+        self.m_generate_failure = self.m_generate_failure.labels(name)
+
     async def warmup(self, logger: reflector_logger):
         start = monotonic()
         name = self.__class__.__name__
@@ -53,10 +81,16 @@
         **kwargs,
     ) -> dict:
         logger.info("LLM generate", prompt=repr(prompt))
+        self.m_generate_call.inc()
         try:
-            result = await retry(self._generate)(prompt=prompt, schema=schema, **kwargs)
+            with self.m_generate.time():
+                result = await retry(self._generate)(
+                    prompt=prompt, schema=schema, **kwargs
+                )
+            self.m_generate_success.inc()
         except Exception:
             logger.exception("Failed to call llm after retrying")
+            self.m_generate_failure.inc()
             raise
 
         logger.debug("LLM result [raw]", result=repr(result))


@@ -0,0 +1,3 @@
+def metrics_init(app, instrumentator):
+    instrumentator.instrument(app)
+    instrumentator.expose(app)
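
expose(app) mounts the Prometheus scrape endpoint, by default at /metrics (a library default, not something configured in this commit). A minimal end-to-end check, sketched with FastAPI's TestClient:

    from fastapi import FastAPI
    from fastapi.testclient import TestClient
    from prometheus_fastapi_instrumentator import Instrumentator

    app = FastAPI()
    instrumentator = Instrumentator(excluded_handlers=["/docs", "/metrics"])
    instrumentator.instrument(app)  # adds the request-timing middleware
    instrumentator.expose(app)      # mounts GET /metrics

    with TestClient(app) as client:
        resp = client.get("/metrics")
        assert resp.status_code == 200
        # Prometheus text exposition format
        assert "# HELP" in resp.text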


@@ -1,3 +1,4 @@
+from prometheus_client import Counter, Histogram
 from reflector.processors.base import Processor
 from reflector.processors.types import AudioFile, Transcript
@@ -10,11 +11,46 @@ class AudioTranscriptProcessor(Processor):
     INPUT_TYPE = AudioFile
     OUTPUT_TYPE = Transcript
 
+    m_transcript = Histogram(
+        "audio_transcript",
+        "Time spent in AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_call = Counter(
+        "audio_transcript_call",
+        "Number of calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_success = Counter(
+        "audio_transcript_success",
+        "Number of successful calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+    m_transcript_failure = Counter(
+        "audio_transcript_failure",
+        "Number of failed calls to AudioTranscript.transcript",
+        ["backend"],
+    )
+
+    def __init__(self, *args, **kwargs):
+        name = self.__class__.__name__
+        self.m_transcript = self.m_transcript.labels(name)
+        self.m_transcript_call = self.m_transcript_call.labels(name)
+        self.m_transcript_success = self.m_transcript_success.labels(name)
+        self.m_transcript_failure = self.m_transcript_failure.labels(name)
+        super().__init__(*args, **kwargs)
+
     async def _push(self, data: AudioFile):
         try:
-            result = await self._transcript(data)
+            self.m_transcript_call.inc()
+            with self.m_transcript.time():
+                result = await self._transcript(data)
+            self.m_transcript_success.inc()
             if result:
                 await self.emit(result)
+        except Exception:
+            self.m_transcript_failure.inc()
+            raise
         finally:
             data.release()


@@ -3,6 +3,7 @@ from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Union
 from uuid import uuid4
 
+from prometheus_client import Counter, Gauge, Histogram
 from pydantic import BaseModel
 
 from reflector.logger import logger
@@ -18,8 +19,57 @@ class Processor:
     OUTPUT_TYPE: type = None
     WARMUP_EVENT: str = "WARMUP_EVENT"
 
+    m_processor = Histogram(
+        "processor",
+        "Time spent in Processor.process",
+        ["processor"],
+    )
+    m_processor_call = Counter(
+        "processor_call",
+        "Number of calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_success = Counter(
+        "processor_success",
+        "Number of successful calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_failure = Counter(
+        "processor_failure",
+        "Number of failed calls to Processor.process",
+        ["processor"],
+    )
+    m_processor_flush = Histogram(
+        "processor_flush",
+        "Time spent in Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_call = Counter(
+        "processor_flush_call",
+        "Number of calls to Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_success = Counter(
+        "processor_flush_success",
+        "Number of successful calls to Processor.flush",
+        ["processor"],
+    )
+    m_processor_flush_failure = Counter(
+        "processor_flush_failure",
+        "Number of failed calls to Processor.flush",
+        ["processor"],
+    )
+
     def __init__(self, callback=None, custom_logger=None):
-        self.name = self.__class__.__name__
+        self.name = name = self.__class__.__name__
+        self.m_processor = self.m_processor.labels(name)
+        self.m_processor_call = self.m_processor_call.labels(name)
+        self.m_processor_success = self.m_processor_success.labels(name)
+        self.m_processor_failure = self.m_processor_failure.labels(name)
+        self.m_processor_flush = self.m_processor_flush.labels(name)
+        self.m_processor_flush_call = self.m_processor_flush_call.labels(name)
+        self.m_processor_flush_success = self.m_processor_flush_success.labels(name)
+        self.m_processor_flush_failure = self.m_processor_flush_failure.labels(name)
         self._processors = []
         self._callbacks = []
         if callback:
@@ -89,11 +139,15 @@
         Push data to this processor. `data` must be of type `INPUT_TYPE`
         The function returns the output of type `OUTPUT_TYPE`
         """
-        # logger.debug(f"{self.__class__.__name__} push")
+        self.m_processor_call.inc()
         try:
             self.flushed = False
-            return await self._push(data)
+            with self.m_processor.time():
+                ret = await self._push(data)
+            self.m_processor_success.inc()
+            return ret
         except Exception:
+            self.m_processor_failure.inc()
             self.logger.exception("Error in push")
 
     async def flush(self):
@@ -103,9 +157,16 @@
         """
         if self.flushed:
             return
-        # logger.debug(f"{self.__class__.__name__} flush")
+        self.m_processor_flush_call.inc()
         self.flushed = True
-        return await self._flush()
+        try:
+            with self.m_processor_flush.time():
+                ret = await self._flush()
+            self.m_processor_flush_success.inc()
+            return ret
+        except Exception:
+            self.m_processor_flush_failure.inc()
+            raise
 
     def describe(self, level=0):
         logger.info(" " * level + self.__class__.__name__)
@@ -139,12 +200,27 @@
     A processor that runs in a separate thread
     """
 
+    m_processor_queue = Gauge(
+        "processor_queue",
+        "Number of items in the processor queue",
+        ["processor", "processor_uid"],
+    )
+    m_processor_queue_in_progress = Gauge(
+        "processor_queue_in_progress",
+        "Number of items in the processor queue in progress (global)",
+        ["processor"],
+    )
+
     def __init__(self, processor: Processor, max_workers=1):
         super().__init__()
         # FIXME: This is a hack to make sure that the processor is single threaded
         # but if it is more than 1, then we need to make sure that the processor
         # is emitting data in order
         assert max_workers == 1
+        self.m_processor_queue = self.m_processor_queue.labels(processor.name, self.uid)
+        self.m_processor_queue_in_progress = self.m_processor_queue_in_progress.labels(
+            processor.name
+        )
         self.processor = processor
         self.INPUT_TYPE = processor.INPUT_TYPE
         self.OUTPUT_TYPE = processor.OUTPUT_TYPE
@@ -159,19 +235,24 @@
     async def loop(self):
         while True:
             data = await self.queue.get()
+            self.m_processor_queue.set(self.queue.qsize())
+            with self.m_processor_queue_in_progress.track_inprogress():
                 try:
                     if data is None:
                         await self.processor.flush()
                         break
                     if data == self.WARMUP_EVENT:
-                self.logger.debug(f"Warming up {self.processor.__class__.__name__}")
+                        self.logger.debug(
+                            f"Warming up {self.processor.__class__.__name__}"
+                        )
                         await self.processor.warmup()
                         continue
                     try:
                         await self.processor.push(data)
                     except Exception:
                         self.logger.error(
-                    f"Error in push {self.processor.__class__.__name__}, continue"
+                            f"Error in push {self.processor.__class__.__name__}"
+                            ", continue"
                         )
                 finally:
                     self.queue.task_done()
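
Two Gauge idioms appear in this hunk: set() mirroring the queue depth after each dequeue, and track_inprogress(), a context manager that increments the gauge on entry and decrements it on exit. A standalone sketch using only prometheus_client (names and the work function are illustrative):

    import queue
    from prometheus_client import Gauge

    q = queue.Queue()
    depth = Gauge("work_queue_depth", "Items waiting in the queue")
    in_progress = Gauge("work_in_progress", "Items currently being handled")

    def process(item):
        pass  # hypothetical work function

    def handle_one():
        item = q.get()
        depth.set(q.qsize())  # mirror queue depth after dequeue
        with in_progress.track_inprogress():  # +1 on entry, -1 on exit
            process(item)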


@@ -6,6 +6,7 @@ from pathlib import Path
 import av
 from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
 from fastapi import APIRouter, Request
+from prometheus_client import Gauge
 from pydantic import BaseModel
 from reflector.events import subscribers_shutdown
 from reflector.logger import logger
@@ -25,6 +26,7 @@ from reflector.processors import (
 sessions = []
 router = APIRouter()
+m_rtc_sessions = Gauge("rtc_sessions", "Number of active RTC sessions")
 
 
 class TranscriptionContext(object):
@@ -188,12 +190,21 @@
     pc = RTCPeerConnection()
 
     async def flush_pipeline_and_quit(close=True):
+        # may be called twice:
+        # 1. either the client asks to stop the meeting
+        #    - we flush and close
+        #    - when we receive the close event, we do nothing
+        # 2. or the client closes the connection,
+        #    and there is nothing to do because it is already closed
         await update_status("processing")
         await ctx.pipeline.flush()
         if close:
             ctx.logger.debug("Closing peer connection")
             await pc.close()
         await update_status("ended")
+        if pc in sessions:
+            sessions.remove(pc)
+            m_rtc_sessions.dec()
 
     @pc.on("datachannel")
     def on_datachannel(channel):
@@ -235,7 +246,7 @@
 @subscribers_shutdown.append
-async def rtc_clean_sessions():
+async def rtc_clean_sessions(_):
     logger.info("Closing all RTC sessions")
     for pc in sessions:
         logger.debug(f"Closing session {pc}")