Fix reprocessing

This commit is contained in:
2025-03-03 15:15:17 +01:00
parent dd3dba3906
commit 78a522ac7c
2 changed files with 40 additions and 23 deletions

View File

@@ -488,7 +488,7 @@ class TranscriptController:
"""
Remove a transcript by id
"""
transcript = await self.get_by_id(transcript_id, user_id=user_id)
transcript = await self.get_by_id(transcript_id)
if not transcript:
return
if user_id is not None and transcript.user_id != user_id:
@@ -497,6 +497,13 @@ class TranscriptController:
query = transcripts.delete().where(transcripts.c.id == transcript_id)
await database.execute(query)
async def remove_by_meeting_id(self, meeting_id: str):
"""
Remove a transcript by meeting_id
"""
query = transcripts.delete().where(transcripts.c.meeting_id == meeting_id)
await database.execute(query)
@asynccontextmanager
async def transaction(self):
"""

View File

@@ -8,6 +8,7 @@ import boto3
import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import SourceKind, transcripts_controller
@@ -69,15 +70,24 @@ async def process_recording(bucket_name: str, object_key: str):
meeting = await meetings_controller.get_by_room_name(room_name)
room = await rooms_controller.get_by_id(meeting.room_id)
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
source_language="en",
target_language="en",
user_id=room.user_id,
meeting_id=meeting.id,
share_mode="public",
)
transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
if transcript:
await transcripts_controller.update(
transcript,
{
"topics": [],
},
)
else:
transcript = await transcripts_controller.add(
"",
source_kind=SourceKind.ROOM,
source_language="en",
target_language="en",
user_id=room.user_id,
meeting_id=meeting.id,
share_mode="public",
)
_, extension = os.path.splitext(object_key)
upload_filename = transcript.data_path / f"upload{extension}"
@@ -171,21 +181,21 @@ async def reprocess_failed_recordings():
logger.warning(f"No room found for meeting: {meeting.id}")
continue
transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
needs_processing = False
if transcript is None:
logger.info(f"Recording {object_key} has no transcript")
needs_processing = True
elif transcript.status == "error":
logger.info(
f"Recording {object_key} has transcript with error status"
transcript = None
try:
transcript = await transcripts_controller.get_by_meeting_id(
meeting.id
)
except ValidationError:
await transcripts_controller.remove_by_meeting_id(meeting.id)
logger.warning(
f"Removed invalid transcript for meeting: {meeting.id}"
)
needs_processing = True
if needs_processing:
logger.info(f"Queueing recording for processing: {object_key}")
if transcript is not None and transcript.status == "error":
logger.info(
f"Queueing recording for processing: {object_key}, meeting {meeting.id}"
)
process_recording.delay(settings.AWS_WHEREBY_S3_BUCKET, object_key)
reprocessed_count += 1