mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-03-27 01:16:46 +00:00
test: enhance Task 1+3 tests — production DAG, throttling
Add 4 new tests to test_dag_progress.py:
- test_production_dag_shape: Real 15-task pipeline topology with mixed statuses, verifying all tasks present, topological order invariant, and correct parent relationships (e.g. finalize has 4 parents)
- test_topological_sort_invariant_complex_dag: 7-node DAG with wide branching/merging to stress-test that all parents precede children
- test_logging_throttled_by_interval: Mocks time.monotonic to verify ctx.log() is throttled by interval while broadcasts are not
- test_uses_broadcast_event_not_append_event_and_broadcast: Verifies progress uses transient broadcast_event, not persisted append variant
This commit is contained in:
@@ -245,6 +245,211 @@ class TestExtractDagTasksTopology:
|
||||
assert names[0] == "task_a"
|
||||
assert names[1] == "task_b"
|
||||
|
||||
def test_production_dag_shape(self):
    """Test the real 15-task pipeline topology with mixed statuses.

    Simulates a mid-pipeline state where early tasks completed,
    middle tasks running, and later tasks still queued.
    """
    # Production DAG encoded as (step_id, task_name[, children]) entries;
    # the children lists carry the parent -> child edges.
    topology = [
        ("s_get_recording", TaskName.GET_RECORDING, ["s_get_participants"]),
        ("s_get_participants", TaskName.GET_PARTICIPANTS, ["s_process_tracks"]),
        (
            "s_process_tracks",
            TaskName.PROCESS_TRACKS,
            ["s_mixdown_tracks", "s_detect_topics", "s_finalize"],
        ),
        ("s_mixdown_tracks", TaskName.MIXDOWN_TRACKS, ["s_generate_waveform"]),
        ("s_generate_waveform", TaskName.GENERATE_WAVEFORM),
        (
            "s_detect_topics",
            TaskName.DETECT_TOPICS,
            ["s_generate_title", "s_extract_subjects"],
        ),
        ("s_generate_title", TaskName.GENERATE_TITLE, ["s_finalize"]),
        (
            "s_extract_subjects",
            TaskName.EXTRACT_SUBJECTS,
            ["s_process_subjects", "s_identify_action_items"],
        ),
        ("s_process_subjects", TaskName.PROCESS_SUBJECTS, ["s_generate_recap"]),
        ("s_generate_recap", TaskName.GENERATE_RECAP, ["s_finalize"]),
        ("s_identify_action_items", TaskName.IDENTIFY_ACTION_ITEMS, ["s_finalize"]),
        ("s_finalize", TaskName.FINALIZE, ["s_cleanup_consent"]),
        (
            "s_cleanup_consent",
            TaskName.CLEANUP_CONSENT,
            ["s_post_zulip", "s_send_webhook"],
        ),
        ("s_post_zulip", TaskName.POST_ZULIP),
        ("s_send_webhook", TaskName.SEND_WEBHOOK),
    ]
    shape = [_make_shape_item(*entry) for entry in topology]

    # Mid-pipeline snapshot: early tasks done, middle running, later queued.
    statuses = {
        "s_get_recording": "COMPLETED",
        "s_get_participants": "COMPLETED",
        "s_process_tracks": "COMPLETED",
        "s_mixdown_tracks": "RUNNING",
        "s_generate_waveform": "QUEUED",
        "s_detect_topics": "RUNNING",
        "s_generate_title": "QUEUED",
        "s_extract_subjects": "QUEUED",
        "s_process_subjects": "QUEUED",
        "s_generate_recap": "QUEUED",
        "s_identify_action_items": "QUEUED",
        "s_finalize": "QUEUED",
        "s_cleanup_consent": "QUEUED",
        "s_post_zulip": "QUEUED",
        "s_send_webhook": "QUEUED",
    }
    tasks = [
        _make_task_summary(step_id, status=status)
        for step_id, status in statuses.items()
    ]
    details = _make_details(shape, tasks)

    result = extract_dag_tasks(details)

    # All 15 tasks present, each name exactly once.
    assert len(result) == 15
    assert {t.name for t in result} == {entry[1] for entry in topology}

    # Topological order invariant: no task appears before its parents.
    position = {t.name: idx for idx, t in enumerate(result)}
    for child in result:
        for parent_name in child.parents:
            assert position[parent_name] < position[child.name], (
                f"Parent {parent_name} (idx {position[parent_name]}) "
                f"must appear before {child.name} (idx {position[child.name]})"
            )

    # Index tasks by name for the parent-relationship checks below.
    by_name = {t.name: t for t in result}

    # finalize merges four branches (has exactly 4 parents).
    assert set(by_name[TaskName.FINALIZE].parents) == {
        TaskName.PROCESS_TRACKS,
        TaskName.GENERATE_TITLE,
        TaskName.GENERATE_RECAP,
        TaskName.IDENTIFY_ACTION_ITEMS,
    }

    # cleanup_consent has a single parent: finalize.
    assert by_name[TaskName.CLEANUP_CONSENT].parents == [TaskName.FINALIZE]

    # Both terminal notifications descend from cleanup_consent.
    assert by_name[TaskName.POST_ZULIP].parents == [TaskName.CLEANUP_CONSENT]
    assert by_name[TaskName.SEND_WEBHOOK].parents == [TaskName.CLEANUP_CONSENT]

    # Statuses propagated correctly from the task summaries.
    assert by_name[TaskName.GET_RECORDING].status == DagTaskStatus.COMPLETED
    assert by_name[TaskName.MIXDOWN_TRACKS].status == DagTaskStatus.RUNNING
    assert by_name[TaskName.FINALIZE].status == DagTaskStatus.QUEUED
