mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-02-05 18:36:45 +00:00
fix: restore timeout and fix celery test infrastructure
- Re-add timeout=1.0 to ws_manager pubsub loop (prevents CPU spin) - Use Redis for Celery tests (memory:// broker doesn't support chords) - Add timeout param to in-memory subscriber mock - Remove duplicate celery_includes fixture from rtc_ws tests
This commit is contained in:
@@ -97,9 +97,10 @@ class WebsocketManager:
|
|||||||
|
|
||||||
async def _pubsub_data_reader(self, pubsub_subscriber):
|
async def _pubsub_data_reader(self, pubsub_subscriber):
|
||||||
while True:
|
while True:
|
||||||
# No timeout - global singleton prevents CPU hog from multiple instances
|
# timeout=1.0 prevents tight CPU loop when no messages available
|
||||||
message = await pubsub_subscriber.get_message(
|
message = await pubsub_subscriber.get_message(
|
||||||
ignore_subscribe_messages=True
|
ignore_subscribe_messages=True,
|
||||||
|
timeout=1.0,
|
||||||
)
|
)
|
||||||
if message is not None:
|
if message is not None:
|
||||||
room_id = message["channel"].decode("utf-8")
|
room_id = message["channel"].decode("utf-8")
|
||||||
@@ -109,10 +110,8 @@ class WebsocketManager:
|
|||||||
await socket.send_json(data)
|
await socket.send_json(data)
|
||||||
|
|
||||||
|
|
||||||
# Process-global singleton (not thread-local)
|
# Process-global singleton to ensure only one WebsocketManager instance exists.
|
||||||
# The original threading.local() pattern was broken - it created a NEW
|
# Multiple instances would cause resource leaks and CPU issues.
|
||||||
# threading.local() object on every call, so caching never worked.
|
|
||||||
# This caused infinite ws_manager instances → resource leaks → CPU hog.
|
|
||||||
_ws_manager: WebsocketManager | None = None
|
_ws_manager: WebsocketManager | None = None
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import os
|
import os
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from tempfile import NamedTemporaryFile
|
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -333,10 +332,17 @@ def celery_enable_logging():
|
|||||||
|
|
||||||
@pytest.fixture(scope="session")
|
@pytest.fixture(scope="session")
|
||||||
def celery_config():
|
def celery_config():
|
||||||
with NamedTemporaryFile() as f:
|
# Use Redis for chord/group task execution (memory:// broker doesn't support chords)
|
||||||
|
# Redis must be running - start with: docker compose up -d redis
|
||||||
|
import os
|
||||||
|
|
||||||
|
redis_host = os.environ.get("REDIS_HOST", "localhost")
|
||||||
|
redis_port = os.environ.get("REDIS_PORT", "6379")
|
||||||
|
# Use db 2 to avoid conflicts with main app
|
||||||
|
redis_url = f"redis://{redis_host}:{redis_port}/2"
|
||||||
yield {
|
yield {
|
||||||
"broker_url": "memory://",
|
"broker_url": redis_url,
|
||||||
"result_backend": f"db+sqlite:///{f.name}",
|
"result_backend": redis_url,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -370,9 +376,12 @@ async def ws_manager_in_memory(monkeypatch):
|
|||||||
def __init__(self, queue: asyncio.Queue):
|
def __init__(self, queue: asyncio.Queue):
|
||||||
self.queue = queue
|
self.queue = queue
|
||||||
|
|
||||||
async def get_message(self, ignore_subscribe_messages: bool = True):
|
async def get_message(
|
||||||
|
self, ignore_subscribe_messages: bool = True, timeout: float | None = None
|
||||||
|
):
|
||||||
|
wait_timeout = timeout if timeout is not None else 0.05
|
||||||
try:
|
try:
|
||||||
return await asyncio.wait_for(self.queue.get(), timeout=0.05)
|
return await asyncio.wait_for(self.queue.get(), timeout=wait_timeout)
|
||||||
except Exception:
|
except Exception:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|||||||
@@ -115,9 +115,7 @@ def appserver(tmpdir, setup_database, celery_session_app, celery_session_worker)
|
|||||||
settings.DATA_DIR = DATA_DIR
|
settings.DATA_DIR = DATA_DIR
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session")
|
# Using celery_includes from conftest.py which includes both pipelines
|
||||||
def celery_includes():
|
|
||||||
return ["reflector.pipelines.main_live_pipeline"]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.usefixtures("setup_database")
|
@pytest.mark.usefixtures("setup_database")
|
||||||
|
|||||||
20
server/uv.lock
generated
20
server/uv.lock
generated
@@ -330,26 +330,6 @@ name = "av"
|
|||||||
version = "14.4.0"
|
version = "14.4.0"
|
||||||
source = { registry = "https://pypi.org/simple" }
|
source = { registry = "https://pypi.org/simple" }
|
||||||
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
|
sdist = { url = "https://files.pythonhosted.org/packages/86/f6/0b473dab52dfdea05f28f3578b1c56b6c796ce85e76951bab7c4e38d5a74/av-14.4.0.tar.gz", hash = "sha256:3ecbf803a7fdf67229c0edada0830d6bfaea4d10bfb24f0c3f4e607cd1064b42", size = 3892203 }
|
||||||
wheels = [
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/18/8a/d57418b686ffd05fabd5a0a9cfa97e63b38c35d7101af00e87c51c8cc43c/av-14.4.0-cp311-cp311-macosx_12_0_arm64.whl", hash = "sha256:5b21d5586a88b9fce0ab78e26bd1c38f8642f8e2aad5b35e619f4d202217c701", size = 19965048 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/f5/aa/3f878b0301efe587e9b07bb773dd6b47ef44ca09a3cffb4af50c08a170f3/av-14.4.0-cp311-cp311-macosx_12_0_x86_64.whl", hash = "sha256:cf8762d90b0f94a20c9f6e25a94f1757db5a256707964dfd0b1d4403e7a16835", size = 23750064 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/9a/b4/6fe94a31f9ed3a927daa72df67c7151968587106f30f9f8fcd792b186633/av-14.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c0ac9f08920c7bbe0795319689d901e27cb3d7870b9a0acae3f26fc9daa801a6", size = 33648775 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/6c/f3/7f3130753521d779450c935aec3f4beefc8d4645471159f27b54e896470c/av-14.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a56d9ad2afdb638ec0404e962dc570960aae7e08ae331ad7ff70fbe99a6cf40e", size = 32216915 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/f8/9a/8ffabfcafb42154b4b3a67d63f9b69e68fa8c34cb39ddd5cb813dd049ed4/av-14.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bed513cbcb3437d0ae47743edc1f5b4a113c0b66cdd4e1aafc533abf5b2fbf2", size = 35287279 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/ad/11/7023ba0a2ca94a57aedf3114ab8cfcecb0819b50c30982a4c5be4d31df41/av-14.4.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d030c2d3647931e53d51f2f6e0fcf465263e7acf9ec6e4faa8dbfc77975318c3", size = 36294683 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/3d/fa/b8ac9636bd5034e2b899354468bef9f4dadb067420a16d8a493a514b7817/av-14.4.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:1cc21582a4f606271d8c2036ec7a6247df0831050306c55cf8a905701d0f0474", size = 34552391 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/fb/29/0db48079c207d1cba7a2783896db5aec3816e17de55942262c244dffbc0f/av-14.4.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:ce7c9cd452153d36f1b1478f904ed5f9ab191d76db873bdd3a597193290805d4", size = 37265250 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/1c/55/715858c3feb7efa4d667ce83a829c8e6ee3862e297fb2b568da3f968639d/av-14.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:fd261e31cc6b43ca722f80656c39934199d8f2eb391e0147e704b6226acebc29", size = 27925845 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/a6/75/b8641653780336c90ba89e5352cac0afa6256a86a150c7703c0b38851c6d/av-14.4.0-cp312-cp312-macosx_12_0_arm64.whl", hash = "sha256:a53e682b239dd23b4e3bc9568cfb1168fc629ab01925fdb2e7556eb426339e94", size = 19954125 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/99/e6/37fe6fa5853a48d54d749526365780a63a4bc530be6abf2115e3a21e292a/av-14.4.0-cp312-cp312-macosx_12_0_x86_64.whl", hash = "sha256:5aa0b901751a32703fa938d2155d56ce3faf3630e4a48d238b35d2f7e49e5395", size = 23751479 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/f7/75/9a5f0e6bda5f513b62bafd1cff2b495441a8b07ab7fb7b8e62f0c0d1683f/av-14.4.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a3b316fed3597675fe2aacfed34e25fc9d5bb0196dc8c0b014ae5ed4adda48de", size = 33801401 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/6a/c9/e4df32a2ad1cb7f3a112d0ed610c5e43c89da80b63c60d60e3dc23793ec0/av-14.4.0-cp312-cp312-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a587b5c5014c3c0e16143a0f8d99874e46b5d0c50db6111aa0b54206b5687c81", size = 32364330 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/ca/f0/64e7444a41817fde49a07d0239c033f7e9280bec4a4bb4784f5c79af95e6/av-14.4.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:10d53f75e8ac1ec8877a551c0db32a83c0aaeae719d05285281eaaba211bbc30", size = 35519508 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/c2/a8/a370099daa9033a3b6f9b9bd815304b3d8396907a14d09845f27467ba138/av-14.4.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:c8558cfde79dd8fc92d97c70e0f0fa8c94c7a66f68ae73afdf58598f0fe5e10d", size = 36448593 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/27/bb/edb6ceff8fa7259cb6330c51dbfbc98dd1912bd6eb5f7bc05a4bb14a9d6e/av-14.4.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:455b6410dea0ab2d30234ffb28df7d62ca3cdf10708528e247bec3a4cdcced09", size = 34701485 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/a7/8a/957da1f581aa1faa9a5dfa8b47ca955edb47f2b76b949950933b457bfa1d/av-14.4.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:1661efbe9d975f927b8512d654704223d936f39016fad2ddab00aee7c40f412c", size = 37521981 },
|
|
||||||
{ url = "https://files.pythonhosted.org/packages/28/76/3f1cf0568592f100fd68eb40ed8c491ce95ca3c1378cc2d4c1f6d1bd295d/av-14.4.0-cp312-cp312-win_amd64.whl", hash = "sha256:fbbeef1f421a3461086853d6464ad5526b56ffe8ccb0ab3fd0a1f121dfbf26ad", size = 27925944 },
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "banks"
|
name = "banks"
|
||||||
|
|||||||
Reference in New Issue
Block a user