mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-22 05:09:05 +00:00
feat: dailyco poll (#730)
* dailyco api module (no-mistakes) * daily co library self-review * uncurse * self-review: daily resource leak, uniform types, enable_recording bomb, daily custom error, video_platforms/daily typing, daily timestamp dry * dailyco docs parser * phase 1-2 of daily poll * dailyco poll (no-mistakes) * poll docs * fix tests * forgotten utils file * remove generated daily docs * pr comments * dailyco poll pr review and self-review * daily recording poll api fix * daily recording poll api fix * review * review * fix tests --------- Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
@@ -38,6 +38,14 @@ else:
|
||||
"task": "reflector.worker.process.reprocess_failed_recordings",
|
||||
"schedule": crontab(hour=5, minute=0), # Midnight EST
|
||||
},
|
||||
"poll_daily_recordings": {
|
||||
"task": "reflector.worker.process.poll_daily_recordings",
|
||||
"schedule": 180.0, # Every 3 minutes (configurable lookback window)
|
||||
},
|
||||
"trigger_daily_reconciliation": {
|
||||
"task": "reflector.worker.process.trigger_daily_reconciliation",
|
||||
"schedule": 30.0, # Every 30 seconds (queues poll tasks for all active meetings)
|
||||
},
|
||||
"sync_all_ics_calendars": {
|
||||
"task": "reflector.worker.ics_sync.sync_all_ics_calendars",
|
||||
"schedule": 60.0, # Run every minute to check which rooms need sync
|
||||
|
||||
@@ -10,8 +10,12 @@ import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from pydantic import ValidationError
|
||||
from redis.exceptions import LockError
|
||||
|
||||
from reflector.dailyco_api import MeetingParticipantsResponse
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
DailyParticipantSession,
|
||||
daily_participant_sessions_controller,
|
||||
)
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.recordings import Recording, recordings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
@@ -28,10 +32,15 @@ from reflector.pipelines.main_multitrack_pipeline import (
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline
|
||||
from reflector.processors import AudioFileWriterProcessor
|
||||
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
||||
from reflector.redis_cache import get_redis_client
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_transcripts_storage
|
||||
from reflector.utils.daily import DailyRoomName, extract_base_room_name
|
||||
from reflector.utils.daily import (
|
||||
DailyRoomName,
|
||||
extract_base_room_name,
|
||||
parse_daily_recording_filename,
|
||||
recording_lock_key,
|
||||
)
|
||||
from reflector.video_platforms.factory import create_platform_client
|
||||
from reflector.video_platforms.whereby_utils import (
|
||||
parse_whereby_recording_filename,
|
||||
@@ -178,6 +187,42 @@ async def process_multitrack_recording(
|
||||
logger.warning("No audio track keys provided")
|
||||
return
|
||||
|
||||
lock_key = recording_lock_key(recording_id)
|
||||
async with RedisAsyncLock(
|
||||
key=lock_key,
|
||||
timeout=600, # 10min for processing (includes API calls, DB writes)
|
||||
extend_interval=60, # Auto-extend every 60s
|
||||
skip_if_locked=True,
|
||||
blocking=False,
|
||||
) as lock:
|
||||
if not lock.acquired:
|
||||
logger.warning(
|
||||
"Recording processing skipped - lock already held (duplicate task or concurrent worker)",
|
||||
recording_id=recording_id,
|
||||
lock_key=lock_key,
|
||||
reason="duplicate_task_or_concurrent_worker",
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Recording worker acquired lock - starting processing",
|
||||
recording_id=recording_id,
|
||||
lock_key=lock_key,
|
||||
)
|
||||
|
||||
await _process_multitrack_recording_inner(
|
||||
bucket_name, daily_room_name, recording_id, track_keys
|
||||
)
|
||||
|
||||
|
||||
async def _process_multitrack_recording_inner(
|
||||
bucket_name: str,
|
||||
daily_room_name: DailyRoomName,
|
||||
recording_id: str,
|
||||
track_keys: list[str],
|
||||
):
|
||||
"""Inner function containing the actual processing logic."""
|
||||
|
||||
tz = timezone.utc
|
||||
recorded_at = datetime.now(tz)
|
||||
try:
|
||||
@@ -225,9 +270,7 @@ async def process_multitrack_recording(
|
||||
track_keys=track_keys,
|
||||
)
|
||||
)
|
||||
else:
|
||||
# Recording already exists; assume metadata was set at creation time
|
||||
pass
|
||||
# else: Recording already exists; metadata set at creation time
|
||||
|
||||
transcript = await transcripts_controller.get_by_recording_id(recording.id)
|
||||
if transcript:
|
||||
@@ -252,60 +295,70 @@ async def process_multitrack_recording(
|
||||
)
|
||||
|
||||
try:
|
||||
daily_client = create_platform_client("daily")
|
||||
async with create_platform_client("daily") as daily_client:
|
||||
id_to_name = {}
|
||||
id_to_user_id = {}
|
||||
|
||||
id_to_name = {}
|
||||
id_to_user_id = {}
|
||||
|
||||
mtg_session_id = None
|
||||
try:
|
||||
rec_details = await daily_client.get_recording(recording_id)
|
||||
mtg_session_id = rec_details.get("mtgSessionId")
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to fetch Daily recording details",
|
||||
error=str(e),
|
||||
recording_id=recording_id,
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
if mtg_session_id:
|
||||
try:
|
||||
payload = await daily_client.get_meeting_participants(mtg_session_id)
|
||||
for p in payload.get("data", []):
|
||||
pid = p.get("participant_id")
|
||||
name = p.get("user_name")
|
||||
user_id = p.get("user_id")
|
||||
if pid and name:
|
||||
id_to_name[pid] = name
|
||||
if pid and user_id:
|
||||
id_to_user_id[pid] = user_id
|
||||
rec_details = await daily_client.get_recording(recording_id)
|
||||
mtg_session_id = rec_details.mtgSessionId
|
||||
if mtg_session_id:
|
||||
try:
|
||||
payload: MeetingParticipantsResponse = (
|
||||
await daily_client.get_meeting_participants(mtg_session_id)
|
||||
)
|
||||
for p in payload.data:
|
||||
pid = p.participant_id
|
||||
assert (
|
||||
pid is not None
|
||||
), "panic! participant id cannot be None"
|
||||
name = p.user_name
|
||||
user_id = p.user_id
|
||||
if name:
|
||||
id_to_name[pid] = name
|
||||
if user_id:
|
||||
id_to_user_id[pid] = user_id
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to fetch Daily meeting participants",
|
||||
error=str(e),
|
||||
mtg_session_id=mtg_session_id,
|
||||
exc_info=True,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"No mtgSessionId found for recording; participant names may be generic",
|
||||
recording_id=recording_id,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"Failed to fetch Daily meeting participants",
|
||||
"Failed to fetch Daily recording details",
|
||||
error=str(e),
|
||||
mtg_session_id=mtg_session_id,
|
||||
recording_id=recording_id,
|
||||
exc_info=True,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"No mtgSessionId found for recording; participant names may be generic",
|
||||
recording_id=recording_id,
|
||||
)
|
||||
|
||||
for idx, key in enumerate(track_keys):
|
||||
base = os.path.basename(key)
|
||||
m = re.search(r"\d{13,}-([0-9a-fA-F-]{36})-cam-audio-", base)
|
||||
participant_id = m.group(1) if m else None
|
||||
for idx, key in enumerate(track_keys):
|
||||
try:
|
||||
parsed = parse_daily_recording_filename(key)
|
||||
participant_id = parsed.participant_id
|
||||
except ValueError as e:
|
||||
logger.error(
|
||||
"Failed to parse Daily recording filename",
|
||||
error=str(e),
|
||||
key=key,
|
||||
exc_info=True,
|
||||
)
|
||||
continue
|
||||
|
||||
default_name = f"Speaker {idx}"
|
||||
name = id_to_name.get(participant_id, default_name)
|
||||
user_id = id_to_user_id.get(participant_id)
|
||||
default_name = f"Speaker {idx}"
|
||||
name = id_to_name.get(participant_id, default_name)
|
||||
user_id = id_to_user_id.get(participant_id)
|
||||
|
||||
participant = TranscriptParticipant(
|
||||
id=participant_id, speaker=idx, name=name, user_id=user_id
|
||||
)
|
||||
await transcripts_controller.upsert_participant(transcript, participant)
|
||||
participant = TranscriptParticipant(
|
||||
id=participant_id, speaker=idx, name=name, user_id=user_id
|
||||
)
|
||||
await transcripts_controller.upsert_participant(transcript, participant)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Failed to map participant names", error=str(e), exc_info=True)
|
||||
@@ -317,6 +370,207 @@ async def process_multitrack_recording(
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def poll_daily_recordings():
|
||||
"""Poll Daily.co API for recordings and process missing ones.
|
||||
|
||||
Fetches latest recordings from Daily.co API (default limit 100), compares with DB,
|
||||
and queues processing for recordings not already in DB.
|
||||
|
||||
For each missing recording, uses audio tracks from API response.
|
||||
|
||||
Worker-level locking provides idempotency (see process_multitrack_recording).
|
||||
"""
|
||||
bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
|
||||
if not bucket_name:
|
||||
logger.debug(
|
||||
"DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping recording poll"
|
||||
)
|
||||
return
|
||||
|
||||
async with create_platform_client("daily") as daily_client:
|
||||
# latest 100. TODO cursor-based state
|
||||
api_recordings = await daily_client.list_recordings()
|
||||
|
||||
if not api_recordings:
|
||||
logger.debug(
|
||||
"No recordings found from Daily.co API",
|
||||
)
|
||||
return
|
||||
|
||||
recording_ids = [rec.id for rec in api_recordings]
|
||||
existing_recordings = await recordings_controller.get_by_ids(recording_ids)
|
||||
existing_ids = {rec.id for rec in existing_recordings}
|
||||
|
||||
missing_recordings = [rec for rec in api_recordings if rec.id not in existing_ids]
|
||||
|
||||
if not missing_recordings:
|
||||
logger.debug(
|
||||
"All recordings already in DB",
|
||||
api_count=len(api_recordings),
|
||||
existing_count=len(existing_recordings),
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Found recordings missing from DB",
|
||||
missing_count=len(missing_recordings),
|
||||
total_api_count=len(api_recordings),
|
||||
existing_count=len(existing_recordings),
|
||||
)
|
||||
|
||||
for recording in missing_recordings:
|
||||
if not recording.tracks:
|
||||
assert recording.status != "finished", (
|
||||
f"Recording {recording.id} has status='finished' but no tracks. "
|
||||
f"Daily.co API guarantees finished recordings have tracks available. "
|
||||
f"room_name={recording.room_name}"
|
||||
)
|
||||
logger.debug(
|
||||
"No tracks in recording yet",
|
||||
recording_id=recording.id,
|
||||
room_name=recording.room_name,
|
||||
status=recording.status,
|
||||
)
|
||||
continue
|
||||
|
||||
track_keys = [t.s3Key for t in recording.tracks if t.type == "audio"]
|
||||
|
||||
if not track_keys:
|
||||
logger.warning(
|
||||
"No audio tracks found in recording (only video tracks)",
|
||||
recording_id=recording.id,
|
||||
room_name=recording.room_name,
|
||||
total_tracks=len(recording.tracks),
|
||||
)
|
||||
continue
|
||||
|
||||
logger.info(
|
||||
"Queueing missing recording for processing",
|
||||
recording_id=recording.id,
|
||||
room_name=recording.room_name,
|
||||
track_count=len(track_keys),
|
||||
)
|
||||
|
||||
process_multitrack_recording.delay(
|
||||
bucket_name=bucket_name,
|
||||
daily_room_name=recording.room_name,
|
||||
recording_id=recording.id,
|
||||
track_keys=track_keys,
|
||||
)
|
||||
|
||||
|
||||
async def poll_daily_room_presence(meeting_id: str) -> None:
|
||||
"""Poll Daily.co room presence and reconcile with DB sessions. New presence is added, old presence is marked as closed.
|
||||
Warning: Daily api returns only current state, so there could be missed presence updates, people who went and left the room quickly.
|
||||
Therefore, set(presences) != set(recordings) even if everyone said something. This is not a problem but should be noted."""
|
||||
|
||||
async with RedisAsyncLock(
|
||||
key=f"meeting_presence_poll:{meeting_id}",
|
||||
timeout=120,
|
||||
extend_interval=30,
|
||||
skip_if_locked=True,
|
||||
blocking=False,
|
||||
) as lock:
|
||||
if not lock.acquired:
|
||||
logger.debug(
|
||||
"Concurrent poll skipped (duplicate task)", meeting_id=meeting_id
|
||||
)
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
logger.warning("Meeting not found", meeting_id=meeting_id)
|
||||
return
|
||||
|
||||
async with create_platform_client("daily") as daily_client:
|
||||
try:
|
||||
presence = await daily_client.get_room_presence(meeting.room_name)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Daily.co API fetch failed",
|
||||
meeting_id=meeting.id,
|
||||
room_name=meeting.room_name,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
return
|
||||
|
||||
api_participants = {p.id: p for p in presence.data}
|
||||
db_sessions = (
|
||||
await daily_participant_sessions_controller.get_all_sessions_for_meeting(
|
||||
meeting.id
|
||||
)
|
||||
)
|
||||
|
||||
active_session_ids = {
|
||||
sid for sid, s in db_sessions.items() if s.left_at is None
|
||||
}
|
||||
missing_session_ids = set(api_participants.keys()) - active_session_ids
|
||||
stale_session_ids = active_session_ids - set(api_participants.keys())
|
||||
|
||||
if missing_session_ids:
|
||||
missing_sessions = []
|
||||
for session_id in missing_session_ids:
|
||||
p = api_participants[session_id]
|
||||
session = DailyParticipantSession(
|
||||
id=f"{meeting.id}:{session_id}",
|
||||
meeting_id=meeting.id,
|
||||
room_id=meeting.room_id,
|
||||
session_id=session_id,
|
||||
user_id=p.userId,
|
||||
user_name=p.userName,
|
||||
joined_at=datetime.fromisoformat(p.joinTime),
|
||||
left_at=None,
|
||||
)
|
||||
missing_sessions.append(session)
|
||||
|
||||
await daily_participant_sessions_controller.batch_upsert_sessions(
|
||||
missing_sessions
|
||||
)
|
||||
logger.info(
|
||||
"Sessions added",
|
||||
meeting_id=meeting.id,
|
||||
count=len(missing_sessions),
|
||||
)
|
||||
|
||||
if stale_session_ids:
|
||||
composite_ids = [f"{meeting.id}:{sid}" for sid in stale_session_ids]
|
||||
await daily_participant_sessions_controller.batch_close_sessions(
|
||||
composite_ids,
|
||||
left_at=datetime.now(timezone.utc),
|
||||
)
|
||||
logger.info(
|
||||
"Stale sessions closed",
|
||||
meeting_id=meeting.id,
|
||||
count=len(composite_ids),
|
||||
)
|
||||
|
||||
final_active_count = len(api_participants)
|
||||
if meeting.num_clients != final_active_count:
|
||||
await meetings_controller.update_meeting(
|
||||
meeting.id,
|
||||
num_clients=final_active_count,
|
||||
)
|
||||
logger.info(
|
||||
"num_clients updated",
|
||||
meeting_id=meeting.id,
|
||||
old_value=meeting.num_clients,
|
||||
new_value=final_active_count,
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def poll_daily_room_presence_task(meeting_id: str) -> None:
|
||||
"""Celery task wrapper for poll_daily_room_presence.
|
||||
|
||||
Queued by webhooks or reconciliation timer.
|
||||
"""
|
||||
await poll_daily_room_presence(meeting_id)
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def process_meetings():
|
||||
@@ -335,74 +589,71 @@ async def process_meetings():
|
||||
Uses distributed locking to prevent race conditions when multiple workers
|
||||
process the same meeting simultaneously.
|
||||
"""
|
||||
|
||||
meetings = await meetings_controller.get_all_active()
|
||||
logger.info(f"Processing {len(meetings)} meetings")
|
||||
current_time = datetime.now(timezone.utc)
|
||||
redis_client = get_redis_client()
|
||||
processed_count = 0
|
||||
skipped_count = 0
|
||||
for meeting in meetings:
|
||||
logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
|
||||
logger_.info("Processing meeting")
|
||||
lock_key = f"meeting_process_lock:{meeting.id}"
|
||||
lock = redis_client.lock(lock_key, timeout=120)
|
||||
|
||||
try:
|
||||
if not lock.acquire(blocking=False):
|
||||
logger_.debug("Meeting is being processed by another worker, skipping")
|
||||
skipped_count += 1
|
||||
continue
|
||||
async with RedisAsyncLock(
|
||||
key=f"meeting_process_lock:{meeting.id}",
|
||||
timeout=120,
|
||||
extend_interval=30,
|
||||
skip_if_locked=True,
|
||||
blocking=False,
|
||||
) as lock:
|
||||
if not lock.acquired:
|
||||
logger_.debug(
|
||||
"Meeting is being processed by another worker, skipping"
|
||||
)
|
||||
skipped_count += 1
|
||||
continue
|
||||
|
||||
# Process the meeting
|
||||
should_deactivate = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
# Process the meeting
|
||||
should_deactivate = False
|
||||
end_date = meeting.end_date
|
||||
if end_date.tzinfo is None:
|
||||
end_date = end_date.replace(tzinfo=timezone.utc)
|
||||
|
||||
client = create_platform_client(meeting.platform)
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
client = create_platform_client(meeting.platform)
|
||||
room_sessions = await client.get_room_sessions(meeting.room_name)
|
||||
|
||||
try:
|
||||
# Extend lock after operation to ensure we still hold it
|
||||
lock.extend(120, replace_ttl=True)
|
||||
except LockError:
|
||||
logger_.warning("Lost lock for meeting, skipping")
|
||||
continue
|
||||
|
||||
has_active_sessions = room_sessions and any(
|
||||
s.ended_at is None for s in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
|
||||
)
|
||||
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
elif has_had_sessions:
|
||||
should_deactivate = True
|
||||
logger_.info("Meeting ended - all participants left")
|
||||
elif current_time > end_date:
|
||||
should_deactivate = True
|
||||
logger_.info(
|
||||
"Meeting deactivated - scheduled time ended with no participants",
|
||||
has_active_sessions = room_sessions and any(
|
||||
s.ended_at is None for s in room_sessions
|
||||
)
|
||||
has_had_sessions = bool(room_sessions)
|
||||
logger_.info(
|
||||
f"found {has_active_sessions} active sessions, had {has_had_sessions}"
|
||||
)
|
||||
else:
|
||||
logger_.debug("Meeting not yet started, keep it")
|
||||
|
||||
if should_deactivate:
|
||||
await meetings_controller.update_meeting(meeting.id, is_active=False)
|
||||
logger_.info("Meeting is deactivated")
|
||||
if has_active_sessions:
|
||||
logger_.debug("Meeting still has active sessions, keep it")
|
||||
elif has_had_sessions:
|
||||
should_deactivate = True
|
||||
logger_.info("Meeting ended - all participants left")
|
||||
elif current_time > end_date:
|
||||
should_deactivate = True
|
||||
logger_.info(
|
||||
"Meeting deactivated - scheduled time ended with no participants",
|
||||
)
|
||||
else:
|
||||
logger_.debug("Meeting not yet started, keep it")
|
||||
|
||||
processed_count += 1
|
||||
if should_deactivate:
|
||||
await meetings_controller.update_meeting(
|
||||
meeting.id, is_active=False
|
||||
)
|
||||
logger_.info("Meeting is deactivated")
|
||||
|
||||
processed_count += 1
|
||||
|
||||
except Exception:
|
||||
logger_.error("Error processing meeting", exc_info=True)
|
||||
finally:
|
||||
try:
|
||||
lock.release()
|
||||
except LockError:
|
||||
pass # Lock already released or expired
|
||||
|
||||
logger.debug(
|
||||
"Processed meetings finished",
|
||||
@@ -524,3 +775,34 @@ async def reprocess_failed_recordings():
|
||||
|
||||
logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
|
||||
return reprocessed_count
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def trigger_daily_reconciliation() -> None:
|
||||
"""Daily.co pull"""
|
||||
try:
|
||||
active_meetings = await meetings_controller.get_all_active(platform="daily")
|
||||
queued_count = 0
|
||||
|
||||
for meeting in active_meetings:
|
||||
try:
|
||||
poll_daily_room_presence_task.delay(meeting.id)
|
||||
queued_count += 1
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Failed to queue reconciliation poll",
|
||||
meeting_id=meeting.id,
|
||||
error=str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
if queued_count > 0:
|
||||
logger.debug(
|
||||
"Reconciliation polls queued",
|
||||
count=queued_count,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Reconciliation trigger failed", error=str(e), exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user