Big file upload (#349)

This commit is contained in:
projects-g
2024-01-30 20:51:58 +05:30
committed by GitHub
parent 529984c198
commit 1522d60cbc
11 changed files with 64 additions and 45 deletions

View File

@@ -384,8 +384,8 @@ class PipelineMainDiarization(PipelineMainBase):
# to let the start just do one job.
pipeline.logger.bind(transcript_id=transcript.id)
pipeline.logger.info("Diarization pipeline created")
self.push(audio_diarization_input)
self.flush()
await self.push(audio_diarization_input)
await self.flush()
return pipeline
@@ -414,9 +414,9 @@ class PipelineMainFromTopics(PipelineMainBase):
# push topics
topics = self.get_transcript_topics(transcript)
for topic in topics:
self.push(topic)
await self.push(topic)
self.flush()
await self.flush()
return pipeline
@@ -653,10 +653,10 @@ async def pipeline_upload(transcript: Transcript, logger: Logger):
try:
logger.info("Start pushing audio into the pipeline")
for frame in container.decode(audio=0):
pipeline.push(frame)
await pipeline.push(frame)
finally:
logger.info("Flushing the pipeline")
pipeline.flush()
await pipeline.flush()
logger.info("Waiting for the pipeline to end")
await pipeline.join()

View File

@@ -66,17 +66,17 @@ class PipelineRunner(BaseModel):
coro = self.run()
asyncio.run(coro)
def push(self, data):
async def push(self, data):
"""
Push data to the pipeline.

Enqueues a "PUSH" command carrying `data` through `_add_cmd`. In its
async form this is a coroutine, so callers have to await it.
"""
self._add_cmd("PUSH", data)
await self._add_cmd("PUSH", data)
def flush(self):
async def flush(self):
"""
Flush the pipeline.

Enqueues a "FLUSH" command (with no payload) through `_add_cmd`. In its
async form this is a coroutine, so callers have to await it.
"""
self._add_cmd("FLUSH", None)
await self._add_cmd("FLUSH", None)
async def on_status(self, status):
"""
@@ -90,12 +90,26 @@ class PipelineRunner(BaseModel):
"""
pass
async def _add_cmd(
    self, cmd: str, data, max_retries: int = 3, retry_time_limit: int = 3
):
    """
    Enqueue a command to be executed in the runner.

    Currently supported commands: PUSH, FLUSH

    The command is put on the command queue without blocking. When the
    queue is full, the put is attempted up to `max_retries` times in
    total, sleeping `retry_time_limit` seconds between attempts.

    Returns True once the command is enqueued. Returns False when every
    attempt found the queue full; the command is then dropped and an
    error is logged, so the caller can decide how to react.
    """
    for attempt in range(1, max_retries + 1):
        try:
            self._q_cmd.put_nowait([cmd, data])
        except asyncio.QueueFull:
            # Handle only the QueueFull exception. Do not sleep after the
            # final attempt: there is no retry left to wait for.
            if attempt == max_retries:
                break
            self._logger.debug(
                f"Encountered a full queue, while trying to add [{cmd, data}]. "
                f"Retrying in {retry_time_limit} seconds"
            )
            await asyncio.sleep(retry_time_limit)
        else:
            return True

    # Report through the runner's logger (not stdout) so the dropped
    # command shows up alongside the other pipeline log records.
    self._logger.error(
        f"Failed to add [{cmd, data}] after {max_retries} attempts."
    )
    return False
async def _set_status(self, status):
self._logger.debug("Runner status updated", status=status)