update audio-deleted flow

This commit is contained in:
Igor Loskutov
2025-06-18 15:43:34 -04:00
parent 6cb6d90b9a
commit c23e0e07ef
15 changed files with 282 additions and 76 deletions

View File

@@ -59,6 +59,13 @@ from reflector.zulip import (
send_message_to_zulip,
update_zulip_message,
)
from reflector.db.meetings import meetings_controller, meeting_consent_controller
from reflector.db.recordings import recordings_controller
from reflector.storage import get_transcripts_storage
import boto3
from structlog import BoundLogger as Logger
@@ -470,6 +477,7 @@ class PipelineMainWaveform(PipelineMainFromTopics):
@get_transcript
async def pipeline_remove_upload(transcript: Transcript, logger: Logger):
# for future changes: note that there's also a consent process happens, beforehand and users may not consent with keeping files. currently, we delete regardless, so it's no need for that
logger.info("Starting remove upload")
uploads = transcript.data_path.glob("upload.*")
for upload in uploads:
@@ -520,6 +528,10 @@ async def pipeline_upload_mp3(transcript: Transcript, logger: Logger):
logger.info("No storage backend configured, skipping mp3 upload")
return
if transcript.audio_deleted:
logger.info("Skipping MP3 upload - audio marked as deleted")
return
logger.info("Starting upload mp3")
# If the audio mp3 is not available, just skip
@@ -558,6 +570,67 @@ async def pipeline_summaries(transcript: Transcript, logger: Logger):
logger.info("Summaries done")
@get_transcript
async def cleanup_consent(transcript: Transcript, logger: Logger):
logger.info("Starting consent cleanup")
consent_denied = False
recording = None
try:
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if meeting:
consent_denied = await meeting_consent_controller.has_any_denial(meeting.id)
except Exception as e:
logger.error(f"Failed to get fetch consent: {e}")
consent_denied = True
if not consent_denied:
logger.info("Consent approved, keeping all files")
return
logger.info("Consent denied, cleaning up all related audio files")
# 1. Delete original Whereby recording from S3
if recording and recording.s3_bucket and recording.s3_key:
s3_whereby = boto3.client(
"s3",
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_WHEREBY_ACCESS_KEY_SECRET,
)
try:
s3_whereby.delete_object(Bucket=recording.s3_bucket, Key=recording.s3_key)
logger.info(f"Deleted original Whereby recording: {recording.s3_bucket}/{recording.s3_key}")
except Exception as e:
logger.error(f"Failed to delete Whereby recording: {e}")
# non-transactional, files marked for deletion not actually deleted is possible
await transcripts_controller.update(transcript, {"audio_deleted": True})
# 2. Delete processed audio from transcript storage S3 bucket
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
logger.info(f"Deleted processed audio from storage: {transcript.storage_audio_path}")
except Exception as e:
logger.error(f"Failed to delete processed audio: {e}")
# 3. Delete local audio files
try:
if hasattr(transcript, 'audio_mp3_filename') and transcript.audio_mp3_filename:
transcript.audio_mp3_filename.unlink(missing_ok=True)
if hasattr(transcript, 'audio_wav_filename') and transcript.audio_wav_filename:
transcript.audio_wav_filename.unlink(missing_ok=True)
except Exception as e:
logger.error(f"Failed to delete local audio files: {e}")
logger.info("Consent cleanup done")
@get_transcript
async def pipeline_post_to_zulip(transcript: Transcript, logger: Logger):
logger.info("Starting post to zulip")
@@ -659,6 +732,12 @@ async def task_pipeline_final_summaries(*, transcript_id: str):
await pipeline_summaries(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_cleanup_consent(*, transcript_id: str):
await cleanup_consent(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_post_to_zulip(*, transcript_id: str):
@@ -675,6 +754,7 @@ def pipeline_post(*, transcript_id: str):
| task_pipeline_upload_mp3.si(transcript_id=transcript_id)
| task_pipeline_remove_upload.si(transcript_id=transcript_id)
| task_pipeline_diarization.si(transcript_id=transcript_id)
| task_cleanup_consent.si(transcript_id=transcript_id)
)
chain_title_preview = task_pipeline_title.si(transcript_id=transcript_id)
chain_final_summaries = task_pipeline_final_summaries.si(