diff --git a/server/reflector/db/meetings.py b/server/reflector/db/meetings.py
index 5096eeda..ba7b8a3a 100644
--- a/server/reflector/db/meetings.py
+++ b/server/reflector/db/meetings.py
@@ -120,7 +120,8 @@ class Meeting(BaseModel):
daily_composed_video_s3_key: str | None = None
daily_composed_video_duration: int | None = None
# Email recipients for transcript notification
- email_recipients: list[str] | None = None
+ # Each entry is {"email": str, "include_link": bool} or a legacy plain str
+ email_recipients: list[dict | str] | None = None
class MeetingController:
@@ -399,15 +400,27 @@ class MeetingController:
async with get_database().transaction(isolation="serializable"):
yield
- async def add_email_recipient(self, meeting_id: str, email: str) -> list[str]:
- """Add an email to the meeting's email_recipients list (no duplicates)."""
+ async def add_email_recipient(
+ self, meeting_id: str, email: str, *, include_link: bool = True
+ ) -> list[dict]:
+ """Add an email to the meeting's email_recipients list (no duplicates).
+
+ Each entry is stored as {"email": str, "include_link": bool}.
+ Legacy plain-string entries are normalised on read.
+ """
async with self.transaction():
meeting = await self.get_by_id(meeting_id)
if not meeting:
raise ValueError(f"Meeting {meeting_id} not found")
- current = meeting.email_recipients or []
- if email not in current:
- current.append(email)
+ # Normalise legacy string entries
+ current: list[dict] = [
+ entry
+ if isinstance(entry, dict)
+ else {"email": entry, "include_link": True}
+ for entry in (meeting.email_recipients or [])
+ ]
+ if not any(r["email"] == email for r in current):
+ current.append({"email": email, "include_link": include_link})
await self.update_meeting(meeting_id, email_recipients=current)
return current
diff --git a/server/reflector/email.py b/server/reflector/email.py
index d10471e2..637f08be 100644
--- a/server/reflector/email.py
+++ b/server/reflector/email.py
@@ -1,11 +1,13 @@
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
+from html import escape
import aiosmtplib
import structlog
-from reflector.db.transcripts import Transcript
+from reflector.db.transcripts import SourceKind, Transcript
from reflector.settings import settings
+from reflector.utils.transcript_formats import transcript_to_text_timestamped
logger = structlog.get_logger(__name__)
@@ -18,35 +20,111 @@ def get_transcript_url(transcript: Transcript) -> str:
return f"{settings.UI_BASE_URL}/transcripts/{transcript.id}"
-def _build_plain_text(transcript: Transcript, url: str) -> str:
+def _get_timestamped_text(transcript: Transcript) -> str:
+ """Build the full timestamped transcript text using existing utility."""
+ if not transcript.topics:
+ return ""
+ is_multitrack = transcript.source_kind == SourceKind.ROOM
+ return transcript_to_text_timestamped(
+ transcript.topics, transcript.participants, is_multitrack=is_multitrack
+ )
+
+
+def _build_plain_text(transcript: Transcript, url: str, include_link: bool) -> str:
title = transcript.title or "Unnamed recording"
- lines = [
- f"Your transcript is ready: {title}",
- "",
- f"View it here: {url}",
- ]
+ lines = [f"Reflector: {title}", ""]
+
if transcript.short_summary:
- lines.extend(["", "Summary:", transcript.short_summary])
+ lines.extend(["Summary:", transcript.short_summary, ""])
+
+ timestamped = _get_timestamped_text(transcript)
+ if timestamped:
+ lines.extend(["Transcript:", timestamped, ""])
+
+ if include_link:
+ lines.append(f"View transcript: {url}")
+ lines.append("")
+
+ lines.append(
+ "This email was sent because you requested to receive "
+ "the transcript from a meeting."
+ )
return "\n".join(lines)
-def _build_html(transcript: Transcript, url: str) -> str:
- title = transcript.title or "Unnamed recording"
+def _build_html(transcript: Transcript, url: str, include_link: bool) -> str:
+ title = escape(transcript.title or "Unnamed recording")
+
summary_html = ""
if transcript.short_summary:
- summary_html = f"
{transcript.short_summary}
"
+ summary_html = (
+ f''
+ f"{escape(transcript.short_summary)}
"
+ )
+
+ transcript_html = ""
+ timestamped = _get_timestamped_text(transcript)
+ if timestamped:
+ # Build styled transcript lines
+ styled_lines = []
+ for line in timestamped.split("\n"):
+ if not line.strip():
+ continue
+ # Lines are formatted as "[MM:SS] Speaker: text"
+ if line.startswith("[") and "] " in line:
+ bracket_end = line.index("] ")
+ timestamp = escape(line[: bracket_end + 1])
+ rest = line[bracket_end + 2 :]
+ if ": " in rest:
+ colon_pos = rest.index(": ")
+ speaker = escape(rest[:colon_pos])
+ text = escape(rest[colon_pos + 2 :])
+ styled_lines.append(
+ f''
+ f'{timestamp} '
+ f"{speaker}: {text}
"
+ )
+ else:
+ styled_lines.append(
+ f'{escape(line)}
'
+ )
+ else:
+ styled_lines.append(
+ f'{escape(line)}
'
+ )
+
+ transcript_html = (
+ 'Transcript
'
+ ''
+ f"{''.join(styled_lines)}
"
+ )
+
+ link_html = ""
+ if include_link:
+ link_html = (
+ ''
+ f'View Transcript
'
+ )
return f"""\
-
Your transcript is ready
-
{title}
+
{title}
{summary_html}
-
View Transcript
-
This email was sent because you requested to receive the transcript from a meeting.
+ {transcript_html}
+ {link_html}
+
This email was sent because you requested to receive the transcript from a meeting.
"""
-async def send_transcript_email(to_emails: list[str], transcript: Transcript) -> int:
+async def send_transcript_email(
+ to_emails: list[str],
+ transcript: Transcript,
+ *,
+ include_link: bool = True,
+) -> int:
"""Send transcript notification to all emails. Returns count sent."""
if not is_email_configured() or not to_emails:
return 0
@@ -57,12 +135,12 @@ async def send_transcript_email(to_emails: list[str], transcript: Transcript) ->
for email_addr in to_emails:
msg = MIMEMultipart("alternative")
- msg["Subject"] = f"Transcript Ready: {title}"
+ msg["Subject"] = f"Reflector: {title}"
msg["From"] = settings.SMTP_FROM_EMAIL
msg["To"] = email_addr
- msg.attach(MIMEText(_build_plain_text(transcript, url), "plain"))
- msg.attach(MIMEText(_build_html(transcript, url), "html"))
+ msg.attach(MIMEText(_build_plain_text(transcript, url, include_link), "plain"))
+ msg.attach(MIMEText(_build_html(transcript, url, include_link), "html"))
try:
await aiosmtplib.send(
diff --git a/server/reflector/hatchet/constants.py b/server/reflector/hatchet/constants.py
index bfb57bf2..4fdb12e4 100644
--- a/server/reflector/hatchet/constants.py
+++ b/server/reflector/hatchet/constants.py
@@ -64,3 +64,9 @@ TIMEOUT_HEAVY = 1200 # Transcription, fan-out LLM tasks (Hatchet execution_time
TIMEOUT_HEAVY_HTTP = (
1150 # httpx timeout for transcribe_track — below 1200 so Hatchet doesn't race
)
+TIMEOUT_EXTRA_HEAVY = (
+ 3600 # Detect Topics, fan-out LLM tasks (Hatchet execution_timeout)
+)
+TIMEOUT_EXTRA_HEAVY_HTTP = (
+ 3400 # httpx timeout for detect_topics — below 3600 so Hatchet doesn't race
+)
diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
index eee164fe..ef8a5c16 100644
--- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
+++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py
@@ -41,6 +41,7 @@ from reflector.hatchet.broadcast import (
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import (
TIMEOUT_AUDIO,
+ TIMEOUT_EXTRA_HEAVY,
TIMEOUT_HEAVY,
TIMEOUT_LONG,
TIMEOUT_MEDIUM,
@@ -693,7 +694,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
@daily_multitrack_pipeline.task(
parents=[process_tracks],
- execution_timeout=timedelta(seconds=TIMEOUT_HEAVY),
+ execution_timeout=timedelta(seconds=TIMEOUT_EXTRA_HEAVY),
retries=3,
backoff_factor=2.0,
backoff_max_seconds=30,
@@ -1510,22 +1511,41 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult:
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
- recipients = (
- list(meeting.email_recipients)
+ # Normalise meeting recipients (legacy strings → dicts)
+ meeting_recipients: list[dict] = (
+ [
+ entry
+ if isinstance(entry, dict)
+ else {"email": entry, "include_link": True}
+ for entry in (meeting.email_recipients or [])
+ ]
if meeting and meeting.email_recipients
else []
)
- # Also check room-level email
+ # Room-level email always gets a link (room owner)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
+ room_email = None
if transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
if room and room.email_transcript_to:
- if room.email_transcript_to not in recipients:
- recipients.append(room.email_transcript_to)
+ room_email = room.email_transcript_to
- if not recipients:
+ # Build two groups: with link and without link
+ with_link = [
+ r["email"] for r in meeting_recipients if r.get("include_link", True)
+ ]
+ without_link = [
+ r["email"] for r in meeting_recipients if not r.get("include_link", True)
+ ]
+
+ if room_email:
+ if room_email not in with_link:
+ with_link.append(room_email)
+ without_link = [e for e in without_link if e != room_email]
+
+ if not with_link and not without_link:
ctx.log("send_email skipped (no email recipients)")
return EmailResult(skipped=True)
@@ -1533,7 +1553,15 @@ async def send_email(input: PipelineInput, ctx: Context) -> EmailResult:
if meeting and meeting.email_recipients:
await transcripts_controller.update(transcript, {"share_mode": "public"})
- count = await send_transcript_email(recipients, transcript)
+ count = 0
+ if with_link:
+ count += await send_transcript_email(
+ with_link, transcript, include_link=True
+ )
+ if without_link:
+ count += await send_transcript_email(
+ without_link, transcript, include_link=False
+ )
ctx.log(f"send_email complete: sent {count} emails")
return EmailResult(emails_sent=count)
diff --git a/server/reflector/hatchet/workflows/file_pipeline.py b/server/reflector/hatchet/workflows/file_pipeline.py
index 6ec38345..5f20a2b5 100644
--- a/server/reflector/hatchet/workflows/file_pipeline.py
+++ b/server/reflector/hatchet/workflows/file_pipeline.py
@@ -916,22 +916,41 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult:
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
- recipients = (
- list(meeting.email_recipients)
+ # Normalise meeting recipients (legacy strings → dicts)
+ meeting_recipients: list[dict] = (
+ [
+ entry
+ if isinstance(entry, dict)
+ else {"email": entry, "include_link": True}
+ for entry in (meeting.email_recipients or [])
+ ]
if meeting and meeting.email_recipients
else []
)
- # Also check room-level email
+ # Room-level email always gets a link (room owner)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
+ room_email = None
if transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
if room and room.email_transcript_to:
- if room.email_transcript_to not in recipients:
- recipients.append(room.email_transcript_to)
+ room_email = room.email_transcript_to
- if not recipients:
+ # Build two groups: with link and without link
+ with_link = [
+ r["email"] for r in meeting_recipients if r.get("include_link", True)
+ ]
+ without_link = [
+ r["email"] for r in meeting_recipients if not r.get("include_link", True)
+ ]
+
+ if room_email:
+ if room_email not in with_link:
+ with_link.append(room_email)
+ without_link = [e for e in without_link if e != room_email]
+
+ if not with_link and not without_link:
ctx.log("send_email skipped (no email recipients)")
return EmailResult(skipped=True)
@@ -939,7 +958,15 @@ async def send_email(input: FilePipelineInput, ctx: Context) -> EmailResult:
if meeting and meeting.email_recipients:
await transcripts_controller.update(transcript, {"share_mode": "public"})
- count = await send_transcript_email(recipients, transcript)
+ count = 0
+ if with_link:
+ count += await send_transcript_email(
+ with_link, transcript, include_link=True
+ )
+ if without_link:
+ count += await send_transcript_email(
+ without_link, transcript, include_link=False
+ )
ctx.log(f"send_email complete: sent {count} emails")
return EmailResult(emails_sent=count)
diff --git a/server/reflector/hatchet/workflows/live_post_pipeline.py b/server/reflector/hatchet/workflows/live_post_pipeline.py
index e1768835..5913d999 100644
--- a/server/reflector/hatchet/workflows/live_post_pipeline.py
+++ b/server/reflector/hatchet/workflows/live_post_pipeline.py
@@ -397,22 +397,41 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult:
if recording and recording.meeting_id:
meeting = await meetings_controller.get_by_id(recording.meeting_id)
- recipients = (
- list(meeting.email_recipients)
+ # Normalise meeting recipients (legacy strings → dicts)
+ meeting_recipients: list[dict] = (
+ [
+ entry
+ if isinstance(entry, dict)
+ else {"email": entry, "include_link": True}
+ for entry in (meeting.email_recipients or [])
+ ]
if meeting and meeting.email_recipients
else []
)
- # Also check room-level email
+ # Room-level email always gets a link (room owner)
from reflector.db.rooms import rooms_controller # noqa: PLC0415
+ room_email = None
if transcript.room_id:
room = await rooms_controller.get_by_id(transcript.room_id)
if room and room.email_transcript_to:
- if room.email_transcript_to not in recipients:
- recipients.append(room.email_transcript_to)
+ room_email = room.email_transcript_to
- if not recipients:
+ # Build two groups: with link and without link
+ with_link = [
+ r["email"] for r in meeting_recipients if r.get("include_link", True)
+ ]
+ without_link = [
+ r["email"] for r in meeting_recipients if not r.get("include_link", True)
+ ]
+
+ if room_email:
+ if room_email not in with_link:
+ with_link.append(room_email)
+ without_link = [e for e in without_link if e != room_email]
+
+ if not with_link and not without_link:
ctx.log("send_email skipped (no email recipients)")
return EmailResult(skipped=True)
@@ -420,7 +439,15 @@ async def send_email(input: LivePostPipelineInput, ctx: Context) -> EmailResult:
if meeting and meeting.email_recipients:
await transcripts_controller.update(transcript, {"share_mode": "public"})
- count = await send_transcript_email(recipients, transcript)
+ count = 0
+ if with_link:
+ count += await send_transcript_email(
+ with_link, transcript, include_link=True
+ )
+ if without_link:
+ count += await send_transcript_email(
+ without_link, transcript, include_link=False
+ )
ctx.log(f"send_email complete: sent {count} emails")
return EmailResult(emails_sent=count)
diff --git a/server/reflector/views/meetings.py b/server/reflector/views/meetings.py
index 7f256c41..78137ca9 100644
--- a/server/reflector/views/meetings.py
+++ b/server/reflector/views/meetings.py
@@ -168,8 +168,9 @@ async def add_email_recipient(
if not meeting:
raise HTTPException(status_code=404, detail="Meeting not found")
+ include_link = user is not None
recipients = await meetings_controller.add_email_recipient(
- meeting_id, request.email
+ meeting_id, request.email, include_link=include_link
)
return {"status": "success", "email_recipients": recipients}
diff --git a/server/reflector/views/transcripts.py b/server/reflector/views/transcripts.py
index 171c3d81..d49aa5a6 100644
--- a/server/reflector/views/transcripts.py
+++ b/server/reflector/views/transcripts.py
@@ -797,5 +797,7 @@ async def transcript_send_email(
)
if not transcript:
raise HTTPException(status_code=404, detail="Transcript not found")
- sent = await send_transcript_email([request.email], transcript)
+ sent = await send_transcript_email(
+ [request.email], transcript, include_link=(transcript.share_mode == "public")
+ )
return SendEmailResponse(sent=sent)
diff --git a/server/reflector/worker/app.py b/server/reflector/worker/app.py
index 2c9d2ae8..7df74263 100644
--- a/server/reflector/worker/app.py
+++ b/server/reflector/worker/app.py
@@ -146,7 +146,6 @@ else:
app.conf.broker_connection_retry_on_startup = True
app.autodiscover_tasks(
[
- "reflector.pipelines.main_live_pipeline",
"reflector.worker.healthcheck",
"reflector.worker.process",
"reflector.worker.cleanup",
diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py
index c0381769..aa7d8042 100644
--- a/server/reflector/worker/process.py
+++ b/server/reflector/worker/process.py
@@ -12,6 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger
from pydantic import ValidationError
+from reflector.asynctask import asynctask
from reflector.dailyco_api import FinishedRecordingResponse, RecordingResponse
from reflector.db.daily_participant_sessions import (
DailyParticipantSession,
@@ -25,9 +26,6 @@ from reflector.db.transcripts import (
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
-from reflector.pipelines.main_live_pipeline import asynctask
-from reflector.pipelines.topic_processing import EmptyPipeline
-from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock
from reflector.settings import settings
@@ -908,6 +906,11 @@ async def convert_audio_and_waveform(transcript) -> None:
transcript_id=transcript.id,
)
+ from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415
+ from reflector.processors.audio_file_writer import (
+ AudioFileWriterProcessor, # noqa: PLC0415
+ )
+
upload_path = transcript.data_path / "upload.webm"
mp3_path = transcript.audio_mp3_filename
diff --git a/server/reflector/worker/webhook.py b/server/reflector/worker/webhook.py
index a31df897..ecbf67e7 100644
--- a/server/reflector/worker/webhook.py
+++ b/server/reflector/worker/webhook.py
@@ -8,8 +8,8 @@ import structlog
from celery import shared_task
from celery.utils.log import get_task_logger
+from reflector.asynctask import asynctask
from reflector.db.rooms import rooms_controller
-from reflector.pipelines.main_live_pipeline import asynctask
from reflector.utils.webhook import (
WebhookRoomPayload,
WebhookTestPayload,
diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py
index 48f178f5..9c187714 100644
--- a/server/reflector/ws_manager.py
+++ b/server/reflector/ws_manager.py
@@ -107,7 +107,8 @@ class WebsocketManager:
while True:
# timeout=1.0 prevents tight CPU loop when no messages available
message = await pubsub_subscriber.get_message(
- ignore_subscribe_messages=True
+ ignore_subscribe_messages=True,
+ timeout=1.0,
)
if message is not None:
room_id = message["channel"].decode("utf-8")
diff --git a/server/tests/test_email.py b/server/tests/test_email.py
new file mode 100644
index 00000000..37c93f0e
--- /dev/null
+++ b/server/tests/test_email.py
@@ -0,0 +1,206 @@
+"""Tests for reflector.email — transcript email composition and sending."""
+
+from unittest.mock import AsyncMock, patch
+
+import pytest
+
+from reflector.db.transcripts import (
+ SourceKind,
+ Transcript,
+ TranscriptParticipant,
+ TranscriptTopic,
+)
+from reflector.email import (
+ _build_html,
+ _build_plain_text,
+ get_transcript_url,
+ send_transcript_email,
+)
+from reflector.processors.types import Word
+
+
+def _make_transcript(
+ *,
+ title: str | None = "Weekly Standup",
+ short_summary: str | None = "Team discussed sprint progress.",
+ with_topics: bool = True,
+ share_mode: str = "private",
+ source_kind: SourceKind = SourceKind.FILE,
+) -> Transcript:
+ topics = []
+ participants = []
+ if with_topics:
+ participants = [
+ TranscriptParticipant(id="p1", speaker=0, name="Alice"),
+ TranscriptParticipant(id="p2", speaker=1, name="Bob"),
+ ]
+ topics = [
+ TranscriptTopic(
+ title="Intro",
+ summary="Greetings",
+ timestamp=0.0,
+ duration=10.0,
+ words=[
+ Word(text="Hello", start=0.0, end=0.5, speaker=0),
+ Word(text="everyone", start=0.5, end=1.0, speaker=0),
+ Word(text="Thanks", start=5.0, end=5.5, speaker=1),
+ Word(text="for", start=5.5, end=5.8, speaker=1),
+ Word(text="joining", start=5.8, end=6.2, speaker=1),
+ ],
+ ),
+ ]
+ return Transcript(
+ id="tx-123",
+ title=title,
+ short_summary=short_summary,
+ topics=topics,
+ participants=participants,
+ share_mode=share_mode,
+ source_kind=source_kind,
+ )
+
+
+URL = "http://localhost:3000/transcripts/tx-123"
+
+
+class TestBuildPlainText:
+ def test_full_content_with_link(self):
+ t = _make_transcript()
+ text = _build_plain_text(t, URL, include_link=True)
+
+ assert text.startswith("Reflector: Weekly Standup")
+ assert "Team discussed sprint progress." in text
+ assert "[00:00] Alice:" in text
+ assert "[00:05] Bob:" in text
+ assert URL in text
+
+ def test_full_content_without_link(self):
+ t = _make_transcript()
+ text = _build_plain_text(t, URL, include_link=False)
+
+ assert "Reflector: Weekly Standup" in text
+ assert "Team discussed sprint progress." in text
+ assert "[00:00] Alice:" in text
+ assert URL not in text
+
+ def test_no_summary(self):
+ t = _make_transcript(short_summary=None)
+ text = _build_plain_text(t, URL, include_link=True)
+
+ assert "Summary:" not in text
+ assert "[00:00] Alice:" in text
+
+ def test_no_topics(self):
+ t = _make_transcript(with_topics=False)
+ text = _build_plain_text(t, URL, include_link=True)
+
+ assert "Transcript:" not in text
+ assert "Reflector: Weekly Standup" in text
+
+ def test_unnamed_recording(self):
+ t = _make_transcript(title=None)
+ text = _build_plain_text(t, URL, include_link=True)
+
+ assert "Reflector: Unnamed recording" in text
+
+
+class TestBuildHtml:
+ def test_full_content_with_link(self):
+ t = _make_transcript()
+ html = _build_html(t, URL, include_link=True)
+
+ assert "Weekly Standup" in html
+ assert "Team discussed sprint progress." in html
+ assert "Alice" in html
+ assert "Bob" in html
+ assert URL in html
+ assert "View Transcript" in html
+
+ def test_full_content_without_link(self):
+ t = _make_transcript()
+ html = _build_html(t, URL, include_link=False)
+
+ assert "Weekly Standup" in html
+ assert "Alice" in html
+ assert URL not in html
+ assert "View Transcript" not in html
+
+ def test_no_summary(self):
+ t = _make_transcript(short_summary=None)
+ html = _build_html(t, URL, include_link=True)
+
+ assert "sprint progress" not in html
+ assert "Alice" in html
+
+ def test_no_topics(self):
+ t = _make_transcript(with_topics=False)
+ html = _build_html(t, URL, include_link=True)
+
+ assert "Transcript" not in html or "View Transcript" in html
+
+ def test_html_escapes_title(self):
+ t = _make_transcript(title='')
+ html = _build_html(t, URL, include_link=True)
+
+ assert "