self-review round

This commit is contained in:
Igor Loskutov
2025-12-18 13:39:04 -05:00
parent 8272c79856
commit 0ce38dfeb3

View File

@@ -455,25 +455,6 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
transcripts_controller,
)
# Cleanup temporary padded S3 files (deferred until after mixdown)
track_result = ctx.task_output(process_tracks)
created_padded_files = track_result.created_padded_files
if created_padded_files:
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage()
cleanup_tasks = []
for storage_path in created_padded_files:
cleanup_tasks.append(storage.delete_file(storage_path))
cleanup_results = await asyncio.gather(*cleanup_tasks, return_exceptions=True)
for storage_path, result in zip(created_padded_files, cleanup_results):
if isinstance(result, Exception):
logger.warning(
"[Hatchet] Failed to cleanup temporary padded track",
storage_path=storage_path,
error=str(result),
)
mixdown_result = ctx.task_output(mixdown_tracks)
audio_key = mixdown_result.audio_key
@@ -714,6 +695,23 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
duration = mixdown_result.duration
all_words = track_result.all_words
# Cleanup temporary padded S3 files (deferred until finalize for semantic parity with Celery)
created_padded_files = track_result.created_padded_files
if created_padded_files:
ctx.log(f"Cleaning up {len(created_padded_files)} temporary S3 files")
storage = _spawn_storage()
cleanup_results = await asyncio.gather(
*[storage.delete_file(path) for path in created_padded_files],
return_exceptions=True,
)
for storage_path, result in zip(created_padded_files, cleanup_results):
if isinstance(result, Exception):
logger.warning(
"[Hatchet] Failed to cleanup temporary padded track",
storage_path=storage_path,
error=str(result),
)
async with fresh_db_connection():
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptDuration,
@@ -767,22 +765,97 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
)
@with_error_handling("cleanup_consent", set_error_status=False)
async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
"""Check and handle consent requirements."""
"""Check consent and delete audio files if any participant denied."""
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.db.meetings import meetings_controller # noqa: PLC0415
from reflector.db.meetings import ( # noqa: PLC0415
meeting_consent_controller,
meetings_controller,
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import get_transcripts_storage # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript and transcript.meeting_id:
if not transcript:
ctx.log("cleanup_consent: transcript not found")
return ConsentResult()
consent_denied = False
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting:
# Check consent logic here
# For now just mark as checked
pass
consent_denied = await meeting_consent_controller.has_any_denial(
meeting.id
)
ctx.log(f"cleanup_consent complete: transcript_id={input.transcript_id}")
if not consent_denied:
ctx.log("cleanup_consent: consent approved, keeping all files")
return ConsentResult()
ctx.log("cleanup_consent: consent denied, deleting audio files")
input_track_keys = set(t["s3_key"] for t in input.tracks)
# Detect if recording.track_keys was manually modified after workflow started
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.track_keys:
db_track_keys = set(filter_cam_audio_tracks(recording.track_keys))
if input_track_keys != db_track_keys:
added = db_track_keys - input_track_keys
removed = input_track_keys - db_track_keys
logger.warning(
"[Hatchet] Track keys mismatch: DB changed since workflow start",
transcript_id=input.transcript_id,
recording_id=transcript.recording_id,
input_count=len(input_track_keys),
db_count=len(db_track_keys),
added_in_db=list(added) if added else None,
removed_from_db=list(removed) if removed else None,
)
ctx.log(
f"WARNING: track_keys mismatch - "
f"input has {len(input_track_keys)}, DB has {len(db_track_keys)}. "
f"Using input tracks for deletion."
)
deletion_errors = []
if input_track_keys and input.bucket_name:
master_storage = get_transcripts_storage()
for key in input_track_keys:
try:
await master_storage.delete_file(key, bucket=input.bucket_name)
ctx.log(f"Deleted recording file: {input.bucket_name}/{key}")
except Exception as e:
error_msg = f"Failed to delete {key}: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if transcript.audio_location == "storage":
storage = get_transcripts_storage()
try:
await storage.delete_file(transcript.storage_audio_path)
ctx.log(f"Deleted processed audio: {transcript.storage_audio_path}")
except Exception as e:
error_msg = f"Failed to delete processed audio: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
transcript_id=input.transcript_id,
error_count=len(deletion_errors),
errors=deletion_errors,
)
ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio deleted successfully")
return ConsentResult()