mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-20 20:29:06 +00:00
Implement retry that automatically detect httpx and backoff (#119)
* server: implement retry that automatically detect httpx and backoff Closes #118 * server: fix formatting
This commit is contained in:
54
server/poetry.lock
generated
54
server/poetry.lock
generated
@@ -1851,6 +1851,24 @@ pytest = ">=7.0.0"
|
|||||||
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
|
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
|
||||||
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
|
testing = ["coverage (>=6.2)", "flaky (>=3.5.0)", "hypothesis (>=5.7.1)", "mypy (>=0.931)", "pytest-trio (>=0.7.0)"]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pytest-httpx"
|
||||||
|
version = "0.23.1"
|
||||||
|
description = "Send responses to httpx."
|
||||||
|
optional = false
|
||||||
|
python-versions = ">=3.9"
|
||||||
|
files = [
|
||||||
|
{file = "pytest_httpx-0.23.1-py3-none-any.whl", hash = "sha256:ba38a9e6c685d3cf6197551a79bf7e41f8bbc57a6d1de65b537f77e87f56ecd3"},
|
||||||
|
{file = "pytest_httpx-0.23.1.tar.gz", hash = "sha256:cfed19eb8b13cbdf464bbb1c4ef88717d88d42334aa9ce516e56e46975c77f74"},
|
||||||
|
]
|
||||||
|
|
||||||
|
[package.dependencies]
|
||||||
|
httpx = "==0.24.*"
|
||||||
|
pytest = ">=6.0,<8.0"
|
||||||
|
|
||||||
|
[package.extras]
|
||||||
|
testing = ["pytest-asyncio (==0.21.*)", "pytest-cov (==4.*)"]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "python-dateutil"
|
name = "python-dateutil"
|
||||||
version = "2.8.2"
|
version = "2.8.2"
|
||||||
@@ -2042,26 +2060,6 @@ files = [
|
|||||||
{file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"},
|
{file = "sortedcontainers-2.4.0.tar.gz", hash = "sha256:25caa5a06cc30b6b83d11423433f65d1f9d76c4c6a0c90e3379eaa43b9bfdb88"},
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "stamina"
|
|
||||||
version = "23.1.0"
|
|
||||||
description = "Production-grade retries made easy."
|
|
||||||
optional = false
|
|
||||||
python-versions = ">=3.8"
|
|
||||||
files = [
|
|
||||||
{file = "stamina-23.1.0-py3-none-any.whl", hash = "sha256:850de8c2c2469aabf42a4c02e7372eaa12c2eced78f2bfa34162b8676c2846e5"},
|
|
||||||
{file = "stamina-23.1.0.tar.gz", hash = "sha256:b16ce3d52d658aa75db813fc6a6661b770abfea915f72cda48e325f2a7854786"},
|
|
||||||
]
|
|
||||||
|
|
||||||
[package.dependencies]
|
|
||||||
tenacity = "*"
|
|
||||||
|
|
||||||
[package.extras]
|
|
||||||
dev = ["nox", "prometheus-client", "stamina[tests,typing]", "structlog", "tomli"]
|
|
||||||
docs = ["furo", "myst-parser", "prometheus-client", "sphinx", "sphinx-notfound-page", "structlog"]
|
|
||||||
tests = ["pytest", "pytest-asyncio"]
|
|
||||||
typing = ["mypy (>=1.4)"]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "starlette"
|
name = "starlette"
|
||||||
version = "0.27.0"
|
version = "0.27.0"
|
||||||
@@ -2110,20 +2108,6 @@ files = [
|
|||||||
[package.dependencies]
|
[package.dependencies]
|
||||||
mpmath = ">=0.19"
|
mpmath = ">=0.19"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "tenacity"
|
|
||||||
version = "8.2.2"
|
|
||||||
description = "Retry code until it succeeds"
|
|
||||||
optional = false
|
|
||||||
python-versions = ">=3.6"
|
|
||||||
files = [
|
|
||||||
{file = "tenacity-8.2.2-py3-none-any.whl", hash = "sha256:2f277afb21b851637e8f52e6a613ff08734c347dc19ade928e519d7d2d8569b0"},
|
|
||||||
{file = "tenacity-8.2.2.tar.gz", hash = "sha256:43af037822bd0029025877f3b2d97cc4d7bb0c2991000a3d59d71517c5c969e0"},
|
|
||||||
]
|
|
||||||
|
|
||||||
[package.extras]
|
|
||||||
doc = ["reno", "sphinx", "tornado (>=4.5)"]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokenizers"
|
name = "tokenizers"
|
||||||
version = "0.13.3"
|
version = "0.13.3"
|
||||||
@@ -2595,4 +2579,4 @@ multidict = ">=4.0"
|
|||||||
[metadata]
|
[metadata]
|
||||||
lock-version = "2.0"
|
lock-version = "2.0"
|
||||||
python-versions = "^3.11"
|
python-versions = "^3.11"
|
||||||
content-hash = "1a98a080ce035b381521426c9d6f9f80e8656258beab6cdff95ea90cf6c77e85"
|
content-hash = "b6097887e0343a553bec5519aec6ecf345796e27d4a0f0f4abf8cd51e56a24eb"
|
||||||
|
|||||||
@@ -30,13 +30,13 @@ black = "^23.7.0"
|
|||||||
|
|
||||||
[tool.poetry.group.client.dependencies]
|
[tool.poetry.group.client.dependencies]
|
||||||
pyaudio = "^0.2.13"
|
pyaudio = "^0.2.13"
|
||||||
stamina = "^23.1.0"
|
|
||||||
|
|
||||||
|
|
||||||
[tool.poetry.group.tests.dependencies]
|
[tool.poetry.group.tests.dependencies]
|
||||||
pytest-aiohttp = "^1.0.4"
|
pytest-aiohttp = "^1.0.4"
|
||||||
pytest-asyncio = "^0.21.1"
|
pytest-asyncio = "^0.21.1"
|
||||||
pytest = "^7.4.0"
|
pytest = "^7.4.0"
|
||||||
|
pytest-httpx = "^0.23.1"
|
||||||
|
|
||||||
|
|
||||||
[tool.poetry.group.aws.dependencies]
|
[tool.poetry.group.aws.dependencies]
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ class BananaLLM(LLM):
|
|||||||
headers=self.headers,
|
headers=self.headers,
|
||||||
json={"prompt": prompt},
|
json={"prompt": prompt},
|
||||||
timeout=self.timeout,
|
timeout=self.timeout,
|
||||||
|
retry_timeout=300, # as per their sdk
|
||||||
)
|
)
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
text = response.json()["text"]
|
text = response.json()["text"]
|
||||||
|
|||||||
@@ -21,7 +21,6 @@ class OpenAILLM(LLM):
|
|||||||
"Authorization": f"Bearer {self.openai_key}",
|
"Authorization": f"Bearer {self.openai_key}",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
async with httpx.AsyncClient(timeout=self.timeout) as client:
|
||||||
response = await client.post(
|
response = await client.post(
|
||||||
self.openai_url,
|
self.openai_url,
|
||||||
|
|||||||
@@ -1,27 +1,82 @@
|
|||||||
from reflector.logger import logger
|
from reflector.logger import logger
|
||||||
|
from time import monotonic
|
||||||
|
from httpx import HTTPStatusError, Response
|
||||||
|
from random import random
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
|
class RetryException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class RetryTimeoutException(RetryException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class RetryHTTPException(RetryException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
def retry(fn):
|
def retry(fn):
|
||||||
async def decorated(*args, **kwargs):
|
async def decorated(*args, **kwargs):
|
||||||
retry_max = kwargs.pop("retry_max", 5)
|
retry_attempts = kwargs.pop("retry_attempts", None)
|
||||||
retry_delay = kwargs.pop("retry_delay", 2)
|
retry_timeout = kwargs.pop("retry_timeout", 60)
|
||||||
retry_ignore_exc_types = kwargs.pop("retry_ignore_exc_types", ())
|
retry_backoff_interval = kwargs.pop("retry_backoff_interval", 0.1)
|
||||||
|
retry_jitter = kwargs.pop("retry_jitter", 0.1)
|
||||||
|
retry_backoff_max = kwargs.pop("retry_backoff_max", 3)
|
||||||
|
retry_httpx_status_stop = kwargs.pop(
|
||||||
|
"retry_httpx_status_stop",
|
||||||
|
(
|
||||||
|
401, # auth issue
|
||||||
|
404, # not found
|
||||||
|
413, # payload too large
|
||||||
|
418, # teapot
|
||||||
|
),
|
||||||
|
)
|
||||||
|
retry_ignore_exc_types = kwargs.pop("retry_ignore_exc_types", (Exception,))
|
||||||
|
|
||||||
result = None
|
result = None
|
||||||
attempt = 0
|
|
||||||
last_exception = None
|
last_exception = None
|
||||||
for attempt in range(retry_max):
|
attempts = 0
|
||||||
|
start = monotonic()
|
||||||
|
fn_name = fn.__name__
|
||||||
|
|
||||||
|
# goal: retry until timeout
|
||||||
|
while True:
|
||||||
|
if monotonic() - start > retry_timeout:
|
||||||
|
raise RetryTimeoutException()
|
||||||
|
|
||||||
|
jitter = random() * retry_jitter
|
||||||
|
retry_backoff_interval = min(
|
||||||
|
retry_backoff_interval * 2 + jitter, retry_backoff_max
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
result = await fn(*args, **kwargs)
|
result = await fn(*args, **kwargs)
|
||||||
|
if isinstance(result, Response):
|
||||||
|
result.raise_for_status()
|
||||||
if result:
|
if result:
|
||||||
return result
|
return result
|
||||||
|
except HTTPStatusError as e:
|
||||||
|
status_code = e.response.status_code
|
||||||
|
logger.debug(f"HTTP status {status_code} - {e}")
|
||||||
|
if status_code in retry_httpx_status_stop:
|
||||||
|
message = f"HTTP status {status_code} is in retry_httpx_status_stop"
|
||||||
|
raise RetryHTTPException(message) from e
|
||||||
except retry_ignore_exc_types as e:
|
except retry_ignore_exc_types as e:
|
||||||
last_exception = e
|
last_exception = e
|
||||||
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"Retrying {fn} - in {retry_delay} seconds "
|
f"Retrying {fn_name} - in {retry_backoff_interval:.1f}s "
|
||||||
f"- attempt {attempt + 1}/{retry_max}"
|
f"({monotonic() - start:.1f}s / {retry_timeout:.1f}s)"
|
||||||
)
|
)
|
||||||
await asyncio.sleep(retry_delay)
|
attempts += 1
|
||||||
|
|
||||||
|
if retry_attempts is not None and attempts >= retry_attempts:
|
||||||
|
raise RetryException(f"Retry attempts exceeded: {retry_attempts}")
|
||||||
|
|
||||||
|
await asyncio.sleep(retry_backoff_interval)
|
||||||
|
|
||||||
if last_exception is not None:
|
if last_exception is not None:
|
||||||
raise type(last_exception) from last_exception
|
raise type(last_exception) from last_exception
|
||||||
return result
|
return result
|
||||||
|
|||||||
55
server/tests/test_retry_decorator.py
Normal file
55
server/tests/test_retry_decorator.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
import pytest
|
||||||
|
import httpx
|
||||||
|
from reflector.utils.retry import (
|
||||||
|
retry,
|
||||||
|
RetryTimeoutException,
|
||||||
|
RetryHTTPException,
|
||||||
|
RetryException,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retry_httpx(httpx_mock):
|
||||||
|
# this code should be force a retry
|
||||||
|
httpx_mock.add_response(status_code=500)
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
with pytest.raises(RetryTimeoutException):
|
||||||
|
await retry(client.get)("https://test_url", retry_timeout=0.1)
|
||||||
|
|
||||||
|
# but if we add it in the retry_httpx_status_stop, it should not retry
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
with pytest.raises(RetryHTTPException):
|
||||||
|
await retry(client.get)(
|
||||||
|
"https://test_url", retry_timeout=5, retry_httpx_status_stop=[500]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retry_normal():
|
||||||
|
left = 3
|
||||||
|
|
||||||
|
async def retry_before_success():
|
||||||
|
nonlocal left
|
||||||
|
if left > 0:
|
||||||
|
left -= 1
|
||||||
|
raise Exception("test")
|
||||||
|
return True
|
||||||
|
|
||||||
|
result = await retry(retry_before_success)()
|
||||||
|
assert result is True
|
||||||
|
assert left == 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_retry_max_attempts():
|
||||||
|
left = 3
|
||||||
|
|
||||||
|
async def retry_before_success():
|
||||||
|
nonlocal left
|
||||||
|
if left > 0:
|
||||||
|
left -= 1
|
||||||
|
raise Exception("test")
|
||||||
|
return True
|
||||||
|
|
||||||
|
with pytest.raises(RetryException):
|
||||||
|
await retry(retry_before_success)(retry_attempts=2)
|
||||||
Reference in New Issue
Block a user