From 2e7ae9a6ab7f5fe86f902c3299878918aaab3f48 Mon Sep 17 00:00:00 2001
From: Sergey Mankovsky
Date: Tue, 27 Aug 2024 18:54:43 +0200
Subject: [PATCH] Celery task for posting

---
 server/reflector/db/__init__.py               |  1 +
 server/reflector/db/meetings.py               | 10 ++
 .../reflector/pipelines/main_live_pipeline.py | 45 +++++++-
 server/reflector/settings.py                  |  4 +
 server/reflector/zulip.py                     | 98 +++++++++++++++++++
 5 files changed, 157 insertions(+), 1 deletion(-)
 create mode 100644 server/reflector/zulip.py

diff --git a/server/reflector/db/__init__.py b/server/reflector/db/__init__.py
index 4806e377..694e9946 100644
--- a/server/reflector/db/__init__.py
+++ b/server/reflector/db/__init__.py
@@ -7,6 +7,7 @@ database = databases.Database(settings.DATABASE_URL)
 metadata = sqlalchemy.MetaData()
 
 # import models
+import reflector.db.meetings  # noqa
 import reflector.db.rooms  # noqa
 import reflector.db.transcripts  # noqa
 
diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py
index ba85bb38..1ddcc0ba 100644
--- a/server/reflector/db/meetings.py
+++ b/server/reflector/db/meetings.py
@@ -94,6 +94,16 @@ class MeetingController:
 
         return Meeting(**result)
 
+    async def get_by_id(self, meeting_id: str, **kwargs) -> Meeting | None:
+        """
+        Get a meeting by id
+        """
+        query = meetings.select().where(meetings.c.id == meeting_id)
+        result = await database.fetch_one(query)
+        if not result:
+            return None
+        return Meeting(**result)
+
     async def get_by_id_for_http(self, meeting_id: str, user_id: str | None) -> Meeting:
         """
         Get a meeting by ID for HTTP request.
diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py
index b1e0a2aa..3dde5175 100644
--- a/server/reflector/pipelines/main_live_pipeline.py
+++ b/server/reflector/pipelines/main_live_pipeline.py
@@ -17,6 +17,8 @@ from contextlib import asynccontextmanager
 
 from celery import chord, group, shared_task
 from pydantic import BaseModel
+from reflector.db.meetings import meetings_controller
+from reflector.db.rooms import rooms_controller
 from reflector.db.transcripts import (
     Transcript,
     TranscriptDuration,
@@ -53,6 +55,7 @@ from reflector.processors.types import (
 from reflector.processors.types import Transcript as TranscriptProcessorType
 from reflector.settings import settings
 from reflector.ws_manager import WebsocketManager, get_ws_manager
+from reflector.zulip import get_zulip_message, send_message_to_zulip
 from structlog import BoundLogger as Logger
 
 
@@ -564,6 +567,39 @@ async def pipeline_summaries(transcript: Transcript, logger: Logger):
     logger.info("Summaries done")
 
 
+@get_transcript
+async def pipeline_post_to_zulip(transcript: Transcript, logger: Logger):
+    """
+    Post the transcript to the Zulip stream/topic of its meeting's room.
+
+    Does nothing when the transcript has no meeting, the meeting or room
+    cannot be found, or the room has auto-posting disabled.
+    """
+    logger.info("Starting post to zulip")
+
+    if not transcript.meeting_id:
+        logger.info("Transcript has no meeting")
+        return
+
+    meeting = await meetings_controller.get_by_id(transcript.meeting_id)
+    if not meeting:
+        logger.info("No meeting found for this transcript")
+        return
+
+    room = await rooms_controller.get_by_id(meeting.room_id)
+    if not room:
+        logger.error(f"Missing room for a meeting {meeting.id}")
+        return
+
+    if not room.zulip_auto_post:
+        logger.info("Zulip auto post is disabled for this room")
+        return
+
+    message = get_zulip_message(transcript=transcript)
+    send_message_to_zulip(room.zulip_stream, room.zulip_topic, message)
+    logger.info("Posted to zulip")
+
+
 # ===================================================================
 # Celery tasks that can be called from the API
 # ===================================================================
@@ -611,6 +647,12 @@ async def task_pipeline_final_summaries(*, transcript_id: str):
     await pipeline_summaries(transcript_id=transcript_id)
 
 
+@shared_task
+@asynctask
+async def task_pipeline_post_to_zulip(*, transcript_id: str):
+    await pipeline_post_to_zulip(transcript_id=transcript_id)
+
+
 def pipeline_post(*, transcript_id: str):
     """
     Run the post pipeline
