mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
fix: rework process logic for meeting
This commit is contained in:
@@ -172,14 +172,13 @@ async def process_meetings():
|
|||||||
skipped_count = 0
|
skipped_count = 0
|
||||||
|
|
||||||
for meeting in meetings:
|
for meeting in meetings:
|
||||||
|
logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
|
||||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||||
lock = redis_client.lock(lock_key, timeout=120)
|
lock = redis_client.lock(lock_key, timeout=120)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not lock.acquire(blocking=False):
|
if not lock.acquire(blocking=False):
|
||||||
logger.debug(
|
logger_.debug("Meeting is being processed by another worker, skipping")
|
||||||
f"Meeting {meeting.id} is being processed by another worker, skipping"
|
|
||||||
)
|
|
||||||
skipped_count += 1
|
skipped_count += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
@@ -191,13 +190,12 @@ async def process_meetings():
|
|||||||
|
|
||||||
# This API call could be slow, extend lock if needed
|
# This API call could be slow, extend lock if needed
|
||||||
response = await get_room_sessions(meeting.room_name)
|
response = await get_room_sessions(meeting.room_name)
|
||||||
print(response)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Extend lock after slow operation to ensure we still hold it
|
# Extend lock after slow operation to ensure we still hold it
|
||||||
lock.extend(120, replace_ttl=True)
|
lock.extend(120, replace_ttl=True)
|
||||||
except LockError:
|
except LockError:
|
||||||
logger.warning(f"Lost lock for meeting {meeting.id}, skipping")
|
logger_.warning("Lost lock for meeting, skipping")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
room_sessions = response.get("results", [])
|
room_sessions = response.get("results", [])
|
||||||
@@ -207,41 +205,27 @@ async def process_meetings():
|
|||||||
has_had_sessions = bool(room_sessions)
|
has_had_sessions = bool(room_sessions)
|
||||||
|
|
||||||
if has_active_sessions:
|
if has_active_sessions:
|
||||||
logger.debug("Meeting %s still has active sessions", meeting.id)
|
logger_.debug("Meeting still has active sessions, keep it")
|
||||||
else:
|
elif has_had_sessions:
|
||||||
if meeting.calendar_event_id:
|
|
||||||
if has_had_sessions:
|
|
||||||
should_deactivate = True
|
should_deactivate = True
|
||||||
logger.info(
|
logger_.info("Meeting ended - all participants left")
|
||||||
"Calendar meeting %s ended - all participants left",
|
|
||||||
meeting.id,
|
|
||||||
)
|
|
||||||
elif current_time > end_date:
|
elif current_time > end_date:
|
||||||
should_deactivate = True
|
should_deactivate = True
|
||||||
logger.info(
|
logger_.info(
|
||||||
"Calendar meeting %s deactivated - scheduled time ended with no participants",
|
"Meeting deactivated - scheduled time ended with no participants",
|
||||||
meeting.id,
|
meeting.id,
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
logger.debug(
|
logger_.debug("Meeting not yet started, keep it")
|
||||||
"Calendar meeting %s waiting for participants until %s",
|
|
||||||
meeting.id,
|
|
||||||
end_date,
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
should_deactivate = True
|
|
||||||
logger.info(
|
|
||||||
"On-the-fly meeting %s has no active sessions", meeting.id
|
|
||||||
)
|
|
||||||
|
|
||||||
if should_deactivate:
|
if should_deactivate:
|
||||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||||
logger.info("Meeting %s is deactivated", meeting.id)
|
logger_.info("Meeting is deactivated")
|
||||||
|
|
||||||
processed_count += 1
|
processed_count += 1
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error processing meeting {meeting.id}: {e}")
|
logger_.error(f"Error processing meeting", exc_info=True)
|
||||||
finally:
|
finally:
|
||||||
try:
|
try:
|
||||||
lock.release()
|
lock.release()
|
||||||
@@ -249,7 +233,9 @@ async def process_meetings():
|
|||||||
pass # Lock already released or expired
|
pass # Lock already released or expired
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Processed {processed_count} meetings, skipped {skipped_count} (locked by other workers)"
|
f"Processed meetings finished",
|
||||||
|
processed_count=processed_count,
|
||||||
|
skipped_count=skipped_count,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user