Files
reflector/server/reflector/worker/process.py
2026-01-30 14:37:53 -05:00

1226 lines
42 KiB
Python

import json
import os
import re
from datetime import datetime, timezone
from typing import List, Literal
from urllib.parse import unquote
import av
import boto3
import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
daily_participant_sessions_controller,
)
from reflector.db.meetings import meetings_controller
from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import (
SourceKind,
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
from reflector.pipelines.main_live_pipeline import asynctask
from reflector.pipelines.main_multitrack_pipeline import (
task_pipeline_multitrack_process,
)
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
parse_whereby_recording_filename,
room_name_to_whereby_api_room_name,
)
# Module-level logger: Celery's task logger wrapped by structlog so that log
# calls in this module can attach key/value context.
logger = structlog.wrap_logger(get_task_logger(__name__))
@shared_task
def process_messages():
    """Receive one SQS message and queue processing for the recordings it lists.

    Reads at most one message from ``settings.AWS_PROCESS_RECORDING_QUEUE_URL``.
    Each S3 record in the message body whose event name starts with
    ``ObjectCreated`` results in a ``process_recording`` task; the message is
    then deleted from the queue. Any exception is logged and swallowed.
    """
    queue_url = settings.AWS_PROCESS_RECORDING_QUEUE_URL
    if not queue_url:
        logger.warning("No process recording queue url")
        return

    try:
        logger.info("Receiving messages from: %s", queue_url)
        client = boto3.client(
            "sqs",
            region_name=settings.TRANSCRIPT_STORAGE_AWS_REGION,
            aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
        )
        received = client.receive_message(
            QueueUrl=queue_url,
            AttributeNames=["SentTimestamp"],
            MaxNumberOfMessages=1,
            MessageAttributeNames=["All"],
            VisibilityTimeout=0,
            WaitTimeSeconds=0,
        )

        for message in received.get("Messages", []):
            handle = message["ReceiptHandle"]
            payload = json.loads(message["Body"])

            # Only object-creation events describe a new recording to process.
            created_events = (
                event
                for event in payload.get("Records", [])
                if event["eventName"].startswith("ObjectCreated")
            )
            for event in created_events:
                s3_info = event["s3"]
                process_recording.delay(
                    s3_info["bucket"]["name"],
                    unquote(s3_info["object"]["key"]),
                )

            client.delete_message(QueueUrl=queue_url, ReceiptHandle=handle)
            logger.info("Processed and deleted message: %s", message)
    except Exception as e:
        logger.error("process_messages", error=str(e))
# Only Whereby recordings are supported by this task.
@shared_task
@asynctask
async def process_recording(bucket_name: str, object_key: str):
    """Download a Whereby recording from storage and queue it for transcription.

    Steps, in order:
      1. Parse the room name and recording time out of ``object_key``.
      2. Look up the meeting (by Whereby API room name) and its room.
      3. Get or create the ``Recording`` row for ``bucket_name``/``object_key``.
      4. Get the transcript for the recording (resetting its topics and
         participants) or create a new one.
      5. Stream the object to ``<transcript.data_path>/upload<ext>`` and verify
         it contains at least one audio stream.
      6. Mark the transcript ``uploaded`` and queue the file pipeline.

    Args:
        bucket_name: Bucket holding the recording.
        object_key: Key of the recording object inside the bucket.

    Raises:
        Exception: If no meeting matches the room name, if a new transcript
            must be created but the meeting's room is missing, or if the
            downloaded file has no audio stream. Download errors are re-raised
            after the partial file is removed.
    """
    logger.info("Processing recording: %s/%s", bucket_name, object_key)

    room_name_part, recorded_at = parse_whereby_recording_filename(object_key)
    # we store whereby api room names, NOT whereby room names
    room_name = room_name_to_whereby_api_room_name(room_name_part)

    meeting = await meetings_controller.get_by_room_name(room_name)
    if not meeting:
        # Fail with an explicit message instead of an AttributeError on None.
        raise Exception(f"Meeting not found for room: {room_name}")
    room = await rooms_controller.get_by_id(meeting.room_id)

    recording = await recordings_controller.get_by_object_key(bucket_name, object_key)
    if not recording:
        recording = await recordings_controller.create(
            Recording(
                bucket_name=bucket_name,
                object_key=object_key,
                recorded_at=recorded_at,
                meeting_id=meeting.id,
            )
        )

    transcript = await transcripts_controller.get_by_recording_id(recording.id)
    if transcript:
        # Reprocessing an existing transcript: clear previously derived data.
        await transcripts_controller.update(
            transcript,
            {
                "topics": [],
                "participants": [],
            },
        )
    else:
        if not room:
            # The room is only needed to create a new transcript.
            raise Exception(f"Room not found for meeting: {meeting.id}")
        transcript = await transcripts_controller.add(
            "",
            source_kind=SourceKind.ROOM,
            source_language="en",
            target_language="en",
            user_id=room.user_id,
            recording_id=recording.id,
            share_mode="public",
            meeting_id=meeting.id,
            room_id=room.id,
        )

    _, extension = os.path.splitext(object_key)
    upload_filename = transcript.data_path / f"upload{extension}"
    upload_filename.parent.mkdir(parents=True, exist_ok=True)

    storage = get_transcripts_storage()
    try:
        with open(upload_filename, "wb") as f:
            await storage.stream_to_fileobj(object_key, f, bucket=bucket_name)
    except Exception:
        # Clean up partial file on stream failure
        upload_filename.unlink(missing_ok=True)
        raise

    container = av.open(upload_filename.as_posix())
    try:
        if not len(container.streams.audio):
            raise Exception("File has no audio stream")
    except Exception:
        # A file without audio is useless to the pipeline; remove it.
        upload_filename.unlink()
        raise
    finally:
        container.close()

    await transcripts_controller.update(transcript, {"status": "uploaded"})
    task_pipeline_file_process.delay(transcript_id=transcript.id)
