diff --git a/server/reflector/settings.py b/server/reflector/settings.py
index eac3f342..4a3a35ef 100644
--- a/server/reflector/settings.py
+++ b/server/reflector/settings.py
@@ -106,12 +106,13 @@ class Settings(BaseSettings):
     DAILY_WEBHOOK_SECRET: str | None = None
     DAILY_SUBDOMAIN: str | None = None
     AWS_DAILY_S3_BUCKET: str | None = None
+    AWS_DAILY_S3_REGION: str = "us-west-2"
     AWS_DAILY_ROLE_ARN: str | None = None
 
     # Video platform migration feature flags
-    DAILY_MIGRATION_ENABLED: bool = False
+    DAILY_MIGRATION_ENABLED: bool = True
     DAILY_MIGRATION_ROOM_IDS: list[str] = []
-    DEFAULT_VIDEO_PLATFORM: str = "whereby"
+    DEFAULT_VIDEO_PLATFORM: str = "daily"
 
     # Zulip integration
     ZULIP_REALM: str | None = None
diff --git a/server/reflector/video_platforms/daily.py b/server/reflector/video_platforms/daily.py
index f8de2400..278cba41 100644
--- a/server/reflector/video_platforms/daily.py
+++ b/server/reflector/video_platforms/daily.py
@@ -28,7 +28,6 @@ class DailyClient(VideoPlatformClient):
         self, room_name_prefix: str, end_date: datetime, room: Room
     ) -> MeetingData:
         """Create a Daily.co room."""
-        # Generate unique room name
         room_name = f"{room_name_prefix}-{datetime.now().strftime('%Y%m%d%H%M%S')}"
 
         data = {
@@ -43,17 +42,16 @@ class DailyClient(VideoPlatformClient):
                 "start_video_off": False,
                 "start_audio_off": False,
                 "exp": int(end_date.timestamp()),
-                "enable_recording_ui": False,  # We handle consent ourselves
             },
         }
 
         # Configure S3 bucket for cloud recordings
         if room.recording_type == "cloud" and self.config.s3_bucket:
-            data["properties"]["recording_bucket"] = {
+            data["properties"]["recordings_bucket"] = {
                 "bucket_name": self.config.s3_bucket,
                 "bucket_region": self.config.s3_region,
                 "assume_role_arn": self.config.aws_role_arn,
-                "path": f"recordings/{room_name}",
+                "allow_api_access": True,
             }
 
         async with httpx.AsyncClient() as client:
@@ -68,13 +66,12 @@ class DailyClient(VideoPlatformClient):
 
         # Format response to match our standard
         room_url = result["url"]
-        host_room_url = f"{room_url}?t={result['config']['token']}"
 
         return MeetingData(
             meeting_id=result["id"],
             room_name=result["name"],
             room_url=room_url,
-            host_room_url=host_room_url,
+            host_room_url=room_url,
             platform=self.PLATFORM_NAME,
             extra_data=result,
         )
@@ -128,25 +125,3 @@ class DailyClient(VideoPlatformClient):
             return hmac.compare_digest(expected, signature)
         except Exception:
             return False
-
-    async def start_recording(self, room_name: str) -> Dict[str, Any]:
-        """Start recording for a room - Daily.co specific method."""
-        data = {
-            "layout": {
-                "preset": "audio-only"  # For transcription use case
-            },
-            "streaming_settings": {
-                "width": 1280,
-                "height": 720,
-            },
-        }
-
-        async with httpx.AsyncClient() as client:
-            response = await client.post(
-                f"{self.BASE_URL}/rooms/{room_name}/recordings",
-                headers=self.headers,
-                json=data,
-                timeout=self.TIMEOUT,
-            )
-            response.raise_for_status()
-            return response.json()
diff --git a/server/reflector/video_platforms/factory.py b/server/reflector/video_platforms/factory.py
index a8ab9f08..61e7f1e9 100644
--- a/server/reflector/video_platforms/factory.py
+++ b/server/reflector/video_platforms/factory.py
@@ -25,9 +25,7 @@ def get_platform_config(platform: str) -> VideoPlatformConfig:
             webhook_secret=settings.DAILY_WEBHOOK_SECRET or "",
             subdomain=settings.DAILY_SUBDOMAIN,
             s3_bucket=settings.AWS_DAILY_S3_BUCKET,
-            s3_region=settings.AWS_REGION
-            if hasattr(settings, "AWS_REGION")
-            else "us-west-2",
+            s3_region=settings.AWS_DAILY_S3_REGION,
             aws_role_arn=settings.AWS_DAILY_ROLE_ARN,
         )
     else:
diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py
index b4b9d081..91dfe1a6 100644
--- a/server/reflector/views/daily.py
+++ b/server/reflector/views/daily.py
@@ -106,8 +106,9 @@ async def _handle_recording_started(event: DailyWebhookEvent):
 async def _handle_recording_ready(event: DailyWebhookEvent):
     """Handle recording ready for download event."""
     room_name = event.data.get("room", {}).get("name")
-    download_link = event.data.get("download_link")
-    recording_id = event.data.get("recording_id")
+    recording_data = event.data.get("recording", {})
+    download_link = recording_data.get("download_url")
+    recording_id = recording_data.get("id")
 
     if not room_name or not download_link:
         return
@@ -117,11 +118,13 @@ async def _handle_recording_ready(event: DailyWebhookEvent):
     # Queue recording processing task (same as Whereby)
     try:
         # Import here to avoid circular imports
-        from reflector.worker.tasks import process_recording
+        from reflector.worker.process import process_recording_from_url
 
