mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
Merge pull request #462 from Monadical-SAS/fix-reprocessing
Fix reprocessing
This commit is contained in:
@@ -488,7 +488,7 @@ class TranscriptController:
|
|||||||
"""
|
"""
|
||||||
Remove a transcript by id
|
Remove a transcript by id
|
||||||
"""
|
"""
|
||||||
transcript = await self.get_by_id(transcript_id, user_id=user_id)
|
transcript = await self.get_by_id(transcript_id)
|
||||||
if not transcript:
|
if not transcript:
|
||||||
return
|
return
|
||||||
if user_id is not None and transcript.user_id != user_id:
|
if user_id is not None and transcript.user_id != user_id:
|
||||||
@@ -497,6 +497,13 @@ class TranscriptController:
|
|||||||
query = transcripts.delete().where(transcripts.c.id == transcript_id)
|
query = transcripts.delete().where(transcripts.c.id == transcript_id)
|
||||||
await database.execute(query)
|
await database.execute(query)
|
||||||
|
|
||||||
|
async def remove_by_meeting_id(self, meeting_id: str):
|
||||||
|
"""
|
||||||
|
Remove a transcript by meeting_id
|
||||||
|
"""
|
||||||
|
query = transcripts.delete().where(transcripts.c.meeting_id == meeting_id)
|
||||||
|
await database.execute(query)
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def transaction(self):
|
async def transaction(self):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ from fastapi_pagination import Page
|
|||||||
from fastapi_pagination.ext.databases import paginate
|
from fastapi_pagination.ext.databases import paginate
|
||||||
from jose import jwt
|
from jose import jwt
|
||||||
from pydantic import BaseModel, Field
|
from pydantic import BaseModel, Field
|
||||||
|
from reflector.db.meetings import meetings_controller
|
||||||
from reflector.db.migrate_user import migrate_user
|
from reflector.db.migrate_user import migrate_user
|
||||||
|
from reflector.db.rooms import rooms_controller
|
||||||
from reflector.db.transcripts import (
|
from reflector.db.transcripts import (
|
||||||
SourceKind,
|
SourceKind,
|
||||||
TranscriptParticipant,
|
TranscriptParticipant,
|
||||||
@@ -273,9 +275,16 @@ async def transcript_delete(
|
|||||||
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
user: Annotated[Optional[auth.UserInfo], Depends(auth.current_user_optional)],
|
||||||
):
|
):
|
||||||
user_id = user["sub"] if user else None
|
user_id = user["sub"] if user else None
|
||||||
transcript = await transcripts_controller.get_by_id(transcript_id, user_id=user_id)
|
transcript = await transcripts_controller.get_by_id(transcript_id)
|
||||||
if not transcript:
|
if not transcript:
|
||||||
raise HTTPException(status_code=404, detail="Transcript not found")
|
raise HTTPException(status_code=404, detail="Transcript not found")
|
||||||
|
|
||||||
|
if transcript.meeting_id:
|
||||||
|
meeting = await meetings_controller.get_by_id(transcript.meeting_id)
|
||||||
|
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||||
|
if room.is_shared:
|
||||||
|
user_id = None
|
||||||
|
|
||||||
await transcripts_controller.remove_by_id(transcript.id, user_id=user_id)
|
await transcripts_controller.remove_by_id(transcript.id, user_id=user_id)
|
||||||
return DeletionStatus(status="ok")
|
return DeletionStatus(status="ok")
|
||||||
|
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import boto3
|
|||||||
import structlog
|
import structlog
|
||||||
from celery import shared_task
|
from celery import shared_task
|
||||||
from celery.utils.log import get_task_logger
|
from celery.utils.log import get_task_logger
|
||||||
|
from pydantic import ValidationError
|
||||||
from reflector.db.meetings import meetings_controller
|
from reflector.db.meetings import meetings_controller
|
||||||
from reflector.db.rooms import rooms_controller
|
from reflector.db.rooms import rooms_controller
|
||||||
from reflector.db.transcripts import SourceKind, transcripts_controller
|
from reflector.db.transcripts import SourceKind, transcripts_controller
|
||||||
@@ -69,15 +70,24 @@ async def process_recording(bucket_name: str, object_key: str):
|
|||||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||||
|
|
||||||
transcript = await transcripts_controller.add(
|
transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
|
||||||
"",
|
if transcript:
|
||||||
source_kind=SourceKind.ROOM,
|
await transcripts_controller.update(
|
||||||
source_language="en",
|
transcript,
|
||||||
target_language="en",
|
{
|
||||||
user_id=room.user_id,
|
"topics": [],
|
||||||
meeting_id=meeting.id,
|
},
|
||||||
share_mode="public",
|
)
|
||||||
)
|
else:
|
||||||
|
transcript = await transcripts_controller.add(
|
||||||
|
"",
|
||||||
|
source_kind=SourceKind.ROOM,
|
||||||
|
source_language="en",
|
||||||
|
target_language="en",
|
||||||
|
user_id=room.user_id,
|
||||||
|
meeting_id=meeting.id,
|
||||||
|
share_mode="public",
|
||||||
|
)
|
||||||
|
|
||||||
_, extension = os.path.splitext(object_key)
|
_, extension = os.path.splitext(object_key)
|
||||||
upload_filename = transcript.data_path / f"upload{extension}"
|
upload_filename = transcript.data_path / f"upload{extension}"
|
||||||
@@ -171,21 +181,21 @@ async def reprocess_failed_recordings():
|
|||||||
logger.warning(f"No room found for meeting: {meeting.id}")
|
logger.warning(f"No room found for meeting: {meeting.id}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
|
transcript = None
|
||||||
|
try:
|
||||||
needs_processing = False
|
transcript = await transcripts_controller.get_by_meeting_id(
|
||||||
|
meeting.id
|
||||||
if transcript is None:
|
)
|
||||||
logger.info(f"Recording {object_key} has no transcript")
|
except ValidationError:
|
||||||
needs_processing = True
|
await transcripts_controller.remove_by_meeting_id(meeting.id)
|
||||||
elif transcript.status == "error":
|
logger.warning(
|
||||||
logger.info(
|
f"Removed invalid transcript for meeting: {meeting.id}"
|
||||||
f"Recording {object_key} has transcript with error status"
|
|
||||||
)
|
)
|
||||||
needs_processing = True
|
|
||||||
|
|
||||||
if needs_processing:
|
if transcript is not None and transcript.status == "error":
|
||||||
logger.info(f"Queueing recording for processing: {object_key}")
|
logger.info(
|
||||||
|
f"Queueing recording for processing: {object_key}, meeting {meeting.id}"
|
||||||
|
)
|
||||||
process_recording.delay(settings.AWS_WHEREBY_S3_BUCKET, object_key)
|
process_recording.delay(settings.AWS_WHEREBY_S3_BUCKET, object_key)
|
||||||
reprocessed_count += 1
|
reprocessed_count += 1
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user