mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2026-04-12 08:26:53 +00:00
Compare commits
4 Commits
dependabot
...
v0.45.0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5cefc39972 | ||
|
|
739cd51375 | ||
|
|
ee8db36f2c | ||
|
|
5f0c5635eb |
13
CHANGELOG.md
13
CHANGELOG.md
@@ -1,5 +1,18 @@
|
||||
# Changelog
|
||||
|
||||
## [0.45.0](https://github.com/GreyhavenHQ/reflector/compare/v0.44.0...v0.45.0) (2026-04-09)
|
||||
|
||||
|
||||
### Features
|
||||
|
||||
* make video recording optional, deleting video tracks ([#954](https://github.com/GreyhavenHQ/reflector/issues/954)) ([ee8db36](https://github.com/GreyhavenHQ/reflector/commit/ee8db36f2cd93b8f1ff4f4318e331fe2bac219c5))
|
||||
|
||||
|
||||
### Bug Fixes
|
||||
|
||||
* better topic chunking and subject extraction ([#952](https://github.com/GreyhavenHQ/reflector/issues/952)) ([5f0c563](https://github.com/GreyhavenHQ/reflector/commit/5f0c5635eb77955b70168242ad7c336a20c98dd0))
|
||||
* inline imports ([#955](https://github.com/GreyhavenHQ/reflector/issues/955)) ([739cd51](https://github.com/GreyhavenHQ/reflector/commit/739cd513751cd52d8e3d6d80b64568b1cf409414))
|
||||
|
||||
## [0.44.0](https://github.com/GreyhavenHQ/reflector/compare/v0.43.0...v0.44.0) (2026-04-07)
|
||||
|
||||
|
||||
|
||||
@@ -201,4 +201,4 @@ If you need to do any worker/pipeline related work, search for "Pipeline" classe
|
||||
## Code Style
|
||||
|
||||
- Always put imports at the top of the file. Let ruff/pre-commit handle sorting and formatting of imports.
|
||||
- Exception: In Hatchet pipeline task functions, DB controller imports (e.g., `transcripts_controller`, `meetings_controller`) stay as deferred/inline imports inside `fresh_db_connection()` blocks — this is intentional to avoid sharing DB connections across forked processes. Non-DB imports (utilities, services) should still go at the top of the file.
|
||||
- The **only** imports allowed to remain inline are from `reflector.db.*` modules (e.g., `reflector.db.transcripts`, `reflector.db.meetings`, `reflector.db.recordings`, `reflector.db.rooms`). These stay as deferred/inline imports inside `fresh_db_connection()` blocks in Hatchet pipeline task functions — this is intentional to avoid sharing DB connections across forked processes. All other imports (utilities, services, processors, storage, third-party libs) **must** go at the top of the file, even in Hatchet workflows.
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
"""add store_video to room and meeting
|
||||
|
||||
Revision ID: c1d2e3f4a5b6
|
||||
Revises: b4c7e8f9a012
|
||||
Create Date: 2026-04-08 00:00:00.000000
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
revision: str = "c1d2e3f4a5b6"
|
||||
down_revision: Union[str, None] = "b4c7e8f9a012"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
    """Add a ``store_video`` boolean column to the ``room`` and ``meeting`` tables.

    The column is declared NOT NULL with a server-side default of false.
    """
    # Same order as before: room first, then meeting. A new Column object is
    # built on every iteration so each add_column call gets its own instance.
    for table_name in ("room", "meeting"):
        store_video = sa.Column(
            "store_video",
            sa.Boolean(),
            nullable=False,
            server_default=sa.false(),
        )
        op.add_column(table_name, store_video)
|
||||
|
||||
|
||||
def downgrade() -> None:
    """Drop the ``store_video`` column from ``meeting`` and then from ``room``."""
    # Reverse of upgrade(): meeting is handled before room.
    for table_name in ("meeting", "room"):
        op.drop_column(table_name, "store_video")
|
||||
@@ -69,6 +69,7 @@ meetings = sa.Table(
|
||||
sa.Column("daily_composed_video_duration", sa.Integer, nullable=True),
|
||||
# Email recipients for transcript notification
|
||||
sa.Column("email_recipients", JSONB, nullable=True),
|
||||
sa.Column("store_video", sa.Boolean, nullable=False, server_default=sa.false()),
|
||||
sa.Index("idx_meeting_room_id", "room_id"),
|
||||
sa.Index("idx_meeting_calendar_event", "calendar_event_id"),
|
||||
)
|
||||
@@ -122,6 +123,7 @@ class Meeting(BaseModel):
|
||||
# Email recipients for transcript notification
|
||||
# Each entry is {"email": str, "include_link": bool} or a legacy plain str
|
||||
email_recipients: list[dict | str] | None = None
|
||||
store_video: bool = False
|
||||
|
||||
|
||||
class MeetingController:
|
||||
@@ -152,6 +154,7 @@ class MeetingController:
|
||||
calendar_event_id=calendar_event_id,
|
||||
calendar_metadata=calendar_metadata,
|
||||
platform=room.platform,
|
||||
store_video=room.store_video,
|
||||
)
|
||||
query = meetings.insert().values(**meeting.model_dump())
|
||||
await get_database().execute(query)
|
||||
|
||||
@@ -64,6 +64,9 @@ rooms = sqlalchemy.Table(
|
||||
server_default=sqlalchemy.sql.false(),
|
||||
),
|
||||
sqlalchemy.Column("email_transcript_to", sqlalchemy.String, nullable=True),
|
||||
sqlalchemy.Column(
|
||||
"store_video", sqlalchemy.Boolean, nullable=False, server_default=false()
|
||||
),
|
||||
sqlalchemy.Index("idx_room_is_shared", "is_shared"),
|
||||
sqlalchemy.Index("idx_room_ics_enabled", "ics_enabled"),
|
||||
)
|
||||
@@ -94,6 +97,7 @@ class Room(BaseModel):
|
||||
platform: Platform = Field(default_factory=lambda: settings.DEFAULT_VIDEO_PLATFORM)
|
||||
skip_consent: bool = False
|
||||
email_transcript_to: str | None = None
|
||||
store_video: bool = False
|
||||
|
||||
|
||||
class RoomController:
|
||||
@@ -150,6 +154,7 @@ class RoomController:
|
||||
platform: Platform = settings.DEFAULT_VIDEO_PLATFORM,
|
||||
skip_consent: bool = False,
|
||||
email_transcript_to: str | None = None,
|
||||
store_video: bool = False,
|
||||
):
|
||||
"""
|
||||
Add a new room
|
||||
@@ -176,6 +181,7 @@ class RoomController:
|
||||
"platform": platform,
|
||||
"skip_consent": skip_consent,
|
||||
"email_transcript_to": email_transcript_to,
|
||||
"store_video": store_video,
|
||||
}
|
||||
|
||||
room = Room(**room_data)
|
||||
|
||||
@@ -10,6 +10,7 @@ from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.workflows.daily_multitrack_pipeline import (
|
||||
daily_multitrack_pipeline,
|
||||
)
|
||||
from reflector.hatchet.workflows.failed_runs_monitor import failed_runs_monitor
|
||||
from reflector.hatchet.workflows.file_pipeline import file_pipeline
|
||||
from reflector.hatchet.workflows.live_post_pipeline import live_post_pipeline
|
||||
from reflector.hatchet.workflows.subject_processing import subject_workflow
|
||||
@@ -54,10 +55,6 @@ def main():
|
||||
]
|
||||
)
|
||||
if _zulip_dag_enabled:
|
||||
from reflector.hatchet.workflows.failed_runs_monitor import ( # noqa: PLC0415
|
||||
failed_runs_monitor,
|
||||
)
|
||||
|
||||
workflows.append(failed_runs_monitor)
|
||||
logger.info(
|
||||
"FailedRunsMonitor cron enabled",
|
||||
|
||||
@@ -18,10 +18,11 @@ import json
|
||||
import tempfile
|
||||
import time
|
||||
from contextlib import asynccontextmanager
|
||||
from datetime import timedelta
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Coroutine, Protocol, TypeVar
|
||||
|
||||
import databases
|
||||
import httpx
|
||||
from hatchet_sdk import (
|
||||
ConcurrencyExpression,
|
||||
@@ -83,6 +84,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
|
||||
topic_chunk_workflow,
|
||||
)
|
||||
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
|
||||
from reflector.llm import LLM
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines import topic_processing
|
||||
from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor
|
||||
@@ -95,7 +97,9 @@ from reflector.processors.summary.prompts import (
|
||||
from reflector.processors.summary.summary_builder import SummaryBuilder
|
||||
from reflector.processors.types import TitleSummary, Word
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.redis_cache import get_async_redis_client
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_source_storage, get_transcripts_storage
|
||||
from reflector.utils.audio_constants import (
|
||||
PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
WAVEFORM_SEGMENTS,
|
||||
@@ -105,8 +109,16 @@ from reflector.utils.daily import (
|
||||
filter_cam_audio_tracks,
|
||||
parse_daily_recording_filename,
|
||||
)
|
||||
from reflector.utils.livekit import parse_livekit_track_filepath
|
||||
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
|
||||
from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
|
||||
from reflector.utils.transcript_constants import (
|
||||
compute_max_subjects,
|
||||
compute_topic_chunk_size,
|
||||
)
|
||||
from reflector.utils.webhook import (
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
from reflector.zulip import post_transcript_notification
|
||||
|
||||
|
||||
@@ -135,8 +147,6 @@ async def fresh_db_connection():
|
||||
The real fix would be making the db module fork-aware instead of bypassing it.
|
||||
Current pattern is acceptable given Hatchet's process model.
|
||||
"""
|
||||
import databases # noqa: PLC0415
|
||||
|
||||
from reflector.db import _database_context # noqa: PLC0415
|
||||
|
||||
_database_context.set(None)
|
||||
@@ -173,8 +183,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
|
||||
|
||||
def _spawn_storage():
|
||||
"""Create fresh storage instance for writing to our transcript bucket."""
|
||||
from reflector.storage import get_transcripts_storage # noqa: PLC0415
|
||||
|
||||
return get_transcripts_storage()
|
||||
|
||||
|
||||
@@ -388,10 +396,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
|
||||
|
||||
if input.source_platform == "livekit":
|
||||
# LiveKit: participant identity is in the track dict or can be parsed from filepath
|
||||
from reflector.utils.livekit import (
|
||||
parse_livekit_track_filepath, # noqa: PLC0415
|
||||
)
|
||||
|
||||
# Look up identity → Reflector user_id mapping from Redis
|
||||
# (stored at join time in rooms.py)
|
||||
identity_to_user_id: dict[str, str] = {}
|
||||
@@ -399,9 +403,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
|
||||
from reflector.db.meetings import (
|
||||
meetings_controller as mc, # noqa: PLC0415
|
||||
)
|
||||
from reflector.redis_cache import (
|
||||
get_async_redis_client, # noqa: PLC0415
|
||||
)
|
||||
|
||||
meeting = (
|
||||
await mc.get_by_id(transcript.meeting_id)
|
||||
@@ -543,12 +544,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
|
||||
# OGG files don't have embedded start_time metadata, so we pre-calculate.
|
||||
track_padding: dict[int, float] = {}
|
||||
if input.source_platform == "livekit":
|
||||
from datetime import datetime # noqa: PLC0415
|
||||
|
||||
from reflector.utils.livekit import (
|
||||
parse_livekit_track_filepath, # noqa: PLC0415
|
||||
)
|
||||
|
||||
timestamps = []
|
||||
for i, track in enumerate(input.tracks):
|
||||
ts_str = track.get("timestamp")
|
||||
@@ -885,7 +880,8 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||
transcripts_controller,
|
||||
)
|
||||
|
||||
chunk_size = TOPIC_CHUNK_WORD_COUNT
|
||||
duration_seconds = words[-1].end - words[0].start if words else 0
|
||||
chunk_size = compute_topic_chunk_size(duration_seconds, len(words))
|
||||
chunks = []
|
||||
for i in range(0, len(words), chunk_size):
|
||||
chunk_words = words[i : i + chunk_size]
|
||||
@@ -975,7 +971,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
|
||||
|
||||
ctx.log(f"detect_topics complete: found {len(topics_list)} topics")
|
||||
|
||||
return TopicsResult(topics=topics_list)
|
||||
return TopicsResult(topics=topics_list, duration_seconds=duration_seconds)
|
||||
|
||||
|
||||
@daily_multitrack_pipeline.task(
|
||||
@@ -1069,10 +1065,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
||||
participant_name_to_id={},
|
||||
)
|
||||
|
||||
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections and LLM HTTP pools across forks
|
||||
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections across forks
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
|
||||
async with fresh_db_connection():
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
@@ -1112,8 +1107,14 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
|
||||
participant_names, participant_name_to_id=participant_name_to_id
|
||||
)
|
||||
|
||||
max_subjects = compute_max_subjects(topics_result.duration_seconds)
|
||||
ctx.log(
|
||||
f"extract_subjects: duration={topics_result.duration_seconds:.0f}s, "
|
||||
f"max_subjects={max_subjects}"
|
||||
)
|
||||
|
||||
ctx.log("extract_subjects: calling LLM to extract subjects")
|
||||
await builder.extract_subjects()
|
||||
await builder.extract_subjects(max_subjects=max_subjects)
|
||||
|
||||
ctx.log(f"extract_subjects complete: {len(builder.subjects)} subjects")
|
||||
|
||||
@@ -1196,14 +1197,13 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
|
||||
subjects_result = ctx.task_output(extract_subjects)
|
||||
process_result = ctx.task_output(process_subjects)
|
||||
|
||||
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections and LLM HTTP pools across forks
|
||||
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections across forks
|
||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
||||
TranscriptFinalLongSummary,
|
||||
TranscriptFinalShortSummary,
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
|
||||
subject_summaries = process_result.subject_summaries
|
||||
|
||||
@@ -1292,13 +1292,12 @@ async def identify_action_items(
|
||||
ctx.log("identify_action_items: no transcript text, returning empty")
|
||||
return ActionItemsResult(action_items=ActionItemsResponse())
|
||||
|
||||
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections and LLM HTTP pools across forks
|
||||
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing DB connections across forks
|
||||
from reflector.db.transcripts import ( # noqa: PLC0415
|
||||
TranscriptActionItems,
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
|
||||
# TODO: refactor SummaryBuilder methods into standalone functions
|
||||
llm = LLM(settings=settings)
|
||||
@@ -1435,10 +1434,6 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
|
||||
)
|
||||
from reflector.db.recordings import recordings_controller # noqa: PLC0415
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.storage import ( # noqa: PLC0415
|
||||
get_source_storage,
|
||||
get_transcripts_storage,
|
||||
)
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if not transcript:
|
||||
@@ -1587,10 +1582,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
from reflector.utils.webhook import ( # noqa: PLC0415
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
|
||||
room = await rooms_controller.get_by_id(input.room_id)
|
||||
if not room or not room.webhook_url:
|
||||
|
||||
@@ -15,6 +15,8 @@ import json
|
||||
from datetime import timedelta
|
||||
from pathlib import Path
|
||||
|
||||
import av
|
||||
import httpx
|
||||
from hatchet_sdk import Context
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -47,9 +49,30 @@ from reflector.hatchet.workflows.models import (
|
||||
)
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines import topic_processing
|
||||
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
|
||||
from reflector.processors import AudioFileWriterProcessor
|
||||
from reflector.processors.file_diarization import FileDiarizationInput
|
||||
from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor
|
||||
from reflector.processors.transcript_diarization_assembler import (
|
||||
TranscriptDiarizationAssemblerInput,
|
||||
TranscriptDiarizationAssemblerProcessor,
|
||||
)
|
||||
from reflector.processors.types import (
|
||||
DiarizationSegment,
|
||||
Word,
|
||||
)
|
||||
from reflector.processors.types import (
|
||||
Transcript as TranscriptType,
|
||||
)
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_source_storage, get_transcripts_storage
|
||||
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
|
||||
from reflector.utils.audio_waveform import get_audio_waveform
|
||||
from reflector.utils.webhook import (
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
from reflector.zulip import post_transcript_notification
|
||||
|
||||
|
||||
class FilePipelineInput(BaseModel):
|
||||
@@ -135,10 +158,6 @@ async def extract_audio(input: FilePipelineInput, ctx: Context) -> ExtractAudioR
|
||||
ctx.log(f"extract_audio: processing {audio_file}")
|
||||
|
||||
# Extract audio and write as MP3
|
||||
import av # noqa: PLC0415
|
||||
|
||||
from reflector.processors import AudioFileWriterProcessor # noqa: PLC0415
|
||||
|
||||
duration_ms_container = [0.0]
|
||||
|
||||
async def capture_duration(d):
|
||||
@@ -189,8 +208,6 @@ async def upload_audio(input: FilePipelineInput, ctx: Context) -> UploadAudioRes
|
||||
extract_result = ctx.task_output(extract_audio)
|
||||
audio_path = extract_result.audio_path
|
||||
|
||||
from reflector.storage import get_transcripts_storage # noqa: PLC0415
|
||||
|
||||
storage = get_transcripts_storage()
|
||||
if not storage:
|
||||
raise ValueError(
|
||||
@@ -232,10 +249,6 @@ async def transcribe(input: FilePipelineInput, ctx: Context) -> TranscribeResult
|
||||
raise ValueError(f"Transcript {input.transcript_id} not found")
|
||||
source_language = transcript.source_language
|
||||
|
||||
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
|
||||
transcribe_file_with_processor,
|
||||
)
|
||||
|
||||
result = await transcribe_file_with_processor(audio_url, source_language)
|
||||
|
||||
ctx.log(f"transcribe complete: {len(result.words)} words")
|
||||
@@ -264,13 +277,6 @@ async def diarize(input: FilePipelineInput, ctx: Context) -> DiarizeResult:
|
||||
upload_result = ctx.task_output(upload_audio)
|
||||
audio_url = upload_result.audio_url
|
||||
|
||||
from reflector.processors.file_diarization import ( # noqa: PLC0415
|
||||
FileDiarizationInput,
|
||||
)
|
||||
from reflector.processors.file_diarization_auto import ( # noqa: PLC0415
|
||||
FileDiarizationAutoProcessor,
|
||||
)
|
||||
|
||||
processor = FileDiarizationAutoProcessor()
|
||||
input_data = FileDiarizationInput(audio_url=audio_url)
|
||||
|
||||
@@ -353,18 +359,6 @@ async def assemble_transcript(
|
||||
transcribe_result = ctx.task_output(transcribe)
|
||||
diarize_result = ctx.task_output(diarize)
|
||||
|
||||
from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415
|
||||
TranscriptDiarizationAssemblerInput,
|
||||
TranscriptDiarizationAssemblerProcessor,
|
||||
)
|
||||
from reflector.processors.types import ( # noqa: PLC0415
|
||||
DiarizationSegment,
|
||||
Word,
|
||||
)
|
||||
from reflector.processors.types import ( # noqa: PLC0415
|
||||
Transcript as TranscriptType,
|
||||
)
|
||||
|
||||
words = [Word(**w) for w in transcribe_result.words]
|
||||
transcript_data = TranscriptType(
|
||||
words=words, translation=transcribe_result.translation
|
||||
@@ -437,17 +431,6 @@ async def detect_topics(input: FilePipelineInput, ctx: Context) -> TopicsResult:
|
||||
TranscriptTopic,
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415
|
||||
TranscriptDiarizationAssemblerInput,
|
||||
TranscriptDiarizationAssemblerProcessor,
|
||||
)
|
||||
from reflector.processors.types import ( # noqa: PLC0415
|
||||
DiarizationSegment,
|
||||
Word,
|
||||
)
|
||||
from reflector.processors.types import ( # noqa: PLC0415
|
||||
Transcript as TranscriptType,
|
||||
)
|
||||
|
||||
words = [Word(**w) for w in transcribe_result.words]
|
||||
transcript_data = TranscriptType(
|
||||
@@ -688,10 +671,6 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
|
||||
)
|
||||
from reflector.db.recordings import recordings_controller # noqa: PLC0415
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.storage import ( # noqa: PLC0415
|
||||
get_source_storage,
|
||||
get_transcripts_storage,
|
||||
)
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if not transcript:
|
||||
@@ -807,7 +786,6 @@ async def post_zulip(input: FilePipelineInput, ctx: Context) -> ZulipResult:
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.zulip import post_transcript_notification # noqa: PLC0415
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if transcript:
|
||||
@@ -837,10 +815,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
from reflector.utils.webhook import ( # noqa: PLC0415
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
|
||||
room = await rooms_controller.get_by_id(input.room_id)
|
||||
if not room or not room.webhook_url:
|
||||
@@ -856,8 +830,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
|
||||
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
|
||||
return WebhookResult(webhook_sent=False, skipped=True)
|
||||
|
||||
import httpx # noqa: PLC0415
|
||||
|
||||
try:
|
||||
response = await send_webhook_request(
|
||||
url=room.webhook_url,
|
||||
|
||||
@@ -14,6 +14,7 @@ are not shared across forks, avoiding connection pooling issues.
|
||||
|
||||
from datetime import timedelta
|
||||
|
||||
import httpx
|
||||
from hatchet_sdk import Context
|
||||
from pydantic import BaseModel
|
||||
|
||||
@@ -40,7 +41,24 @@ from reflector.hatchet.workflows.models import (
|
||||
ZulipResult,
|
||||
)
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines.main_live_pipeline import (
|
||||
PipelineMainTitle,
|
||||
PipelineMainWaveform,
|
||||
pipeline_convert_to_mp3,
|
||||
pipeline_diarization,
|
||||
pipeline_remove_upload,
|
||||
pipeline_summaries,
|
||||
pipeline_upload_mp3,
|
||||
)
|
||||
from reflector.pipelines.main_live_pipeline import (
|
||||
cleanup_consent as _cleanup_consent,
|
||||
)
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.webhook import (
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
from reflector.zulip import post_transcript_notification
|
||||
|
||||
|
||||
class LivePostPipelineInput(BaseModel):
|
||||
@@ -91,9 +109,6 @@ async def waveform(input: LivePostPipelineInput, ctx: Context) -> WaveformResult
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
PipelineMainWaveform,
|
||||
)
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if not transcript:
|
||||
@@ -118,10 +133,6 @@ async def generate_title(input: LivePostPipelineInput, ctx: Context) -> TitleRes
|
||||
ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
PipelineMainTitle,
|
||||
)
|
||||
|
||||
runner = PipelineMainTitle(transcript_id=input.transcript_id)
|
||||
await runner.run()
|
||||
|
||||
@@ -142,10 +153,6 @@ async def convert_mp3(input: LivePostPipelineInput, ctx: Context) -> ConvertMp3R
|
||||
ctx.log(f"convert_mp3: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
pipeline_convert_to_mp3,
|
||||
)
|
||||
|
||||
await pipeline_convert_to_mp3(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("convert_mp3 complete")
|
||||
@@ -165,10 +172,6 @@ async def upload_mp3(input: LivePostPipelineInput, ctx: Context) -> UploadMp3Res
|
||||
ctx.log(f"upload_mp3: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
pipeline_upload_mp3,
|
||||
)
|
||||
|
||||
await pipeline_upload_mp3(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("upload_mp3 complete")
|
||||
@@ -190,10 +193,6 @@ async def remove_upload(
|
||||
ctx.log(f"remove_upload: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
pipeline_remove_upload,
|
||||
)
|
||||
|
||||
await pipeline_remove_upload(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("remove_upload complete")
|
||||
@@ -213,10 +212,6 @@ async def diarize(input: LivePostPipelineInput, ctx: Context) -> DiarizeResult:
|
||||
ctx.log(f"diarize: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
pipeline_diarization,
|
||||
)
|
||||
|
||||
await pipeline_diarization(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("diarize complete")
|
||||
@@ -236,10 +231,6 @@ async def cleanup_consent(input: LivePostPipelineInput, ctx: Context) -> Consent
|
||||
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
cleanup_consent as _cleanup_consent,
|
||||
)
|
||||
|
||||
await _cleanup_consent(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("cleanup_consent complete")
|
||||
@@ -261,10 +252,6 @@ async def final_summaries(
|
||||
ctx.log(f"final_summaries: starting for transcript_id={input.transcript_id}")
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
|
||||
pipeline_summaries,
|
||||
)
|
||||
|
||||
await pipeline_summaries(transcript_id=input.transcript_id)
|
||||
|
||||
ctx.log("final_summaries complete")
|
||||
@@ -289,7 +276,6 @@ async def post_zulip(input: LivePostPipelineInput, ctx: Context) -> ZulipResult:
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
|
||||
from reflector.zulip import post_transcript_notification # noqa: PLC0415
|
||||
|
||||
transcript = await transcripts_controller.get_by_id(input.transcript_id)
|
||||
if transcript:
|
||||
@@ -319,10 +305,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
|
||||
|
||||
async with fresh_db_connection():
|
||||
from reflector.db.rooms import rooms_controller # noqa: PLC0415
|
||||
from reflector.utils.webhook import ( # noqa: PLC0415
|
||||
fetch_transcript_webhook_payload,
|
||||
send_webhook_request,
|
||||
)
|
||||
|
||||
room = await rooms_controller.get_by_id(input.room_id)
|
||||
if not room or not room.webhook_url:
|
||||
@@ -338,8 +320,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
|
||||
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
|
||||
return WebhookResult(webhook_sent=False, skipped=True)
|
||||
|
||||
import httpx # noqa: PLC0415
|
||||
|
||||
try:
|
||||
response = await send_webhook_request(
|
||||
url=room.webhook_url,
|
||||
|
||||
@@ -102,6 +102,7 @@ class TopicsResult(BaseModel):
|
||||
"""Result from detect_topics task."""
|
||||
|
||||
topics: list[TitleSummary]
|
||||
duration_seconds: float = 0
|
||||
|
||||
|
||||
class TitleResult(BaseModel):
|
||||
|
||||
@@ -13,6 +13,8 @@ from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import TIMEOUT_AUDIO
|
||||
from reflector.hatchet.workflows.models import PadTrackResult
|
||||
from reflector.logger import logger
|
||||
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
|
||||
from reflector.storage import get_source_storage, get_transcripts_storage
|
||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
||||
from reflector.utils.audio_padding import extract_stream_start_time_from_container
|
||||
|
||||
@@ -51,11 +53,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
|
||||
)
|
||||
|
||||
try:
|
||||
from reflector.storage import ( # noqa: PLC0415
|
||||
get_source_storage,
|
||||
get_transcripts_storage,
|
||||
)
|
||||
|
||||
# Source reads: use platform-specific credentials
|
||||
source_storage = get_source_storage(input.source_platform)
|
||||
source_url = await source_storage.get_file_url(
|
||||
@@ -104,10 +101,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
|
||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
)
|
||||
|
||||
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
|
||||
AudioPaddingAutoProcessor,
|
||||
)
|
||||
|
||||
processor = AudioPaddingAutoProcessor()
|
||||
result = await processor.pad_track(
|
||||
track_url=source_url,
|
||||
|
||||
@@ -15,12 +15,14 @@ from pydantic import BaseModel
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_HEAVY
|
||||
from reflector.hatchet.workflows.models import SubjectSummaryResult
|
||||
from reflector.llm import LLM
|
||||
from reflector.logger import logger
|
||||
from reflector.processors.summary.prompts import (
|
||||
DETAILED_SUBJECT_PROMPT_TEMPLATE,
|
||||
PARAGRAPH_SUMMARY_PROMPT,
|
||||
build_participant_instructions,
|
||||
)
|
||||
from reflector.settings import settings
|
||||
|
||||
|
||||
class SubjectInput(BaseModel):
|
||||
@@ -60,11 +62,6 @@ async def generate_detailed_summary(
|
||||
subject_index=input.subject_index,
|
||||
)
|
||||
|
||||
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
|
||||
# LLM HTTP connection pools aren't shared across forks
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
from reflector.settings import settings # noqa: PLC0415
|
||||
|
||||
llm = LLM(settings=settings)
|
||||
|
||||
participant_instructions = build_participant_instructions(input.participant_names)
|
||||
|
||||
@@ -18,9 +18,13 @@ from pydantic import BaseModel
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
|
||||
from reflector.hatchet.workflows.models import TopicChunkResult
|
||||
from reflector.llm import LLM
|
||||
from reflector.logger import logger
|
||||
from reflector.processors.prompts import TOPIC_PROMPT
|
||||
from reflector.processors.transcript_topic_detector import TopicResponse
|
||||
from reflector.processors.types import Word
|
||||
from reflector.settings import settings
|
||||
from reflector.utils.text import clean_title
|
||||
|
||||
|
||||
class TopicChunkInput(BaseModel):
|
||||
@@ -64,15 +68,6 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
|
||||
text_length=len(input.chunk_text),
|
||||
)
|
||||
|
||||
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
|
||||
# sharing LLM HTTP connection pools across forks
|
||||
from reflector.llm import LLM # noqa: PLC0415
|
||||
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
|
||||
TopicResponse,
|
||||
)
|
||||
from reflector.settings import settings # noqa: PLC0415
|
||||
from reflector.utils.text import clean_title # noqa: PLC0415
|
||||
|
||||
llm = LLM(settings=settings, temperature=0.9)
|
||||
|
||||
prompt = TOPIC_PROMPT.format(text=input.chunk_text)
|
||||
|
||||
@@ -9,9 +9,9 @@ because Hatchet workflow DAGs are defined statically, but the number of tracks v
|
||||
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
|
||||
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
|
||||
|
||||
Note: This file uses deferred imports (inside tasks) intentionally.
|
||||
Note: DB imports (reflector.db.*) are kept inline (deferred) intentionally.
|
||||
Hatchet workers run in forked processes; fresh imports per task ensure
|
||||
storage/DB connections are not shared across forks.
|
||||
DB connections are not shared across forks.
|
||||
"""
|
||||
|
||||
from datetime import timedelta
|
||||
@@ -24,6 +24,9 @@ from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
|
||||
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
|
||||
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
|
||||
from reflector.storage import get_source_storage, get_transcripts_storage
|
||||
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
|
||||
from reflector.utils.audio_padding import extract_stream_start_time_from_container
|
||||
|
||||
@@ -72,11 +75,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
|
||||
)
|
||||
|
||||
try:
|
||||
from reflector.storage import ( # noqa: PLC0415
|
||||
get_source_storage,
|
||||
get_transcripts_storage,
|
||||
)
|
||||
|
||||
# Source reads: use platform-specific credentials
|
||||
source_storage = get_source_storage(input.source_platform)
|
||||
source_url = await source_storage.get_file_url(
|
||||
@@ -120,10 +118,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
|
||||
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
|
||||
)
|
||||
|
||||
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
|
||||
AudioPaddingAutoProcessor,
|
||||
)
|
||||
|
||||
processor = AudioPaddingAutoProcessor()
|
||||
result = await processor.pad_track(
|
||||
track_url=source_url,
|
||||
@@ -179,11 +173,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
|
||||
raise ValueError("Missing padded_key from pad_track")
|
||||
|
||||
# Presign URL on demand (avoids stale URLs on workflow replay)
|
||||
from reflector.storage import ( # noqa: PLC0415
|
||||
get_source_storage,
|
||||
get_transcripts_storage,
|
||||
)
|
||||
|
||||
# If bucket_name is set, file is still in the platform's source bucket (no padding applied).
|
||||
# If bucket_name is None, padded file was written to our transcript storage.
|
||||
if bucket_name:
|
||||
@@ -198,10 +187,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
|
||||
bucket=bucket_name,
|
||||
)
|
||||
|
||||
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
|
||||
transcribe_file_with_processor,
|
||||
)
|
||||
|
||||
transcript = await transcribe_file_with_processor(audio_url, input.language)
|
||||
|
||||
# Tag all words with speaker index
|
||||
|
||||
@@ -38,6 +38,7 @@ from reflector.db.transcripts import (
|
||||
TranscriptWaveform,
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines.runner import PipelineMessage, PipelineRunner
|
||||
from reflector.processors import (
|
||||
@@ -814,8 +815,6 @@ async def pipeline_post(*, transcript_id: str, room_id: str | None = None):
|
||||
"""
|
||||
Run the post pipeline via Hatchet.
|
||||
"""
|
||||
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
|
||||
|
||||
await HatchetClientManager.start_workflow(
|
||||
"LivePostProcessingPipeline",
|
||||
{
|
||||
|
||||
@@ -18,7 +18,7 @@ from reflector.processors import (
|
||||
)
|
||||
from reflector.processors.types import TitleSummary
|
||||
from reflector.processors.types import Transcript as TranscriptType
|
||||
from reflector.utils.transcript_constants import TOPIC_CHUNK_WORD_COUNT
|
||||
from reflector.utils.transcript_constants import compute_topic_chunk_size
|
||||
|
||||
|
||||
class EmptyPipeline:
|
||||
@@ -39,7 +39,10 @@ async def detect_topics(
|
||||
on_topic_callback: Callable,
|
||||
empty_pipeline: EmptyPipeline,
|
||||
) -> list[TitleSummary]:
|
||||
chunk_size = TOPIC_CHUNK_WORD_COUNT
|
||||
duration_seconds = (
|
||||
transcript.words[-1].end - transcript.words[0].start if transcript.words else 0
|
||||
)
|
||||
chunk_size = compute_topic_chunk_size(duration_seconds, len(transcript.words))
|
||||
topics: list[TitleSummary] = []
|
||||
|
||||
async def on_topic(topic: TitleSummary):
|
||||
|
||||
@@ -10,6 +10,7 @@ import os
|
||||
import tempfile
|
||||
|
||||
import av
|
||||
import requests
|
||||
|
||||
from reflector.logger import logger
|
||||
from reflector.processors.audio_padding import AudioPaddingProcessor, PaddingResponse
|
||||
@@ -65,8 +66,6 @@ class AudioPaddingPyavProcessor(AudioPaddingProcessor):
|
||||
track_index: int,
|
||||
) -> PaddingResponse:
|
||||
"""Blocking padding work: download, pad with PyAV, upload."""
|
||||
import requests
|
||||
|
||||
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
input_path = None
|
||||
|
||||
@@ -43,7 +43,8 @@ DETAILED_SUBJECT_PROMPT_TEMPLATE = dedent(
|
||||
include any deadlines or timeframes discussed for completion or follow-up.
|
||||
- Mention unresolved issues or topics needing further discussion, aiding in
|
||||
planning future meetings or follow-up actions.
|
||||
- Do not include topic unrelated to {subject}.
|
||||
- Be specific and cite participant names when attributing statements or actions.
|
||||
- Do not include topics unrelated to {subject}.
|
||||
|
||||
# OUTPUT
|
||||
Your summary should be clear, concise, and structured, covering all major
|
||||
@@ -58,6 +59,7 @@ PARAGRAPH_SUMMARY_PROMPT = dedent(
|
||||
"""
|
||||
Summarize the mentioned topic in 1 paragraph.
|
||||
It will be integrated into the final summary, so just for this topic.
|
||||
Preserve key decisions and action items. Do not introduce new information.
|
||||
"""
|
||||
).strip()
|
||||
|
||||
|
||||
@@ -48,17 +48,24 @@ TRANSCRIPTION_TYPE_PROMPT = dedent(
|
||||
"""
|
||||
).strip()
|
||||
|
||||
SUBJECTS_PROMPT = dedent(
|
||||
"""
|
||||
What are the main / high level topic of the meeting.
|
||||
Do not include direct quotes or unnecessary details.
|
||||
Be concise and focused on the main ideas.
|
||||
A subject briefly mentioned should not be included.
|
||||
There should be maximum 6 subjects.
|
||||
Do not write complete narrative sentences for the subject,
|
||||
you must write a concise subject using noun phrases.
|
||||
"""
|
||||
).strip()
|
||||
# Default cap on the number of subjects requested from the LLM.
_DEFAULT_MAX_SUBJECTS = 6


def build_subjects_prompt(max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> str:
    """Build subjects extraction prompt with a dynamic subject cap.

    Args:
        max_subjects: Upper bound on the number of subjects the prompt asks
            the model to return. Defaults to ``_DEFAULT_MAX_SUBJECTS``.

    Returns:
        The prompt text, dedented and stripped of surrounding whitespace.
    """
    # Use the singular noun only when the cap is exactly one, so the
    # sentence "There should be maximum N subject(s)." reads correctly.
    subject_word = "subject" if max_subjects == 1 else "subjects"
    return dedent(
        f"""
        What are the main / high level topics of the meeting.
        Do not include direct quotes or unnecessary details.
        Be concise and focused on the main ideas.
        A subject briefly mentioned should not be included.
        There should be maximum {max_subjects} {subject_word}.
        Do not write complete narrative sentences for the subject,
        you must write a concise subject using noun phrases.
        """
    ).strip()
|
||||
|
||||
|
||||
ACTION_ITEMS_PROMPT = dedent(
|
||||
"""
|
||||
@@ -145,7 +152,7 @@ class SubjectsResponse(BaseModel):
|
||||
"""Pydantic model for extracted subjects/topics"""
|
||||
|
||||
subjects: list[str] = Field(
|
||||
description="List of main subjects/topics discussed, maximum 6 items",
|
||||
description="List of main subjects/topics discussed",
|
||||
)
|
||||
|
||||
|
||||
@@ -345,11 +352,14 @@ class SummaryBuilder:
|
||||
# Summary
|
||||
# ----------------------------------------------------------------------------
|
||||
|
||||
async def extract_subjects(self) -> None:
|
||||
async def extract_subjects(self, max_subjects: int = _DEFAULT_MAX_SUBJECTS) -> None:
|
||||
"""Extract main subjects/topics from the transcript."""
|
||||
self.logger.info("--- extract main subjects using TreeSummarize")
|
||||
self.logger.info(
|
||||
"--- extract main subjects using TreeSummarize",
|
||||
max_subjects=max_subjects,
|
||||
)
|
||||
|
||||
subjects_prompt = SUBJECTS_PROMPT
|
||||
subjects_prompt = build_subjects_prompt(max_subjects)
|
||||
|
||||
try:
|
||||
response = await self._get_structured_response(
|
||||
@@ -358,7 +368,7 @@ class SummaryBuilder:
|
||||
tone_name="Meeting assistant that talk only as list item",
|
||||
)
|
||||
|
||||
self.subjects = response.subjects
|
||||
self.subjects = response.subjects[:max_subjects]
|
||||
self.logger.info(f"Extracted subjects: {self.subjects}")
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -333,7 +333,9 @@ if __name__ == "__main__":
|
||||
if not s3_urls:
|
||||
parser.error("At least one S3 URL required for multitrack processing")
|
||||
|
||||
from reflector.tools.cli_multitrack import process_multitrack_cli
|
||||
from reflector.tools.cli_multitrack import (
|
||||
process_multitrack_cli, # circular import
|
||||
)
|
||||
|
||||
asyncio.run(
|
||||
process_multitrack_cli(
|
||||
|
||||
@@ -5,6 +5,7 @@ This tools help to either create a pipeline from command line,
|
||||
or read a yaml description of a pipeline and run it.
|
||||
"""
|
||||
|
||||
import importlib
|
||||
import json
|
||||
|
||||
from reflector.logger import logger
|
||||
@@ -37,8 +38,6 @@ def get_jsonl(filename, filter_processor_name=None):
|
||||
|
||||
|
||||
def get_processor(name):
|
||||
import importlib
|
||||
|
||||
module_name = f"reflector.processors.{name}"
|
||||
class_name = snake_to_camel(name) + "Processor"
|
||||
module = importlib.import_module(module_name)
|
||||
|
||||
@@ -4,5 +4,67 @@ Shared transcript processing constants.
|
||||
Used by both Hatchet workflows and Celery pipelines for consistent processing.
|
||||
"""
|
||||
|
||||
# Topic detection: number of words per chunk for topic extraction
|
||||
import math
|
||||
|
||||
# Topic detection: legacy static chunk size, used as fallback
|
||||
TOPIC_CHUNK_WORD_COUNT = 300
|
||||
|
||||
# Dynamic chunking curve parameters
# Formula: target_topics = _COEFFICIENT * duration_minutes ^ _EXPONENT
# Derived from anchors: 5 min -> 3 topics, 180 min -> 40 topics
# NOTE(review): with the constants below the curve evaluates to roughly
# 2.7 topics at 5 min and 35.6 topics at 180 min, so the anchors above are
# approximate — confirm whether the coefficient or the comment is intended.
_TOPIC_CURVE_COEFFICIENT = 0.833
_TOPIC_CURVE_EXPONENT = 0.723
_MIN_TOPICS = 2
_MAX_TOPICS = 50
_MIN_CHUNK_WORDS = 375
_MAX_CHUNK_WORDS = 1500


def compute_topic_chunk_size(duration_seconds: float, total_words: int) -> int:
    """Calculate optimal chunk size for topic detection based on recording duration.

    Uses a power-curve function to scale topic count sublinearly with duration,
    producing fewer LLM calls for longer recordings while maintaining topic quality.

    Args:
        duration_seconds: Recording duration in seconds.
        total_words: Number of words in the transcript.

    Returns:
        The number of words per chunk, always within
        ``[_MIN_CHUNK_WORDS, _MAX_CHUNK_WORDS]``. Non-positive inputs fall
        back to ``_MIN_CHUNK_WORDS``.
    """
    if total_words <= 0 or duration_seconds <= 0:
        return _MIN_CHUNK_WORDS

    duration_minutes = duration_seconds / 60.0
    raw_topics = _TOPIC_CURVE_COEFFICIENT * math.pow(
        duration_minutes, _TOPIC_CURVE_EXPONENT
    )
    # Clamp before rounding so the divisor below is never smaller than
    # _MIN_TOPICS; round() with a single argument already returns an int.
    target_topics = round(max(_MIN_TOPICS, min(_MAX_TOPICS, raw_topics)))

    chunk_size = total_words // target_topics
    return max(_MIN_CHUNK_WORDS, min(_MAX_CHUNK_WORDS, chunk_size))
|
||||
|
||||
|
||||
# Subject extraction: scale max subjects with recording duration
# Short calls get fewer subjects to avoid over-analyzing trivial content
# Each entry is (inclusive upper bound in seconds, subject cap); entries must
# stay sorted by ascending bound because the lookup returns the first match.
_SUBJECT_DURATION_THRESHOLDS = [
    (5 * 60, 1),  # ≤ 5 min → 1 subject
    (15 * 60, 2),  # ≤ 15 min → 2 subjects
    (30 * 60, 3),  # ≤ 30 min → 3 subjects
    (45 * 60, 4),  # ≤ 45 min → 4 subjects
    (60 * 60, 5),  # ≤ 60 min → 5 subjects
]
_MAX_SUBJECTS = 6


def compute_max_subjects(duration_seconds: float) -> int:
    """Calculate maximum number of subjects to extract based on recording duration.

    Uses a step function: short recordings get fewer subjects to avoid
    generating excessive detail for trivial content.

    Args:
        duration_seconds: Recording duration in seconds.

    Returns:
        1 for non-positive durations; otherwise the cap of the first
        threshold whose bound is >= ``duration_seconds``, or
        ``_MAX_SUBJECTS`` when the duration exceeds every threshold.
    """
    if duration_seconds <= 0:
        return 1

    for threshold, max_subjects in _SUBJECT_DURATION_THRESHOLDS:
        if duration_seconds <= threshold:
            return max_subjects

    return _MAX_SUBJECTS
|
||||
|
||||
@@ -15,6 +15,7 @@ from reflector.dailyco_api import (
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.logger import logger as _logger
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_source_storage
|
||||
from reflector.video_platforms.factory import create_platform_client
|
||||
from reflector.worker.process import (
|
||||
poll_daily_room_presence_task,
|
||||
@@ -219,6 +220,30 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
|
||||
|
||||
track_keys = [t.s3Key for t in tracks if t.type == "audio"]
|
||||
|
||||
# Delete video tracks when store_video is disabled (same pattern as LiveKit).
|
||||
# Only delete if we have a meeting AND store_video is explicitly false.
|
||||
# If no meeting found, leave files alone (can't confirm user intent).
|
||||
video_track_keys = [t.s3Key for t in tracks if t.type == "video"]
|
||||
if video_track_keys:
|
||||
meeting = await meetings_controller.get_by_room_name(room_name)
|
||||
if meeting is not None and not meeting.store_video:
|
||||
storage = get_source_storage("daily")
|
||||
for video_key in video_track_keys:
|
||||
try:
|
||||
await storage.delete_file(video_key)
|
||||
logger.info(
|
||||
"Deleted video track from raw-tracks recording",
|
||||
s3_key=video_key,
|
||||
room_name=room_name,
|
||||
)
|
||||
except Exception as e:
|
||||
# Non-critical — pipeline filters these out anyway
|
||||
logger.warning(
|
||||
"Failed to delete video track from raw-tracks recording",
|
||||
s3_key=video_key,
|
||||
error=str(e),
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Raw-tracks recording queuing processing",
|
||||
recording_id=recording_id,
|
||||
|
||||
@@ -17,6 +17,7 @@ from reflector.db.meetings import meetings_controller
|
||||
from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
|
||||
from reflector.logger import logger as _logger
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_source_storage
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -189,8 +190,6 @@ async def _handle_egress_ended(event):
|
||||
filename = file_result.filename
|
||||
if filename and filename.endswith(".webm"):
|
||||
try:
|
||||
from reflector.storage import get_source_storage # noqa: PLC0415
|
||||
|
||||
storage = get_source_storage("livekit")
|
||||
await storage.delete_file(filename)
|
||||
logger.info(
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from enum import Enum
|
||||
from typing import Annotated, Any, Literal, Optional
|
||||
@@ -14,7 +16,7 @@ from reflector.db import get_database
|
||||
from reflector.db.calendar_events import calendar_events_controller
|
||||
from reflector.db.meetings import meetings_controller
|
||||
from reflector.db.rooms import rooms_controller
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
|
||||
from reflector.schemas.platform import Platform
|
||||
from reflector.services.ics_sync import ics_sync_service
|
||||
from reflector.utils.url import add_query_param
|
||||
@@ -45,6 +47,7 @@ class Room(BaseModel):
|
||||
platform: Platform
|
||||
skip_consent: bool = False
|
||||
email_transcript_to: str | None = None
|
||||
store_video: bool = False
|
||||
|
||||
|
||||
class RoomDetails(Room):
|
||||
@@ -75,6 +78,7 @@ class Meeting(BaseModel):
|
||||
platform: Platform
|
||||
daily_composed_video_s3_key: str | None = None
|
||||
daily_composed_video_duration: int | None = None
|
||||
store_video: bool = False
|
||||
|
||||
|
||||
class CreateRoom(BaseModel):
|
||||
@@ -95,6 +99,7 @@ class CreateRoom(BaseModel):
|
||||
platform: Platform
|
||||
skip_consent: bool = False
|
||||
email_transcript_to: str | None = None
|
||||
store_video: bool = False
|
||||
|
||||
|
||||
class UpdateRoom(BaseModel):
|
||||
@@ -115,6 +120,7 @@ class UpdateRoom(BaseModel):
|
||||
platform: Optional[Platform] = None
|
||||
skip_consent: Optional[bool] = None
|
||||
email_transcript_to: Optional[str] = None
|
||||
store_video: Optional[bool] = None
|
||||
|
||||
|
||||
class CreateRoomMeeting(BaseModel):
|
||||
@@ -257,6 +263,7 @@ async def rooms_create(
|
||||
platform=room.platform,
|
||||
skip_consent=room.skip_consent,
|
||||
email_transcript_to=room.email_transcript_to,
|
||||
store_video=room.store_video,
|
||||
)
|
||||
|
||||
|
||||
@@ -325,6 +332,7 @@ async def rooms_create_meeting(
|
||||
and meeting.recording_type == room.recording_type
|
||||
and meeting.recording_trigger == room.recording_trigger
|
||||
and meeting.platform == room.platform
|
||||
and meeting.store_video == room.store_video
|
||||
)
|
||||
if not settings_match:
|
||||
logger.info(
|
||||
@@ -600,9 +608,6 @@ async def rooms_join_meeting(
|
||||
meeting.room_url = add_query_param(meeting.room_url, "t", token)
|
||||
|
||||
elif meeting.platform == "livekit":
|
||||
import re
|
||||
import uuid
|
||||
|
||||
client = create_platform_client(meeting.platform)
|
||||
# Identity must be unique per participant to avoid S3 key collisions.
|
||||
# Format: {readable_name}-{short_uuid} ensures uniqueness even for same names.
|
||||
@@ -625,8 +630,6 @@ async def rooms_join_meeting(
|
||||
# Store identity → Reflector user_id mapping for the pipeline
|
||||
# (so TranscriptParticipant.user_id can be set correctly)
|
||||
if user_id:
|
||||
from reflector.redis_cache import get_async_redis_client # noqa: PLC0415
|
||||
|
||||
redis_client = await get_async_redis_client()
|
||||
mapping_key = f"livekit:participant_map:{meeting.room_name}"
|
||||
await redis_client.hset(mapping_key, participant_identity, user_id)
|
||||
|
||||
@@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown
|
||||
from reflector.logger import logger
|
||||
from reflector.pipelines.runner import PipelineRunner
|
||||
from reflector.settings import settings
|
||||
from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
|
||||
|
||||
sessions = []
|
||||
router = APIRouter()
|
||||
@@ -128,8 +129,6 @@ async def rtc_offer_base(
|
||||
|
||||
# Rewrite ICE candidate IPs when running behind Docker bridge networking
|
||||
if settings.WEBRTC_HOST:
|
||||
from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
|
||||
|
||||
host_ip = resolve_webrtc_host(settings.WEBRTC_HOST)
|
||||
sdp = rewrite_sdp_host(sdp, host_ip)
|
||||
|
||||
|
||||
@@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request
|
||||
|
||||
import reflector.auth as auth
|
||||
from reflector.db.transcripts import transcripts_controller
|
||||
from reflector.pipelines.main_live_pipeline import PipelineMainLive
|
||||
|
||||
from .rtc_offer import RtcOffer, rtc_offer_base
|
||||
|
||||
@@ -28,8 +29,6 @@ async def transcript_record_webrtc(
|
||||
raise HTTPException(status_code=400, detail="Transcript is locked")
|
||||
|
||||
# create a pipeline runner
|
||||
from reflector.pipelines.main_live_pipeline import PipelineMainLive # noqa: PLC0415
|
||||
|
||||
pipeline_runner = PipelineMainLive(transcript_id=transcript_id)
|
||||
|
||||
# FIXME do not allow multiple recording at the same time
|
||||
|
||||
@@ -11,6 +11,8 @@ This allows running the server in Docker with bridge networking
|
||||
import asyncio
|
||||
import socket
|
||||
|
||||
import aioice.ice
|
||||
|
||||
from reflector.logger import logger
|
||||
|
||||
|
||||
@@ -36,9 +38,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
|
||||
Works by temporarily wrapping loop.create_datagram_endpoint() during
|
||||
aioice's get_component_candidates() to intercept bind(addr, 0) calls.
|
||||
"""
|
||||
import aioice.ice as _ice
|
||||
|
||||
_original = _ice.Connection.get_component_candidates
|
||||
_original = aioice.ice.Connection.get_component_candidates
|
||||
_state = {"next_port": min_port}
|
||||
|
||||
async def _patched_get_component_candidates(self, component, addresses, timeout=5):
|
||||
@@ -78,7 +78,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
|
||||
finally:
|
||||
loop.create_datagram_endpoint = _orig_create
|
||||
|
||||
_ice.Connection.get_component_candidates = _patched_get_component_candidates
|
||||
aioice.ice.Connection.get_component_candidates = _patched_get_component_candidates
|
||||
logger.info(
|
||||
"aioice patched for WebRTC port range",
|
||||
min_port=min_port,
|
||||
@@ -102,8 +102,6 @@ def rewrite_sdp_host(sdp: str, target_ip: str) -> str:
|
||||
Replace container-internal IPs in SDP with target_ip so that
|
||||
ICE candidates advertise a routable address.
|
||||
"""
|
||||
import aioice.ice
|
||||
|
||||
container_ips = aioice.ice.get_host_addresses(use_ipv4=True, use_ipv6=False)
|
||||
for ip in container_ips:
|
||||
if ip != "127.0.0.1" and ip != target_ip:
|
||||
|
||||
@@ -30,6 +30,8 @@ def build_beat_schedule(
|
||||
whereby_api_key=None,
|
||||
aws_process_recording_queue_url=None,
|
||||
daily_api_key=None,
|
||||
livekit_api_key=None,
|
||||
livekit_url=None,
|
||||
public_mode=False,
|
||||
public_data_retention_days=None,
|
||||
healthcheck_url=None,
|
||||
@@ -83,7 +85,7 @@ def build_beat_schedule(
|
||||
else:
|
||||
logger.info("Daily.co beat tasks disabled (no DAILY_API_KEY)")
|
||||
|
||||
_livekit_enabled = bool(settings.LIVEKIT_API_KEY and settings.LIVEKIT_URL)
|
||||
_livekit_enabled = bool(livekit_api_key and livekit_url)
|
||||
if _livekit_enabled:
|
||||
beat_schedule["process_livekit_ended_meetings"] = {
|
||||
"task": "reflector.worker.process.process_livekit_ended_meetings",
|
||||
@@ -175,6 +177,8 @@ else:
|
||||
whereby_api_key=settings.WHEREBY_API_KEY,
|
||||
aws_process_recording_queue_url=settings.AWS_PROCESS_RECORDING_QUEUE_URL,
|
||||
daily_api_key=settings.DAILY_API_KEY,
|
||||
livekit_api_key=settings.LIVEKIT_API_KEY,
|
||||
livekit_url=settings.LIVEKIT_URL,
|
||||
public_mode=settings.PUBLIC_MODE,
|
||||
public_data_retention_days=settings.PUBLIC_DATA_RETENTION_DAYS,
|
||||
healthcheck_url=settings.HEALTHCHECK_URL,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
@@ -26,16 +27,26 @@ from reflector.db.transcripts import (
|
||||
transcripts_controller,
|
||||
)
|
||||
from reflector.hatchet.client import HatchetClientManager
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline
|
||||
from reflector.processors.audio_file_writer import AudioFileWriterProcessor
|
||||
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
|
||||
from reflector.redis_cache import RedisAsyncLock
|
||||
from reflector.settings import settings
|
||||
from reflector.storage import get_transcripts_storage
|
||||
from reflector.storage import get_source_storage, get_transcripts_storage
|
||||
from reflector.utils.daily import (
|
||||
DailyRoomName,
|
||||
extract_base_room_name,
|
||||
filter_cam_audio_tracks,
|
||||
recording_lock_key,
|
||||
)
|
||||
from reflector.utils.livekit import (
|
||||
extract_livekit_base_room_name,
|
||||
filter_audio_tracks,
|
||||
parse_livekit_track_filepath,
|
||||
)
|
||||
from reflector.utils.livekit import (
|
||||
recording_lock_key as livekit_recording_lock_key,
|
||||
)
|
||||
from reflector.utils.string import NonEmptyString
|
||||
from reflector.video_platforms.factory import create_platform_client
|
||||
from reflector.video_platforms.whereby_utils import (
|
||||
@@ -562,6 +573,15 @@ async def store_cloud_recording(
|
||||
)
|
||||
return False
|
||||
|
||||
if not meeting.store_video:
|
||||
logger.info(
|
||||
f"Cloud recording ({source}): skipped, store_video=false",
|
||||
recording_id=recording_id,
|
||||
room_name=room_name,
|
||||
meeting_id=meeting.id,
|
||||
)
|
||||
return False
|
||||
|
||||
success = await meetings_controller.set_cloud_recording_if_missing(
|
||||
meeting_id=meeting.id,
|
||||
s3_key=s3_key,
|
||||
@@ -923,11 +943,6 @@ async def convert_audio_and_waveform(transcript) -> None:
|
||||
transcript_id=transcript.id,
|
||||
)
|
||||
|
||||
from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415
|
||||
from reflector.processors.audio_file_writer import (
|
||||
AudioFileWriterProcessor, # noqa: PLC0415
|
||||
)
|
||||
|
||||
upload_path = transcript.data_path / "upload.webm"
|
||||
mp3_path = transcript.audio_mp3_filename
|
||||
|
||||
@@ -1206,17 +1221,13 @@ async def process_livekit_multitrack(
|
||||
Tracks are discovered via S3 listing (source of truth), not webhooks.
|
||||
Called from room_finished webhook (fast-path) or beat task (fallback).
|
||||
"""
|
||||
from reflector.utils.livekit import ( # noqa: PLC0415
|
||||
recording_lock_key,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Processing LiveKit multitrack recording",
|
||||
room_name=room_name,
|
||||
meeting_id=meeting_id,
|
||||
)
|
||||
|
||||
lock_key = recording_lock_key(room_name)
|
||||
lock_key = livekit_recording_lock_key(room_name)
|
||||
async with RedisAsyncLock(
|
||||
key=lock_key,
|
||||
timeout=600,
|
||||
@@ -1243,19 +1254,10 @@ async def _process_livekit_multitrack_inner(
|
||||
# 1. Discover tracks by listing S3 prefix.
|
||||
# Wait briefly for egress files to finish flushing to S3 — the room_finished
|
||||
# webhook fires after empty_timeout, but egress finalization may still be in progress.
|
||||
import asyncio as _asyncio # noqa: PLC0415
|
||||
|
||||
from reflector.storage import get_source_storage # noqa: PLC0415
|
||||
from reflector.utils.livekit import ( # noqa: PLC0415
|
||||
extract_livekit_base_room_name,
|
||||
filter_audio_tracks,
|
||||
parse_livekit_track_filepath,
|
||||
)
|
||||
|
||||
EGRESS_FLUSH_DELAY = 10 # seconds — egress typically flushes within a few seconds
|
||||
EGRESS_RETRY_DELAY = 30 # seconds — retry if first listing finds nothing
|
||||
|
||||
await _asyncio.sleep(EGRESS_FLUSH_DELAY)
|
||||
await asyncio.sleep(EGRESS_FLUSH_DELAY)
|
||||
|
||||
storage = get_source_storage("livekit")
|
||||
s3_prefix = f"livekit/{room_name}/"
|
||||
@@ -1271,7 +1273,7 @@ async def _process_livekit_multitrack_inner(
|
||||
room_name=room_name,
|
||||
retry_delay=EGRESS_RETRY_DELAY,
|
||||
)
|
||||
await _asyncio.sleep(EGRESS_RETRY_DELAY)
|
||||
await asyncio.sleep(EGRESS_RETRY_DELAY)
|
||||
all_keys = await storage.list_objects(prefix=s3_prefix)
|
||||
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
|
||||
|
||||
@@ -1290,7 +1292,7 @@ async def _process_livekit_multitrack_inner(
|
||||
expected=expected_audio,
|
||||
found=len(audio_keys),
|
||||
)
|
||||
await _asyncio.sleep(EGRESS_RETRY_DELAY)
|
||||
await asyncio.sleep(EGRESS_RETRY_DELAY)
|
||||
all_keys = await storage.list_objects(prefix=s3_prefix)
|
||||
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
|
||||
|
||||
|
||||
@@ -32,6 +32,10 @@ DAILY_TASKS = {
|
||||
"trigger_daily_reconciliation",
|
||||
"reprocess_failed_daily_recordings",
|
||||
}
|
||||
LIVEKIT_TASKS = {
|
||||
"process_livekit_ended_meetings",
|
||||
"reprocess_failed_livekit_recordings",
|
||||
}
|
||||
PLATFORM_TASKS = {
|
||||
"process_meetings",
|
||||
"sync_all_ics_calendars",
|
||||
@@ -47,6 +51,7 @@ class TestNoPlatformConfigured:
|
||||
task_names = set(schedule.keys())
|
||||
assert not task_names & WHEREBY_TASKS
|
||||
assert not task_names & DAILY_TASKS
|
||||
assert not task_names & LIVEKIT_TASKS
|
||||
assert not task_names & PLATFORM_TASKS
|
||||
|
||||
def test_only_healthcheck_disabled_warning(self):
|
||||
@@ -72,6 +77,7 @@ class TestWherebyOnly:
|
||||
assert WHEREBY_TASKS <= task_names
|
||||
assert PLATFORM_TASKS <= task_names
|
||||
assert not task_names & DAILY_TASKS
|
||||
assert not task_names & LIVEKIT_TASKS
|
||||
|
||||
def test_whereby_sqs_url(self):
|
||||
schedule = build_beat_schedule(
|
||||
@@ -81,6 +87,7 @@ class TestWherebyOnly:
|
||||
assert WHEREBY_TASKS <= task_names
|
||||
assert PLATFORM_TASKS <= task_names
|
||||
assert not task_names & DAILY_TASKS
|
||||
assert not task_names & LIVEKIT_TASKS
|
||||
|
||||
def test_whereby_task_count(self):
|
||||
schedule = build_beat_schedule(whereby_api_key="test-key")
|
||||
@@ -97,6 +104,7 @@ class TestDailyOnly:
|
||||
assert DAILY_TASKS <= task_names
|
||||
assert PLATFORM_TASKS <= task_names
|
||||
assert not task_names & WHEREBY_TASKS
|
||||
assert not task_names & LIVEKIT_TASKS
|
||||
|
||||
def test_daily_task_count(self):
|
||||
schedule = build_beat_schedule(daily_api_key="test-daily-key")
|
||||
@@ -104,6 +112,33 @@ class TestDailyOnly:
|
||||
assert len(schedule) == 6
|
||||
|
||||
|
||||
class TestLiveKitOnly:
    """When only LiveKit is configured."""

    def test_livekit_keys(self):
        # With both LiveKit settings supplied, LiveKit and platform tasks are
        # scheduled and no Whereby/Daily tasks appear.
        schedule = build_beat_schedule(
            livekit_api_key="test-lk-key", livekit_url="ws://livekit:7880"
        )
        task_names = set(schedule.keys())
        assert LIVEKIT_TASKS <= task_names
        assert PLATFORM_TASKS <= task_names
        assert not task_names & WHEREBY_TASKS
        assert not task_names & DAILY_TASKS

    def test_livekit_task_count(self):
        schedule = build_beat_schedule(
            livekit_api_key="test-lk-key", livekit_url="ws://livekit:7880"
        )
        # LiveKit (2) + Platform (3) = 5
        assert len(schedule) == 5

    def test_livekit_needs_both_key_and_url(self):
        # Supplying only one of the two LiveKit settings must not enable
        # any LiveKit beat task.
        schedule_key_only = build_beat_schedule(livekit_api_key="test-lk-key")
        schedule_url_only = build_beat_schedule(livekit_url="ws://livekit:7880")
        assert not set(schedule_key_only.keys()) & LIVEKIT_TASKS
        assert not set(schedule_url_only.keys()) & LIVEKIT_TASKS
|
||||
|
||||
|
||||
class TestBothPlatforms:
|
||||
"""When both Whereby and Daily.co are configured."""
|
||||
|
||||
|
||||
99
server/tests/test_topic_chunking.py
Normal file
99
server/tests/test_topic_chunking.py
Normal file
@@ -0,0 +1,99 @@
|
||||
import math
|
||||
|
||||
import pytest
|
||||
|
||||
from reflector.utils.transcript_constants import (
|
||||
compute_max_subjects,
|
||||
compute_topic_chunk_size,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "duration_min,total_words,expected_topics_range",
    [
        # (duration in minutes, total words, (min topics, max topics));
        # every row uses 150 words per minute.
        (5, 750, (1, 3)),
        (10, 1500, (3, 6)),
        (30, 4500, (8, 14)),
        (60, 9000, (14, 22)),
        (120, 18000, (24, 35)),
        (180, 27000, (30, 42)),
    ],
)
def test_topic_count_in_expected_range(
    duration_min, total_words, expected_topics_range
):
    """The topic count implied by the computed chunk size lies in the expected range.

    The topic count is derived as ``ceil(total_words / chunk_size)`` and both
    ends of ``expected_topics_range`` are inclusive.
    """
    duration_seconds = duration_min * 60
    chunk_size = compute_topic_chunk_size(duration_seconds, total_words)
    num_topics = math.ceil(total_words / chunk_size)

    within_range = (
        expected_topics_range[0] <= num_topics <= expected_topics_range[1]
    )
    assert within_range, (
        f"For {duration_min}min/{total_words}words: got {num_topics} topics "
        f"(chunk_size={chunk_size}), expected {expected_topics_range[0]}-{expected_topics_range[1]}"
    )
|
||||
|
||||
|
||||
def test_chunk_size_within_bounds():
    """Chunk size stays within [375, 1500] for a spread of call durations.

    Each duration is paired with a word count of 150 words per minute.
    """
    lower_bound, upper_bound = 375, 1500
    durations_in_minutes = (5, 10, 30, 60, 120, 180)

    for duration_min in durations_in_minutes:
        word_count = duration_min * 150
        chunk_size = compute_topic_chunk_size(duration_min * 60, word_count)
        inside = lower_bound <= chunk_size <= upper_bound
        assert (
            inside
        ), f"For {duration_min}min: chunk_size={chunk_size} out of bounds [375, 1500]"
|
||||
|
||||
|
||||
def test_zero_duration_falls_back():
    """A duration of zero seconds yields a chunk size of 375."""
    chunk_size = compute_topic_chunk_size(0, 1000)
    assert chunk_size == 375
|
||||
|
||||
|
||||
def test_zero_words_falls_back():
    """A word count of zero yields a chunk size of 375."""
    chunk_size = compute_topic_chunk_size(600, 0)
    assert chunk_size == 375
|
||||
|
||||
|
||||
def test_negative_inputs_fall_back():
    """A negative duration or a negative word count yields a chunk size of 375."""
    invalid_inputs = (
        (-10, 1000),  # negative duration
        (600, -5),  # negative word count
    )
    for duration_seconds, total_words in invalid_inputs:
        assert compute_topic_chunk_size(duration_seconds, total_words) == 375
|
||||
|
||||
|
||||
def test_very_short_transcript():
    """A 1-minute call with very few words should still produce at least 1 topic."""
    total_words = 100
    chunk_size = compute_topic_chunk_size(60, total_words)

    # chunk_size is at least 375, so 100 words = 1 chunk
    assert chunk_size >= 375

    # Check the topic count itself, which is what this test is named for:
    # the whole transcript must fit into exactly one chunk.
    assert math.ceil(total_words / chunk_size) == 1
|
||||
|
||||
|
||||
def test_very_long_transcript():
    """A 4-hour, 36000-word call yields no more than 50 topics."""
    duration_seconds = 4 * 3600
    total_words = 36000

    chunk_size = compute_topic_chunk_size(duration_seconds, total_words)
    topic_count = math.ceil(total_words / chunk_size)

    assert topic_count <= 50
|
||||
|
||||
|
||||
# --- compute_max_subjects tests ---
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
    "duration_seconds,expected_max",
    [
        (0, 1),  # zero/invalid → 1
        (-10, 1),  # negative → 1
        (60, 1),  # 1 min → 1
        (120, 1),  # 2 min → 1
        (300, 1),  # 5 min (boundary) → 1
        (301, 2),  # just over 5 min → 2
        (900, 2),  # 15 min (boundary) → 2
        (901, 3),  # just over 15 min → 3
        (1800, 3),  # 30 min (boundary) → 3
        (1801, 4),  # just over 30 min → 4
        (2700, 4),  # 45 min (boundary) → 4
        (2701, 5),  # just over 45 min → 5
        (3600, 5),  # 60 min (boundary) → 5
        (3601, 6),  # just over 60 min → 6
        (7200, 6),  # 2 hours → 6
        (14400, 6),  # 4 hours → 6
    ],
)
def test_max_subjects_scales_with_duration(duration_seconds, expected_max):
    """The subject cap matches the expected value on and just past each boundary."""
    actual_max = compute_max_subjects(duration_seconds)
    assert actual_max == expected_max
|
||||
|
||||
|
||||
def test_max_subjects_never_exceeds_cap():
    """Recordings of 1 to 9 hours never yield more than 6 subjects."""
    seconds_per_hour = 3600
    subject_caps = [
        compute_max_subjects(hours * seconds_per_hour) for hours in range(1, 10)
    ]
    assert all(cap <= 6 for cap in subject_caps)
|
||||
@@ -95,6 +95,7 @@ const roomInitialState = {
|
||||
platform: "whereby",
|
||||
skipConsent: false,
|
||||
emailTranscriptTo: "",
|
||||
storeVideo: false,
|
||||
};
|
||||
|
||||
export default function RoomsList() {
|
||||
@@ -185,6 +186,7 @@ export default function RoomsList() {
|
||||
platform: detailedEditedRoom.platform,
|
||||
skipConsent: detailedEditedRoom.skip_consent || false,
|
||||
emailTranscriptTo: detailedEditedRoom.email_transcript_to || "",
|
||||
storeVideo: detailedEditedRoom.store_video || false,
|
||||
}
|
||||
: null,
|
||||
[detailedEditedRoom],
|
||||
@@ -335,6 +337,7 @@ export default function RoomsList() {
|
||||
platform,
|
||||
skip_consent: room.skipConsent,
|
||||
email_transcript_to: room.emailTranscriptTo || null,
|
||||
store_video: room.storeVideo,
|
||||
};
|
||||
|
||||
if (isEditing) {
|
||||
@@ -400,6 +403,7 @@ export default function RoomsList() {
|
||||
platform: roomData.platform,
|
||||
skipConsent: roomData.skip_consent || false,
|
||||
emailTranscriptTo: roomData.email_transcript_to || "",
|
||||
storeVideo: roomData.store_video || false,
|
||||
});
|
||||
setEditRoomId(roomId);
|
||||
setIsEditing(true);
|
||||
@@ -842,6 +846,38 @@ export default function RoomsList() {
|
||||
</Field.HelperText>
|
||||
</Field.Root>
|
||||
)}
|
||||
{room.platform === "daily" &&
|
||||
room.recordingType === "cloud" && (
|
||||
<Field.Root mt={4}>
|
||||
<Checkbox.Root
|
||||
name="storeVideo"
|
||||
checked={room.storeVideo}
|
||||
onCheckedChange={(e) => {
|
||||
const syntheticEvent = {
|
||||
target: {
|
||||
name: "storeVideo",
|
||||
type: "checkbox",
|
||||
checked: e.checked,
|
||||
},
|
||||
};
|
||||
handleRoomChange(syntheticEvent);
|
||||
}}
|
||||
>
|
||||
<Checkbox.HiddenInput />
|
||||
<Checkbox.Control>
|
||||
<Checkbox.Indicator />
|
||||
</Checkbox.Control>
|
||||
<Checkbox.Label>
|
||||
Store video recording
|
||||
</Checkbox.Label>
|
||||
</Checkbox.Root>
|
||||
<Field.HelperText>
|
||||
When enabled, a composed video recording will be
|
||||
saved alongside audio. Disabling saves significant
|
||||
storage.
|
||||
</Field.HelperText>
|
||||
</Field.Root>
|
||||
)}
|
||||
</Tabs.Content>
|
||||
|
||||
<Tabs.Content value="share" pt={6}>
|
||||
|
||||
@@ -267,12 +267,13 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
||||
|
||||
const handleFrameJoinMeeting = useCallback(() => {
|
||||
if (meeting.recording_type === "cloud") {
|
||||
console.log("Starting dual recording via REST API", {
|
||||
console.log("Starting recording via REST API", {
|
||||
cloudInstanceId,
|
||||
rawTracksInstanceId,
|
||||
storeVideo: meeting.store_video,
|
||||
});
|
||||
|
||||
// Start both cloud and raw-tracks via backend REST API (with retry on 404)
|
||||
// Start recordings via backend REST API (with retry on 404)
|
||||
// Daily.co needs time to register call as "hosting" for REST API
|
||||
const startRecordingWithRetry = (
|
||||
type: DailyRecordingType,
|
||||
@@ -320,12 +321,17 @@ export default function DailyRoom({ meeting, room }: DailyRoomProps) {
|
||||
}, RECORDING_START_DELAY_MS);
|
||||
};
|
||||
|
||||
// Start both recordings
|
||||
startRecordingWithRetry("cloud", cloudInstanceId);
|
||||
// Always start raw-tracks (needed for transcription pipeline)
|
||||
startRecordingWithRetry("raw-tracks", rawTracksInstanceId);
|
||||
|
||||
// Only start cloud (composed video) if store_video is enabled
|
||||
if (meeting.store_video) {
|
||||
startRecordingWithRetry("cloud", cloudInstanceId);
|
||||
}
|
||||
}
|
||||
}, [
|
||||
meeting.recording_type,
|
||||
meeting.store_video,
|
||||
meeting.id,
|
||||
startRecordingMutation,
|
||||
cloudInstanceId,
|
||||
|
||||
22
www/app/reflector-api.d.ts
vendored
22
www/app/reflector-api.d.ts
vendored
@@ -1134,6 +1134,11 @@ export interface components {
|
||||
skip_consent: boolean;
|
||||
/** Email Transcript To */
|
||||
email_transcript_to?: string | null;
|
||||
/**
|
||||
* Store Video
|
||||
* @default false
|
||||
*/
|
||||
store_video: boolean;
|
||||
};
|
||||
/** CreateRoomMeeting */
|
||||
CreateRoomMeeting: {
|
||||
@@ -1852,6 +1857,11 @@ export interface components {
|
||||
daily_composed_video_s3_key?: string | null;
|
||||
/** Daily Composed Video Duration */
|
||||
daily_composed_video_duration?: number | null;
|
||||
/**
|
||||
* Store Video
|
||||
* @default false
|
||||
*/
|
||||
store_video: boolean;
|
||||
};
|
||||
/** MeetingConsentRequest */
|
||||
MeetingConsentRequest: {
|
||||
@@ -1955,6 +1965,11 @@ export interface components {
|
||||
skip_consent: boolean;
|
||||
/** Email Transcript To */
|
||||
email_transcript_to?: string | null;
|
||||
/**
|
||||
* Store Video
|
||||
* @default false
|
||||
*/
|
||||
store_video: boolean;
|
||||
};
|
||||
/** RoomDetails */
|
||||
RoomDetails: {
|
||||
@@ -2013,6 +2028,11 @@ export interface components {
|
||||
skip_consent: boolean;
|
||||
/** Email Transcript To */
|
||||
email_transcript_to?: string | null;
|
||||
/**
|
||||
* Store Video
|
||||
* @default false
|
||||
*/
|
||||
store_video: boolean;
|
||||
/** Webhook Url */
|
||||
webhook_url: string | null;
|
||||
/** Webhook Secret */
|
||||
@@ -2389,6 +2409,8 @@ export interface components {
|
||||
skip_consent?: boolean | null;
|
||||
/** Email Transcript To */
|
||||
email_transcript_to?: string | null;
|
||||
/** Store Video */
|
||||
store_video?: boolean | null;
|
||||
};
|
||||
/** UpdateTranscript */
|
||||
UpdateTranscript: {
|
||||
|
||||
Reference in New Issue
Block a user