From c45d3182ee02ba360633a7d45cb0a5cd4073d26f Mon Sep 17 00:00:00 2001
From: Igor Loskutov
Date: Mon, 9 Feb 2026 13:53:13 -0500
Subject: [PATCH] =?UTF-8?q?test:=20enhance=20Task=201+3=20tests=20?=
 =?UTF-8?q?=E2=80=94=20production=20DAG,=20throttling?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Add 4 new tests to test_dag_progress.py:

- test_production_dag_shape: Real 15-task pipeline topology with mixed
  statuses, verifying all tasks present, topological order invariant,
  and correct parent relationships (e.g. finalize has 4 parents)
- test_topological_sort_invariant_complex_dag: 7-node DAG with wide
  branching/merging to stress-test that all parents precede children
- test_logging_throttled_by_interval: Mocks time.monotonic to verify
  ctx.log() is throttled by interval while broadcasts are not
- test_uses_broadcast_event_not_append_event_and_broadcast: Verifies
  progress uses transient broadcast_event, not persisted append variant

Both asyncio-based tests unset the current event loop after closing it
so that a closed loop does not leak into tests that run afterwards.
---
 server/tests/test_dag_progress.py | 341 ++++++++++++++++++++++++++++++
 1 file changed, 341 insertions(+)

diff --git a/server/tests/test_dag_progress.py b/server/tests/test_dag_progress.py
index 14a2d810..8279624c 100644
--- a/server/tests/test_dag_progress.py
+++ b/server/tests/test_dag_progress.py
@@ -245,6 +245,211 @@ class TestExtractDagTasksTopology:
         assert names[0] == "task_a"
         assert names[1] == "task_b"
 
