Merge branch 'main' into feat/durable

Igor Loskutov
2025-12-20 11:07:04 -05:00
22 changed files with 631 additions and 45 deletions

View File

@@ -1,12 +1,14 @@
"""Tests for LLM parse error recovery using llama-index Workflow"""
from time import monotonic
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from pydantic import BaseModel, Field
from workflows.errors import WorkflowRuntimeError
from workflows.errors import WorkflowRuntimeError, WorkflowTimeoutError
from reflector.llm import LLM, LLMParseError, StructuredOutputWorkflow
from reflector.utils.retry import RetryException
class TestResponse(BaseModel):
@@ -355,3 +357,132 @@ class TestNetworkErrorRetries:
 
             # Only called once - Workflow doesn't retry network errors
             assert mock_settings.llm.acomplete.call_count == 1
+
+
+class TestWorkflowTimeoutRetry:
+    """Test timeout retry mechanism in get_structured_response"""
+
+    @pytest.mark.asyncio
+    async def test_timeout_retry_succeeds_on_retry(self, test_settings):
+        """Test that WorkflowTimeoutError triggers retry and succeeds"""
+        llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
+        call_count = {"count": 0}
+
+        async def workflow_run_side_effect(*args, **kwargs):
+            call_count["count"] += 1
+            if call_count["count"] == 1:
+                raise WorkflowTimeoutError("Operation timed out after 120 seconds")
+            return {
+                "success": TestResponse(
+                    title="Test", summary="Summary", confidence=0.95
+                )
+            }
+
+        with (
+            patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
+            patch("reflector.llm.TreeSummarize") as mock_summarize,
+            patch("reflector.llm.Settings") as mock_settings,
+        ):
+            mock_workflow = MagicMock()
+            mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
+            mock_workflow_class.return_value = mock_workflow
+
+            mock_summarizer = MagicMock()
+            mock_summarize.return_value = mock_summarizer
+            mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
+
+            mock_settings.llm.acomplete = AsyncMock(
+                return_value=make_completion_response(
+                    '{"title": "Test", "summary": "Summary", "confidence": 0.95}'
+                )
+            )
+
+            result = await llm.get_structured_response(
+                prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
+            )
+
+            assert result.title == "Test"
+            assert result.summary == "Summary"
+            assert call_count["count"] == 2
+
+    @pytest.mark.asyncio
+    async def test_timeout_retry_exhausts_after_max_attempts(self, test_settings):
+        """Test that timeout retry stops after max attempts"""
+        llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
+        call_count = {"count": 0}
+
+        async def workflow_run_side_effect(*args, **kwargs):
+            call_count["count"] += 1
+            raise WorkflowTimeoutError("Operation timed out after 120 seconds")
+
+        with (
+            patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
+            patch("reflector.llm.TreeSummarize") as mock_summarize,
+            patch("reflector.llm.Settings") as mock_settings,
+        ):
+            mock_workflow = MagicMock()
+            mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
+            mock_workflow_class.return_value = mock_workflow
+
+            mock_summarizer = MagicMock()
+            mock_summarize.return_value = mock_summarizer
+            mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
+
+            mock_settings.llm.acomplete = AsyncMock(
+                return_value=make_completion_response(
+                    '{"title": "Test", "summary": "Summary", "confidence": 0.95}'
+                )
+            )
+
+            with pytest.raises(RetryException, match="Retry attempts exceeded"):
+                await llm.get_structured_response(
+                    prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
+                )
+
+            assert call_count["count"] == 3
+
+    @pytest.mark.asyncio
+    async def test_timeout_retry_with_backoff(self, test_settings):
+        """Test that exponential backoff is applied between retries"""
+        llm = LLM(settings=test_settings, temperature=0.4, max_tokens=100)
+        call_times = []
+
+        async def workflow_run_side_effect(*args, **kwargs):
+            call_times.append(monotonic())
+            if len(call_times) < 3:
+                raise WorkflowTimeoutError("Operation timed out after 120 seconds")
+            return {
+                "success": TestResponse(
+                    title="Test", summary="Summary", confidence=0.95
+                )
+            }
+
+        with (
+            patch("reflector.llm.StructuredOutputWorkflow") as mock_workflow_class,
+            patch("reflector.llm.TreeSummarize") as mock_summarize,
+            patch("reflector.llm.Settings") as mock_settings,
+        ):
+            mock_workflow = MagicMock()
+            mock_workflow.run = AsyncMock(side_effect=workflow_run_side_effect)
+            mock_workflow_class.return_value = mock_workflow
+
+            mock_summarizer = MagicMock()
+            mock_summarize.return_value = mock_summarizer
+            mock_summarizer.aget_response = AsyncMock(return_value="Some analysis")
+
+            mock_settings.llm.acomplete = AsyncMock(
+                return_value=make_completion_response(
+                    '{"title": "Test", "summary": "Summary", "confidence": 0.95}'
+                )
+            )
+
+            result = await llm.get_structured_response(
+                prompt="Test prompt", texts=["Test text"], output_cls=TestResponse
+            )
+
+            assert result.title == "Test"
+
+            if len(call_times) >= 2:
+                time_between_calls = call_times[1] - call_times[0]
+                assert (
+                    time_between_calls >= 1.5
+                ), f"Expected ~2s backoff, got {time_between_calls}s"

View File

@@ -266,7 +266,11 @@ async def mock_summary_processor():
     # When flush is called, simulate summary generation by calling the callbacks
     async def flush_with_callback():
         mock_summary.flush_called = True
-        from reflector.processors.types import FinalLongSummary, FinalShortSummary
+        from reflector.processors.types import (
+            ActionItems,
+            FinalLongSummary,
+            FinalShortSummary,
+        )
 
         if hasattr(mock_summary, "_callback"):
             await mock_summary._callback(
@@ -276,12 +280,19 @@ async def mock_summary_processor():
             await mock_summary._on_short_summary(
                 FinalShortSummary(short_summary="Test short summary", duration=10.0)
             )
+        if hasattr(mock_summary, "_on_action_items"):
+            await mock_summary._on_action_items(
+                ActionItems(action_items={"test": "action item"})
+            )
 
     mock_summary.flush = flush_with_callback
 
-    def init_with_callback(transcript=None, callback=None, on_short_summary=None):
+    def init_with_callback(
+        transcript=None, callback=None, on_short_summary=None, on_action_items=None
+    ):
         mock_summary._callback = callback
         mock_summary._on_short_summary = on_short_summary
+        mock_summary._on_action_items = on_action_items
         return mock_summary
 
     mock_summary_class.side_effect = init_with_callback
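
With this change the fixture accepts and stores the new on_action_items hook, and flush() now fires it alongside the long- and short-summary callbacks whenever it was provided. A hypothetical usage sketch (the exercise helper is illustrative, not part of the suite; mock_summary_class stands for the patched summary-processor class):

from unittest.mock import AsyncMock


async def exercise(mock_summary_class):
    # Wire all three callbacks through the patched class; the fixture's
    # init_with_callback stores them on the shared mock_summary object.
    on_actions = AsyncMock()
    processor = mock_summary_class(
        transcript=None,
        callback=AsyncMock(),
        on_short_summary=AsyncMock(),
        on_action_items=on_actions,
    )
    # flush_with_callback invokes each stored callback, including the
    # new ActionItems one.
    await processor.flush()
    on_actions.assert_awaited_once()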