@shared_task
@asynctask
async def process_multitrack_recording(
    bucket_name: str,
    daily_room_name: DailyRoomName,
    recording_id: str,
    track_keys: list[str],
    recording_start_ts: int,
):
    """Process a Daily.co raw-tracks (multitrack) recording under a Redis lock.

    Returns early when no track keys are given or when the per-recording lock
    is already held; otherwise delegates to
    ``_process_multitrack_recording_inner`` while holding the lock.
    """
    logger.info(
        "Processing multitrack recording",
        bucket=bucket_name,
        room_name=daily_room_name,
        recording_id=recording_id,
        provided_keys=len(track_keys),
        recording_start_ts=recording_start_ts,
    )

    if not track_keys:
        logger.warning("No audio track keys provided")
        return

    lock_key = recording_lock_key(recording_id)
    recording_lock = RedisAsyncLock(
        key=lock_key,
        timeout=600,  # 10min for processing (includes API calls, DB writes)
        extend_interval=60,  # Auto-extend every 60s
        skip_if_locked=True,
        blocking=False,
    )
    async with recording_lock as lock:
        if lock.acquired:
            logger.info(
                "Recording worker acquired lock - starting processing",
                recording_id=recording_id,
                lock_key=lock_key,
            )
            await _process_multitrack_recording_inner(
                bucket_name,
                daily_room_name,
                recording_id,
                track_keys,
                recording_start_ts,
            )
            return

        # The lock is already held, so skip rather than process twice.
        logger.warning(
            "Recording processing skipped - lock already held (duplicate task or concurrent worker)",
            recording_id=recording_id,
            lock_key=lock_key,
            reason="duplicate_task_or_concurrent_worker",
        )