+    def test_production_dag_shape(self):
+        """Test the real 15-task pipeline topology with mixed statuses.
+
+        Simulates a mid-pipeline state where early tasks completed,
+        middle tasks running, and later tasks still queued.
+        """
+        # Production DAG edges (parent -> children):
+        # get_recording -> get_participants
+        # get_participants -> process_tracks
+        # process_tracks -> mixdown_tracks, detect_topics, finalize
+        # mixdown_tracks -> generate_waveform
+        # detect_topics -> generate_title, extract_subjects
+        # extract_subjects -> process_subjects, identify_action_items
+        # process_subjects -> generate_recap
+        # generate_title -> finalize
+        # generate_recap -> finalize
+        # identify_action_items -> finalize
+        # finalize -> cleanup_consent
+        # cleanup_consent -> post_zulip, send_webhook
+        shape = [
+            _make_shape_item(
+                "s_get_recording", TaskName.GET_RECORDING, ["s_get_participants"]
+            ),
+            _make_shape_item(
+                "s_get_participants", TaskName.GET_PARTICIPANTS, ["s_process_tracks"]
+            ),
+            _make_shape_item(
+                "s_process_tracks",
+                TaskName.PROCESS_TRACKS,
+                ["s_mixdown_tracks", "s_detect_topics", "s_finalize"],
+            ),
+            _make_shape_item(
+                "s_mixdown_tracks", TaskName.MIXDOWN_TRACKS, ["s_generate_waveform"]
+            ),
+            _make_shape_item("s_generate_waveform", TaskName.GENERATE_WAVEFORM),
+            _make_shape_item(
+                "s_detect_topics",
+                TaskName.DETECT_TOPICS,
+                ["s_generate_title", "s_extract_subjects"],
+            ),
+            _make_shape_item(
+                "s_generate_title", TaskName.GENERATE_TITLE, ["s_finalize"]
+            ),
+            _make_shape_item(
+                "s_extract_subjects",
+                TaskName.EXTRACT_SUBJECTS,
+                ["s_process_subjects", "s_identify_action_items"],
+            ),
+            _make_shape_item(
+                "s_process_subjects", TaskName.PROCESS_SUBJECTS, ["s_generate_recap"]
+            ),
+            _make_shape_item(
+                "s_generate_recap", TaskName.GENERATE_RECAP, ["s_finalize"]
+            ),
+            _make_shape_item(
+                "s_identify_action_items",
+                TaskName.IDENTIFY_ACTION_ITEMS,
+                ["s_finalize"],
+            ),
+            _make_shape_item("s_finalize", TaskName.FINALIZE, ["s_cleanup_consent"]),
+            _make_shape_item(
+                "s_cleanup_consent",
+                TaskName.CLEANUP_CONSENT,
+                ["s_post_zulip", "s_send_webhook"],
+            ),
+            _make_shape_item("s_post_zulip", TaskName.POST_ZULIP),
+            _make_shape_item("s_send_webhook", TaskName.SEND_WEBHOOK),
+        ]
+
+        # Mid-pipeline: early tasks done, middle running, later queued
+        tasks = [
+            _make_task_summary("s_get_recording", status="COMPLETED"),
+            _make_task_summary("s_get_participants", status="COMPLETED"),
+            _make_task_summary("s_process_tracks", status="COMPLETED"),
+            _make_task_summary("s_mixdown_tracks", status="RUNNING"),
+            _make_task_summary("s_generate_waveform", status="QUEUED"),
+            _make_task_summary("s_detect_topics", status="RUNNING"),
+            _make_task_summary("s_generate_title", status="QUEUED"),
+            _make_task_summary("s_extract_subjects", status="QUEUED"),
+            _make_task_summary("s_process_subjects", status="QUEUED"),
+            _make_task_summary("s_generate_recap", status="QUEUED"),
+            _make_task_summary("s_identify_action_items", status="QUEUED"),
+            _make_task_summary("s_finalize", status="QUEUED"),
+            _make_task_summary("s_cleanup_consent", status="QUEUED"),
+            _make_task_summary("s_post_zulip", status="QUEUED"),
+            _make_task_summary("s_send_webhook", status="QUEUED"),
+        ]
+        details = _make_details(shape, tasks)
+
+        result = extract_dag_tasks(details)
+
+        # All 15 tasks present
+        assert len(result) == 15
+        result_names = [t.name for t in result]
+        assert set(result_names) == {
+            TaskName.GET_RECORDING,
+            TaskName.GET_PARTICIPANTS,
+            TaskName.PROCESS_TRACKS,
+            TaskName.MIXDOWN_TRACKS,
+            TaskName.GENERATE_WAVEFORM,
+            TaskName.DETECT_TOPICS,
+            TaskName.GENERATE_TITLE,
+            TaskName.EXTRACT_SUBJECTS,
+            TaskName.PROCESS_SUBJECTS,
+            TaskName.GENERATE_RECAP,
+            TaskName.IDENTIFY_ACTION_ITEMS,
+            TaskName.FINALIZE,
+            TaskName.CLEANUP_CONSENT,
+            TaskName.POST_ZULIP,
+            TaskName.SEND_WEBHOOK,
+        }
+
+        # Topological order invariant: no task appears before its parents
+        name_to_index = {t.name: i for i, t in enumerate(result)}
+        for task in result:
+            for parent_name in task.parents:
+                assert name_to_index[parent_name] < name_to_index[task.name], (
+                    f"Parent {parent_name} (idx {name_to_index[parent_name]}) "
+                    f"must appear before {task.name} (idx {name_to_index[task.name]})"
+                )
+
+        # finalize has exactly 4 parents
+        finalize = next(t for t in result if t.name == TaskName.FINALIZE)
+        assert set(finalize.parents) == {
+            TaskName.PROCESS_TRACKS,
+            TaskName.GENERATE_TITLE,
+            TaskName.GENERATE_RECAP,
+            TaskName.IDENTIFY_ACTION_ITEMS,
+        }
+
+        # cleanup_consent has 1 parent (finalize)
+        cleanup = next(t for t in result if t.name == TaskName.CLEANUP_CONSENT)
+        assert cleanup.parents == [TaskName.FINALIZE]
+
+        # post_zulip and send_webhook both have cleanup_consent as parent
+        post_zulip = next(t for t in result if t.name == TaskName.POST_ZULIP)
+        send_webhook = next(t for t in result if t.name == TaskName.SEND_WEBHOOK)
+        assert post_zulip.parents == [TaskName.CLEANUP_CONSENT]
+        assert send_webhook.parents == [TaskName.CLEANUP_CONSENT]
+
+        # Verify statuses propagated correctly
+        assert (
+            next(t for t in result if t.name == TaskName.GET_RECORDING).status
+            == DagTaskStatus.COMPLETED
+        )
+        assert (
+            next(t for t in result if t.name == TaskName.MIXDOWN_TRACKS).status
+            == DagTaskStatus.RUNNING
+        )
+        assert (
+            next(t for t in result if t.name == TaskName.FINALIZE).status
+            == DagTaskStatus.QUEUED
+        )
+
+    def test_topological_sort_invariant_complex_dag(self):
+        """For a complex DAG, every task's parents appear earlier in the list.
+
+        Uses a wider branching/merging DAG than diamond to stress the invariant.
+        """
+        # DAG: A -> B, A -> C, A -> D, B -> E, C -> E, C -> F, D -> F, E -> G, F -> G
+        shape = [
+            _make_shape_item("s_a", "task_a", ["s_b", "s_c", "s_d"]),
+            _make_shape_item("s_b", "task_b", ["s_e"]),
+            _make_shape_item("s_c", "task_c", ["s_e", "s_f"]),
+            _make_shape_item("s_d", "task_d", ["s_f"]),
+            _make_shape_item("s_e", "task_e", ["s_g"]),
+            _make_shape_item("s_f", "task_f", ["s_g"]),
+            _make_shape_item("s_g", "task_g"),
+        ]
+        tasks = [
+            _make_task_summary("s_a", status="COMPLETED"),
+            _make_task_summary("s_b", status="COMPLETED"),
+            _make_task_summary("s_c", status="RUNNING"),
+            _make_task_summary("s_d", status="COMPLETED"),
+            _make_task_summary("s_e", status="QUEUED"),
+            _make_task_summary("s_f", status="QUEUED"),
+            _make_task_summary("s_g", status="QUEUED"),
+        ]
+        details = _make_details(shape, tasks)
+
+        result = extract_dag_tasks(details)
+
+        assert len(result) == 7
+        name_to_index = {t.name: i for i, t in enumerate(result)}
+
+        # Verify invariant: every parent appears before its child
+        for task in result:
+            for parent_name in task.parents:
+                assert name_to_index[parent_name] < name_to_index[task.name], (
+                    f"Parent {parent_name} (idx {name_to_index[parent_name]}) "
+                    f"must appear before {task.name} (idx {name_to_index[task.name]})"
+                )
+
+        # task_g has 2 parents
+        task_g = next(t for t in result if t.name == "task_g")
+        assert set(task_g.parents) == {"task_e", "task_f"}
+
+        # task_e has 2 parents
+        task_e = next(t for t in result if t.name == "task_e")
+        assert set(task_e.parents) == {"task_b", "task_c"}
+
+        # task_a is root (first in topological order)
+        assert result[0].name == "task_a"
+        assert result[0].parents == []
+
 
 class TestExtractDagTasksFanOut:
     """Test fan-out tasks with spawned children."""
