fix: better topic chunking and subject extraction (#952)

* fix: better topic chunking depending on recording duration

* fix: better subject detection + prompt improvements
Author: Juan Diego García
Date: 2026-04-08 15:38:04 -05:00
Committed by: GitHub
Parent: 7ed3b781ee
Commit: 5f0c5635eb
7 changed files with 211 additions and 24 deletions
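
The headline change: topic chunking no longer uses a fixed 300-word window but scales with recording duration, and the number of extracted subjects scales the same way. A rough before/after comparison, using only the constants and helpers introduced in this commit (the ~150 words-per-minute figure is an assumption for illustration, not something the code relies on):

import math

from reflector.utils.transcript_constants import (
    TOPIC_CHUNK_WORD_COUNT,
    compute_topic_chunk_size,
)

# Illustrative 3-hour meeting at roughly 150 spoken words per minute.
duration_minutes = 180
total_words = duration_minutes * 150  # 27,000 words

old_chunks = math.ceil(total_words / TOPIC_CHUNK_WORD_COUNT)  # 90 topic LLM calls
new_chunk_size = compute_topic_chunk_size(duration_minutes * 60, total_words)
new_chunks = math.ceil(total_words / new_chunk_size)  # ~36 calls with the new curve

print(old_chunks, new_chunk_size, new_chunks)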

View File

@@ -106,7 +106,10 @@ from reflector.utils.daily import (
     parse_daily_recording_filename,
 )
 from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
-from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
+from reflector.utils.transcript_constants import (
+    compute_max_subjects,
+    compute_topic_chunk_size,
+)
 from reflector.zulip import post_transcript_notification
@@ -885,7 +888,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
         transcripts_controller,
     )
-    chunk_size = TOPIC_CHUNK_WORD_COUNT
+    duration_seconds = words[-1].end - words[0].start if words else 0
+    chunk_size = compute_topic_chunk_size(duration_seconds, len(words))
     chunks = []
     for i in range(0, len(words), chunk_size):
         chunk_words = words[i : i + chunk_size]
@@ -975,7 +979,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
ctx.log(f"detect_topics complete: found {len(topics_list)} topics") ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
return TopicsResult(topics=topics_list) return TopicsResult(topics=topics_list, duration_seconds=duration_seconds)
@daily_multitrack_pipeline.task( @daily_multitrack_pipeline.task(
@@ -1112,8 +1116,14 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
         participant_names, participant_name_to_id=participant_name_to_id
     )
 
+    max_subjects = compute_max_subjects(topics_result.duration_seconds)
+    ctx.log(
+        f"extract_subjects: duration={topics_result.duration_seconds:.0f}s, "
+        f"max_subjects={max_subjects}"
+    )
+
     ctx.log("extract_subjects: calling LLM to extract subjects")
-    await builder.extract_subjects()
+    await builder.extract_subjects(max_subjects=max_subjects)
     ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")

View File

@@ -102,6 +102,7 @@ class TopicsResult(BaseModel):
"""Result from detect_topics task.""" """Result from detect_topics task."""
topics: list[TitleSummary] topics: list[TitleSummary]
duration_seconds: float = 0
class TitleResult(BaseModel): class TitleResult(BaseModel):
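
The default of 0 keeps older payloads working: a TopicsResult serialized before this change has no duration_seconds, so it deserializes as 0 and extract_subjects falls back to a single subject. A small sketch of that behaviour (the model is abbreviated to the relevant field):

from pydantic import BaseModel


class TopicsResultSketch(BaseModel):  # abbreviated stand-in for TopicsResult
    duration_seconds: float = 0


result = TopicsResultSketch(**{})  # payload produced before this change
assert result.duration_seconds == 0  # compute_max_subjects(0) then returns 1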

View File

@@ -18,7 +18,7 @@ from reflector.processors import (
 )
 from reflector.processors.types import TitleSummary
 from reflector.processors.types import Transcript as TranscriptType
-from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
+from reflector.utils.transcript_constants import compute_topic_chunk_size
 
 
 class EmptyPipeline:
@@ -39,7 +39,10 @@ async def detect_topics(
     on_topic_callback: Callable,
     empty_pipeline: EmptyPipeline,
 ) -> list[TitleSummary]:
-    chunk_size = TOPIC_CHUNK_WORD_COUNT
+    duration_seconds = (
+        transcript.words[-1].end - transcript.words[0].start if transcript.words else 0
+    )
+    chunk_size = compute_topic_chunk_size(duration_seconds, len(transcript.words))
     topics: list[TitleSummary] = []
 
     async def on_topic(topic: TitleSummary):

View File

@@ -43,7 +43,8 @@ DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
       include any deadlines or timeframes discussed for completion or follow-up.
     - Mention unresolved issues or topics needing further discussion, aiding in
       planning future meetings or follow-up actions.
-    - Do not include topic unrelated to {subject}.
+    - Be specific and cite participant names when attributing statements or actions.
+    - Do not include topics unrelated to {subject}.
 
     # OUTPUT
     Your summary should be clear, concise, and structured, covering all major
@@ -58,6 +59,7 @@ PARAGRAPH_SUMMARY_PROMPT = dedent(
""" """
Summarize the mentioned topic in 1 paragraph. Summarize the mentioned topic in 1 paragraph.
It will be integrated into the final summary, so just for this topic. It will be integrated into the final summary, so just for this topic.
Preserve key decisions and action items. Do not introduce new information.
""" """
).strip() ).strip()

View File

@@ -48,17 +48,24 @@ TRANSCRIPTION_TYPE_PROMPT = dedent(
""" """
).strip() ).strip()
SUBJECTS_PROMPT = dedent( _DEFAULT_MAX_SUBJECTS = 6
"""
What are the main / high level topic of the meeting.
Do not include direct quotes or unnecessary details. def build_subjects_prompt(max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> str:
Be concise and focused on the main ideas. """Build subjects extraction prompt with a dynamic subject cap."""
A subject briefly mentioned should not be included. subject_word = "subject" if max_subjects == 1 else "subjects"
There should be maximum 6 subjects. return dedent(
Do not write complete narrative sentences for the subject, f"""
you must write a concise subject using noun phrases. What are the main / high level topics of the meeting.
""" Do not include direct quotes or unnecessary details.
).strip() Be concise and focused on the main ideas.
A subject briefly mentioned should not be included.
There should be maximum {max_subjects} {subject_word}.
Do not write complete narrative sentences for the subject,
you must write a concise subject using noun phrases.
"""
).strip()
ACTION_ITEMS_PROMPT = dedent( ACTION_ITEMS_PROMPT = dedent(
""" """
@@ -145,7 +152,7 @@ class SubjectsResponse(BaseModel):
"""Pydantic model for extracted subjects/topics""" """Pydantic model for extracted subjects/topics"""
subjects: list[str] = Field( subjects: list[str] = Field(
description="List of main subjects/topics discussed, maximum 6 items", description="List of main subjects/topics discussed",
) )
@@ -345,11 +352,14 @@ class SummaryBuilder:
     # Summary
     # ----------------------------------------------------------------------------
 
-    async def extract_subjects(self) -> None:
+    async def extract_subjects(self, max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> None:
         """Extract main subjects/topics from the transcript."""
-        self.logger.info("--- extract main subjects using TreeSummarize")
+        self.logger.info(
+            "--- extract main subjects using TreeSummarize",
+            max_subjects=max_subjects,
+        )
 
-        subjects_prompt = SUBJECTS_PROMPT
+        subjects_prompt = build_subjects_prompt(max_subjects)
 
         try:
             response = await self._get_structured_response(
@@ -358,7 +368,7 @@ class SummaryBuilder:
tone_name="Meeting assistant that talk only as list item", tone_name="Meeting assistant that talk only as list item",
) )
self.subjects = response.subjects self.subjects = response.subjects[:max_subjects]
self.logger.info(f"Extracted subjects: {self.subjects}") self.logger.info(f"Extracted subjects: {self.subjects}")
except Exception as e: except Exception as e:
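
For reference, the two extremes of the new prompt builder look roughly like this (a sketch; the import path below is hypothetical since the file name is not shown in this diff, and the quoted output is paraphrased rather than captured from a run):

# Hypothetical import path for illustration; use wherever build_subjects_prompt lives.
from reflector.summary_builder import build_subjects_prompt

print(build_subjects_prompt())                # "... There should be maximum 6 subjects. ..."
print(build_subjects_prompt(max_subjects=1))  # "... There should be maximum 1 subject. ..."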

View File

@@ -4,5 +4,67 @@ Shared transcript processing constants.
 Used by both Hatchet workflows and Celery pipelines for consistent processing.
 """
 
-# Topic detection: number of words per chunk for topic extraction
+import math
+
+# Topic detection: legacy static chunk size, used as fallback
 TOPIC_CHUNK_WORD_COUNT = 300
+
+# Dynamic chunking curve parameters
+# Formula: target_topics = _COEFFICIENT * duration_minutes ^ _EXPONENT
+# Derived from anchors: 5 min -> 3 topics, 180 min -> 40 topics
+_TOPIC_CURVE_COEFFICIENT = 0.833
+_TOPIC_CURVE_EXPONENT = 0.723
+_MIN_TOPICS = 2
+_MAX_TOPICS = 50
+_MIN_CHUNK_WORDS = 375
+_MAX_CHUNK_WORDS = 1500
+
+
+def compute_topic_chunk_size(duration_seconds: float, total_words: int) -> int:
+    """Calculate optimal chunk size for topic detection based on recording duration.
+
+    Uses a power-curve function to scale topic count sublinearly with duration,
+    producing fewer LLM calls for longer recordings while maintaining topic quality.
+    Returns the number of words per chunk.
+    """
+    if total_words <= 0 or duration_seconds <= 0:
+        return _MIN_CHUNK_WORDS
+
+    duration_minutes = duration_seconds / 60.0
+    target_topics = _TOPIC_CURVE_COEFFICIENT * math.pow(
+        duration_minutes, _TOPIC_CURVE_EXPONENT
+    )
+    target_topics = int(round(max(_MIN_TOPICS, min(_MAX_TOPICS, target_topics))))
+
+    chunk_size = total_words // target_topics
+    chunk_size = max(_MIN_CHUNK_WORDS, min(_MAX_CHUNK_WORDS, chunk_size))
+    return chunk_size
+
+
+# Subject extraction: scale max subjects with recording duration
+# Short calls get fewer subjects to avoid over-analyzing trivial content
+_SUBJECT_DURATION_THRESHOLDS = [
+    (5 * 60, 1),  # ≤ 5 min → 1 subject
+    (15 * 60, 2),  # ≤ 15 min → 2 subjects
+    (30 * 60, 3),  # ≤ 30 min → 3 subjects
+    (45 * 60, 4),  # ≤ 45 min → 4 subjects
+    (60 * 60, 5),  # ≤ 60 min → 5 subjects
+]
+_MAX_SUBJECTS = 6
+
+
+def compute_max_subjects(duration_seconds: float) -> int:
+    """Calculate maximum number of subjects to extract based on recording duration.
+
+    Uses a step function: short recordings get fewer subjects to avoid
+    generating excessive detail for trivial content.
+    """
+    if duration_seconds <= 0:
+        return 1
+    for threshold, max_subjects in _SUBJECT_DURATION_THRESHOLDS:
+        if duration_seconds <= threshold:
+            return max_subjects
+    return _MAX_SUBJECTS
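
Worked through for a 60-minute, 9,000-word recording (an illustrative word count, not taken from real data), the curve and the step function give:

from reflector.utils.transcript_constants import (
    compute_max_subjects,
    compute_topic_chunk_size,
)

# target_topics = 0.833 * 60 ** 0.723 ≈ 0.833 * 19.3 ≈ 16.1 -> 16 topics
# chunk_size    = 9000 // 16 = 562 words, already within [375, 1500]
assert compute_topic_chunk_size(60 * 60, 9000) == 562

# 3600 s sits exactly on the "≤ 60 min" step, so the cap is 5 subjects
assert compute_max_subjects(60 * 60) == 5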

View File

@@ -0,0 +1,99 @@
import math

import pytest

from reflector.utils.transcript_constants import (
    compute_max_subjects,
    compute_topic_chunk_size,
)


@pytest.mark.parametrize(
    "duration_min,total_words,expected_topics_range",
    [
        (5, 750, (1, 3)),
        (10, 1500, (3, 6)),
        (30, 4500, (8, 14)),
        (60, 9000, (14, 22)),
        (120, 18000, (24, 35)),
        (180, 27000, (30, 42)),
    ],
)
def test_topic_count_in_expected_range(
    duration_min, total_words, expected_topics_range
):
    chunk_size = compute_topic_chunk_size(duration_min * 60, total_words)
    num_topics = math.ceil(total_words / chunk_size)
    assert expected_topics_range[0] <= num_topics <= expected_topics_range[1], (
        f"For {duration_min}min/{total_words}words: got {num_topics} topics "
        f"(chunk_size={chunk_size}), expected {expected_topics_range[0]}-{expected_topics_range[1]}"
    )


def test_chunk_size_within_bounds():
    for duration_min in [5, 10, 30, 60, 120, 180]:
        chunk_size = compute_topic_chunk_size(duration_min * 60, duration_min * 150)
        assert (
            375 <= chunk_size <= 1500
        ), f"For {duration_min}min: chunk_size={chunk_size} out of bounds [375, 1500]"


def test_zero_duration_falls_back():
    assert compute_topic_chunk_size(0, 1000) == 375


def test_zero_words_falls_back():
    assert compute_topic_chunk_size(600, 0) == 375


def test_negative_inputs_fall_back():
    assert compute_topic_chunk_size(-10, 1000) == 375
    assert compute_topic_chunk_size(600, -5) == 375


def test_very_short_transcript():
    """A 1-minute call with very few words should still produce at least 1 topic."""
    chunk_size = compute_topic_chunk_size(60, 100)
    # chunk_size is at least 375, so 100 words = 1 chunk
    assert chunk_size >= 375


def test_very_long_transcript():
    """A 4-hour call should cap at max topics."""
    chunk_size = compute_topic_chunk_size(4 * 3600, 36000)
    num_topics = math.ceil(36000 / chunk_size)
    assert num_topics <= 50


# --- compute_max_subjects tests ---


@pytest.mark.parametrize(
    "duration_seconds,expected_max",
    [
        (0, 1),  # zero/invalid → 1
        (-10, 1),  # negative → 1
        (60, 1),  # 1 min → 1
        (120, 1),  # 2 min → 1
        (300, 1),  # 5 min (boundary) → 1
        (301, 2),  # just over 5 min → 2
        (900, 2),  # 15 min (boundary) → 2
        (901, 3),  # just over 15 min → 3
        (1800, 3),  # 30 min (boundary) → 3
        (1801, 4),  # just over 30 min → 4
        (2700, 4),  # 45 min (boundary) → 4
        (2701, 5),  # just over 45 min → 5
        (3600, 5),  # 60 min (boundary) → 5
        (3601, 6),  # just over 60 min → 6
        (7200, 6),  # 2 hours → 6
        (14400, 6),  # 4 hours → 6
    ],
)
def test_max_subjects_scales_with_duration(duration_seconds, expected_max):
    assert compute_max_subjects(duration_seconds) == expected_max


def test_max_subjects_never_exceeds_cap():
    """Even very long recordings should cap at 6 subjects."""
    for hours in range(1, 10):
        assert compute_max_subjects(hours * 3600) <= 6