fix: inline imports

This commit is contained in:
Juan
2026-04-08 17:52:38 -05:00
parent ee8db36f2c
commit 4d4c4b8650
19 changed files with 104 additions and 218 deletions

View File

@@ -201,4 +201,4 @@ If you need to do any worker/pipeline related work, search for "Pipeline" classe
## Code Style
- Always put imports at the top of the file. Let ruff/pre-commit handle sorting and formatting of imports.
- Exception: In Hatchet pipeline task functions, DB controller imports (e.g., `transcripts_controller`, `meetings_controller`) stay as deferred/inline imports inside `fresh_db_connection()` blocks — this is intentional to avoid sharing DB connections across forked processes. Non-DB imports (utilities, services) should still go at the top of the file.
- The **only** imports allowed to remain inline are from `reflector.db.*` modules (e.g., `reflector.db.transcripts`, `reflector.db.meetings`, `reflector.db.recordings`, `reflector.db.rooms`). These stay as deferred/inline imports inside `fresh_db_connection()` blocks in Hatchet pipeline task functions — this is intentional to avoid sharing DB connections across forked processes. All other imports (utilities, services, processors, storage, third-party libs) **must** go at the top of the file, even in Hatchet workflows.

View File

@@ -18,10 +18,11 @@ import json
import tempfile
import time
from contextlib import asynccontextmanager
from datetime import timedelta
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Coroutine, Protocol, TypeVar
import databases
import httpx
from hatchet_sdk import (
ConcurrencyExpression,
@@ -83,6 +84,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
topic_chunk_workflow,
)
from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
from reflector.llm import LLM
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor
@@ -95,7 +97,9 @@ from reflector.processors.summary.prompts import (
from reflector.processors.summary.summary_builder import SummaryBuilder
from reflector.processors.types import TitleSummary, Word
from reflector.processors.types import Transcript as TranscriptType
from reflector.redis_cache import get_async_redis_client
from reflector.settings import settings
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.audio_constants import (
PRESIGNED_URL_EXPIRATION_SECONDS,
WAVEFORM_SEGMENTS,
@@ -105,11 +109,16 @@ from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.livekit import parse_livekit_track_filepath
from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
from reflector.utils.transcript_constants import (
compute_max_subjects,
compute_topic_chunk_size,
)
from reflector.utils.webhook import (
fetch_transcript_webhook_payload,
send_webhook_request,
)
from reflector.zulip import post_transcript_notification
@@ -138,8 +147,6 @@ async def fresh_db_connection():
The real fix would be making the db module fork-aware instead of bypassing it.
Current pattern is acceptable given Hatchet's process model.
"""
import databases # noqa: PLC0415
from reflector.db import _database_context # noqa: PLC0415
_database_context.set(None)
@@ -176,8 +183,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
def _spawn_storage():
"""Create fresh storage instance for writing to our transcript bucket."""
from reflector.storage import get_transcripts_storage # noqa: PLC0415
return get_transcripts_storage()
@@ -391,10 +396,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
if input.source_platform == "livekit":
# LiveKit: participant identity is in the track dict or can be parsed from filepath
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
# Look up identity → Reflector user_id mapping from Redis
# (stored at join time in rooms.py)
identity_to_user_id: dict[str, str] = {}
@@ -402,9 +403,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
from reflector.db.meetings import (
meetings_controller as mc, # noqa: PLC0415
)
from reflector.redis_cache import (
get_async_redis_client, # noqa: PLC0415
)
meeting = (
await mc.get_by_id(transcript.meeting_id)
@@ -546,12 +544,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
# OGG files don't have embedded start_time metadata, so we pre-calculate.
track_padding: dict[int, float] = {}
if input.source_platform == "livekit":
from datetime import datetime # noqa: PLC0415
from reflector.utils.livekit import (
parse_livekit_track_filepath, # noqa: PLC0415
)
timestamps = []
for i, track in enumerate(input.tracks):
ts_str = track.get("timestamp")
@@ -1073,10 +1065,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
participant_name_to_id={},
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections across forks
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.llm import LLM # noqa: PLC0415
async with fresh_db_connection():
transcript = await transcripts_controller.get_by_id(input.transcript_id)
@@ -1206,14 +1197,13 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
subjects_result = ctx.task_output(extract_subjects)
process_result = ctx.task_output(process_subjects)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections across forks
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptFinalLongSummary,
TranscriptFinalShortSummary,
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
subject_summaries = process_result.subject_summaries
@@ -1302,13 +1292,12 @@ async def identify_action_items(
ctx.log("identify_action_items: no transcript text, returning empty")
return ActionItemsResult(action_items=ActionItemsResponse())
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections and LLM HTTP pools across forks
# Deferred DB import: Hatchet workers fork processes, fresh imports avoid
# sharing DB connections across forks
from reflector.db.transcripts import ( # noqa: PLC0415
TranscriptActionItems,
transcripts_controller,
)
from reflector.llm import LLM # noqa: PLC0415
# TODO: refactor SummaryBuilder methods into standalone functions
llm = LLM(settings=settings)
@@ -1445,10 +1434,6 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
@@ -1597,10 +1582,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
fetch_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
if not room or not room.webhook_url:

View File

@@ -15,6 +15,8 @@ import json
from datetime import timedelta
from pathlib import Path
import av
import httpx
from hatchet_sdk import Context
from pydantic import BaseModel
@@ -47,9 +49,30 @@ from reflector.hatchet.workflows.models import (
)
from reflector.logger import logger
from reflector.pipelines import topic_processing
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
from reflector.processors import AudioFileWriterProcessor
from reflector.processors.file_diarization import FileDiarizationInput
from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor
from reflector.processors.transcript_diarization_assembler import (
TranscriptDiarizationAssemblerInput,
TranscriptDiarizationAssemblerProcessor,
)
from reflector.processors.types import (
DiarizationSegment,
Word,
)
from reflector.processors.types import (
Transcript as TranscriptType,
)
from reflector.settings import settings
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
from reflector.utils.audio_waveform import get_audio_waveform
from reflector.utils.webhook import (
fetch_transcript_webhook_payload,
send_webhook_request,
)
from reflector.zulip import post_transcript_notification
class FilePipelineInput(BaseModel):
@@ -135,10 +158,6 @@ async def extract_audio(input: FilePipelineInput, ctx: Context) -> ExtractAudioR
ctx.log(f"extract_audio: processing {audio_file}")
# Extract audio and write as MP3
import av # noqa: PLC0415
from reflector.processors import AudioFileWriterProcessor # noqa: PLC0415
duration_ms_container = [0.0]
async def capture_duration(d):
@@ -189,8 +208,6 @@ async def upload_audio(input: FilePipelineInput, ctx: Context) -> UploadAudioRes
extract_result = ctx.task_output(extract_audio)
audio_path = extract_result.audio_path
from reflector.storage import get_transcripts_storage # noqa: PLC0415
storage = get_transcripts_storage()
if not storage:
raise ValueError(
@@ -232,10 +249,6 @@ async def transcribe(input: FilePipelineInput, ctx: Context) -> TranscribeResult
raise ValueError(f"Transcript {input.transcript_id} not found")
source_language = transcript.source_language
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
result = await transcribe_file_with_processor(audio_url, source_language)
ctx.log(f"transcribe complete: {len(result.words)} words")
@@ -264,13 +277,6 @@ async def diarize(input: FilePipelineInput, ctx: Context) -> DiarizeResult:
upload_result = ctx.task_output(upload_audio)
audio_url = upload_result.audio_url
from reflector.processors.file_diarization import ( # noqa: PLC0415
FileDiarizationInput,
)
from reflector.processors.file_diarization_auto import ( # noqa: PLC0415
FileDiarizationAutoProcessor,
)
processor = FileDiarizationAutoProcessor()
input_data = FileDiarizationInput(audio_url=audio_url)
@@ -353,18 +359,6 @@ async def assemble_transcript(
transcribe_result = ctx.task_output(transcribe)
diarize_result = ctx.task_output(diarize)
from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415
TranscriptDiarizationAssemblerInput,
TranscriptDiarizationAssemblerProcessor,
)
from reflector.processors.types import ( # noqa: PLC0415
DiarizationSegment,
Word,
)
from reflector.processors.types import ( # noqa: PLC0415
Transcript as TranscriptType,
)
words = [Word(**w) for w in transcribe_result.words]
transcript_data = TranscriptType(
words=words, translation=transcribe_result.translation
@@ -437,17 +431,6 @@ async def detect_topics(input: FilePipelineInput, ctx: Context) -> TopicsResult:
TranscriptTopic,
transcripts_controller,
)
from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415
TranscriptDiarizationAssemblerInput,
TranscriptDiarizationAssemblerProcessor,
)
from reflector.processors.types import ( # noqa: PLC0415
DiarizationSegment,
Word,
)
from reflector.processors.types import ( # noqa: PLC0415
Transcript as TranscriptType,
)
words = [Word(**w) for w in transcribe_result.words]
transcript_data = TranscriptType(
@@ -688,10 +671,6 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
)
from reflector.db.recordings import recordings_controller # noqa: PLC0415
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
@@ -807,7 +786,6 @@ async def post_zulip(input: FilePipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import post_transcript_notification # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
@@ -837,10 +815,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
fetch_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
if not room or not room.webhook_url:
@@ -856,8 +830,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
return WebhookResult(webhook_sent=False, skipped=True)
import httpx # noqa: PLC0415
try:
response = await send_webhook_request(
url=room.webhook_url,

View File

@@ -14,6 +14,7 @@ are not shared across forks, avoiding connection pooling issues.
from datetime import timedelta
import httpx
from hatchet_sdk import Context
from pydantic import BaseModel
@@ -40,7 +41,24 @@ from reflector.hatchet.workflows.models import (
ZulipResult,
)
from reflector.logger import logger
from reflector.pipelines.main_live_pipeline import (
PipelineMainTitle,
PipelineMainWaveform,
pipeline_convert_to_mp3,
pipeline_diarization,
pipeline_remove_upload,
pipeline_summaries,
pipeline_upload_mp3,
)
from reflector.pipelines.main_live_pipeline import (
cleanup_consent as _cleanup_consent,
)
from reflector.settings import settings
from reflector.utils.webhook import (
fetch_transcript_webhook_payload,
send_webhook_request,
)
from reflector.zulip import post_transcript_notification
class LivePostPipelineInput(BaseModel):
@@ -91,9 +109,6 @@ async def waveform(input: LivePostPipelineInput, ctx: Context) -> WaveformResult
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
PipelineMainWaveform,
)
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if not transcript:
@@ -118,10 +133,6 @@ async def generate_title(input: LivePostPipelineInput, ctx: Context) -> TitleRes
ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
PipelineMainTitle,
)
runner = PipelineMainTitle(transcript_id=input.transcript_id)
await runner.run()
@@ -142,10 +153,6 @@ async def convert_mp3(input: LivePostPipelineInput, ctx: Context) -> ConvertMp3R
ctx.log(f"convert_mp3: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
pipeline_convert_to_mp3,
)
await pipeline_convert_to_mp3(transcript_id=input.transcript_id)
ctx.log("convert_mp3 complete")
@@ -165,10 +172,6 @@ async def upload_mp3(input: LivePostPipelineInput, ctx: Context) -> UploadMp3Res
ctx.log(f"upload_mp3: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
pipeline_upload_mp3,
)
await pipeline_upload_mp3(transcript_id=input.transcript_id)
ctx.log("upload_mp3 complete")
@@ -190,10 +193,6 @@ async def remove_upload(
ctx.log(f"remove_upload: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
pipeline_remove_upload,
)
await pipeline_remove_upload(transcript_id=input.transcript_id)
ctx.log("remove_upload complete")
@@ -213,10 +212,6 @@ async def diarize(input: LivePostPipelineInput, ctx: Context) -> DiarizeResult:
ctx.log(f"diarize: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
pipeline_diarization,
)
await pipeline_diarization(transcript_id=input.transcript_id)
ctx.log("diarize complete")
@@ -236,10 +231,6 @@ async def cleanup_consent(input: LivePostPipelineInput, ctx: Context) -> Consent
ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
cleanup_consent as _cleanup_consent,
)
await _cleanup_consent(transcript_id=input.transcript_id)
ctx.log("cleanup_consent complete")
@@ -261,10 +252,6 @@ async def final_summaries(
ctx.log(f"final_summaries: starting for transcript_id={input.transcript_id}")
async with fresh_db_connection():
from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415
pipeline_summaries,
)
await pipeline_summaries(transcript_id=input.transcript_id)
ctx.log("final_summaries complete")
@@ -289,7 +276,6 @@ async def post_zulip(input: LivePostPipelineInput, ctx: Context) -> ZulipResult:
async with fresh_db_connection():
from reflector.db.transcripts import transcripts_controller # noqa: PLC0415
from reflector.zulip import post_transcript_notification # noqa: PLC0415
transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript:
@@ -319,10 +305,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
async with fresh_db_connection():
from reflector.db.rooms import rooms_controller # noqa: PLC0415
from reflector.utils.webhook import ( # noqa: PLC0415
fetch_transcript_webhook_payload,
send_webhook_request,
)
room = await rooms_controller.get_by_id(input.room_id)
if not room or not room.webhook_url:
@@ -338,8 +320,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
ctx.log(f"send_webhook skipped (could not build payload): {payload}")
return WebhookResult(webhook_sent=False, skipped=True)
import httpx # noqa: PLC0415
try:
response = await send_webhook_request(
url=room.webhook_url,

View File

@@ -13,6 +13,8 @@ from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO
from reflector.hatchet.workflows.models import PadTrackResult
from reflector.logger import logger
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
@@ -51,11 +53,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
)
try:
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
# Source reads: use platform-specific credentials
source_storage = get_source_storage(input.source_platform)
source_url = await source_storage.get_file_url(
@@ -104,10 +101,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
AudioPaddingAutoProcessor,
)
processor = AudioPaddingAutoProcessor()
result = await processor.pad_track(
track_url=source_url,

View File

@@ -15,12 +15,14 @@ from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import SubjectSummaryResult
from reflector.llm import LLM
from reflector.logger import logger
from reflector.processors.summary.prompts import (
DETAILED_SUBJECT_PROMPT_TEMPLATE,
PARAGRAPH_SUMMARY_PROMPT,
build_participant_instructions,
)
from reflector.settings import settings
class SubjectInput(BaseModel):
@@ -60,11 +62,6 @@ async def generate_detailed_summary(
subject_index=input.subject_index,
)
# Deferred imports: Hatchet workers fork processes, fresh imports ensure
# LLM HTTP connection pools aren't shared across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.settings import settings # noqa: PLC0415
llm = LLM(settings=settings)
participant_instructions = build_participant_instructions(input.participant_names)

View File

@@ -18,9 +18,13 @@ from pydantic import BaseModel
from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
from reflector.hatchet.workflows.models import TopicChunkResult
from reflector.llm import LLM
from reflector.logger import logger
from reflector.processors.prompts import TOPIC_PROMPT
from reflector.processors.transcript_topic_detector import TopicResponse
from reflector.processors.types import Word
from reflector.settings import settings
from reflector.utils.text import clean_title
class TopicChunkInput(BaseModel):
@@ -64,15 +68,6 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
text_length=len(input.chunk_text),
)
# Deferred imports: Hatchet workers fork processes, fresh imports avoid
# sharing LLM HTTP connection pools across forks
from reflector.llm import LLM # noqa: PLC0415
from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415
TopicResponse,
)
from reflector.settings import settings # noqa: PLC0415
from reflector.utils.text import clean_title # noqa: PLC0415
llm = LLM(settings=settings, temperature=0.9)
prompt = TOPIC_PROMPT.format(text=input.chunk_text)

View File

@@ -9,9 +9,9 @@ because Hatchet workflow DAGs are defined statically, but the number of tracks v
at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
Note: This file uses deferred imports (inside tasks) intentionally.
Note: DB imports (reflector.db.*) are kept inline (deferred) intentionally.
Hatchet workers run in forked processes; fresh imports per task ensure
storage/DB connections are not shared across forks.
DB connections are not shared across forks.
"""
from datetime import timedelta
@@ -24,6 +24,9 @@ from reflector.hatchet.client import HatchetClientManager
from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
from reflector.logger import logger
from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
from reflector.utils.audio_padding import extract_stream_start_time_from_container
@@ -72,11 +75,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
)
try:
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
# Source reads: use platform-specific credentials
source_storage = get_source_storage(input.source_platform)
source_url = await source_storage.get_file_url(
@@ -120,10 +118,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
)
from reflector.processors.audio_padding_auto import ( # noqa: PLC0415
AudioPaddingAutoProcessor,
)
processor = AudioPaddingAutoProcessor()
result = await processor.pad_track(
track_url=source_url,
@@ -179,11 +173,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
raise ValueError("Missing padded_key from pad_track")
# Presign URL on demand (avoids stale URLs on workflow replay)
from reflector.storage import ( # noqa: PLC0415
get_source_storage,
get_transcripts_storage,
)
# If bucket_name is set, file is still in the platform's source bucket (no padding applied).
# If bucket_name is None, padded file was written to our transcript storage.
if bucket_name:
@@ -198,10 +187,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
bucket=bucket_name,
)
from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415
transcribe_file_with_processor,
)
transcript = await transcribe_file_with_processor(audio_url, input.language)
# Tag all words with speaker index

