mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
fix: room concurrency (theoretically) (#511)
* fix: room concurrency (theoretically) * cleanup * cleanup
This commit is contained in:
@@ -0,0 +1,35 @@
|
|||||||
|
"""add_unique_constraint_one_active_meeting_per_room
|
||||||
|
|
||||||
|
Revision ID: b7df9609542c
|
||||||
|
Revises: d7fbb74b673b
|
||||||
|
Create Date: 2025-07-25 16:27:06.959868
|
||||||
|
|
||||||
|
"""
|
||||||
|
from typing import Sequence, Union
|
||||||
|
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision: str = 'b7df9609542c'
|
||||||
|
down_revision: Union[str, None] = 'd7fbb74b673b'
|
||||||
|
branch_labels: Union[str, Sequence[str], None] = None
|
||||||
|
depends_on: Union[str, Sequence[str], None] = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# Create a partial unique index that ensures only one active meeting per room
|
||||||
|
# This works for both PostgreSQL and SQLite
|
||||||
|
op.create_index(
|
||||||
|
'idx_one_active_meeting_per_room',
|
||||||
|
'meeting',
|
||||||
|
['room_id'],
|
||||||
|
unique=True,
|
||||||
|
postgresql_where=sa.text('is_active = true'),
|
||||||
|
sqlite_where=sa.text('is_active = 1')
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
op.drop_index('idx_one_active_meeting_per_room', table_name='meeting')
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from typing import Annotated, Optional, Literal
|
from typing import Annotated, Optional, Literal
|
||||||
|
import logging
|
||||||
|
|
||||||
import reflector.auth as auth
|
import reflector.auth as auth
|
||||||
from fastapi import APIRouter, Depends, HTTPException
|
from fastapi import APIRouter, Depends, HTTPException
|
||||||
@@ -7,10 +8,14 @@ from fastapi_pagination import Page
|
|||||||
from fastapi_pagination.ext.databases import paginate
|
from fastapi_pagination.ext.databases import paginate
|
||||||
from pydantic import BaseModel
|
from pydantic import BaseModel
|
||||||
from reflector.db import database
|
from reflector.db import database
|
||||||
from reflector.db.meetings import meetings_controller
|
from reflector.db.meetings import Meeting, meetings_controller
|
||||||
from reflector.db.rooms import rooms_controller
|
from reflector.db.rooms import rooms_controller
|
||||||
from reflector.settings import settings
|
from reflector.settings import settings
|
||||||
from reflector.whereby import create_meeting, upload_logo
|
from reflector.whereby import create_meeting, upload_logo
|
||||||
|
import asyncpg.exceptions
|
||||||
|
import sqlite3
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
@@ -30,7 +35,7 @@ class Room(BaseModel):
|
|||||||
is_shared: bool
|
is_shared: bool
|
||||||
|
|
||||||
|
|
||||||
class Meeting(BaseModel):
|
class Meeting(BaseModel): # noqa: F811 # Response model, different from db.meetings.Meeting
|
||||||
id: str
|
id: str
|
||||||
room_name: str
|
room_name: str
|
||||||
room_url: str
|
room_url: str
|
||||||
@@ -149,19 +154,47 @@ async def rooms_create_meeting(
|
|||||||
|
|
||||||
if meeting is None:
|
if meeting is None:
|
||||||
end_date = current_time + timedelta(hours=8)
|
end_date = current_time + timedelta(hours=8)
|
||||||
meeting = await create_meeting("", end_date=end_date, room=room)
|
|
||||||
await upload_logo(meeting["roomName"], "./images/logo.png")
|
|
||||||
|
|
||||||
meeting = await meetings_controller.create(
|
whereby_meeting = await create_meeting("", end_date=end_date, room=room)
|
||||||
id=meeting["meetingId"],
|
await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
|
||||||
room_name=meeting["roomName"],
|
|
||||||
room_url=meeting["roomUrl"],
|
# Now try to save to database
|
||||||
host_room_url=meeting["hostRoomUrl"],
|
try:
|
||||||
start_date=datetime.fromisoformat(meeting["startDate"]),
|
meeting = await meetings_controller.create(
|
||||||
end_date=datetime.fromisoformat(meeting["endDate"]),
|
id=whereby_meeting["meetingId"],
|
||||||
user_id=user_id,
|
room_name=whereby_meeting["roomName"],
|
||||||
room=room,
|
room_url=whereby_meeting["roomUrl"],
|
||||||
)
|
host_room_url=whereby_meeting["hostRoomUrl"],
|
||||||
|
start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
|
||||||
|
end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
|
||||||
|
user_id=user_id,
|
||||||
|
room=room,
|
||||||
|
)
|
||||||
|
except (asyncpg.exceptions.UniqueViolationError, sqlite3.IntegrityError):
|
||||||
|
# Another request already created a meeting for this room
|
||||||
|
# Log this race condition occurrence
|
||||||
|
logger.info(
|
||||||
|
"Race condition detected for room %s - fetching existing meeting",
|
||||||
|
room.name,
|
||||||
|
)
|
||||||
|
logger.warning(
|
||||||
|
"Whereby meeting %s was created but not used (resource leak) for room %s",
|
||||||
|
whereby_meeting["meetingId"],
|
||||||
|
room.name,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Fetch the meeting that was created by the other request
|
||||||
|
meeting = await meetings_controller.get_active(
|
||||||
|
room=room, current_time=current_time
|
||||||
|
)
|
||||||
|
if meeting is None:
|
||||||
|
# Edge case: meeting was created but expired/deleted between checks
|
||||||
|
logger.error(
|
||||||
|
"Meeting disappeared after race condition for room %s", room.name
|
||||||
|
)
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=503, detail="Unable to join meeting - please try again"
|
||||||
|
)
|
||||||
|
|
||||||
if user_id != room.user_id:
|
if user_id != room.user_id:
|
||||||
meeting.host_room_url = ""
|
meeting.host_room_url = ""
|
||||||
|
|||||||
Reference in New Issue
Block a user