feat: enable daily co in selfhosted + only schedule tasks when necessary (#883)

* feat: enable daily co in selfhosted + only schedule tasks when necessary

* feat: refactor aws storage to be platform agnostic + add local pad tracking with selfhosted support
This commit is contained in:
Juan Diego García
2026-03-02 11:08:20 -05:00
committed by GitHub
parent f6cc03286b
commit 045eae8ff2
23 changed files with 1442 additions and 165 deletions

View File

@@ -90,7 +90,6 @@ from reflector.processors.summary.summary_builder import SummaryBuilder
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
from reflector.storage.storage_aws import AwsStorage
from reflector.utils.audio_constants import (
PRESIGNED_URL_EXPIRATION_SECONDS,
WAVEFORM_SEGMENTS,
@@ -117,6 +116,7 @@ class PipelineInput(BaseModel):
bucket_name: NonEmptyString
transcript_id: NonEmptyString
room_id: NonEmptyString | None = None
source_platform: str = "daily"
hatchet = HatchetClientManager.get_client()
@@ -170,15 +170,10 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
def _spawn_storage():
"""Create fresh storage instance."""
# TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
return AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
)
"""Create fresh storage instance for writing to our transcript bucket."""
from reflector.storage import get_transcripts_storage # noqa: PLC0415
return get_transcripts_storage()
class Loggable(Protocol):
@@ -434,6 +429,7 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
bucket_name=input.bucket_name,
transcript_id=input.transcript_id,
language=source_language,
source_platform=input.source_platform,
)
)
for i, track in enumerate(input.tracks)
@@ -1195,7 +1191,10 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import get_transcripts_storage # noqa: PLC0415
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
@@ -1245,7 +1244,7 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
deletion_errors = []
if input_track_keys and input.bucket_name:
master_storage = get_transcripts_storage()
master_storage = get_source_storage(input.source_platform)
for key in input_track_keys:
try:
await master_storage.delete_file(key, bucket=input.bucket_name)

View File

