diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index 16bf8b0a..c8d364b2 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -538,18 +538,29 @@ class TranscriptController: Move mp3 file to storage """ - # store the audio on external storage - await get_storage().put_file( - transcript.storage_audio_path, - transcript.audio_mp3_filename.read_bytes(), - ) + if transcript.audio_location == "local": + # store the audio on external storage if it's not already there + await get_storage().put_file( + transcript.storage_audio_path, + transcript.audio_mp3_filename.read_bytes(), + ) - # indicate on the transcript that the audio is now on storage - await self.update(transcript, {"audio_location": "storage"}) + # indicate on the transcript that the audio is now on storage + await self.update(transcript, {"audio_location": "storage"}) # unlink the local file transcript.audio_mp3_filename.unlink(missing_ok=True) + async def download_mp3_from_storage(self, transcript: Transcript): + """ + Download audio from storage + """ + transcript.audio_mp3_filename.write_bytes( + await get_storage().get_file( + transcript.storage_audio_path, + ) + ) + async def upsert_participant( self, transcript: Transcript, diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 7377e7a4..f81e37b7 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -663,6 +663,9 @@ async def pipeline_process(transcript: Transcript, logger: Logger): import av try: + if transcript.audio_location == "storage": + await transcripts_controller.download_mp3_from_storage(transcript) + # open audio audio_filename = next(transcript.data_path.glob("upload.*"), None) if audio_filename and transcript.status != "uploaded": diff --git a/server/reflector/storage/base.py b/server/reflector/storage/base.py index a457ddf8..b51f11f6 100644 --- a/server/reflector/storage/base.py +++ b/server/reflector/storage/base.py @@ -52,3 +52,9 @@ class Storage: async def _get_file_url(self, filename: str) -> str: raise NotImplementedError + + async def get_file(self, filename: str): + return await self._get_file(filename) + + async def _get_file(self, filename: str): + raise NotImplementedError diff --git a/server/reflector/storage/storage_aws.py b/server/reflector/storage/storage_aws.py index d2313293..b8e47f42 100644 --- a/server/reflector/storage/storage_aws.py +++ b/server/reflector/storage/storage_aws.py @@ -65,5 +65,14 @@ class AwsStorage(Storage): async with self.session.client("s3") as client: await client.delete_object(Bucket=bucket, Key=s3filename) + async def _get_file(self, filename: str): + bucket = self.aws_bucket_name + folder = self.aws_folder + logger.info(f"Downloading {filename} from S3 {bucket}/{folder}") + s3filename = f"{folder}/{filename}" if folder else filename + async with self.session.client("s3") as client: + response = await client.get_object(Bucket=bucket, Key=s3filename) + return await response["Body"].read() + Storage.register("aws", AwsStorage)