|
||||
|
||||
def test_topological_sort_invariant_complex_dag(self):
    """For a complex DAG, every task's parents appear earlier in the list.

    Uses a wider branching/merging DAG than diamond to stress the invariant.
    """
    # DAG: A -> B, A -> C, A -> D, B -> E, C -> E, C -> F, D -> F, E -> G, F -> G
    # Encoded as (step_id, task_name[, children]) entries.
    edges = [
        ("s_a", "task_a", ["s_b", "s_c", "s_d"]),
        ("s_b", "task_b", ["s_e"]),
        ("s_c", "task_c", ["s_e", "s_f"]),
        ("s_d", "task_d", ["s_f"]),
        ("s_e", "task_e", ["s_g"]),
        ("s_f", "task_f", ["s_g"]),
        ("s_g", "task_g"),
    ]
    shape = [_make_shape_item(*entry) for entry in edges]

    status_by_step = {
        "s_a": "COMPLETED",
        "s_b": "COMPLETED",
        "s_c": "RUNNING",
        "s_d": "COMPLETED",
        "s_e": "QUEUED",
        "s_f": "QUEUED",
        "s_g": "QUEUED",
    }
    tasks = [
        _make_task_summary(step_id, status=status)
        for step_id, status in status_by_step.items()
    ]
    details = _make_details(shape, tasks)

    result = extract_dag_tasks(details)

    assert len(result) == 7
    position = {t.name: idx for idx, t in enumerate(result)}

    # Invariant: each parent is listed before every one of its children.
    for node in result:
        for parent_name in node.parents:
            assert position[parent_name] < position[node.name], (
                f"Parent {parent_name} (idx {position[parent_name]}) "
                f"must appear before {node.name} (idx {position[node.name]})"
            )

    by_name = {t.name: t for t in result}

    # task_g merges two branches (2 parents).
    assert set(by_name["task_g"].parents) == {"task_e", "task_f"}

    # task_e merges two branches (2 parents).
    assert set(by_name["task_e"].parents) == {"task_b", "task_c"}

    # task_a is the root: first in topological order, no parents.
    assert result[0].name == "task_a"
    assert result[0].parents == []
