Merge pull request #461 from Monadical-SAS/fix-unprocessed-meetings

Fix unprocessed meetings
This commit is contained in:
2025-02-28 15:46:46 +01:00
committed by GitHub
4 changed files with 77 additions and 4 deletions

View File

@@ -5,7 +5,7 @@ services:
     ports:
       - 1250:1250
     volumes:
-      - ./server/data/:/app/data/
+      - ./server/:/app/
     env_file:
       - ./server/.env
     environment:
@@ -15,7 +15,7 @@ services:
     build:
       context: server
     volumes:
-      - ./server/data/:/app/data/
+      - ./server/:/app/
     env_file:
       - ./server/.env
     environment:
@@ -25,7 +25,7 @@ services:
     build:
       context: server
     volumes:
-      - ./server/data/:/app/data/
+      - ./server/:/app/
     env_file:
       - ./server/.env
     environment:

2
server/.gitignore vendored
View File

@@ -113,7 +113,7 @@ ipython_config.py
 __pypackages__/

 # Celery stuff
-celerybeat-schedule.db
+celerybeat-schedule*
 celerybeat.pid

 # SageMath parsed files

View File

@@ -1,6 +1,7 @@
 import celery
 import structlog
 from celery import Celery
+from celery.schedules import crontab

 from reflector.settings import settings

 logger = structlog.get_logger(__name__)
@@ -30,6 +31,10 @@ else:
         "task": "reflector.worker.process.process_meetings",
         "schedule": 60.0,
     },
+    "reprocess_failed_recordings": {
+        "task": "reflector.worker.process.reprocess_failed_recordings",
+        "schedule": crontab(hour=5, minute=0),  # Midnight EST
+    },
 }

 if settings.HEALTHCHECK_URL:

View File

@@ -126,3 +126,71 @@ async def process_meetings():
             logger.info("Meeting %s is deactivated", meeting.id)
     logger.info("Processed meetings")
@shared_task
@asynctask
async def reprocess_failed_recordings():
    """
    Scan the Whereby S3 bucket for .mp4 recordings and requeue any that
    lack a proper transcription.

    A recording needs (re)processing when its meeting has no transcript at
    all, or the transcript finished with an ``"error"`` status. Matching
    recordings are pushed back onto the ``process_recording`` task queue.

    Returns:
        int: the number of recordings requeued for processing.
    """
    logger.info("Checking for recordings that need processing or reprocessing")

    s3 = boto3.client(
        "s3",
        region_name=settings.TRANSCRIPT_STORAGE_AWS_REGION,
        aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
        aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
    )

    reprocessed_count = 0
    try:
        # Paginate so buckets with more than 1000 objects are fully scanned.
        paginator = s3.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=settings.AWS_WHEREBY_S3_BUCKET)
        for page in pages:
            if "Contents" not in page:
                continue
            for obj in page["Contents"]:
                object_key = obj["Key"]
                if not object_key.endswith(".mp4"):
                    continue

                # Assumes the object key starts with the 36-char room UUID
                # and room names are stored with a leading slash — TODO
                # confirm against the upload-side key format.
                room_name = f"/{object_key[:36]}"
                meeting = await meetings_controller.get_by_room_name(room_name)
                if not meeting:
                    logger.warning(f"No meeting found for recording: {object_key}")
                    continue

                # Skip recordings whose meeting no longer maps to a room.
                room = await rooms_controller.get_by_id(meeting.room_id)
                if not room:
                    logger.warning(f"No room found for meeting: {meeting.id}")
                    continue

                transcript = await transcripts_controller.get_by_meeting_id(meeting.id)
                needs_processing = False
                if transcript is None:
                    logger.info(f"Recording {object_key} has no transcript")
                    needs_processing = True
                elif transcript.status == "error":
                    logger.info(
                        f"Recording {object_key} has transcript with error status"
                    )
                    needs_processing = True

                if needs_processing:
                    logger.info(f"Queueing recording for processing: {object_key}")
                    process_recording.delay(settings.AWS_WHEREBY_S3_BUCKET, object_key)
                    reprocessed_count += 1
    except Exception:
        # Best-effort periodic scan: don't crash the beat task, but record
        # the full traceback (logger.error(str(e)) was losing it).
        logger.exception("Error checking S3 bucket")

    logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
    return reprocessed_count