From 2516d4085f333d6a0b04aca99122269078cd2357 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Wed, 16 Jul 2025 21:09:51 -0600 Subject: [PATCH] fix: postgres database not connecting in worker (#492) stacks-reflector-worker-1 | [2025-07-17 02:18:21,234: ERROR/ForkPoolWorker-2] Task reflector.worker.process.process_meetings[8e763caf-be8a-4272-8793-7b918e4e3922] raised unexpected: AssertionError('DatabaseBackend is not running') stacks-reflector-worker-1 | Traceback (most recent call last): stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 453, in trace_task stacks-reflector-worker-1 | R = retval = fun(*args, **kwargs) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 736, in __protected_call__ stacks-reflector-worker-1 | return self.run(*args, **kwargs) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/reflector/pipelines/main_live_pipeline.py", line 81, in wrapper stacks-reflector-worker-1 | return asyncio.run(coro) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 195, in run stacks-reflector-worker-1 | return runner.run(main) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run stacks-reflector-worker-1 | return self._loop.run_until_complete(task) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/usr/local/lib/python3.12/asyncio/base_events.py", line 691, in run_until_complete stacks-reflector-worker-1 | return future.result() stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/reflector/worker/process.py", line 139, in process_meetings stacks-reflector-worker-1 | meetings = await meetings_controller.get_all_active() stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/reflector/db/meetings.py", line 121, in get_all_active stacks-reflector-worker-1 | return await database.fetch_all(query) stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/databases/core.py", line 173, in fetch_all stacks-reflector-worker-1 | async with self.connection() as connection: stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/databases/core.py", line 267, in __aenter__ stacks-reflector-worker-1 | raise e stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/databases/core.py", line 264, in __aenter__ stacks-reflector-worker-1 | await self._connection.acquire() stacks-reflector-worker-1 | File "/app/.venv/lib/python3.12/site-packages/databases/backends/postgres.py", line 169, in acquire stacks-reflector-worker-1 | assert self._database._pool is not None, "DatabaseBackend is not running" stacks-reflector-worker-1 | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ stacks-reflector-worker-1 | AssertionError: DatabaseBackend is not running --- .../reflector/pipelines/main_live_pipeline.py | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index aca28586..3a4c36be 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -15,9 +15,10 @@ import asyncio import functools from contextlib import asynccontextmanager +import boto3 from celery import chord, group, shared_task from pydantic import BaseModel -from reflector.db.meetings import meetings_controller +from reflector.db.meetings import meeting_consent_controller, meetings_controller from reflector.db.recordings import recordings_controller from reflector.db.rooms import rooms_controller from reflector.db.transcripts import ( @@ -53,25 +54,29 @@ from reflector.processors.types import ( ) from reflector.processors.types import Transcript as TranscriptProcessorType from reflector.settings import settings +from reflector.storage import get_transcripts_storage from reflector.ws_manager import WebsocketManager, get_ws_manager from reflector.zulip import ( get_zulip_message, send_message_to_zulip, update_zulip_message, ) - -from reflector.db.meetings import meeting_consent_controller -from reflector.storage import get_transcripts_storage - -import boto3 - from structlog import BoundLogger as Logger def asynctask(f): @functools.wraps(f) def wrapper(*args, **kwargs): - coro = f(*args, **kwargs) + async def run_with_db(): + from reflector.db import database + + await database.connect() + try: + return await f(*args, **kwargs) + finally: + await database.disconnect() + + coro = run_with_db() try: loop = asyncio.get_running_loop() except RuntimeError: @@ -595,7 +600,6 @@ async def cleanup_consent(transcript: Transcript, logger: Logger): logger.info("Consent denied, cleaning up all related audio files") if recording and recording.bucket_name and recording.object_key: - s3_whereby = boto3.client( "s3", aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID, @@ -615,7 +619,6 @@ async def cleanup_consent(transcript: Transcript, logger: Logger): await transcripts_controller.update(transcript, {"audio_deleted": True}) # 2. Delete processed audio from transcript storage S3 bucket if transcript.audio_location == "storage": - storage = get_transcripts_storage() try: await storage.delete_file(transcript.storage_audio_path)