|
||||
|
||||
|
||||
class TestExtractDagTasksFanOut:
|
||||
"""Test fan-out tasks with spawned children."""
|
||||
@@ -621,3 +826,133 @@ class TestMakeAudioProgressLoggerWithBroadcast:
|
||||
)
|
||||
callback(None, 100.0)
|
||||
mock_broadcast.assert_not_called()
|
||||
|
||||
def test_logging_throttled_by_interval(self):
    """With interval=5.0, rapid calls only log once until interval elapses.

    The throttle applies to ctx.log() calls. Broadcasts (fire-and-forget)
    are not throttled — they occur every call when transcript_id + progress_pct set.
    """
    import asyncio
    import time as time_mod

    from reflector.hatchet.workflows.daily_multitrack_pipeline import (
        make_audio_progress_logger,
    )

    # Stand-in Hatchet context; only .log() is exercised by the logger.
    ctx = MagicMock()
    ctx.log = MagicMock()

    # Dedicated loop so the fire-and-forget broadcast tasks can be driven
    # explicitly below (and closed deterministically in finally).
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    mock_broadcast = AsyncMock()
    tasks_created = []
    original_create_task = loop.create_task

    def capture_create_task(coro):
        # Record every task the callback schedules so they can be awaited
        # before asserting on broadcast call counts.
        task = original_create_task(coro)
        tasks_created.append(task)
        return task

    # Controlled monotonic values for the 4 calls from make_audio_progress_logger:
    # init (start_time, last_log_time), call1 (now), call2 (now), call3 (now)
    # After those, fall back to real time.monotonic() for asyncio internals.
    controlled_values = [100.0, 100.0, 101.0, 106.0]
    # Single-element list: a mutable closure cell for the call counter.
    call_index = [0]
    real_monotonic = time_mod.monotonic

    def mock_monotonic():
        if call_index[0] < len(controlled_values):
            val = controlled_values[call_index[0]]
            call_index[0] += 1
            return val
        return real_monotonic()

    try:
        with (
            patch(
                "reflector.hatchet.workflows.daily_multitrack_pipeline.time.monotonic",
                side_effect=mock_monotonic,
            ),
            patch(
                "reflector.hatchet.broadcast.broadcast_event",
                mock_broadcast,
            ),
            patch.object(loop, "create_task", side_effect=capture_create_task),
        ):
            callback = make_audio_progress_logger(
                ctx, TaskName.MIXDOWN_TRACKS, interval=5.0, transcript_id="t-123"
            )

            # Call 1 at t=100.0: 100.0 - 100.0 = 0.0 < 5.0 => no log
            callback(25.0, 50.0)
            assert ctx.log.call_count == 0

            # Call 2 at t=101.0: 101.0 - 100.0 = 1.0 < 5.0 => no log
            callback(50.0, 100.0)
            assert ctx.log.call_count == 0

            # Call 3 at t=106.0: 106.0 - 100.0 = 6.0 >= 5.0 => logs
            callback(75.0, 150.0)
            assert ctx.log.call_count == 1

            # Run pending broadcast tasks (still inside the patches so the
            # coroutines hit the mocked broadcast_event, never the real one).
            if tasks_created:
                loop.run_until_complete(asyncio.gather(*tasks_created))

            # Broadcasts happen on every call (not throttled) — 3 calls total
            assert mock_broadcast.call_count == 3
    finally:
        loop.close()
|
||||
|
||||
def test_uses_broadcast_event_not_append_event_and_broadcast(self):
    """Progress events use broadcast_event (transient), not append_event_and_broadcast (persisted)."""
    import asyncio

    from reflector.hatchet.workflows.daily_multitrack_pipeline import (
        make_audio_progress_logger,
    )

    # Minimal Hatchet context double.
    ctx = MagicMock()
    ctx.log = MagicMock()

    # Private loop so the fire-and-forget tasks can be drained and the
    # loop closed deterministically.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    transient_mock = AsyncMock()
    persisted_mock = AsyncMock()
    scheduled = []
    real_create_task = loop.create_task

    def tracking_create_task(coro):
        # Keep a handle on every scheduled task so it can be awaited
        # before the mock assertions.
        task = real_create_task(coro)
        scheduled.append(task)
        return task

    try:
        with (
            patch(
                "reflector.hatchet.broadcast.broadcast_event",
                transient_mock,
            ),
            patch(
                "reflector.hatchet.broadcast.append_event_and_broadcast",
                persisted_mock,
            ),
            patch.object(loop, "create_task", side_effect=tracking_create_task),
        ):
            callback = make_audio_progress_logger(
                ctx, TaskName.MIXDOWN_TRACKS, interval=0.0, transcript_id="t-123"
            )
            callback(50.0, 100.0)

            if scheduled:
                loop.run_until_complete(asyncio.gather(*scheduled))

            # broadcast_event (transient) IS called
            transient_mock.assert_called_once()
            # append_event_and_broadcast (persisted) is NOT called
            persisted_mock.assert_not_called()
    finally:
        loop.close()
|
||||
|
||||
Reference in New Issue
Block a user