# reflector/server/gpu/modal_deployments/reflector_translator.py
"""
Reflector GPU backend - transcriber
===================================
"""
import os
import threading
from modal import App, Image, Secret, asgi_app, enter, method
from pydantic import BaseModel
# Seamless M4T
SEAMLESSM4T_MODEL_SIZE: str = "medium"
SEAMLESSM4T_MODEL_CARD_NAME: str = f"seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}"
SEAMLESSM4T_VOCODER_CARD_NAME: str = "vocoder_36langs"
HF_SEAMLESS_M4T_REPO: str = f"facebook/seamless-m4t-{SEAMLESSM4T_MODEL_SIZE}"
HF_SEAMLESS_M4T_VOCODER_REPO: str = "facebook/seamless-m4t-vocoder"
SEAMLESS_GIT_REPO: str = "https://github.com/facebookresearch/seamless_communication.git"
SEAMLESS_MODEL_DIR: str = "m4t"
app = App(name="reflector-translator")
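
# These helpers run once at image build time (via Image.run_function below): they
# install seamless_communication from source, download the SeamlessM4T checkpoints
# from Hugging Face, and point the fairseq2 asset cards at the local files.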
def install_seamless_communication():
    import os
    import subprocess

    initial_dir = os.getcwd()
    # The redirect needs a shell to take effect.
    subprocess.run(
        "ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts", shell=True
    )
    subprocess.run(["rm", "-rf", "seamless_communication"])
    subprocess.run(["git", "clone", SEAMLESS_GIT_REPO, "./seamless_communication"])
    os.chdir("seamless_communication")
    subprocess.run(["pip", "install", "-e", "."])
    os.chdir(initial_dir)
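
# Downloads the SeamlessM4T model/tokenizer and vocoder snapshots into
# SEAMLESS_MODEL_DIR so they are included in the image.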
def download_seamlessm4t_model():
    from huggingface_hub import snapshot_download

    print("Downloading SeamlessM4T model & tokenizer")
    snapshot_download(HF_SEAMLESS_M4T_REPO, cache_dir=SEAMLESS_MODEL_DIR)
    print("SeamlessM4T model & tokenizer downloaded")
    print("Downloading vocoder weights")
    snapshot_download(HF_SEAMLESS_M4T_VOCODER_REPO, cache_dir=SEAMLESS_MODEL_DIR)
    print("Vocoder weights downloaded")
def configure_seamless_m4t():
    import os
    import yaml

    CARDS_DIR: str = "./seamless_communication/src/seamless_communication/cards"

    with open(f"{CARDS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml", "r") as file:
        model_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
    with open(f"{CARDS_DIR}/vocoder_36langs.yaml", "r") as file:
        vocoder_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
    with open(f"{CARDS_DIR}/unity_nllb-100.yaml", "r") as file:
        unity_100_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
    with open(f"{CARDS_DIR}/unity_nllb-200.yaml", "r") as file:
        unity_200_yaml_data = yaml.load(file, Loader=yaml.FullLoader)

    model_dir = f"{SEAMLESS_MODEL_DIR}/models--facebook--seamless-m4t-{SEAMLESSM4T_MODEL_SIZE}/snapshots"
    available_model_versions = os.listdir(model_dir)
    latest_model_version = sorted(available_model_versions)[-1]
    model_name = f"multitask_unity_{SEAMLESSM4T_MODEL_SIZE}.pt"
    model_path = os.path.join(os.getcwd(), model_dir, latest_model_version, model_name)

    vocoder_dir = (
        f"{SEAMLESS_MODEL_DIR}/models--facebook--seamless-m4t-vocoder/snapshots"
    )
    available_vocoder_versions = os.listdir(vocoder_dir)
    latest_vocoder_version = sorted(available_vocoder_versions)[-1]
    vocoder_name = "vocoder_36langs.pt"
    vocoder_path = os.path.join(
        os.getcwd(), vocoder_dir, latest_vocoder_version, vocoder_name
    )

    tokenizer_name = "tokenizer.model"
    tokenizer_path = os.path.join(
        os.getcwd(), model_dir, latest_model_version, tokenizer_name
    )

    model_yaml_data["checkpoint"] = f"file://{model_path}"
    vocoder_yaml_data["checkpoint"] = f"file://{vocoder_path}"
    unity_100_yaml_data["tokenizer"] = f"file://{tokenizer_path}"
    unity_200_yaml_data["tokenizer"] = f"file://{tokenizer_path}"

    with open(f"{CARDS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml", "w") as file:
        yaml.dump(model_yaml_data, file)
    with open(f"{CARDS_DIR}/vocoder_36langs.yaml", "w") as file:
        yaml.dump(vocoder_yaml_data, file)
    with open(f"{CARDS_DIR}/unity_nllb-100.yaml", "w") as file:
        yaml.dump(unity_100_yaml_data, file)
    with open(f"{CARDS_DIR}/unity_nllb-200.yaml", "w") as file:
        yaml.dump(unity_200_yaml_data, file)
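
# Image used by the Translator class: Debian slim with the PyTorch/fairseq2 stack,
# seamless_communication installed from source, and the SeamlessM4T weights wired
# up at build time by the helpers above.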
translator_image = (
    Image.debian_slim(python_version="3.10.8")
    .apt_install("git")
    .apt_install("wget")
    .apt_install("libsndfile-dev")
    .pip_install(
        "requests",
        "torch",
        "transformers==4.34.0",
        "sentencepiece",
        "protobuf",
        "huggingface_hub==0.16.4",
        "gitpython",
        "torchaudio",
        "fairseq2",
        "pyyaml",
        "hf-transfer~=0.1",
    )
    .run_function(install_seamless_communication)
    .run_function(download_seamlessm4t_model)
    .run_function(configure_seamless_m4t)
    .env(
        {
            "LD_LIBRARY_PATH": (
                "/usr/local/lib/python3.10/site-packages/nvidia/cudnn/lib/:"
                "/opt/conda/lib/python3.10/site-packages/nvidia/cublas/lib/"
            )
        }
    )
)
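
# GPU worker: loads the SeamlessM4T translator once per container at startup and
# serializes predict() calls with a lock, since several inputs may run concurrently.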
@app.cls(
    gpu="A10G",
    timeout=60 * 5,
    scaledown_window=60 * 5,
    allow_concurrent_inputs=4,
    image=translator_image,
)
class Translator:
    @enter()
    def enter(self):
        import torch
        from seamless_communication.inference.translator import Translator

        self.lock = threading.Lock()
        self.use_gpu = torch.cuda.is_available()
        self.device = "cuda" if self.use_gpu else "cpu"
        self.translator = Translator(
            SEAMLESSM4T_MODEL_CARD_NAME,
            SEAMLESSM4T_VOCODER_CARD_NAME,
            torch.device(self.device),
            dtype=torch.float32,
        )

    @method()
    def warmup(self):
        return {"status": "ok"}

    def get_seamless_lang_code(self, lang_code: str):
        """
        SeamlessM4T uses its own three-letter language codes rather than the usual
        two-letter ISO 639-1 codes; for example, French is "fra", not "fr".
        Unknown codes fall back to English ("eng").
        """
        # TODO: Enhance with complete list of lang codes
        seamless_lang_code = {
            "af": "afr",  # Afrikaans
            "am": "amh",  # Amharic
            "ar": "arb",  # Modern Standard Arabic
            "ary": "ary",  # Moroccan Arabic
            "arz": "arz",  # Egyptian Arabic
            "as": "asm",  # Assamese
            "az": "azj",  # North Azerbaijani
            "be": "bel",  # Belarusian
            "bn": "ben",  # Bengali
            "bs": "bos",  # Bosnian
            "bg": "bul",  # Bulgarian
            "ca": "cat",  # Catalan
            "ceb": "ceb",  # Cebuano
            "cs": "ces",  # Czech
            "ku": "ckb",  # Central Kurdish
            "cmn": "cmn_Hant",  # Mandarin Chinese
            "cy": "cym",  # Welsh
            "da": "dan",  # Danish
            "de": "deu",  # German
            "el": "ell",  # Greek
            "en": "eng",  # English
            "et": "est",  # Estonian
            "eu": "eus",  # Basque
            "fi": "fin",  # Finnish
            "fr": "fra",  # French
            "ga": "gle",  # Irish
            "gaz": "gaz",  # West Central Oromo
            "gl": "glg",  # Galician
            "gu": "guj",  # Gujarati
            "he": "heb",  # Hebrew
            "hi": "hin",  # Hindi
            "hr": "hrv",  # Croatian
            "hu": "hun",  # Hungarian
            "hy": "hye",  # Armenian
            "ig": "ibo",  # Igbo
            "id": "ind",  # Indonesian
            "is": "isl",  # Icelandic
            "it": "ita",  # Italian
            "jv": "jav",  # Javanese
            "ja": "jpn",  # Japanese
            "kn": "kan",  # Kannada
            "ka": "kat",  # Georgian
            "kk": "kaz",  # Kazakh
            "khk": "khk",  # Halh Mongolian
            "km": "khm",  # Khmer
            "ky": "kir",  # Kyrgyz
            "ko": "kor",  # Korean
            "lo": "lao",  # Lao
            "lt": "lit",  # Lithuanian
            "lg": "lug",  # Ganda
            "luo": "luo",  # Luo
            "lv": "lvs",  # Standard Latvian
            "mai": "mai",  # Maithili
            "ml": "mal",  # Malayalam
            "mr": "mar",  # Marathi
            "mk": "mkd",  # Macedonian
            "mt": "mlt",  # Maltese
            "mni": "mni",  # Meitei
            "my": "mya",  # Burmese
            "nl": "nld",  # Dutch
            "nn": "nno",  # Norwegian Nynorsk
            "nb": "nob",  # Norwegian Bokmål
            "ne": "npi",  # Nepali
            "ny": "nya",  # Nyanja
            "or": "ory",  # Odia
            "pa": "pan",  # Punjabi
            "pbt": "pbt",  # Southern Pashto
            "pes": "pes",  # Western Persian
            "pl": "pol",  # Polish
            "pt": "por",  # Portuguese
            "ro": "ron",  # Romanian
            "ru": "rus",  # Russian
            "sk": "slk",  # Slovak
            "sl": "slv",  # Slovenian
            "sn": "sna",  # Shona
            "sd": "snd",  # Sindhi
            "so": "som",  # Somali
            "es": "spa",  # Spanish
            "sr": "srp",  # Serbian
            "sv": "swe",  # Swedish
            "sw": "swh",  # Swahili
            "ta": "tam",  # Tamil
            "te": "tel",  # Telugu
            "tg": "tgk",  # Tajik
            "tl": "tgl",  # Tagalog
            "th": "tha",  # Thai
            "tr": "tur",  # Turkish
            "uk": "ukr",  # Ukrainian
            "ur": "urd",  # Urdu
            "uz": "uzn",  # Northern Uzbek
            "vi": "vie",  # Vietnamese
            "yo": "yor",  # Yoruba
            "yue": "yue",  # Cantonese
            "ms": "zsm",  # Standard Malay
            "zu": "zul",  # Zulu
        }
        return seamless_lang_code.get(lang_code, "eng")
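
    # Text-to-text translation ("t2tt" task); the lock serializes access to the
    # model across concurrent inputs.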
    @method()
    def translate_text(self, text: str, source_language: str, target_language: str):
        with self.lock:
            translation_result, _ = self.translator.predict(
                text,
                "t2tt",
                src_lang=self.get_seamless_lang_code(source_language),
                tgt_lang=self.get_seamless_lang_code(target_language),
                unit_generation_ngram_filtering=True,
            )
            translated_text = str(translation_result[0])
        return {"text": {source_language: text, target_language: translated_text}}

# -------------------------------------------------------------------
# Web API
# -------------------------------------------------------------------
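# Thin CPU-side web layer: a FastAPI app protected by a bearer API key (read from
# the "reflector-gpu" Modal secret) that forwards /translate requests to the GPU
# class above.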
@app.function(
    scaledown_window=60,
    timeout=60,
    allow_concurrent_inputs=40,
    secrets=[
        Secret.from_name("reflector-gpu"),
    ],
)
@asgi_app()
def web():
    from fastapi import Body, Depends, FastAPI, HTTPException, status
    from fastapi.security import OAuth2PasswordBearer
    from typing_extensions import Annotated

    translatorstub = Translator()
    app = FastAPI()
    oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

    def apikey_auth(apikey: str = Depends(oauth2_scheme)):
        if apikey != os.environ["REFLECTOR_GPU_APIKEY"]:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid API key",
                headers={"WWW-Authenticate": "Bearer"},
            )

    class TranslateResponse(BaseModel):
        result: dict

    @app.post("/translate", dependencies=[Depends(apikey_auth)])
    async def translate(
        text: str,
        source_language: Annotated[str, Body(...)] = "en",
        target_language: Annotated[str, Body(...)] = "fr",
    ) -> TranslateResponse:
        func = translatorstub.translate_text.spawn(
            text=text,
            source_language=source_language,
            target_language=target_language,
        )
        result = func.get()
        # Wrap in the declared response model so response validation succeeds.
        return TranslateResponse(result=result)

    return app
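
# Example client call against the deployed endpoint (a sketch: the URL shape and
# token are placeholders, and the actual hostname depends on the Modal workspace):
#
#   import requests
#
#   resp = requests.post(
#       "https://<workspace>--reflector-translator-web.modal.run/translate",
#       params={"text": "Hello world"},
#       json={"source_language": "en", "target_language": "fr"},
#       headers={"Authorization": "Bearer <REFLECTOR_GPU_APIKEY>"},
#   )
#   print(resp.json())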