Celery task for posting

This commit is contained in:
2024-08-27 18:54:43 +02:00
parent c2595b616b
commit 2e7ae9a6ab
5 changed files with 123 additions and 1 deletions

View File

@@ -7,6 +7,7 @@ database = databases.Database(settings.DATABASE_URL)
metadata = sqlalchemy.MetaData() metadata = sqlalchemy.MetaData()
# import models # import models
import reflector.db.meetings
import reflector.db.rooms # noqa import reflector.db.rooms # noqa
import reflector.db.transcripts # noqa import reflector.db.transcripts # noqa

View File

@@ -94,6 +94,16 @@ class MeetingController:
return Meeting(**result) return Meeting(**result)
async def get_by_id(self, meeting_id: str, **kwargs) -> Meeting | None:
    """
    Look up a single meeting by its id

    Returns the matching Meeting, or None when the query yields no row.
    Extra keyword arguments are accepted but not used.
    """
    row = await database.fetch_one(
        meetings.select().where(meetings.c.id == meeting_id)
    )
    return Meeting(**row) if row else None
async def get_by_id_for_http(self, meeting_id: str, user_id: str | None) -> Meeting: async def get_by_id_for_http(self, meeting_id: str, user_id: str | None) -> Meeting:
""" """
Get a meeting by ID for HTTP request. Get a meeting by ID for HTTP request.

View File