async def _process_multitrack_recording_inner(
    bucket_name: str,
    daily_room_name: DailyRoomName,
    recording_id: str,
    track_keys: list[str],
    recording_start_ts: int,
):
    """
    Process multitrack recording (first time or reprocessing).

    For first processing (webhook/polling):
    - Uses recording_start_ts for time-based meeting matching (no instanceId available)

    For reprocessing:
    - Uses recording.meeting_id directly (already linked during first processing)
    - recording_start_ts is ignored

    After the meeting, room, recording and transcript are resolved (creating
    the recording/transcript rows when missing), the work is dispatched either
    to the Hatchet "DiarizationPipeline" workflow or, when the room has
    ``use_celery`` set, to the Celery multitrack pipeline.

    Raises:
        Exception: If the room derived from ``daily_room_name`` does not exist.
    """
    tz = timezone.utc
    # Fallback when the track folder name carries no parsable timestamp.
    recorded_at = datetime.now(tz)
    try:
        if track_keys:
            # The folder holding the tracks may end with a 14-digit
            # YYYYmmddHHMMSS timestamp.
            folder = os.path.basename(os.path.dirname(track_keys[0]))
            ts_match = re.search(r"(\d{14})$", folder)
            if ts_match:
                ts = ts_match.group(1)
                recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
    except Exception as e:
        # Pass the error as a keyword: a stray positional argument would make
        # the log call itself fail and abort processing.
        logger.warning(
            f"Could not parse recorded_at from keys, using now() {recorded_at}",
            error=str(e),
            exc_info=True,
        )

    # Check if recording already exists (reprocessing path)
    recording = await recordings_controller.get_by_id(recording_id)

    if recording and recording.meeting_id:
        # Reprocessing: recording exists with meeting already linked
        meeting = await meetings_controller.get_by_id(recording.meeting_id)
        if not meeting:
            logger.error(
                "Reprocessing: meeting not found for recording - skipping",
                meeting_id=recording.meeting_id,
                recording_id=recording_id,
            )
            return

        logger.info(
            "Reprocessing: using existing recording.meeting_id",
            recording_id=recording_id,
            meeting_id=meeting.id,
            room_name=daily_room_name,
        )
    else:
        # First processing: recording doesn't exist, need time-based matching
        # (Daily.co doesn't return instanceId in API, must match by timestamp)
        recording_start = datetime.fromtimestamp(recording_start_ts, tz=timezone.utc)
        meeting = await meetings_controller.get_by_room_name_and_time(
            room_name=daily_room_name,
            recording_start=recording_start,
            time_window_hours=168,  # 1 week
        )
        if not meeting:
            logger.error(
                "Raw-tracks: no meeting found within 1-week window (time-based match) - skipping",
                recording_id=recording_id,
                room_name=daily_room_name,
                recording_start_ts=recording_start_ts,
                recording_start=recording_start.isoformat(),
            )
            return  # Skip processing, will retry on next poll

        logger.info(
            "First processing: found meeting via time-based matching",
            meeting_id=meeting.id,
            room_name=daily_room_name,
            recording_id=recording_id,
            time_delta_seconds=abs(
                (meeting.start_date - recording_start).total_seconds()
            ),
        )

    room_name_base = extract_base_room_name(daily_room_name)
    room = await rooms_controller.get_by_name(room_name_base)
    if not room:
        raise Exception(f"Room not found: {room_name_base}")

    if not recording:
        # Create recording (only happens during first processing)
        object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
        recording = await recordings_controller.create(
            Recording(
                id=recording_id,
                bucket_name=bucket_name,
                object_key=object_key_dir,
                recorded_at=recorded_at,
                meeting_id=meeting.id,
                track_keys=track_keys,
            )
        )
    elif not recording.meeting_id:
        # Recording exists but meeting_id is null (failed first processing)
        # Update with meeting from time-based matching
        await recordings_controller.set_meeting_id(
            recording_id=recording.id,
            meeting_id=meeting.id,
        )
        recording.meeting_id = meeting.id
        logger.info(
            "Updated existing recording with meeting_id",
            recording_id=recording.id,
            meeting_id=meeting.id,
        )

    transcript = await transcripts_controller.get_by_recording_id(recording.id)
    if not transcript:
        transcript = await transcripts_controller.add(
            "",
            source_kind=SourceKind.ROOM,
            source_language="en",
            target_language="en",
            user_id=room.user_id,
            recording_id=recording.id,
            share_mode="public",
            meeting_id=meeting.id,
            room_id=room.id,
        )

    # Hatchet is used unless the room has use_celery set.
    use_celery = room and room.use_celery
    use_hatchet = not use_celery

    if use_celery:
        logger.info(
            "Room uses legacy Celery processing",
            room_id=room.id,
            transcript_id=transcript.id,
        )

    if use_hatchet:
        workflow_id = await HatchetClientManager.start_workflow(
            workflow_name="DiarizationPipeline",
            input_data={
                "recording_id": recording_id,
                "tracks": [{"s3_key": k} for k in filter_cam_audio_tracks(track_keys)],
                "bucket_name": bucket_name,
                "transcript_id": transcript.id,
                "room_id": room.id,
            },
            additional_metadata={
                "transcript_id": transcript.id,
                "recording_id": recording_id,
                "daily_recording_id": recording_id,
            },
        )
        logger.info(
            "Started Hatchet workflow",
            workflow_id=workflow_id,
            transcript_id=transcript.id,
        )

        # Store the workflow run id on the transcript.
        await transcripts_controller.update(
            transcript, {"workflow_run_id": workflow_id}
        )
        return

    # Celery pipeline (runs when durable workflows disabled)
    task_pipeline_multitrack_process.delay(
        transcript_id=transcript.id,
        bucket_name=bucket_name,
        track_keys=filter_cam_audio_tracks(track_keys),
    )
