mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
Trigger multitrack processing for daily recordings
This commit is contained in:
@@ -8,7 +8,9 @@ from pydantic import BaseModel
|
|||||||
|
|
||||||
from reflector.db.meetings import meetings_controller
|
from reflector.db.meetings import meetings_controller
|
||||||
from reflector.logger import logger
|
from reflector.logger import logger
|
||||||
|
from reflector.settings import settings
|
||||||
from reflector.video_platforms.factory import create_platform_client
|
from reflector.video_platforms.factory import create_platform_client
|
||||||
|
from reflector.worker.process import process_multitrack_recording
|
||||||
|
|
||||||
router = APIRouter()
|
router = APIRouter()
|
||||||
|
|
||||||
@@ -207,14 +209,22 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
|
|||||||
platform="daily",
|
platform="daily",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Import at runtime to avoid circular dependency (process.py imports from daily.py)
|
bucket_name = settings.AWS_DAILY_S3_BUCKET
|
||||||
from reflector.worker.process import process_daily_recording # noqa: PLC0415
|
if not bucket_name:
|
||||||
|
logger.error(
|
||||||
|
"AWS_DAILY_S3_BUCKET not configured; cannot process Daily recording"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
# Convert Pydantic models to dicts for Celery serialization
|
if not settings.DAILY_SUBDOMAIN:
|
||||||
process_daily_recording.delay(
|
logger.error(
|
||||||
meeting_id=meeting.id,
|
"DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name"
|
||||||
recording_id=recording_id or event.id,
|
)
|
||||||
tracks=[t.model_dump() for t in tracks],
|
return
|
||||||
|
prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}"
|
||||||
|
|
||||||
|
process_multitrack_recording.delay(
|
||||||
|
bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -153,16 +153,16 @@ async def process_recording(bucket_name: str, object_key: str):
|
|||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
@asynctask
|
@asynctask
|
||||||
async def process_multitrack_recording(bucket_name: str, prefix: str):
|
async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str):
|
||||||
logger.info(
|
logger.info(
|
||||||
"Processing multitrack recording",
|
"Processing multitrack recording",
|
||||||
bucket=bucket_name,
|
bucket=bucket_name,
|
||||||
prefix=prefix,
|
prefix=prefix,
|
||||||
room_name="daily",
|
meeting_id=meeting_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Parse an approximate recorded_at from the prefix directory name
|
||||||
try:
|
try:
|
||||||
effective_room_name = "/daily"
|
|
||||||
dir_name = prefix.rstrip("/").split("/")[-1]
|
dir_name = prefix.rstrip("/").split("/")[-1]
|
||||||
ts_match = re.search(r"(\d{14})$", dir_name)
|
ts_match = re.search(r"(\d{14})$", dir_name)
|
||||||
if ts_match:
|
if ts_match:
|
||||||
@@ -177,32 +177,12 @@ async def process_multitrack_recording(bucket_name: str, prefix: str):
|
|||||||
recorded_at = datetime.now(timezone.utc)
|
recorded_at = datetime.now(timezone.utc)
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.warning("Could not parse recorded_at from prefix, using now()")
|
logger.warning("Could not parse recorded_at from prefix, using now()")
|
||||||
effective_room_name = "/daily"
|
|
||||||
recorded_at = datetime.now(timezone.utc)
|
recorded_at = datetime.now(timezone.utc)
|
||||||
|
|
||||||
meeting = await meetings_controller.get_by_room_name(effective_room_name)
|
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||||
if meeting:
|
if not meeting:
|
||||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
raise Exception(f"Meeting not found: {meeting_id}")
|
||||||
else:
|
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||||
room = await rooms_controller.get_by_name(effective_room_name.lstrip("/"))
|
|
||||||
if not room:
|
|
||||||
raise Exception(f"Room not found: {effective_room_name}")
|
|
||||||
start_date = recorded_at
|
|
||||||
end_date = recorded_at
|
|
||||||
try:
|
|
||||||
dummy = await meetings_controller.create(
|
|
||||||
id=room.id + "-" + recorded_at.strftime("%Y%m%d%H%M%S"),
|
|
||||||
room_name=effective_room_name,
|
|
||||||
room_url=f"{effective_room_name}",
|
|
||||||
host_room_url=f"{effective_room_name}",
|
|
||||||
start_date=start_date,
|
|
||||||
end_date=end_date,
|
|
||||||
room=room,
|
|
||||||
)
|
|
||||||
meeting = dummy
|
|
||||||
except Exception as e:
|
|
||||||
logger.warning("Failed to create dummy meeting", error=str(e))
|
|
||||||
meeting = None
|
|
||||||
|
|
||||||
recording = await recordings_controller.get_by_object_key(bucket_name, prefix)
|
recording = await recordings_controller.get_by_object_key(bucket_name, prefix)
|
||||||
if not recording:
|
if not recording:
|
||||||
@@ -211,7 +191,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str):
|
|||||||
bucket_name=bucket_name,
|
bucket_name=bucket_name,
|
||||||
object_key=prefix,
|
object_key=prefix,
|
||||||
recorded_at=recorded_at,
|
recorded_at=recorded_at,
|
||||||
meeting_id=meeting.id if meeting else None,
|
meeting_id=meeting.id,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -232,7 +212,7 @@ async def process_multitrack_recording(bucket_name: str, prefix: str):
|
|||||||
user_id=room.user_id,
|
user_id=room.user_id,
|
||||||
recording_id=recording.id,
|
recording_id=recording.id,
|
||||||
share_mode="public",
|
share_mode="public",
|
||||||
meeting_id=meeting.id if meeting else None,
|
meeting_id=meeting.id,
|
||||||
room_id=room.id,
|
room_id=room.id,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user