-        process_recording.delay(
-            meeting_id=meeting.id,
+        # For Daily.co, we need to queue recording processing with URL
+        # This will download from the URL and process similar to S3
+        process_recording_from_url.delay(
             recording_url=download_link,
+            meeting_id=meeting.id,
             recording_id=recording_id or event.id,
         )
     except ImportError:
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index 055b6755..a1d25b0c 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -5,6 +5,7 @@ from urllib.parse import unquote
 
 import av
 import boto3
+import httpx
 import structlog
 from celery import shared_task
 from celery.utils.log import get_task_logger
@@ -220,3 +221,98 @@ async def reprocess_failed_recordings():
     logger.info(f"Reprocessing complete. Requeued {reprocessed_count} recordings")
 
     return reprocessed_count
+
+
+@shared_task
+@asynctask
+async def process_recording_from_url(
+    recording_url: str, meeting_id: str, recording_id: str
+):
+    """Process recording from Direct URL (Daily.co webhook)."""
+    logger.info("Processing recording from URL for meeting: %s", meeting_id)
+
+    meeting = await meetings_controller.get_by_id(meeting_id)
+    if not meeting:
+        logger.error("Meeting not found: %s", meeting_id)
+        return
+
+    room = await rooms_controller.get_by_id(meeting.room_id)
+    if not room:
+        logger.error("Room not found for meeting: %s", meeting_id)
+        return
+
+    # Create recording record with URL instead of S3 bucket/key
+    recording = await recordings_controller.get_by_object_key(
+        "daily-recordings", recording_id
+    )
+    if not recording:
+        recording = await recordings_controller.create(
+            Recording(
+                bucket_name="daily-recordings",  # Logical bucket name for Daily.co
+                object_key=recording_id,  # Store Daily.co recording ID
+                recorded_at=datetime.utcnow(),
+                meeting_id=meeting.id,
+            )
+        )
+
+    # Get or create transcript record
+    transcript = await transcripts_controller.get_by_recording_id(recording.id)
+    if transcript:
+        await transcripts_controller.update(transcript, {"topics": []})
+    else:
+        transcript = await transcripts_controller.add(
+            "",
+            source_kind=SourceKind.ROOM,
+            source_language="en",
+            target_language="en",
+            user_id=room.user_id,
+            recording_id=recording.id,
+            share_mode="public",
+            meeting_id=meeting.id,
+            room_id=room.id,
+        )
+
+    # Download file from URL
+    upload_filename = transcript.data_path / "upload.mp4"
+    upload_filename.parent.mkdir(parents=True, exist_ok=True)
+
+    try:
+        logger.info("Downloading recording from URL: %s", recording_url)
+        async with httpx.AsyncClient(timeout=300.0) as client:  # 5 minute timeout
+            async with client.stream("GET", recording_url) as response:
+                response.raise_for_status()
+
+                with open(upload_filename, "wb") as f:
+                    async for chunk in response.aiter_bytes(8192):
+                        f.write(chunk)
+
+        logger.info("Download completed: %s", upload_filename)
+    except Exception as e:
+        logger.error("Failed to download recording: %s", str(e))
+        await transcripts_controller.update(transcript, {"status": "error"})
+        if upload_filename.exists():
+            upload_filename.unlink()
+        raise
+
+    # Validate audio content (same as S3 version)
+    try:
+        container = av.open(upload_filename.as_posix())
+        try:
+            if not len(container.streams.audio):
+                raise Exception("File has no audio stream")
+            logger.info("Audio validation successful")
+        finally:
+            container.close()
+    except Exception as e:
+        logger.error("Audio validation failed: %s", str(e))
+        await transcripts_controller.update(transcript, {"status": "error"})
+        if upload_filename.exists():
+            upload_filename.unlink()
+        raise
+
+    # Mark as uploaded and trigger processing pipeline
+    await transcripts_controller.update(transcript, {"status": "uploaded"})
+    logger.info("Queuing transcript for processing pipeline: %s", transcript.id)
+
+    # Start the ML pipeline (same as S3 version)
+    task_pipeline_process.delay(transcript_id=transcript.id)
diff --git a/server/tests/test_daily_webhook.py b/server/tests/test_daily_webhook.py
index 94b722da..dee885aa 100644
--- a/server/tests/test_daily_webhook.py
+++ b/server/tests/test_daily_webhook.py
@@ -202,7 +202,7 @@ class TestDailyWebhookIntegration:
                 "reflector.db.meetings.meetings_controller.update_meeting"
            ) as mock_update_url:
                with patch(
-                    "reflector.worker.tasks.process_recording.delay"
+                    "reflector.worker.process.process_recording_from_url.delay"
                ) as mock_process:
                    async with AsyncClient(
                        app=app, base_url="http://test/v1"
@@ -216,15 +216,13 @@ class TestDailyWebhookIntegration:
                        assert response.status_code == 200
                        assert response.json() == {"status": "ok"}
 
-                        # Verify recording URL was updated
-                        mock_update_url.assert_called_once_with(
-                            mock_meeting.id,
-                            "https://s3.amazonaws.com/bucket/recording.mp4",
+                        # Verify processing was triggered with correct parameters
+                        mock_process.assert_called_once_with(
+                            recording_url="https://s3.amazonaws.com/bucket/recording.mp4",
+                            meeting_id=mock_meeting.id,
+                            recording_id="recording-789",
                        )
 
-                        # Verify processing was triggered
-                        mock_process.assert_called_once_with(mock_meeting.id)
-
    @pytest.mark.asyncio
    async def test_webhook_invalid_signature_rejected(self, webhook_secret):
        """Test webhook with invalid signature is rejected."""