@@ -632,7 +674,8 @@ def pipeline_post(*, transcript_id: str):
     chain = chord(
         group(chain_mp3_and_diarize, chain_title_preview),
         chain_final_summaries,
-    )
+    ) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
+
     chain.delay()
 
 
diff --git a/server/reflector/settings.py b/server/reflector/settings.py
index d5032e58..e2b4481d 100644
--- a/server/reflector/settings.py
+++ b/server/reflector/settings.py
@@ -141,5 +141,9 @@ class Settings(BaseSettings):
     AWS_WHEREBY_ACCESS_KEY_ID: str | None = None
     AWS_WHEREBY_ACCESS_KEY_SECRET: str | None = None
 
+    ZULIP_REALM: str | None = None
+    ZULIP_API_KEY: str | None = None
+    ZULIP_BOT_EMAIL: str | None = None
+
 
 settings = Settings()
diff --git a/server/reflector/zulip.py b/server/reflector/zulip.py
new file mode 100644
index 00000000..4eb23014
--- /dev/null
+++ b/server/reflector/zulip.py
@@ -0,0 +1,98 @@
+from datetime import timedelta
+from urllib.parse import urlparse
+
+import requests
+from reflector.db.transcripts import Transcript
+from reflector.settings import settings
+
+# requests never times out by default; bound the call so a stalled Zulip
+# server cannot block the Celery worker forever.
+ZULIP_REQUEST_TIMEOUT = 10  # seconds
+
+
+def send_message_to_zulip(stream: str, topic: str, message: str):
+    """
+    Post a message to a Zulip stream/topic with the configured bot account.
+
+    Returns the decoded JSON response. Raises ValueError when an argument or
+    the Zulip configuration is missing, and Exception when the request fails.
+    """
+    if not stream or not topic or not message:
+        raise ValueError("Missing required parameters")
+
+    if not (
+        settings.ZULIP_REALM and settings.ZULIP_BOT_EMAIL and settings.ZULIP_API_KEY
+    ):
+        raise ValueError("Zulip is not configured")
+
+    try:
+        response = requests.post(
+            f"https://{settings.ZULIP_REALM}/api/v1/messages",
+            data={
+                "type": "stream",
+                "to": stream,
+                "topic": topic,
+                "content": message,
+            },
+            auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
+            headers={"Content-Type": "application/x-www-form-urlencoded"},
+            timeout=ZULIP_REQUEST_TIMEOUT,
+        )
+
+        response.raise_for_status()
+
+        return response.json()
+    except requests.RequestException as error:
+        raise Exception(f"Failed to send message to Zulip: {error}") from error
+
+
+def get_zulip_message(
+    transcript: Transcript, base_url: str = "http://localhost:3000"
+) -> str:
+    """
+    Build the Zulip markdown message announcing a transcript.
+
+    base_url is the web UI base URL used for the transcript link; the default
+    points at a local development server.
+    """
+    transcript_url = f"{base_url.rstrip('/')}/transcripts/{transcript.id}"
+
+    header_text = f"# Reflector – {transcript.title or 'Unnamed recording'}\n\n"
+    header_text += f"**Date**: \n"
+    header_text += f"**Link**: [{extract_domain(transcript_url)}]({transcript_url})\n"
+    header_text += f"**Duration**: {format_time_ms(transcript.duration)}\n\n"
+
+    topic_text = ""
+
+    if transcript.topics:
+        topic_text = "```spoiler Topics\n"
+        for topic in transcript.topics:
+            topic_text += f"1. [{format_time(topic.timestamp)}] {topic.title}\n"
+        topic_text += "```\n\n"
+
+    # The closing fence must start on its own line, and the summary may be
+    # missing, so normalise it before wrapping it in the spoiler block.
+    summary = "```spoiler Summary\n"
+    summary += f"{(transcript.long_summary or '').rstrip()}\n"
+    summary += "```\n\n"
+
+    message = header_text + summary + topic_text + "-----\n"
+    return message
+
+
+def extract_domain(url: str) -> str:
+    """Return the network location (host[:port]) part of a URL."""
+    return urlparse(url).netloc
+
+
+def format_time_ms(milliseconds: float) -> str:
+    """Format a duration given in milliseconds, truncated to whole seconds."""
+    return format_time(milliseconds // 1000)
+
+
+def format_time(seconds: float) -> str:
+    """Format seconds as MM:SS, or H:MM:SS from one hour upwards."""
+    td = timedelta(seconds=seconds)
+    time = str(td - timedelta(microseconds=td.microseconds))
+
+    return time[2:] if time.startswith("0:") else time