@@ -24,6 +24,7 @@ class PaddingInput(BaseModel):
s3_key: str
bucket_name: str
transcript_id: str
source_platform: str = "daily"
hatchet = HatchetClientManager.get_client()
@@ -45,20 +46,14 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
# TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
source_url = await storage.get_file_url(
# Source reads: use platform-specific credentials
source_storage = get_source_storage(input.source_platform)
source_url = await source_storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
@@ -96,52 +91,28 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal will upload directly)
output_url = await storage.get_file_url(
# Output writes: use transcript storage (our own bucket)
output_storage = get_transcripts_storage()
output_url = await output_storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
import httpx # noqa: PLC0415
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
AudioPaddingAutoProcessor,
)
try:
processor = AudioPaddingModalProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
processor = AudioPaddingAutoProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
start_time_seconds=start_time_seconds,
track_index=input.track_index,
)
file_size = result.size
ctx.log(f"pad_track: Modal returned size={file_size}")
except httpx.HTTPStatusError as e:
error_detail = e.response.text if hasattr(e.response, "text") else str(e)
logger.error(
"[Hatchet] Modal padding HTTP error",
transcript_id=input.transcript_id,
track_index=input.track_index,
status_code=e.response.status_code if hasattr(e, "response") else None,
error=error_detail,
exc_info=True,
)
raise Exception(
f"Modal padding failed: HTTP {e.response.status_code}"
) from e
except httpx.TimeoutException as e:
logger.error(
"[Hatchet] Modal padding timeout",
transcript_id=input.transcript_id,
track_index=input.track_index,
error=str(e),
exc_info=True,
)
raise Exception("Modal padding timeout") from e
ctx.log(f"pad_track: padding returned size={file_size}")
logger.info(
"[Hatchet] pad_track complete",

View File

@@ -36,6 +36,7 @@ class TrackInput(BaseModel):
bucket_name: str
transcript_id: str
language: str = "en"
source_platform: str = "daily"
hatchet = HatchetClientManager.get_client()
@@ -59,20 +60,14 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
try:
# Create fresh storage instance to avoid aioboto3 fork issues
# TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
source_url = await storage.get_file_url(
# Source reads: use platform-specific credentials
source_storage = get_source_storage(input.source_platform)
source_url = await source_storage.get_file_url(
input.s3_key,
operation="get_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
@@ -99,18 +94,19 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
storage_path = f"file_pipeline_hatchet/{input.transcript_id}/tracks/padded_{input.track_index}.webm"
# Presign PUT URL for output (Modal uploads directly)
output_url = await storage.get_file_url(
# Output writes: use transcript storage (our own bucket)
output_storage = get_transcripts_storage()
output_url = await output_storage.get_file_url(
storage_path,
operation="put_object",
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
from reflector.processors.audio_padding_modal import ( # noqa: PLC0415
AudioPaddingModalProcessor,
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
AudioPaddingAutoProcessor,
)
processor = AudioPaddingModalProcessor()
processor = AudioPaddingAutoProcessor()
result = await processor.pad_track(
track_url=source_url,
output_url=output_url,
@@ -161,18 +157,18 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
# TODO: replace direct AwsStorage construction with get_transcripts_storage() factory
from reflector.settings import settings # noqa: PLC0415
from reflector.storage.storage_aws import AwsStorage # noqa: PLC0415
storage = AwsStorage(
aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
aws_access_key_id=settings.TRANSCRIPT_STORAGE_AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.TRANSCRIPT_STORAGE_AWS_SECRET_ACCESS_KEY,
aws_endpoint_url=settings.TRANSCRIPT_STORAGE_AWS_ENDPOINT_URL,
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
# If bucket_name is set, file is still in the platform's source bucket (no padding applied).
# If bucket_name is None, padded file was written to our transcript storage.
if bucket_name:
storage = get_source_storage(input.source_platform)
else:
storage = get_transcripts_storage()
audio_url = await storage.get_file_url(
padded_key,
operation="get_object",

View File

@@ -0,0 +1,31 @@
import importlib
from reflector.settings import settings
class AudioPaddingAutoProcessor:
_registry = {}
@classmethod
def register(cls, name, kclass):
cls._registry[name] = kclass
def __new__(cls, name: str | None = None, **kwargs):
if name is None:
name = settings.PADDING_BACKEND
if name not in cls._registry:
module_name = f"reflector.processors.audio_padding_{name}"
importlib.import_module(module_name)
# gather specific configuration for the processor
# search `PADDING_XXX_YYY`, push to constructor as `xxx_yyy`
config = {}
name_upper = name.upper()
settings_prefix = "PADDING_"
config_prefix = f"{settings_prefix}{name_upper}_"
for key, value in settings:
if key.startswith(config_prefix):
config_name = key[len(settings_prefix) :].lower()
config[config_name] = value
return cls._registry[name](**config | kwargs)

View File

@@ -0,0 +1,133 @@
"""
Local audio padding processor using PyAV.
Pads audio tracks with silence directly in-process (no HTTP).
Reuses the shared PyAV utilities from reflector.utils.audio_padding.
"""
import asyncio
import os
import tempfile
import av
from reflector.logger import logger
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
from reflector.processors.audio_padding_modal import PaddingResponse
from reflector.utils.audio_padding import apply_audio_padding_to_file
# Timeout (seconds) for both the presigned GET download and the PUT upload.
S3_TIMEOUT = 60


class AudioPaddingLocalProcessor:
    """Audio padding processor using local PyAV (no HTTP backend).

    Downloads the source track from a presigned URL, prepends silence via
    the shared PyAV utility, and uploads the result to a presigned PUT URL.
    """

    async def pad_track(
        self,
        track_url: str,
        output_url: str,
        start_time_seconds: float,
        track_index: int,
    ) -> "PaddingResponse":
        """Pad audio track with silence locally via PyAV.

        Args:
            track_url: Presigned GET URL for source audio track
            output_url: Presigned PUT URL for output WebM
            start_time_seconds: Amount of silence to prepend
            track_index: Track index for logging

        Returns:
            PaddingResponse with the uploaded file size.

        Raises:
            ValueError: if track_url is empty or start_time_seconds is not positive.
        """
        if not track_url:
            raise ValueError("track_url cannot be empty")
        if start_time_seconds <= 0:
            raise ValueError(
                f"start_time_seconds must be positive, got {start_time_seconds}"
            )

        log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
        log.info("Starting local PyAV padding")

        # Run the blocking download/encode/upload in the default thread pool.
        # asyncio.to_thread replaces the deprecated get_event_loop() pattern.
        return await asyncio.to_thread(
            self._pad_track_blocking,
            track_url,
            output_url,
            start_time_seconds,
            track_index,
        )

    def _pad_track_blocking(
        self,
        track_url: str,
        output_url: str,
        start_time_seconds: float,
        track_index: int,
    ) -> "PaddingResponse":
        """Blocking padding work: download, pad with PyAV, upload."""
        import requests

        log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
        temp_dir = tempfile.mkdtemp()
        input_path = None
        output_path = None
        try:
            # Download source audio, streamed to disk to bound memory use
            log.info("Downloading track for local padding")
            response = requests.get(track_url, stream=True, timeout=S3_TIMEOUT)
            response.raise_for_status()

            input_path = os.path.join(temp_dir, "track.webm")
            total_bytes = 0
            with open(input_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
                        total_bytes += len(chunk)
            log.info("Track downloaded", bytes=total_bytes)

            # Apply padding using shared PyAV utility
            output_path = os.path.join(temp_dir, "padded.webm")
            with av.open(input_path) as in_container:
                apply_audio_padding_to_file(
                    in_container,
                    output_path,
                    start_time_seconds,
                    track_index,
                    logger=logger,
                )
            file_size = os.path.getsize(output_path)
            log.info("Local padding complete", size=file_size)

            # Upload padded track to the presigned PUT URL
            log.info("Uploading padded track to S3")
            with open(output_path, "rb") as f:
                upload_response = requests.put(output_url, data=f, timeout=S3_TIMEOUT)
                upload_response.raise_for_status()
            log.info("Upload complete", size=file_size)

            return PaddingResponse(size=file_size)
        except Exception as e:
            log.error("Local padding failed", error=str(e), exc_info=True)
            raise
        finally:
            # Best-effort cleanup: failures are logged, never raised.
            if input_path and os.path.exists(input_path):
                try:
                    os.unlink(input_path)
                except Exception as e:
                    log.warning("Failed to cleanup input file", error=str(e))
            if output_path and os.path.exists(output_path):
                try:
                    os.unlink(output_path)
                except Exception as e:
                    log.warning("Failed to cleanup output file", error=str(e))
            try:
                os.rmdir(temp_dir)
            except Exception as e:
                log.warning("Failed to cleanup temp directory", error=str(e))
# Self-register so AudioPaddingAutoProcessor(name="local") resolves to this class.
AudioPaddingAutoProcessor.register("local", AudioPaddingLocalProcessor)

View File

@@ -10,6 +10,7 @@ from pydantic import BaseModel
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.logger import logger
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
class PaddingResponse(BaseModel):
@@ -111,3 +112,6 @@ class AudioPaddingModalProcessor:
except Exception as e:
log.error("Modal padding unexpected error", error=str(e), exc_info=True)
raise
# Self-register so AudioPaddingAutoProcessor(name="modal") resolves to this class.
AudioPaddingAutoProcessor.register("modal", AudioPaddingModalProcessor)

View File

@@ -40,6 +40,7 @@ class MultitrackProcessingConfig:
track_keys: list[str]
recording_id: NonEmptyString | None = None
room_id: NonEmptyString | None = None
source_platform: str = "daily"
mode: Literal["multitrack"] = "multitrack"
@@ -256,6 +257,7 @@ async def dispatch_transcript_processing(
"bucket_name": config.bucket_name,
"transcript_id": config.transcript_id,
"room_id": config.room_id,
"source_platform": config.source_platform,
},
additional_metadata={
"transcript_id": config.transcript_id,

View File

@@ -73,6 +73,9 @@ class Settings(BaseSettings):
DAILYCO_STORAGE_AWS_BUCKET_NAME: str | None = None
DAILYCO_STORAGE_AWS_REGION: str | None = None
DAILYCO_STORAGE_AWS_ROLE_ARN: str | None = None
# Worker credentials for reading/deleting from Daily's recording bucket
DAILYCO_STORAGE_AWS_ACCESS_KEY_ID: str | None = None
DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY: str | None = None
# Translate into the target language
TRANSLATION_BACKEND: str = "passthrough"
@@ -106,7 +109,11 @@ class Settings(BaseSettings):
# Diarization: modal backend
DIARIZATION_MODAL_API_KEY: str | None = None
# Audio Padding (Modal.com backend)
# Audio Padding
# backends:
# - local: in-process PyAV padding (no HTTP, runs in same process)
# - modal: HTTP API client (works with Modal.com OR self-hosted gpu/self_hosted/)
PADDING_BACKEND: str = "local"
PADDING_URL: str | None = None
PADDING_MODAL_API_KEY: str | None = None

View File

@@ -17,6 +17,49 @@ def get_transcripts_storage() -> Storage:
)
def get_source_storage(platform: str) -> Storage:
    """Get storage for reading/deleting source recording files from the platform's bucket.

    Returns an AwsStorage configured with the platform's worker credentials
    (access keys), or falls back to get_transcripts_storage() when platform-specific
    credentials aren't configured (e.g., single-bucket setups).

    Args:
        platform: Recording platform name ("daily", "whereby", or other).
    """
    # Resolve the (bucket, region, access key, secret key) tuple for the platform.
    if platform == "daily":
        creds = (
            settings.DAILYCO_STORAGE_AWS_BUCKET_NAME,
            settings.DAILYCO_STORAGE_AWS_REGION,
            settings.DAILYCO_STORAGE_AWS_ACCESS_KEY_ID,
            settings.DAILYCO_STORAGE_AWS_SECRET_ACCESS_KEY,
        )
    elif platform == "whereby":
        creds = (
            settings.WHEREBY_STORAGE_AWS_BUCKET_NAME,
            settings.WHEREBY_STORAGE_AWS_REGION,
            settings.WHEREBY_STORAGE_AWS_ACCESS_KEY_ID,
            settings.WHEREBY_STORAGE_AWS_SECRET_ACCESS_KEY,
        )
    else:
        creds = None

    if creds is not None:
        bucket, region, access_key_id, secret_access_key = creds
        if bucket and access_key_id and secret_access_key:
            from reflector.storage.storage_aws import AwsStorage

            return AwsStorage(
                aws_bucket_name=bucket,
                aws_region=region or "us-east-1",
                aws_access_key_id=access_key_id,
                aws_secret_access_key=secret_access_key,
            )

    # Unknown platform, or dedicated credentials missing: single-bucket fallback.
    return get_transcripts_storage()
def get_whereby_storage() -> Storage:
"""
Get storage config for Whereby (for passing to Whereby API).

View File

@@ -24,6 +24,118 @@ RECONCILIATION_INTERVAL = _override or 30.0
ICS_SYNC_INTERVAL = _override or 60.0
UPCOMING_MEETINGS_INTERVAL = _override or 30.0
def build_beat_schedule(
    *,
    whereby_api_key=None,
    aws_process_recording_queue_url=None,
    daily_api_key=None,
    public_mode=False,
    public_data_retention_days=None,
    healthcheck_url=None,
):
    """Build the Celery beat schedule based on configured services.

    Only registers tasks for services that are actually configured,
    avoiding unnecessary worker wake-ups in selfhosted deployments.
    """
    schedule = {}

    # Whereby tasks run when either the API key or the SQS queue is configured.
    whereby_enabled = bool(whereby_api_key) or bool(aws_process_recording_queue_url)
    if whereby_enabled:
        schedule.update(
            {
                "process_messages": {
                    "task": "reflector.worker.process.process_messages",
                    "schedule": SQS_POLL_INTERVAL,
                },
                "reprocess_failed_recordings": {
                    "task": "reflector.worker.process.reprocess_failed_recordings",
                    "schedule": crontab(hour=5, minute=0),  # Midnight EST
                },
            }
        )
        logger.info(
            "Whereby beat tasks enabled",
            tasks=["process_messages", "reprocess_failed_recordings"],
        )
    else:
        logger.info("Whereby beat tasks disabled (no WHEREBY_API_KEY or SQS URL)")

    daily_enabled = bool(daily_api_key)
    if daily_enabled:
        schedule.update(
            {
                "poll_daily_recordings": {
                    "task": "reflector.worker.process.poll_daily_recordings",
                    "schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
                },
                "trigger_daily_reconciliation": {
                    "task": "reflector.worker.process.trigger_daily_reconciliation",
                    "schedule": RECONCILIATION_INTERVAL,
                },
                "reprocess_failed_daily_recordings": {
                    "task": "reflector.worker.process.reprocess_failed_daily_recordings",
                    "schedule": crontab(hour=5, minute=0),  # Midnight EST
                },
            }
        )
        logger.info(
            "Daily.co beat tasks enabled",
            tasks=[
                "poll_daily_recordings",
                "trigger_daily_reconciliation",
                "reprocess_failed_daily_recordings",
            ],
        )
    else:
        logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)")

    # Shared platform tasks: needed as soon as any video platform is active.
    if whereby_enabled or daily_enabled:
        schedule.update(
            {
                "process_meetings": {
                    "task": "reflector.worker.process.process_meetings",
                    "schedule": SQS_POLL_INTERVAL,
                },
                "sync_all_ics_calendars": {
                    "task": "reflector.worker.ics_sync.sync_all_ics_calendars",
                    "schedule": ICS_SYNC_INTERVAL,
                },
                "create_upcoming_meetings": {
                    "task": "reflector.worker.ics_sync.create_upcoming_meetings",
                    "schedule": UPCOMING_MEETINGS_INTERVAL,
                },
            }
        )
        logger.info(
            "Platform tasks enabled",
            tasks=[
                "process_meetings",
                "sync_all_ics_calendars",
                "create_upcoming_meetings",
            ],
        )
    else:
        logger.info("Platform tasks disabled (no video platform configured)")

    if public_mode:
        schedule["cleanup_old_public_data"] = {
            "task": "reflector.worker.cleanup.cleanup_old_public_data_task",
            "schedule": crontab(hour=3, minute=0),
        }
        logger.info(
            "Public mode cleanup enabled",
            retention_days=public_data_retention_days,
        )

    if healthcheck_url:
        schedule["healthcheck_ping"] = {
            "task": "reflector.worker.healthcheck.healthcheck_ping",
            "schedule": 60.0 * 10,
        }
        logger.info("Healthcheck enabled", url=healthcheck_url)
    else:
        logger.warning("Healthcheck disabled, no url configured")

    logger.info(
        "Beat schedule configured",
        total_tasks=len(schedule),
        task_names=sorted(schedule.keys()),
    )
    return schedule
if celery.current_app.main != "default":
logger.info(f"Celery already configured ({celery.current_app})")
app = celery.current_app
@@ -42,57 +154,11 @@ else:
]
)
# crontab
app.conf.beat_schedule = {
"process_messages": {
"task": "reflector.worker.process.process_messages",
"schedule": SQS_POLL_INTERVAL,
},
"process_meetings": {
"task": "reflector.worker.process.process_meetings",
"schedule": SQS_POLL_INTERVAL,
},
"reprocess_failed_recordings": {
"task": "reflector.worker.process.reprocess_failed_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
},
"reprocess_failed_daily_recordings": {
"task": "reflector.worker.process.reprocess_failed_daily_recordings",
"schedule": crontab(hour=5, minute=0), # Midnight EST
},
"poll_daily_recordings": {
"task": "reflector.worker.process.poll_daily_recordings",
"schedule": POLL_DAILY_RECORDINGS_INTERVAL_SEC,
},
"trigger_daily_reconciliation": {
"task": "reflector.worker.process.trigger_daily_reconciliation",
"schedule": RECONCILIATION_INTERVAL,
},
"sync_all_ics_calendars": {
"task": "reflector.worker.ics_sync.sync_all_ics_calendars",
"schedule": ICS_SYNC_INTERVAL,
},
"create_upcoming_meetings": {
"task": "reflector.worker.ics_sync.create_upcoming_meetings",
"schedule": UPCOMING_MEETINGS_INTERVAL,
},
}
if settings.PUBLIC_MODE:
app.conf.beat_schedule["cleanup_old_public_data"] = {
"task": "reflector.worker.cleanup.cleanup_old_public_data_task",
"schedule": crontab(hour=3, minute=0),
}
logger.info(
"Public mode cleanup enabled",
retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
)
if settings.HEALTHCHECK_URL:
app.conf.beat_schedule["healthcheck_ping"] = {
"task": "reflector.worker.healthcheck.healthcheck_ping",
"schedule": 60.0 * 10,
}
logger.info("Healthcheck enabled", url=settings.HEALTHCHECK_URL)
else:
logger.warning("Healthcheck disabled, no url configured")
app.conf.beat_schedule = build_beat_schedule(
whereby_api_key=settings.WHEREBY_API_KEY,
aws_process_recording_queue_url=settings.AWS_PROCESS_RECORDING_QUEUE_URL,
daily_api_key=settings.DAILY_API_KEY,
public_mode=settings.PUBLIC_MODE,
public_data_retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
healthcheck_url=settings.HEALTHCHECK_URL,
)

View File

@@ -357,6 +357,7 @@ async def _process_multitrack_recording_inner(
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id,
"source_platform": "daily",
},
additional_metadata={
"transcript_id": transcript.id,
@@ -1068,6 +1069,7 @@ async def reprocess_failed_daily_recordings():
"bucket_name": bucket_name,
"transcript_id": transcript.id,
"room_id": room.id if room else None,
"source_platform": "daily",
},
additional_metadata={
"transcript_id": transcript.id,