View File

@@ -38,6 +38,7 @@ from reflector.db.transcripts import (
TranscriptWaveform,
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.logger import logger
from reflector.pipelines.runner import PipelineMessage, PipelineRunner
from reflector.processors import (
@@ -814,8 +815,6 @@ async def pipeline_post(*, transcript_id: str, room_id: str | None = None):
"""
Run the post pipeline via Hatchet.
"""
from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415
await HatchetClientManager.start_workflow(
"LivePostProcessingPipeline",
{

View File

@@ -10,6 +10,7 @@ import os
import tempfile
import av
import requests
from reflector.logger import logger
from reflector.processors.audio_padding import AudioPaddingProcessor, PaddingResponse
@@ -65,8 +66,6 @@ class AudioPaddingPyavProcessor(AudioPaddingProcessor):
track_index: int,
) -> PaddingResponse:
"""Blocking padding work: download, pad with PyAV, upload."""
import requests
log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
temp_dir = tempfile.mkdtemp()
input_path = None

View File

@@ -333,7 +333,9 @@ if __name__ == "__main__":
if not s3_urls:
parser.error("At least one S3 URL required for multitrack processing")
from reflector.tools.cli_multitrack import process_multitrack_cli
from reflector.tools.cli_multitrack import (
process_multitrack_cli, # circular import
)
asyncio.run(
process_multitrack_cli(

View File

@@ -5,6 +5,7 @@ This tools help to either create a pipeline from command line,
or read a yaml description of a pipeline and run it.
"""
import importlib
import json
from reflector.logger import logger
@@ -37,8 +38,6 @@ def get_jsonl(filename, filter_processor_name=None):
def get_processor(name):
import importlib
module_name = f"reflector.processors.{name}"
class_name = snake_to_camel(name) + "Processor"
module = importlib.import_module(module_name)

View File

@@ -15,6 +15,7 @@ from reflector.dailyco_api import (
from reflector.db.meetings import meetings_controller
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.storage import get_source_storage
from reflector.video_platforms.factory import create_platform_client
from reflector.worker.process import (
poll_daily_room_presence_task,
@@ -226,8 +227,6 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
if video_track_keys:
meeting = await meetings_controller.get_by_room_name(room_name)
if meeting is not None and not meeting.store_video:
from reflector.storage import get_source_storage
storage = get_source_storage("daily")
for video_key in video_track_keys:
try:

View File

@@ -17,6 +17,7 @@ from reflector.db.meetings import meetings_controller
from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
from reflector.logger import logger as _logger
from reflector.settings import settings
from reflector.storage import get_source_storage
router = APIRouter()
@@ -189,8 +190,6 @@ async def _handle_egress_ended(event):
filename = file_result.filename
if filename and filename.endswith(".webm"):
try:
from reflector.storage import get_source_storage # noqa: PLC0415
storage = get_source_storage("livekit")
await storage.delete_file(filename)
logger.info(

View File

@@ -1,4 +1,6 @@
import logging
import re
import uuid
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import Annotated, Any, Literal, Optional
@@ -14,7 +16,7 @@ from reflector.db import get_database
from reflector.db.calendar_events import calendar_events_controller
from reflector.db.meetings import meetings_controller
from reflector.db.rooms import rooms_controller
from reflector.redis_cache import RedisAsyncLock
from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
from reflector.schemas.platform import Platform
from reflector.services.ics_sync import ics_sync_service
from reflector.utils.url import add_query_param
@@ -606,9 +608,6 @@ async def rooms_join_meeting(
meeting.room_url = add_query_param(meeting.room_url, "t", token)
elif meeting.platform == "livekit":
import re
import uuid
client = create_platform_client(meeting.platform)
# Identity must be unique per participant to avoid S3 key collisions.
# Format: {readable_name}-{short_uuid} ensures uniqueness even for same names.
@@ -631,8 +630,6 @@ async def rooms_join_meeting(
# Store identity → Reflector user_id mapping for the pipeline
# (so TranscriptParticipant.user_id can be set correctly)
if user_id:
from reflector.redis_cache import get_async_redis_client # noqa: PLC0415
redis_client = await get_async_redis_client()
mapping_key = f"livekit:participant_map:{meeting.room_name}"
await redis_client.hset(mapping_key, participant_identity, user_id)

View File

@@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown
from reflector.logger import logger
from reflector.pipelines.runner import PipelineRunner
from reflector.settings import settings
from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
sessions = []
router = APIRouter()
@@ -128,8 +129,6 @@ async def rtc_offer_base(
# Rewrite ICE candidate IPs when running behind Docker bridge networking
if settings.WEBRTC_HOST:
from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
host_ip = resolve_webrtc_host(settings.WEBRTC_HOST)
sdp = rewrite_sdp_host(sdp, host_ip)

View File

@@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request
import reflector.auth as auth
from reflector.db.transcripts import transcripts_controller
from reflector.pipelines.main_live_pipeline import PipelineMainLive
from .rtc_offer import RtcOffer, rtc_offer_base
@@ -28,8 +29,6 @@ async def transcript_record_webrtc(
raise HTTPException(status_code=400, detail="Transcript is locked")
# create a pipeline runner
from reflector.pipelines.main_live_pipeline import PipelineMainLive # noqa: PLC0415
pipeline_runner = PipelineMainLive(transcript_id=transcript_id)
# FIXME do not allow multiple recording at the same time

View File

@@ -11,6 +11,8 @@ This allows running the server in Docker with bridge networking
import asyncio
import socket
import aioice.ice
from reflector.logger import logger
@@ -36,9 +38,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
Works by temporarily wrapping loop.create_datagram_endpoint() during
aioice's get_component_candidates() to intercept bind(addr, 0) calls.
"""
import aioice.ice as _ice
_original = _ice.Connection.get_component_candidates
_original = aioice.ice.Connection.get_component_candidates
_state = {"next_port": min_port}
async def _patched_get_component_candidates(self, component, addresses, timeout=5):
@@ -78,7 +78,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
finally:
loop.create_datagram_endpoint = _orig_create
_ice.Connection.get_component_candidates = _patched_get_component_candidates
aioice.ice.Connection.get_component_candidates = _patched_get_component_candidates
logger.info(
"aioice patched for WebRTC port range",
min_port=min_port,
@@ -102,8 +102,6 @@ def rewrite_sdp_host(sdp: str, target_ip: str) -> str:
Replace container-internal IPs in SDP with target_ip so that
ICE candidates advertise a routable address.
"""
import aioice.ice
container_ips = aioice.ice.get_host_addresses(use_ipv4=True, use_ipv6=False)
for ip in container_ips:
if ip != "127.0.0.1" and ip != target_ip:

View File

@@ -1,3 +1,4 @@
import asyncio
import json
import os
import re
@@ -26,16 +27,26 @@ from reflector.db.transcripts import (
transcripts_controller,
)
from reflector.hatchet.client import HatchetClientManager
from reflector.pipelines.topic_processing import EmptyPipeline
from reflector.processors.audio_file_writer import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.redis_cache import RedisAsyncLock
from reflector.settings import settings
from reflector.storage import get_transcripts_storage
from reflector.storage import get_source_storage, get_transcripts_storage
from reflector.utils.daily import (
DailyRoomName,
extract_base_room_name,
filter_cam_audio_tracks,
recording_lock_key,
)
from reflector.utils.livekit import (
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
from reflector.utils.livekit import (
recording_lock_key as livekit_recording_lock_key,
)
from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
from reflector.video_platforms.whereby_utils import (
@@ -932,11 +943,6 @@ async def convert_audio_and_waveform(transcript) -> None:
transcript_id=transcript.id,
)
from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415
from reflector.processors.audio_file_writer import (
AudioFileWriterProcessor, # noqa: PLC0415
)
upload_path = transcript.data_path / "upload.webm"
mp3_path = transcript.audio_mp3_filename
@@ -1215,17 +1221,13 @@ async def process_livekit_multitrack(
Tracks are discovered via S3 listing (source of truth), not webhooks.
Called from room_finished webhook (fast-path) or beat task (fallback).
"""
from reflector.utils.livekit import ( # noqa: PLC0415
recording_lock_key,
)
logger.info(
"Processing LiveKit multitrack recording",
room_name=room_name,
meeting_id=meeting_id,
)
lock_key = recording_lock_key(room_name)
lock_key = livekit_recording_lock_key(room_name)
async with RedisAsyncLock(
key=lock_key,
timeout=600,
@@ -1252,19 +1254,10 @@ async def _process_livekit_multitrack_inner(
# 1. Discover tracks by listing S3 prefix.
# Wait briefly for egress files to finish flushing to S3 — the room_finished
# webhook fires after empty_timeout, but egress finalization may still be in progress.
import asyncio as _asyncio # noqa: PLC0415
from reflector.storage import get_source_storage # noqa: PLC0415
from reflector.utils.livekit import ( # noqa: PLC0415
extract_livekit_base_room_name,
filter_audio_tracks,
parse_livekit_track_filepath,
)
EGRESS_FLUSH_DELAY = 10 # seconds — egress typically flushes within a few seconds
EGRESS_RETRY_DELAY = 30 # seconds — retry if first listing finds nothing
await _asyncio.sleep(EGRESS_FLUSH_DELAY)
await asyncio.sleep(EGRESS_FLUSH_DELAY)
storage = get_source_storage("livekit")
s3_prefix = f"livekit/{room_name}/"
@@ -1280,7 +1273,7 @@ async def _process_livekit_multitrack_inner(
room_name=room_name,
retry_delay=EGRESS_RETRY_DELAY,
)
await _asyncio.sleep(EGRESS_RETRY_DELAY)
await asyncio.sleep(EGRESS_RETRY_DELAY)
all_keys = await storage.list_objects(prefix=s3_prefix)
audio_keys = filter_audio_tracks(all_keys) if all_keys else []
@@ -1299,7 +1292,7 @@ async def _process_livekit_multitrack_inner(
expected=expected_audio,
found=len(audio_keys),
)
await _asyncio.sleep(EGRESS_RETRY_DELAY)
await asyncio.sleep(EGRESS_RETRY_DELAY)
all_keys = await storage.list_objects(prefix=s3_prefix)
audio_keys = filter_audio_tracks(all_keys) if all_keys else []