@shared_task
@asynctask
async def poll_daily_recordings():
    """Poll the Daily.co API for recordings and handle the ones we have not seen.

    Lists recordings via the Daily.co platform client, keeps only finished
    ones, classifies each as cloud or raw-tracks, then:
    - cloud recordings are passed to ``_poll_cloud_recordings``
    - raw-tracks recordings are passed to ``_poll_raw_tracks_recordings``

    Does nothing when ``settings.DAILYCO_STORAGE_AWS_BUCKET_NAME`` is unset.
    Acts as fallback when webhooks are active, and as primary discovery when
    they are unavailable. Worker-level locking provides idempotency (see
    process_multitrack_recording).
    """
    bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
    if not bucket_name:
        logger.debug(
            "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping recording poll"
        )
        return

    async with create_platform_client("daily") as daily_client:
        # latest 100. TODO cursor-based state
        api_recordings: List[RecordingResponse] = await daily_client.list_recordings()

    if not api_recordings:
        logger.debug(
            "No recordings found from Daily.co API",
        )
        return

    finished_recordings: List[FinishedRecordingResponse] = []
    for api_rec in api_recordings:
        finished = api_rec.to_finished()
        if finished is not None:
            finished_recordings.append(finished)
            continue
        logger.debug(
            "Skipping unfinished recording",
            recording_id=api_rec.id,
            room_name=api_rec.room_name,
            status=api_rec.status,
        )

    if not finished_recordings:
        logger.debug(
            "No finished recordings found from Daily.co API",
            total_api_count=len(api_recordings),
        )
        return

    # Separate cloud and raw-tracks recordings
    cloud_recordings = []
    raw_tracks_recordings = []
    for rec in finished_recordings:
        track_total = len(rec.tracks)
        if rec.type:
            # Daily.co API returns null type - make sure this assumption stays
            # If this logs, Daily.co API changed - we can remove inference logic.
            recording_type = rec.type
            logger.warning(
                "Recording has explicit type field from Daily.co API (unexpected, API may have changed)",
                recording_id=rec.id,
                room_name=rec.room_name,
                recording_type=recording_type,
                has_s3key=bool(rec.s3key),
                tracks_count=track_total,
            )
        # DAILY.CO API LIMITATION:
        # GET /recordings response does NOT include type field.
        # Daily.co docs mention type field exists, but API never returns it.
        # Verified: 84 recordings from Nov 2025 - Jan 2026 ALL have type=None.
        #
        # This is not a recent API change - Daily.co has never returned type.
        # Must infer from structural properties.
        #
        # Inference heuristic (reliable for finished recordings):
        # - Has tracks array → raw-tracks
        # - Has s3key but no tracks → cloud
        # - Neither → failed/incomplete recording
        elif track_total > 0:
            recording_type = "raw-tracks"
        elif rec.s3key:
            recording_type = "cloud"
        else:
            logger.warning(
                "Recording has no type, no s3key, and no tracks - likely failed recording",
                recording_id=rec.id,
                room_name=rec.room_name,
                status=rec.status,
                duration=rec.duration,
                mtg_session_id=rec.mtgSessionId,
            )
            continue

        # Anything that is not explicitly "cloud" is treated as raw-tracks.
        bucket = cloud_recordings if recording_type == "cloud" else raw_tracks_recordings
        bucket.append(rec)

    logger.debug(
        "Poll results",
        total=len(finished_recordings),
        cloud=len(cloud_recordings),
        raw_tracks=len(raw_tracks_recordings),
    )

    # Process cloud recordings
    await _poll_cloud_recordings(cloud_recordings)

    # Process raw-tracks recordings
    await _poll_raw_tracks_recordings(raw_tracks_recordings, bucket_name)
async def store_cloud_recording(
    recording_id: NonEmptyString,
    room_name: NonEmptyString,
    s3_key: NonEmptyString,
    duration: int,
    start_ts: int,
    source: Literal["webhook", "polling"],
) -> bool:
    """
    Attach a cloud recording reference to its meeting.

    Shared by the webhook and polling code paths. The meeting is located by
    room name plus recording start time (1-week window), which copes with
    duplicate room_name values.

    Args:
        recording_id: Daily.co recording ID
        room_name: Daily.co room name
        s3_key: S3 key where recording is stored
        duration: Recording duration in seconds
        start_ts: Unix timestamp when recording started
        source: "webhook" or "polling" (for logging)

    Returns:
        True if stored, False if no meeting matched or the meeting already
        had a cloud recording set.
    """
    started_at = datetime.fromtimestamp(start_ts, tz=timezone.utc)

    meeting = await meetings_controller.get_by_room_name_and_time(
        room_name=room_name,
        recording_start=started_at,
        time_window_hours=168,  # 1 week
    )
    if not meeting:
        logger.warning(
            f"Cloud recording ({source}): no meeting found within 1-week window",
            recording_id=recording_id,
            room_name=room_name,
            recording_start_ts=start_ts,
            recording_start=started_at.isoformat(),
        )
        return False

    was_stored = await meetings_controller.set_cloud_recording_if_missing(
        meeting_id=meeting.id,
        s3_key=s3_key,
        duration=duration,
    )
    if was_stored:
        delta_seconds = abs((meeting.start_date - started_at).total_seconds())
        logger.info(
            f"Cloud recording stored via {source} (time-based match)",
            meeting_id=meeting.id,
            recording_id=recording_id,
            s3_key=s3_key,
            duration=duration,
            time_delta_seconds=delta_seconds,
        )
        return True

    # The controller reported that nothing was written.
    logger.debug(
        f"Cloud recording ({source}): already set (race lost)",
        recording_id=recording_id,
        room_name=room_name,
        meeting_id=meeting.id,
    )
    return False
async def _poll_cloud_recordings(cloud_recordings: List[FinishedRecordingResponse]):
    """
    Store polled cloud recordings on their meetings.

    Each recording with a resolvable S3 key is handed to
    store_cloud_recording() (time-based meeting matching); recordings without
    a key are logged and skipped. A summary is logged at the end.
    """
    if not cloud_recordings:
        return

    stored_count = 0
    for rec in cloud_recordings:
        # Prefer the flat s3key field, falling back to the nested s3.key
        key = rec.s3key
        if not key and rec.s3:
            key = rec.s3.key

        if not key:
            logger.warning(
                "Cloud recording: missing S3 key",
                recording_id=rec.id,
                room_name=rec.room_name,
            )
            continue

        if await store_cloud_recording(
            recording_id=rec.id,
            room_name=rec.room_name,
            s3_key=key,
            duration=rec.duration,
            start_ts=rec.start_ts,
            source="polling",
        ):
            stored_count += 1

    logger.info(
        "Cloud recording polling complete",
        total=len(cloud_recordings),
        stored=stored_count,
    )
