mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
Use explicit track keys for processing
This commit is contained in:
@@ -192,38 +192,7 @@ class PipelineMainMultitrack(PipelineMainBase):
|
||||
async with self.lock_transaction():
|
||||
return await transcripts_controller.set_status(transcript_id, status)
|
||||
|
||||
async def _list_immediate_keys(
|
||||
self, s3, bucket_name: str, prefix: str
|
||||
) -> list[str]:
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
raw_prefix = prefix.rstrip("/")
|
||||
prefixes = [raw_prefix, raw_prefix + "/"]
|
||||
|
||||
keys: set[str] = set()
|
||||
for pref in prefixes:
|
||||
for page in paginator.paginate(Bucket=bucket_name, Prefix=pref):
|
||||
for obj in page.get("Contents", []):
|
||||
key = obj["Key"]
|
||||
if not key.startswith(pref):
|
||||
continue
|
||||
if pref.endswith("/"):
|
||||
rel = key[len(pref) :]
|
||||
if not rel or rel.endswith("/") or "/" in rel:
|
||||
continue
|
||||
else:
|
||||
if key != pref:
|
||||
continue
|
||||
keys.add(key)
|
||||
result = sorted(keys)
|
||||
self.logger.info(
|
||||
"S3 list immediate files",
|
||||
prefixes=prefixes,
|
||||
total_keys=len(result),
|
||||
sample=result[:5],
|
||||
)
|
||||
return result
|
||||
|
||||
async def process(self, bucket_name: str, prefix: str):
|
||||
async def process(self, bucket_name: str, track_keys: list[str]):
|
||||
transcript = await self.get_transcript()
|
||||
|
||||
s3 = boto3.client(
|
||||
@@ -233,15 +202,11 @@ class PipelineMainMultitrack(PipelineMainBase):
|
||||
aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY,
|
||||
)
|
||||
|
||||
keys = await self._list_immediate_keys(s3, bucket_name, prefix)
|
||||
if not keys:
|
||||
raise Exception("No audio tracks found under prefix")
|
||||
|
||||
storage = get_transcripts_storage()
|
||||
|
||||
# Pre-download bytes for all tracks for mixing and transcription
|
||||
track_datas: list[bytes] = []
|
||||
for key in keys:
|
||||
for key in track_keys:
|
||||
try:
|
||||
obj = s3.get_object(Bucket=bucket_name, Key=key)
|
||||
track_datas.append(obj["Body"].read())
|
||||
@@ -262,7 +227,7 @@ class PipelineMainMultitrack(PipelineMainBase):
|
||||
self.logger.error("Mixdown failed", error=str(e))
|
||||
|
||||
speaker_transcripts: list[TranscriptType] = []
|
||||
for idx, key in enumerate(keys):
|
||||
for idx, key in enumerate(track_keys):
|
||||
ext = ".mp4"
|
||||
|
||||
try:
|
||||
@@ -433,12 +398,12 @@ class PipelineMainMultitrack(PipelineMainBase):
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def task_pipeline_multitrack_process(
|
||||
*, transcript_id: str, bucket_name: str, prefix: str
|
||||
*, transcript_id: str, bucket_name: str, track_keys: list[str]
|
||||
):
|
||||
pipeline = PipelineMainMultitrack(transcript_id=transcript_id)
|
||||
try:
|
||||
await pipeline.set_status(transcript_id, "processing")
|
||||
await pipeline.process(bucket_name, prefix)
|
||||
await pipeline.process(bucket_name, track_keys)
|
||||
except Exception:
|
||||
await pipeline.set_status(transcript_id, "error")
|
||||
raise
|
||||
|
||||
@@ -193,16 +193,8 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
|
||||
)
|
||||
return
|
||||
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if not meeting:
|
||||
logger.warning(
|
||||
"recording.ready-to-download: meeting not found", room_name=room_name
|
||||
)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Recording ready for download",
|
||||
meeting_id=meeting.id,
|
||||
room_name=room_name,
|
||||
recording_id=recording_id,
|
||||
num_tracks=len(tracks),
|
||||
@@ -216,15 +208,13 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
|
||||
)
|
||||
return
|
||||
|
||||
if not settings.DAILY_SUBDOMAIN:
|
||||
logger.error(
|
||||
"DAILY_SUBDOMAIN not configured; cannot compute S3 prefix from room name"
|
||||
)
|
||||
return
|
||||
prefix = f"{settings.DAILY_SUBDOMAIN}/{room_name}"
|
||||
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
|
||||
|
||||
process_multitrack_recording.delay(
|
||||
bucket_name=bucket_name, prefix=prefix, meeting_id=meeting.id
|
||||
bucket_name=bucket_name,
|
||||
room_name=room_name,
|
||||
recording_id=recording_id,
|
||||
track_keys=track_keys,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -153,43 +153,61 @@ async def process_recording(bucket_name: str, object_key: str):
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id: str):
|
||||
async def process_multitrack_recording(
|
||||
bucket_name: str,
|
||||
room_name: str,
|
||||
recording_id: str,
|
||||
track_keys: list[str],
|
||||
):
|
||||
logger.info(
|
||||
"Processing multitrack recording",
|
||||
bucket=bucket_name,
|
||||
prefix=prefix,
|
||||
meeting_id=meeting_id,
|
||||
room_name=room_name,
|
||||
recording_id=recording_id,
|
||||
provided_keys=len(track_keys),
|
||||
)
|
||||
|
||||
# Parse an approximate recorded_at from the prefix directory name
|
||||
if not track_keys:
|
||||
logger.warning("No audio track keys provided")
|
||||
return
|
||||
|
||||
recorded_at = datetime.now(timezone.utc)
|
||||
try:
|
||||
dir_name = prefix.rstrip("/").split("/")[-1]
|
||||
ts_match = re.search(r"(\d{14})$", dir_name)
|
||||
if track_keys:
|
||||
folder = os.path.basename(os.path.dirname(track_keys[0]))
|
||||
ts_match = re.search(r"(\d{14})$", folder)
|
||||
if ts_match:
|
||||
ts = ts_match.group(1)
|
||||
recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(
|
||||
tzinfo=timezone.utc
|
||||
)
|
||||
else:
|
||||
try:
|
||||
recorded_at = parse_datetime_with_timezone(dir_name)
|
||||
except Exception:
|
||||
recorded_at = datetime.now(timezone.utc)
|
||||
except Exception:
|
||||
logger.warning("Could not parse recorded_at from prefix, using now()")
|
||||
recorded_at = datetime.now(timezone.utc)
|
||||
logger.warning("Could not parse recorded_at from keys, using now()")
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
raise Exception(f"Meeting not found: {meeting_id}")
|
||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||
room_name = room_name.split("-", 1)[0]
|
||||
room = await rooms_controller.get_by_name(room_name)
|
||||
if not room:
|
||||
raise Exception(f"Room not found: {room_name}")
|
||||
|
||||
recording = await recordings_controller.get_by_object_key(bucket_name, prefix)
|
||||
meeting = await meetings_controller.create(
|
||||
id=recording_id,
|
||||
room_name=room_name,
|
||||
room_url=room.name,
|
||||
host_room_url=room.name,
|
||||
start_date=recorded_at,
|
||||
end_date=recorded_at,
|
||||
room=room,
|
||||
platform=room.platform,
|
||||
)
|
||||
|
||||
recording = await recordings_controller.get_by_id(recording_id)
|
||||
if not recording:
|
||||
object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
|
||||
recording = await recordings_controller.create(
|
||||
Recording(
|
||||
id=recording_id,
|
||||
bucket_name=bucket_name,
|
||||
object_key=prefix,
|
||||
object_key=object_key_dir,
|
||||
recorded_at=recorded_at,
|
||||
meeting_id=meeting.id,
|
||||
)
|
||||
@@ -216,51 +234,10 @@ async def process_multitrack_recording(bucket_name: str, prefix: str, meeting_id
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
s3 = boto3.client(
|
||||
"s3",
|
||||
region_name=settings.RECORDING_STORAGE_AWS_REGION,
|
||||
aws_access_key_id=settings.RECORDING_STORAGE_AWS_ACCESS_KEY_ID,
|
||||
aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY,
|
||||
)
|
||||
|
||||
paginator = s3.get_paginator("list_objects_v2")
|
||||
raw_prefix = prefix.rstrip("/")
|
||||
prefixes = [raw_prefix, raw_prefix + "/"]
|
||||
|
||||
all_keys_set: set[str] = set()
|
||||
for pref in prefixes:
|
||||
for page in paginator.paginate(Bucket=bucket_name, Prefix=pref):
|
||||
contents = page.get("Contents", [])
|
||||
for obj in contents:
|
||||
key = obj["Key"]
|
||||
if not key.startswith(pref):
|
||||
continue
|
||||
if pref.endswith("/"):
|
||||
rel = key[len(pref) :]
|
||||
if not rel or rel.endswith("/") or "/" in rel:
|
||||
continue
|
||||
else:
|
||||
if key == pref:
|
||||
all_keys_set.add(key)
|
||||
continue
|
||||
all_keys_set.add(key)
|
||||
|
||||
all_keys = sorted(all_keys_set)
|
||||
logger.info(
|
||||
"S3 list immediate files",
|
||||
prefixes=prefixes,
|
||||
total_keys=len(all_keys),
|
||||
sample=all_keys[:5],
|
||||
)
|
||||
|
||||
track_keys: list[str] = all_keys[:]
|
||||
|
||||
if not track_keys:
|
||||
logger.info("No objects found under prefix", prefixes=prefixes)
|
||||
raise Exception("No audio tracks found under prefix")
|
||||
|
||||
task_pipeline_multitrack_process.delay(
|
||||
transcript_id=transcript.id, bucket_name=bucket_name, prefix=prefix
|
||||
transcript_id=transcript.id,
|
||||
bucket_name=bucket_name,
|
||||
track_keys=track_keys,
|
||||
)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user