Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2025-12-21 04:39:06 +00:00)
gpu: improve concurrency on modal - coauthored with Gokul (#286)
@@ -5,6 +5,7 @@ Reflector GPU backend - LLM
 """
 import json
 import os
+import threading
 from typing import Optional
 
 import modal
@@ -67,7 +68,7 @@ llm_image = (
     gpu="A100",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
-    concurrency_limit=2,
+    allow_concurrent_inputs=15,
     image=llm_image,
 )
 class LLM:
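The swap from concurrency_limit to allow_concurrent_inputs changes how the app scales: instead of capping the deployment at two containers that each take one input at a time, a single container now accepts many inputs at once and hands each to its own thread. A minimal sketch of the pattern under the Stub-era Modal API used here (the class and method names are illustrative, not from the repo):

    import threading

    from modal import Stub, method

    stub = Stub("concurrency-sketch")

    @stub.cls(
        gpu="A100",
        timeout=60 * 5,
        container_idle_timeout=60 * 5,
        allow_concurrent_inputs=15,  # up to 15 in-flight inputs per container
    )
    class Worker:
        def __enter__(self):
            # heavy model load happens once per container, shared by all inputs
            self.lock = threading.Lock()

        @method()
        def run(self, prompt: str) -> str:
            with self.lock:  # the single shared model is treated as not thread-safe
                return prompt.upper()  # placeholder for real inference

Because the heavyweight model is shared by every concurrent input, each class also grows a threading.Lock (below) so only one request drives the GPU at a time.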
@@ -108,6 +109,8 @@ class LLM:
         self.gen_cfg = gen_cfg
         self.GenerationConfig = GenerationConfig
 
+        self.lock = threading.Lock()
+
     def __exit__(self, *args):
         print("Exit llm")
 
@@ -123,30 +126,31 @@ class LLM:
         gen_cfg = self.gen_cfg
 
         # If a gen_schema is given, conform to gen_schema
-        if gen_schema:
-            import jsonformer
-
-            print(f"Schema {gen_schema=}")
-            jsonformer_llm = jsonformer.Jsonformer(
-                model=self.model,
-                tokenizer=self.tokenizer,
-                json_schema=json.loads(gen_schema),
-                prompt=prompt,
-                max_string_token_length=gen_cfg.max_new_tokens
-            )
-            response = jsonformer_llm()
-        else:
-            # If no gen_schema, perform prompt only generation
-
-            # tokenize prompt
-            input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
-                self.model.device
-            )
-            output = self.model.generate(input_ids, generation_config=gen_cfg)
-
-            # decode output
-            response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
-            response = response[len(prompt):]
+        with self.lock:
+            if gen_schema:
+                import jsonformer
+
+                print(f"Schema {gen_schema=}")
+                jsonformer_llm = jsonformer.Jsonformer(
+                    model=self.model,
+                    tokenizer=self.tokenizer,
+                    json_schema=json.loads(gen_schema),
+                    prompt=prompt,
+                    max_string_token_length=gen_cfg.max_new_tokens
+                )
+                response = jsonformer_llm()
+            else:
+                # If no gen_schema, perform prompt only generation
+
+                # tokenize prompt
+                input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
+                    self.model.device
+                )
+                output = self.model.generate(input_ids, generation_config=gen_cfg)
+
+                # decode output
+                response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
+                response = response[len(prompt):]
         print(f"Generated {response=}")
         return {"text": response}
 
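Allowing 15 concurrent inputs means up to 15 threads can enter the generation method in the same container, but there is only one model instance on one GPU, which the handlers treat as not thread-safe. Wrapping the generation path in the per-container lock keeps request handling concurrent at the Modal layer while the GPU work itself runs one request at a time. A runnable toy illustration of that pattern, with the GPU pass replaced by a sleep (all names here are illustrative):

    import threading
    import time

    lock = threading.Lock()
    results = []

    def fake_generate(prompt: str) -> str:
        time.sleep(0.1)  # stand-in for the GPU forward pass
        return prompt[::-1]

    def handle(prompt: str) -> None:
        # each concurrent input runs on its own thread, but only one
        # thread at a time touches the shared "model"
        with lock:
            results.append(fake_generate(prompt))

    threads = [threading.Thread(target=handle, args=(f"prompt-{i}",)) for i in range(5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(results)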
@@ -158,6 +162,7 @@ class LLM:
 @stub.function(
     container_idle_timeout=60 * 10,
     timeout=60 * 5,
+    allow_concurrent_inputs=45,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -187,7 +192,7 @@ def web():
         gen_cfg: Optional[dict] = None
 
     @app.post("/llm", dependencies=[Depends(apikey_auth)])
-    async def llm(
+    def llm(
         req: LLMRequest,
     ):
         gen_schema = json.dumps(req.gen_schema) if req.gen_schema else None
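The web endpoints switch from async def to plain def for the same reason: the handlers now do blocking work (synchronous Modal calls and waiting on their results), and FastAPI runs plain def endpoints in a worker threadpool, so a slow request no longer parks the event loop and stalls every other request on the container. A minimal sketch of the difference, independent of this repo (route names and the sleep are illustrative):

    import time

    from fastapi import FastAPI

    app = FastAPI()

    @app.post("/blocking-bad")
    async def blocking_bad():
        time.sleep(5)  # blocks the event loop: every other request waits
        return {"ok": True}

    @app.post("/blocking-ok")
    def blocking_ok():
        time.sleep(5)  # runs in FastAPI's threadpool: other requests proceed
        return {"ok": True}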
@@ -5,6 +5,7 @@ Reflector GPU backend - LLM
 """
 import json
 import os
+import threading
 from typing import Optional
 
 import modal
@@ -67,7 +68,7 @@ llm_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
-    concurrency_limit=2,
+    allow_concurrent_inputs=10,
     image=llm_image,
 )
 class LLM:
@@ -111,6 +112,7 @@ class LLM:
         self.tokenizer = tokenizer
         self.gen_cfg = gen_cfg
         self.GenerationConfig = GenerationConfig
+        self.lock = threading.Lock()
 
     def __exit__(self, *args):
         print("Exit llm")
@@ -129,33 +131,34 @@ class LLM:
         gen_cfg = self.gen_cfg
 
         # If a gen_schema is given, conform to gen_schema
-        if gen_schema:
-            import jsonformer
-
-            print(f"Schema {gen_schema=}")
-            jsonformer_llm = jsonformer.Jsonformer(
-                model=self.model,
-                tokenizer=self.tokenizer,
-                json_schema=json.loads(gen_schema),
-                prompt=prompt,
-                max_string_token_length=gen_cfg.max_new_tokens
-            )
-            response = jsonformer_llm()
-        else:
-            # If no gen_schema, perform prompt only generation
-
-            # tokenize prompt
-            input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
-                self.model.device
-            )
-            output = self.model.generate(input_ids, generation_config=gen_cfg)
-
-            # decode output
-            response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
-            response = response[len(prompt):]
-            response = {
-                "long_summary": response
-            }
+        with self.lock:
+            if gen_schema:
+                import jsonformer
+
+                print(f"Schema {gen_schema=}")
+                jsonformer_llm = jsonformer.Jsonformer(
+                    model=self.model,
+                    tokenizer=self.tokenizer,
+                    json_schema=json.loads(gen_schema),
+                    prompt=prompt,
+                    max_string_token_length=gen_cfg.max_new_tokens
+                )
+                response = jsonformer_llm()
+            else:
+                # If no gen_schema, perform prompt only generation
+
+                # tokenize prompt
+                input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
+                    self.model.device
+                )
+                output = self.model.generate(input_ids, generation_config=gen_cfg)
+
+                # decode output
+                response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
+                response = response[len(prompt):]
+                response = {
+                    "long_summary": response
+                }
         print(f"Generated {response=}")
         return {"text": response}
 
@@ -167,6 +170,7 @@ class LLM:
 @stub.function(
     container_idle_timeout=60 * 10,
     timeout=60 * 5,
+    allow_concurrent_inputs=30,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -196,7 +200,7 @@ def web():
         gen_cfg: Optional[dict] = None
 
     @app.post("/llm", dependencies=[Depends(apikey_auth)])
-    async def llm(
+    def llm(
         req: LLMRequest,
     ):
         gen_schema = json.dumps(req.gen_schema) if req.gen_schema else None
@@ -5,6 +5,7 @@ Reflector GPU backend - transcriber
 
 import os
 import tempfile
+import threading
 
 from modal import Image, Secret, Stub, asgi_app, method
 from pydantic import BaseModel
@@ -78,6 +79,7 @@ transcriber_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
+    allow_concurrent_inputs=6,
     image=transcriber_image,
 )
 class Transcriber:
@@ -85,6 +87,7 @@ class Transcriber:
         import faster_whisper
         import torch
 
+        self.lock = threading.Lock()
         self.use_gpu = torch.cuda.is_available()
         self.device = "cuda" if self.use_gpu else "cpu"
         self.model = faster_whisper.WhisperModel(
@@ -106,14 +109,15 @@ class Transcriber:
         with tempfile.NamedTemporaryFile("wb+", suffix=f".{audio_suffix}") as fp:
             fp.write(audio_data)
 
-            segments, _ = self.model.transcribe(
-                fp.name,
-                language=source_language,
-                beam_size=5,
-                word_timestamps=True,
-                vad_filter=True,
-                vad_parameters={"min_silence_duration_ms": 500},
-            )
+            with self.lock:
+                segments, _ = self.model.transcribe(
+                    fp.name,
+                    language=source_language,
+                    beam_size=5,
+                    word_timestamps=True,
+                    vad_filter=True,
+                    vad_parameters={"min_silence_duration_ms": 500},
+                )
 
             multilingual_transcript = {}
             transcript_source_lang = ""
@@ -147,6 +151,7 @@ class Transcriber:
 @stub.function(
     container_idle_timeout=60,
     timeout=60,
+    allow_concurrent_inputs=40,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -176,12 +181,12 @@ def web():
         result: dict
 
     @app.post("/transcribe", dependencies=[Depends(apikey_auth)])
-    async def transcribe(
+    def transcribe(
         file: UploadFile,
         source_language: Annotated[str, Body(...)] = "en",
         timestamp: Annotated[float, Body()] = 0.0
     ) -> TranscriptResponse:
-        audio_data = await file.read()
+        audio_data = file.file.read()
         audio_suffix = file.filename.split(".")[-1]
         assert audio_suffix in supported_audio_file_types
 
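Dropping async def also means await file.read() can no longer be used; the transcribe handler reads through UploadFile.file instead, the underlying file object Starlette exposes for synchronous access. A small sketch of the same idea outside this repo (route name and response shape are illustrative):

    from fastapi import FastAPI, UploadFile

    app = FastAPI()

    @app.post("/transcribe-sketch")
    def transcribe_sketch(file: UploadFile) -> dict:
        # UploadFile.read() is a coroutine; in a plain `def` endpoint,
        # read the underlying SpooledTemporaryFile synchronously instead.
        audio_data = file.file.read()
        return {"filename": file.filename, "size": len(audio_data)}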
@@ -4,7 +4,7 @@ Reflector GPU backend - transcriber
 """
 
 import os
-import tempfile
+import threading
 
 from modal import Image, Secret, Stub, asgi_app, method
 from pydantic import BaseModel
@@ -129,6 +129,7 @@ transcriber_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
+    allow_concurrent_inputs=4,
     image=transcriber_image,
 )
 class Translator:
@@ -136,6 +137,7 @@ class Translator:
         import torch
         from seamless_communication.models.inference.translator import Translator
 
+        self.lock = threading.Lock()
         self.use_gpu = torch.cuda.is_available()
         self.device = "cuda" if self.use_gpu else "cpu"
         self.translator = Translator(
@@ -168,13 +170,14 @@ class Translator:
         source_language: str,
         target_language: str
     ):
-        translated_text, _, _ = self.translator.predict(
-            text,
-            "t2tt",
-            src_lang=self.get_seamless_lang_code(source_language),
-            tgt_lang=self.get_seamless_lang_code(target_language),
-            ngram_filtering=True
-        )
+        with self.lock:
+            translated_text, _, _ = self.translator.predict(
+                text,
+                "t2tt",
+                src_lang=self.get_seamless_lang_code(source_language),
+                tgt_lang=self.get_seamless_lang_code(target_language),
+                ngram_filtering=True
+            )
         return {
             "text": {
                 source_language: text,
@@ -189,6 +192,7 @@ class Translator:
 @stub.function(
     container_idle_timeout=60,
     timeout=60,
+    allow_concurrent_inputs=40,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -217,7 +221,7 @@ def web():
         result: dict
 
     @app.post("/translate", dependencies=[Depends(apikey_auth)])
-    async def translate(
+    def translate(
         text: str,
         source_language: Annotated[str, Body(...)] = "en",
         target_language: Annotated[str, Body(...)] = "fr",
@@ -230,8 +234,4 @@ def web():
         result = func.get()
         return result
 
-    @app.post("/warmup", dependencies=[Depends(apikey_auth)])
-    async def warmup():
-        return translatorstub.warmup.spawn().get()
-
     return app