From 8c4f5e9c0f893f4cb029595505b53136f04760f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Diego=20Garc=C3=ADa?= Date: Tue, 31 Mar 2026 16:34:10 -0500 Subject: [PATCH] fix: cpu usage + email improvements (#944) * fix: cpu usage on server ws manager, 100% to 0% on idle * fix: change email icon to white and prefill email in daily room for authenticated users * fix: improve email sending with full ts transcript --- server/reflector/db/meetings.py | 25 ++- server/reflector/email.py | 116 ++++++++-- server/reflector/hatchet/constants.py | 6 + .../workflows/daily_multitrack_pipeline.py | 44 +++- .../hatchet/workflows/file_pipeline.py | 41 +++- .../hatchet/workflows/live_post_pipeline.py | 41 +++- server/reflector/views/meetings.py | 3 +- server/reflector/views/transcripts.py | 4 +- server/reflector/worker/app.py | 1 - server/reflector/worker/process.py | 9 +- server/reflector/worker/webhook.py | 2 +- server/reflector/ws_manager.py | 3 +- server/tests/test_email.py | 206 ++++++++++++++++++ www/app/[roomName]/components/DailyRoom.tsx | 5 + .../emailTranscript/EmailTranscriptDialog.tsx | 4 +- .../useEmailTranscriptDialog.tsx | 5 +- www/public/email-icon.svg | 2 +- 17 files changed, 459 insertions(+), 58 deletions(-) create mode 100644 server/tests/test_email.py diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py index 5096eeda..ba7b8a3a 100644 --- a/server/reflector/db/meetings.py +++ b/server/reflector/db/meetings.py @@ -120,7 +120,8 @@ class Meeting(BaseModel): daily_composed_video_s3_key: str | None = None daily_composed_video_duration: int | None = None # Email recipients for transcript notification - email_recipients: list[str] | None = None + # Each entry is {"email": str, "include_link": bool} or a legacy plain str + email_recipients: list[dict | str] | None = None class MeetingController: @@ -399,15 +400,27 @@ class MeetingController: async with get_database().transaction(isolation="serializable"): yield - async def add_email_recipient(self, meeting_id: str, email: str) -> list[str]: - """Add an email to the meeting's email_recipients list (no duplicates).""" + async def add_email_recipient( + self, meeting_id: str, email: str, *, include_link: bool = True + ) -> list[dict]: + """Add an email to the meeting's email_recipients list (no duplicates). + + Each entry is stored as {"email": str, "include_link": bool}. + Legacy plain-string entries are normalised on read. + """ async with self.transaction(): meeting = await self.get_by_id(meeting_id) if not meeting: raise ValueError(f"Meeting {meeting_id} not found") - current = meeting.email_recipients or [] - if email not in current: - current.append(email) + # Normalise legacy string entries + current: list[dict] = [ + entry + if isinstance(entry, dict) + else {"email": entry, "include_link": True} + for entry in (meeting.email_recipients or []) + ] + if not any(r["email"] == email for r in current): + current.append({"email": email, "include_link": include_link}) await self.update_meeting(meeting_id, email_recipients=current) return current diff --git a/server/reflector/email.py b/server/reflector/email.py index d10471e2..637f08be 100644 --- a/server/reflector/email.py +++ b/server/reflector/email.py @@ -1,11 +1,13 @@ from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +from html import escape import aiosmtplib import structlog -from reflector.db.transcripts import Transcript +from reflector.db.transcripts import SourceKind, Transcript from reflector.settings import settings +from reflector.utils.transcript_formats import transcript_to_text_timestamped logger = structlog.get_logger(__name__) @@ -18,35 +20,111 @@ def get_transcript_url(transcript: Transcript) -> str: return f"{settings.UI_BASE_URL}/transcripts/{transcript.id}" -def _build_plain_text(transcript: Transcript, url: str) -> str: +def _get_timestamped_text(transcript: Transcript) -> str: + """Build the full timestamped transcript text using existing utility.""" + if not transcript.topics: + return "" + is_multitrack = transcript.source_kind == SourceKind.ROOM + return transcript_to_text_timestamped( + transcript.topics, transcript.participants, is_multitrack=is_multitrack + ) + + +def _build_plain_text(transcript: Transcript, url: str, include_link: bool) -> str: title = transcript.title or "Unnamed recording" - lines = [ - f"Your transcript is ready: {title}", - "", - f"View it here: {url}", - ] + lines = [f"Reflector: {title}", ""] + if transcript.short_summary: - lines.extend(["", "Summary:", transcript.short_summary]) + lines.extend(["Summary:", transcript.short_summary, ""]) + + timestamped = _get_timestamped_text(transcript) + if timestamped: + lines.extend(["Transcript:", timestamped, ""]) + + if include_link: + lines.append(f"View transcript: {url}") + lines.append("") + + lines.append( + "This email was sent because you requested to receive " + "the transcript from a meeting." + ) return "\n".join(lines) -def _build_html(transcript: Transcript, url: str) -> str: - title = transcript.title or "Unnamed recording" +def _build_html(transcript: Transcript, url: str, include_link: bool) -> str: + title = escape(transcript.title or "Unnamed recording") + summary_html = "" if transcript.short_summary: - summary_html = f"

{transcript.short_summary}

" + summary_html = ( + f'

' + f"{escape(transcript.short_summary)}

" + ) + + transcript_html = "" + timestamped = _get_timestamped_text(transcript) + if timestamped: + # Build styled transcript lines + styled_lines = [] + for line in timestamped.split("\n"): + if not line.strip(): + continue + # Lines are formatted as "[MM:SS] Speaker: text" + if line.startswith("[") and "] " in line: + bracket_end = line.index("] ") + timestamp = escape(line[: bracket_end + 1]) + rest = line[bracket_end + 2 :] + if ": " in rest: + colon_pos = rest.index(": ") + speaker = escape(rest[:colon_pos]) + text = escape(rest[colon_pos + 2 :]) + styled_lines.append( + f'
' + f'{timestamp} ' + f"{speaker}: {text}
" + ) + else: + styled_lines.append( + f'
{escape(line)}
' + ) + else: + styled_lines.append( + f'
{escape(line)}
' + ) + + transcript_html = ( + '

