From 4d4c4b8650b4450379f718bb25bae3a9165786fb Mon Sep 17 00:00:00 2001 From: Juan Date: Wed, 8 Apr 2026 17:52:38 -0500 Subject: [PATCH] fix: inline imports --- CLAUDE.md | 2 +- .../workflows/daily_multitrack_pipeline.py | 51 ++++--------- .../hatchet/workflows/file_pipeline.py | 74 ++++++------------- .../hatchet/workflows/live_post_pipeline.py | 56 +++++--------- .../hatchet/workflows/padding_workflow.py | 11 +-- .../hatchet/workflows/subject_processing.py | 7 +- .../workflows/topic_chunk_processing.py | 13 +--- .../hatchet/workflows/track_processing.py | 25 ++----- .../reflector/pipelines/main_live_pipeline.py | 3 +- .../processors/audio_padding_pyav.py | 3 +- server/reflector/tools/process.py | 4 +- server/reflector/tools/runpipeline.py | 3 +- server/reflector/views/daily.py | 3 +- server/reflector/views/livekit.py | 3 +- server/reflector/views/rooms.py | 9 +-- server/reflector/views/rtc_offer.py | 3 +- server/reflector/views/transcripts_webrtc.py | 3 +- server/reflector/webrtc_ports.py | 10 +-- server/reflector/worker/process.py | 39 ++++------ 19 files changed, 104 insertions(+), 218 deletions(-) diff --git a/CLAUDE.md b/CLAUDE.md index 3d534efd..5f7a68be 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -201,4 +201,4 @@ If you need to do any worker/pipeline related work, search for "Pipeline" classe ## Code Style - Always put imports at the top of the file. Let ruff/pre-commit handle sorting and formatting of imports. -- Exception: In Hatchet pipeline task functions, DB controller imports (e.g., `transcripts_controller`, `meetings_controller`) stay as deferred/inline imports inside `fresh_db_connection()` blocks — this is intentional to avoid sharing DB connections across forked processes. Non-DB imports (utilities, services) should still go at the top of the file. +- The **only** imports allowed to remain inline are from `reflector.db.*` modules (e.g., `reflector.db.transcripts`, `reflector.db.meetings`, `reflector.db.recordings`, `reflector.db.rooms`). These stay as deferred/inline imports inside `fresh_db_connection()` blocks in Hatchet pipeline task functions — this is intentional to avoid sharing DB connections across forked processes. All other imports (utilities, services, processors, storage, third-party libs) **must** go at the top of the file, even in Hatchet workflows. diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 6c70943e..0fbfc62d 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -18,10 +18,11 @@ import json import tempfile import time from contextlib import asynccontextmanager -from datetime import timedelta +from datetime import datetime, timedelta from pathlib import Path from typing import Any, Callable, Coroutine, Protocol, TypeVar +import databases import httpx from hatchet_sdk import ( ConcurrencyExpression, @@ -83,6 +84,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import ( topic_chunk_workflow, ) from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow +from reflector.llm import LLM from reflector.logger import logger from reflector.pipelines import topic_processing from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor @@ -95,7 +97,9 @@ from reflector.processors.summary.prompts import ( from reflector.processors.summary.summary_builder import SummaryBuilder from reflector.processors.types import TitleSummary, Word from reflector.processors.types import Transcript as TranscriptType +from reflector.redis_cache import get_async_redis_client from reflector.settings import settings +from reflector.storage import get_source_storage, get_transcripts_storage from reflector.utils.audio_constants import ( PRESIGNED_URL_EXPIRATION_SECONDS, WAVEFORM_SEGMENTS, @@ -105,11 +109,16 @@ from reflector.utils.daily import ( filter_cam_audio_tracks, parse_daily_recording_filename, ) +from reflector.utils.livekit import parse_livekit_track_filepath from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty from reflector.utils.transcript_constants import ( compute_max_subjects, compute_topic_chunk_size, ) +from reflector.utils.webhook import ( + fetch_transcript_webhook_payload, + send_webhook_request, +) from reflector.zulip import post_transcript_notification @@ -138,8 +147,6 @@ async def fresh_db_connection(): The real fix would be making the db module fork-aware instead of bypassing it. Current pattern is acceptable given Hatchet's process model. """ - import databases # noqa: PLC0415 - from reflector.db import _database_context # noqa: PLC0415 _database_context.set(None) @@ -176,8 +183,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool: def _spawn_storage(): """Create fresh storage instance for writing to our transcript bucket.""" - from reflector.storage import get_transcripts_storage # noqa: PLC0415 - return get_transcripts_storage() @@ -391,10 +396,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe if input.source_platform == "livekit": # LiveKit: participant identity is in the track dict or can be parsed from filepath - from reflector.utils.livekit import ( - parse_livekit_track_filepath, # noqa: PLC0415 - ) - # Look up identity → Reflector user_id mapping from Redis # (stored at join time in rooms.py) identity_to_user_id: dict[str, str] = {} @@ -402,9 +403,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe from reflector.db.meetings import ( meetings_controller as mc, # noqa: PLC0415 ) - from reflector.redis_cache import ( - get_async_redis_client, # noqa: PLC0415 - ) meeting = ( await mc.get_by_id(transcript.meeting_id) @@ -546,12 +544,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes # OGG files don't have embedded start_time metadata, so we pre-calculate. track_padding: dict[int, float] = {} if input.source_platform == "livekit": - from datetime import datetime # noqa: PLC0415 - - from reflector.utils.livekit import ( - parse_livekit_track_filepath, # noqa: PLC0415 - ) - timestamps = [] for i, track in enumerate(input.tracks): ts_str = track.get("timestamp") @@ -1073,10 +1065,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult participant_name_to_id={}, ) - # Deferred imports: Hatchet workers fork processes, fresh imports avoid - # sharing DB connections and LLM HTTP pools across forks + # Deferred DB import: Hatchet workers fork processes, fresh imports avoid + # sharing DB connections across forks from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.llm import LLM # noqa: PLC0415 async with fresh_db_connection(): transcript = await transcripts_controller.get_by_id(input.transcript_id) @@ -1206,14 +1197,13 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult: subjects_result = ctx.task_output(extract_subjects) process_result = ctx.task_output(process_subjects) - # Deferred imports: Hatchet workers fork processes, fresh imports avoid - # sharing DB connections and LLM HTTP pools across forks + # Deferred DB import: Hatchet workers fork processes, fresh imports avoid + # sharing DB connections across forks from reflector.db.transcripts import ( # noqa: PLC0415 TranscriptFinalLongSummary, TranscriptFinalShortSummary, transcripts_controller, ) - from reflector.llm import LLM # noqa: PLC0415 subject_summaries = process_result.subject_summaries @@ -1302,13 +1292,12 @@ async def identify_action_items( ctx.log("identify_action_items: no transcript text, returning empty") return ActionItemsResult(action_items=ActionItemsResponse()) - # Deferred imports: Hatchet workers fork processes, fresh imports avoid - # sharing DB connections and LLM HTTP pools across forks + # Deferred DB import: Hatchet workers fork processes, fresh imports avoid + # sharing DB connections across forks from reflector.db.transcripts import ( # noqa: PLC0415 TranscriptActionItems, transcripts_controller, ) - from reflector.llm import LLM # noqa: PLC0415 # TODO: refactor SummaryBuilder methods into standalone functions llm = LLM(settings=settings) @@ -1445,10 +1434,6 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult: ) from reflector.db.recordings import recordings_controller # noqa: PLC0415 from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.storage import ( # noqa: PLC0415 - get_source_storage, - get_transcripts_storage, - ) transcript = await transcripts_controller.get_by_id(input.transcript_id) if not transcript: @@ -1597,10 +1582,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult: async with fresh_db_connection(): from reflector.db.rooms import rooms_controller # noqa: PLC0415 - from reflector.utils.webhook import ( # noqa: PLC0415 - fetch_transcript_webhook_payload, - send_webhook_request, - ) room = await rooms_controller.get_by_id(input.room_id) if not room or not room.webhook_url: diff --git a/server/reflector/hatchet/workflows/file_pipeline.py b/server/reflector/hatchet/workflows/file_pipeline.py index 5f20a2b5..801a0d96 100644 --- a/server/reflector/hatchet/workflows/file_pipeline.py +++ b/server/reflector/hatchet/workflows/file_pipeline.py @@ -15,6 +15,8 @@ import json from datetime import timedelta from pathlib import Path +import av +import httpx from hatchet_sdk import Context from pydantic import BaseModel @@ -47,9 +49,30 @@ from reflector.hatchet.workflows.models import ( ) from reflector.logger import logger from reflector.pipelines import topic_processing +from reflector.pipelines.transcription_helpers import transcribe_file_with_processor +from reflector.processors import AudioFileWriterProcessor +from reflector.processors.file_diarization import FileDiarizationInput +from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor +from reflector.processors.transcript_diarization_assembler import ( + TranscriptDiarizationAssemblerInput, + TranscriptDiarizationAssemblerProcessor, +) +from reflector.processors.types import ( + DiarizationSegment, + Word, +) +from reflector.processors.types import ( + Transcript as TranscriptType, +) from reflector.settings import settings +from reflector.storage import get_source_storage, get_transcripts_storage from reflector.utils.audio_constants import WAVEFORM_SEGMENTS from reflector.utils.audio_waveform import get_audio_waveform +from reflector.utils.webhook import ( + fetch_transcript_webhook_payload, + send_webhook_request, +) +from reflector.zulip import post_transcript_notification class FilePipelineInput(BaseModel): @@ -135,10 +158,6 @@ async def extract_audio(input: FilePipelineInput, ctx: Context) -> ExtractAudioR ctx.log(f"extract_audio: processing {audio_file}") # Extract audio and write as MP3 - import av # noqa: PLC0415 - - from reflector.processors import AudioFileWriterProcessor # noqa: PLC0415 - duration_ms_container = [0.0] async def capture_duration(d): @@ -189,8 +208,6 @@ async def upload_audio(input: FilePipelineInput, ctx: Context) -> UploadAudioRes extract_result = ctx.task_output(extract_audio) audio_path = extract_result.audio_path - from reflector.storage import get_transcripts_storage # noqa: PLC0415 - storage = get_transcripts_storage() if not storage: raise ValueError( @@ -232,10 +249,6 @@ async def transcribe(input: FilePipelineInput, ctx: Context) -> TranscribeResult raise ValueError(f"Transcript {input.transcript_id} not found") source_language = transcript.source_language - from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415 - transcribe_file_with_processor, - ) - result = await transcribe_file_with_processor(audio_url, source_language) ctx.log(f"transcribe complete: {len(result.words)} words") @@ -264,13 +277,6 @@ async def diarize(input: FilePipelineInput, ctx: Context) -> DiarizeResult: upload_result = ctx.task_output(upload_audio) audio_url = upload_result.audio_url - from reflector.processors.file_diarization import ( # noqa: PLC0415 - FileDiarizationInput, - ) - from reflector.processors.file_diarization_auto import ( # noqa: PLC0415 - FileDiarizationAutoProcessor, - ) - processor = FileDiarizationAutoProcessor() input_data = FileDiarizationInput(audio_url=audio_url) @@ -353,18 +359,6 @@ async def assemble_transcript( transcribe_result = ctx.task_output(transcribe) diarize_result = ctx.task_output(diarize) - from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415 - TranscriptDiarizationAssemblerInput, - TranscriptDiarizationAssemblerProcessor, - ) - from reflector.processors.types import ( # noqa: PLC0415 - DiarizationSegment, - Word, - ) - from reflector.processors.types import ( # noqa: PLC0415 - Transcript as TranscriptType, - ) - words = [Word(**w) for w in transcribe_result.words] transcript_data = TranscriptType( words=words, translation=transcribe_result.translation @@ -437,17 +431,6 @@ async def detect_topics(input: FilePipelineInput, ctx: Context) -> TopicsResult: TranscriptTopic, transcripts_controller, ) - from reflector.processors.transcript_diarization_assembler import ( # noqa: PLC0415 - TranscriptDiarizationAssemblerInput, - TranscriptDiarizationAssemblerProcessor, - ) - from reflector.processors.types import ( # noqa: PLC0415 - DiarizationSegment, - Word, - ) - from reflector.processors.types import ( # noqa: PLC0415 - Transcript as TranscriptType, - ) words = [Word(**w) for w in transcribe_result.words] transcript_data = TranscriptType( @@ -688,10 +671,6 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu ) from reflector.db.recordings import recordings_controller # noqa: PLC0415 from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.storage import ( # noqa: PLC0415 - get_source_storage, - get_transcripts_storage, - ) transcript = await transcripts_controller.get_by_id(input.transcript_id) if not transcript: @@ -807,7 +786,6 @@ async def post_zulip(input: FilePipelineInput, ctx: Context) -> ZulipResult: async with fresh_db_connection(): from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.zulip import post_transcript_notification # noqa: PLC0415 transcript = await transcripts_controller.get_by_id(input.transcript_id) if transcript: @@ -837,10 +815,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult: async with fresh_db_connection(): from reflector.db.rooms import rooms_controller # noqa: PLC0415 - from reflector.utils.webhook import ( # noqa: PLC0415 - fetch_transcript_webhook_payload, - send_webhook_request, - ) room = await rooms_controller.get_by_id(input.room_id) if not room or not room.webhook_url: @@ -856,8 +830,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult: ctx.log(f"send_webhook skipped (could not build payload): {payload}") return WebhookResult(webhook_sent=False, skipped=True) - import httpx # noqa: PLC0415 - try: response = await send_webhook_request( url=room.webhook_url, diff --git a/server/reflector/hatchet/workflows/live_post_pipeline.py b/server/reflector/hatchet/workflows/live_post_pipeline.py index 5913d999..4b63671e 100644 --- a/server/reflector/hatchet/workflows/live_post_pipeline.py +++ b/server/reflector/hatchet/workflows/live_post_pipeline.py @@ -14,6 +14,7 @@ are not shared across forks, avoiding connection pooling issues. from datetime import timedelta +import httpx from hatchet_sdk import Context from pydantic import BaseModel @@ -40,7 +41,24 @@ from reflector.hatchet.workflows.models import ( ZulipResult, ) from reflector.logger import logger +from reflector.pipelines.main_live_pipeline import ( + PipelineMainTitle, + PipelineMainWaveform, + pipeline_convert_to_mp3, + pipeline_diarization, + pipeline_remove_upload, + pipeline_summaries, + pipeline_upload_mp3, +) +from reflector.pipelines.main_live_pipeline import ( + cleanup_consent as _cleanup_consent, +) from reflector.settings import settings +from reflector.utils.webhook import ( + fetch_transcript_webhook_payload, + send_webhook_request, +) +from reflector.zulip import post_transcript_notification class LivePostPipelineInput(BaseModel): @@ -91,9 +109,6 @@ async def waveform(input: LivePostPipelineInput, ctx: Context) -> WaveformResult async with fresh_db_connection(): from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - PipelineMainWaveform, - ) transcript = await transcripts_controller.get_by_id(input.transcript_id) if not transcript: @@ -118,10 +133,6 @@ async def generate_title(input: LivePostPipelineInput, ctx: Context) -> TitleRes ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - PipelineMainTitle, - ) - runner = PipelineMainTitle(transcript_id=input.transcript_id) await runner.run() @@ -142,10 +153,6 @@ async def convert_mp3(input: LivePostPipelineInput, ctx: Context) -> ConvertMp3R ctx.log(f"convert_mp3: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - pipeline_convert_to_mp3, - ) - await pipeline_convert_to_mp3(transcript_id=input.transcript_id) ctx.log("convert_mp3 complete") @@ -165,10 +172,6 @@ async def upload_mp3(input: LivePostPipelineInput, ctx: Context) -> UploadMp3Res ctx.log(f"upload_mp3: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - pipeline_upload_mp3, - ) - await pipeline_upload_mp3(transcript_id=input.transcript_id) ctx.log("upload_mp3 complete") @@ -190,10 +193,6 @@ async def remove_upload( ctx.log(f"remove_upload: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - pipeline_remove_upload, - ) - await pipeline_remove_upload(transcript_id=input.transcript_id) ctx.log("remove_upload complete") @@ -213,10 +212,6 @@ async def diarize(input: LivePostPipelineInput, ctx: Context) -> DiarizeResult: ctx.log(f"diarize: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - pipeline_diarization, - ) - await pipeline_diarization(transcript_id=input.transcript_id) ctx.log("diarize complete") @@ -236,10 +231,6 @@ async def cleanup_consent(input: LivePostPipelineInput, ctx: Context) -> Consent ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - cleanup_consent as _cleanup_consent, - ) - await _cleanup_consent(transcript_id=input.transcript_id) ctx.log("cleanup_consent complete") @@ -261,10 +252,6 @@ async def final_summaries( ctx.log(f"final_summaries: starting for transcript_id={input.transcript_id}") async with fresh_db_connection(): - from reflector.pipelines.main_live_pipeline import ( # noqa: PLC0415 - pipeline_summaries, - ) - await pipeline_summaries(transcript_id=input.transcript_id) ctx.log("final_summaries complete") @@ -289,7 +276,6 @@ async def post_zulip(input: LivePostPipelineInput, ctx: Context) -> ZulipResult: async with fresh_db_connection(): from reflector.db.transcripts import transcripts_controller # noqa: PLC0415 - from reflector.zulip import post_transcript_notification # noqa: PLC0415 transcript = await transcripts_controller.get_by_id(input.transcript_id) if transcript: @@ -319,10 +305,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes async with fresh_db_connection(): from reflector.db.rooms import rooms_controller # noqa: PLC0415 - from reflector.utils.webhook import ( # noqa: PLC0415 - fetch_transcript_webhook_payload, - send_webhook_request, - ) room = await rooms_controller.get_by_id(input.room_id) if not room or not room.webhook_url: @@ -338,8 +320,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes ctx.log(f"send_webhook skipped (could not build payload): {payload}") return WebhookResult(webhook_sent=False, skipped=True) - import httpx # noqa: PLC0415 - try: response = await send_webhook_request( url=room.webhook_url, diff --git a/server/reflector/hatchet/workflows/padding_workflow.py b/server/reflector/hatchet/workflows/padding_workflow.py index d63125c4..a85d3197 100644 --- a/server/reflector/hatchet/workflows/padding_workflow.py +++ b/server/reflector/hatchet/workflows/padding_workflow.py @@ -13,6 +13,8 @@ from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.constants import TIMEOUT_AUDIO from reflector.hatchet.workflows.models import PadTrackResult from reflector.logger import logger +from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor +from reflector.storage import get_source_storage, get_transcripts_storage from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS from reflector.utils.audio_padding import extract_stream_start_time_from_container @@ -51,11 +53,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: ) try: - from reflector.storage import ( # noqa: PLC0415 - get_source_storage, - get_transcripts_storage, - ) - # Source reads: use platform-specific credentials source_storage = get_source_storage(input.source_platform) source_url = await source_storage.get_file_url( @@ -104,10 +101,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult: expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, ) - from reflector.processors.audio_padding_auto import ( # noqa: PLC0415 - AudioPaddingAutoProcessor, - ) - processor = AudioPaddingAutoProcessor() result = await processor.pad_track( track_url=source_url, diff --git a/server/reflector/hatchet/workflows/subject_processing.py b/server/reflector/hatchet/workflows/subject_processing.py index df7d8f2f..2c771282 100644 --- a/server/reflector/hatchet/workflows/subject_processing.py +++ b/server/reflector/hatchet/workflows/subject_processing.py @@ -15,12 +15,14 @@ from pydantic import BaseModel from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_HEAVY from reflector.hatchet.workflows.models import SubjectSummaryResult +from reflector.llm import LLM from reflector.logger import logger from reflector.processors.summary.prompts import ( DETAILED_SUBJECT_PROMPT_TEMPLATE, PARAGRAPH_SUMMARY_PROMPT, build_participant_instructions, ) +from reflector.settings import settings class SubjectInput(BaseModel): @@ -60,11 +62,6 @@ async def generate_detailed_summary( subject_index=input.subject_index, ) - # Deferred imports: Hatchet workers fork processes, fresh imports ensure - # LLM HTTP connection pools aren't shared across forks - from reflector.llm import LLM # noqa: PLC0415 - from reflector.settings import settings # noqa: PLC0415 - llm = LLM(settings=settings) participant_instructions = build_participant_instructions(input.participant_names) diff --git a/server/reflector/hatchet/workflows/topic_chunk_processing.py b/server/reflector/hatchet/workflows/topic_chunk_processing.py index e7c90252..1cac4c7d 100644 --- a/server/reflector/hatchet/workflows/topic_chunk_processing.py +++ b/server/reflector/hatchet/workflows/topic_chunk_processing.py @@ -18,9 +18,13 @@ from pydantic import BaseModel from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM from reflector.hatchet.workflows.models import TopicChunkResult +from reflector.llm import LLM from reflector.logger import logger from reflector.processors.prompts import TOPIC_PROMPT +from reflector.processors.transcript_topic_detector import TopicResponse from reflector.processors.types import Word +from reflector.settings import settings +from reflector.utils.text import clean_title class TopicChunkInput(BaseModel): @@ -64,15 +68,6 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk text_length=len(input.chunk_text), ) - # Deferred imports: Hatchet workers fork processes, fresh imports avoid - # sharing LLM HTTP connection pools across forks - from reflector.llm import LLM # noqa: PLC0415 - from reflector.processors.transcript_topic_detector import ( # noqa: PLC0415 - TopicResponse, - ) - from reflector.settings import settings # noqa: PLC0415 - from reflector.utils.text import clean_title # noqa: PLC0415 - llm = LLM(settings=settings, temperature=0.9) prompt = TOPIC_PROMPT.format(text=input.chunk_text) diff --git a/server/reflector/hatchet/workflows/track_processing.py b/server/reflector/hatchet/workflows/track_processing.py index b2b477f2..c9c1ba1b 100644 --- a/server/reflector/hatchet/workflows/track_processing.py +++ b/server/reflector/hatchet/workflows/track_processing.py @@ -9,9 +9,9 @@ because Hatchet workflow DAGs are defined statically, but the number of tracks v at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py. -Note: This file uses deferred imports (inside tasks) intentionally. +Note: DB imports (reflector.db.*) are kept inline (deferred) intentionally. Hatchet workers run in forked processes; fresh imports per task ensure -storage/DB connections are not shared across forks. +DB connections are not shared across forks. """ from datetime import timedelta @@ -24,6 +24,9 @@ from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult from reflector.logger import logger +from reflector.pipelines.transcription_helpers import transcribe_file_with_processor +from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor +from reflector.storage import get_source_storage, get_transcripts_storage from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS from reflector.utils.audio_padding import extract_stream_start_time_from_container @@ -72,11 +75,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: ) try: - from reflector.storage import ( # noqa: PLC0415 - get_source_storage, - get_transcripts_storage, - ) - # Source reads: use platform-specific credentials source_storage = get_source_storage(input.source_platform) source_url = await source_storage.get_file_url( @@ -120,10 +118,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult: expires_in=PRESIGNED_URL_EXPIRATION_SECONDS, ) - from reflector.processors.audio_padding_auto import ( # noqa: PLC0415 - AudioPaddingAutoProcessor, - ) - processor = AudioPaddingAutoProcessor() result = await processor.pad_track( track_url=source_url, @@ -179,11 +173,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe raise ValueError("Missing padded_key from pad_track") # Presign URL on demand (avoids stale URLs on workflow replay) - from reflector.storage import ( # noqa: PLC0415 - get_source_storage, - get_transcripts_storage, - ) - # If bucket_name is set, file is still in the platform's source bucket (no padding applied). # If bucket_name is None, padded file was written to our transcript storage. if bucket_name: @@ -198,10 +187,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe bucket=bucket_name, ) - from reflector.pipelines.transcription_helpers import ( # noqa: PLC0415 - transcribe_file_with_processor, - ) - transcript = await transcribe_file_with_processor(audio_url, input.language) # Tag all words with speaker index diff --git a/server/reflector/pipelines/main_live_pipeline.py b/server/reflector/pipelines/main_live_pipeline.py index b25910fe..76c9f01e 100644 --- a/server/reflector/pipelines/main_live_pipeline.py +++ b/server/reflector/pipelines/main_live_pipeline.py @@ -38,6 +38,7 @@ from reflector.db.transcripts import ( TranscriptWaveform, transcripts_controller, ) +from reflector.hatchet.client import HatchetClientManager from reflector.logger import logger from reflector.pipelines.runner import PipelineMessage, PipelineRunner from reflector.processors import ( @@ -814,8 +815,6 @@ async def pipeline_post(*, transcript_id: str, room_id: str | None = None): """ Run the post pipeline via Hatchet. """ - from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 - await HatchetClientManager.start_workflow( "LivePostProcessingPipeline", { diff --git a/server/reflector/processors/audio_padding_pyav.py b/server/reflector/processors/audio_padding_pyav.py index f97255a7..3ac0fb65 100644 --- a/server/reflector/processors/audio_padding_pyav.py +++ b/server/reflector/processors/audio_padding_pyav.py @@ -10,6 +10,7 @@ import os import tempfile import av +import requests from reflector.logger import logger from reflector.processors.audio_padding import AudioPaddingProcessor, PaddingResponse @@ -65,8 +66,6 @@ class AudioPaddingPyavProcessor(AudioPaddingProcessor): track_index: int, ) -> PaddingResponse: """Blocking padding work: download, pad with PyAV, upload.""" - import requests - log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds) temp_dir = tempfile.mkdtemp() input_path = None diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py index b311748e..1dac96c3 100644 --- a/server/reflector/tools/process.py +++ b/server/reflector/tools/process.py @@ -333,7 +333,9 @@ if __name__ == "__main__": if not s3_urls: parser.error("At least one S3 URL required for multitrack processing") - from reflector.tools.cli_multitrack import process_multitrack_cli + from reflector.tools.cli_multitrack import ( + process_multitrack_cli, # circular import + ) asyncio.run( process_multitrack_cli( diff --git a/server/reflector/tools/runpipeline.py b/server/reflector/tools/runpipeline.py index d723f7b4..b940ccf2 100644 --- a/server/reflector/tools/runpipeline.py +++ b/server/reflector/tools/runpipeline.py @@ -5,6 +5,7 @@ This tools help to either create a pipeline from command line, or read a yaml description of a pipeline and run it. """ +import importlib import json from reflector.logger import logger @@ -37,8 +38,6 @@ def get_jsonl(filename, filter_processor_name=None): def get_processor(name): - import importlib - module_name = f"reflector.processors.{name}" class_name = snake_to_camel(name) + "Processor" module = importlib.import_module(module_name) diff --git a/server/reflector/views/daily.py b/server/reflector/views/daily.py index 43faefcc..ec54bcd9 100644 --- a/server/reflector/views/daily.py +++ b/server/reflector/views/daily.py @@ -15,6 +15,7 @@ from reflector.dailyco_api import ( from reflector.db.meetings import meetings_controller from reflector.logger import logger as _logger from reflector.settings import settings +from reflector.storage import get_source_storage from reflector.video_platforms.factory import create_platform_client from reflector.worker.process import ( poll_daily_room_presence_task, @@ -226,8 +227,6 @@ async def _handle_recording_ready(event: RecordingReadyEvent): if video_track_keys: meeting = await meetings_controller.get_by_room_name(room_name) if meeting is not None and not meeting.store_video: - from reflector.storage import get_source_storage - storage = get_source_storage("daily") for video_key in video_track_keys: try: diff --git a/server/reflector/views/livekit.py b/server/reflector/views/livekit.py index 2f031229..f2fc66c9 100644 --- a/server/reflector/views/livekit.py +++ b/server/reflector/views/livekit.py @@ -17,6 +17,7 @@ from reflector.db.meetings import meetings_controller from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook from reflector.logger import logger as _logger from reflector.settings import settings +from reflector.storage import get_source_storage router = APIRouter() @@ -189,8 +190,6 @@ async def _handle_egress_ended(event): filename = file_result.filename if filename and filename.endswith(".webm"): try: - from reflector.storage import get_source_storage # noqa: PLC0415 - storage = get_source_storage("livekit") await storage.delete_file(filename) logger.info( diff --git a/server/reflector/views/rooms.py b/server/reflector/views/rooms.py index 90969aa7..9e36335f 100644 --- a/server/reflector/views/rooms.py +++ b/server/reflector/views/rooms.py @@ -1,4 +1,6 @@ import logging +import re +import uuid from datetime import datetime, timedelta, timezone from enum import Enum from typing import Annotated, Any, Literal, Optional @@ -14,7 +16,7 @@ from reflector.db import get_database from reflector.db.calendar_events import calendar_events_controller from reflector.db.meetings import meetings_controller from reflector.db.rooms import rooms_controller -from reflector.redis_cache import RedisAsyncLock +from reflector.redis_cache import RedisAsyncLock, get_async_redis_client from reflector.schemas.platform import Platform from reflector.services.ics_sync import ics_sync_service from reflector.utils.url import add_query_param @@ -606,9 +608,6 @@ async def rooms_join_meeting( meeting.room_url = add_query_param(meeting.room_url, "t", token) elif meeting.platform == "livekit": - import re - import uuid - client = create_platform_client(meeting.platform) # Identity must be unique per participant to avoid S3 key collisions. # Format: {readable_name}-{short_uuid} ensures uniqueness even for same names. @@ -631,8 +630,6 @@ async def rooms_join_meeting( # Store identity → Reflector user_id mapping for the pipeline # (so TranscriptParticipant.user_id can be set correctly) if user_id: - from reflector.redis_cache import get_async_redis_client # noqa: PLC0415 - redis_client = await get_async_redis_client() mapping_key = f"livekit:participant_map:{meeting.room_name}" await redis_client.hset(mapping_key, participant_identity, user_id) diff --git a/server/reflector/views/rtc_offer.py b/server/reflector/views/rtc_offer.py index 935ac544..d3701491 100644 --- a/server/reflector/views/rtc_offer.py +++ b/server/reflector/views/rtc_offer.py @@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown from reflector.logger import logger from reflector.pipelines.runner import PipelineRunner from reflector.settings import settings +from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host sessions = [] router = APIRouter() @@ -128,8 +129,6 @@ async def rtc_offer_base( # Rewrite ICE candidate IPs when running behind Docker bridge networking if settings.WEBRTC_HOST: - from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host - host_ip = resolve_webrtc_host(settings.WEBRTC_HOST) sdp = rewrite_sdp_host(sdp, host_ip) diff --git a/server/reflector/views/transcripts_webrtc.py b/server/reflector/views/transcripts_webrtc.py index 2c222272..c5f02801 100644 --- a/server/reflector/views/transcripts_webrtc.py +++ b/server/reflector/views/transcripts_webrtc.py @@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request import reflector.auth as auth from reflector.db.transcripts import transcripts_controller +from reflector.pipelines.main_live_pipeline import PipelineMainLive from .rtc_offer import RtcOffer, rtc_offer_base @@ -28,8 +29,6 @@ async def transcript_record_webrtc( raise HTTPException(status_code=400, detail="Transcript is locked") # create a pipeline runner - from reflector.pipelines.main_live_pipeline import PipelineMainLive # noqa: PLC0415 - pipeline_runner = PipelineMainLive(transcript_id=transcript_id) # FIXME do not allow multiple recording at the same time diff --git a/server/reflector/webrtc_ports.py b/server/reflector/webrtc_ports.py index 5863d1f4..21cec5f3 100644 --- a/server/reflector/webrtc_ports.py +++ b/server/reflector/webrtc_ports.py @@ -11,6 +11,8 @@ This allows running the server in Docker with bridge networking import asyncio import socket +import aioice.ice + from reflector.logger import logger @@ -36,9 +38,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None: Works by temporarily wrapping loop.create_datagram_endpoint() during aioice's get_component_candidates() to intercept bind(addr, 0) calls. """ - import aioice.ice as _ice - - _original = _ice.Connection.get_component_candidates + _original = aioice.ice.Connection.get_component_candidates _state = {"next_port": min_port} async def _patched_get_component_candidates(self, component, addresses, timeout=5): @@ -78,7 +78,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None: finally: loop.create_datagram_endpoint = _orig_create - _ice.Connection.get_component_candidates = _patched_get_component_candidates + aioice.ice.Connection.get_component_candidates = _patched_get_component_candidates logger.info( "aioice patched for WebRTC port range", min_port=min_port, @@ -102,8 +102,6 @@ def rewrite_sdp_host(sdp: str, target_ip: str) -> str: Replace container-internal IPs in SDP with target_ip so that ICE candidates advertise a routable address. """ - import aioice.ice - container_ips = aioice.ice.get_host_addresses(use_ipv4=True, use_ipv6=False) for ip in container_ips: if ip != "127.0.0.1" and ip != target_ip: diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 130b32d8..975e9575 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -1,3 +1,4 @@ +import asyncio import json import os import re @@ -26,16 +27,26 @@ from reflector.db.transcripts import ( transcripts_controller, ) from reflector.hatchet.client import HatchetClientManager +from reflector.pipelines.topic_processing import EmptyPipeline +from reflector.processors.audio_file_writer import AudioFileWriterProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.redis_cache import RedisAsyncLock from reflector.settings import settings -from reflector.storage import get_transcripts_storage +from reflector.storage import get_source_storage, get_transcripts_storage from reflector.utils.daily import ( DailyRoomName, extract_base_room_name, filter_cam_audio_tracks, recording_lock_key, ) +from reflector.utils.livekit import ( + extract_livekit_base_room_name, + filter_audio_tracks, + parse_livekit_track_filepath, +) +from reflector.utils.livekit import ( + recording_lock_key as livekit_recording_lock_key, +) from reflector.utils.string import NonEmptyString from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.whereby_utils import ( @@ -932,11 +943,6 @@ async def convert_audio_and_waveform(transcript) -> None: transcript_id=transcript.id, ) - from reflector.pipelines.topic_processing import EmptyPipeline # noqa: PLC0415 - from reflector.processors.audio_file_writer import ( - AudioFileWriterProcessor, # noqa: PLC0415 - ) - upload_path = transcript.data_path / "upload.webm" mp3_path = transcript.audio_mp3_filename @@ -1215,17 +1221,13 @@ async def process_livekit_multitrack( Tracks are discovered via S3 listing (source of truth), not webhooks. Called from room_finished webhook (fast-path) or beat task (fallback). """ - from reflector.utils.livekit import ( # noqa: PLC0415 - recording_lock_key, - ) - logger.info( "Processing LiveKit multitrack recording", room_name=room_name, meeting_id=meeting_id, ) - lock_key = recording_lock_key(room_name) + lock_key = livekit_recording_lock_key(room_name) async with RedisAsyncLock( key=lock_key, timeout=600, @@ -1252,19 +1254,10 @@ async def _process_livekit_multitrack_inner( # 1. Discover tracks by listing S3 prefix. # Wait briefly for egress files to finish flushing to S3 — the room_finished # webhook fires after empty_timeout, but egress finalization may still be in progress. - import asyncio as _asyncio # noqa: PLC0415 - - from reflector.storage import get_source_storage # noqa: PLC0415 - from reflector.utils.livekit import ( # noqa: PLC0415 - extract_livekit_base_room_name, - filter_audio_tracks, - parse_livekit_track_filepath, - ) - EGRESS_FLUSH_DELAY = 10 # seconds — egress typically flushes within a few seconds EGRESS_RETRY_DELAY = 30 # seconds — retry if first listing finds nothing - await _asyncio.sleep(EGRESS_FLUSH_DELAY) + await asyncio.sleep(EGRESS_FLUSH_DELAY) storage = get_source_storage("livekit") s3_prefix = f"livekit/{room_name}/" @@ -1280,7 +1273,7 @@ async def _process_livekit_multitrack_inner( room_name=room_name, retry_delay=EGRESS_RETRY_DELAY, ) - await _asyncio.sleep(EGRESS_RETRY_DELAY) + await asyncio.sleep(EGRESS_RETRY_DELAY) all_keys = await storage.list_objects(prefix=s3_prefix) audio_keys = filter_audio_tracks(all_keys) if all_keys else [] @@ -1299,7 +1292,7 @@ async def _process_livekit_multitrack_inner( expected=expected_audio, found=len(audio_keys), ) - await _asyncio.sleep(EGRESS_RETRY_DELAY) + await asyncio.sleep(EGRESS_RETRY_DELAY) all_keys = await storage.list_objects(prefix=s3_prefix) audio_keys = filter_audio_tracks(all_keys) if all_keys else []