Files
reflector/server/reflector/worker/app.py

168 lines
5.8 KiB
Python

import celery
import structlog
from celery import Celery
from celery.schedules import crontab
from reflector.settings import settings
logger = structlog.get_logger(__name__)
# Polling intervals (seconds)
# CELERY_BEAT_POLL_INTERVAL overrides all sub-5-min intervals (e.g. 300 for selfhosted)
_override = (
float(settings.CELERY_BEAT_POLL_INTERVAL)
if settings.CELERY_BEAT_POLL_INTERVAL > 0
else 0
)
# Webhook-aware: 180s when webhook configured (backup mode), 15s when no webhook (primary discovery)
POLL_DAILY_RECORDINGS_INTERVAL_SEC = _override or (
180.0 if settings.DAILY_WEBHOOK_SECRET else 15.0
)
SQS_POLL_INTERVAL = _override or float(settings.SQS_POLLING_TIMEOUT_SECONDS)
RECONCILIATION_INTERVAL = _override or 30.0
ICS_SYNC_INTERVAL = _override or 60.0
UPCOMING_MEETINGS_INTERVAL = _override or 30.0
def build_beat_schedule(
*,
whereby_api_key=None,
aws_process_recording_queue_url=None,
daily_api_key=None,
public_mode=False,
public_data_retention_days=None,
healthcheck_url=None,
):
"""Build the Celery beat schedule based on configured services.
Only registers tasks for services that are actually configured,
avoiding unnecessary worker wake-ups in selfhosted deployments.
"""
beat_schedule = {}
_whereby_enabled = bool(whereby_api_key) or bool(aws_process_recording_queue_url)
if _whereby_enabled:
beat_schedule["process_messages"] = {
"task": "reflector.worker.process.process_messages",
"schedule": SQS_POLL_INTERVAL,
}
beat_schedule["reprocess_failed_recordings"] = {
"task": "reflector.worker.process.reprocess_failed_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
}
logger.info(
"Whereby beat tasks enabled",
tasks=["process_messages", "reprocess_failed_recordings"],
)
else:
logger.info("Whereby beat tasks disabled (no WHEREBY_API_KEY or SQS URL)")
_daily_enabled = bool(daily_api_key)
if _daily_enabled:
beat_schedule["poll_daily_recordings"] = {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
}
beat_schedule["trigger_daily_reconciliation"] = {
"task": "reflector.worker.process.trigger_daily_reconciliation",
"schedule": RECONCILIATION_INTERVAL,
}
beat_schedule["reprocess_failed_daily_recordings"] = {
"task": "reflector.worker.process.reprocess_failed_daily_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
}
logger.info(
"Daily.co beat tasks enabled",
tasks=[
"poll_daily_recordings",
"trigger_daily_reconciliation",
"reprocess_failed_daily_recordings",
],
)
else:
logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)")
_livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL)
if _livekit_enabled:
logger.info("LiveKit platform detected")
_any_platform = _whereby_enabled or _daily_enabled or _livekit_enabled
if _any_platform:
beat_schedule["process_meetings"] = {
"task": "reflector.worker.process.process_meetings",
"schedule": SQS_POLL_INTERVAL,
}
beat_schedule["sync_all_ics_calendars"] = {
"task": "reflector.worker.ics_sync.sync_all_ics_calendars",
"schedule": ICS_SYNC_INTERVAL,
}
beat_schedule["create_upcoming_meetings"] = {
"task": "reflector.worker.ics_sync.create_upcoming_meetings",
"schedule": UPCOMING_MEETINGS_INTERVAL,
}
logger.info(
"Platform tasks enabled",
tasks=[
"process_meetings",
"sync_all_ics_calendars",
"create_upcoming_meetings",
],
)
else:
logger.info("Platform tasks disabled (no video platform configured)")
if public_mode:
beat_schedule["cleanup_old_public_data"] = {
"task": "reflector.worker.cleanup.cleanup_old_public_data_task",
"schedule": crontab(hour=3, minute=0),
}
logger.info(
"Public mode cleanup enabled",
retention_days=public_data_retention_days,
)
if healthcheck_url:
beat_schedule["healthcheck_ping"] = {
"task": "reflector.worker.healthcheck.healthcheck_ping",
"schedule": 60.0 * 10,
}
logger.info("Healthcheck enabled", url=healthcheck_url)
else:
logger.warning("Healthcheck disabled, no url configured")
logger.info(
"Beat schedule configured",
total_tasks=len(beat_schedule),
task_names=sorted(beat_schedule.keys()),
)
return beat_schedule
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
else:
app = Celery(__name__)
app.conf.broker_url = settings.CELERY_BROKER_URL
app.conf.result_backend = settings.CELERY_RESULT_BACKEND
app.conf.broker_connection_retry_on_startup = True
app.autodiscover_tasks(
[
"reflector.worker.healthcheck",
"reflector.worker.process",
"reflector.worker.cleanup",
"reflector.worker.ics_sync",
]
)
app.conf.beat_schedule = build_beat_schedule(
whereby_api_key=settings.WHEREBY_API_KEY,
aws_process_recording_queue_url=settings.AWS_PROCESS_RECORDING_QUEUE_URL,
daily_api_key=settings.DAILY_API_KEY,
public_mode=settings.PUBLIC_MODE,
public_data_retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
healthcheck_url=settings.HEALTHCHECK_URL,
)