async def _poll_raw_tracks_recordings(
    raw_tracks_recordings: List[FinishedRecordingResponse],
    bucket_name: str,
):
    """Queue multitrack processing for raw-tracks recordings absent from the DB.

    Recordings whose id already exists in the recordings table are ignored.
    For the rest, the audio track S3 keys are collected and a
    ``process_multitrack_recording`` task is queued; recordings with no tracks
    or no audio tracks are logged and skipped.
    """
    if not raw_tracks_recordings:
        return

    known_recordings = await recordings_controller.get_by_ids(
        [rec.id for rec in raw_tracks_recordings]
    )
    known_ids = {rec.id for rec in known_recordings}
    to_queue = [rec for rec in raw_tracks_recordings if rec.id not in known_ids]

    if not to_queue:
        logger.debug(
            "All raw-tracks recordings already in DB",
            api_count=len(raw_tracks_recordings),
            existing_count=len(known_recordings),
        )
        return

    logger.info(
        "Found raw-tracks recordings missing from DB",
        missing_count=len(to_queue),
        total_api_count=len(raw_tracks_recordings),
        existing_count=len(known_recordings),
    )

    for rec in to_queue:
        if not rec.tracks:
            logger.warning(
                "Finished raw-tracks recording has no tracks (no audio captured)",
                recording_id=rec.id,
                room_name=rec.room_name,
            )
            continue

        audio_keys = [track.s3Key for track in rec.tracks if track.type == "audio"]
        if not audio_keys:
            logger.warning(
                "No audio tracks found in raw-tracks recording",
                recording_id=rec.id,
                room_name=rec.room_name,
                total_tracks=len(rec.tracks),
            )
            continue

        logger.info(
            "Queueing missing raw-tracks recording for processing",
            recording_id=rec.id,
            room_name=rec.room_name,
            track_count=len(audio_keys),
        )
        process_multitrack_recording.delay(
            bucket_name=bucket_name,
            daily_room_name=rec.room_name,
            recording_id=rec.id,
            track_keys=audio_keys,
            recording_start_ts=rec.start_ts,
        )
async def poll_daily_room_presence(meeting_id: str) -> None:
    """Reconcile Daily.co room presence with the participant sessions in the DB.

    Participants present in the API but without an open DB session get a new
    session; open DB sessions whose participant is no longer present are
    closed. ``meeting.num_clients`` is updated to the current presence count
    when it differs. The whole poll runs under a per-meeting Redis lock and is
    skipped when that lock is already held.

    Warning: the Daily API returns only the current state, so short visits
    (people who joined and left quickly) can be missed. Therefore
    set(presences) != set(recordings) even if everyone said something. This is
    not a problem but should be noted.
    """
    poll_lock = RedisAsyncLock(
        key=f"meeting_presence_poll:{meeting_id}",
        timeout=120,
        extend_interval=30,
        skip_if_locked=True,
        blocking=False,
    )
    async with poll_lock as lock:
        if not lock.acquired:
            logger.debug(
                "Concurrent poll skipped (duplicate task)", meeting_id=meeting_id
            )
            return

        meeting = await meetings_controller.get_by_id(meeting_id)
        if not meeting:
            logger.warning("Meeting not found", meeting_id=meeting_id)
            return

        async with create_platform_client("daily") as daily_client:
            try:
                presence = await daily_client.get_room_presence(meeting.room_name)
            except Exception as e:
                logger.error(
                    "Daily.co API fetch failed",
                    meeting_id=meeting.id,
                    room_name=meeting.room_name,
                    error=str(e),
                    exc_info=True,
                )
                return

        api_participants = {p.id: p for p in presence.data}
        db_sessions = (
            await daily_participant_sessions_controller.get_all_sessions_for_meeting(
                meeting.id
            )
        )

        # Sessions still open in the DB (no left_at recorded yet).
        open_ids = {sid for sid, sess in db_sessions.items() if sess.left_at is None}
        present_ids = set(api_participants.keys())
        to_add = present_ids - open_ids
        to_close = open_ids - present_ids

        if to_add:
            new_sessions = [
                DailyParticipantSession(
                    id=f"{meeting.id}:{sid}",
                    meeting_id=meeting.id,
                    room_id=meeting.room_id,
                    session_id=sid,
                    user_id=api_participants[sid].userId,
                    user_name=api_participants[sid].userName,
                    joined_at=datetime.fromisoformat(api_participants[sid].joinTime),
                    left_at=None,
                )
                for sid in to_add
            ]
            await daily_participant_sessions_controller.batch_upsert_sessions(
                new_sessions
            )
            logger.info(
                "Sessions added",
                meeting_id=meeting.id,
                count=len(new_sessions),
            )

        if to_close:
            # DB session ids are composite: "<meeting id>:<daily session id>".
            composite_ids = [f"{meeting.id}:{sid}" for sid in to_close]
            await daily_participant_sessions_controller.batch_close_sessions(
                composite_ids,
                left_at=datetime.now(timezone.utc),
            )
            logger.info(
                "Stale sessions closed",
                meeting_id=meeting.id,
                count=len(composite_ids),
            )

        current_count = len(api_participants)
        if meeting.num_clients != current_count:
            await meetings_controller.update_meeting(
                meeting.id,
                num_clients=current_count,
            )
            logger.info(
                "num_clients updated",
                meeting_id=meeting.id,
                old_value=meeting.num_clients,
                new_value=current_count,
            )
