mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 20:59:05 +00:00
fix: add Redis distributed locking to prevent race conditions in process_meetings
- Implement per-meeting locks using Redis to prevent concurrent processing
- Extend the lock after slow API calls (Whereby) to handle long-running operations
- Use redis-py's built-in lock.extend() with replace_ttl=True for a simple TTL refresh
- Track and log meetings skipped because they are locked by other workers
- Document SSRF analysis showing the risk is low due to async worker isolation

This prevents multiple workers from processing the same meeting simultaneously, which could cause state corruption or duplicate deactivations.
This commit is contained in:
@@ -9,6 +9,7 @@ import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from pydantic import ValidationError
|
||||
from redis.exceptions import LockError
|
||||
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
@@ -16,6 +17,7 @@ from reflector.db.rooms import rooms_controller
|
||||
from reflector.db.transcripts import SourceKind, transcripts_controller
|
||||
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
|
||||
from reflector.pipelines.main_live_pipeline import asynctask
|
||||
from reflector.redis_cache import get_redis_client
|
||||
from reflector.settings import settings
|
||||
from reflector.whereby import get_room_sessions
|
||||
|
||||
@@ -158,54 +160,96 @@ async def process_meetings():
|
||||
- If never used: Keep active until scheduled end time, then deactivate
|
||||
* On-the-fly meetings: Deactivate immediately (created when someone joins,
|
||||
so no sessions means everyone left)
|
||||
|
||||
Uses distributed locking to prevent race conditions when multiple workers
|
||||
process the same meeting simultaneously.
|
||||
"""
|
||||
logger.info("Processing meetings")
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
current_time = datetime.now(timezone.utc)
|
||||
redis_client = get_redis_client()
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
|
||||
for meeting in meetings:
|
||||
should_deactivate = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=120)
|
||||
|
||||
response = await get_room_sessions(meeting.room_name)
|
||||
room_sessions = response.get("results", [])
|
||||
has_active_sessions = room_sessions and any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
try:
|
||||
if not lock.acquire(blocking=False):
|
||||
logger.debug(
|
||||
f"Meeting {meeting.id} is being processed by another worker, skipping"
|
||||
)
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
if has_active_sessions:
|
||||
logger.debug("Meeting %s still has active sessions", meeting.id)
|
||||
else:
|
||||
if meeting.calendar_event_id:
|
||||
if has_had_sessions:
|
||||
should_deactivate = True
|
||||
logger.info(
|
||||
"Calendar meeting %s ended - all participants left", meeting.id
|
||||
)
|
||||
elif current_time > end_date:
|
||||
should_deactivate = True
|
||||
logger.info(
|
||||
"Calendar meeting %s deactivated - scheduled time ended with no participants",
|
||||
meeting.id,
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Calendar meeting %s waiting for participants until %s",
|
||||
meeting.id,
|
||||
end_date,
|
||||
)
|
||||
# Process the meeting
|
||||
should_deactivate = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
# This API call could be slow, extend lock if needed
|
||||
response = await get_room_sessions(meeting.room_name)
|
||||
|
||||
try:
|
||||
# Extend lock after slow operation to ensure we still hold it
|
||||
lock.extend(120, replace_ttl=True)
|
||||
except LockError:
|
||||
logger.warning(f"Lost lock for meeting {meeting.id}, skipping")
|
||||
continue
|
||||
|
||||
room_sessions = response.get("results", [])
|
||||
has_active_sessions = room_sessions and any(
|
||||
rs["endedAt"] is None for rs in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
|
||||
if has_active_sessions:
|
||||
logger.debug("Meeting %s still has active sessions", meeting.id)
|
||||
else:
|
||||
should_deactivate = True
|
||||
logger.info("On-the-fly meeting %s has no active sessions", meeting.id)
|
||||
if meeting.calendar_event_id:
|
||||
if has_had_sessions:
|
||||
should_deactivate = True
|
||||
logger.info(
|
||||
"Calendar meeting %s ended - all participants left",
|
||||
meeting.id,
|
||||
)
|
||||
elif current_time > end_date:
|
||||
should_deactivate = True
|
||||
logger.info(
|
||||
"Calendar meeting %s deactivated - scheduled time ended with no participants",
|
||||
meeting.id,
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
"Calendar meeting %s waiting for participants until %s",
|
||||
meeting.id,
|
||||
end_date,
|
||||
)
|
||||
else:
|
||||
should_deactivate = True
|
||||
logger.info(
|
||||
"On-the-fly meeting %s has no active sessions", meeting.id
|
||||
)
|
||||
|
||||
if should_deactivate:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger.info("Meeting %s is deactivated", meeting.id)
|
||||
if should_deactivate:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger.info("Meeting %s is deactivated", meeting.id)
|
||||
|
||||
logger.info("Processed %d meetings", len(meetings))
|
||||
processed_count += 1
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing meeting {meeting.id}: {e}")
|
||||
finally:
|
||||
try:
|
||||
lock.release()
|
||||
except LockError:
|
||||
pass # Lock already released or expired
|
||||
|
||||
logger.info(
|
||||
f"Processed {processed_count} meetings, skipped {skipped_count} (locked by other workers)"
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
|
||||
Reference in New Issue
Block a user