@@ -621,3 +826,139 @@ class TestMakeAudioProgressLoggerWithBroadcast:
             )
             callback(None, 100.0)
             mock_broadcast.assert_not_called()
+
+    def test_logging_throttled_by_interval(self):
+        """With interval=5.0, rapid calls only log once until interval elapses.
+
+        The throttle applies to ctx.log() calls. Broadcasts (fire-and-forget)
+        are not throttled — they occur every call when transcript_id + progress_pct set.
+        """
+        import asyncio
+        import time as time_mod
+
+        from reflector.hatchet.workflows.daily_multitrack_pipeline import (
+            make_audio_progress_logger,
+        )
+
+        ctx = MagicMock()
+        ctx.log = MagicMock()
+
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        mock_broadcast = AsyncMock()
+        tasks_created = []
+        original_create_task = loop.create_task
+
+        def capture_create_task(coro):
+            task = original_create_task(coro)
+            tasks_created.append(task)
+            return task
+
+        # Controlled monotonic values for the 4 calls from make_audio_progress_logger:
+        # init (start_time, last_log_time), call1 (now), call2 (now), call3 (now)
+        # After those, fall back to real time.monotonic() for asyncio internals.
+        controlled_values = [100.0, 100.0, 101.0, 106.0]
+        call_index = [0]
+        real_monotonic = time_mod.monotonic
+
+        def mock_monotonic():
+            if call_index[0] < len(controlled_values):
+                val = controlled_values[call_index[0]]
+                call_index[0] += 1
+                return val
+            return real_monotonic()
+
+        try:
+            with (
+                patch(
+                    "reflector.hatchet.workflows.daily_multitrack_pipeline.time.monotonic",
+                    side_effect=mock_monotonic,
+                ),
+                patch(
+                    "reflector.hatchet.broadcast.broadcast_event",
+                    mock_broadcast,
+                ),
+                patch.object(loop, "create_task", side_effect=capture_create_task),
+            ):
+                callback = make_audio_progress_logger(
+                    ctx, TaskName.MIXDOWN_TRACKS, interval=5.0, transcript_id="t-123"
+                )
+
+                # Call 1 at t=100.0: 100.0 - 100.0 = 0.0 < 5.0 => no log
+                callback(25.0, 50.0)
+                assert ctx.log.call_count == 0
+
+                # Call 2 at t=101.0: 101.0 - 100.0 = 1.0 < 5.0 => no log
+                callback(50.0, 100.0)
+                assert ctx.log.call_count == 0
+
+                # Call 3 at t=106.0: 106.0 - 100.0 = 6.0 >= 5.0 => logs
+                callback(75.0, 150.0)
+                assert ctx.log.call_count == 1
+
+                # Run pending broadcast tasks
+                if tasks_created:
+                    loop.run_until_complete(asyncio.gather(*tasks_created))
+
+                # Broadcasts happen on every call (not throttled) — 3 calls total
+                assert mock_broadcast.call_count == 3
+        finally:
+            loop.close()
+            # Do not leave a closed loop installed as the current event loop
+            # for tests that run after this one.
+            asyncio.set_event_loop(None)
+
+    def test_uses_broadcast_event_not_append_event_and_broadcast(self):
+        """Progress events use broadcast_event (transient), not append_event_and_broadcast (persisted)."""
+        import asyncio
+
+        from reflector.hatchet.workflows.daily_multitrack_pipeline import (
+            make_audio_progress_logger,
+        )
+
+        ctx = MagicMock()
+        ctx.log = MagicMock()
+
+        loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(loop)
+
+        mock_broadcast_event = AsyncMock()
+        mock_append = AsyncMock()
+        tasks_created = []
+        original_create_task = loop.create_task
+
+        def capture_create_task(coro):
+            task = original_create_task(coro)
+            tasks_created.append(task)
+            return task
+
+        try:
+            with (
+                patch(
+                    "reflector.hatchet.broadcast.broadcast_event",
+                    mock_broadcast_event,
+                ),
+                patch(
+                    "reflector.hatchet.broadcast.append_event_and_broadcast",
+                    mock_append,
+                ),
+                patch.object(loop, "create_task", side_effect=capture_create_task),
+            ):
+                callback = make_audio_progress_logger(
+                    ctx, TaskName.MIXDOWN_TRACKS, interval=0.0, transcript_id="t-123"
+                )
+                callback(50.0, 100.0)
+
+                if tasks_created:
+                    loop.run_until_complete(asyncio.gather(*tasks_created))
+
+                # broadcast_event (transient) IS called
+                mock_broadcast_event.assert_called_once()
+                # append_event_and_broadcast (persisted) is NOT called
+                mock_append.assert_not_called()
+        finally:
+            loop.close()
+            # Do not leave a closed loop installed as the current event loop
+            # for tests that run after this one.
+            asyncio.set_event_loop(None)