@shared_task
@asynctask
async def poll_daily_room_presence_task(meeting_id: str) -> None:
    """Celery entry point that runs poll_daily_room_presence for one meeting.

    Queued by webhooks or by the reconciliation timer.
    """
    # Thin wrapper: all the logic lives in the plain coroutine.
    await poll_daily_room_presence(meeting_id=meeting_id)
@shared_task
@asynctask
async def process_meetings():
    """
    Check every active meeting and deactivate the ones that have ended.

    Deactivation logic:
    - Active sessions: keep the meeting active regardless of scheduled time
    - No active sessions:
        * Previously used (had sessions): deactivate immediately
          (for on-the-fly meetings, created when someone joins, this means
          everyone left)
        * Never used: keep active until the scheduled end time, then deactivate

    Each meeting is handled under its own Redis lock so that multiple workers
    do not process the same meeting simultaneously. Errors for one meeting are
    logged and do not stop the loop.
    """
    meetings = await meetings_controller.get_all_active()
    logger.info(f"Processing {len(meetings)} meetings")

    current_time = datetime.now(timezone.utc)
    processed_count = 0
    skipped_count = 0

    for meeting in meetings:
        logger_ = logger.bind(meeting_id=meeting.id, room_name=meeting.room_name)
        logger_.info("Processing meeting")

        try:
            async with RedisAsyncLock(
                key=f"meeting_process_lock:{meeting.id}",
                timeout=120,
                extend_interval=30,
                skip_if_locked=True,
                blocking=False,
            ) as lock:
                if not lock.acquired:
                    logger_.debug(
                        "Meeting is being processed by another worker, skipping"
                    )
                    skipped_count += 1
                    continue

                # Process the meeting
                deactivate = False
                raw_end = meeting.end_date
                # A naive end date is treated as UTC.
                end_date = (
                    raw_end.replace(tzinfo=timezone.utc)
                    if raw_end.tzinfo is None
                    else raw_end
                )

                client = create_platform_client(meeting.platform)
                has_active_sessions = False
                has_had_sessions = False

                if meeting.platform == "daily":
                    try:
                        presence = await client.get_room_presence(meeting.room_name)
                        has_active_sessions = presence.total_count > 0

                        room_sessions = await client.get_room_sessions(
                            meeting.room_name
                        )
                        has_had_sessions = bool(room_sessions)

                        logger_.info(
                            "Daily.co presence check",
                            has_active_sessions=has_active_sessions,
                            has_had_sessions=has_had_sessions,
                            presence_count=presence.total_count,
                        )
                    except Exception:
                        logger_.warning(
                            "Daily.co presence API failed, falling back to DB sessions",
                            exc_info=True,
                        )
                        (
                            has_active_sessions,
                            has_had_sessions,
                        ) = await _room_session_flags(client, meeting.room_name)
                else:
                    (
                        has_active_sessions,
                        has_had_sessions,
                    ) = await _room_session_flags(client, meeting.room_name)

                logger_.info(
                    f"has_active_sessions={has_active_sessions}, has_had_sessions={has_had_sessions}"
                )

                if has_active_sessions:
                    logger_.debug("Meeting still has active sessions, keep it")
                elif has_had_sessions:
                    deactivate = True
                    logger_.info("Meeting ended - all participants left")
                elif current_time > end_date:
                    deactivate = True
                    logger_.info(
                        "Meeting deactivated - scheduled time ended with no participants",
                    )
                else:
                    logger_.debug("Meeting not yet started, keep it")

                if deactivate:
                    await meetings_controller.update_meeting(
                        meeting.id, is_active=False
                    )
                    logger_.info("Meeting deactivated in database")

                    if meeting.platform == "daily":
                        # Best effort: a failed room deletion is only logged.
                        try:
                            await client.delete_room(meeting.room_name)
                            logger_.info(
                                "Daily.co room deleted", room_name=meeting.room_name
                            )
                        except Exception:
                            logger_.warning(
                                "Failed to delete Daily.co room",
                                room_name=meeting.room_name,
                                exc_info=True,
                            )

                processed_count += 1
        except Exception:
            logger_.error("Error processing meeting", exc_info=True)

    logger.debug(
        "Processed meetings finished",
        processed_count=processed_count,
        skipped_count=skipped_count,
    )


