From 1522d60cbc5f17bdce1c7e285883123db4fe10be Mon Sep 17 00:00:00 2001 From: projects-g <63178974+projects-g@users.noreply.github.com> Date: Tue, 30 Jan 2024 20:51:58 +0530 Subject: [PATCH] Big file upload (#349) --- .pre-commit-config.yaml | 2 +- server/poetry.lock | 52 +++++++++---------- server/pyproject.toml | 3 +- .../reflector/pipelines/main_live_pipeline.py | 12 ++--- server/reflector/pipelines/runner.py | 26 +++++++--- server/reflector/utils/text_utils.py | 6 +-- server/reflector/views/rtc_offer.py | 4 +- server/reflector/views/transcripts_audio.py | 1 + .../views/transcripts_participants.py | 1 + server/reflector/views/transcripts_speaker.py | 1 + .../reflector/views/transcripts_websocket.py | 1 + 11 files changed, 64 insertions(+), 45 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 19d82472..77be7317 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,7 +19,7 @@ repos: - id: detect-private-key - repo: https://github.com/psf/black - rev: 23.1.0 + rev: 24.1.1 hooks: - id: black files: ^server/(reflector|tests)/ diff --git a/server/poetry.lock b/server/poetry.lock index 5bf722b9..b69f717a 100644 --- a/server/poetry.lock +++ b/server/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aioboto3" @@ -488,33 +488,33 @@ files = [ [[package]] name = "black" -version = "23.9.1" +version = "24.1.1" description = "The uncompromising code formatter." optional = false python-versions = ">=3.8" files = [ - {file = "black-23.9.1-cp310-cp310-macosx_10_16_arm64.whl", hash = "sha256:d6bc09188020c9ac2555a498949401ab35bb6bf76d4e0f8ee251694664df6301"}, - {file = "black-23.9.1-cp310-cp310-macosx_10_16_universal2.whl", hash = "sha256:13ef033794029b85dfea8032c9d3b92b42b526f1ff4bf13b2182ce4e917f5100"}, - {file = "black-23.9.1-cp310-cp310-macosx_10_16_x86_64.whl", hash = "sha256:75a2dc41b183d4872d3a500d2b9c9016e67ed95738a3624f4751a0cb4818fe71"}, - {file = "black-23.9.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13a2e4a93bb8ca74a749b6974925c27219bb3df4d42fc45e948a5d9feb5122b7"}, - {file = "black-23.9.1-cp310-cp310-win_amd64.whl", hash = "sha256:adc3e4442eef57f99b5590b245a328aad19c99552e0bdc7f0b04db6656debd80"}, - {file = "black-23.9.1-cp311-cp311-macosx_10_16_arm64.whl", hash = "sha256:8431445bf62d2a914b541da7ab3e2b4f3bc052d2ccbf157ebad18ea126efb91f"}, - {file = "black-23.9.1-cp311-cp311-macosx_10_16_universal2.whl", hash = "sha256:8fc1ddcf83f996247505db6b715294eba56ea9372e107fd54963c7553f2b6dfe"}, - {file = "black-23.9.1-cp311-cp311-macosx_10_16_x86_64.whl", hash = "sha256:7d30ec46de88091e4316b17ae58bbbfc12b2de05e069030f6b747dfc649ad186"}, - {file = "black-23.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:031e8c69f3d3b09e1aa471a926a1eeb0b9071f80b17689a655f7885ac9325a6f"}, - {file = "black-23.9.1-cp311-cp311-win_amd64.whl", hash = "sha256:538efb451cd50f43aba394e9ec7ad55a37598faae3348d723b59ea8e91616300"}, - {file = "black-23.9.1-cp38-cp38-macosx_10_16_arm64.whl", hash = "sha256:638619a559280de0c2aa4d76f504891c9860bb8fa214267358f0a20f27c12948"}, - {file = "black-23.9.1-cp38-cp38-macosx_10_16_universal2.whl", hash = "sha256:a732b82747235e0542c03bf352c126052c0fbc458d8a239a94701175b17d4855"}, - {file = "black-23.9.1-cp38-cp38-macosx_10_16_x86_64.whl", hash = "sha256:cf3a4d00e4cdb6734b64bf23cd4341421e8953615cba6b3670453737a72ec204"}, - {file = "black-23.9.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:cf99f3de8b3273a8317681d8194ea222f10e0133a24a7548c73ce44ea1679377"}, - {file = "black-23.9.1-cp38-cp38-win_amd64.whl", hash = "sha256:14f04c990259576acd093871e7e9b14918eb28f1866f91968ff5524293f9c573"}, - {file = "black-23.9.1-cp39-cp39-macosx_10_16_arm64.whl", hash = "sha256:c619f063c2d68f19b2d7270f4cf3192cb81c9ec5bc5ba02df91471d0b88c4c5c"}, - {file = "black-23.9.1-cp39-cp39-macosx_10_16_universal2.whl", hash = "sha256:6a3b50e4b93f43b34a9d3ef00d9b6728b4a722c997c99ab09102fd5efdb88325"}, - {file = "black-23.9.1-cp39-cp39-macosx_10_16_x86_64.whl", hash = "sha256:c46767e8df1b7beefb0899c4a95fb43058fa8500b6db144f4ff3ca38eb2f6393"}, - {file = "black-23.9.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:50254ebfa56aa46a9fdd5d651f9637485068a1adf42270148cd101cdf56e0ad9"}, - {file = "black-23.9.1-cp39-cp39-win_amd64.whl", hash = "sha256:403397c033adbc45c2bd41747da1f7fc7eaa44efbee256b53842470d4ac5a70f"}, - {file = "black-23.9.1-py3-none-any.whl", hash = "sha256:6ccd59584cc834b6d127628713e4b6b968e5f79572da66284532525a042549f9"}, - {file = "black-23.9.1.tar.gz", hash = "sha256:24b6b3ff5c6d9ea08a8888f6977eae858e1f340d7260cf56d70a49823236b62d"}, + {file = "black-24.1.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:2588021038bd5ada078de606f2a804cadd0a3cc6a79cb3e9bb3a8bf581325a4c"}, + {file = "black-24.1.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:1a95915c98d6e32ca43809d46d932e2abc5f1f7d582ffbe65a5b4d1588af7445"}, + {file = "black-24.1.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fa6a0e965779c8f2afb286f9ef798df770ba2b6cee063c650b96adec22c056a"}, + {file = "black-24.1.1-cp310-cp310-win_amd64.whl", hash = "sha256:5242ecd9e990aeb995b6d03dc3b2d112d4a78f2083e5a8e86d566340ae80fec4"}, + {file = "black-24.1.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:fc1ec9aa6f4d98d022101e015261c056ddebe3da6a8ccfc2c792cbe0349d48b7"}, + {file = "black-24.1.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:0269dfdea12442022e88043d2910429bed717b2d04523867a85dacce535916b8"}, + {file = "black-24.1.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b3d64db762eae4a5ce04b6e3dd745dcca0fb9560eb931a5be97472e38652a161"}, + {file = "black-24.1.1-cp311-cp311-win_amd64.whl", hash = "sha256:5d7b06ea8816cbd4becfe5f70accae953c53c0e53aa98730ceccb0395520ee5d"}, + {file = "black-24.1.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:e2c8dfa14677f90d976f68e0c923947ae68fa3961d61ee30976c388adc0b02c8"}, + {file = "black-24.1.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a21725862d0e855ae05da1dd25e3825ed712eaaccef6b03017fe0853a01aa45e"}, + {file = "black-24.1.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:07204d078e25327aad9ed2c64790d681238686bce254c910de640c7cc4fc3aa6"}, + {file = "black-24.1.1-cp312-cp312-win_amd64.whl", hash = "sha256:a83fe522d9698d8f9a101b860b1ee154c1d25f8a82ceb807d319f085b2627c5b"}, + {file = "black-24.1.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:08b34e85170d368c37ca7bf81cf67ac863c9d1963b2c1780c39102187ec8dd62"}, + {file = "black-24.1.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:7258c27115c1e3b5de9ac6c4f9957e3ee2c02c0b39222a24dc7aa03ba0e986f5"}, + {file = "black-24.1.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:40657e1b78212d582a0edecafef133cf1dd02e6677f539b669db4746150d38f6"}, + {file = "black-24.1.1-cp38-cp38-win_amd64.whl", hash = "sha256:e298d588744efda02379521a19639ebcd314fba7a49be22136204d7ed1782717"}, + {file = "black-24.1.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:34afe9da5056aa123b8bfda1664bfe6fb4e9c6f311d8e4a6eb089da9a9173bf9"}, + {file = "black-24.1.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:854c06fb86fd854140f37fb24dbf10621f5dab9e3b0c29a690ba595e3d543024"}, + {file = "black-24.1.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3897ae5a21ca132efa219c029cce5e6bfc9c3d34ed7e892113d199c0b1b444a2"}, + {file = "black-24.1.1-cp39-cp39-win_amd64.whl", hash = "sha256:ecba2a15dfb2d97105be74bbfe5128bc5e9fa8477d8c46766505c1dda5883aac"}, + {file = "black-24.1.1-py3-none-any.whl", hash = "sha256:5cdc2e2195212208fbcae579b931407c1fa9997584f0a415421748aeafff1168"}, + {file = "black-24.1.1.tar.gz", hash = "sha256:48b5760dcbfe5cf97fd4fba23946681f3a81514c6ab8a45b50da67ac8fbc6c7b"}, ] [package.dependencies] @@ -526,7 +526,7 @@ platformdirs = ">=2" [package.extras] colorama = ["colorama (>=0.4.3)"] -d = ["aiohttp (>=3.7.4)"] +d = ["aiohttp (>=3.7.4)", "aiohttp (>=3.7.4,!=3.9.0)"] jupyter = ["ipython (>=7.8.0)", "tokenize-rt (>=3.2.0)"] uvloop = ["uvloop (>=0.15.2)"] @@ -4296,4 +4296,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "a0d9362ca5fc6e5c310bba39ce9bac720880d4dbb884a9f6625e51c952c54ffa" +content-hash = "8b64f7a5e8282cedf8f508c9f85ed233222045bdccc49d7c4ea96cf4bf8f902b" diff --git a/server/pyproject.toml b/server/pyproject.toml index a28ba42f..6cdda4cb 100644 --- a/server/pyproject.toml +++ b/server/pyproject.toml @@ -37,10 +37,11 @@ python-jose = {extras = ["cryptography"], version = "^3.3.0"} python-multipart = "^0.0.6" faster-whisper = "^0.10.0" transformers = "^4.36.2" +black = "24.1.1" [tool.poetry.group.dev.dependencies] -black = "^23.7.0" +black = "^24.1.1" stamina = "^23.1.0" pyinstrument = "^4.6.1" diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 20743391..2b87f23a 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -384,8 +384,8 @@ class PipelineMainDiarization(PipelineMainBase): # to let the start just do one job. pipeline.logger.bind(transcript_id=transcript.id) pipeline.logger.info("Diarization pipeline created") - self.push(audio_diarization_input) - self.flush() + await self.push(audio_diarization_input) + await self.flush() return pipeline @@ -414,9 +414,9 @@ class PipelineMainFromTopics(PipelineMainBase): # push topics topics = self.get_transcript_topics(transcript) for topic in topics: - self.push(topic) + await self.push(topic) - self.flush() + await self.flush() return pipeline @@ -653,10 +653,10 @@ async def pipeline_upload(transcript: Transcript, logger: Logger): try: logger.info("Start pushing audio into the pipeline") for frame in container.decode(audio=0): - pipeline.push(frame) + await pipeline.push(frame) finally: logger.info("Flushing the pipeline") - pipeline.flush() + await pipeline.flush() logger.info("Waiting for the pipeline to end") await pipeline.join() diff --git a/server/reflector/pipelines/runner.py b/server/reflector/pipelines/runner.py index 0edf156c..ed3118ae 100644 --- a/server/reflector/pipelines/runner.py +++ b/server/reflector/pipelines/runner.py @@ -66,17 +66,17 @@ class PipelineRunner(BaseModel): coro = self.run() asyncio.run(coro) - def push(self, data): + async def push(self, data): """ Push data to the pipeline """ - self._add_cmd("PUSH", data) + await self._add_cmd("PUSH", data) - def flush(self): + async def flush(self): """ Flush the pipeline """ - self._add_cmd("FLUSH", None) + await self._add_cmd("FLUSH", None) async def on_status(self, status): """ @@ -90,12 +90,26 @@ class PipelineRunner(BaseModel): """ pass - def _add_cmd(self, cmd: str, data): + async def _add_cmd( + self, cmd: str, data, max_retries: int = 3, retry_time_limit: int = 3 + ): """ Enqueue a command to be executed in the runner. Currently supported commands: PUSH, FLUSH """ - self._q_cmd.put_nowait([cmd, data]) + for _ in range(max_retries): + try: + self._q_cmd.put_nowait([cmd, data]) + break # Break if put succeeds + except asyncio.queues.QueueFull: + # Handle only the QueueFull exception, retry after a small delay + self._logger.debug( + f"Encountered a full queue, while trying to add [{cmd, data}]. " + f"Retrying in {retry_time_limit} seconds" + ) + await asyncio.sleep(retry_time_limit) + else: + print(f"Failed to add [{cmd, data}] after {max_retries} attempts.") async def _set_status(self, status): self._logger.debug("Runner status updated", status=status) diff --git a/server/reflector/utils/text_utils.py b/server/reflector/utils/text_utils.py index 01cb671d..1ea5e8d4 100644 --- a/server/reflector/utils/text_utils.py +++ b/server/reflector/utils/text_utils.py @@ -1,20 +1,20 @@ """ Utility file for all text processing related functionalities """ + import datetime from typing import List import nltk import torch +from log_utils import LOGGER from nltk.corpus import stopwords from nltk.tokenize import word_tokenize +from run_utils import CONFIG from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from transformers import BartForConditionalGeneration, BartTokenizer -from log_utils import LOGGER -from run_utils import CONFIG - nltk.download("punkt", quiet=True) diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py index 386ada9c..3f0ca1ac 100644 --- a/server/reflector/views/rtc_offer.py +++ b/server/reflector/views/rtc_offer.py @@ -40,7 +40,7 @@ class AudioStreamTrack(MediaStreamTrack): ctx = self.ctx frame = await self.track.recv() try: - ctx.pipeline_runner.push(frame) + await ctx.pipeline_runner.push(frame) except Exception as e: ctx.logger.error("Pipeline error", error=e) return frame @@ -76,7 +76,7 @@ async def rtc_offer_base( # - when we receive the close event, we do nothing. # 2. or the client close the connection # and there is nothing to do because it is already closed - ctx.pipeline_runner.flush() + await ctx.pipeline_runner.flush() if close: ctx.logger.debug("Closing peer connection") await pc.close() diff --git a/server/reflector/views/transcripts_audio.py b/server/reflector/views/transcripts_audio.py index 7b3655fb..45d4eccc 100644 --- a/server/reflector/views/transcripts_audio.py +++ b/server/reflector/views/transcripts_audio.py @@ -3,6 +3,7 @@ Transcripts audio related endpoints =================================== """ + from typing import Annotated, Optional import httpx diff --git a/server/reflector/views/transcripts_participants.py b/server/reflector/views/transcripts_participants.py index d62d1dbf..f55668e9 100644 --- a/server/reflector/views/transcripts_participants.py +++ b/server/reflector/views/transcripts_participants.py @@ -3,6 +3,7 @@ Transcript participants API endpoints ===================================== """ + from typing import Annotated, Optional import reflector.auth as auth diff --git a/server/reflector/views/transcripts_speaker.py b/server/reflector/views/transcripts_speaker.py index 0bddad5e..89b20fa0 100644 --- a/server/reflector/views/transcripts_speaker.py +++ b/server/reflector/views/transcripts_speaker.py @@ -3,6 +3,7 @@ Reassign speakers in a transcript ================================= """ + from typing import Annotated, Optional import reflector.auth as auth diff --git a/server/reflector/views/transcripts_websocket.py b/server/reflector/views/transcripts_websocket.py index 65571aab..04f71a0c 100644 --- a/server/reflector/views/transcripts_websocket.py +++ b/server/reflector/views/transcripts_websocket.py @@ -3,6 +3,7 @@ Transcripts websocket API ========================= """ + from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect from reflector.db.transcripts import transcripts_controller from reflector.ws_manager import get_ws_manager