From ec4f356b4cbf163720f30b7ffdc6c9dd2ebd7282 Mon Sep 17 00:00:00 2001 From: Sergey Mankovsky Date: Wed, 11 Feb 2026 16:59:21 +0100 Subject: [PATCH] fix: local env setup (#855) * Ensure rate limit * Increase nextjs compilation speed * Fix daily no content handling * Simplify daily webhook creation * Fix webhook request validation --- docker-compose.yml | 4 + server/reflector/dailyco_api/client.py | 2 + server/reflector/dailyco_api/webhook_utils.py | 2 +- server/reflector/dailyco_api/webhooks.py | 48 +++++++- server/reflector/hatchet/client.py | 25 +++++ server/reflector/hatchet/run_workers_llm.py | 11 ++ server/reflector/views/daily.py | 9 +- server/scripts/recreate_daily_webhook.py | 105 +++++------------- www/package.json | 2 +- 9 files changed, 123 insertions(+), 85 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b695f82d..596b5aed 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -79,6 +79,7 @@ services: volumes: - ./www:/app/ - /app/node_modules + - next_cache:/app/.next env_file: - ./www/.env.local environment: @@ -132,6 +133,9 @@ services: retries: 5 start_period: 30s +volumes: + next_cache: + networks: default: attachable: true diff --git a/server/reflector/dailyco_api/client.py b/server/reflector/dailyco_api/client.py index 8634039f..bebcd81c 100644 --- a/server/reflector/dailyco_api/client.py +++ b/server/reflector/dailyco_api/client.py @@ -146,6 +146,8 @@ class DailyApiClient: ) raise DailyApiError(operation, response) + if not response.content: + return {} return response.json() # ============================================================================ diff --git a/server/reflector/dailyco_api/webhook_utils.py b/server/reflector/dailyco_api/webhook_utils.py index 27d5fb4e..52167d7c 100644 --- a/server/reflector/dailyco_api/webhook_utils.py +++ b/server/reflector/dailyco_api/webhook_utils.py @@ -99,7 +99,7 @@ def extract_room_name(event: DailyWebhookEvent) -> str | None: >>> event = DailyWebhookEvent(**webhook_payload) >>> room_name = extract_room_name(event) """ - room = event.payload.get("room_name") + room = event.payload.get("room_name") or event.payload.get("room") # Ensure we return a string, not any falsy value that might be in payload return room if isinstance(room, str) else None diff --git a/server/reflector/dailyco_api/webhooks.py b/server/reflector/dailyco_api/webhooks.py index e0ff1f5c..68ed5d3c 100644 --- a/server/reflector/dailyco_api/webhooks.py +++ b/server/reflector/dailyco_api/webhooks.py @@ -6,7 +6,7 @@ Reference: https://docs.daily.co/reference/rest-api/webhooks from typing import Annotated, Any, Dict, Literal, Union -from pydantic import BaseModel, Field, field_validator +from pydantic import AliasChoices, BaseModel, ConfigDict, Field, field_validator from reflector.utils.string import NonEmptyString @@ -41,6 +41,8 @@ class DailyTrack(BaseModel): Reference: https://docs.daily.co/reference/rest-api/recordings """ + model_config = ConfigDict(extra="ignore") + type: Literal["audio", "video"] s3Key: NonEmptyString = Field(description="S3 object key for the track file") size: int = Field(description="File size in bytes") @@ -54,6 +56,8 @@ class DailyWebhookEvent(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks """ + model_config = ConfigDict(extra="ignore") + version: NonEmptyString = Field( description="Represents the version of the event. This uses semantic versioning to inform a consumer if the payload has introduced any breaking changes" ) @@ -82,7 +86,13 @@ class ParticipantJoinedPayload(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-joined """ - room_name: NonEmptyString | None = Field(None, description="Daily.co room name") + model_config = ConfigDict(extra="ignore") + + room_name: NonEmptyString | None = Field( + None, + description="Daily.co room name", + validation_alias=AliasChoices("room_name", "room"), + ) session_id: NonEmptyString = Field(description="Daily.co session identifier") user_id: NonEmptyString = Field(description="User identifier (may be encoded)") user_name: NonEmptyString | None = Field(None, description="User display name") @@ -100,7 +110,13 @@ class ParticipantLeftPayload(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-left """ - room_name: NonEmptyString | None = Field(None, description="Daily.co room name") + model_config = ConfigDict(extra="ignore") + + room_name: NonEmptyString | None = Field( + None, + description="Daily.co room name", + validation_alias=AliasChoices("room_name", "room"), + ) session_id: NonEmptyString = Field(description="Daily.co session identifier") user_id: NonEmptyString = Field(description="User identifier (may be encoded)") user_name: NonEmptyString | None = Field(None, description="User display name") @@ -112,6 +128,9 @@ class ParticipantLeftPayload(BaseModel): _normalize_joined_at = field_validator("joined_at", mode="before")( normalize_timestamp_to_int ) + _normalize_duration = field_validator("duration", mode="before")( + normalize_timestamp_to_int + ) class RecordingStartedPayload(BaseModel): @@ -121,6 +140,8 @@ class RecordingStartedPayload(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-started """ + model_config = ConfigDict(extra="ignore") + room_name: NonEmptyString | None = Field(None, description="Daily.co room name") recording_id: NonEmptyString = Field(description="Recording identifier") start_ts: int | None = Field(None, description="Recording start timestamp") @@ -138,7 +159,9 @@ class RecordingReadyToDownloadPayload(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-ready-to-download """ - type: Literal["cloud", "raw-tracks"] = Field( + model_config = ConfigDict(extra="ignore") + + type: Literal["cloud", "cloud-audio-only", "raw-tracks"] = Field( description="The type of recording that was generated" ) recording_id: NonEmptyString = Field( @@ -153,8 +176,9 @@ class RecordingReadyToDownloadPayload(BaseModel): status: Literal["finished"] = Field( description="The status of the given recording (always 'finished' in ready-to-download webhook, see RecordingStatus in responses.py for full API statuses)" ) - max_participants: int = Field( - description="The number of participants on the call that were recorded" + max_participants: int | None = Field( + None, + description="The number of participants on the call that were recorded (optional; Daily may omit it in some webhook versions)", ) duration: int = Field(description="The duration in seconds of the call") s3_key: NonEmptyString = Field( @@ -180,6 +204,8 @@ class RecordingErrorPayload(BaseModel): Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-error """ + model_config = ConfigDict(extra="ignore") + action: Literal["clourd-recording-err", "cloud-recording-error"] = Field( description="A string describing the event that was emitted (both variants are documented)" ) @@ -200,6 +226,8 @@ class RecordingErrorPayload(BaseModel): class ParticipantJoinedEvent(BaseModel): + model_config = ConfigDict(extra="ignore") + version: NonEmptyString type: Literal["participant.joined"] id: NonEmptyString @@ -212,6 +240,8 @@ class ParticipantJoinedEvent(BaseModel): class ParticipantLeftEvent(BaseModel): + model_config = ConfigDict(extra="ignore") + version: NonEmptyString type: Literal["participant.left"] id: NonEmptyString @@ -224,6 +254,8 @@ class ParticipantLeftEvent(BaseModel): class RecordingStartedEvent(BaseModel): + model_config = ConfigDict(extra="ignore") + version: NonEmptyString type: Literal["recording.started"] id: NonEmptyString @@ -236,6 +268,8 @@ class RecordingStartedEvent(BaseModel): class RecordingReadyEvent(BaseModel): + model_config = ConfigDict(extra="ignore") + version: NonEmptyString type: Literal["recording.ready-to-download"] id: NonEmptyString @@ -248,6 +282,8 @@ class RecordingReadyEvent(BaseModel): class RecordingErrorEvent(BaseModel): + model_config = ConfigDict(extra="ignore") + version: NonEmptyString type: Literal["recording.error"] id: NonEmptyString diff --git a/server/reflector/hatchet/client.py b/server/reflector/hatchet/client.py index d9ef9eab..c235f7c9 100644 --- a/server/reflector/hatchet/client.py +++ b/server/reflector/hatchet/client.py @@ -12,7 +12,9 @@ import threading from hatchet_sdk import ClientConfig, Hatchet from hatchet_sdk.clients.rest.models import V1TaskStatus +from hatchet_sdk.rate_limit import RateLimitDuration +from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND from reflector.logger import logger from reflector.settings import settings @@ -113,3 +115,26 @@ class HatchetClientManager: """Reset the client instance (for testing).""" with cls._lock: cls._instance = None + + @classmethod + async def ensure_rate_limit(cls) -> None: + """Ensure the LLM rate limit exists in Hatchet. + + Uses the Hatchet SDK rate_limits client (aio_put). See: + https://docs.hatchet.run/sdks/python/feature-clients/rate_limits + """ + logger.info( + "[Hatchet] Ensuring rate limit exists", + rate_limit_key=LLM_RATE_LIMIT_KEY, + limit=LLM_RATE_LIMIT_PER_SECOND, + ) + client = cls.get_client() + await client.rate_limits.aio_put( + key=LLM_RATE_LIMIT_KEY, + limit=LLM_RATE_LIMIT_PER_SECOND, + duration=RateLimitDuration.SECOND, + ) + logger.info( + "[Hatchet] Rate limit put successfully", + rate_limit_key=LLM_RATE_LIMIT_KEY, + ) diff --git a/server/reflector/hatchet/run_workers_llm.py b/server/reflector/hatchet/run_workers_llm.py index 3ab0529a..35734fbb 100644 --- a/server/reflector/hatchet/run_workers_llm.py +++ b/server/reflector/hatchet/run_workers_llm.py @@ -3,6 +3,8 @@ LLM/I/O worker pool for all non-CPU tasks. Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration) """ +import asyncio + from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.workflows.daily_multitrack_pipeline import ( daily_multitrack_pipeline, @@ -20,6 +22,15 @@ POOL = "llm-io" def main(): hatchet = HatchetClientManager.get_client() + try: + asyncio.run(HatchetClientManager.ensure_rate_limit()) + except Exception as e: + logger.warning( + "[Hatchet] Rate limit initialization failed, but continuing. " + "If workflows fail to register, rate limits may need to be created manually.", + error=str(e), + ) + logger.info( "Starting Hatchet LLM worker pool (all tasks except mixdown)", worker_name=WORKER_NAME, diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 384290da..84b5b203 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -80,7 +80,14 @@ async def webhook(request: Request): try: event = event_adapter.validate_python(body_json) except Exception as e: - logger.error("Failed to parse webhook event", error=str(e), body=body.decode()) + err_detail = str(e) + if hasattr(e, "errors"): + err_detail = f"{err_detail}; errors={e.errors()!r}" + logger.error( + "Failed to parse webhook event", + error=err_detail, + body=body.decode(), + ) raise HTTPException(status_code=422, detail="Invalid event format") match event: diff --git a/server/scripts/recreate_daily_webhook.py b/server/scripts/recreate_daily_webhook.py index e4ac9ce9..a263aa6a 100644 --- a/server/scripts/recreate_daily_webhook.py +++ b/server/scripts/recreate_daily_webhook.py @@ -15,8 +15,7 @@ from reflector.settings import settings async def setup_webhook(webhook_url: str): """ - Create or update Daily.co webhook for this environment using dailyco_api module. - Uses DAILY_WEBHOOK_UUID to identify existing webhook. + Create Daily.co webhook. Deletes any existing webhooks first, then creates the new one. """ if not settings.DAILY_API_KEY: print("Error: DAILY_API_KEY not set") @@ -35,79 +34,37 @@ async def setup_webhook(webhook_url: str): ] async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client: - webhook_uuid = settings.DAILY_WEBHOOK_UUID + webhooks = await client.list_webhooks() + for wh in webhooks: + await client.delete_webhook(wh.uuid) + print(f"Deleted webhook {wh.uuid}") - if webhook_uuid: - print(f"Updating existing webhook {webhook_uuid}...") - try: - # Note: Daily.co doesn't support PATCH well, so we delete + recreate - await client.delete_webhook(webhook_uuid) - print(f"Deleted old webhook {webhook_uuid}") + request = CreateWebhookRequest( + url=webhook_url, + eventTypes=event_types, + hmac=settings.DAILY_WEBHOOK_SECRET, + ) + result = await client.create_webhook(request) + webhook_uuid = result.uuid - request = CreateWebhookRequest( - url=webhook_url, - eventTypes=event_types, - hmac=settings.DAILY_WEBHOOK_SECRET, - ) - result = await client.create_webhook(request) + print(f"✓ Created webhook {webhook_uuid} (state: {result.state})") + print(f" URL: {result.url}") - print( - f"✓ Created replacement webhook {result.uuid} (state: {result.state})" - ) - print(f" URL: {result.url}") + env_file = Path(__file__).parent.parent / ".env" + if env_file.exists(): + lines = env_file.read_text().splitlines() + updated = False + for i, line in enumerate(lines): + if line.startswith("DAILY_WEBHOOK_UUID="): + lines[i] = f"DAILY_WEBHOOK_UUID={webhook_uuid}" + updated = True + break + if not updated: + lines.append(f"DAILY_WEBHOOK_UUID={webhook_uuid}") + env_file.write_text("\n".join(lines) + "\n") + print("✓ Saved DAILY_WEBHOOK_UUID to .env") - webhook_uuid = result.uuid - - except Exception as e: - if hasattr(e, "response") and e.response.status_code == 404: - print(f"Webhook {webhook_uuid} not found, creating new one...") - webhook_uuid = None # Fall through to creation - else: - print(f"Error updating webhook: {e}") - return 1 - - if not webhook_uuid: - print("Creating new webhook...") - request = CreateWebhookRequest( - url=webhook_url, - eventTypes=event_types, - hmac=settings.DAILY_WEBHOOK_SECRET, - ) - result = await client.create_webhook(request) - webhook_uuid = result.uuid - - print(f"✓ Created webhook {webhook_uuid} (state: {result.state})") - print(f" URL: {result.url}") - print() - print("=" * 60) - print("IMPORTANT: Add this to your environment variables:") - print("=" * 60) - print(f"DAILY_WEBHOOK_UUID: {webhook_uuid}") - print("=" * 60) - print() - - # Try to write UUID to .env file - env_file = Path(__file__).parent.parent / ".env" - if env_file.exists(): - lines = env_file.read_text().splitlines() - updated = False - - # Update existing DAILY_WEBHOOK_UUID line or add it - for i, line in enumerate(lines): - if line.startswith("DAILY_WEBHOOK_UUID="): - lines[i] = f"DAILY_WEBHOOK_UUID={webhook_uuid}" - updated = True - break - - if not updated: - lines.append(f"DAILY_WEBHOOK_UUID={webhook_uuid}") - - env_file.write_text("\n".join(lines) + "\n") - print(f"✓ Also saved to local .env file") - else: - print(f"⚠ Local .env file not found - please add manually") - - return 0 + return 0 if __name__ == "__main__": @@ -117,11 +74,7 @@ if __name__ == "__main__": "Example: python recreate_daily_webhook.py https://example.com/v1/daily/webhook" ) print() - print("Behavior:") - print(" - If DAILY_WEBHOOK_UUID set: Deletes old webhook, creates new one") - print( - " - If DAILY_WEBHOOK_UUID empty: Creates new webhook, saves UUID to .env" - ) + print("Deletes all existing webhooks, then creates a new one.") sys.exit(1) sys.exit(asyncio.run(setup_webhook(sys.argv[1]))) diff --git a/www/package.json b/www/package.json index ceefbf55..4b66386e 100644 --- a/www/package.json +++ b/www/package.json @@ -3,7 +3,7 @@ "version": "0.1.0", "private": true, "scripts": { - "dev": "next dev", + "dev": "next dev --turbopack", "build": "next build", "build-production": "next build --experimental-build-mode compile", "start": "next start",