Files
reflector/server/reflector/processors/summary/summary_builder.py
Igor Monadical 2d0df48767 feat: devex/hatchet log progress track (#813)
* progress track for some hatchet tasks

* remove inline imports / type fixes

* progress callback for mixdown - move to a function

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-26 14:10:21 -05:00

612 lines
21 KiB
Python

"""
# Summary meeting notes
This script is used to generate a summary of a meeting notes transcript.
"""
import asyncio
import sys
from datetime import datetime, timezone
from enum import Enum
from textwrap import dedent
from typing import Type, TypeVar
import structlog
from pydantic import BaseModel, Field
from reflector.llm import LLM
from reflector.processors.summary.models import ActionItemsResponse
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
RECAP_PROMPT,
build_participant_instructions,
build_summary_markdown,
)
from reflector.settings import settings
T = TypeVar("T", bound=BaseModel)
PARTICIPANTS_PROMPT = dedent(
"""
Identify all participants in this conversation.
Distinguish between people who actually spoke in the transcript and those who were only mentioned.
Each participant should only be listed once.
Do not include company names, only people's names.
"""
).strip()
TRANSCRIPTION_TYPE_PROMPT = dedent(
"""
Analyze the transcript to determine if it is a meeting, podcast, or interview.
A meeting typically involves severals participants engaging in discussions,
making decisions, and planning actions. A podcast often includes hosts
discussing topics or interviewing guests for an audience in a structured format.
An interview generally features one or more interviewer questioning one or
more interviewees, often for hiring, research, or journalism. Deliver your
classification with a confidence score and reasoning.
"""
).strip()
SUBJECTS_PROMPT = dedent(
"""
What are the main / high level topic of the meeting.
Do not include direct quotes or unnecessary details.
Be concise and focused on the main ideas.
A subject briefly mentioned should not be included.
There should be maximum 6 subjects.
Do not write complete narrative sentences for the subject,
you must write a concise subject using noun phrases.
"""
).strip()
ACTION_ITEMS_PROMPT = dedent(
"""
Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next.
Look for:
1. **Decisions Made**: Any decisions, choices, or conclusions reached during the meeting. For each decision:
- What was decided? (be specific)
- Who made the decision or was involved? (use actual participant names)
- Why was this decision made? (key factors, reasoning, or rationale)
2. **Next Steps / Action Items**: Any tasks, follow-ups, or actions that were mentioned or assigned. For each action item:
- What specific task needs to be done? (be concrete and actionable)
- Who is responsible? (use actual participant names if mentioned, or "team" if unclear)
- When is it due? (any deadlines, timeframes, or "by next meeting" type commitments)
- What context is needed? (any additional details that help understand the task)
Guidelines:
- Be thorough and identify all action items, even if they seem minor
- Include items that were agreed upon, assigned, or committed to
- Include decisions even if they seem obvious or implicit
- If someone says "I'll do X" or "We should do Y", that's an action item
- If someone says "Let's go with option A", that's a decision
- Use the exact participant names from the transcript
- If no participant name is mentioned, you can leave assigned_to/decided_by as null
Only return empty lists if the transcript contains NO decisions and NO action items whatsoever.
"""
).strip()
STRUCTURED_RESPONSE_PROMPT_TEMPLATE = dedent(
"""
Based on the following analysis, provide the information in the requested JSON format:
Analysis:
{analysis}
{format_instructions}
"""
).strip()
class TranscriptionType(Enum):
MEETING = "meeting"
PODCAST = "podcast"
INTERVIEW = "interview"
class TranscriptionTypeResponse(BaseModel):
"""Pydantic model for transcription type classification"""
transcription_type: str = Field(
description="The type of transcription - either 'meeting', 'podcast', or 'interview'"
)
confidence: float = Field(
description="Confidence score between 0 and 1", ge=0.0, le=1.0
)
reasoning: str = Field(description="Brief explanation for the classification")
class ParticipantInfo(BaseModel):
"""Information about a single participant"""
name: str = Field(description="The name of the participant")
is_speaker: bool = Field(
default=True, description="Whether this person spoke in the transcript"
)
class ParticipantsResponse(BaseModel):
"""Pydantic model for participants identification"""
participants: list[ParticipantInfo] = Field(
description="List of all participants in the conversation"
)
total_speakers: int = Field(description="Total number of people who spoke")
mentioned_only: list[str] = Field(
default_factory=list, description="Names mentioned but who didn't speak"
)
class SubjectsResponse(BaseModel):
"""Pydantic model for extracted subjects/topics"""
subjects: list[str] = Field(
description="List of main subjects/topics discussed, maximum 6 items",
)
class SummaryBuilder:
def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None:
self.transcript: str | None = None
self.recap: str | None = None
self.summaries: list[dict[str, str]] = []
self.subjects: list[str] = []
self.transcription_type: TranscriptionType | None = None
self.llm: LLM = llm
self.model_name: str = llm.model_name
self.logger = logger or structlog.get_logger()
self.participant_instructions: str | None = None
self.action_items: ActionItemsResponse | None = None
self.participant_name_to_id: dict[str, str] = {}
if filename:
self.read_transcript_from_file(filename)
def read_transcript_from_file(self, filename: str) -> None:
"""
Load a transcript from a text file.
Must be formatted as:
speaker: message
speaker2: message2
"""
with open(filename, "r", encoding="utf-8") as f:
self.transcript = f.read().strip()
def set_transcript(self, transcript: str) -> None:
assert isinstance(transcript, str)
self.transcript = transcript
def set_llm_instance(self, llm: LLM) -> None:
self.llm = llm
async def _get_structured_response(
self,
prompt: str,
output_cls: Type[T],
tone_name: str | None = None,
timeout: int | None = None,
) -> T:
"""Generic function to get structured output from LLM for non-function-calling models."""
enhanced_prompt = self._enhance_prompt_with_participants(prompt)
return await self.llm.get_structured_response(
enhanced_prompt,
[self.transcript],
output_cls,
tone_name=tone_name,
timeout=timeout,
)
async def _get_response(
self, prompt: str, texts: list[str], tone_name: str | None = None
) -> str:
"""Get text response with automatic participant instructions injection."""
enhanced_prompt = self._enhance_prompt_with_participants(prompt)
return await self.llm.get_response(enhanced_prompt, texts, tone_name=tone_name)
def _enhance_prompt_with_participants(self, prompt: str) -> str:
"""Add participant instructions to any prompt if participants are known."""
if self.participant_instructions:
self.logger.debug("Adding participant instructions to prompt")
return f"{prompt}\n\n{self.participant_instructions}"
return prompt
# ----------------------------------------------------------------------------
# Participants
# ----------------------------------------------------------------------------
def set_known_participants(
self,
participants: list[str],
participant_name_to_id: dict[str, str] | None = None,
) -> None:
"""
Set known participants directly without LLM identification.
This is used when participants are already identified and stored.
They are appended at the end of the transcript, providing more context for the assistant.
Args:
participants: List of participant names
participant_name_to_id: Optional mapping of participant names to their IDs
"""
if not participants:
self.logger.warning("No participants provided")
return
self.logger.info(
"Using known participants",
participants=participants,
)
if participant_name_to_id:
self.participant_name_to_id = participant_name_to_id
participants_md = self.format_list_md(participants)
self.transcript += f"\n\n# Participants\n\n{participants_md}"
self.participant_instructions = build_participant_instructions(participants)
async def identify_participants(self) -> None:
"""
From a transcript, try to identify the participants using TreeSummarize with structured output.
This might not give the best result without good diarization, but it's a start.
They are appended at the end of the transcript, providing more context for the assistant.
"""
self.logger.debug("--- identify_participants using TreeSummarize with Pydantic")
participants_prompt = PARTICIPANTS_PROMPT
try:
response = await self._get_structured_response(
participants_prompt,
ParticipantsResponse,
tone_name="Participant identifier",
)
all_participants = [p.name for p in response.participants]
self.logger.info(
"Participants analysis complete",
total_speakers=response.total_speakers,
speakers=[p.name for p in response.participants if p.is_speaker],
mentioned_only=response.mentioned_only,
total_identified=len(all_participants) + len(response.mentioned_only),
)
unique_participants = list(set(all_participants + response.mentioned_only))
if unique_participants:
participants_md = self.format_list_md(unique_participants)
self.transcript += f"\n\n# Participants\n\n{participants_md}"
self.participant_instructions = build_participant_instructions(
unique_participants
)
else:
self.logger.warning("No participants identified in the transcript")
except Exception as e:
self.logger.error(f"Error in participant identification: {e}")
self.logger.warning(
"Failed to identify participants, continuing without them"
)
# ----------------------------------------------------------------------------
# Transcription identification
# ----------------------------------------------------------------------------
async def identify_transcription_type(self) -> None:
"""
Identify the type of transcription: meeting or podcast using TreeSummarizer with structured output.
"""
self.logger.debug(
"--- identify transcription type using TreeSummarizer with Pydantic"
)
transcription_type_prompt = TRANSCRIPTION_TYPE_PROMPT
try:
response = await self._get_structured_response(
transcription_type_prompt,
TranscriptionTypeResponse,
tone_name="Transcription type classifier",
)
self.logger.info(
f"Transcription type identified: {response.transcription_type} "
f"(confidence: {response.confidence:.2f})"
)
self.logger.debug(f"Reasoning: {response.reasoning}")
if response.transcription_type.lower() == "meeting":
self.transcription_type = TranscriptionType.MEETING
elif response.transcription_type.lower() == "podcast":
self.transcription_type = TranscriptionType.PODCAST
elif response.transcription_type.lower() == "interview":
self.transcription_type = TranscriptionType.INTERVIEW
else:
self.logger.warning(
f"Unexpected transcription type: {response.transcription_type}, "
f"defaulting to meeting"
)
self.transcription_type = TranscriptionType.MEETING
except Exception as e:
self.logger.error(f"Error in transcription type identification: {e}")
self.transcription_type = TranscriptionType.MEETING
# ----------------------------------------------------------------------------
# Summary
# ----------------------------------------------------------------------------
async def extract_subjects(self) -> None:
"""Extract main subjects/topics from the transcript."""
self.logger.info("--- extract main subjects using TreeSummarize")
subjects_prompt = SUBJECTS_PROMPT
try:
response = await self._get_structured_response(
subjects_prompt,
SubjectsResponse,
tone_name="Meeting assistant that talk only as list item",
)
self.subjects = response.subjects
self.logger.info(f"Extracted subjects: {self.subjects}")
except Exception as e:
self.logger.error(f"Error extracting subjects: {e}")
self.subjects = []
async def generate_subject_summaries(self) -> None:
"""Generate detailed summaries for each extracted subject."""
assert self.transcript is not None
summaries = []
for subject in self.subjects:
detailed_prompt = DETAILED_SUBJECT_PROMPT_TEMPLATE.format(subject=subject)
detailed_response = await self._get_response(
detailed_prompt, [self.transcript], tone_name="Topic assistant"
)
paragraph_prompt = PARAGRAPH_SUMMARY_PROMPT
paragraph_response = await self._get_response(
paragraph_prompt, [str(detailed_response)], tone_name="Topic summarizer"
)
summaries.append({"subject": subject, "summary": str(paragraph_response)})
self.logger.debug(f"Summary for {subject}: {paragraph_response}")
self.summaries = summaries
async def generate_recap(self) -> None:
"""Generate a quick recap from the subject summaries."""
summaries_text = "\n\n".join(
[
f"{summary['subject']}: {summary['summary']}"
for summary in self.summaries
]
)
recap_prompt = RECAP_PROMPT
recap_response = await self._get_response(
recap_prompt, [summaries_text], tone_name="Recap summarizer"
)
self.recap = str(recap_response)
self.logger.info(f"Quick recap: {self.recap}")
def _map_participant_names_to_ids(
self, response: ActionItemsResponse
) -> ActionItemsResponse:
"""Map participant names in action items to participant IDs."""
if not self.participant_name_to_id:
return response
decisions = []
for decision in response.decisions:
new_decision = decision.model_copy()
if (
decision.decided_by
and decision.decided_by in self.participant_name_to_id
):
new_decision.decided_by_participant_id = self.participant_name_to_id[
decision.decided_by
]
decisions.append(new_decision)
next_steps = []
for item in response.next_steps:
new_item = item.model_copy()
if item.assigned_to and item.assigned_to in self.participant_name_to_id:
new_item.assigned_to_participant_id = self.participant_name_to_id[
item.assigned_to
]
next_steps.append(new_item)
return ActionItemsResponse(decisions=decisions, next_steps=next_steps)
async def identify_action_items(self) -> ActionItemsResponse | None:
"""Identify action items (decisions and next steps) from the transcript."""
self.logger.info("--- identify action items using TreeSummarize")
if not self.transcript:
self.logger.warning(
"No transcript available for action items identification"
)
self.action_items = None
return None
action_items_prompt = ACTION_ITEMS_PROMPT
try:
response = await self._get_structured_response(
action_items_prompt,
ActionItemsResponse,
tone_name="Action item identifier",
timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT,
)
response = self._map_participant_names_to_ids(response)
self.action_items = response
self.logger.info(
f"Identified {len(response.decisions)} decisions and {len(response.next_steps)} action items",
decisions_count=len(response.decisions),
next_steps_count=len(response.next_steps),
)
if response.decisions:
self.logger.debug(
"Decisions identified",
decisions=[d.decision for d in response.decisions],
)
if response.next_steps:
self.logger.debug(
"Action items identified",
tasks=[item.task for item in response.next_steps],
)
if not response.decisions and not response.next_steps:
self.logger.warning(
"No action items identified from transcript",
transcript_length=len(self.transcript),
)
return response
except Exception as e:
self.logger.error(
f"Error identifying action items: {e}",
exc_info=True,
)
self.action_items = None
return None
async def generate_summary(self, only_subjects: bool = False) -> None:
"""
Generate summary by extracting subjects, creating summaries for each, and generating a recap.
"""
await self.extract_subjects()
if only_subjects:
return
await self.generate_subject_summaries()
await self.generate_recap()
await self.identify_action_items()
# ----------------------------------------------------------------------------
# Markdown
# ----------------------------------------------------------------------------
def as_markdown(self) -> str:
return build_summary_markdown(self.recap, self.summaries)
def format_list_md(self, data: list[str]) -> str:
return "\n".join([f"- {item}" for item in data])
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Generate a summary of a meeting transcript"
)
parser.add_argument(
"transcript",
type=str,
nargs="?",
help="The transcript of the meeting",
default="transcript.txt",
)
parser.add_argument(
"--transcription-type",
action="store_true",
help="Identify the type of the transcript (meeting, interview, podcast...)",
)
parser.add_argument(
"--save",
action="store_true",
help="Save the summary to a file",
)
parser.add_argument(
"--summary",
action="store_true",
help="Generate a summary",
)
parser.add_argument(
"--subjects",
help="Generate a list of subjects",
action="store_true",
)
parser.add_argument(
"--participants",
help="Generate a list of participants",
action="store_true",
)
args = parser.parse_args()
async def main():
# build the summary
llm = LLM(settings=settings)
sm = SummaryBuilder(llm=llm, filename=args.transcript)
if args.subjects:
await sm.generate_summary(only_subjects=True)
print("# Subjects\n")
print("\n".join(sm.subjects))
sys.exit(0)
if args.transcription_type:
await sm.identify_transcription_type()
print(sm.transcription_type)
sys.exit(0)
if args.participants:
await sm.identify_participants()
sys.exit(0)
# if no summary is asked, ask for everything
if not args.summary and not args.subjects:
args.summary = True
if args.summary:
await sm.generate_summary()
print("")
print("-" * 80)
print("")
print(sm.as_markdown())
if args.save:
# write the summary to a file, on the format summary-<iso date>.md
filename = f"summary-{datetime.now(timezone.utc).isoformat()}.md"
with open(filename, "w", encoding="utf-8") as f:
f.write(sm.as_markdown())
print("")
print("-" * 80)
print("")
print("Saved to", filename)
asyncio.run(main())