@@ -17,6 +17,8 @@ from contextlib import asynccontextmanager
from celery import chord, group, shared_task from celery import chord, group, shared_task
from pydantic import BaseModel from pydantic import BaseModel
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import ( from reflector.db.transcripts import (
Transcript, Transcript,
TranscriptDuration, TranscriptDuration,
@@ -53,6 +55,7 @@ from reflector.processors.types import (
from reflector.processors.types import Transcript as TranscriptProcessorType from reflector.processors.types import Transcript as TranscriptProcessorType
from reflector.settings import settings from reflector.settings import settings
from reflector.ws_manager import WebsocketManager, get_ws_manager from reflector.ws_manager import WebsocketManager, get_ws_manager
from reflector.zulip import get_zulip_message, send_message_to_zulip
from structlog import BoundLogger as Logger from structlog import BoundLogger as Logger
@@ -564,6 +567,31 @@ async def pipeline_summaries(transcript: Transcript, logger: Logger):
logger.info("Summaries done") logger.info("Summaries done")
@get_transcript
async def pipeline_post_to_zulip(transcript: Transcript, logger: Logger):
    """
    Post the transcript to Zulip when its room has auto-posting enabled

    Nothing is posted when the transcript has no meeting, when the meeting
    or its room cannot be found, or when the room's zulip_auto_post flag
    is falsy.
    """
    logger.info("Starting post to zulip")

    # A transcript without a meeting has no room to read Zulip settings from
    if not transcript.meeting_id:
        logger.info("Transcript has no meeting")
        return

    meeting = await meetings_controller.get_by_id(transcript.meeting_id)
    if not meeting:
        logger.info("No meeting found for this transcript")
        return

    # The room carries the Zulip destination (stream / topic) and the flag
    room = await rooms_controller.get_by_id(meeting.room_id)
    if not room:
        # Logged as an error, unlike the cases above
        logger.error(f"Missing room for a meeting {meeting.id}")
        return

    if room.zulip_auto_post:
        message = get_zulip_message(transcript=transcript)
        # Synchronous call; any exception it raises propagates to the caller
        send_message_to_zulip(room.zulip_stream, room.zulip_topic, message)
        logger.info("Posted to zulip")
# =================================================================== # ===================================================================
# Celery tasks that can be called from the API # Celery tasks that can be called from the API
# =================================================================== # ===================================================================
@@ -611,6 +639,12 @@ async def task_pipeline_final_summaries(*, transcript_id: str):
await pipeline_summaries(transcript_id=transcript_id) await pipeline_summaries(transcript_id=transcript_id)
@shared_task
@asynctask
async def task_pipeline_post_to_zulip(*, transcript_id: str):
    """
    Celery task that runs pipeline_post_to_zulip for the given transcript id
    """
    await pipeline_post_to_zulip(transcript_id=transcript_id)
def pipeline_post(*, transcript_id: str): def pipeline_post(*, transcript_id: str):
""" """
Run the post pipeline Run the post pipeline
@@ -632,7 +666,8 @@ def pipeline_post(*, transcript_id: str):
chain = chord( chain = chord(
group(chain_mp3_and_diarize, chain_title_preview), group(chain_mp3_and_diarize, chain_title_preview),
chain_final_summaries, chain_final_summaries,
) ) | task_pipeline_post_to_zulip.si(transcript_id=transcript_id)
chain.delay() chain.delay()

View File

@@ -141,5 +141,9 @@ class Settings(BaseSettings):
AWS_WHEREBY_ACCESS_KEY_ID: str | None = None AWS_WHEREBY_ACCESS_KEY_ID: str | None = None
AWS_WHEREBY_ACCESS_KEY_SECRET: str | None = None AWS_WHEREBY_ACCESS_KEY_SECRET: str | None = None
ZULIP_REALM: str | None = None
ZULIP_API_KEY: str | None = None
ZULIP_BOT_EMAIL: str | None = None
settings = Settings() settings = Settings()

72
server/reflector/zulip.py Normal file
View File

@@ -0,0 +1,72 @@
from datetime import timedelta
from urllib.parse import urlparse
import requests
from reflector.db.transcripts import Transcript
from reflector.settings import settings
def send_message_to_zulip(
    stream: str, topic: str, message: str, timeout: float = 10.0
):
    """
    Send a message to a Zulip stream through the Zulip REST API

    The realm and the bot credentials are read from settings.ZULIP_REALM,
    settings.ZULIP_BOT_EMAIL and settings.ZULIP_API_KEY.

    :param stream: value sent as the "to" field of a "stream" message
    :param topic: topic of the message
    :param message: content of the message
    :param timeout: seconds to wait for Zulip before giving up
    :return: the decoded JSON body of the response
    :raises ValueError: if stream, topic or message is empty
    :raises Exception: if the request fails, times out, or Zulip answers
        with an HTTP error status; the original requests error is chained
    """
    if not stream or not topic or not message:
        raise ValueError("Missing required parameters")

    try:
        response = requests.post(
            f"https://{settings.ZULIP_REALM}/api/v1/messages",
            data={
                "type": "stream",
                "to": stream,
                "topic": topic,
                "content": message,
            },
            auth=(settings.ZULIP_BOT_EMAIL, settings.ZULIP_API_KEY),
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            # requests waits forever by default; without a timeout a stalled
            # server would block the calling worker indefinitely
            timeout=timeout,
        )
        response.raise_for_status()
        return response.json()
    except requests.RequestException as error:
        # Keep the underlying error as __cause__ so the traceback stays useful
        raise Exception(f"Failed to send message to Zulip: {error}") from error
def get_zulip_message(transcript: Transcript, domain: str = "http://localhost:3000"):
    """
    Build the Zulip markdown message describing a transcript

    The message is made of a header (title, date, link, duration), a
    "Summary" spoiler block, a "Topics" spoiler block and a closing rule.
    The summary and topics blocks are left out when the transcript has
    no long summary or no topics.

    :param transcript: transcript to describe
    :param domain: base URL used to build the link to the transcript;
        replace the default with your deployment base URL if needed
    :return: the message content as a string
    """
    # Avoid a double slash if the base URL is given with a trailing one
    base_url = domain.rstrip("/")
    transcript_url = f"{base_url}/transcripts/{transcript.id}"

    parts = [
        f"# Reflector {transcript.title or 'Unnamed recording'}\n\n",
        f"**Date**: <time:{transcript.created_at.isoformat()}>\n",
        f"**Link**: [{extract_domain(transcript_url)}]({transcript_url})\n",
        # A missing duration is rendered as 00:00 rather than raising
        f"**Duration**: {format_time_ms(transcript.duration or 0)}\n\n",
    ]

    # long_summary may be None; concatenating it directly would raise
    long_summary = transcript.long_summary or ""
    if long_summary:
        # The closing fence must start on its own line to end the block
        if not long_summary.endswith("\n"):
            long_summary += "\n"
        parts.append(f"```spoiler Summary\n{long_summary}```\n\n")

    if transcript.topics:
        parts.append("```spoiler Topics\n")
        # Every item uses "1."; numbering is left to the markdown renderer
        parts.extend(
            f"1. [{format_time(topic.timestamp)}] {topic.title}\n"
            for topic in transcript.topics
        )
        parts.append("```\n\n")

    parts.append("-----\n")
    return "".join(parts)
def extract_domain(url: str) -> str:
    """
    Return the network location (netloc) component of a URL
    """
    parsed = urlparse(url)
    return parsed.netloc
def format_time_ms(milliseconds: float) -> str:
    """
    Format a duration expressed in milliseconds

    The value is floor-divided by 1000 and the resulting number of
    seconds is handed to format_time.
    """
    whole_seconds = milliseconds // 1000
    return format_time(whole_seconds)
def format_time(seconds: float) -> str:
    """
    Format a duration in seconds as text, without the sub-second part

    The text is the str() form of a timedelta; a leading "0:" (zero
    hours) is dropped, so durations under an hour read as "MM:SS".
    """
    duration = timedelta(seconds=seconds)
    # Rebuilding from days and seconds discards the microseconds
    truncated = timedelta(days=duration.days, seconds=duration.seconds)
    return str(truncated).removeprefix("0:")