diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py
index 08f88554..f5f5b4b4 100644
--- a/server/reflector/hatchet/workflows/diarization_pipeline.py
+++ b/server/reflector/hatchet/workflows/diarization_pipeline.py
@@ -455,25 +455,6 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResult
             transcripts_controller,
         )
 
-    # Cleanup temporary padded S3 files (deferred until after mixdown)
-    track_result = ctx.task_output(process_tracks)
-    created_padded_files = track_result.created_padded_files
-    if created_padded_files:
-        ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
-        storage = _spawn_storage()
-        cleanup_tasks = []
-        for storage_path in created_padded_files:
-            cleanup_tasks.append(storage.delete_file(storage_path))
-
-        cleanup_results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
-        for storage_path, result in zip(created_padded_files, cleanup_results):
-            if isinstance(result, Exception):
-                logger.warning(
-                    "[Hatchet] Failed to cleanup temporary padded track",
-                    storage_path=storage_path,
-                    error=str(result),
-                )
-
     mixdown_result = ctx.task_output(mixdown_tracks)
     audio_key = mixdown_result.audio_key
 
@@ -714,6 +695,23 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
     duration = mixdown_result.duration
     all_words = track_result.all_words
 
+    # Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
+    created_padded_files = track_result.created_padded_files
+    if created_padded_files:
+        ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
+        storage = _spawn_storage()
+        cleanup_results = await asyncio.gather(
+            *[storage.delete_file(path) for path in created_padded_files],
+            return_exceptions=True,
+        )
+        for storage_path, result in zip(created_padded_files, cleanup_results):
+            if isinstance(result, Exception):
+                logger.warning(
+                    "[Hatchet] Failed to cleanup temporary padded track",
+                    storage_path=storage_path,
+                    error=str(result),
+                )
+
     async with fresh_db_connection():
         from reflector.db.transcripts import (  # noqa: PLC0415
             TranscriptDuration,
@@ -767,22 +765,97 @@
     )
 
 @with_error_handling("cleanup_consent", set_error_status=False)
 async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
-    """Check and handle consent requirements."""
+    """Check consent and delete audio files if any participant denied."""
     ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.db.meetings import meetings_controller  # noqa: PLC0415
+        from reflector.db.meetings import (  # noqa: PLC0415
+            meeting_consent_controller,
+            meetings_controller,
+        )
+        from reflector.db.recordings import recordings_controller  # noqa: PLC0415
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
+        from reflector.storage import get_transcripts_storage  # noqa: PLC0415
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
-        if transcript and transcript.meeting_id:
+        if not transcript:
+            ctx.log("cleanup_consent: transcript not found")
+            return ConsentResult()
+
+        consent_denied = False
+        if transcript.meeting_id:
             meeting = await meetings_controller.get_by_id(transcript.meeting_id)
             if meeting:
-                # Check consent logic here
-                # For now just mark as checked
-                pass
+                consent_denied = await meeting_consent_controller.has_any_denial(
+                    meeting.id
+                )
 
-    ctx.log(f"cleanup_consent complete: transcript_id={input.transcript_id}")
+        if not consent_denied:
+            ctx.log("cleanup_consent: consent approved, keeping all files")
+            return ConsentResult()
+
+        ctx.log("cleanup_consent: consent denied, deleting audio files")
+
+        input_track_keys = set(t["s3_key"] for t in input.tracks)
+
+        # Detect if recording.track_keys was manually modified after workflow started
+        if transcript.recording_id:
+            recording = await recordings_controller.get_by_id(transcript.recording_id)
+            if recording and recording.track_keys:
+                db_track_keys = set(filter_cam_audio_tracks(recording.track_keys))
+
+                if input_track_keys != db_track_keys:
+                    added = db_track_keys - input_track_keys
+                    removed = input_track_keys - db_track_keys
+                    logger.warning(
+                        "[Hatchet] Track keys mismatch: DB changed since workflow start",
+                        transcript_id=input.transcript_id,
+                        recording_id=transcript.recording_id,
+                        input_count=len(input_track_keys),
+                        db_count=len(db_track_keys),
+                        added_in_db=list(added) if added else None,
+                        removed_from_db=list(removed) if removed else None,
+                    )
+                    ctx.log(
+                        f"WARNING: track_keys mismatch - "
+                        f"input has {len(input_track_keys)}, DB has {len(db_track_keys)}. "
+                        f"Using input tracks for deletion."
+                    )
+
+        deletion_errors = []
+
+        if input_track_keys and input.bucket_name:
+            master_storage = get_transcripts_storage()
+            for key in input_track_keys:
+                try:
+                    await master_storage.delete_file(key, bucket=input.bucket_name)
+                    ctx.log(f"Deleted recording file: {input.bucket_name}/{key}")
+                except Exception as e:
+                    error_msg = f"Failed to delete {key}: {e}"
+                    logger.error(error_msg, exc_info=True)
+                    deletion_errors.append(error_msg)
+
+        if transcript.audio_location == "storage":
+            storage = get_transcripts_storage()
+            try:
+                await storage.delete_file(transcript.storage_audio_path)
+                ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
+            except Exception as e:
+                error_msg = f"Failed to delete processed audio: {e}"
+                logger.error(error_msg, exc_info=True)
+                deletion_errors.append(error_msg)
+
+        if deletion_errors:
+            logger.warning(
+                "[Hatchet] cleanup_consent completed with errors",
+                transcript_id=input.transcript_id,
+                error_count=len(deletion_errors),
+                errors=deletion_errors,
+            )
+            ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
+        else:
+            await transcripts_controller.update(transcript, {"audio_deleted": True})
+            ctx.log("cleanup_consent: all audio deleted successfully")
 
     return ConsentResult()