Merge pull request #137 from Monadical-SAS/serverless-gpu-modal

server: implement modal backend for llm and transcription
This commit is contained in:
2023-08-11 16:19:01 +02:00
committed by GitHub
11 changed files with 204 additions and 4 deletions

View File

@@ -1,6 +1,7 @@
from reflector.settings import settings from reflector.settings import settings
from reflector.utils.retry import retry from reflector.utils.retry import retry
from reflector.logger import logger as reflector_logger from reflector.logger import logger as reflector_logger
from time import monotonic
import importlib import importlib
import json import json
import re import re
@@ -29,6 +30,21 @@ class LLM:
importlib.import_module(module_name) importlib.import_module(module_name)
return cls._registry[name]() return cls._registry[name]()
async def warmup(self, logger):
    """Warm up the LLM backend, logging how long it took.

    Delegates to the backend-specific `_warmup` hook. Failures are
    logged with a traceback and re-raised to the caller.

    NOTE: the original signature annotated `logger` with the
    `reflector_logger` *instance* (imported `logger as reflector_logger`),
    which is not a type; the annotation was dropped.
    """
    start = monotonic()
    name = self.__class__.__name__
    logger.info(f"LLM[{name}] warming up...")
    try:
        await self._warmup(logger=logger)
        duration = monotonic() - start
        logger.info(f"LLM[{name}] warmup took {duration:.2f} seconds")
    except Exception:
        # Surface the failure to the caller after logging it.
        logger.exception(f"LLM[{name}] warmup failed")
        raise
async def _warmup(self, logger: reflector_logger):
pass
async def generate(self, prompt: str, logger: reflector_logger, **kwargs) -> dict: async def generate(self, prompt: str, logger: reflector_logger, **kwargs) -> dict:
logger.info("LLM generate", prompt=repr(prompt)) logger.info("LLM generate", prompt=repr(prompt))
try: try:

View File

@@ -0,0 +1,53 @@
from reflector.llm.base import LLM
from reflector.settings import settings
from reflector.utils.retry import retry
import httpx
class ModalLLM(LLM):
    """LLM backend calling the modal.com GPU service over HTTP.

    Posts prompts to `LLM_URL + "/llm"` and warmup requests to
    `LLM_URL + "/warmup"`, authenticated with a bearer token.
    """

    def __init__(self):
        super().__init__()
        self.timeout = settings.LLM_TIMEOUT
        self.llm_url = settings.LLM_URL + "/llm"
        self.llm_warmup_url = settings.LLM_URL + "/warmup"
        self.headers = {
            "Authorization": f"Bearer {settings.LLM_MODAL_API_KEY}",
        }

    async def _warmup(self, logger):
        # Cold container boots can be slow; allow up to 5 minutes.
        # BUG FIX: was `60**5` (~777 million seconds, i.e. no effective
        # timeout); `60 * 5` matches the retry_timeout used in _generate.
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.llm_warmup_url,
                headers=self.headers,
                timeout=60 * 5,
            )
            response.raise_for_status()

    async def _generate(self, prompt: str, **kwargs):
        """Generate a completion for `prompt`, retrying transient failures.

        The service echoes the prompt at the start of its output, so the
        prompt prefix is stripped before returning.
        """
        async with httpx.AsyncClient() as client:
            response = await retry(client.post)(
                self.llm_url,
                headers=self.headers,
                json={"prompt": prompt},
                timeout=self.timeout,
                retry_timeout=60 * 5,
            )
            response.raise_for_status()
            text = response.json()["text"]
            text = text[len(prompt) :]  # remove prompt
            return text


LLM.register("modal", ModalLLM)
if __name__ == "__main__":
    # Manual smoke test: requires LLM_URL and LLM_MODAL_API_KEY to be set.
    from reflector.logger import logger

    async def main():
        result = await ModalLLM().generate("Hello, my name is", logger=logger)
        print(result)

    import asyncio

    asyncio.run(main())

View File

@@ -47,6 +47,9 @@ class AudioTranscriptAutoProcessor(AudioTranscriptProcessor):
def off(self, callback): def off(self, callback):
self.processor.off(callback) self.processor.off(callback)
async def _warmup(self):
return await self.processor._warmup()
async def _push(self, data: AudioFile): async def _push(self, data: AudioFile):
return await self.processor._push(data) return await self.processor._push(data)

View File