async def _room_session_flags(client, room_name):
    """Return (has_active_sessions, has_had_sessions) from the room's sessions."""
    sessions = await client.get_room_sessions(room_name)
    has_active = bool(sessions and any(s.ended_at is None for s in sessions))
    return has_active, bool(sessions)
async def convert_audio_and_waveform(transcript) -> None:
    """Convert WebM to MP3 and generate waveform for Daily.co recordings.

    Reads ``<transcript.data_path>/upload.webm``, writes the MP3 to
    ``transcript.audio_mp3_filename``, generates the waveform at
    ``transcript.audio_waveform_filename`` and finally sets the transcript
    status to ``ended``.

    This bypasses the full file pipeline which would overwrite stub data.

    Failures are logged and swallowed; in that case the transcript status is
    left untouched.

    Args:
        transcript: Transcript whose uploaded audio should be converted.
    """
    try:
        logger.info(
            "Converting audio to MP3 and generating waveform",
            transcript_id=transcript.id,
        )

        upload_path = transcript.data_path / "upload.webm"
        mp3_path = transcript.audio_mp3_filename

        # Convert WebM to MP3
        mp3_writer = AudioFileWriterProcessor(path=mp3_path)

        container = av.open(str(upload_path))
        try:
            for frame in container.decode(audio=0):
                await mp3_writer.push(frame)
            await mp3_writer.flush()
        finally:
            # Always release the input container, even if decoding or
            # writing fails part-way through.
            container.close()

        logger.info(
            "Converted WebM to MP3",
            transcript_id=transcript.id,
            mp3_size=mp3_path.stat().st_size,
        )

        waveform_processor = AudioWaveformProcessor(
            audio_path=mp3_path,
            waveform_path=transcript.audio_waveform_filename,
        )
        waveform_processor.set_pipeline(EmptyPipeline(logger))
        await waveform_processor.flush()

        logger.info(
            "Generated waveform",
            transcript_id=transcript.id,
            waveform_path=transcript.audio_waveform_filename,
        )

        # Update transcript status to ended (successful)
        await transcripts_controller.update(transcript, {"status": "ended"})

    except Exception as e:
        # Keep status as uploaded even if conversion fails
        logger.error(
            "Failed to convert audio or generate waveform",
            transcript_id=transcript.id,
            error=str(e),
        )
@shared_task
@asynctask
async def reprocess_failed_recordings():
    """
    Requeue Whereby recordings that lack a usable transcript.

    Lists the ``.mp4`` objects in the Whereby bucket and queues
    ``process_recording`` for each one that has no recording row, no
    transcript, an invalid transcript (removed first), or a transcript in
    ``error`` status.

    Note: Daily.co multitrack recordings are handled by
    reprocess_failed_daily_recordings.

    Returns:
        Number of recordings queued.

    Raises:
        ValueError: If WHEREBY_STORAGE_AWS_BUCKET_NAME is not configured.
    """
    logger.info("Checking Whereby recordings that need processing or reprocessing")

    if not settings.WHEREBY_STORAGE_AWS_BUCKET_NAME:
        raise ValueError(
            "WHEREBY_STORAGE_AWS_BUCKET_NAME required for Whereby recording reprocessing. "
            "Set WHEREBY_STORAGE_AWS_BUCKET_NAME environment variable."
        )

    storage = get_transcripts_storage()
    bucket_name = settings.WHEREBY_STORAGE_AWS_BUCKET_NAME

    reprocessed_count = 0
    try:
        for object_key in await storage.list_objects(prefix="", bucket=bucket_name):
            if not object_key.endswith(".mp4"):
                continue

            recording = await recordings_controller.get_by_object_key(
                bucket_name, object_key
            )
            if recording:
                transcript = None
                try:
                    transcript = await transcripts_controller.get_by_recording_id(
                        recording.id
                    )
                except ValidationError:
                    # An unparsable transcript is dropped and rebuilt.
                    await transcripts_controller.remove_by_recording_id(recording.id)
                    logger.warning(
                        f"Removed invalid transcript for recording: {recording.id}"
                    )

                if transcript is not None and transcript.status != "error":
                    # Healthy transcript: nothing to do for this object.
                    continue

            logger.info(f"Queueing recording for processing: {object_key}")
            process_recording.delay(bucket_name, object_key)
            reprocessed_count += 1
    except Exception as e:
        logger.error(f"Error checking S3 bucket: {str(e)}")

    logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
    return reprocessed_count
