Merge branch 'igor/dailico-2' of github-monadical:Monadical-SAS/reflector into igor/dailico-2
@@ -57,7 +57,10 @@ class PipelineMainMultitrack(PipelineMainBase):
         self.empty_pipeline = EmptyPipeline(logger=self.logger)
 
     async def mixdown_tracks(
-        self, track_datas: list[bytes], writer: AudioFileWriterProcessor
+        self,
+        track_datas: list[bytes],
+        writer: AudioFileWriterProcessor,
+        offsets_seconds: list[float] | None = None,
     ) -> None:
         """
         Minimal multi-track mixdown using a PyAV filter graph (amix), no resampling.
@@ -85,10 +88,22 @@ class PipelineMainMultitrack(PipelineMainBase):
             self.logger.warning("Mixdown skipped - no decodable audio frames found")
             return
 
-        # Build PyAV filter graph: N abuffer (s32/stereo) -> amix (s32) -> aformat(s16) -> sink
+        # Build PyAV filter graph:
+        # N abuffer (s32/stereo)
+        # -> optional adelay per input (for alignment)
+        # -> amix (s32)
+        # -> aformat(s16)
+        # -> sink
         graph = av.filter.Graph()
         inputs = []
-        for idx, data in enumerate([d for d in track_datas if d]):
+        valid_track_datas = [d for d in track_datas if d]
+        # Align offsets list with the filtered inputs (skip empties)
+        input_offsets_seconds = None
+        if offsets_seconds is not None:
+            input_offsets_seconds = [
+                offsets_seconds[i] for i, d in enumerate(track_datas) if d
+            ]
+        for idx, data in enumerate(valid_track_datas):
             args = (
                 f"time_base=1/{target_sample_rate}:"
                 f"sample_rate={target_sample_rate}:"
@@ -114,7 +129,28 @@ class PipelineMainMultitrack(PipelineMainBase):
 
         sink = graph.add("abuffersink", name="out")
 
+        # Optional per-input delay before mixing
+        delays_ms: list[int] = []
+        if input_offsets_seconds is not None:
+            base = min(input_offsets_seconds) if input_offsets_seconds else 0.0
+            delays_ms = [
+                max(0, int(round((o - base) * 1000))) for o in input_offsets_seconds
+            ]
+        else:
+            delays_ms = [0 for _ in inputs]
+
         for idx, in_ctx in enumerate(inputs):
-            in_ctx.link_to(mixer, 0, idx)
+            delay_ms = delays_ms[idx] if idx < len(delays_ms) else 0
+            if delay_ms > 0:
+                # adelay requires one value per channel; use same for stereo
+                adelay = graph.add(
+                    "adelay",
+                    args=f"delays={delay_ms}|{delay_ms}:all=1",
+                    name=f"delay{idx}",
+                )
+                in_ctx.link_to(adelay)
+                adelay.link_to(mixer, 0, idx)
+            else:
+                in_ctx.link_to(mixer, 0, idx)
         mixer.link_to(fmt)
         fmt.link_to(sink)
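
For reference, the topology assembled in the hunk above can be sketched as a standalone helper. This is a minimal illustration, not code from the commit: the function name, the 48 kHz stereo s32 input parameters, and the full abuffer argument string are assumptions, and pushing/pulling of real frames is omitted.

import av

def build_alignment_graph(delays_ms, sample_rate=48000):
    # One abuffer per track -> optional adelay -> amix -> aformat(s16) -> sink
    graph = av.filter.Graph()
    mixer = graph.add("amix", args=f"inputs={len(delays_ms)}")
    fmt = graph.add("aformat", args="sample_fmts=s16")
    sink = graph.add("abuffersink")
    inputs = []
    for idx, delay_ms in enumerate(delays_ms):
        # Assumed input format: signed 32-bit stereo at sample_rate
        src = graph.add(
            "abuffer",
            args=(
                f"time_base=1/{sample_rate}:"
                f"sample_rate={sample_rate}:"
                "sample_fmt=s32:"
                "channel_layout=stereo"
            ),
        )
        inputs.append(src)
        if delay_ms > 0:
            # adelay takes one delay value per channel; both channels get the same delay
            adelay = graph.add("adelay", args=f"delays={delay_ms}|{delay_ms}:all=1")
            src.link_to(adelay)
            adelay.link_to(mixer, 0, idx)
        else:
            src.link_to(mixer, 0, idx)
    mixer.link_to(fmt)
    fmt.link_to(sink)
    graph.configure()
    return graph, inputs
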
@@ -122,7 +158,7 @@ class PipelineMainMultitrack(PipelineMainBase):
 
         # Open containers for decoding
         containers = []
-        for i, d in enumerate([d for d in track_datas if d]):
+        for i, d in enumerate(valid_track_datas):
             try:
                 c = av.open(io.BytesIO(d))
                 containers.append(c)
@@ -192,38 +228,7 @@ class PipelineMainMultitrack(PipelineMainBase):
         async with self.lock_transaction():
             return await transcripts_controller.set_status(transcript_id, status)
 
-    async def _list_immediate_keys(
-        self, s3, bucket_name: str, prefix: str
-    ) -> list[str]:
-        paginator = s3.get_paginator("list_objects_v2")
-        raw_prefix = prefix.rstrip("/")
-        prefixes = [raw_prefix, raw_prefix + "/"]
-
-        keys: set[str] = set()
-        for pref in prefixes:
-            for page in paginator.paginate(Bucket=bucket_name, Prefix=pref):
-                for obj in page.get("Contents", []):
-                    key = obj["Key"]
-                    if not key.startswith(pref):
-                        continue
-                    if pref.endswith("/"):
-                        rel = key[len(pref) :]
-                        if not rel or rel.endswith("/") or "/" in rel:
-                            continue
-                    else:
-                        if key != pref:
-                            continue
-                    keys.add(key)
-        result = sorted(keys)
-        self.logger.info(
-            "S3 list immediate files",
-            prefixes=prefixes,
-            total_keys=len(result),
-            sample=result[:5],
-        )
-        return result
-
-    async def process(self, bucket_name: str, prefix: str):
+    async def process(self, bucket_name: str, track_keys: list[str]):
         transcript = await self.get_transcript()
 
         s3 = boto3.client(
@@ -233,15 +238,11 @@ class PipelineMainMultitrack(PipelineMainBase):
             aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY,
         )
 
-        keys = await self._list_immediate_keys(s3, bucket_name, prefix)
-        if not keys:
-            raise Exception("No audio tracks found under prefix")
-
         storage = get_transcripts_storage()
 
         # Pre-download bytes for all tracks for mixing and transcription
         track_datas: list[bytes] = []
-        for key in keys:
+        for key in track_keys:
             try:
                 obj = s3.get_object(Bucket=bucket_name, Key=key)
                 track_datas.append(obj["Body"].read())
@@ -251,18 +252,36 @@ class PipelineMainMultitrack(PipelineMainBase):
                 )
                 track_datas.append(b"")
 
+        # Estimate offsets from first frame PTS, aligned to track_keys
+        offsets_seconds: list[float] = []
+        for data, key in zip(track_datas, track_keys):
+            off_s = 0.0
+            if data:
+                try:
+                    c = av.open(io.BytesIO(data))
+                    try:
+                        for frame in c.decode(audio=0):
+                            if frame.pts is not None and frame.time_base:
+                                off_s = float(frame.pts * frame.time_base)
+                                break
+                    finally:
+                        c.close()
+                except Exception:
+                    pass
+            offsets_seconds.append(max(0.0, float(off_s)))
+
         # Mixdown all available tracks into transcript.audio_mp3_filename, preserving sample rate
         try:
             mp3_writer = AudioFileWriterProcessor(
                 path=str(transcript.audio_mp3_filename)
             )
-            await self.mixdown_tracks(track_datas, mp3_writer)
+            await self.mixdown_tracks(track_datas, mp3_writer, offsets_seconds)
             await mp3_writer.flush()
         except Exception as e:
             self.logger.error("Mixdown failed", error=str(e))
 
         speaker_transcripts: list[TranscriptType] = []
-        for idx, key in enumerate(keys):
+        for idx, key in enumerate(track_keys):
             ext = ".mp4"
 
             try:
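
The offset estimation above relies on the fact that a frame's presentation timestamp multiplied by its time base gives a position in seconds. A worked example of that arithmetic (the numbers are made up for illustration):

from fractions import Fraction

# A track whose first decodable frame has pts=96000 in a 1/48000 time base
# started two seconds after the recording began.
pts = 96000
time_base = Fraction(1, 48000)
offset_seconds = float(pts * time_base)
assert offset_seconds == 2.0
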
@@ -322,7 +341,16 @@ class PipelineMainMultitrack(PipelineMainBase):
 
             if not t.words:
                 continue
+            # Shift word timestamps by the track's offset so all are relative to 00:00
+            track_offset = offsets_seconds[idx] if idx < len(offsets_seconds) else 0.0
             for w in t.words:
+                try:
+                    if hasattr(w, "start") and w.start is not None:
+                        w.start = float(w.start) + track_offset
+                    if hasattr(w, "end") and w.end is not None:
+                        w.end = float(w.end) + track_offset
+                except Exception:
+                    pass
                 w.speaker = idx
             speaker_transcripts.append(t)
 
@@ -433,12 +461,12 @@ class PipelineMainMultitrack(PipelineMainBase):
 @shared_task
 @asynctask
 async def task_pipeline_multitrack_process(
-    *, transcript_id: str, bucket_name: str, prefix: str
+    *, transcript_id: str, bucket_name: str, track_keys: list[str]
 ):
     pipeline = PipelineMainMultitrack(transcript_id=transcript_id)
     try:
         await pipeline.set_status(transcript_id, "processing")
-        await pipeline.process(bucket_name, prefix)
+        await pipeline.process(bucket_name, track_keys)
     except Exception:
         await pipeline.set_status(transcript_id, "error")
         raise
@@ -8,7 +8,9 @@ from pydantic import BaseModel
 
 from reflector.db.meetings import meetings_controller
 from reflector.logger import logger
+from reflector.settings import settings
 from reflector.video_platforms.factory import create_platform_client
+from reflector.worker.process import process_multitrack_recording
 
 router = APIRouter()
 
@@ -191,30 +193,28 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
         )
         return
 
-    meeting = await meetings_controller.get_by_room_name(room_name)
-    if not meeting:
-        logger.warning(
-            "recording.ready-to-download: meeting not found", room_name=room_name
-        )
-        return
 
     logger.info(
         "Recording ready for download",
-        meeting_id=meeting.id,
         room_name=room_name,
         recording_id=recording_id,
        num_tracks=len(tracks),
         platform="daily",
     )
 
-    # Import at runtime to avoid circular dependency (process.py imports from daily.py)
-    from reflector.worker.process import process_daily_recording  # noqa: PLC0415
+    bucket_name = settings.AWS_DAILY_S3_BUCKET
+    if not bucket_name:
+        logger.error(
+            "AWS_DAILY_S3_BUCKET not configured; cannot process Daily recording"
+        )
+        return
 
-    # Convert Pydantic models to dicts for Celery serialization
-    process_daily_recording.delay(
-        meeting_id=meeting.id,
-        recording_id=recording_id or event.id,
-        tracks=[t.model_dump() for t in tracks],
+    track_keys = [t.s3Key for t in tracks if t.type == "audio"]
+    process_multitrack_recording.delay(
+        bucket_name=bucket_name,
+        room_name=room_name,
+        recording_id=recording_id,
+        track_keys=track_keys,
    )
 
 
@@ -153,65 +153,63 @@ async def process_recording(bucket_name: str, object_key: str):
 
 @shared_task
 @asynctask
-async def process_multitrack_recording(bucket_name: str, prefix: str):
+async def process_multitrack_recording(
+    bucket_name: str,
+    room_name: str,
+    recording_id: str,
+    track_keys: list[str],
+):
     logger.info(
         "Processing multitrack recording",
         bucket=bucket_name,
-        prefix=prefix,
-        room_name="daily",
+        room_name=room_name,
+        recording_id=recording_id,
+        provided_keys=len(track_keys),
     )
 
+    if not track_keys:
+        logger.warning("No audio track keys provided")
+        return
+
+    recorded_at = datetime.now(timezone.utc)
     try:
-        effective_room_name = "/daily"
-        dir_name = prefix.rstrip("/").split("/")[-1]
-        ts_match = re.search(r"(\d{14})$", dir_name)
-        if ts_match:
-            ts = ts_match.group(1)
-            recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(
-                tzinfo=timezone.utc
-            )
-        else:
-            try:
-                recorded_at = parse_datetime_with_timezone(dir_name)
-            except Exception:
-                recorded_at = datetime.now(timezone.utc)
+        if track_keys:
+            folder = os.path.basename(os.path.dirname(track_keys[0]))
+            ts_match = re.search(r"(\d{14})$", folder)
+            if ts_match:
+                ts = ts_match.group(1)
+                recorded_at = datetime.strptime(ts, "%Y%m%d%H%M%S").replace(
+                    tzinfo=timezone.utc
+                )
     except Exception:
-        logger.warning("Could not parse recorded_at from prefix, using now()")
-        effective_room_name = "/daily"
-        recorded_at = datetime.now(timezone.utc)
+        logger.warning("Could not parse recorded_at from keys, using now()")
 
-    meeting = await meetings_controller.get_by_room_name(effective_room_name)
-    if meeting:
-        room = await rooms_controller.get_by_id(meeting.room_id)
-    else:
-        room = await rooms_controller.get_by_name(effective_room_name.lstrip("/"))
+    room_name = room_name.split("-", 1)[0]
+    room = await rooms_controller.get_by_name(room_name)
     if not room:
-        raise Exception(f"Room not found: {effective_room_name}")
-    start_date = recorded_at
-    end_date = recorded_at
-    try:
-        dummy = await meetings_controller.create(
-            id=room.id + "-" + recorded_at.strftime("%Y%m%d%H%M%S"),
-            room_name=effective_room_name,
-            room_url=f"{effective_room_name}",
-            host_room_url=f"{effective_room_name}",
-            start_date=start_date,
-            end_date=end_date,
-            room=room,
-        )
-        meeting = dummy
-    except Exception as e:
-        logger.warning("Failed to create dummy meeting", error=str(e))
-        meeting = None
+        raise Exception(f"Room not found: {room_name}")
 
-    recording = await recordings_controller.get_by_object_key(bucket_name, prefix)
+    meeting = await meetings_controller.create(
+        id=recording_id,
+        room_name=room_name,
+        room_url=room.name,
+        host_room_url=room.name,
+        start_date=recorded_at,
+        end_date=recorded_at,
+        room=room,
+        platform=room.platform,
+    )
+
+    recording = await recordings_controller.get_by_id(recording_id)
     if not recording:
+        object_key_dir = os.path.dirname(track_keys[0]) if track_keys else ""
         recording = await recordings_controller.create(
             Recording(
+                id=recording_id,
                 bucket_name=bucket_name,
-                object_key=prefix,
+                object_key=object_key_dir,
                 recorded_at=recorded_at,
-                meeting_id=meeting.id if meeting else None,
+                meeting_id=meeting.id,
             )
         )
 
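
The recorded_at parsing above assumes the parent folder of each track key ends in a 14-digit UTC timestamp. A hedged sketch of that extraction on a hypothetical key (the key layout shown is illustrative only, not taken from the commit):

import os
import re
from datetime import datetime, timezone

key = "room-abc/20241121183000/audio-track-0.webm"  # hypothetical key layout
folder = os.path.basename(os.path.dirname(key))     # "20241121183000"
match = re.search(r"(\d{14})$", folder)
if match:
    recorded_at = datetime.strptime(match.group(1), "%Y%m%d%H%M%S").replace(
        tzinfo=timezone.utc
    )
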
@@ -232,55 +230,14 @@ async def process_multitrack_recording(bucket_name: str, prefix: str):
        user_id=room.user_id,
        recording_id=recording.id,
        share_mode="public",
-        meeting_id=meeting.id if meeting else None,
+        meeting_id=meeting.id,
        room_id=room.id,
    )
 
-    s3 = boto3.client(
-        "s3",
-        region_name=settings.RECORDING_STORAGE_AWS_REGION,
-        aws_access_key_id=settings.RECORDING_STORAGE_AWS_ACCESS_KEY_ID,
-        aws_secret_access_key=settings.RECORDING_STORAGE_AWS_SECRET_ACCESS_KEY,
-    )
-
-    paginator = s3.get_paginator("list_objects_v2")
-    raw_prefix = prefix.rstrip("/")
-    prefixes = [raw_prefix, raw_prefix + "/"]
-
-    all_keys_set: set[str] = set()
-    for pref in prefixes:
-        for page in paginator.paginate(Bucket=bucket_name, Prefix=pref):
-            contents = page.get("Contents", [])
-            for obj in contents:
-                key = obj["Key"]
-                if not key.startswith(pref):
-                    continue
-                if pref.endswith("/"):
-                    rel = key[len(pref) :]
-                    if not rel or rel.endswith("/") or "/" in rel:
-                        continue
-                else:
-                    if key == pref:
-                        all_keys_set.add(key)
-                    continue
-                all_keys_set.add(key)
-
-    all_keys = sorted(all_keys_set)
-    logger.info(
-        "S3 list immediate files",
-        prefixes=prefixes,
-        total_keys=len(all_keys),
-        sample=all_keys[:5],
-    )
-
-    track_keys: list[str] = all_keys[:]
-
-    if not track_keys:
-        logger.info("No objects found under prefix", prefixes=prefixes)
-        raise Exception("No audio tracks found under prefix")
-
     task_pipeline_multitrack_process.delay(
-        transcript_id=transcript.id, bucket_name=bucket_name, prefix=prefix
+        transcript_id=transcript.id,
+        bucket_name=bucket_name,
+        track_keys=track_keys,
     )
 
 