@@ -0,0 +1,83 @@
"""
Implementation using the GPU service from modal.com
API will be a POST request to TRANSCRIPT_URL:
```form
"timestamp": 123.456
"language": "en"
"file": <audio file>
```
"""
from reflector.processors.audio_transcript import AudioTranscriptProcessor
from reflector.processors.audio_transcript_auto import AudioTranscriptAutoProcessor
from reflector.processors.types import AudioFile, Transcript, Word
from reflector.settings import settings
from reflector.utils.retry import retry
from time import monotonic
import httpx
class AudioTranscriptModalProcessor(AudioTranscriptProcessor):
    """Transcription backend calling the modal.com GPU service over HTTP.

    Audio files are uploaded as multipart POSTs to
    `TRANSCRIPT_URL + "/transcribe"`, authenticated with a bearer token.
    """

    def __init__(self, modal_api_key: str):
        super().__init__()
        self.transcript_url = settings.TRANSCRIPT_URL + "/transcribe"
        self.warmup_url = settings.TRANSCRIPT_URL + "/warmup"
        self.timeout = settings.TRANSCRIPT_TIMEOUT
        self.headers = {
            "Authorization": f"Bearer {modal_api_key}",
        }

    async def _warmup(self):
        # Best-effort: warmup failures are logged, never raised, so a cold
        # service does not abort the pipeline.
        try:
            async with httpx.AsyncClient() as client:
                start = monotonic()
                self.logger.debug("Transcribe modal: warming up...")
                response = await client.post(
                    self.warmup_url,
                    headers=self.headers,
                    timeout=self.timeout,
                )
                response.raise_for_status()
                duration = monotonic() - start
                self.logger.debug(f"Transcribe modal: warmup took {duration:.2f}s")
        except Exception:
            self.logger.exception("Transcribe modal: warmup failed")

    async def _transcript(self, data: AudioFile):
        """Upload `data` for transcription and return the offset Transcript."""
        async with httpx.AsyncClient() as client:
            self.logger.debug(f"Try to transcribe audio {data.path.name}")
            # BUG FIX: the file handle was opened and never closed;
            # close it deterministically once the request completes.
            with data.path.open("rb") as audio_fp:
                files = {
                    "file": (data.path.name, audio_fp),
                }
                response = await retry(client.post)(
                    self.transcript_url,
                    files=files,
                    timeout=self.timeout,
                    headers=self.headers,
                )
            self.logger.debug(
                f"Transcript response: {response.status_code} {response.content}"
            )
            response.raise_for_status()
            result = response.json()
            transcript = Transcript(
                text=result["text"],
                words=[
                    Word(
                        text=word["text"],
                        start=word["start"],
                        end=word["end"],
                    )
                    for word in result["words"]
                ],
            )
            # Word timestamps are relative to the chunk; shift to stream time.
            transcript.add_offset(data.timestamp)
            return transcript


AudioTranscriptAutoProcessor.register("modal", AudioTranscriptModalProcessor)

View File

@@ -7,6 +7,7 @@ import asyncio
class Processor: class Processor:
INPUT_TYPE: type = None INPUT_TYPE: type = None
OUTPUT_TYPE: type = None OUTPUT_TYPE: type = None
WARMUP_EVENT: str = "WARMUP_EVENT"
def __init__(self, callback=None, custom_logger=None): def __init__(self, callback=None, custom_logger=None):
self._processors = [] self._processors = []
@@ -85,12 +86,21 @@ class Processor:
def describe(self, level=0): def describe(self, level=0):
logger.info(" " * level + self.__class__.__name__) logger.info(" " * level + self.__class__.__name__)
async def warmup(self):
    """Warm up the processor by invoking its backend-specific hook."""
    await self._warmup()
async def _push(self, data): async def _push(self, data):
raise NotImplementedError raise NotImplementedError
async def _flush(self): async def _flush(self):
pass pass
async def _warmup(self):
pass
@classmethod @classmethod
def as_threaded(cls, *args, **kwargs): def as_threaded(cls, *args, **kwargs):
""" """
@@ -129,10 +139,17 @@ class ThreadedProcessor(Processor):
if data is None: if data is None:
await self.processor.flush() await self.processor.flush()
break break
if data == self.WARMUP_EVENT:
self.logger.debug(f"Warming up {self.processor.__class__.__name__}")
await self.processor.warmup()
continue
await self.processor.push(data) await self.processor.push(data)
finally: finally:
self.queue.task_done() self.queue.task_done()
async def _warmup(self):
await self.queue.put(self.WARMUP_EVENT)
async def _push(self, data): async def _push(self, data):
await self.queue.put(data) await self.queue.put(data)
@@ -163,6 +180,7 @@ class Pipeline(Processor):
OUTPUT_TYPE = None OUTPUT_TYPE = None
def __init__(self, *processors: Processor): def __init__(self, *processors: Processor):
self._warmed_up = False
super().__init__() super().__init__()
self.logger = logger.bind(pipeline=self.uid) self.logger = logger.bind(pipeline=self.uid)
self.logger.info("Pipeline created") self.logger.info("Pipeline created")
@@ -178,6 +196,11 @@ class Pipeline(Processor):
self.INPUT_TYPE = processors[0].INPUT_TYPE self.INPUT_TYPE = processors[0].INPUT_TYPE
self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE self.OUTPUT_TYPE = processors[-1].OUTPUT_TYPE
async def _warmup(self):
for processor in self.processors:
self.logger.debug(f"Warming up {processor.__class__.__name__}")
await processor.warmup()
async def _push(self, data): async def _push(self, data):
await self.processors[0].push(data) await self.processors[0].push(data)