@shared_task
@asynctask
async def reprocess_failed_daily_recordings():
    """
    Requeue Daily.co multitrack recordings that still need a transcription.

    Candidates come from
    ``recordings_controller.get_multitrack_needing_reprocessing``. Recordings
    without a meeting, without track keys, or (on the Hatchet path) without a
    transcript are skipped. The rest are sent either to the Hatchet
    "DiarizationPipeline" workflow or, when the room has ``use_celery`` set,
    to ``process_multitrack_recording``.

    Returns:
        Number of recordings requeued (0 when the Daily bucket is not
        configured).
    """
    logger.info(
        "Checking Daily.co multitrack recordings that need processing or reprocessing"
    )

    if not settings.DAILYCO_STORAGE_AWS_BUCKET_NAME:
        logger.debug(
            "DAILYCO_STORAGE_AWS_BUCKET_NAME not configured; skipping Daily recording reprocessing"
        )
        return 0

    bucket_name = settings.DAILYCO_STORAGE_AWS_BUCKET_NAME
    reprocessed_count = 0

    try:
        candidates = (
            await recordings_controller.get_multitrack_needing_reprocessing(bucket_name)
        )
        logger.info(
            "Found multitrack recordings needing reprocessing",
            count=len(candidates),
            bucket=bucket_name,
        )

        for rec in candidates:
            if not rec.meeting_id:
                logger.debug(
                    "Skipping recording without meeting_id",
                    recording_id=rec.id,
                )
                continue

            meeting = await meetings_controller.get_by_id(rec.meeting_id)
            if not meeting:
                logger.warning(
                    "Meeting not found for recording",
                    recording_id=rec.id,
                    meeting_id=rec.meeting_id,
                )
                continue

            room = (
                await rooms_controller.get_by_id(meeting.room_id)
                if meeting.room_id
                else None
            )

            transcript = None
            try:
                transcript = await transcripts_controller.get_by_recording_id(rec.id)
            except ValidationError:
                await transcripts_controller.remove_by_recording_id(rec.id)
                logger.warning(
                    "Removed invalid transcript for recording",
                    recording_id=rec.id,
                )

            if not rec.track_keys:
                logger.warning(
                    "Recording has no track_keys, cannot reprocess",
                    recording_id=rec.id,
                )
                continue

            # Hatchet is used unless the room has use_celery set.
            use_hatchet = not (room and room.use_celery)

            if use_hatchet:
                if not transcript:
                    logger.warning(
                        "No transcript for Hatchet reprocessing, skipping",
                        recording_id=rec.id,
                    )
                    continue

                audio_tracks = [
                    {"s3_key": key} for key in filter_cam_audio_tracks(rec.track_keys)
                ]
                workflow_id = await HatchetClientManager.start_workflow(
                    workflow_name="DiarizationPipeline",
                    input_data={
                        "recording_id": rec.id,
                        "tracks": audio_tracks,
                        "bucket_name": bucket_name,
                        "transcript_id": transcript.id,
                        "room_id": room.id if room else None,
                    },
                    additional_metadata={
                        "transcript_id": transcript.id,
                        "recording_id": rec.id,
                        "reprocess": True,
                    },
                )
                await transcripts_controller.update(
                    transcript, {"workflow_run_id": workflow_id}
                )
                logger.info(
                    "Queued Daily recording for Hatchet reprocessing",
                    recording_id=rec.id,
                    workflow_id=workflow_id,
                    room_name=meeting.room_name,
                    track_count=len(rec.track_keys),
                )
            else:
                logger.info(
                    "Queueing Daily recording for Celery reprocessing",
                    recording_id=rec.id,
                    room_name=meeting.room_name,
                    track_count=len(rec.track_keys),
                    transcript_status=transcript.status if transcript else None,
                )
                # For reprocessing, pass actual recording time (though it's ignored - see _process_multitrack_recording_inner)
                # Reprocessing uses recording.meeting_id directly instead of time-based matching
                process_multitrack_recording.delay(
                    bucket_name=bucket_name,
                    daily_room_name=meeting.room_name,
                    recording_id=rec.id,
                    track_keys=rec.track_keys,
                    recording_start_ts=int(rec.recorded_at.timestamp()),
                )

            reprocessed_count += 1
    except Exception as e:
        logger.error(
            "Error checking Daily multitrack recordings",
            error=str(e),
            exc_info=True,
        )

    logger.info(
        "Daily reprocessing complete",
        requeued_count=reprocessed_count,
    )
    return reprocessed_count
@shared_task
@asynctask
async def trigger_daily_reconciliation() -> None:
    """Queue a presence poll for every active Daily.co meeting.

    A failure to queue one meeting is logged and re-raised, which stops the
    loop; the outer handler then logs the failure and swallows it.
    """
    try:
        queued = 0
        for meeting in await meetings_controller.get_all_active(platform="daily"):
            try:
                poll_daily_room_presence_task.delay(meeting.id)
            except Exception as e:
                logger.error(
                    "Failed to queue reconciliation poll",
                    meeting_id=meeting.id,
                    error=str(e),
                    exc_info=True,
                )
                raise
            queued += 1

        if queued > 0:
            logger.debug(
                "Reconciliation polls queued",
                count=queued,
            )
    except Exception as e:
        logger.error("Reconciliation trigger failed", error=str(e), exc_info=True)