feat: show trash for soft deleted transcripts and hard delete option (#942)

* feat: show trash for soft deleted transcripts and hard delete option

* fix: test fixtures

* docs: aws new permissions
This commit is contained in:
Juan Diego García
2026-03-31 13:15:52 -05:00
committed by GitHub
parent cc9c5cd4a5
commit ec8b49738e
20 changed files with 1351 additions and 94 deletions

View File

@@ -78,6 +78,14 @@ class RecordingController:
)
await get_database().execute(query)
async def restore_by_id(self, id: str) -> None:
query = recordings.update().where(recordings.c.id == id).values(deleted_at=None)
await get_database().execute(query)
async def hard_delete_by_id(self, id: str) -> None:
query = recordings.delete().where(recordings.c.id == id)
await get_database().execute(query)
async def set_meeting_id(
self,
recording_id: NonEmptyString,

View File

@@ -138,6 +138,7 @@ class SearchParameters(BaseModel):
source_kind: SourceKind | None = None
from_datetime: datetime | None = None
to_datetime: datetime | None = None
include_deleted: bool = False
class SearchResultDB(BaseModel):
@@ -387,7 +388,10 @@ class SearchController:
transcripts.join(rooms, transcripts.c.room_id == rooms.c.id, isouter=True)
)
base_query = base_query.where(transcripts.c.deleted_at.is_(None))
if params.include_deleted:
base_query = base_query.where(transcripts.c.deleted_at.isnot(None))
else:
base_query = base_query.where(transcripts.c.deleted_at.is_(None))
if params.query_text is not None:
# because already initialized based on params.query_text presence above
@@ -396,7 +400,13 @@ class SearchController:
transcripts.c.search_vector_en.op("@@")(search_query)
)
if params.user_id:
if params.include_deleted:
# Trash view: only show user's own deleted transcripts.
# Defense-in-depth: require user_id to prevent leaking all users' trash.
if not params.user_id:
return [], 0
base_query = base_query.where(transcripts.c.user_id == params.user_id)
elif params.user_id:
base_query = base_query.where(
sqlalchemy.or_(
transcripts.c.user_id == params.user_id, rooms.c.is_shared
@@ -421,6 +431,8 @@ class SearchController:
if params.query_text is not None:
order_by = sqlalchemy.desc(sqlalchemy.text("rank"))
elif params.include_deleted:
order_by = sqlalchemy.desc(transcripts.c.deleted_at)
else:
order_by = sqlalchemy.desc(transcripts.c.created_at)

View File

@@ -24,7 +24,7 @@ from reflector.db.utils import is_postgresql
from reflector.logger import logger
from reflector.processors.types import Word as ProcessorWord
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils import generate_uuid4
from reflector.utils.webvtt import topics_to_webvtt
@@ -676,6 +676,126 @@ class TranscriptController:
)
await get_database().execute(query)
async def restore_by_id(
self,
transcript_id: str,
user_id: str | None = None,
) -> bool:
"""
Restore a soft-deleted transcript by clearing deleted_at.
Also restores the associated recording if present.
Returns True if the transcript was restored, False otherwise.
"""
transcript = await self.get_by_id(transcript_id)
if not transcript:
return False
if transcript.deleted_at is None:
return False
if user_id is not None and transcript.user_id != user_id:
return False
query = (
transcripts.update()
.where(transcripts.c.id == transcript_id)
.values(deleted_at=None)
)
await get_database().execute(query)
if transcript.recording_id:
try:
await recordings_controller.restore_by_id(transcript.recording_id)
except Exception as e:
logger.warning(
"Failed to restore recording",
exc_info=e,
recording_id=transcript.recording_id,
)
return True
async def hard_delete(self, transcript_id: str) -> None:
"""
Permanently delete a transcript, its recording, and all associated files.
Only deletes transcript-owned resources:
- Transcript row and recording row from DB (first, to make data inaccessible)
- Transcript audio in S3 storage
- Recording files in S3 (both object_key and track_keys, since a recording can have both)
- Local files (data_path directory)
Does NOT delete: meetings, consent records, rooms, or any shared entity.
Requires the transcript to be soft-deleted first (deleted_at must be set).
"""
transcript = await self.get_by_id(transcript_id)
if not transcript:
return
if transcript.deleted_at is None:
return
# Collect file references before deleting DB rows
recording = None
recording_storage = None
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
# Determine the correct storage backend for recording files.
# Recordings from different platforms (daily, whereby) live in
# platform-specific buckets with separate credentials.
if recording and recording.meeting_id:
from reflector.db.meetings import meetings_controller # noqa: PLC0415
meeting = await meetings_controller.get_by_id(recording.meeting_id)
if meeting:
recording_storage = get_source_storage(meeting.platform)
if recording_storage is None:
recording_storage = get_transcripts_storage()
# 1. Hard-delete DB rows first (makes data inaccessible immediately)
if recording:
await recordings_controller.hard_delete_by_id(recording.id)
await get_database().execute(
transcripts.delete().where(transcripts.c.id == transcript_id)
)
# 2. Delete transcript audio from S3 (always uses transcript storage)
transcript_storage = get_transcripts_storage()
if transcript.audio_location == "storage" and not transcript.audio_deleted:
try:
await transcript_storage.delete_file(transcript.storage_audio_path)
except Exception as e:
logger.warning(
"Failed to delete transcript audio from storage",
exc_info=e,
transcript_id=transcript_id,
path=transcript.storage_audio_path,
)
# 3. Delete recording files from S3 (both object_key and track_keys —
# a recording can have both, unlike consent cleanup which uses elif).
# Uses platform-specific storage resolved above.
if recording and recording.bucket_name and recording_storage:
keys_to_delete = []
if recording.track_keys:
keys_to_delete = recording.track_keys
if recording.object_key:
keys_to_delete.append(recording.object_key)
for key in keys_to_delete:
try:
await recording_storage.delete_file(
key, bucket=recording.bucket_name
)
except Exception as e:
logger.warning(
"Failed to delete recording file",
exc_info=e,
key=key,
bucket=recording.bucket_name,
)
# 4. Delete local files
transcript.unlink()
async def remove_by_recording_id(self, recording_id: str):
"""
Soft-delete a transcript by recording_id

View File

@@ -1277,6 +1277,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
return ConsentResult()
consent_denied = False
meeting = None
if transcript.meeting_id:
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
if meeting:
@@ -1339,6 +1340,22 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
# Delete cloud video if present
if meeting and meeting.daily_composed_video_s3_key:
try:
source_storage = get_source_storage("daily")
await source_storage.delete_file(meeting.daily_composed_video_s3_key)
await meetings_controller.update_meeting(
meeting.id,
daily_composed_video_s3_key=None,
daily_composed_video_duration=None,
)
ctx.log(f"Deleted cloud video: {meeting.daily_composed_video_s3_key}")
except Exception as e:
error_msg = f"Failed to delete cloud video: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
@@ -1349,7 +1366,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
ctx.log(f"cleanup_consent completed with {len(deletion_errors)} errors")
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio deleted successfully")
ctx.log("cleanup_consent: all audio and video deleted successfully")
return ConsentResult()

View File

@@ -688,7 +688,10 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import get_transcripts_storage # noqa: PLC0415
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
@@ -697,6 +700,7 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
consent_denied = False
recording = None
meeting = None
if transcript.recording_id:
recording = await recordings_controller.get_by_id(transcript.recording_id)
if recording and recording.meeting_id:
@@ -756,6 +760,22 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
# Delete cloud video if present
if meeting and meeting.daily_composed_video_s3_key:
try:
source_storage = get_source_storage("daily")
await source_storage.delete_file(meeting.daily_composed_video_s3_key)
await meetings_controller.update_meeting(
meeting.id,
daily_composed_video_s3_key=None,
daily_composed_video_duration=None,
)
ctx.log(f"Deleted cloud video: {meeting.daily_composed_video_s3_key}")
except Exception as e:
error_msg = f"Failed to delete cloud video: {e}"
logger.error(error_msg, exc_info=True)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
"[Hatchet] cleanup_consent completed with errors",
@@ -764,7 +784,7 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
)
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
ctx.log("cleanup_consent: all audio deleted successfully")
ctx.log("cleanup_consent: all audio and video deleted successfully")
return ConsentResult()

View File

@@ -61,7 +61,7 @@ from reflector.processors.types import (
)
from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.views.transcripts import GetTranscriptTopic
from reflector.ws_events import TranscriptEventName
from reflector.ws_manager import WebsocketManager, get_ws_manager
@@ -671,6 +671,22 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
logger.error(error_msg, exc_info=e)
deletion_errors.append(error_msg)
# Delete cloud video if present
if meeting and meeting.daily_composed_video_s3_key:
try:
source_storage = get_source_storage("daily")
await source_storage.delete_file(meeting.daily_composed_video_s3_key)
await meetings_controller.update_meeting(
meeting.id,
daily_composed_video_s3_key=None,
daily_composed_video_duration=None,
)
logger.info(f"Deleted cloud video: {meeting.daily_composed_video_s3_key}")
except Exception as e:
error_msg = f"Failed to delete cloud video: {e}"
logger.error(error_msg, exc_info=e)
deletion_errors.append(error_msg)
if deletion_errors:
logger.warning(
f"Consent cleanup completed with {len(deletion_errors)} errors",
@@ -678,7 +694,7 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
)
else:
await transcripts_controller.update(transcript, {"audio_deleted": True})
logger.info("Consent cleanup done - all audio deleted")
logger.info("Consent cleanup done - all audio and video deleted")
@get_transcript

View File

@@ -309,6 +309,7 @@ async def transcripts_search(
source_kind: Optional[SourceKind] = None,
from_datetime: SearchFromDatetimeParam = None,
to_datetime: SearchToDatetimeParam = None,
include_deleted: bool = False,
user: Annotated[
Optional[auth.UserInfo], Depends(auth.current_user_optional_if_public_mode)
] = None,
@@ -316,6 +317,12 @@ async def transcripts_search(
"""Full-text search across transcript titles and content."""
user_id = user["sub"] if user else None
if include_deleted and not user_id:
raise HTTPException(
status_code=401,
detail="Authentication required to view deleted transcripts",
)
if from_datetime and to_datetime and from_datetime > to_datetime:
raise HTTPException(
status_code=400, detail="'from' must be less than or equal to 'to'"
@@ -330,6 +337,7 @@ async def transcripts_search(
source_kind=source_kind,
from_datetime=from_datetime,
to_datetime=to_datetime,
include_deleted=include_deleted,
)
results, total = await search_controller.search_transcripts(search_params)
@@ -615,6 +623,54 @@ async def transcript_delete(
return DeletionStatus(status="ok")
@router.post("/transcripts/{transcript_id}/restore", response_model=DeletionStatus)
async def transcript_restore(
transcript_id: str,
user: Annotated[auth.UserInfo, Depends(auth.current_user)],
):
"""Restore a soft-deleted transcript."""
user_id = user["sub"]
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
if transcript.deleted_at is None:
raise HTTPException(status_code=400, detail="Transcript is not deleted")
if not transcripts_controller.user_can_mutate(transcript, user_id):
raise HTTPException(status_code=403, detail="Not authorized")
await transcripts_controller.restore_by_id(transcript.id, user_id=user_id)
await get_ws_manager().send_json(
room_id=f"user:{user_id}",
message={"event": "TRANSCRIPT_RESTORED", "data": {"id": transcript.id}},
)
return DeletionStatus(status="ok")
@router.delete("/transcripts/{transcript_id}/destroy", response_model=DeletionStatus)
async def transcript_destroy(
transcript_id: str,
user: Annotated[auth.UserInfo, Depends(auth.current_user)],
):
"""Permanently delete a transcript and all associated files."""
user_id = user["sub"]
transcript = await transcripts_controller.get_by_id(transcript_id)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
if transcript.deleted_at is None:
raise HTTPException(
status_code=400, detail="Transcript must be soft-deleted first"
)
if not transcripts_controller.user_can_mutate(transcript, user_id):
raise HTTPException(status_code=403, detail="Not authorized")
await transcripts_controller.hard_delete(transcript.id)
await get_ws_manager().send_json(
room_id=f"user:{user_id}",
message={"event": "TRANSCRIPT_DELETED", "data": {"id": transcript.id}},
)
return DeletionStatus(status="ok")
@router.get(
"/transcripts/{transcript_id}/topics",
response_model=list[GetTranscriptTopic],

View File

@@ -113,6 +113,7 @@ TranscriptWsEvent = Annotated[
UserEventName = Literal[
"TRANSCRIPT_CREATED",
"TRANSCRIPT_DELETED",
"TRANSCRIPT_RESTORED",
"TRANSCRIPT_STATUS",
"TRANSCRIPT_FINAL_TITLE",
"TRANSCRIPT_DURATION",
@@ -161,6 +162,15 @@ class UserWsTranscriptDeleted(BaseModel):
data: UserTranscriptDeletedData
class UserTranscriptRestoredData(BaseModel):
id: NonEmptyString
class UserWsTranscriptRestored(BaseModel):
event: Literal["TRANSCRIPT_RESTORED"] = "TRANSCRIPT_RESTORED"
data: UserTranscriptRestoredData
class UserWsTranscriptStatus(BaseModel):
event: Literal["TRANSCRIPT_STATUS"] = "TRANSCRIPT_STATUS"
data: UserTranscriptStatusData
@@ -180,6 +190,7 @@ UserWsEvent = Annotated[
Union[
UserWsTranscriptCreated,
UserWsTranscriptDeleted,
UserWsTranscriptRestored,
UserWsTranscriptStatus,
UserWsTranscriptFinalTitle,
UserWsTranscriptDuration,