mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 12:19:06 +00:00
fix: daily.co initial support works
This commit is contained in:
@@ -106,12 +106,13 @@ class Settings(BaseSettings):
|
||||
DAILY_WEBHOOK_SECRET: str | None = None
|
||||
DAILY_SUBDOMAIN: str | None = None
|
||||
AWS_DAILY_S3_BUCKET: str | None = None
|
||||
AWS_DAILY_S3_REGION: str = "us-west-2"
|
||||
AWS_DAILY_ROLE_ARN: str | None = None
|
||||
|
||||
# Video platform migration feature flags
|
||||
DAILY_MIGRATION_ENABLED: bool = False
|
||||
DAILY_MIGRATION_ENABLED: bool = True
|
||||
DAILY_MIGRATION_ROOM_IDS: list[str] = []
|
||||
DEFAULT_VIDEO_PLATFORM: str = "whereby"
|
||||
DEFAULT_VIDEO_PLATFORM: str = "daily"
|
||||
|
||||
# Zulip integration
|
||||
ZULIP_REALM: str | None = None
|
||||
|
||||
@@ -28,7 +28,6 @@ class DailyClient(VideoPlatformClient):
|
||||
self, room_name_prefix: str, end_date: datetime, room: Room
|
||||
) -> MeetingData:
|
||||
"""Create a Daily.co room."""
|
||||
# Generate unique room name
|
||||
room_name = f"{room_name_prefix}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
|
||||
|
||||
data = {
|
||||
@@ -43,17 +42,16 @@ class DailyClient(VideoPlatformClient):
|
||||
"start_video_off": False,
|
||||
"start_audio_off": False,
|
||||
"exp": int(end_date.timestamp()),
|
||||
"enable_recording_ui": False, # We handle consent ourselves
|
||||
},
|
||||
}
|
||||
|
||||
# Configure S3 bucket for cloud recordings
|
||||
if room.recording_type == "cloud" and self.config.s3_bucket:
|
||||
data["properties"]["recording_bucket"] = {
|
||||
data["properties"]["recordings_bucket"] = {
|
||||
"bucket_name": self.config.s3_bucket,
|
||||
"bucket_region": self.config.s3_region,
|
||||
"assume_role_arn": self.config.aws_role_arn,
|
||||
"path": f"recordings/{room_name}",
|
||||
"allow_api_access": True,
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
@@ -68,13 +66,12 @@ class DailyClient(VideoPlatformClient):
|
||||
|
||||
# Format response to match our standard
|
||||
room_url = result["url"]
|
||||
host_room_url = f"{room_url}?t={result['config']['token']}"
|
||||
|
||||
return MeetingData(
|
||||
meeting_id=result["id"],
|
||||
room_name=result["name"],
|
||||
room_url=room_url,
|
||||
host_room_url=host_room_url,
|
||||
host_room_url=room_url,
|
||||
platform=self.PLATFORM_NAME,
|
||||
extra_data=result,
|
||||
)
|
||||
@@ -128,25 +125,3 @@ class DailyClient(VideoPlatformClient):
|
||||
return hmac.compare_digest(expected, signature)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
async def start_recording(self, room_name: str) -> Dict[str, Any]:
|
||||
"""Start recording for a room - Daily.co specific method."""
|
||||
data = {
|
||||
"layout": {
|
||||
"preset": "audio-only" # For transcription use case
|
||||
},
|
||||
"streaming_settings": {
|
||||
"width": 1280,
|
||||
"height": 720,
|
||||
},
|
||||
}
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.post(
|
||||
f"{self.BASE_URL}/rooms/{room_name}/recordings",
|
||||
headers=self.headers,
|
||||
json=data,
|
||||
timeout=self.TIMEOUT,
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
@@ -25,9 +25,7 @@ def get_platform_config(platform: str) -> VideoPlatformConfig:
|
||||
webhook_secret=settings.DAILY_WEBHOOK_SECRET or "",
|
||||
subdomain=settings.DAILY_SUBDOMAIN,
|
||||
s3_bucket=settings.AWS_DAILY_S3_BUCKET,
|
||||
s3_region=settings.AWS_REGION
|
||||
if hasattr(settings, "AWS_REGION")
|
||||
else "us-west-2",
|
||||
s3_region=settings.AWS_DAILY_S3_REGION,
|
||||
aws_role_arn=settings.AWS_DAILY_ROLE_ARN,
|
||||
)
|
||||
else:
|
||||
|
||||
@@ -106,8 +106,9 @@ async def _handle_recording_started(event: DailyWebhookEvent):
|
||||
async def _handle_recording_ready(event: DailyWebhookEvent):
|
||||
"""Handle recording ready for download event."""
|
||||
room_name = event.data.get("room", {}).get("name")
|
||||
download_link = event.data.get("download_link")
|
||||
recording_id = event.data.get("recording_id")
|
||||
recording_data = event.data.get("recording", {})
|
||||
download_link = recording_data.get("download_url")
|
||||
recording_id = recording_data.get("id")
|
||||
|
||||
if not room_name or not download_link:
|
||||
return
|
||||
@@ -117,11 +118,13 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
|
||||
# Queue recording processing task (same as Whereby)
|
||||
try:
|
||||
# Import here to avoid circular imports
|
||||
from reflector.worker.tasks import process_recording
|
||||
from reflector.worker.process import process_recording_from_url
|
||||
|
||||
process_recording.delay(
|
||||
meeting_id=meeting.id,
|
||||
# For Daily.co, we need to queue recording processing with URL
|
||||
# This will download from the URL and process similar to S3
|
||||
process_recording_from_url.delay(
|
||||
recording_url=download_link,
|
||||
meeting_id=meeting.id,
|
||||
recording_id=recording_id or event.id,
|
||||
)
|
||||
except ImportError:
|
||||
|
||||
@@ -5,6 +5,7 @@ from urllib.parse import unquote
|
||||
|
||||
import av
|
||||
import boto3
|
||||
import httpx
|
||||
import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
@@ -220,3 +221,98 @@ async def reprocess_failed_recordings():
|
||||
|
||||
logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
|
||||
return reprocessed_count
|
||||
|
||||
|
||||
@shared_task
|
||||
@asynctask
|
||||
async def process_recording_from_url(
|
||||
recording_url: str, meeting_id: str, recording_id: str
|
||||
):
|
||||
"""Process recording from Direct URL (Daily.co webhook)."""
|
||||
logger.info("Processing recording from URL for meeting: %s", meeting_id)
|
||||
|
||||
meeting = await meetings_controller.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
logger.error("Meeting not found: %s", meeting_id)
|
||||
return
|
||||
|
||||
room = await rooms_controller.get_by_id(meeting.room_id)
|
||||
if not room:
|
||||
logger.error("Room not found for meeting: %s", meeting_id)
|
||||
return
|
||||
|
||||
# Create recording record with URL instead of S3 bucket/key
|
||||
recording = await recordings_controller.get_by_object_key(
|
||||
"daily-recordings", recording_id
|
||||
)
|
||||
if not recording:
|
||||
recording = await recordings_controller.create(
|
||||
Recording(
|
||||
bucket_name="daily-recordings", # Logical bucket name for Daily.co
|
||||
object_key=recording_id, # Store Daily.co recording ID
|
||||
recorded_at=datetime.utcnow(),
|
||||
meeting_id=meeting.id,
|
||||
)
|
||||
)
|
||||
|
||||
# Get or create transcript record
|
||||
transcript = await transcripts_controller.get_by_recording_id(recording.id)
|
||||
if transcript:
|
||||
await transcripts_controller.update(transcript, {"topics": []})
|
||||
else:
|
||||
transcript = await transcripts_controller.add(
|
||||
"",
|
||||
source_kind=SourceKind.ROOM,
|
||||
source_language="en",
|
||||
target_language="en",
|
||||
user_id=room.user_id,
|
||||
recording_id=recording.id,
|
||||
share_mode="public",
|
||||
meeting_id=meeting.id,
|
||||
room_id=room.id,
|
||||
)
|
||||
|
||||
# Download file from URL
|
||||
upload_filename = transcript.data_path / "upload.mp4"
|
||||
upload_filename.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
try:
|
||||
logger.info("Downloading recording from URL: %s", recording_url)
|
||||
async with httpx.AsyncClient(timeout=300.0) as client: # 5 minute timeout
|
||||
async with client.stream("GET", recording_url) as response:
|
||||
response.raise_for_status()
|
||||
|
||||
with open(upload_filename, "wb") as f:
|
||||
async for chunk in response.aiter_bytes(8192):
|
||||
f.write(chunk)
|
||||
|
||||
logger.info("Download completed: %s", upload_filename)
|
||||
except Exception as e:
|
||||
logger.error("Failed to download recording: %s", str(e))
|
||||
await transcripts_controller.update(transcript, {"status": "error"})
|
||||
if upload_filename.exists():
|
||||
upload_filename.unlink()
|
||||
raise
|
||||
|
||||
# Validate audio content (same as S3 version)
|
||||
try:
|
||||
container = av.open(upload_filename.as_posix())
|
||||
try:
|
||||
if not len(container.streams.audio):
|
||||
raise Exception("File has no audio stream")
|
||||
logger.info("Audio validation successful")
|
||||
finally:
|
||||
container.close()
|
||||
except Exception as e:
|
||||
logger.error("Audio validation failed: %s", str(e))
|
||||
await transcripts_controller.update(transcript, {"status": "error"})
|
||||
if upload_filename.exists():
|
||||
upload_filename.unlink()
|
||||
raise
|
||||
|
||||
# Mark as uploaded and trigger processing pipeline
|
||||
await transcripts_controller.update(transcript, {"status": "uploaded"})
|
||||
logger.info("Queuing transcript for processing pipeline: %s", transcript.id)
|
||||
|
||||
# Start the ML pipeline (same as S3 version)
|
||||
task_pipeline_process.delay(transcript_id=transcript.id)
|
||||
|
||||
@@ -202,7 +202,7 @@ class TestDailyWebhookIntegration:
|
||||
"reflector.db.meetings.meetings_controller.update_meeting"
|
||||
) as mock_update_url:
|
||||
with patch(
|
||||
"reflector.worker.tasks.process_recording.delay"
|
||||
"reflector.worker.process.process_recording_from_url.delay"
|
||||
) as mock_process:
|
||||
async with AsyncClient(
|
||||
app=app, base_url="http://test/v1"
|
||||
@@ -216,15 +216,13 @@ class TestDailyWebhookIntegration:
|
||||
assert response.status_code == 200
|
||||
assert response.json() == {"status": "ok"}
|
||||
|
||||
# Verify recording URL was updated
|
||||
mock_update_url.assert_called_once_with(
|
||||
mock_meeting.id,
|
||||
"https://s3.amazonaws.com/bucket/recording.mp4",
|
||||
# Verify processing was triggered with correct parameters
|
||||
mock_process.assert_called_once_with(
|
||||
recording_url="https://s3.amazonaws.com/bucket/recording.mp4",
|
||||
meeting_id=mock_meeting.id,
|
||||
recording_id="recording-789",
|
||||
)
|
||||
|
||||
# Verify processing was triggered
|
||||
mock_process.assert_called_once_with(mock_meeting.id)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_webhook_invalid_signature_rejected(self, webhook_secret):
|
||||
"""Test webhook with invalid signature is rejected."""
|
||||
|
||||
Reference in New Issue
Block a user