mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-04-24 14:15:19 +00:00
fix: cpu usage + email improvements (#944)
* fix: cpu usage on server ws manager, 100% to 0% on idle * fix: change email icon to white and prefill email in daily room for authenticated users * fix: improve email sending with full ts transcript
This commit is contained in:
committed by
GitHub
parent
ec8b49738e
commit
8c4f5e9c0f
@@ -120,7 +120,8 @@ class Meeting(BaseModel):
|
||||
daily_composed_video_s3_key: str | None = None
|
||||
daily_composed_video_duration: int | None = None
|
||||
# Email recipients for transcript notification
|
||||
email_recipients: list[str] | None = None
|
||||
# Each entry is {"email": str, "include_link": bool} or a legacy plain str
|
||||
email_recipients: list[dict | str] | None = None
|
||||
|
||||
|
||||
class MeetingController:
|
||||
@@ -399,15 +400,27 @@ class MeetingController:
|
||||
async with get_database().transaction(isolation="serializable"):
|
||||
yield
|
||||
|
||||
async def add_email_recipient(self, meeting_id: str, email: str) -> list[str]:
|
||||
"""Add an email to the meeting's email_recipients list (no duplicates)."""
|
||||
async def add_email_recipient(
|
||||
self, meeting_id: str, email: str, *, include_link: bool = True
|
||||
) -> list[dict]:
|
||||
"""Add an email to the meeting's email_recipients list (no duplicates).
|
||||
|
||||
Each entry is stored as {"email": str, "include_link": bool}.
|
||||
Legacy plain-string entries are normalised on read.
|
||||
"""
|
||||
async with self.transaction():
|
||||
meeting = await self.get_by_id(meeting_id)
|
||||
if not meeting:
|
||||
raise ValueError(f"Meeting {meeting_id} not found")
|
||||
current = meeting.email_recipients or []
|
||||
if email not in current:
|
||||
current.append(email)
|
||||
# Normalise legacy string entries
|
||||
current: list[dict] = [
|
||||
entry
|
||||
if isinstance(entry, dict)
|
||||
else {"email": entry, "include_link": True}
|
||||
for entry in (meeting.email_recipients or [])
|
||||
]
|
||||
if not any(r["email"] == email for r in current):
|
||||
current.append({"email": email, "include_link": include_link})
|
||||
await self.update_meeting(meeting_id, email_recipients=current)
|
||||
return current
|
||||
|
||||
|
||||
@@ -1,11 +1,13 @@
|
||||
from email.mime.multipart import MIMEMultipart
|
||||
from email.mime.text import MIMEText
|
||||
from html import escape
|
||||
|
||||
import aiosmtplib
|
||||
import structlog
|
||||
|
||||
from reflector.db.transcripts import Transcript
|
||||
from reflector.db.transcripts import SourceKind, Transcript
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.transcript_formats import transcript_to_text_timestamped
|
||||
|
||||
logger = structlog.get_logger(__name__)
|
||||
|
||||
@@ -18,35 +20,111 @@ def get_transcript_url(transcript: Transcript) -> str:
|
||||
return f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
|
||||
|
||||
|
||||
def _build_plain_text(transcript: Transcript, url: str) -> str:
|
||||
def _get_timestamped_text(transcript: Transcript) -> str:
|
||||
"""Build the full timestamped transcript text using existing utility."""
|
||||
if not transcript.topics:
|
||||
return ""
|
||||
is_multitrack = transcript.source_kind == SourceKind.ROOM
|
||||
return transcript_to_text_timestamped(
|
||||
transcript.topics, transcript.participants, is_multitrack=is_multitrack
|
||||
)
|
||||
|
||||
|
||||
def _build_plain_text(transcript: Transcript, url: str, include_link: bool) -> str:
|
||||
title = transcript.title or "Unnamed recording"
|
||||
lines = [
|
||||
f"Your transcript is ready: {title}",
|
||||
"",
|
||||
f"View it here: {url}",
|
||||
]
|
||||
lines = [f"Reflector: {title}", ""]
|
||||
|
||||
if transcript.short_summary:
|
||||
lines.extend(["", "Summary:", transcript.short_summary])
|
||||
lines.extend(["Summary:", transcript.short_summary, ""])
|
||||
|
||||
timestamped = _get_timestamped_text(transcript)
|
||||
if timestamped:
|
||||
lines.extend(["Transcript:", timestamped, ""])
|
||||
|
||||
if include_link:
|
||||
lines.append(f"View transcript: {url}")
|
||||
lines.append("")
|
||||
|
||||
lines.append(
|
||||
"This email was sent because you requested to receive "
|
||||
"the transcript from a meeting."
|
||||
)
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def _build_html(transcript: Transcript, url: str) -> str:
|
||||
title = transcript.title or "Unnamed recording"
|
||||
def _build_html(transcript: Transcript, url: str, include_link: bool) -> str:
|
||||
title = escape(transcript.title or "Unnamed recording")
|
||||
|
||||
summary_html = ""
|
||||
if transcript.short_summary:
|
||||
summary_html = f"<p style='color:#555;'>{transcript.short_summary}</p>"
|
||||
summary_html = (
|
||||
f'<p style="color:#555;margin-bottom:16px;">'
|
||||
f"{escape(transcript.short_summary)}</p>"
|
||||
)
|
||||
|
||||
transcript_html = ""
|
||||
timestamped = _get_timestamped_text(transcript)
|
||||
if timestamped:
|
||||
# Build styled transcript lines
|
||||
styled_lines = []
|
||||
for line in timestamped.split("\n"):
|
||||
if not line.strip():
|
||||
continue
|
||||
# Lines are formatted as "[MM:SS] Speaker: text"
|
||||
if line.startswith("[") and "] " in line:
|
||||
bracket_end = line.index("] ")
|
||||
timestamp = escape(line[: bracket_end + 1])
|
||||
rest = line[bracket_end + 2 :]
|
||||
if ": " in rest:
|
||||
colon_pos = rest.index(": ")
|
||||
speaker = escape(rest[:colon_pos])
|
||||
text = escape(rest[colon_pos + 2 :])
|
||||
styled_lines.append(
|
||||
f'<div style="margin-bottom:4px;">'
|
||||
f'<span style="color:#888;font-size:12px;">{timestamp}</span> '
|
||||
f"<strong>{speaker}:</strong> {text}</div>"
|
||||
)
|
||||
else:
|
||||
styled_lines.append(
|
||||
f'<div style="margin-bottom:4px;">{escape(line)}</div>'
|
||||
)
|
||||
else:
|
||||
styled_lines.append(
|
||||
f'<div style="margin-bottom:4px;">{escape(line)}</div>'
|
||||
)
|
||||
|
||||
transcript_html = (
|
||||
'<h3 style="margin-top:20px;margin-bottom:8px;">Transcript</h3>'
|
||||
'<div style="background:#f7f7f7;padding:16px;border-radius:6px;'
|
||||
'font-size:13px;line-height:1.6;max-height:600px;overflow-y:auto;">'
|
||||
f"{''.join(styled_lines)}</div>"
|
||||
)
|
||||
|
||||
link_html = ""
|
||||
if include_link:
|
||||
link_html = (
|
||||
'<p style="margin-top:20px;">'
|
||||
f'<a href="{url}" style="display:inline-block;padding:10px 20px;'
|
||||
"background:#4A90D9;color:#fff;text-decoration:none;"
|
||||
'border-radius:4px;">View Transcript</a></p>'
|
||||
)
|
||||
|
||||
return f"""\
|
||||
<div style="font-family:sans-serif;max-width:600px;margin:0 auto;">
|
||||
<h2>Your transcript is ready</h2>
|
||||
<p><strong>{title}</strong></p>
|
||||
<h2 style="margin-bottom:4px;">{title}</h2>
|
||||
{summary_html}
|
||||
<p><a href="{url}" style="display:inline-block;padding:10px 20px;background:#4A90D9;color:#fff;text-decoration:none;border-radius:4px;">View Transcript</a></p>
|
||||
<p style="color:#999;font-size:12px;">This email was sent because you requested to receive the transcript from a meeting.</p>
|
||||
{transcript_html}
|
||||
{link_html}
|
||||
<p style="color:#999;font-size:12px;margin-top:20px;">This email was sent because you requested to receive the transcript from a meeting.</p>
|
||||
</div>"""
|
||||
|
||||
|
||||
async def send_transcript_email(to_emails: list[str], transcript: Transcript) -> int:
|
||||
async def send_transcript_email(
|
||||
to_emails: list[str],
|
||||
transcript: Transcript,
|
||||
*,
|
||||
include_link: bool = True,
|
||||
) -> int:
|
||||
"""Send transcript notification to all emails. Returns count sent."""
|
||||
if not is_email_configured() or not to_emails:
|
||||
return 0
|
||||
@@ -57,12 +135,12 @@ async def send_transcript_email(to_emails: list[str], transcript: Transcript) ->
|
||||
|
||||
for email_addr in to_emails:
|
||||
msg = MIMEMultipart("alternative")
|
||||
msg["Subject"] = f"Transcript Ready: {title}"
|
||||
msg["Subject"] = f"Reflector: {title}"
|
||||
msg["From"] = settings.SMTP_FROM_EMAIL
|
||||
msg["To"] = email_addr
|
||||
|
||||
msg.attach(MIMEText(_build_plain_text(transcript, url), "plain"))
|
||||
msg.attach(MIMEText(_build_html(transcript, url), "html"))
|
||||
msg.attach(MIMEText(_build_plain_text(transcript, url, include_link), "plain"))
|
||||
msg.attach(MIMEText(_build_html(transcript, url, include_link), "html"))
|
||||
|
||||
try:
|
||||
await aiosmtplib.send(
|
||||
|
||||
@@ -64,3 +64,9 @@ TIMEOUT_HEAVY = 1200 # Transcription, fan-out LLM tasks (Hatchet execution_time
|
||||
TIMEOUT_HEAVY_HTTP = (
|
||||
1150 # httpx timeout for transcribe_track — below 1200 so Hatchet doesn't race
|
||||
)
|
||||
TIMEOUT_EXTRA_HEAVY = (
|
||||
3600 # Detect Topics, fan-out LLM tasks (Hatchet execution_timeout)
|
||||
)
|
||||
TIMEOUT_EXTRA_HEAVY_HTTP = (
|
||||
3400 # httpx timeout for detect_topics — below 3600 so Hatchet doesn't race
|
||||
)
|
||||
|
||||
@@ -41,6 +41,7 @@ from reflector.hatchet.broadcast import (
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import (
|
||||
TIMEOUT_AUDIO,
|
||||
TIMEOUT_EXTRA_HEAVY,
|
||||
TIMEOUT_HEAVY,
|
||||
TIMEOUT_LONG,
|
||||
TIMEOUT_MEDIUM,
|
||||
@@ -693,7 +694,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
parents=[process_tracks],
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
|
||||
execution_timeout=timedelta(seconds=TIMEOUT_EXTRA_HEAVY),
|
||||
retries=3,
|
||||
backoff_factor=2.0,
|
||||
backoff_max_seconds=30,
|
||||
@@ -1510,22 +1511,41 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult:
|
||||
if recording and recording.meeting_id:
|
||||
meeting = await meetings_controller.get_by_id(recording.meeting_id)
|
||||
|
||||
recipients = (
|
||||
list(meeting.email_recipients)
|
||||
# Normalise meeting recipients (legacy strings → dicts)
|
||||
meeting_recipients: list[dict] = (
|
||||
[
|
||||
entry
|
||||
if isinstance(entry, dict)
|
||||
else {"email": entry, "include_link": True}
|
||||
for entry in (meeting.email_recipients or [])
|
||||
]
|
||||
if meeting and meeting.email_recipients
|
||||
else []
|
||||
)
|
||||
|
||||
# Also check room-level email
|
||||
# Room-level email always gets a link (room owner)
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
|
||||
room_email = None
|
||||
if transcript.room_id:
|
||||
room = await rooms_controller.get_by_id(transcript.room_id)
|
||||
if room and room.email_transcript_to:
|
||||
if room.email_transcript_to not in recipients:
|
||||
recipients.append(room.email_transcript_to)
|
||||
room_email = room.email_transcript_to
|
||||
|
||||
if not recipients:
|
||||
# Build two groups: with link and without link
|
||||
with_link = [
|
||||
r["email"] for r in meeting_recipients if r.get("include_link", True)
|
||||
]
|
||||
without_link = [
|
||||
r["email"] for r in meeting_recipients if not r.get("include_link", True)
|
||||
]
|
||||
|
||||
if room_email:
|
||||
if room_email not in with_link:
|
||||
with_link.append(room_email)
|
||||
without_link = [e for e in without_link if e != room_email]
|
||||
|
||||
if not with_link and not without_link:
|
||||
ctx.log("send_email skipped (no email recipients)")
|
||||
return EmailResult(skipped=True)
|
||||
|
||||
@@ -1533,7 +1553,15 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult:
|
||||
if meeting and meeting.email_recipients:
|
||||
await transcripts_controller.update(transcript, {"share_mode": "public"})
|
||||
|
||||
count = await send_transcript_email(recipients, transcript)
|
||||
count = 0
|
||||
if with_link:
|
||||
count += await send_transcript_email(
|
||||
with_link, transcript, include_link=True
|
||||
)
|
||||
if without_link:
|
||||
count += await send_transcript_email(
|
||||
without_link, transcript, include_link=False
|
||||
)
|
||||
ctx.log(f"send_email complete: sent {count} emails")
|
||||
|
||||
return EmailResult(emails_sent=count)
|
||||
|
||||
@@ -916,22 +916,41 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult:
|
||||
if recording and recording.meeting_id:
|
||||
meeting = await meetings_controller.get_by_id(recording.meeting_id)
|
||||
|
||||
recipients = (
|
||||
list(meeting.email_recipients)
|
||||
# Normalise meeting recipients (legacy strings → dicts)
|
||||
meeting_recipients: list[dict] = (
|
||||
[
|
||||
entry
|
||||
if isinstance(entry, dict)
|
||||
else {"email": entry, "include_link": True}
|
||||
for entry in (meeting.email_recipients or [])
|
||||
]
|
||||
if meeting and meeting.email_recipients
|
||||
else []
|
||||
)
|
||||
|
||||
# Also check room-level email
|
||||
# Room-level email always gets a link (room owner)
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
|
||||
room_email = None
|
||||
if transcript.room_id:
|
||||
room = await rooms_controller.get_by_id(transcript.room_id)
|
||||
if room and room.email_transcript_to:
|
||||
if room.email_transcript_to not in recipients:
|
||||
recipients.append(room.email_transcript_to)
|
||||
room_email = room.email_transcript_to
|
||||
|
||||
if not recipients:
|
||||
# Build two groups: with link and without link
|
||||
with_link = [
|
||||
r["email"] for r in meeting_recipients if r.get("include_link", True)
|
||||
]
|
||||
without_link = [
|
||||
r["email"] for r in meeting_recipients if not r.get("include_link", True)
|
||||
]
|
||||
|
||||
if room_email:
|
||||
if room_email not in with_link:
|
||||
with_link.append(room_email)
|
||||
without_link = [e for e in without_link if e != room_email]
|
||||
|
||||
if not with_link and not without_link:
|
||||
ctx.log("send_email skipped (no email recipients)")
|
||||
return EmailResult(skipped=True)
|
||||
|
||||
@@ -939,7 +958,15 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult:
|
||||
if meeting and meeting.email_recipients:
|
||||
await transcripts_controller.update(transcript, {"share_mode": "public"})
|
||||
|
||||
count = await send_transcript_email(recipients, transcript)
|
||||
count = 0
|
||||
if with_link:
|
||||
count += await send_transcript_email(
|
||||
with_link, transcript, include_link=True
|
||||
)
|
||||
if without_link:
|
||||
count += await send_transcript_email(
|
||||
without_link, transcript, include_link=False
|
||||
)
|
||||
ctx.log(f"send_email complete: sent {count} emails")
|
||||
|
||||
return EmailResult(emails_sent=count)
|
||||
|
||||
@@ -397,22 +397,41 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult:
|
||||
if recording and recording.meeting_id:
|
||||
meeting = await meetings_controller.get_by_id(recording.meeting_id)
|
||||
|
||||
recipients = (
|
||||
list(meeting.email_recipients)
|
||||
# Normalise meeting recipients (legacy strings → dicts)
|
||||
meeting_recipients: list[dict] = (
|
||||
[
|
||||
entry
|
||||
if isinstance(entry, dict)
|
||||
else {"email": entry, "include_link": True}
|
||||
for entry in (meeting.email_recipients or [])
|
||||
]
|
||||
if meeting and meeting.email_recipients
|
||||
else []
|
||||
)
|
||||
|
||||
# Also check room-level email
|
||||
# Room-level email always gets a link (room owner)
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
|
||||
room_email = None
|
||||
if transcript.room_id:
|
||||
room = await rooms_controller.get_by_id(transcript.room_id)
|
||||
if room and room.email_transcript_to:
|
||||
if room.email_transcript_to not in recipients:
|
||||
recipients.append(room.email_transcript_to)
|
||||
room_email = room.email_transcript_to
|
||||
|
||||
if not recipients:
|
||||
# Build two groups: with link and without link
|
||||
with_link = [
|
||||
r["email"] for r in meeting_recipients if r.get("include_link", True)
|
||||
]
|
||||
without_link = [
|
||||
r["email"] for r in meeting_recipients if not r.get("include_link", True)
|
||||
]
|
||||
|
||||
if room_email:
|
||||
if room_email not in with_link:
|
||||
with_link.append(room_email)
|
||||
without_link = [e for e in without_link if e != room_email]
|
||||
|
||||
if not with_link and not without_link:
|
||||
ctx.log("send_email skipped (no email recipients)")
|
||||
return EmailResult(skipped=True)
|
||||
|
||||
@@ -420,7 +439,15 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult:
|
||||
if meeting and meeting.email_recipients:
|
||||
await transcripts_controller.update(transcript, {"share_mode": "public"})
|
||||
|
||||
count = await send_transcript_email(recipients, transcript)
|
||||
count = 0
|
||||
if with_link:
|
||||
count += await send_transcript_email(
|
||||
with_link, transcript, include_link=True
|
||||
)
|
||||
if without_link:
|
||||
count += await send_transcript_email(
|
||||
without_link, transcript, include_link=False
|
||||
)
|
||||
ctx.log(f"send_email complete: sent {count} emails")
|
||||
|
||||
return EmailResult(emails_sent=count)
|
||||
|
||||
@@ -168,8 +168,9 @@ async def add_email_recipient(
|
||||
if not meeting:
|
||||
raise HTTPException(status_code=404, detail="Meeting not found")
|
||||
|
||||
include_link = user is not None
|
||||
recipients = await meetings_controller.add_email_recipient(
|
||||
meeting_id, request.email
|
||||
meeting_id, request.email, include_link=include_link
|
||||
)
|
||||
|
||||
return {"status": "success", "email_recipients": recipients}
|
||||
|
||||
@@ -797,5 +797,7 @@ async def transcript_send_email(
|
||||
)
|
||||
if not transcript:
|
||||
raise HTTPException(status_code=404, detail="Transcript not found")
|
||||
sent = await send_transcript_email([request.email], transcript)
|
||||
sent = await send_transcript_email(
|
||||
[request.email], transcript, include_link=(transcript.share_mode == "public")
|
||||
)
|
||||
return SendEmailResponse(sent=sent)
|
||||
|
||||
@@ -146,7 +146,6 @@ else:
|
||||
app.conf.broker_connection_retry_on_startup = True
|
||||
app.autodiscover_tasks(
|
||||
[
|
||||
"reflector.pipelines.main_live_pipeline",
|
||||
"reflector.worker.healthcheck",
|
||||
"reflector.worker.process",
|
||||
"reflector.worker.cleanup",
|
||||
|
||||
@@ -12,6 +12,7 @@ from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
from pydantic import ValidationError
|
||||
|
||||
from reflector.asynctask import asynctask
|
||||
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
|
||||
from reflector.db.daily_participant_sessions import (
|
||||
DailyParticipantSession,
|
||||
@@ -25,9 +26,6 @@ from reflector.db.transcripts import (
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.pipelines.main_live_pipeline import asynctask
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline
|
||||
from reflector.processors import AudioFileWriterProcessor
|
||||
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.settings import settings
|
||||
@@ -908,6 +906,11 @@ async def convert_audio_and_waveform(transcript) -> None:
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415
|
||||
from reflector.processors.audio_file_writer import (
|
||||
AudioFileWriterProcessor, # noqa: PLC0415
|
||||
)
|
||||
|
||||
upload_path = transcript.data_path / "upload.webm"
|
||||
mp3_path = transcript.audio_mp3_filename
|
||||
|
||||
|
||||
@@ -8,8 +8,8 @@ import structlog
|
||||
from celery import shared_task
|
||||
from celery.utils.log import get_task_logger
|
||||
|
||||
from reflector.asynctask import asynctask
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.pipelines.main_live_pipeline import asynctask
|
||||
from reflector.utils.webhook import (
|
||||
WebhookRoomPayload,
|
||||
WebhookTestPayload,
|
||||
|
||||
@@ -107,7 +107,8 @@ class WebsocketManager:
|
||||
while True:
|
||||
# timeout=1.0 prevents tight CPU loop when no messages available
|
||||
message = await pubsub_subscriber.get_message(
|
||||
ignore_subscribe_messages=True
|
||||
ignore_subscribe_messages=True,
|
||||
timeout=1.0,
|
||||
)
|
||||
if message is not None:
|
||||
room_id = message["channel"].decode("utf-8")
|
||||
|
||||
Reference in New Issue
Block a user