Mirror of https://github.com/Monadical-SAS/reflector.git (synced 2025-12-22 21:29:05 +00:00)
feat: daily.co support as alternative to whereby (#691)
* llm instructions
* vibe dailyco
* vibe dailyco
* doc update (vibe)
* don't show recording ui on call
* stub processor (vibe)
* stub processor (vibe) self-review
* stub processor (vibe) self-review
* chore(main): release 0.14.0 (#670)
* Add multitrack pipeline
* Mixdown audio tracks
* Mixdown with pyav filter graph
* Trigger multitrack processing for daily recordings
* apply platform from envs in priority: non-dry
* Use explicit track keys for processing
* Align tracks of a multitrack recording
* Generate waveforms for the mixed audio
* Emit multitrack pipeline events
* Fix multitrack pipeline track alignment
* dailyco docs
* Enable multitrack reprocessing
* modal temp files uniform names, cleanup. remove llm temporary docs
* docs cleanup
* don't proceed with raw recordings if any of the downloads fail
* dry transcription pipelines
* remove is_multitrack
* comments
* explicit dailyco room name
* docs
* remove stub data/method
* frontend daily/whereby code self-review (no-mistake)
* frontend daily/whereby code self-review (no-mistakes)
* frontend daily/whereby code self-review (no-mistakes)
* consent cleanup for multitrack (no-mistakes)
* llm fun
* remove extra comments
* fix tests
* merge migrations
* Store participant names
* Get participants by meeting session id
* pop back main branch migration
* s3 paddington (no-mistakes)
* comment
* pr comments
* pr comments
* pr comments
* platform / meeting cleanup
* Use participant names in summary generation
* platform assignment to meeting at controller level
* pr comment
* room platform properly default none
* room platform properly default none
* restore migration lost
* streaming WIP
* extract storage / use common storage / proper env vars for storage
* fix mocks tests
* remove fall back
* streaming for multifile
* central storage abstraction (no-mistakes)
* remove dead code / vars
* Set participant user id for authenticated users
* whereby recording name parsing fix
* whereby recording name parsing fix
* more file stream
* storage dry + tests
* remove homemade boto3 streaming and use proper boto
* update migration guide
* webhook creation script - print uuid

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: Mathieu Virbel <mat@meltingrocks.com>
Co-authored-by: Sergey Mankovsky <sergey@monadical.com>
@@ -19,7 +19,7 @@ from reflector.db.meetings import meetings
 from reflector.db.recordings import recordings
 from reflector.db.transcripts import transcripts, transcripts_controller
 from reflector.settings import settings
-from reflector.storage import get_recordings_storage
+from reflector.storage import get_transcripts_storage
 
 logger = structlog.get_logger(__name__)
@@ -53,8 +53,8 @@ async def delete_single_transcript(
     )
     if recording:
         try:
-            await get_recordings_storage().delete_file(
-                recording["object_key"]
+            await get_transcripts_storage().delete_file(
+                recording["object_key"], bucket=recording["bucket_name"]
             )
         except Exception as storage_error:
             logger.warning(
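The delete path now goes through the shared transcripts storage and passes the recording's own bucket explicitly, since platform-written recordings can live outside the default bucket. A minimal sketch of what such a storage method could look like; the class name and constructor are assumptions, only `delete_file` and its `bucket` override are suggested by the diff:

import asyncio

import boto3


class S3Storage:  # hypothetical; the real class lives in reflector.storage
    def __init__(self, bucket: str):
        self.bucket = bucket
        self._s3 = boto3.client("s3")

    async def delete_file(self, key: str, bucket: str | None = None) -> None:
        # Callers may override the bucket, e.g. when the video platform wrote
        # the recording into its own bucket (recording["bucket_name"] above).
        # delete_object is blocking, so run it in a worker thread.
        await asyncio.to_thread(
            self._s3.delete_object, Bucket=bucket or self.bucket, Key=key
        )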
@@ -7,10 +7,10 @@ from celery.utils.log import get_task_logger
 from reflector.asynctask import asynctask
 from reflector.db.calendar_events import calendar_events_controller
 from reflector.db.meetings import meetings_controller
-from reflector.db.rooms import rooms_controller
+from reflector.db.rooms import Room, rooms_controller
 from reflector.redis_cache import RedisAsyncLock
 from reflector.services.ics_sync import SyncStatus, ics_sync_service
-from reflector.whereby import create_meeting, upload_logo
+from reflector.video_platforms.factory import create_platform_client, get_platform
 
 logger = structlog.wrap_logger(get_task_logger(__name__))
@@ -86,17 +86,17 @@ def _should_sync(room) -> bool:
 MEETING_DEFAULT_DURATION = timedelta(hours=1)
 
 
-async def create_upcoming_meetings_for_event(event, create_window, room_id, room):
+async def create_upcoming_meetings_for_event(event, create_window, room: Room):
     if event.start_time <= create_window:
         return
-    existing_meeting = await meetings_controller.get_by_calendar_event(event.id)
+    existing_meeting = await meetings_controller.get_by_calendar_event(event.id, room)
 
     if existing_meeting:
         return
 
     logger.info(
         "Pre-creating meeting for calendar event",
-        room_id=room_id,
+        room_id=room.id,
         event_id=event.id,
         event_title=event.title,
     )
@@ -104,20 +104,22 @@ async def create_upcoming_meetings_for_event(event, create_window, room
     try:
         end_date = event.end_time or (event.start_time + MEETING_DEFAULT_DURATION)
 
-        whereby_meeting = await create_meeting(
+        client = create_platform_client(get_platform(room.platform))
+
+        meeting_data = await client.create_meeting(
             "",
             end_date=end_date,
+            room=room,
         )
-        await upload_logo(whereby_meeting["roomName"], "./images/logo.png")
+        await client.upload_logo(meeting_data.room_name, "./images/logo.png")
 
         meeting = await meetings_controller.create(
-            id=whereby_meeting["meetingId"],
-            room_name=whereby_meeting["roomName"],
-            room_url=whereby_meeting["roomUrl"],
-            host_room_url=whereby_meeting["hostRoomUrl"],
-            start_date=datetime.fromisoformat(whereby_meeting["startDate"]),
-            end_date=datetime.fromisoformat(whereby_meeting["endDate"]),
+            id=meeting_data.meeting_id,
+            room_name=meeting_data.room_name,
+            room_url=meeting_data.room_url,
+            host_room_url=meeting_data.host_room_url,
+            start_date=event.start_time,
+            end_date=end_date,
             room=room,
             calendar_event_id=event.id,
             calendar_metadata={
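The controller call now reads attributes (`meeting_data.meeting_id`) instead of indexing the raw Whereby payload, which implies a platform-neutral result type returned by every client. A hedged sketch of that type; field names are taken from the diff, the class name is an assumption:

from dataclasses import dataclass


@dataclass
class MeetingData:
    # Each platform client normalizes its provider response into this shape,
    # so callers never parse provider-specific JSON keys.
    meeting_id: str
    room_name: str
    room_url: str
    host_room_url: str

Note also that the pre-created meeting's start and end dates now come from the calendar event (`event.start_time`, `end_date`) rather than from the provider's response.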
@@ -136,7 +138,7 @@ async def create_upcoming_meetings_for_event(event, create_window, room
     except Exception as e:
         logger.error(
             "Failed to pre-create meeting",
-            room_id=room_id,
+            room_id=room.id,
             event_id=event.id,
             error=str(e),
         )
@@ -166,9 +168,7 @@ async def create_upcoming_meetings():
             )
 
             for event in events:
-                await create_upcoming_meetings_for_event(
-                    event, create_window, room.id, room
-                )
+                await create_upcoming_meetings_for_event(event, create_window, room)
 
         logger.info("Completed pre-creation check for upcoming meetings")
 
     except Exception as e:
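Both call sites above now route through `create_platform_client(get_platform(...))`. A minimal sketch of such a factory, assuming hypothetical concrete clients and a whereby default for legacy rooms; the real module is `reflector.video_platforms.factory`, everything else here is illustrative:

from typing import Protocol


class VideoPlatformClient(Protocol):
    """Shared surface both providers implement (methods seen in this diff)."""

    async def create_meeting(self, name: str, *, end_date, room): ...
    async def upload_logo(self, room_name: str, logo_path: str) -> None: ...
    async def get_room_sessions(self, room_name: str): ...


class WherebyClient:  # hypothetical stand-in for the real implementation
    ...


class DailyClient:  # hypothetical stand-in for the real implementation
    ...


def get_platform(platform: str | None) -> str:
    # Rooms created before this PR may carry no platform; assuming a
    # whereby fallback for backward compatibility.
    return platform or "whereby"


def create_platform_client(platform: str) -> VideoPlatformClient:
    clients = {"whereby": WherebyClient, "daily": DailyClient}
    try:
        return clients[platform]()
    except KeyError:
        raise ValueError(f"Unsupported video platform: {platform}") from None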
@@ -1,5 +1,6 @@
 import json
+import os
 import re
 from datetime import datetime, timezone
 from urllib.parse import unquote
@@ -14,24 +15,32 @@ from redis.exceptions import LockError
 from reflector.db.meetings import meetings_controller
 from reflector.db.recordings import Recording, recordings_controller
 from reflector.db.rooms import rooms_controller
-from reflector.db.transcripts import SourceKind, transcripts_controller
+from reflector.db.transcripts import (
+    SourceKind,
+    TranscriptParticipant,
+    transcripts_controller,
+)
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_live_pipeline import asynctask
+from reflector.pipelines.main_multitrack_pipeline import (
+    task_pipeline_multitrack_process,
+)
 from reflector.pipelines.topic_processing import EmptyPipeline
 from reflector.processors import AudioFileWriterProcessor
 from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
 from reflector.redis_cache import get_redis_client
 from reflector.settings import settings
-from reflector.whereby import get_room_sessions
+from reflector.storage import get_transcripts_storage
+from reflector.utils.daily import DailyRoomName, extract_base_room_name
+from reflector.video_platforms.factory import create_platform_client
+from reflector.video_platforms.whereby_utils import (
+    parse_whereby_recording_filename,
+    room_name_to_whereby_api_room_name,
+)
 
 logger = structlog.wrap_logger(get_task_logger(__name__))
 
 
 def parse_datetime_with_timezone(iso_string: str) -> datetime:
     """Parse ISO datetime string and ensure timezone awareness (defaults to UTC if naive)."""
     dt = datetime.fromisoformat(iso_string)
     if dt.tzinfo is None:
         dt = dt.replace(tzinfo=timezone.utc)
     return dt
 
 
 @shared_task
 def process_messages():
     queue_url = settings.AWS_PROCESS_RECORDING_QUEUE_URL
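`parse_datetime_with_timezone` (context above) backstops naive timestamps with UTC. A quick usage note, assuming the function exactly as defined here:

from datetime import datetime, timedelta, timezone

# Naive strings gain UTC; offset-aware strings keep their offset.
assert parse_datetime_with_timezone("2025-01-02T03:04:05") == datetime(
    2025, 1, 2, 3, 4, 5, tzinfo=timezone.utc
)
aware = parse_datetime_with_timezone("2025-01-02T03:04:05+02:00")
assert aware.utcoffset() == timedelta(hours=2)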
@@ -73,14 +82,16 @@ def process_messages():
         logger.error("process_messages", error=str(e))
 
 
+# only whereby supported.
 @shared_task
 @asynctask
 async def process_recording(bucket_name: str, object_key: str):
     logger.info("Processing recording: %s/%s", bucket_name, object_key)
 
-    # extract a guid and a datetime from the object key
-    room_name = f"/{object_key[:36]}"
-    recorded_at = parse_datetime_with_timezone(object_key[37:57])
+    room_name_part, recorded_at = parse_whereby_recording_filename(object_key)
+
+    # we store whereby api room names, NOT whereby room names
+    room_name = room_name_to_whereby_api_room_name(room_name_part)
 
     meeting = await meetings_controller.get_by_room_name(room_name)
     room = await rooms_controller.get_by_id(meeting.room_id)
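The new helper replaces the inline slicing (`object_key[:36]` for the room GUID, `[37:57]` for the timestamp). A plausible sketch, inferred purely from the slices it replaces; the real parser lives in `reflector.video_platforms.whereby_utils` and may differ:

from datetime import datetime


def parse_whereby_recording_filename(object_key: str) -> tuple[str, datetime]:
    # Assumption based on the removed code: keys start with a 36-char room
    # GUID, then a separator, then a 20-char ISO timestamp.
    # room_name_to_whereby_api_room_name presumably re-adds the leading "/"
    # the old code prepended.
    room_name_part = object_key[:36]
    recorded_at = parse_datetime_with_timezone(object_key[37:57])
    return room_name_part, recorded_at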
@@ -102,6 +113,7 @@ async def process_recording(bucket_name: str, object_key: str):
             transcript,
             {
                 "topics": [],
+                "participants": [],
             },
         )
     else:
@@ -121,15 +133,15 @@ async def process_recording(bucket_name: str, object_key: str):
     upload_filename = transcript.data_path / f"upload{extension}"
     upload_filename.parent.mkdir(parents=True, exist_ok=True)
 
-    s3 = boto3.client(
-        "s3",
-        region_name=settings.TRANSCRIPT_STORAGE_AWS_REGION,
-        aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
-    )
+    storage = get_transcripts_storage()
 
-    with open(upload_filename, "wb") as f:
-        s3.download_fileobj(bucket_name, object_key, f)
+    try:
+        with open(upload_filename, "wb") as f:
+            await storage.stream_to_fileobj(object_key, f, bucket=bucket_name)
+    except Exception:
+        # Clean up partial file on stream failure
+        upload_filename.unlink(missing_ok=True)
+        raise
 
     container = av.open(upload_filename.as_posix())
     try:
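Per the commit notes ("remove homemade boto3 streaming and use proper boto"), the storage layer now streams straight into a caller-provided file object. A hedged sketch extending the hypothetical S3Storage class from earlier, using boto3's built-in `download_fileobj` off the event loop:

import asyncio

import boto3


class S3Storage:  # hypothetical; continues the earlier sketch
    def __init__(self, bucket: str):
        self.bucket = bucket
        self._s3 = boto3.client("s3")

    async def stream_to_fileobj(
        self, key: str, fileobj, bucket: str | None = None
    ) -> None:
        # download_fileobj is blocking; hand it to a worker thread so the
        # async task stays responsive. Failures propagate, letting the caller
        # unlink the partial file as the diff above does.
        await asyncio.to_thread(
            self._s3.download_fileobj, bucket or self.bucket, key, fileobj
        )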
@@ -146,6 +158,165 @@ async def process_recording(bucket_name: str, object_key: str):
     task_pipeline_file_process.delay(transcript_id=transcript.id)
 
 
+@shared_task
+@asynctask
+async def process_multitrack_recording(
+    bucket_name: str,
+    daily_room_name: DailyRoomName,
+    recording_id: str,
+    track_keys: list[str],
+):
+    logger.info(
+        "Processing multitrack recording",
+        bucket=bucket_name,
+        room_name=daily_room_name,
+        recording_id=recording_id,
+        provided_keys=len(track_keys),
+    )
+
+    if not track_keys:
+        logger.warning("No audio track keys provided")
+        return
+
+    tz = timezone.utc
+    recorded_at = datetime.now(tz)
+    try:
+        if track_keys:
+            folder = os.path.basename(os.path.dirname(track_keys[0]))
+            ts_match = re.search(r"(\d{14})$", folder)
+            if ts_match:
+                ts = ts_match.group(1)
+                recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=tz)
+    except Exception as e:
+        logger.warning(
+            f"Could not parse recorded_at from keys, using now() {recorded_at}",
+            e,
+            exc_info=True,
+        )
+
+    meeting = await meetings_controller.get_by_room_name(daily_room_name)
+
+    room_name_base = extract_base_room_name(daily_room_name)
+
+    room = await rooms_controller.get_by_name(room_name_base)
+    if not room:
+        raise Exception(f"Room not found: {room_name_base}")
+
+    if not meeting:
+        raise Exception(f"Meeting not found: {room_name_base}")
+
+    logger.info(
+        "Found existing Meeting for recording",
+        meeting_id=meeting.id,
+        room_name=daily_room_name,
+        recording_id=recording_id,
+    )
+
+    recording = await recordings_controller.get_by_id(recording_id)
+    if not recording:
+        object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
+        recording = await recordings_controller.create(
+            Recording(
+                id=recording_id,
+                bucket_name=bucket_name,
+                object_key=object_key_dir,
+                recorded_at=recorded_at,
+                meeting_id=meeting.id,
+                track_keys=track_keys,
+            )
+        )
+    else:
+        # Recording already exists; assume metadata was set at creation time
+        pass
+
+    transcript = await transcripts_controller.get_by_recording_id(recording.id)
+    if transcript:
+        await transcripts_controller.update(
+            transcript,
+            {
+                "topics": [],
+                "participants": [],
+            },
+        )
+    else:
+        transcript = await transcripts_controller.add(
+            "",
+            source_kind=SourceKind.ROOM,
+            source_language="en",
+            target_language="en",
+            user_id=room.user_id,
+            recording_id=recording.id,
+            share_mode="public",
+            meeting_id=meeting.id,
+            room_id=room.id,
+        )
+
+    try:
+        daily_client = create_platform_client("daily")
+
+        id_to_name = {}
+        id_to_user_id = {}
+
+        mtg_session_id = None
+        try:
+            rec_details = await daily_client.get_recording(recording_id)
+            mtg_session_id = rec_details.get("mtgSessionId")
+        except Exception as e:
+            logger.warning(
+                "Failed to fetch Daily recording details",
+                error=str(e),
+                recording_id=recording_id,
+                exc_info=True,
+            )
+
+        if mtg_session_id:
+            try:
+                payload = await daily_client.get_meeting_participants(mtg_session_id)
+                for p in payload.get("data", []):
+                    pid = p.get("participant_id")
+                    name = p.get("user_name")
+                    user_id = p.get("user_id")
+                    if pid and name:
+                        id_to_name[pid] = name
+                    if pid and user_id:
+                        id_to_user_id[pid] = user_id
+            except Exception as e:
+                logger.warning(
+                    "Failed to fetch Daily meeting participants",
+                    error=str(e),
+                    mtg_session_id=mtg_session_id,
+                    exc_info=True,
+                )
+        else:
+            logger.warning(
+                "No mtgSessionId found for recording; participant names may be generic",
+                recording_id=recording_id,
+            )
+
+        for idx, key in enumerate(track_keys):
+            base = os.path.basename(key)
+            m = re.search(r"\d{13,}-([0-9a-fA-F-]{36})-cam-audio-", base)
+            participant_id = m.group(1) if m else None
+
+            default_name = f"Speaker {idx}"
+            name = id_to_name.get(participant_id, default_name)
+            user_id = id_to_user_id.get(participant_id)
+
+            participant = TranscriptParticipant(
+                id=participant_id, speaker=idx, name=name, user_id=user_id
+            )
+            await transcripts_controller.upsert_participant(transcript, participant)
+
+    except Exception as e:
+        logger.warning("Failed to map participant names", error=str(e), exc_info=True)
+
+    task_pipeline_multitrack_process.delay(
+        transcript_id=transcript.id,
+        bucket_name=bucket_name,
+        track_keys=track_keys,
+    )
+
+
 @shared_task
 @asynctask
 async def process_meetings():
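The new task derives `recorded_at` from the 14-digit timestamp at the end of the S3 folder name, and maps each track to a participant via the 36-character UUID embedded in the filename. A worked example against a hypothetical track key whose layout is inferred only from the two regexes above:

import os
import re
from datetime import datetime, timezone

# Hypothetical Daily.co track key matching the regexes in the task.
key = (
    "room-abc/20250102030405/"
    "1735787045123-3f2b8c9e-1111-2222-3333-444455556666-cam-audio-0.webm"
)

# Folder name ends with a YYYYMMDDHHMMSS timestamp.
folder = os.path.basename(os.path.dirname(key))  # "20250102030405"
ts = re.search(r"(\d{14})$", folder).group(1)
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(tzinfo=timezone.utc)
print(recorded_at)  # 2025-01-02 03:04:05+00:00

# Filename carries "<epoch-ms>-<participant uuid>-cam-audio-<n>".
m = re.search(r"\d{13,}-([0-9a-fA-F-]{36})-cam-audio-", os.path.basename(key))
print(m.group(1))  # 3f2b8c9e-1111-2222-3333-444455556666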
@@ -164,7 +335,7 @@ async def process_meetings():
     Uses distributed locking to prevent race conditions when multiple workers
     process the same meeting simultaneously.
     """
-    logger.info("Processing meetings")
+    logger.debug("Processing meetings")
     meetings = await meetings_controller.get_all_active()
     current_time = datetime.now(timezone.utc)
     redis_client = get_redis_client()
@@ -189,7 +360,8 @@ async def process_meetings():
                 end_date = end_date.replace(tzinfo=timezone.utc)
 
             # This API call could be slow, extend lock if needed
-            response = await get_room_sessions(meeting.room_name)
+            client = create_platform_client(meeting.platform)
+            room_sessions = await client.get_room_sessions(meeting.room_name)
 
             try:
                 # Extend lock after slow operation to ensure we still hold it
@@ -198,7 +370,6 @@ async def process_meetings():
                     logger_.warning("Lost lock for meeting, skipping")
                     continue
 
-                room_sessions = response.get("results", [])
                 has_active_sessions = room_sessions and any(
                     rs["endedAt"] is None for rs in room_sessions
                 )
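Dropping `response.get("results", [])` from the caller implies the platform client now unwraps Whereby's `{"results": [...]}` envelope itself, so `process_meetings()` iterates sessions the same way on either platform. A hedged sketch of that adapter, continuing the factory sketch; the endpoint, parameter, and injected `http_get` callable are all assumptions:

from typing import Any


class WherebyClient:  # hypothetical stand-in, continuing the factory sketch
    def __init__(self, http_get):
        # http_get: async callable performing the authenticated API request
        self._http_get = http_get

    async def get_room_sessions(self, room_name: str) -> list[dict[str, Any]]:
        # Unwrap Whereby's "results" envelope here so callers receive a plain
        # list of sessions regardless of platform.
        response = await self._http_get(
            "/insights/room-sessions", {"roomName": room_name}
        )
        return response.get("results", [])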
@@ -231,69 +402,120 @@ async def process_meetings():
             except LockError:
                 pass  # Lock already released or expired
 
-    logger.info(
+    logger.debug(
         "Processed meetings finished",
         processed_count=processed_count,
         skipped_count=skipped_count,
     )
 
 
+async def convert_audio_and_waveform(transcript) -> None:
+    """Convert WebM to MP3 and generate waveform for Daily.co recordings.
+
+    This bypasses the full file pipeline which would overwrite stub data.
+    """
+    try:
+        logger.info(
+            "Converting audio to MP3 and generating waveform",
+            transcript_id=transcript.id,
+        )
+
+        upload_path = transcript.data_path / "upload.webm"
+        mp3_path = transcript.audio_mp3_filename
+
+        # Convert WebM to MP3
+        mp3_writer = AudioFileWriterProcessor(path=mp3_path)
+
+        container = av.open(str(upload_path))
+        for frame in container.decode(audio=0):
+            await mp3_writer.push(frame)
+        await mp3_writer.flush()
+        container.close()
+
+        logger.info(
+            "Converted WebM to MP3",
+            transcript_id=transcript.id,
+            mp3_size=mp3_path.stat().st_size,
+        )
+
+        waveform_processor = AudioWaveformProcessor(
+            audio_path=mp3_path,
+            waveform_path=transcript.audio_waveform_filename,
+        )
+        waveform_processor.set_pipeline(EmptyPipeline(logger))
+        await waveform_processor.flush()
+
+        logger.info(
+            "Generated waveform",
+            transcript_id=transcript.id,
+            waveform_path=transcript.audio_waveform_filename,
+        )
+
+        # Update transcript status to ended (successful)
+        await transcripts_controller.update(transcript, {"status": "ended"})
+
+    except Exception as e:
+        logger.error(
+            "Failed to convert audio or generate waveform",
+            transcript_id=transcript.id,
+            error=str(e),
+        )
+        # Keep status as uploaded even if conversion fails
+        pass
+
+
 @shared_task
 @asynctask
 async def reprocess_failed_recordings():
     """
-    Find recordings in the S3 bucket and check if they have proper transcriptions.
+    Find recordings in the Whereby S3 bucket and check if they have proper transcriptions.
     If not, requeue them for processing.
+
+    Note: Daily.co recordings are processed via webhooks, not this cron job.
     """
-    logger.info("Checking for recordings that need processing or reprocessing")
+    logger.info("Checking Whereby recordings that need processing or reprocessing")
 
-    s3 = boto3.client(
-        "s3",
-        region_name=settings.TRANSCRIPT_STORAGE_AWS_REGION,
-        aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
-    )
+    if not settings.WHEREBY_STORAGE_AWS_BUCKET_NAME:
+        raise ValueError(
+            "WHEREBY_STORAGE_AWS_BUCKET_NAME required for Whereby recording reprocessing. "
+            "Set WHEREBY_STORAGE_AWS_BUCKET_NAME environment variable."
+        )
+
+    storage = get_transcripts_storage()
+    bucket_name = settings.WHEREBY_STORAGE_AWS_BUCKET_NAME
 
     reprocessed_count = 0
     try:
-        paginator = s3.get_paginator("list_objects_v2")
-        bucket_name = settings.RECORDING_STORAGE_AWS_BUCKET_NAME
-        pages = paginator.paginate(Bucket=bucket_name)
+        object_keys = await storage.list_objects(prefix="", bucket=bucket_name)
 
-        for page in pages:
-            if "Contents" not in page:
+        for object_key in object_keys:
+            if not object_key.endswith(".mp4"):
                 continue
 
-            for obj in page["Contents"]:
-                object_key = obj["Key"]
-
-                if not (object_key.endswith(".mp4")):
-                    continue
-
-                recording = await recordings_controller.get_by_object_key(
-                    bucket_name, object_key
-                )
-                if not recording:
-                    logger.info(f"Queueing recording for processing: {object_key}")
-                    process_recording.delay(bucket_name, object_key)
-                    reprocessed_count += 1
-                    continue
-
-                transcript = None
-                try:
-                    transcript = await transcripts_controller.get_by_recording_id(
-                        recording.id
-                    )
-                except ValidationError:
-                    await transcripts_controller.remove_by_recording_id(recording.id)
-                    logger.warning(
-                        f"Removed invalid transcript for recording: {recording.id}"
-                    )
-
-                if transcript is None or transcript.status == "error":
-                    logger.info(f"Queueing recording for processing: {object_key}")
-                    process_recording.delay(bucket_name, object_key)
-                    reprocessed_count += 1
+            recording = await recordings_controller.get_by_object_key(
+                bucket_name, object_key
+            )
+            if not recording:
+                logger.info(f"Queueing recording for processing: {object_key}")
+                process_recording.delay(bucket_name, object_key)
+                reprocessed_count += 1
+                continue
+
+            transcript = None
+            try:
+                transcript = await transcripts_controller.get_by_recording_id(
+                    recording.id
+                )
+            except ValidationError:
+                await transcripts_controller.remove_by_recording_id(recording.id)
+                logger.warning(
+                    f"Removed invalid transcript for recording: {recording.id}"
+                )
+
+            if transcript is None or transcript.status == "error":
+                logger.info(f"Queueing recording for processing: {object_key}")
+                process_recording.delay(bucket_name, object_key)
+                reprocessed_count += 1
 
     except Exception as e:
         logger.error(f"Error checking S3 bucket: {str(e)}")
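The paginator logic that previously lived inside this task has moved behind `storage.list_objects`. A sketch of that method under the same hypothetical S3Storage class, wrapping boto3's real list_objects_v2 paginator; the method name and signature are taken from the call site above, the rest is assumed:

import asyncio

import boto3


class S3Storage:  # hypothetical; continues the earlier sketch
    def __init__(self, bucket: str):
        self.bucket = bucket
        self._s3 = boto3.client("s3")

    async def list_objects(
        self, prefix: str = "", bucket: str | None = None
    ) -> list[str]:
        def _list() -> list[str]:
            keys: list[str] = []
            paginator = self._s3.get_paginator("list_objects_v2")
            for page in paginator.paginate(
                Bucket=bucket or self.bucket, Prefix=prefix
            ):
                # Pages with no matches omit "Contents" entirely.
                keys.extend(obj["Key"] for obj in page.get("Contents", []))
            return keys

        # Pagination is blocking I/O; run it in a worker thread.
        return await asyncio.to_thread(_list)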