View File

@@ -31,6 +31,9 @@ class TranscriptTopicDetectorProcessor(Processor):
self.min_transcript_length = min_transcript_length self.min_transcript_length = min_transcript_length
self.llm = LLM.get_instance() self.llm = LLM.get_instance()
async def _warmup(self):
await self.llm.warmup(logger=self.logger)
async def _push(self, data: Transcript): async def _push(self, data: Transcript):
if self.transcript is None: if self.transcript is None:
self.transcript = data self.transcript = data

View File

@@ -49,6 +49,11 @@ class Transcript(BaseModel):
self.words.extend(other.words) self.words.extend(other.words)
self.text += other.text self.text += other.text
def add_offset(self, offset: float):
    """Shift every word's start/end timestamp forward by `offset` seconds.

    Mutates the words in place; used to convert chunk-relative timestamps
    into stream time.
    """
    for word in self.words:
        word.start, word.end = word.start + offset, word.end + offset
def clone(self): def clone(self):
words = [ words = [
Word(text=word.text, start=word.start, end=word.end) for word in self.words Word(text=word.text, start=word.start, end=word.end) for word in self.words

View File

@@ -36,6 +36,9 @@ class Settings(BaseSettings):
TRANSCRIPT_BANANA_API_KEY: str | None = None TRANSCRIPT_BANANA_API_KEY: str | None = None
TRANSCRIPT_BANANA_MODEL_KEY: str | None = None TRANSCRIPT_BANANA_MODEL_KEY: str | None = None
# Audio transcription modal.com configuration
TRANSCRIPT_MODAL_API_KEY: str | None = None
# Audio transcription storage # Audio transcription storage
TRANSCRIPT_STORAGE_BACKEND: str = "aws" TRANSCRIPT_STORAGE_BACKEND: str = "aws"
@@ -63,6 +66,9 @@ class Settings(BaseSettings):
LLM_BANANA_API_KEY: str | None = None LLM_BANANA_API_KEY: str | None = None
LLM_BANANA_MODEL_KEY: str | None = None LLM_BANANA_MODEL_KEY: str | None = None
# LLM Modal configuration
LLM_MODAL_API_KEY: str | None = None
# Sentry # Sentry
SENTRY_DSN: str | None = None SENTRY_DSN: str | None = None

View File

@@ -58,12 +58,14 @@ def retry(fn):
if result: if result:
return result return result
except HTTPStatusError as e: except HTTPStatusError as e:
logger.exception(e)
status_code = e.response.status_code status_code = e.response.status_code
logger.debug(f"HTTP status {status_code} - {e}") logger.debug(f"HTTP status {status_code} - {e}")
if status_code in retry_httpx_status_stop: if status_code in retry_httpx_status_stop:
message = f"HTTP status {status_code} is in retry_httpx_status_stop" message = f"HTTP status {status_code} is in retry_httpx_status_stop"
raise RetryHTTPException(message) from e raise RetryHTTPException(message) from e
except retry_ignore_exc_types as e: except retry_ignore_exc_types as e:
logger.exception(e)
last_exception = e last_exception = e
logger.debug( logger.debug(

View File

@@ -159,6 +159,7 @@ async def rtc_offer_base(
TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic), TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
TranscriptFinalSummaryProcessor.as_threaded(callback=on_final_summary), TranscriptFinalSummaryProcessor.as_threaded(callback=on_final_summary),
) )
await ctx.pipeline.warmup()
# handle RTC peer connection # handle RTC peer connection
pc = RTCPeerConnection() pc = RTCPeerConnection()

View File

@@ -1,11 +1,16 @@
from fastapi import APIRouter, HTTPException, Request, WebSocket, WebSocketDisconnect from fastapi import (
APIRouter,
HTTPException,
Request,
WebSocket,
WebSocketDisconnect,
)
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from datetime import datetime from datetime import datetime
from fastapi_pagination import Page, paginate from fastapi_pagination import Page, paginate
from reflector.logger import logger from reflector.logger import logger
from .rtc_offer import rtc_offer_base, RtcOffer, PipelineEvent from .rtc_offer import rtc_offer_base, RtcOffer, PipelineEvent
import asyncio
from typing import Optional from typing import Optional
@@ -239,8 +244,8 @@ async def transcript_events_websocket(transcript_id: UUID, websocket: WebSocket)
# endless loop to wait for new events # endless loop to wait for new events
try: try:
while True: while True:
await asyncio.sleep(42) await websocket.receive()
except WebSocketDisconnect: except (RuntimeError, WebSocketDisconnect):
ws_manager.disconnect(transcript_id, websocket) ws_manager.disconnect(transcript_id, websocket)