fix: local env setup (#855)

* Ensure rate limit

* Increase nextjs compilation speed

* Fix daily no content handling

* Simplify daily webhook creation

* Fix webhook request validation
This commit is contained in:
Sergey Mankovsky
2026-02-11 16:59:21 +01:00
committed by GitHub
parent 39573626e9
commit ec4f356b4c
9 changed files with 123 additions and 85 deletions

View File

@@ -79,6 +79,7 @@ services:
volumes:
- ./www:/app/
- /app/node_modules
- next_cache:/app/.next
env_file:
- ./www/.env.local
environment:
@@ -132,6 +133,9 @@ services:
retries: 5
start_period: 30s
volumes:
next_cache:
networks:
default:
attachable: true

View File

@@ -146,6 +146,8 @@ class DailyApiClient:
)
raise DailyApiError(operation, response)
if not response.content:
return {}
return response.json()
# ============================================================================

View File

@@ -99,7 +99,7 @@ def extract_room_name(event: DailyWebhookEvent) -> str | None:
>>> event = DailyWebhookEvent(**webhook_payload)
>>> room_name = extract_room_name(event)
"""
room = event.payload.get("room_name")
room = event.payload.get("room_name") or event.payload.get("room")
# Ensure we return a string, not any falsy value that might be in payload
return room if isinstance(room, str) else None

View File

@@ -6,7 +6,7 @@ Reference: https://docs.daily.co/reference/rest-api/webhooks
from typing import Annotated, Any, Dict, Literal, Union
from pydantic import BaseModel, Field, field_validator
from pydantic import AliasChoices, BaseModel, ConfigDict, Field, field_validator
from reflector.utils.string import NonEmptyString
@@ -41,6 +41,8 @@ class DailyTrack(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/recordings
"""
model_config = ConfigDict(extra="ignore")
type: Literal["audio", "video"]
s3Key: NonEmptyString = Field(description="S3 object key for the track file")
size: int = Field(description="File size in bytes")
@@ -54,6 +56,8 @@ class DailyWebhookEvent(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks
"""
model_config = ConfigDict(extra="ignore")
version: NonEmptyString = Field(
description="Represents the version of the event. This uses semantic versioning to inform a consumer if the payload has introduced any breaking changes"
)
@@ -82,7 +86,13 @@ class ParticipantJoinedPayload(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-joined
"""
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
model_config = ConfigDict(extra="ignore")
room_name: NonEmptyString | None = Field(
None,
description="Daily.co room name",
validation_alias=AliasChoices("room_name", "room"),
)
session_id: NonEmptyString = Field(description="Daily.co session identifier")
user_id: NonEmptyString = Field(description="User identifier (may be encoded)")
user_name: NonEmptyString | None = Field(None, description="User display name")
@@ -100,7 +110,13 @@ class ParticipantLeftPayload(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/participant-left
"""
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
model_config = ConfigDict(extra="ignore")
room_name: NonEmptyString | None = Field(
None,
description="Daily.co room name",
validation_alias=AliasChoices("room_name", "room"),
)
session_id: NonEmptyString = Field(description="Daily.co session identifier")
user_id: NonEmptyString = Field(description="User identifier (may be encoded)")
user_name: NonEmptyString | None = Field(None, description="User display name")
@@ -112,6 +128,9 @@ class ParticipantLeftPayload(BaseModel):
_normalize_joined_at = field_validator("joined_at", mode="before")(
normalize_timestamp_to_int
)
_normalize_duration = field_validator("duration", mode="before")(
normalize_timestamp_to_int
)
class RecordingStartedPayload(BaseModel):
@@ -121,6 +140,8 @@ class RecordingStartedPayload(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-started
"""
model_config = ConfigDict(extra="ignore")
room_name: NonEmptyString | None = Field(None, description="Daily.co room name")
recording_id: NonEmptyString = Field(description="Recording identifier")
start_ts: int | None = Field(None, description="Recording start timestamp")
@@ -138,7 +159,9 @@ class RecordingReadyToDownloadPayload(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-ready-to-download
"""
type: Literal["cloud", "raw-tracks"] = Field(
model_config = ConfigDict(extra="ignore")
type: Literal["cloud", "cloud-audio-only", "raw-tracks"] = Field(
description="The type of recording that was generated"
)
recording_id: NonEmptyString = Field(
@@ -153,8 +176,9 @@ class RecordingReadyToDownloadPayload(BaseModel):
status: Literal["finished"] = Field(
description="The status of the given recording (always 'finished' in ready-to-download webhook, see RecordingStatus in responses.py for full API statuses)"
)
max_participants: int = Field(
description="The number of participants on the call that were recorded"
max_participants: int | None = Field(
None,
description="The number of participants on the call that were recorded (optional; Daily may omit it in some webhook versions)",
)
duration: int = Field(description="The duration in seconds of the call")
s3_key: NonEmptyString = Field(
@@ -180,6 +204,8 @@ class RecordingErrorPayload(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/webhooks/events/recording-error
"""
model_config = ConfigDict(extra="ignore")
action: Literal["clourd-recording-err", "cloud-recording-error"] = Field(
description="A string describing the event that was emitted (both variants are documented)"
)
@@ -200,6 +226,8 @@ class RecordingErrorPayload(BaseModel):
class ParticipantJoinedEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
version: NonEmptyString
type: Literal["participant.joined"]
id: NonEmptyString
@@ -212,6 +240,8 @@ class ParticipantJoinedEvent(BaseModel):
class ParticipantLeftEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
version: NonEmptyString
type: Literal["participant.left"]
id: NonEmptyString
@@ -224,6 +254,8 @@ class ParticipantLeftEvent(BaseModel):
class RecordingStartedEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
version: NonEmptyString
type: Literal["recording.started"]
id: NonEmptyString
@@ -236,6 +268,8 @@ class RecordingStartedEvent(BaseModel):
class RecordingReadyEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
version: NonEmptyString
type: Literal["recording.ready-to-download"]
id: NonEmptyString
@@ -248,6 +282,8 @@ class RecordingReadyEvent(BaseModel):
class RecordingErrorEvent(BaseModel):
model_config = ConfigDict(extra="ignore")
version: NonEmptyString
type: Literal["recording.error"]
id: NonEmptyString

View File

@@ -12,7 +12,9 @@ import threading
from hatchet_sdk import ClientConfig, Hatchet
from hatchet_sdk.clients.rest.models import V1TaskStatus
from hatchet_sdk.rate_limit import RateLimitDuration
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, LLM_RATE_LIMIT_PER_SECOND
from reflector.logger import logger
from reflector.settings import settings
@@ -113,3 +115,26 @@ class HatchetClientManager:
"""Reset the client instance (for testing)."""
with cls._lock:
cls._instance = None
@classmethod
async def ensure_rate_limit(cls) -> None:
"""Ensure the LLM rate limit exists in Hatchet.
Uses the Hatchet SDK rate_limits client (aio_put). See:
https://docs.hatchet.run/sdks/python/feature-clients/rate_limits
"""
logger.info(
"[Hatchet] Ensuring rate limit exists",
rate_limit_key=LLM_RATE_LIMIT_KEY,
limit=LLM_RATE_LIMIT_PER_SECOND,
)
client = cls.get_client()
await client.rate_limits.aio_put(
key=LLM_RATE_LIMIT_KEY,
limit=LLM_RATE_LIMIT_PER_SECOND,
duration=RateLimitDuration.SECOND,
)
logger.info(
"[Hatchet] Rate limit put successfully",
rate_limit_key=LLM_RATE_LIMIT_KEY,
)

View File

@@ -3,6 +3,8 @@ LLM/I/O worker pool for all non-CPU tasks.
Handles: all tasks except mixdown_tracks (transcription, LLM inference, orchestration)
"""
import asyncio
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
daily_multitrack_pipeline,
@@ -20,6 +22,15 @@ POOL = "llm-io"
def main():
hatchet = HatchetClientManager.get_client()
try:
asyncio.run(HatchetClientManager.ensure_rate_limit())
except Exception as e:
logger.warning(
"[Hatchet] Rate limit initialization failed, but continuing. "
"If workflows fail to register, rate limits may need to be created manually.",
error=str(e),
)
logger.info(
"Starting Hatchet LLM worker pool (all tasks except mixdown)",
worker_name=WORKER_NAME,

View File

@@ -80,7 +80,14 @@ async def webhook(request: Request):
try:
event = event_adapter.validate_python(body_json)
except Exception as e:
logger.error("Failed to parse webhook event", error=str(e), body=body.decode())
err_detail = str(e)
if hasattr(e, "errors"):
err_detail = f"{err_detail}; errors={e.errors()!r}"
logger.error(
"Failed to parse webhook event",
error=err_detail,
body=body.decode(),
)
raise HTTPException(status_code=422, detail="Invalid event format")
match event:

View File

@@ -15,8 +15,7 @@ from reflector.settings import settings
async def setup_webhook(webhook_url: str):
"""
Create or update Daily.co webhook for this environment using dailyco_api module.
Uses DAILY_WEBHOOK_UUID to identify existing webhook.
Create Daily.co webhook. Deletes any existing webhooks first, then creates the new one.
"""
if not settings.DAILY_API_KEY:
print("Error: DAILY_API_KEY not set")
@@ -35,79 +34,37 @@ async def setup_webhook(webhook_url: str):
]
async with DailyApiClient(api_key=settings.DAILY_API_KEY) as client:
webhook_uuid = settings.DAILY_WEBHOOK_UUID
webhooks = await client.list_webhooks()
for wh in webhooks:
await client.delete_webhook(wh.uuid)
print(f"Deleted webhook {wh.uuid}")
if webhook_uuid:
print(f"Updating existing webhook {webhook_uuid}...")
try:
# Note: Daily.co doesn't support PATCH well, so we delete + recreate
await client.delete_webhook(webhook_uuid)
print(f"Deleted old webhook {webhook_uuid}")
request = CreateWebhookRequest(
url=webhook_url,
eventTypes=event_types,
hmac=settings.DAILY_WEBHOOK_SECRET,
)
result = await client.create_webhook(request)
webhook_uuid = result.uuid
request = CreateWebhookRequest(
url=webhook_url,
eventTypes=event_types,
hmac=settings.DAILY_WEBHOOK_SECRET,
)
result = await client.create_webhook(request)
print(f"Created webhook {webhook_uuid} (state: {result.state})")
print(f" URL: {result.url}")
print(
f"✓ Created replacement webhook {result.uuid} (state: {result.state})"
)
print(f" URL: {result.url}")
env_file = Path(__file__).parent.parent / ".env"
if env_file.exists():
lines = env_file.read_text().splitlines()
updated = False
for i, line in enumerate(lines):
if line.startswith("DAILY_WEBHOOK_UUID="):
lines[i] = f"DAILY_WEBHOOK_UUID={webhook_uuid}"
updated = True
break
if not updated:
lines.append(f"DAILY_WEBHOOK_UUID={webhook_uuid}")
env_file.write_text("\n".join(lines) + "\n")
print("✓ Saved DAILY_WEBHOOK_UUID to .env")
webhook_uuid = result.uuid
except Exception as e:
if hasattr(e, "response") and e.response.status_code == 404:
print(f"Webhook {webhook_uuid} not found, creating new one...")
webhook_uuid = None # Fall through to creation
else:
print(f"Error updating webhook: {e}")
return 1
if not webhook_uuid:
print("Creating new webhook...")
request = CreateWebhookRequest(
url=webhook_url,
eventTypes=event_types,
hmac=settings.DAILY_WEBHOOK_SECRET,
)
result = await client.create_webhook(request)
webhook_uuid = result.uuid
print(f"✓ Created webhook {webhook_uuid} (state: {result.state})")
print(f" URL: {result.url}")
print()
print("=" * 60)
print("IMPORTANT: Add this to your environment variables:")
print("=" * 60)
print(f"DAILY_WEBHOOK_UUID: {webhook_uuid}")
print("=" * 60)
print()
# Try to write UUID to .env file
env_file = Path(__file__).parent.parent / ".env"
if env_file.exists():
lines = env_file.read_text().splitlines()
updated = False
# Update existing DAILY_WEBHOOK_UUID line or add it
for i, line in enumerate(lines):
if line.startswith("DAILY_WEBHOOK_UUID="):
lines[i] = f"DAILY_WEBHOOK_UUID={webhook_uuid}"
updated = True
break
if not updated:
lines.append(f"DAILY_WEBHOOK_UUID={webhook_uuid}")
env_file.write_text("\n".join(lines) + "\n")
print(f"✓ Also saved to local .env file")
else:
print(f"⚠ Local .env file not found - please add manually")
return 0
return 0
if __name__ == "__main__":
@@ -117,11 +74,7 @@ if __name__ == "__main__":
"Example: python recreate_daily_webhook.py https://example.com/v1/daily/webhook"
)
print()
print("Behavior:")
print(" - If DAILY_WEBHOOK_UUID set: Deletes old webhook, creates new one")
print(
" - If DAILY_WEBHOOK_UUID empty: Creates new webhook, saves UUID to .env"
)
print("Deletes all existing webhooks, then creates a new one.")
sys.exit(1)
sys.exit(asyncio.run(setup_webhook(sys.argv[1])))

View File

@@ -3,7 +3,7 @@
"version": "0.1.0",
"private": true,
"scripts": {
"dev": "next dev",
"dev": "next dev --turbopack",
"build": "next build",
"build-production": "next build --experimental-build-mode compile",
"start": "next start",