From 6c1869b79ac2487c3906a12e05b4d3eb1bba7e9d Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Fri, 13 Oct 2023 21:15:57 +0200
Subject: [PATCH 1/4] gpu: improve concurrency on modal - coauthored with Gokul (#286)

---
 server/gpu/modal/reflector_llm.py         | 51 +++++++++++----------
 server/gpu/modal/reflector_llm_zephyr.py  | 56 ++++++++++++-----------
 server/gpu/modal/reflector_transcriber.py | 25 ++++++----
 server/gpu/modal/reflector_translator.py  | 26 +++++------
 4 files changed, 86 insertions(+), 72 deletions(-)

diff --git a/server/gpu/modal/reflector_llm.py b/server/gpu/modal/reflector_llm.py
index 7d7bb57d..02feedb7 100644
--- a/server/gpu/modal/reflector_llm.py
+++ b/server/gpu/modal/reflector_llm.py
@@ -5,6 +5,7 @@ Reflector GPU backend - LLM
 """
 import json
 import os
+import threading
 from typing import Optional
 
 import modal
@@ -67,7 +68,7 @@ llm_image = (
     gpu="A100",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
-    concurrency_limit=2,
+    allow_concurrent_inputs=15,
     image=llm_image,
 )
 class LLM:
@@ -108,6 +109,8 @@ class LLM:
         self.gen_cfg = gen_cfg
         self.GenerationConfig = GenerationConfig
 
+        self.lock = threading.Lock()
+
     def __exit__(self, *args):
         print("Exit llm")
 
@@ -123,30 +126,31 @@ class LLM:
             gen_cfg = self.gen_cfg
 
         # If a gen_schema is given, conform to gen_schema
-        if gen_schema:
-            import jsonformer
+        with self.lock:
+            if gen_schema:
+                import jsonformer
 
-            print(f"Schema {gen_schema=}")
-            jsonformer_llm = jsonformer.Jsonformer(
-                model=self.model,
-                tokenizer=self.tokenizer,
-                json_schema=json.loads(gen_schema),
-                prompt=prompt,
-                max_string_token_length=gen_cfg.max_new_tokens
-            )
-            response = jsonformer_llm()
-        else:
-            # If no gen_schema, perform prompt only generation
+                print(f"Schema {gen_schema=}")
+                jsonformer_llm = jsonformer.Jsonformer(
+                    model=self.model,
+                    tokenizer=self.tokenizer,
+                    json_schema=json.loads(gen_schema),
+                    prompt=prompt,
+                    max_string_token_length=gen_cfg.max_new_tokens
+                )
+                response = jsonformer_llm()
+            else:
+                # If no gen_schema, perform prompt only generation
 
-            # tokenize prompt
-            input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
-                self.model.device
-            )
-            output = self.model.generate(input_ids, generation_config=gen_cfg)
+                # tokenize prompt
+                input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
+                    self.model.device
+                )
+                output = self.model.generate(input_ids, generation_config=gen_cfg)
 
-            # decode output
-            response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
-            response = response[len(prompt):]
+                # decode output
+                response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
+                response = response[len(prompt):]
 
         print(f"Generated {response=}")
         return {"text": response}
@@ -158,6 +162,7 @@ class LLM:
 @stub.function(
     container_idle_timeout=60 * 10,
     timeout=60 * 5,
+    allow_concurrent_inputs=45,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -187,7 +192,7 @@ def web():
         gen_cfg: Optional[dict] = None
 
     @app.post("/llm", dependencies=[Depends(apikey_auth)])
-    async def llm(
+    def llm(
         req: LLMRequest,
     ):
         gen_schema = json.dumps(req.gen_schema) if req.gen_schema else None
diff --git a/server/gpu/modal/reflector_llm_zephyr.py b/server/gpu/modal/reflector_llm_zephyr.py
index 1000de4e..cbb436b0 100644
--- a/server/gpu/modal/reflector_llm_zephyr.py
+++ b/server/gpu/modal/reflector_llm_zephyr.py
@@ -5,6 +5,7 @@ Reflector GPU backend - LLM
 """
 import json
 import os
+import threading
 from typing import Optional
 
 import modal
@@ -67,7 +68,7 @@ llm_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
-    concurrency_limit=2,
+    allow_concurrent_inputs=10,
     image=llm_image,
 )
 class LLM:
@@ -111,6 +112,7 @@ class LLM:
         self.tokenizer = tokenizer
         self.gen_cfg = gen_cfg
         self.GenerationConfig = GenerationConfig
+        self.lock = threading.Lock()
 
     def __exit__(self, *args):
         print("Exit llm")
@@ -129,33 +131,34 @@ class LLM:
             gen_cfg = self.gen_cfg
 
         # If a gen_schema is given, conform to gen_schema
-        if gen_schema:
-            import jsonformer
+        with self.lock:
+            if gen_schema:
+                import jsonformer
 
-            print(f"Schema {gen_schema=}")
-            jsonformer_llm = jsonformer.Jsonformer(
-                model=self.model,
-                tokenizer=self.tokenizer,
-                json_schema=json.loads(gen_schema),
-                prompt=prompt,
-                max_string_token_length=gen_cfg.max_new_tokens
-            )
-            response = jsonformer_llm()
-        else:
-            # If no gen_schema, perform prompt only generation
+                print(f"Schema {gen_schema=}")
+                jsonformer_llm = jsonformer.Jsonformer(
+                    model=self.model,
+                    tokenizer=self.tokenizer,
+                    json_schema=json.loads(gen_schema),
+                    prompt=prompt,
+                    max_string_token_length=gen_cfg.max_new_tokens
+                )
+                response = jsonformer_llm()
+            else:
+                # If no gen_schema, perform prompt only generation
 
-            # tokenize prompt
-            input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
-                self.model.device
-            )
-            output = self.model.generate(input_ids, generation_config=gen_cfg)
+                # tokenize prompt
+                input_ids = self.tokenizer.encode(prompt, return_tensors="pt").to(
+                    self.model.device
+                )
+                output = self.model.generate(input_ids, generation_config=gen_cfg)
 
-            # decode output
-            response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
-            response = response[len(prompt):]
-            response = {
-                "long_summary": response
-            }
+                # decode output
+                response = self.tokenizer.decode(output[0].cpu(), skip_special_tokens=True)
+                response = response[len(prompt):]
+                response = {
+                    "long_summary": response
+                }
 
         print(f"Generated {response=}")
         return {"text": response}
@@ -167,6 +170,7 @@ class LLM:
 @stub.function(
     container_idle_timeout=60 * 10,
     timeout=60 * 5,
+    allow_concurrent_inputs=30,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -196,7 +200,7 @@ def web():
         gen_cfg: Optional[dict] = None
 
     @app.post("/llm", dependencies=[Depends(apikey_auth)])
-    async def llm(
+    def llm(
         req: LLMRequest,
     ):
         gen_schema = json.dumps(req.gen_schema) if req.gen_schema else None
diff --git a/server/gpu/modal/reflector_transcriber.py b/server/gpu/modal/reflector_transcriber.py
index 69558c8e..bee9ccd1 100644
--- a/server/gpu/modal/reflector_transcriber.py
+++ b/server/gpu/modal/reflector_transcriber.py
@@ -5,6 +5,7 @@ Reflector GPU backend - transcriber
 
 import os
 import tempfile
+import threading
 
 from modal import Image, Secret, Stub, asgi_app, method
 from pydantic import BaseModel
@@ -78,6 +79,7 @@ transcriber_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
+    allow_concurrent_inputs=6,
     image=transcriber_image,
 )
 class Transcriber:
@@ -85,6 +87,7 @@ class Transcriber:
         import faster_whisper
         import torch
 
+        self.lock = threading.Lock()
         self.use_gpu = torch.cuda.is_available()
         self.device = "cuda" if self.use_gpu else "cpu"
         self.model = faster_whisper.WhisperModel(
@@ -106,14 +109,15 @@ class Transcriber:
 
         with tempfile.NamedTemporaryFile("wb+", suffix=f".{audio_suffix}") as fp:
             fp.write(audio_data)
-            segments, _ = self.model.transcribe(
-                fp.name,
-                language=source_language,
-                beam_size=5,
-                word_timestamps=True,
-                vad_filter=True,
-                vad_parameters={"min_silence_duration_ms": 500},
-            )
+            with self.lock:
+                segments, _ = self.model.transcribe(
+                    fp.name,
+                    language=source_language,
+                    beam_size=5,
+                    word_timestamps=True,
+                    vad_filter=True,
+                    vad_parameters={"min_silence_duration_ms": 500},
+                )
 
         multilingual_transcript = {}
         transcript_source_lang = ""
@@ -147,6 +151,7 @@ class Transcriber:
 @stub.function(
     container_idle_timeout=60,
     timeout=60,
+    allow_concurrent_inputs=40,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -176,12 +181,12 @@ def web():
         result: dict
 
     @app.post("/transcribe", dependencies=[Depends(apikey_auth)])
-    async def transcribe(
+    def transcribe(
         file: UploadFile,
         source_language: Annotated[str, Body(...)] = "en",
         timestamp: Annotated[float, Body()] = 0.0
     ) -> TranscriptResponse:
-        audio_data = await file.read()
+        audio_data = file.file.read()
         audio_suffix = file.filename.split(".")[-1]
         assert audio_suffix in supported_audio_file_types
 
diff --git a/server/gpu/modal/reflector_translator.py b/server/gpu/modal/reflector_translator.py
index 69ea719a..6b035174 100644
--- a/server/gpu/modal/reflector_translator.py
+++ b/server/gpu/modal/reflector_translator.py
@@ -4,7 +4,7 @@ Reflector GPU backend - transcriber
 """
 
 import os
-import tempfile
+import threading
 
 from modal import Image, Secret, Stub, asgi_app, method
 from pydantic import BaseModel
@@ -129,6 +129,7 @@ transcriber_image = (
     gpu="A10G",
     timeout=60 * 5,
     container_idle_timeout=60 * 5,
+    allow_concurrent_inputs=4,
     image=transcriber_image,
 )
 class Translator:
@@ -136,6 +137,7 @@ class Translator:
         import torch
         from seamless_communication.models.inference.translator import Translator
 
+        self.lock = threading.Lock()
         self.use_gpu = torch.cuda.is_available()
         self.device = "cuda" if self.use_gpu else "cpu"
         self.translator = Translator(
@@ -168,13 +170,14 @@ class Translator:
         source_language: str,
         target_language: str
     ):
-        translated_text, _, _ = self.translator.predict(
-            text,
-            "t2tt",
-            src_lang=self.get_seamless_lang_code(source_language),
-            tgt_lang=self.get_seamless_lang_code(target_language),
-            ngram_filtering=True
-        )
+        with self.lock:
+            translated_text, _, _ = self.translator.predict(
+                text,
+                "t2tt",
+                src_lang=self.get_seamless_lang_code(source_language),
+                tgt_lang=self.get_seamless_lang_code(target_language),
+                ngram_filtering=True
+            )
         return {
             "text": {
                 source_language: text,
@@ -189,6 +192,7 @@ class Translator:
 @stub.function(
     container_idle_timeout=60,
     timeout=60,
+    allow_concurrent_inputs=40,
     secrets=[
         Secret.from_name("reflector-gpu"),
     ],
@@ -217,7 +221,7 @@ def web():
         result: dict
 
     @app.post("/translate", dependencies=[Depends(apikey_auth)])
-    async def translate(
+    def translate(
             text: str,
             source_language: Annotated[str, Body(...)] = "en",
             target_language: Annotated[str, Body(...)] = "fr",
@@ -230,8 +234,4 @@ def web():
         result = func.get()
         return result
 
-    @app.post("/warmup", dependencies=[Depends(apikey_auth)])
-    async def warmup():
-        return translatorstub.warmup.spawn().get()
-
     return app

From 9269db74c0e755f06948b1cfc5b5e182d2d8d362 Mon Sep 17 00:00:00 2001
From: Mathieu Virbel
Date: Fri, 13 Oct 2023 23:33:37 +0200
Subject: [PATCH 2/4] gpu: update format + list of country 2 to 3

---
 server/gpu/modal/reflector_translator.py | 257 ++++++++++++++++++++---
 1 file changed, 222 insertions(+), 35 deletions(-)

diff --git a/server/gpu/modal/reflector_translator.py b/server/gpu/modal/reflector_translator.py
index 6b035174..2986e002 100644
--- a/server/gpu/modal/reflector_translator.py
+++ b/server/gpu/modal/reflector_translator.py
@@ -26,8 +26,11 @@ stub = Stub(name="reflector-translator")
 def install_seamless_communication():
     import os
     import subprocess
+
     initial_dir = os.getcwd()
-    subprocess.run(["ssh-keyscan", "-t", "rsa", "github.com", ">>", "~/.ssh/known_hosts"])
+    subprocess.run(
+        ["ssh-keyscan", "-t", "rsa", "github.com", ">>", "~/.ssh/known_hosts"]
+    )
     subprocess.run(["rm", "-rf", "seamless_communication"])
     subprocess.run(["git", "clone", SEAMLESS_GITEPO, "." + "/seamless_communication"])
     os.chdir("seamless_communication")
@@ -54,13 +57,13 @@ def configure_seamless_m4t():
 
     ASSETS_DIR: str = "./seamless_communication/src/seamless_communication/assets/cards"
 
-    with open(f'{ASSETS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml', 'r') as file:
+    with open(f"{ASSETS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml", "r") as file:
         model_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
-    with open(f'{ASSETS_DIR}/vocoder_36langs.yaml', 'r') as file:
+    with open(f"{ASSETS_DIR}/vocoder_36langs.yaml", "r") as file:
         vocoder_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
-    with open(f'{ASSETS_DIR}/unity_nllb-100.yaml', 'r') as file:
+    with open(f"{ASSETS_DIR}/unity_nllb-100.yaml", "r") as file:
         unity_100_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
-    with open(f'{ASSETS_DIR}/unity_nllb-200.yaml', 'r') as file:
+    with open(f"{ASSETS_DIR}/unity_nllb-200.yaml", "r") as file:
         unity_200_yaml_data = yaml.load(file, Loader=yaml.FullLoader)
 
     model_dir = f"{SEAMLESS_MODEL_DIR}/models--facebook--seamless-m4t-{SEAMLESSM4T_MODEL_SIZE}/snapshots"
@@ -69,27 +72,33 @@ def configure_seamless_m4t():
     model_name = f"multitask_unity_{SEAMLESSM4T_MODEL_SIZE}.pt"
     model_path = os.path.join(os.getcwd(), model_dir, latest_model_version, model_name)
 
-    vocoder_dir = f"{SEAMLESS_MODEL_DIR}/models--facebook--seamless-m4t-vocoder/snapshots"
+    vocoder_dir = (
+        f"{SEAMLESS_MODEL_DIR}/models--facebook--seamless-m4t-vocoder/snapshots"
+    )
     available_vocoder_versions = os.listdir(vocoder_dir)
     latest_vocoder_version = sorted(available_vocoder_versions)[-1]
     vocoder_name = "vocoder_36langs.pt"
-    vocoder_path = os.path.join(os.getcwd(), vocoder_dir, latest_vocoder_version, vocoder_name)
+    vocoder_path = os.path.join(
+        os.getcwd(), vocoder_dir, latest_vocoder_version, vocoder_name
+    )
 
     tokenizer_name = "tokenizer.model"
-    tokenizer_path = os.path.join(os.getcwd(), model_dir, latest_model_version, tokenizer_name)
+    tokenizer_path = os.path.join(
+        os.getcwd(), model_dir, latest_model_version, tokenizer_name
+    )
 
-    model_yaml_data['checkpoint'] = f"file:/{model_path}"
-    vocoder_yaml_data['checkpoint'] = f"file:/{vocoder_path}"
-    unity_100_yaml_data['tokenizer'] = f"file:/{tokenizer_path}"
-    unity_200_yaml_data['tokenizer'] = f"file:/{tokenizer_path}"
+    model_yaml_data["checkpoint"] = f"file:/{model_path}"
+    vocoder_yaml_data["checkpoint"] = f"file:/{vocoder_path}"
+    unity_100_yaml_data["tokenizer"] = f"file:/{tokenizer_path}"
+    unity_200_yaml_data["tokenizer"] = f"file:/{tokenizer_path}"
 
-    with open(f'{ASSETS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml', 'w') as file:
+    with open(f"{ASSETS_DIR}/seamlessM4T_{SEAMLESSM4T_MODEL_SIZE}.yaml", "w") as file:
         yaml.dump(model_yaml_data, file)
-    with open(f'{ASSETS_DIR}/vocoder_36langs.yaml', 'w') as file:
+    with open(f"{ASSETS_DIR}/vocoder_36langs.yaml", "w") as file:
         yaml.dump(vocoder_yaml_data, file)
-    with open(f'{ASSETS_DIR}/unity_nllb-100.yaml', 'w') as file:
+    with open(f"{ASSETS_DIR}/unity_nllb-100.yaml", "w") as file:
         yaml.dump(unity_100_yaml_data, file)
-    with open(f'{ASSETS_DIR}/unity_nllb-200.yaml', 'w') as file:
+    with open(f"{ASSETS_DIR}/unity_nllb-200.yaml", "w") as file:
         yaml.dump(unity_200_yaml_data, file)
 
 
@@ -109,7 +118,7 @@ transcriber_image = (
         "torchaudio",
         "fairseq2",
         "pyyaml",
-        "hf-transfer~=0.1"
+        "hf-transfer~=0.1",
     )
     .run_function(install_seamless_communication)
     .run_function(download_seamlessm4t_model)
@@ -144,7 +153,7 @@ class Translator:
             SEAMLESSM4T_MODEL_CARD_NAME,
             SEAMLESSM4T_VOCODER_CARD_NAME,
             torch.device(self.device),
-            dtype=torch.float32
+            dtype=torch.float32,
         )
 
     @method()
@@ -158,32 +167,210 @@ class Translator:
         """
         # TODO: Enhance with complete list of lang codes
         seamless_lang_code = {
+            # Amharic
+            "am": "amh",
+            # Modern Standard Arabic
+            "ar": "arb",
+            # Moroccan Arabic
+            # (No 2-letter code)
+            # Egyptian Arabic
+            # (No 2-letter code)
+            # Assamese
+            "as": "asm",
+            # North Azerbaijani
+            "az": "azj",
+            # Belarusian
+            "be": "bel",
+            # Bengali
+            "bn": "ben",
+            # Bosnian
+            "bs": "bos",
+            # Bulgarian
+            "bg": "bul",
+            # Catalan
+            "ca": "cat",
+            # Cebuano
+            "ceb": "ceb",
+            # Czech
+            "cs": "ces",
+            # Central Kurdish
+            "ckb": "ckb",
+            # Mandarin Chinese (Simplified)
+            "zh": "cmn",
+            # Mandarin Chinese (Traditional)
+            # (No separate 2-letter code)
+            # Welsh
+            "cy": "cym",
+            # Danish
+            "da": "dan",
+            # German
+            "de": "deu",
+            # Greek
+            "el": "ell",
+            # English
             "en": "eng",
-            "fr": "fra"
+            # Estonian
+            "et": "est",
+            # Basque
+            "eu": "eus",
+            # Finnish
+            "fi": "fin",
+            # French
+            "fr": "fra",
+            # West Central Oromo
+            # (No 2-letter code)
+            # Irish
+            "ga": "gle",
+            # Galician
+            "gl": "glg",
+            # Gujarati
+            "gu": "guj",
+            # Hebrew
+            "he": "heb",
+            # Hindi
+            "hi": "hin",
+            # Croatian
+            "hr": "hrv",
+            # Hungarian
+            "hu": "hun",
+            # Armenian
+            "hy": "hye",
+            # Igbo
+            "ig": "ibo",
+            # Indonesian
+            "id": "ind",
+            # Icelandic
+            "is": "isl",
+            # Italian
+            "it": "ita",
+            # Javanese
+            "jv": "jav",
+            # Japanese
+            "ja": "jpn",
+            # Kannada
+            "kn": "kan",
+            # Georgian
+            "ka": "kat",
+            # Kazakh
+            "kk": "kaz",
+            # Halh Mongolian
+            # (No 2-letter code)
+            # Khmer
+            "km": "khm",
+            # Kyrgyz
+            "ky": "kir",
+            # Korean
+            "ko": "kor",
+            # Lao
+            "lo": "lao",
+            # Lithuanian
+            "lt": "lit",
+            # Ganda
+            "lg": "lug",
+            # Luo
+            "luo": "luo",
+            # Standard Latvian
+            "lv": "lvs",
+            # Maithili
+            # (No 2-letter code)
+            # Malayalam
+            "ml": "mal",
+            # Marathi
+            "mr": "mar",
+            # Macedonian
+            "mk": "mkd",
+            # Maltese
+            "mt": "mlt",
+            # Meitei
+            # (No 2-letter code)
+            # Burmese
+            "my": "mya",
+            # Dutch
+            "nl": "nld",
+            # Norwegian Nynorsk
+            "nn": "nno",
+            # Norwegian Bokmål
+            "nb": "nob",
+            # Nepali
+            "ne": "npi",
+            # Nyanja
+            "ny": "nya",
+            # Odia
+            "or": "ory",
+            # Punjabi
+            "pa": "pan",
+            # Southern Pashto
+            # (No 2-letter code)
+            # Western Persian
+            "fa": "pes",
+            # Polish
+            "pl": "pol",
+            # Portuguese
+            "pt": "por",
+            # Romanian
+            "ro": "ron",
+            # Russian
+            "ru": "rus",
+            # Slovak
+            "sk": "slk",
+            # Slovenian
+            "sl": "slv",
+            # Shona
+            "sn": "sna",
+            # Sindhi
+            "sd": "snd",
+            # Somali
+            "so": "som",
+            # Spanish
+            "es": "spa",
+            # Serbian
+            "sr": "srp",
+            # Swedish
+            "sv": "swe",
+            # Swahili
+            "sw": "swh",
+            # Tamil
+            "ta": "tam",
+            # Telugu
+            "te": "tel",
+            # Tajik
+            "tg": "tgk",
+            # Tagalog
+            "tl": "tgl",
+            # Thai
+            "th": "tha",
+            # Turkish
+            "tr": "tur",
+            # Ukrainian
+            "uk": "ukr",
+            # Urdu
+            "ur": "urd",
+            # Northern Uzbek
+            "uz": "uzn",
+            # Vietnamese
+            "vi": "vie",
+            # Yoruba
+            "yo": "yor",
+            # Cantonese
+            # (No separate 2-letter code)
+            # Zulu
+            "zu": "zul",
         }
         return seamless_lang_code.get(lang_code, "eng")
 
     @method()
-    def translate_text(
-        self,
-        text: str,
-        source_language: str,
-        target_language: str
-    ):
+    def translate_text(self, text: str, source_language: str, target_language: str):
         with self.lock:
             translated_text, _, _ = self.translator.predict(
                 text,
                 "t2tt",
                 src_lang=self.get_seamless_lang_code(source_language),
                 tgt_lang=self.get_seamless_lang_code(target_language),
-                ngram_filtering=True
+                ngram_filtering=True,
             )
-        return {
-            "text": {
-                source_language: text,
-                target_language: str(translated_text)
-            }
-        }
+        return {"text": {source_language: text, target_language: str(translated_text)}}
+
+
 # -------------------------------------------------------------------
 # Web API
 # -------------------------------------------------------------------
@@ -222,9 +409,9 @@ def web():
 
     @app.post("/translate", dependencies=[Depends(apikey_auth)])
     def translate(
-            text: str,
-            source_language: Annotated[str, Body(...)] = "en",
-            target_language: Annotated[str, Body(...)] = "fr",
+        text: str,
+        source_language: Annotated[str, Body(...)] = "en",
+        target_language: Annotated[str, Body(...)] = "fr",
     ) -> TranslateResponse:
         func = translatorstub.translate_text.spawn(
             text=text,

From 79fa537c35d8a1543674edf1d9348ade69a8d1b8 Mon Sep 17 00:00:00 2001
From: Gokul Mohanarangan
Date: Sat, 14 Oct 2023 18:08:16 +0530
Subject: [PATCH 3/4] update return format

---
 server/reflector/processors/types.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py
index ce792b35..e867becf 100644
--- a/server/reflector/processors/types.py
+++ b/server/reflector/processors/types.py
@@ -317,6 +317,4 @@ class TranslationLanguages(BaseModel):
         return self.language_to_id_mapping.keys()
 
     def is_supported(self, lang_id: str) -> bool:
-        if lang_id in self.supported_languages:
-            return True
-        return False
+        return lang_id in self.supported_languages

From c1a9005ec301380082d5feb268a84686c6b72082 Mon Sep 17 00:00:00 2001
From: Gokul Mohanarangan
Date: Sat, 14 Oct 2023 18:55:40 +0530
Subject: [PATCH 4/4] update bullet condition

---
 server/reflector/processors/transcript_final_long_summary.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/reflector/processors/transcript_final_long_summary.py b/server/reflector/processors/transcript_final_long_summary.py
index 45bbcafb..57e36636 100644
--- a/server/reflector/processors/transcript_final_long_summary.py
+++ b/server/reflector/processors/transcript_final_long_summary.py
@@ -79,7 +79,7 @@ class TranscriptFinalLongSummaryProcessor(Processor):
             sentence = str(sentence).strip()
             if sentence.startswith("- "):
                 sentence.replace("- ", "* ")
-            else:
+            elif not sentence.startswith("*"):
                 sentence = "* " + sentence
             sentence += " \n"
             summary_sentences.append(sentence)
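Taken together, patch 1 replaces Modal's concurrency_limit with allow_concurrent_inputs on every GPU class and web endpoint, switches the FastAPI handlers from "async def" to plain "def" so each blocking request runs in the server's threadpool, and adds a per-container threading.Lock so that only the GPU-bound call is serialized. The sketch below is a standalone illustration of that pattern, not code from these patches: it uses plain Python with no Modal or GPU dependency, and FakeModel, Worker, and the numbers are illustrative stand-ins. Many requests are accepted concurrently, but they queue on the lock instead of racing on the shared model.

    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor


    class FakeModel:
        """Stand-in for a GPU model that must not be called concurrently."""

        def generate(self, prompt: str) -> str:
            time.sleep(0.1)  # simulate GPU-bound work
            return prompt.upper()


    class Worker:
        """Mimics the patched classes: many inputs accepted, one lock around the model."""

        def __init__(self) -> None:
            self.model = FakeModel()
            self.lock = threading.Lock()  # serializes access to the shared model

        def handle(self, prompt: str) -> str:
            # Work outside the lock (parsing, file I/O, tokenization) can overlap
            # freely across requests.
            prompt = prompt.strip()
            with self.lock:
                # Only one request at a time reaches the model, mirroring the
                # "with self.lock:" blocks added to generate(), transcribe(),
                # and translate_text() above.
                return self.model.generate(prompt)


    if __name__ == "__main__":
        worker = Worker()
        # The thread pool stands in for allow_concurrent_inputs=N: requests arrive
        # together, overlap outside the lock, and queue for the model inside it.
        with ThreadPoolExecutor(max_workers=15) as pool:
            results = list(pool.map(worker.handle, (f"request {i}" for i in range(15))))
        print(results)

Because the handlers are now synchronous, FastAPI runs each one in a worker thread, so a long transcription or translation blocks only its own thread while it waits on the lock rather than stalling the event loop.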