diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index 4002538c..26ed9386 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -488,7 +488,7 @@ class TranscriptController: """ Remove a transcript by id """ - transcript = await self.get_by_id(transcript_id, user_id=user_id) + transcript = await self.get_by_id(transcript_id) if not transcript: return if user_id is not None and transcript.user_id != user_id: @@ -497,6 +497,13 @@ class TranscriptController: query = transcripts.delete().where(transcripts.c.id == transcript_id) await database.execute(query) + async def remove_by_meeting_id(self, meeting_id: str): + """ + Remove a transcript by meeting_id + """ + query = transcripts.delete().where(transcripts.c.meeting_id == meeting_id) + await database.execute(query) + @asynccontextmanager async def transaction(self): """ diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index c4e2913c..b6e56c44 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -7,7 +7,9 @@ from fastapi_pagination import Page from fastapi_pagination.ext.databases import paginate from jose import jwt from pydantic import BaseModel, Field +from reflector.db.meetings import meetings_controller from reflector.db.migrate_user import migrate_user +from reflector.db.rooms import rooms_controller from reflector.db.transcripts import ( SourceKind, TranscriptParticipant, @@ -273,9 +275,16 @@ async def transcript_delete( user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)], ): user_id = user["sub"] if user else None - transcript = await transcripts_controller.get_by_id(transcript_id, user_id=user_id) + transcript = await transcripts_controller.get_by_id(transcript_id) if not transcript: raise HTTPException(status_code=404, detail="Transcript not found") + + if transcript.meeting_id: + meeting = await meetings_controller.get_by_id(transcript.meeting_id) + room = await rooms_controller.get_by_id(meeting.room_id) + if room.is_shared: + user_id = None + await transcripts_controller.remove_by_id(transcript.id, user_id=user_id) return DeletionStatus(status="ok") diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 68b53893..72c87adb 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -8,6 +8,7 @@ import boto3 import structlog from celery import shared_task from celery.utils.log import get_task_logger +from pydantic import ValidationError from reflector.db.meetings import meetings_controller from reflector.db.rooms import rooms_controller from reflector.db.transcripts import SourceKind, transcripts_controller @@ -69,15 +70,24 @@ async def process_recording(bucket_name: str, object_key: str): meeting = await meetings_controller.get_by_room_name(room_name) room = await rooms_controller.get_by_id(meeting.room_id) - transcript = await transcripts_controller.add( - "", - source_kind=SourceKind.ROOM, - source_language="en", - target_language="en", - user_id=room.user_id, - meeting_id=meeting.id, - share_mode="public", - ) + transcript = await transcripts_controller.get_by_meeting_id(meeting.id) + if transcript: + await transcripts_controller.update( + transcript, + { + "topics": [], + }, + ) + else: + transcript = await transcripts_controller.add( + "", + source_kind=SourceKind.ROOM, + source_language="en", + target_language="en", + user_id=room.user_id, + meeting_id=meeting.id, + share_mode="public", + ) _, extension = os.path.splitext(object_key) upload_filename = transcript.data_path / f"upload{extension}" @@ -171,21 +181,21 @@ async def reprocess_failed_recordings(): logger.warning(f"No room found for meeting: {meeting.id}") continue - transcript = await transcripts_controller.get_by_meeting_id(meeting.id) - - needs_processing = False - - if transcript is None: - logger.info(f"Recording {object_key} has no transcript") - needs_processing = True - elif transcript.status == "error": - logger.info( - f"Recording {object_key} has transcript with error status" + transcript = None + try: + transcript = await transcripts_controller.get_by_meeting_id( + meeting.id + ) + except ValidationError: + await transcripts_controller.remove_by_meeting_id(meeting.id) + logger.warning( + f"Removed invalid transcript for meeting: {meeting.id}" ) - needs_processing = True - if needs_processing: - logger.info(f"Queueing recording for processing: {object_key}") + if transcript is not None and transcript.status == "error": + logger.info( + f"Queueing recording for processing: {object_key}, meeting {meeting.id}" + ) process_recording.delay(settings.AWS_WHEREBY_S3_BUCKET, object_key) reprocessed_count += 1