diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 563079c5..6c70943e 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -106,7 +106,10 @@ from reflector.utils.daily import ( parse_daily_recording_filename, ) from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty -from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT +from reflector.utils.transcript_constants import ( + compute_max_subjects, + compute_topic_chunk_size, +) from reflector.zulip import post_transcript_notification @@ -885,7 +888,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult: transcripts_controller, ) - chunk_size = TOPIC_CHUNK_WORD_COUNT + duration_seconds = words[-1].end - words[0].start if words else 0 + chunk_size = compute_topic_chunk_size(duration_seconds, len(words)) chunks = [] for i in range(0, len(words), chunk_size): chunk_words = words[i : i + chunk_size] @@ -975,7 +979,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult: ctx.log(f"detect_topics complete: found {len(topics_list)} topics") - return TopicsResult(topics=topics_list) + return TopicsResult(topics=topics_list, duration_seconds=duration_seconds) @daily_multitrack_pipeline.task( @@ -1112,8 +1116,14 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult participant_names, participant_name_to_id=participant_name_to_id ) + max_subjects = compute_max_subjects(topics_result.duration_seconds) + ctx.log( + f"extract_subjects: duration={topics_result.duration_seconds:.0f}s, " + f"max_subjects={max_subjects}" + ) + ctx.log("extract_subjects: calling LLM to extract subjects") - await builder.extract_subjects() + await builder.extract_subjects(max_subjects=max_subjects) ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects") diff --git a/server/reflector/hatchet/workflows/models.py b/server/reflector/hatchet/workflows/models.py index e8a0b6ad..bc553dcd 100644 --- a/server/reflector/hatchet/workflows/models.py +++ b/server/reflector/hatchet/workflows/models.py @@ -102,6 +102,7 @@ class TopicsResult(BaseModel): """Result from detect_topics task.""" topics: list[TitleSummary] + duration_seconds: float = 0 class TitleResult(BaseModel): diff --git a/server/reflector/pipelines/topic_processing.py b/server/reflector/pipelines/topic_processing.py index bc175099..9ce05a79 100644 --- a/server/reflector/pipelines/topic_processing.py +++ b/server/reflector/pipelines/topic_processing.py @@ -18,7 +18,7 @@ from reflector.processors import ( ) from reflector.processors.types import TitleSummary from reflector.processors.types import Transcript as TranscriptType -from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT +from reflector.utils.transcript_constants import compute_topic_chunk_size class EmptyPipeline: @@ -39,7 +39,10 @@ async def detect_topics( on_topic_callback: Callable, empty_pipeline: EmptyPipeline, ) -> list[TitleSummary]: - chunk_size = TOPIC_CHUNK_WORD_COUNT + duration_seconds = ( + transcript.words[-1].end - transcript.words[0].start if transcript.words else 0 + ) + chunk_size = compute_topic_chunk_size(duration_seconds, len(transcript.words)) topics: list[TitleSummary] = [] async def on_topic(topic: TitleSummary): diff --git a/server/reflector/processors/summary/prompts.py b/server/reflector/processors/summary/prompts.py index 0f331300..8d7ee566 100644 --- a/server/reflector/processors/summary/prompts.py +++ b/server/reflector/processors/summary/prompts.py @@ -43,7 +43,8 @@ DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent( include any deadlines or timeframes discussed for completion or follow-up. - Mention unresolved issues or topics needing further discussion, aiding in planning future meetings or follow-up actions. - - Do not include topic unrelated to {subject}. + - Be specific and cite participant names when attributing statements or actions. + - Do not include topics unrelated to {subject}. # OUTPUT Your summary should be clear, concise, and structured, covering all major @@ -58,6 +59,7 @@ PARAGRAPH_SUMMARY_PROMPT = dedent( """ Summarize the mentioned topic in 1 paragraph. It will be integrated into the final summary, so just for this topic. + Preserve key decisions and action items. Do not introduce new information. """ ).strip() diff --git a/server/reflector/processors/summary/summary_builder.py b/server/reflector/processors/summary/summary_builder.py index fadcfa23..7a423860 100644 --- a/server/reflector/processors/summary/summary_builder.py +++ b/server/reflector/processors/summary/summary_builder.py @@ -48,17 +48,24 @@ TRANSCRIPTION_TYPE_PROMPT = dedent( """ ).strip() -SUBJECTS_PROMPT = dedent( - """ - What are the main / high level topic of the meeting. - Do not include direct quotes or unnecessary details. - Be concise and focused on the main ideas. - A subject briefly mentioned should not be included. - There should be maximum 6 subjects. - Do not write complete narrative sentences for the subject, - you must write a concise subject using noun phrases. - """ -).strip() +_DEFAULT_MAX_SUBJECTS = 6 + + +def build_subjects_prompt(max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> str: + """Build subjects extraction prompt with a dynamic subject cap.""" + subject_word = "subject" if max_subjects == 1 else "subjects" + return dedent( + f""" + What are the main / high level topics of the meeting. + Do not include direct quotes or unnecessary details. + Be concise and focused on the main ideas. + A subject briefly mentioned should not be included. + There should be maximum {max_subjects} {subject_word}. + Do not write complete narrative sentences for the subject, + you must write a concise subject using noun phrases. + """ + ).strip() + ACTION_ITEMS_PROMPT = dedent( """ @@ -145,7 +152,7 @@ class SubjectsResponse(BaseModel): """Pydantic model for extracted subjects/topics""" subjects: list[str] = Field( - description="List of main subjects/topics discussed, maximum 6 items", + description="List of main subjects/topics discussed", ) @@ -345,11 +352,14 @@ class SummaryBuilder: # Summary # ---------------------------------------------------------------------------- - async def extract_subjects(self) -> None: + async def extract_subjects(self, max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> None: """Extract main subjects/topics from the transcript.""" - self.logger.info("--- extract main subjects using TreeSummarize") + self.logger.info( + "--- extract main subjects using TreeSummarize", + max_subjects=max_subjects, + ) - subjects_prompt = SUBJECTS_PROMPT + subjects_prompt = build_subjects_prompt(max_subjects) try: response = await self._get_structured_response( @@ -358,7 +368,7 @@ class SummaryBuilder: tone_name="Meeting assistant that talk only as list item", ) - self.subjects = response.subjects + self.subjects = response.subjects[:max_subjects] self.logger.info(f"Extracted subjects: {self.subjects}") except Exception as e: diff --git a/server/reflector/utils/transcript_constants.py b/server/reflector/utils/transcript_constants.py index 3924fa56..1110e62b 100644 --- a/server/reflector/utils/transcript_constants.py +++ b/server/reflector/utils/transcript_constants.py @@ -4,5 +4,67 @@ Shared transcript processing constants. Used by both Hatchet workflows and Celery pipelines for consistent processing. """ -# Topic detection: number of words per chunk for topic extraction +import math + +# Topic detection: legacy static chunk size, used as fallback TOPIC_CHUNK_WORD_COUNT = 300 + +# Dynamic chunking curve parameters +# Formula: target_topics = _COEFFICIENT * duration_minutes ^ _EXPONENT +# Derived from anchors: 5 min -> 3 topics, 180 min -> 40 topics +_TOPIC_CURVE_COEFFICIENT = 0.833 +_TOPIC_CURVE_EXPONENT = 0.723 +_MIN_TOPICS = 2 +_MAX_TOPICS = 50 +_MIN_CHUNK_WORDS = 375 +_MAX_CHUNK_WORDS = 1500 + + +def compute_topic_chunk_size(duration_seconds: float, total_words: int) -> int: + """Calculate optimal chunk size for topic detection based on recording duration. + + Uses a power-curve function to scale topic count sublinearly with duration, + producing fewer LLM calls for longer recordings while maintaining topic quality. + + Returns the number of words per chunk. + """ + if total_words <= 0 or duration_seconds <= 0: + return _MIN_CHUNK_WORDS + + duration_minutes = duration_seconds / 60.0 + target_topics = _TOPIC_CURVE_COEFFICIENT * math.pow( + duration_minutes, _TOPIC_CURVE_EXPONENT + ) + target_topics = int(round(max(_MIN_TOPICS, min(_MAX_TOPICS, target_topics)))) + + chunk_size = total_words // target_topics + chunk_size = max(_MIN_CHUNK_WORDS, min(_MAX_CHUNK_WORDS, chunk_size)) + return chunk_size + + +# Subject extraction: scale max subjects with recording duration +# Short calls get fewer subjects to avoid over-analyzing trivial content +_SUBJECT_DURATION_THRESHOLDS = [ + (5 * 60, 1), # ≤ 5 min → 1 subject + (15 * 60, 2), # ≤ 15 min → 2 subjects + (30 * 60, 3), # ≤ 30 min → 3 subjects + (45 * 60, 4), # ≤ 45 min → 4 subjects + (60 * 60, 5), # ≤ 60 min → 5 subjects +] +_MAX_SUBJECTS = 6 + + +def compute_max_subjects(duration_seconds: float) -> int: + """Calculate maximum number of subjects to extract based on recording duration. + + Uses a step function: short recordings get fewer subjects to avoid + generating excessive detail for trivial content. + """ + if duration_seconds <= 0: + return 1 + + for threshold, max_subjects in _SUBJECT_DURATION_THRESHOLDS: + if duration_seconds <= threshold: + return max_subjects + + return _MAX_SUBJECTS diff --git a/server/tests/test_topic_chunking.py b/server/tests/test_topic_chunking.py new file mode 100644 index 00000000..1918691d --- /dev/null +++ b/server/tests/test_topic_chunking.py @@ -0,0 +1,99 @@ +import math + +import pytest + +from reflector.utils.transcript_constants import ( + compute_max_subjects, + compute_topic_chunk_size, +) + + +@pytest.mark.parametrize( + "duration_min,total_words,expected_topics_range", + [ + (5, 750, (1, 3)), + (10, 1500, (3, 6)), + (30, 4500, (8, 14)), + (60, 9000, (14, 22)), + (120, 18000, (24, 35)), + (180, 27000, (30, 42)), + ], +) +def test_topic_count_in_expected_range( + duration_min, total_words, expected_topics_range +): + chunk_size = compute_topic_chunk_size(duration_min * 60, total_words) + num_topics = math.ceil(total_words / chunk_size) + assert expected_topics_range[0] <= num_topics <= expected_topics_range[1], ( + f"For {duration_min}min/{total_words}words: got {num_topics} topics " + f"(chunk_size={chunk_size}), expected {expected_topics_range[0]}-{expected_topics_range[1]}" + ) + + +def test_chunk_size_within_bounds(): + for duration_min in [5, 10, 30, 60, 120, 180]: + chunk_size = compute_topic_chunk_size(duration_min * 60, duration_min * 150) + assert ( + 375 <= chunk_size <= 1500 + ), f"For {duration_min}min: chunk_size={chunk_size} out of bounds [375, 1500]" + + +def test_zero_duration_falls_back(): + assert compute_topic_chunk_size(0, 1000) == 375 + + +def test_zero_words_falls_back(): + assert compute_topic_chunk_size(600, 0) == 375 + + +def test_negative_inputs_fall_back(): + assert compute_topic_chunk_size(-10, 1000) == 375 + assert compute_topic_chunk_size(600, -5) == 375 + + +def test_very_short_transcript(): + """A 1-minute call with very few words should still produce at least 1 topic.""" + chunk_size = compute_topic_chunk_size(60, 100) + # chunk_size is at least 375, so 100 words = 1 chunk + assert chunk_size >= 375 + + +def test_very_long_transcript(): + """A 4-hour call should cap at max topics.""" + chunk_size = compute_topic_chunk_size(4 * 3600, 36000) + num_topics = math.ceil(36000 / chunk_size) + assert num_topics <= 50 + + +# --- compute_max_subjects tests --- + + +@pytest.mark.parametrize( + "duration_seconds,expected_max", + [ + (0, 1), # zero/invalid → 1 + (-10, 1), # negative → 1 + (60, 1), # 1 min → 1 + (120, 1), # 2 min → 1 + (300, 1), # 5 min (boundary) → 1 + (301, 2), # just over 5 min → 2 + (900, 2), # 15 min (boundary) → 2 + (901, 3), # just over 15 min → 3 + (1800, 3), # 30 min (boundary) → 3 + (1801, 4), # just over 30 min → 4 + (2700, 4), # 45 min (boundary) → 4 + (2701, 5), # just over 45 min → 5 + (3600, 5), # 60 min (boundary) → 5 + (3601, 6), # just over 60 min → 6 + (7200, 6), # 2 hours → 6 + (14400, 6), # 4 hours → 6 + ], +) +def test_max_subjects_scales_with_duration(duration_seconds, expected_max): + assert compute_max_subjects(duration_seconds) == expected_max + + +def test_max_subjects_never_exceeds_cap(): + """Even very long recordings should cap at 6 subjects.""" + for hours in range(1, 10): + assert compute_max_subjects(hours * 3600) <= 6