Transcript

' + '
' + f"{''.join(styled_lines)}
" + ) + + link_html = "" + if include_link: + link_html = ( + '

' + f'View Transcript

' + ) return f"""\
-

Your transcript is ready

-

{title}

+

{title}

{summary_html} -

View Transcript

-

This email was sent because you requested to receive the transcript from a meeting.

+ {transcript_html} + {link_html} +

This email was sent because you requested to receive the transcript from a meeting.

""" -async def send_transcript_email(to_emails: list[str], transcript: Transcript) -> int: +async def send_transcript_email( + to_emails: list[str], + transcript: Transcript, + *, + include_link: bool = True, +) -> int: """Send transcript notification to all emails. Returns count sent.""" if not is_email_configured() or not to_emails: return 0 @@ -57,12 +135,12 @@ async def send_transcript_email(to_emails: list[str], transcript: Transcript) -> for email_addr in to_emails: msg = MIMEMultipart("alternative") - msg["Subject"] = f"Transcript Ready: {title}" + msg["Subject"] = f"Reflector: {title}" msg["From"] = settings.SMTP_FROM_EMAIL msg["To"] = email_addr - msg.attach(MIMEText(_build_plain_text(transcript, url), "plain")) - msg.attach(MIMEText(_build_html(transcript, url), "html")) + msg.attach(MIMEText(_build_plain_text(transcript, url, include_link), "plain")) + msg.attach(MIMEText(_build_html(transcript, url, include_link), "html")) try: await aiosmtplib.send( diff --git a/server/reflector/hatchet/constants.py b/server/reflector/hatchet/constants.py index bfb57bf2..4fdb12e4 100644 --- a/server/reflector/hatchet/constants.py +++ b/server/reflector/hatchet/constants.py @@ -64,3 +64,9 @@ TIMEOUT_HEAVY = 1200 # Transcription, fan-out LLM tasks (Hatchet execution_time TIMEOUT_HEAVY_HTTP = ( 1150 # httpx timeout for transcribe_track — below 1200 so Hatchet doesn't race ) +TIMEOUT_EXTRA_HEAVY = ( + 3600 # Detect Topics, fan-out LLM tasks (Hatchet execution_timeout) +) +TIMEOUT_EXTRA_HEAVY_HTTP = ( + 3400 # httpx timeout for detect_topics — below 3600 so Hatchet doesn't race +) diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index eee164fe..ef8a5c16 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -41,6 +41,7 @@ from reflector.hatchet.broadcast import ( from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.constants import ( TIMEOUT_AUDIO, + TIMEOUT_EXTRA_HEAVY, TIMEOUT_HEAVY, TIMEOUT_LONG, TIMEOUT_MEDIUM, @@ -693,7 +694,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul @daily_multitrack_pipeline.task( parents=[process_tracks], - execution_timeout=timedelta(seconds=TIMEOUT_HEAVY), + execution_timeout=timedelta(seconds=TIMEOUT_EXTRA_HEAVY), retries=3, backoff_factor=2.0, backoff_max_seconds=30, @@ -1510,22 +1511,41 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult: if recording and recording.meeting_id: meeting = await meetings_controller.get_by_id(recording.meeting_id) - recipients = ( - list(meeting.email_recipients) + # Normalise meeting recipients (legacy strings → dicts) + meeting_recipients: list[dict] = ( + [ + entry + if isinstance(entry, dict) + else {"email": entry, "include_link": True} + for entry in (meeting.email_recipients or []) + ] if meeting and meeting.email_recipients else [] ) - # Also check room-level email + # Room-level email always gets a link (room owner) from reflector.db.rooms import rooms_controller # noqa: PLC0415 + room_email = None if transcript.room_id: room = await rooms_controller.get_by_id(transcript.room_id) if room and room.email_transcript_to: - if room.email_transcript_to not in recipients: - recipients.append(room.email_transcript_to) + room_email = room.email_transcript_to - if not recipients: + # Build two groups: with link and without link + with_link = [ + r["email"] for r in meeting_recipients if r.get("include_link", True) + ] + without_link = [ + r["email"] for r in meeting_recipients if not r.get("include_link", True) + ] + + if room_email: + if room_email not in with_link: + with_link.append(room_email) + without_link = [e for e in without_link if e != room_email] + + if not with_link and not without_link: ctx.log("send_email skipped (no email recipients)") return EmailResult(skipped=True) @@ -1533,7 +1553,15 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult: if meeting and meeting.email_recipients: await transcripts_controller.update(transcript, {"share_mode": "public"}) - count = await send_transcript_email(recipients, transcript) + count = 0 + if with_link: + count += await send_transcript_email( + with_link, transcript, include_link=True + ) + if without_link: + count += await send_transcript_email( + without_link, transcript, include_link=False + ) ctx.log(f"send_email complete: sent {count} emails") return EmailResult(emails_sent=count) diff --git a/server/reflector/hatchet/workflows/file_pipeline.py b/server/reflector/hatchet/workflows/file_pipeline.py index 6ec38345..5f20a2b5 100644 --- a/server/reflector/hatchet/workflows/file_pipeline.py +++ b/server/reflector/hatchet/workflows/file_pipeline.py @@ -916,22 +916,41 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult: if recording and recording.meeting_id: meeting = await meetings_controller.get_by_id(recording.meeting_id) - recipients = ( - list(meeting.email_recipients) + # Normalise meeting recipients (legacy strings → dicts) + meeting_recipients: list[dict] = ( + [ + entry + if isinstance(entry, dict) + else {"email": entry, "include_link": True} + for entry in (meeting.email_recipients or []) + ] if meeting and meeting.email_recipients else [] ) - # Also check room-level email + # Room-level email always gets a link (room owner) from reflector.db.rooms import rooms_controller # noqa: PLC0415 + room_email = None if transcript.room_id: room = await rooms_controller.get_by_id(transcript.room_id) if room and room.email_transcript_to: - if room.email_transcript_to not in recipients: - recipients.append(room.email_transcript_to) + room_email = room.email_transcript_to - if not recipients: + # Build two groups: with link and without link + with_link = [ + r["email"] for r in meeting_recipients if r.get("include_link", True) + ] + without_link = [ + r["email"] for r in meeting_recipients if not r.get("include_link", True) + ] + + if room_email: + if room_email not in with_link: + with_link.append(room_email) + without_link = [e for e in without_link if e != room_email] + + if not with_link and not without_link: ctx.log("send_email skipped (no email recipients)") return EmailResult(skipped=True) @@ -939,7 +958,15 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult: if meeting and meeting.email_recipients: await transcripts_controller.update(transcript, {"share_mode": "public"}) - count = await send_transcript_email(recipients, transcript) + count = 0 + if with_link: + count += await send_transcript_email( + with_link, transcript, include_link=True + ) + if without_link: + count += await send_transcript_email( + without_link, transcript, include_link=False + ) ctx.log(f"send_email complete: sent {count} emails") return EmailResult(emails_sent=count) diff --git a/server/reflector/hatchet/workflows/live_post_pipeline.py b/server/reflector/hatchet/workflows/live_post_pipeline.py index e1768835..5913d999 100644 --- a/server/reflector/hatchet/workflows/live_post_pipeline.py +++ b/server/reflector/hatchet/workflows/live_post_pipeline.py @@ -397,22 +397,41 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult: if recording and recording.meeting_id: meeting = await meetings_controller.get_by_id(recording.meeting_id) - recipients = ( - list(meeting.email_recipients) + # Normalise meeting recipients (legacy strings → dicts) + meeting_recipients: list[dict] = ( + [ + entry + if isinstance(entry, dict) + else {"email": entry, "include_link": True} + for entry in (meeting.email_recipients or []) + ] if meeting and meeting.email_recipients else [] ) - # Also check room-level email + # Room-level email always gets a link (room owner) from reflector.db.rooms import rooms_controller # noqa: PLC0415 + room_email = None if transcript.room_id: room = await rooms_controller.get_by_id(transcript.room_id) if room and room.email_transcript_to: - if room.email_transcript_to not in recipients: - recipients.append(room.email_transcript_to) + room_email = room.email_transcript_to - if not recipients: + # Build two groups: with link and without link + with_link = [ + r["email"] for r in meeting_recipients if r.get("include_link", True) + ] + without_link = [ + r["email"] for r in meeting_recipients if not r.get("include_link", True) + ] + + if room_email: + if room_email not in with_link: + with_link.append(room_email) + without_link = [e for e in without_link if e != room_email] + + if not with_link and not without_link: ctx.log("send_email skipped (no email recipients)") return EmailResult(skipped=True) @@ -420,7 +439,15 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult: if meeting and meeting.email_recipients: await transcripts_controller.update(transcript, {"share_mode": "public"}) - count = await send_transcript_email(recipients, transcript) + count = 0 + if with_link: + count += await send_transcript_email( + with_link, transcript, include_link=True + ) + if without_link: + count += await send_transcript_email( + without_link, transcript, include_link=False + ) ctx.log(f"send_email complete: sent {count} emails") return EmailResult(emails_sent=count) diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py index 7f256c41..78137ca9 100644 --- a/server/reflector/views/meetings.py +++ b/server/reflector/views/meetings.py @@ -168,8 +168,9 @@ async def add_email_recipient( if not meeting: raise HTTPException(status_code=404, detail="Meeting not found") + include_link = user is not None recipients = await meetings_controller.add_email_recipient( - meeting_id, request.email + meeting_id, request.email, include_link=include_link ) return {"status": "success", "email_recipients": recipients} diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py index 171c3d81..d49aa5a6 100644 --- a/server/reflector/views/transcripts.py +++ b/server/reflector/views/transcripts.py @@ -797,5 +797,7 @@ async def transcript_send_email( ) if not transcript: raise HTTPException(status_code=404, detail="Transcript not found") - sent = await send_transcript_email([request.email], transcript) + sent = await send_transcript_email( + [request.email], transcript, include_link=(transcript.share_mode == "public") + ) return SendEmailResponse(sent=sent) diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py index 2c9d2ae8..7df74263 100644 --- a/server/reflector/worker/app.py +++ b/server/reflector/worker/app.py @@ -146,7 +146,6 @@ else: app.conf.broker_connection_retry_on_startup = True app.autodiscover_tasks( [ - "reflector.pipelines.main_live_pipeline", "reflector.worker.healthcheck", "reflector.worker.process", "reflector.worker.cleanup", diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index c0381769..aa7d8042 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -12,6 +12,7 @@ from celery import shared_task from celery.utils.log import get_task_logger from pydantic import ValidationError +from reflector.asynctask import asynctask from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse from reflector.db.daily_participant_sessions import ( DailyParticipantSession, @@ -25,9 +26,6 @@ from reflector.db.transcripts import ( transcripts_controller, ) from reflector.hatchet.client import HatchetClientManager -from reflector.pipelines.main_live_pipeline import asynctask -from reflector.pipelines.topic_processing import EmptyPipeline -from reflector.processors import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.redis_cache import RedisAsyncLock from reflector.settings import settings @@ -908,6 +906,11 @@ async def convert_audio_and_waveform(transcript) -> None: transcript_id=transcript.id, ) + from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415 + from reflector.processors.audio_file_writer import ( + AudioFileWriterProcessor, # noqa: PLC0415 + ) + upload_path = transcript.data_path / "upload.webm" mp3_path = transcript.audio_mp3_filename diff --git a/server/reflector/worker/webhook.py b/server/reflector/worker/webhook.py index a31df897..ecbf67e7 100644 --- a/server/reflector/worker/webhook.py +++ b/server/reflector/worker/webhook.py @@ -8,8 +8,8 @@ import structlog from celery import shared_task from celery.utils.log import get_task_logger +from reflector.asynctask import asynctask from reflector.db.rooms import rooms_controller -from reflector.pipelines.main_live_pipeline import asynctask from reflector.utils.webhook import ( WebhookRoomPayload, WebhookTestPayload, diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py index 48f178f5..9c187714 100644 --- a/server/reflector/ws_manager.py +++ b/server/reflector/ws_manager.py @@ -107,7 +107,8 @@ class WebsocketManager: while True: # timeout=1.0 prevents tight CPU loop when no messages available message = await pubsub_subscriber.get_message( - ignore_subscribe_messages=True + ignore_subscribe_messages=True, + timeout=1.0, ) if message is not None: room_id = message["channel"].decode("utf-8") diff --git a/server/tests/test_email.py b/server/tests/test_email.py new file mode 100644 index 00000000..37c93f0e --- /dev/null +++ b/server/tests/test_email.py @@ -0,0 +1,206 @@ +"""Tests for reflector.email — transcript email composition and sending.""" + +from unittest.mock import AsyncMock, patch + +import pytest + +from reflector.db.transcripts import ( + SourceKind, + Transcript, + TranscriptParticipant, + TranscriptTopic, +) +from reflector.email import ( + _build_html, + _build_plain_text, + get_transcript_url, + send_transcript_email, +) +from reflector.processors.types import Word + + +def _make_transcript( + *, + title: str | None = "Weekly Standup", + short_summary: str | None = "Team discussed sprint progress.", + with_topics: bool = True, + share_mode: str = "private", + source_kind: SourceKind = SourceKind.FILE, +) -> Transcript: + topics = [] + participants = [] + if with_topics: + participants = [ + TranscriptParticipant(id="p1", speaker=0, name="Alice"), + TranscriptParticipant(id="p2", speaker=1, name="Bob"), + ] + topics = [ + TranscriptTopic( + title="Intro", + summary="Greetings", + timestamp=0.0, + duration=10.0, + words=[ + Word(text="Hello", start=0.0, end=0.5, speaker=0), + Word(text="everyone", start=0.5, end=1.0, speaker=0), + Word(text="Thanks", start=5.0, end=5.5, speaker=1), + Word(text="for", start=5.5, end=5.8, speaker=1), + Word(text="joining", start=5.8, end=6.2, speaker=1), + ], + ), + ] + return Transcript( + id="tx-123", + title=title, + short_summary=short_summary, + topics=topics, + participants=participants, + share_mode=share_mode, + source_kind=source_kind, + ) + + +URL = "http://localhost:3000/transcripts/tx-123" + + +class TestBuildPlainText: + def test_full_content_with_link(self): + t = _make_transcript() + text = _build_plain_text(t, URL, include_link=True) + + assert text.startswith("Reflector: Weekly Standup") + assert "Team discussed sprint progress." in text + assert "[00:00] Alice:" in text + assert "[00:05] Bob:" in text + assert URL in text + + def test_full_content_without_link(self): + t = _make_transcript() + text = _build_plain_text(t, URL, include_link=False) + + assert "Reflector: Weekly Standup" in text + assert "Team discussed sprint progress." in text + assert "[00:00] Alice:" in text + assert URL not in text + + def test_no_summary(self): + t = _make_transcript(short_summary=None) + text = _build_plain_text(t, URL, include_link=True) + + assert "Summary:" not in text + assert "[00:00] Alice:" in text + + def test_no_topics(self): + t = _make_transcript(with_topics=False) + text = _build_plain_text(t, URL, include_link=True) + + assert "Transcript:" not in text + assert "Reflector: Weekly Standup" in text + + def test_unnamed_recording(self): + t = _make_transcript(title=None) + text = _build_plain_text(t, URL, include_link=True) + + assert "Reflector: Unnamed recording" in text + + +class TestBuildHtml: + def test_full_content_with_link(self): + t = _make_transcript() + html = _build_html(t, URL, include_link=True) + + assert "Weekly Standup" in html + assert "Team discussed sprint progress." in html + assert "Alice" in html + assert "Bob" in html + assert URL in html + assert "View Transcript" in html + + def test_full_content_without_link(self): + t = _make_transcript() + html = _build_html(t, URL, include_link=False) + + assert "Weekly Standup" in html + assert "Alice" in html + assert URL not in html + assert "View Transcript" not in html + + def test_no_summary(self): + t = _make_transcript(short_summary=None) + html = _build_html(t, URL, include_link=True) + + assert "sprint progress" not in html + assert "Alice" in html + + def test_no_topics(self): + t = _make_transcript(with_topics=False) + html = _build_html(t, URL, include_link=True) + + assert "Transcript" not in html or "View Transcript" in html + + def test_html_escapes_title(self): + t = _make_transcript(title='') + html = _build_html(t, URL, include_link=True) + + assert "