From 964cd78bb699d83d012ae4b8c96565df25b90a5d Mon Sep 17 00:00:00 2001
From: Sergey Mankovsky
Date: Thu, 18 Dec 2025 21:13:47 +0100
Subject: [PATCH] feat: identify action items (#790)

* Identify action items
* Add action items to mock summary
* Add action items validator
* Remove final prefix from action items
* Make on action items callback required
* Don't mutate action items response
* Assign action items to none on error
* Use timeout constant
* Exclude action items from transcript list
---
 .../versions/05f8688d6895_add_action_items.py |  26 +++
 server/reflector/db/transcripts.py            |  13 +-
 server/reflector/llm.py                       |   5 +-
 .../reflector/pipelines/main_file_pipeline.py |   1 +
 .../reflector/pipelines/main_live_pipeline.py |  19 ++
 .../pipelines/main_multitrack_pipeline.py     |   1 +
 .../reflector/pipelines/topic_processing.py   |  14 +-
 .../processors/summary/summary_builder.py     | 195 +++++++++++++++++-
 .../processors/transcript_final_summary.py    |  26 ++-
 .../processors/transcript_topic_detector.py   |   6 +-
 server/reflector/processors/types.py          |   4 +
 server/reflector/settings.py                  |   3 +
 server/reflector/views/transcripts.py         |   1 +
 server/reflector/worker/webhook.py            |   1 +
 server/tests/test_pipeline_main_file.py       |  15 +-
 15 files changed, 306 insertions(+), 24 deletions(-)
 create mode 100644 server/migrations/versions/05f8688d6895_add_action_items.py

diff --git a/server/migrations/versions/05f8688d6895_add_action_items.py b/server/migrations/versions/05f8688d6895_add_action_items.py
new file mode 100644
index 00000000..b789263f
--- /dev/null
+++ b/server/migrations/versions/05f8688d6895_add_action_items.py
@@ -0,0 +1,26 @@
+"""add_action_items
+
+Revision ID: 05f8688d6895
+Revises: bbafedfa510c
+Create Date: 2025-12-12 11:57:50.209658
+
+"""
+
+from typing import Sequence, Union
+
+import sqlalchemy as sa
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision: str = "05f8688d6895" +down_revision: Union[str, None] = "bbafedfa510c" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.add_column("transcript", sa.Column("action_items", sa.JSON(), nullable=True)) + + +def downgrade() -> None: + op.drop_column("transcript", "action_items") diff --git a/server/reflector/db/transcripts.py b/server/reflector/db/transcripts.py index f9c3c057..736075c8 100644 --- a/server/reflector/db/transcripts.py +++ b/server/reflector/db/transcripts.py @@ -44,6 +44,7 @@ transcripts = sqlalchemy.Table( sqlalchemy.Column("title", sqlalchemy.String), sqlalchemy.Column("short_summary", sqlalchemy.String), sqlalchemy.Column("long_summary", sqlalchemy.String), + sqlalchemy.Column("action_items", sqlalchemy.JSON), sqlalchemy.Column("topics", sqlalchemy.JSON), sqlalchemy.Column("events", sqlalchemy.JSON), sqlalchemy.Column("participants", sqlalchemy.JSON), @@ -164,6 +165,10 @@ class TranscriptFinalLongSummary(BaseModel): long_summary: str +class TranscriptActionItems(BaseModel): + action_items: dict + + class TranscriptFinalTitle(BaseModel): title: str @@ -204,6 +209,7 @@ class Transcript(BaseModel): locked: bool = False short_summary: str | None = None long_summary: str | None = None + action_items: dict | None = None topics: list[TranscriptTopic] = [] events: list[TranscriptEvent] = [] participants: list[TranscriptParticipant] | None = [] @@ -368,7 +374,12 @@ class TranscriptController: room_id: str | None = None, search_term: str | None = None, return_query: bool = False, - exclude_columns: list[str] = ["topics", "events", "participants"], + exclude_columns: list[str] = [ + "topics", + "events", + "participants", + "action_items", + ], ) -> list[Transcript]: """ Get all transcripts diff --git a/server/reflector/llm.py b/server/reflector/llm.py index 10ba9138..f7c9137d 100644 --- a/server/reflector/llm.py +++ b/server/reflector/llm.py @@ -232,14 +232,17 
@@ class LLM: texts: list[str], output_cls: Type[T], tone_name: str | None = None, + timeout: int | None = None, ) -> T: """Get structured output from LLM with validation retry via Workflow.""" + if timeout is None: + timeout = self.settings_obj.LLM_STRUCTURED_RESPONSE_TIMEOUT async def run_workflow(): workflow = StructuredOutputWorkflow( output_cls=output_cls, max_retries=self.settings_obj.LLM_PARSE_MAX_RETRIES + 1, - timeout=120, + timeout=timeout, ) result = await workflow.run( diff --git a/server/reflector/pipelines/main_file_pipeline.py b/server/reflector/pipelines/main_file_pipeline.py index aff6e042..b058e5b9 100644 --- a/server/reflector/pipelines/main_file_pipeline.py +++ b/server/reflector/pipelines/main_file_pipeline.py @@ -309,6 +309,7 @@ class PipelineMainFile(PipelineMainBase): transcript, on_long_summary_callback=self.on_long_summary, on_short_summary_callback=self.on_short_summary, + on_action_items_callback=self.on_action_items, empty_pipeline=self.empty_pipeline, logger=self.logger, ) diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index 83e560d6..2b6c6d07 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -27,6 +27,7 @@ from reflector.db.recordings import recordings_controller from reflector.db.rooms import rooms_controller from reflector.db.transcripts import ( Transcript, + TranscriptActionItems, TranscriptDuration, TranscriptFinalLongSummary, TranscriptFinalShortSummary, @@ -306,6 +307,23 @@ class PipelineMainBase(PipelineRunner[PipelineMessage], Generic[PipelineMessage] data=final_short_summary, ) + @broadcast_to_sockets + async def on_action_items(self, data): + action_items = TranscriptActionItems(action_items=data.action_items) + async with self.transaction(): + transcript = await self.get_transcript() + await transcripts_controller.update( + transcript, + { + "action_items": action_items.action_items, + }, 
+ ) + return await transcripts_controller.append_event( + transcript=transcript, + event="ACTION_ITEMS", + data=action_items, + ) + @broadcast_to_sockets async def on_duration(self, data): async with self.transaction(): @@ -465,6 +483,7 @@ class PipelineMainFinalSummaries(PipelineMainFromTopics): transcript=self._transcript, callback=self.on_long_summary, on_short_summary=self.on_short_summary, + on_action_items=self.on_action_items, ), ] diff --git a/server/reflector/pipelines/main_multitrack_pipeline.py b/server/reflector/pipelines/main_multitrack_pipeline.py index 579bfbd3..d3b28274 100644 --- a/server/reflector/pipelines/main_multitrack_pipeline.py +++ b/server/reflector/pipelines/main_multitrack_pipeline.py @@ -772,6 +772,7 @@ class PipelineMainMultitrack(PipelineMainBase): transcript, on_long_summary_callback=self.on_long_summary, on_short_summary_callback=self.on_short_summary, + on_action_items_callback=self.on_action_items, empty_pipeline=self.empty_pipeline, logger=self.logger, ) diff --git a/server/reflector/pipelines/topic_processing.py b/server/reflector/pipelines/topic_processing.py index 7f055025..a6189d63 100644 --- a/server/reflector/pipelines/topic_processing.py +++ b/server/reflector/pipelines/topic_processing.py @@ -89,6 +89,7 @@ async def generate_summaries( *, on_long_summary_callback: Callable, on_short_summary_callback: Callable, + on_action_items_callback: Callable, empty_pipeline: EmptyPipeline, logger: structlog.BoundLogger, ): @@ -96,11 +97,14 @@ async def generate_summaries( logger.warning("No topics for summary generation") return - processor = TranscriptFinalSummaryProcessor( - transcript=transcript, - callback=on_long_summary_callback, - on_short_summary=on_short_summary_callback, - ) + processor_kwargs = { + "transcript": transcript, + "callback": on_long_summary_callback, + "on_short_summary": on_short_summary_callback, + "on_action_items": on_action_items_callback, + } + + processor = 
TranscriptFinalSummaryProcessor(**processor_kwargs) processor.set_pipeline(empty_pipeline) for topic in topics: diff --git a/server/reflector/processors/summary/summary_builder.py b/server/reflector/processors/summary/summary_builder.py index df348093..b9770fc5 100644 --- a/server/reflector/processors/summary/summary_builder.py +++ b/server/reflector/processors/summary/summary_builder.py @@ -96,6 +96,36 @@ RECAP_PROMPT = dedent( """ ).strip() +ACTION_ITEMS_PROMPT = dedent( + """ + Identify action items from this meeting transcript. Your goal is to identify what was decided and what needs to happen next. + + Look for: + + 1. **Decisions Made**: Any decisions, choices, or conclusions reached during the meeting. For each decision: + - What was decided? (be specific) + - Who made the decision or was involved? (use actual participant names) + - Why was this decision made? (key factors, reasoning, or rationale) + + 2. **Next Steps / Action Items**: Any tasks, follow-ups, or actions that were mentioned or assigned. For each action item: + - What specific task needs to be done? (be concrete and actionable) + - Who is responsible? (use actual participant names if mentioned, or "team" if unclear) + - When is it due? (any deadlines, timeframes, or "by next meeting" type commitments) + - What context is needed? (any additional details that help understand the task) + + Guidelines: + - Be thorough and identify all action items, even if they seem minor + - Include items that were agreed upon, assigned, or committed to + - Include decisions even if they seem obvious or implicit + - If someone says "I'll do X" or "We should do Y", that's an action item + - If someone says "Let's go with option A", that's a decision + - Use the exact participant names from the transcript + - If no participant name is mentioned, you can leave assigned_to/decided_by as null + + Only return empty lists if the transcript contains NO decisions and NO action items whatsoever. 
+ """ +).strip() + STRUCTURED_RESPONSE_PROMPT_TEMPLATE = dedent( """ Based on the following analysis, provide the information in the requested JSON format: @@ -155,6 +185,53 @@ class SubjectsResponse(BaseModel): ) +class ActionItem(BaseModel): + """A single action item from the meeting""" + + task: str = Field(description="The task or action item to be completed") + assigned_to: str | None = Field( + default=None, description="Person or team assigned to this task (name)" + ) + assigned_to_participant_id: str | None = Field( + default=None, description="Participant ID if assigned_to matches a participant" + ) + deadline: str | None = Field( + default=None, description="Deadline or timeframe mentioned for this task" + ) + context: str | None = Field( + default=None, description="Additional context or notes about this task" + ) + + +class Decision(BaseModel): + """A decision made during the meeting""" + + decision: str = Field(description="What was decided") + rationale: str | None = Field( + default=None, + description="Reasoning or key factors that influenced this decision", + ) + decided_by: str | None = Field( + default=None, description="Person or group who made the decision (name)" + ) + decided_by_participant_id: str | None = Field( + default=None, description="Participant ID if decided_by matches a participant" + ) + + +class ActionItemsResponse(BaseModel): + """Pydantic model for identified action items""" + + decisions: list[Decision] = Field( + default_factory=list, + description="List of decisions made during the meeting", + ) + next_steps: list[ActionItem] = Field( + default_factory=list, + description="List of action items and next steps to be taken", + ) + + class SummaryBuilder: def __init__(self, llm: LLM, filename: str | None = None, logger=None) -> None: self.transcript: str | None = None @@ -166,6 +243,8 @@ class SummaryBuilder: self.model_name: str = llm.model_name self.logger = logger or structlog.get_logger() self.participant_instructions: str | 
None = None + self.action_items: ActionItemsResponse | None = None + self.participant_name_to_id: dict[str, str] = {} if filename: self.read_transcript_from_file(filename) @@ -189,13 +268,20 @@ class SummaryBuilder: self.llm = llm async def _get_structured_response( - self, prompt: str, output_cls: Type[T], tone_name: str | None = None + self, + prompt: str, + output_cls: Type[T], + tone_name: str | None = None, + timeout: int | None = None, ) -> T: """Generic function to get structured output from LLM for non-function-calling models.""" - # Add participant instructions to the prompt if available enhanced_prompt = self._enhance_prompt_with_participants(prompt) return await self.llm.get_structured_response( - enhanced_prompt, [self.transcript], output_cls, tone_name=tone_name + enhanced_prompt, + [self.transcript], + output_cls, + tone_name=tone_name, + timeout=timeout, ) async def _get_response( @@ -216,11 +302,19 @@ class SummaryBuilder: # Participants # ---------------------------------------------------------------------------- - def set_known_participants(self, participants: list[str]) -> None: + def set_known_participants( + self, + participants: list[str], + participant_name_to_id: dict[str, str] | None = None, + ) -> None: """ Set known participants directly without LLM identification. This is used when participants are already identified and stored. They are appended at the end of the transcript, providing more context for the assistant. 
+ + Args: + participants: List of participant names + participant_name_to_id: Optional mapping of participant names to their IDs """ if not participants: self.logger.warning("No participants provided") @@ -231,10 +325,12 @@ class SummaryBuilder: participants=participants, ) + if participant_name_to_id: + self.participant_name_to_id = participant_name_to_id + participants_md = self.format_list_md(participants) self.transcript += f"\n\n# Participants\n\n{participants_md}" - # Set instructions that will be automatically added to all prompts participants_list = ", ".join(participants) self.participant_instructions = dedent( f""" @@ -413,6 +509,92 @@ class SummaryBuilder: self.recap = str(recap_response) self.logger.info(f"Quick recap: {self.recap}") + def _map_participant_names_to_ids( + self, response: ActionItemsResponse + ) -> ActionItemsResponse: + """Map participant names in action items to participant IDs.""" + if not self.participant_name_to_id: + return response + + decisions = [] + for decision in response.decisions: + new_decision = decision.model_copy() + if ( + decision.decided_by + and decision.decided_by in self.participant_name_to_id + ): + new_decision.decided_by_participant_id = self.participant_name_to_id[ + decision.decided_by + ] + decisions.append(new_decision) + + next_steps = [] + for item in response.next_steps: + new_item = item.model_copy() + if item.assigned_to and item.assigned_to in self.participant_name_to_id: + new_item.assigned_to_participant_id = self.participant_name_to_id[ + item.assigned_to + ] + next_steps.append(new_item) + + return ActionItemsResponse(decisions=decisions, next_steps=next_steps) + + async def identify_action_items(self) -> ActionItemsResponse | None: + """Identify action items (decisions and next steps) from the transcript.""" + self.logger.info("--- identify action items using TreeSummarize") + + if not self.transcript: + self.logger.warning( + "No transcript available for action items identification" + ) + 
self.action_items = None + return None + + action_items_prompt = ACTION_ITEMS_PROMPT + + try: + response = await self._get_structured_response( + action_items_prompt, + ActionItemsResponse, + tone_name="Action item identifier", + timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT, + ) + + response = self._map_participant_names_to_ids(response) + + self.action_items = response + self.logger.info( + f"Identified {len(response.decisions)} decisions and {len(response.next_steps)} action items", + decisions_count=len(response.decisions), + next_steps_count=len(response.next_steps), + ) + + if response.decisions: + self.logger.debug( + "Decisions identified", + decisions=[d.decision for d in response.decisions], + ) + if response.next_steps: + self.logger.debug( + "Action items identified", + tasks=[item.task for item in response.next_steps], + ) + if not response.decisions and not response.next_steps: + self.logger.warning( + "No action items identified from transcript", + transcript_length=len(self.transcript), + ) + + return response + + except Exception as e: + self.logger.error( + f"Error identifying action items: {e}", + exc_info=True, + ) + self.action_items = None + return None + async def generate_summary(self, only_subjects: bool = False) -> None: """ Generate summary by extracting subjects, creating summaries for each, and generating a recap. 
@@ -424,6 +606,7 @@ class SummaryBuilder: await self.generate_subject_summaries() await self.generate_recap() + await self.identify_action_items() # ---------------------------------------------------------------------------- # Markdown @@ -526,8 +709,6 @@ if __name__ == "__main__": if args.summary: await sm.generate_summary() - # Note: action items generation has been removed - print("") print("-" * 80) print("") diff --git a/server/reflector/processors/transcript_final_summary.py b/server/reflector/processors/transcript_final_summary.py index dfe07aad..932a46be 100644 --- a/server/reflector/processors/transcript_final_summary.py +++ b/server/reflector/processors/transcript_final_summary.py @@ -1,7 +1,12 @@ from reflector.llm import LLM from reflector.processors.base import Processor from reflector.processors.summary.summary_builder import SummaryBuilder -from reflector.processors.types import FinalLongSummary, FinalShortSummary, TitleSummary +from reflector.processors.types import ( + ActionItems, + FinalLongSummary, + FinalShortSummary, + TitleSummary, +) from reflector.settings import settings @@ -27,15 +32,20 @@ class TranscriptFinalSummaryProcessor(Processor): builder = SummaryBuilder(self.llm, logger=self.logger) builder.set_transcript(text) - # Use known participants if available, otherwise identify them if self.transcript and self.transcript.participants: - # Extract participant names from the stored participants participant_names = [p.name for p in self.transcript.participants if p.name] if participant_names: self.logger.info( f"Using {len(participant_names)} known participants from transcript" ) - builder.set_known_participants(participant_names) + participant_name_to_id = { + p.name: p.id + for p in self.transcript.participants + if p.name and p.id + } + builder.set_known_participants( + participant_names, participant_name_to_id=participant_name_to_id + ) else: self.logger.info( "Participants field exists but is empty, identifying participants" @@ -63,7 
+73,6 @@ class TranscriptFinalSummaryProcessor(Processor): self.logger.warning("No summary to output") return - # build the speakermap from the transcript speakermap = {} if self.transcript: speakermap = { @@ -76,8 +85,6 @@ class TranscriptFinalSummaryProcessor(Processor): speakermap=speakermap, ) - # build the transcript as a single string - # Replace speaker IDs with actual participant names if available text_transcript = [] unique_speakers = set() for topic in self.chunks: @@ -111,4 +118,9 @@ class TranscriptFinalSummaryProcessor(Processor): ) await self.emit(final_short_summary, name="short_summary") + if self.builder and self.builder.action_items: + action_items = self.builder.action_items.model_dump() + action_items = ActionItems(action_items=action_items) + await self.emit(action_items, name="action_items") + await self.emit(final_long_summary) diff --git a/server/reflector/processors/transcript_topic_detector.py b/server/reflector/processors/transcript_topic_detector.py index 695d3af3..154db0ec 100644 --- a/server/reflector/processors/transcript_topic_detector.py +++ b/server/reflector/processors/transcript_topic_detector.py @@ -78,7 +78,11 @@ class TranscriptTopicDetectorProcessor(Processor): """ prompt = TOPIC_PROMPT.format(text=text) response = await self.llm.get_structured_response( - prompt, [text], TopicResponse, tone_name="Topic analyzer" + prompt, + [text], + TopicResponse, + tone_name="Topic analyzer", + timeout=settings.LLM_STRUCTURED_RESPONSE_TIMEOUT, ) return response diff --git a/server/reflector/processors/types.py b/server/reflector/processors/types.py index 3369e09c..ca6d675f 100644 --- a/server/reflector/processors/types.py +++ b/server/reflector/processors/types.py @@ -264,6 +264,10 @@ class FinalShortSummary(BaseModel): duration: float +class ActionItems(BaseModel): + action_items: dict # JSON-serializable dict from ActionItemsResponse + + class FinalTitle(BaseModel): title: str diff --git a/server/reflector/settings.py 
b/server/reflector/settings.py index 12276121..a72981b9 100644 --- a/server/reflector/settings.py +++ b/server/reflector/settings.py @@ -77,6 +77,9 @@ class Settings(BaseSettings): LLM_PARSE_MAX_RETRIES: int = ( 3 # Max retries for JSON/validation errors (total attempts = retries + 1) ) + LLM_STRUCTURED_RESPONSE_TIMEOUT: int = ( + 300 # Timeout in seconds for structured responses (5 minutes) + ) # Diarization DIARIZATION_ENABLED: bool = True diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index 02c1aa3e..2e1c9d30 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -501,6 +501,7 @@ async def transcript_get( "title": transcript.title, "short_summary": transcript.short_summary, "long_summary": transcript.long_summary, + "action_items": transcript.action_items, "created_at": transcript.created_at, "share_mode": transcript.share_mode, "source_language": transcript.source_language, diff --git a/server/reflector/worker/webhook.py b/server/reflector/worker/webhook.py index 57b294d8..d58cc7b3 100644 --- a/server/reflector/worker/webhook.py +++ b/server/reflector/worker/webhook.py @@ -123,6 +123,7 @@ async def send_transcript_webhook( "target_language": transcript.target_language, "status": transcript.status, "frontend_url": frontend_url, + "action_items": transcript.action_items, }, "room": { "id": room.id, diff --git a/server/tests/test_pipeline_main_file.py b/server/tests/test_pipeline_main_file.py index 825c8389..8a4d63dd 100644 --- a/server/tests/test_pipeline_main_file.py +++ b/server/tests/test_pipeline_main_file.py @@ -266,7 +266,11 @@ async def mock_summary_processor(): # When flush is called, simulate summary generation by calling the callbacks async def flush_with_callback(): mock_summary.flush_called = True - from reflector.processors.types import FinalLongSummary, FinalShortSummary + from reflector.processors.types import ( + ActionItems, + FinalLongSummary, + 
FinalShortSummary, + ) if hasattr(mock_summary, "_callback"): await mock_summary._callback( @@ -276,12 +280,19 @@ async def mock_summary_processor(): await mock_summary._on_short_summary( FinalShortSummary(short_summary="Test short summary", duration=10.0) ) + if hasattr(mock_summary, "_on_action_items"): + await mock_summary._on_action_items( + ActionItems(action_items={"test": "action item"}) + ) mock_summary.flush = flush_with_callback - def init_with_callback(transcript=None, callback=None, on_short_summary=None): + def init_with_callback( + transcript=None, callback=None, on_short_summary=None, on_action_items=None + ): mock_summary._callback = callback mock_summary._on_short_summary = on_short_summary + mock_summary._on_action_items = on_action_items return mock_summary mock_summary_class.side_effect = init_with_callback