fix: postgres database not connecting in worker (#492)

stacks-reflector-worker-1  | [2025-07-17 02:18:21,234:
ERROR/ForkPoolWorker-2] Task
reflector.worker.process.process_meetings[8e763caf-be8a-4272-8793-7b918e4e3922]
raised unexpected: AssertionError('DatabaseBackend is not running')
stacks-reflector-worker-1  | Traceback (most recent call last):
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 453,
in trace_task
stacks-reflector-worker-1  |     R = retval = fun(*args, **kwargs)
stacks-reflector-worker-1  |                  ^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/celery/app/trace.py", line 736,
in __protected_call__
stacks-reflector-worker-1  |     return self.run(*args, **kwargs)
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/app/reflector/pipelines/main_live_pipeline.py", line 81, in wrapper
stacks-reflector-worker-1  |     return asyncio.run(coro)
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/usr/local/lib/python3.12/asyncio/runners.py", line 195, in run
stacks-reflector-worker-1  |     return runner.run(main)
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/usr/local/lib/python3.12/asyncio/runners.py", line 118, in run
stacks-reflector-worker-1  |     return
self._loop.run_until_complete(task)
stacks-reflector-worker-1  |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/usr/local/lib/python3.12/asyncio/base_events.py", line 691, in
run_until_complete
stacks-reflector-worker-1  |     return future.result()
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File "/app/reflector/worker/process.py",
line 139, in process_meetings
stacks-reflector-worker-1  |     meetings = await
meetings_controller.get_all_active()
stacks-reflector-worker-1  |
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File "/app/reflector/db/meetings.py",
line 121, in get_all_active
stacks-reflector-worker-1  |     return await database.fetch_all(query)
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/databases/core.py", line 173,
in fetch_all
stacks-reflector-worker-1  |     async with self.connection() as
connection:
stacks-reflector-worker-1  |                ^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/databases/core.py", line 267,
in __aenter__
stacks-reflector-worker-1  |     raise e
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/databases/core.py", line 264,
in __aenter__
stacks-reflector-worker-1  |     await self._connection.acquire()
stacks-reflector-worker-1  |   File
"/app/.venv/lib/python3.12/site-packages/databases/backends/postgres.py",
line 169, in acquire
stacks-reflector-worker-1  |     assert self._database._pool is not
None, "DatabaseBackend is not running"
stacks-reflector-worker-1  |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
stacks-reflector-worker-1  | AssertionError: DatabaseBackend is not
running
This commit is contained in:
2025-07-16 21:09:51 -06:00
committed by GitHub
parent 4d21fd1754
commit 2516d4085f

View File

@@ -15,9 +15,10 @@ import asyncio
import functools import functools
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
import boto3
from celery import chord, group, shared_task from celery import chord, group, shared_task
from pydantic import BaseModel from pydantic import BaseModel
from reflector.db.meetings import meetings_controller from reflector.db.meetings import meeting_consent_controller, meetings_controller
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import ( from reflector.db.transcripts import (
@@ -53,25 +54,29 @@ from reflector.processors.types import (
) )
from reflector.processors.types import Transcript as TranscriptProcessorType from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.ws_manager import WebsocketManager, get_ws_manager from reflector.ws_manager import WebsocketManager, get_ws_manager
from reflector.zulip import ( from reflector.zulip import (
get_zulip_message, get_zulip_message,
send_message_to_zulip, send_message_to_zulip,
update_zulip_message, update_zulip_message,
) )
from reflector.db.meetings import meeting_consent_controller
from reflector.storage import get_transcripts_storage
import boto3
from structlog import BoundLogger as Logger from structlog import BoundLogger as Logger
def asynctask(f): def asynctask(f):
@functools.wraps(f) @functools.wraps(f)
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
coro = f(*args, **kwargs) async def run_with_db():
from reflector.db import database
await database.connect()
try:
return await f(*args, **kwargs)
finally:
await database.disconnect()
coro = run_with_db()
try: try:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
except RuntimeError: except RuntimeError:
@@ -595,7 +600,6 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
logger.info("Consent denied, cleaning up all related audio files") logger.info("Consent denied, cleaning up all related audio files")
if recording and recording.bucket_name and recording.object_key: if recording and recording.bucket_name and recording.object_key:
s3_whereby = boto3.client( s3_whereby = boto3.client(
"s3", "s3",
aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID, aws_access_key_id=settings.AWS_WHEREBY_ACCESS_KEY_ID,
@@ -615,7 +619,6 @@ async def cleanup_consent(transcript: Transcript, logger: Logger):
await transcripts_controller.update(transcript, {"audio_deleted": True}) await transcripts_controller.update(transcript, {"audio_deleted": True})
# 2. Delete processed audio from transcript storage S3 bucket # 2. Delete processed audio from transcript storage S3 bucket
if transcript.audio_location == "storage": if transcript.audio_location == "storage":
storage = get_transcripts_storage() storage = get_transcripts_storage()
try: try:
await storage.delete_file(transcript.storage_audio_path) await storage.delete_file(transcript.storage_audio_path)