fix: inline imports (#955)

commit 739cd51375 (parent ee8db36f2c), committed via GitHub
@@ -201,4 +201,4 @@ If you need to do any worker/pipeline related work, search for "Pipeline" classe
 ## Code Style
 
 - Always put imports at the top of the file. Let ruff/pre-commit handle sorting and formatting of imports.
-- Exception: In Hatchet pipeline task functions, DB controller imports (e.g., `transcripts_controller`, `meetings_controller`) stay as deferred/inline imports inside `fresh_db_connection()` blocks — this is intentional to avoid sharing DB connections across forked processes. Non-DB imports (utilities, services) should still go at the top of the file.
+- The **only** imports allowed to remain inline are from `reflector.db.*` modules (e.g., `reflector.db.transcripts`, `reflector.db.meetings`, `reflector.db.recordings`, `reflector.db.rooms`). These stay as deferred/inline imports inside `fresh_db_connection()` blocks in Hatchet pipeline task functions — this is intentional to avoid sharing DB connections across forked processes. All other imports (utilities, services, processors, storage, third-party libs) **must** go at the top of the file, even in Hatchet workflows.
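To make the rule concrete, here is a minimal sketch of a Hatchet task that follows it. `fresh_db_connection()` and `transcripts_controller` are names from the diff below; the task itself and the surrounding module are hypothetical:

```python
# Non-DB imports (settings, storage, processors, third-party libs) go at the
# top, even though this module defines Hatchet tasks that run in forked workers.
from reflector.settings import settings
from reflector.storage import get_transcripts_storage


async def summarize_transcript(transcript_id: str) -> None:
    # Hypothetical task body; fresh_db_connection() is the pipeline's own
    # context manager (its definition appears in the diff below).
    async with fresh_db_connection():
        # The one allowed exception: reflector.db.* stays inline so each
        # forked worker opens its own DB connection instead of inheriting one.
        from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415

        transcript = await transcripts_controller.get_by_id(transcript_id)
```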
@@ -18,10 +18,11 @@ import json
 import tempfile
 import time
 from contextlib import asynccontextmanager
-from datetime import timedelta
+from datetime import datetime, timedelta
 from pathlib import Path
 from typing import Any, Callable, Coroutine, Protocol, TypeVar
 
+import databases
 import httpx
 from hatchet_sdk import (
     ConcurrencyExpression,
@@ -83,6 +84,7 @@ from reflector.hatchet.workflows.topic_chunk_processing import (
     topic_chunk_workflow,
 )
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
+from reflector.llm import LLM
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
 from reflector.processors.audio_mixdown_auto import AudioMixdownAutoProcessor
@@ -95,7 +97,9 @@ from reflector.processors.summary.prompts import (
 from reflector.processors.summary.summary_builder import SummaryBuilder
 from reflector.processors.types import TitleSummary, Word
 from reflector.processors.types import Transcript as TranscriptType
+from reflector.redis_cache import get_async_redis_client
 from reflector.settings import settings
+from reflector.storage import get_source_storage, get_transcripts_storage
 from reflector.utils.audio_constants import (
     PRESIGNED_URL_EXPIRATION_SECONDS,
     WAVEFORM_SEGMENTS,
@@ -105,11 +109,16 @@ from reflector.utils.daily import (
     filter_cam_audio_tracks,
     parse_daily_recording_filename,
 )
+from reflector.utils.livekit import parse_livekit_track_filepath
 from reflector.utils.string import NonEmptyString, assert_non_none_and_non_empty
 from reflector.utils.transcript_constants import (
     compute_max_subjects,
     compute_topic_chunk_size,
 )
+from reflector.utils.webhook import (
+    fetch_transcript_webhook_payload,
+    send_webhook_request,
+)
 from reflector.zulip import post_transcript_notification
 
 
@@ -138,8 +147,6 @@ async def fresh_db_connection():
     The real fix would be making the db module fork-aware instead of bypassing it.
     Current pattern is acceptable given Hatchet's process model.
     """
-    import databases  # noqa: PLC0415
-
     from reflector.db import _database_context  # noqa: PLC0415
 
     _database_context.set(None)
@@ -176,8 +183,6 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
 
 def _spawn_storage():
     """Create fresh storage instance for writing to our transcript bucket."""
-    from reflector.storage import get_transcripts_storage  # noqa: PLC0415
-
     return get_transcripts_storage()
 
 
@@ -391,10 +396,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
 
     if input.source_platform == "livekit":
         # LiveKit: participant identity is in the track dict or can be parsed from filepath
-        from reflector.utils.livekit import (
-            parse_livekit_track_filepath,  # noqa: PLC0415
-        )
-
         # Look up identity → Reflector user_id mapping from Redis
         # (stored at join time in rooms.py)
         identity_to_user_id: dict[str, str] = {}
@@ -402,9 +403,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
         from reflector.db.meetings import (
             meetings_controller as mc,  # noqa: PLC0415
         )
-        from reflector.redis_cache import (
-            get_async_redis_client,  # noqa: PLC0415
-        )
 
         meeting = (
             await mc.get_by_id(transcript.meeting_id)
@@ -546,12 +544,6 @@ async def process_tracks(input: PipelineInput, ctx: Context) -> ProcessTracksRes
     # OGG files don't have embedded start_time metadata, so we pre-calculate.
     track_padding: dict[int, float] = {}
     if input.source_platform == "livekit":
-        from datetime import datetime  # noqa: PLC0415
-
-        from reflector.utils.livekit import (
-            parse_livekit_track_filepath,  # noqa: PLC0415
-        )
-
         timestamps = []
         for i, track in enumerate(input.tracks):
             ts_str = track.get("timestamp")
@@ -1073,10 +1065,9 @@ async def extract_subjects(input: PipelineInput, ctx: Context) -> SubjectsResult
         participant_name_to_id={},
     )
 
-    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
-    # sharing DB connections and LLM HTTP pools across forks
+    # Deferred DB import: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections across forks
    from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-    from reflector.llm import LLM  # noqa: PLC0415
 
     async with fresh_db_connection():
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
@@ -1206,14 +1197,13 @@ async def generate_recap(input: PipelineInput, ctx: Context) -> RecapResult:
     subjects_result = ctx.task_output(extract_subjects)
     process_result = ctx.task_output(process_subjects)
 
-    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
-    # sharing DB connections and LLM HTTP pools across forks
+    # Deferred DB import: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections across forks
     from reflector.db.transcripts import (  # noqa: PLC0415
         TranscriptFinalLongSummary,
         TranscriptFinalShortSummary,
         transcripts_controller,
     )
-    from reflector.llm import LLM  # noqa: PLC0415
 
     subject_summaries = process_result.subject_summaries
 
@@ -1302,13 +1292,12 @@ async def identify_action_items(
         ctx.log("identify_action_items: no transcript text, returning empty")
         return ActionItemsResult(action_items=ActionItemsResponse())
 
-    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
-    # sharing DB connections and LLM HTTP pools across forks
+    # Deferred DB import: Hatchet workers fork processes, fresh imports avoid
+    # sharing DB connections across forks
     from reflector.db.transcripts import (  # noqa: PLC0415
         TranscriptActionItems,
         transcripts_controller,
     )
-    from reflector.llm import LLM  # noqa: PLC0415
 
     # TODO: refactor SummaryBuilder methods into standalone functions
     llm = LLM(settings=settings)
@@ -1445,10 +1434,6 @@ async def cleanup_consent(input: PipelineInput, ctx: Context) -> ConsentResult:
         )
         from reflector.db.recordings import recordings_controller  # noqa: PLC0415
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-        from reflector.storage import (  # noqa: PLC0415
-            get_source_storage,
-            get_transcripts_storage,
-        )
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if not transcript:
@@ -1597,10 +1582,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
 
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.utils.webhook import (  # noqa: PLC0415
-            fetch_transcript_webhook_payload,
-            send_webhook_request,
-        )
 
         room = await rooms_controller.get_by_id(input.room_id)
         if not room or not room.webhook_url:
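For readers following the hunks above: `fresh_db_connection()` is the reason `reflector.db.*` imports are the one sanctioned inline case. A rough reconstruction from the fragments visible in this diff (`_database_context.set(None)`, the now top-level `import databases`) — the connection-URL setting name and the connect/disconnect details are assumptions, not the repo's verbatim code:

```python
import databases
from contextlib import asynccontextmanager

from reflector.db import _database_context  # context var holding the shared DB handle
from reflector.settings import settings


@asynccontextmanager
async def fresh_db_connection():
    # Forget any connection object inherited from the parent process...
    _database_context.set(None)
    # ...and open a private one for this forked Hatchet worker.
    db = databases.Database(settings.DATABASE_URL)  # setting name assumed
    await db.connect()
    _database_context.set(db)
    try:
        yield
    finally:
        await db.disconnect()
        _database_context.set(None)
```

As the docstring in the hunk above concedes, the real fix would be a fork-aware db module; this pattern just bypasses the shared context for the lifetime of the task.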
@@ -15,6 +15,8 @@ import json
 from datetime import timedelta
 from pathlib import Path
 
+import av
+import httpx
 from hatchet_sdk import Context
 from pydantic import BaseModel
 
@@ -47,9 +49,30 @@ from reflector.hatchet.workflows.models import (
 )
 from reflector.logger import logger
 from reflector.pipelines import topic_processing
+from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
+from reflector.processors import AudioFileWriterProcessor
+from reflector.processors.file_diarization import FileDiarizationInput
+from reflector.processors.file_diarization_auto import FileDiarizationAutoProcessor
+from reflector.processors.transcript_diarization_assembler import (
+    TranscriptDiarizationAssemblerInput,
+    TranscriptDiarizationAssemblerProcessor,
+)
+from reflector.processors.types import (
+    DiarizationSegment,
+    Word,
+)
+from reflector.processors.types import (
+    Transcript as TranscriptType,
+)
 from reflector.settings import settings
+from reflector.storage import get_source_storage, get_transcripts_storage
 from reflector.utils.audio_constants import WAVEFORM_SEGMENTS
 from reflector.utils.audio_waveform import get_audio_waveform
+from reflector.utils.webhook import (
+    fetch_transcript_webhook_payload,
+    send_webhook_request,
+)
+from reflector.zulip import post_transcript_notification
 
 
 class FilePipelineInput(BaseModel):
@@ -135,10 +158,6 @@ async def extract_audio(input: FilePipelineInput, ctx: Context) -> ExtractAudioR
     ctx.log(f"extract_audio: processing {audio_file}")
 
     # Extract audio and write as MP3
-    import av  # noqa: PLC0415
-
-    from reflector.processors import AudioFileWriterProcessor  # noqa: PLC0415
-
     duration_ms_container = [0.0]
 
     async def capture_duration(d):
@@ -189,8 +208,6 @@ async def upload_audio(input: FilePipelineInput, ctx: Context) -> UploadAudioRes
     extract_result = ctx.task_output(extract_audio)
     audio_path = extract_result.audio_path
 
-    from reflector.storage import get_transcripts_storage  # noqa: PLC0415
-
     storage = get_transcripts_storage()
     if not storage:
         raise ValueError(
@@ -232,10 +249,6 @@ async def transcribe(input: FilePipelineInput, ctx: Context) -> TranscribeResult
         raise ValueError(f"Transcript {input.transcript_id} not found")
     source_language = transcript.source_language
 
-    from reflector.pipelines.transcription_helpers import (  # noqa: PLC0415
-        transcribe_file_with_processor,
-    )
-
     result = await transcribe_file_with_processor(audio_url, source_language)
 
     ctx.log(f"transcribe complete: {len(result.words)} words")
@@ -264,13 +277,6 @@ async def diarize(input: FilePipelineInput, ctx: Context) -> DiarizeResult:
     upload_result = ctx.task_output(upload_audio)
     audio_url = upload_result.audio_url
 
-    from reflector.processors.file_diarization import (  # noqa: PLC0415
-        FileDiarizationInput,
-    )
-    from reflector.processors.file_diarization_auto import (  # noqa: PLC0415
-        FileDiarizationAutoProcessor,
-    )
-
     processor = FileDiarizationAutoProcessor()
     input_data = FileDiarizationInput(audio_url=audio_url)
 
@@ -353,18 +359,6 @@ async def assemble_transcript(
     transcribe_result = ctx.task_output(transcribe)
     diarize_result = ctx.task_output(diarize)
 
-    from reflector.processors.transcript_diarization_assembler import (  # noqa: PLC0415
-        TranscriptDiarizationAssemblerInput,
-        TranscriptDiarizationAssemblerProcessor,
-    )
-    from reflector.processors.types import (  # noqa: PLC0415
-        DiarizationSegment,
-        Word,
-    )
-    from reflector.processors.types import (  # noqa: PLC0415
-        Transcript as TranscriptType,
-    )
-
     words = [Word(**w) for w in transcribe_result.words]
     transcript_data = TranscriptType(
         words=words, translation=transcribe_result.translation
@@ -437,17 +431,6 @@ async def detect_topics(input: FilePipelineInput, ctx: Context) -> TopicsResult:
         TranscriptTopic,
         transcripts_controller,
     )
-    from reflector.processors.transcript_diarization_assembler import (  # noqa: PLC0415
-        TranscriptDiarizationAssemblerInput,
-        TranscriptDiarizationAssemblerProcessor,
-    )
-    from reflector.processors.types import (  # noqa: PLC0415
-        DiarizationSegment,
-        Word,
-    )
-    from reflector.processors.types import (  # noqa: PLC0415
-        Transcript as TranscriptType,
-    )
 
     words = [Word(**w) for w in transcribe_result.words]
     transcript_data = TranscriptType(
@@ -688,10 +671,6 @@ async def cleanup_consent(input: FilePipelineInput, ctx: Context) -> ConsentResu
         )
         from reflector.db.recordings import recordings_controller  # noqa: PLC0415
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-        from reflector.storage import (  # noqa: PLC0415
-            get_source_storage,
-            get_transcripts_storage,
-        )
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if not transcript:
@@ -807,7 +786,6 @@ async def post_zulip(input: FilePipelineInput, ctx: Context) -> ZulipResult:
 
     async with fresh_db_connection():
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-        from reflector.zulip import post_transcript_notification  # noqa: PLC0415
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
@@ -837,10 +815,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
 
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.utils.webhook import (  # noqa: PLC0415
-            fetch_transcript_webhook_payload,
-            send_webhook_request,
-        )
 
         room = await rooms_controller.get_by_id(input.room_id)
         if not room or not room.webhook_url:
@@ -856,8 +830,6 @@ async def send_webhook(input: FilePipelineInput, ctx: Context) -> WebhookResult:
         ctx.log(f"send_webhook skipped (could not build payload): {payload}")
         return WebhookResult(webhook_sent=False, skipped=True)
 
-    import httpx  # noqa: PLC0415
-
     try:
         response = await send_webhook_request(
             url=room.webhook_url,
@@ -14,6 +14,8 @@ are not shared across forks, avoiding connection pooling issues.
 
 from datetime import timedelta
 
+import httpx
 from hatchet_sdk import Context
 from pydantic import BaseModel
 
@@ -40,7 +41,24 @@ from reflector.hatchet.workflows.models import (
     ZulipResult,
 )
 from reflector.logger import logger
+from reflector.pipelines.main_live_pipeline import (
+    PipelineMainTitle,
+    PipelineMainWaveform,
+    pipeline_convert_to_mp3,
+    pipeline_diarization,
+    pipeline_remove_upload,
+    pipeline_summaries,
+    pipeline_upload_mp3,
+)
+from reflector.pipelines.main_live_pipeline import (
+    cleanup_consent as _cleanup_consent,
+)
 from reflector.settings import settings
+from reflector.utils.webhook import (
+    fetch_transcript_webhook_payload,
+    send_webhook_request,
+)
+from reflector.zulip import post_transcript_notification
 
 
 class LivePostPipelineInput(BaseModel):
@@ -91,9 +109,6 @@ async def waveform(input: LivePostPipelineInput, ctx: Context) -> WaveformResult
 
     async with fresh_db_connection():
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            PipelineMainWaveform,
-        )
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if not transcript:
@@ -118,10 +133,6 @@ async def generate_title(input: LivePostPipelineInput, ctx: Context) -> TitleRes
     ctx.log(f"generate_title: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            PipelineMainTitle,
-        )
-
         runner = PipelineMainTitle(transcript_id=input.transcript_id)
         await runner.run()
 
@@ -142,10 +153,6 @@ async def convert_mp3(input: LivePostPipelineInput, ctx: Context) -> ConvertMp3R
     ctx.log(f"convert_mp3: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            pipeline_convert_to_mp3,
-        )
-
         await pipeline_convert_to_mp3(transcript_id=input.transcript_id)
 
     ctx.log("convert_mp3 complete")
@@ -165,10 +172,6 @@ async def upload_mp3(input: LivePostPipelineInput, ctx: Context) -> UploadMp3Res
     ctx.log(f"upload_mp3: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            pipeline_upload_mp3,
-        )
-
         await pipeline_upload_mp3(transcript_id=input.transcript_id)
 
     ctx.log("upload_mp3 complete")
@@ -190,10 +193,6 @@ async def remove_upload(
     ctx.log(f"remove_upload: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            pipeline_remove_upload,
-        )
-
         await pipeline_remove_upload(transcript_id=input.transcript_id)
 
     ctx.log("remove_upload complete")
@@ -213,10 +212,6 @@ async def diarize(input: LivePostPipelineInput, ctx: Context) -> DiarizeResult:
     ctx.log(f"diarize: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            pipeline_diarization,
-        )
-
         await pipeline_diarization(transcript_id=input.transcript_id)
 
     ctx.log("diarize complete")
@@ -236,10 +231,6 @@ async def cleanup_consent(input: LivePostPipelineInput, ctx: Context) -> Consent
     ctx.log(f"cleanup_consent: transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            cleanup_consent as _cleanup_consent,
-        )
-
         await _cleanup_consent(transcript_id=input.transcript_id)
 
     ctx.log("cleanup_consent complete")
@@ -261,10 +252,6 @@ async def final_summaries(
     ctx.log(f"final_summaries: starting for transcript_id={input.transcript_id}")
 
     async with fresh_db_connection():
-        from reflector.pipelines.main_live_pipeline import (  # noqa: PLC0415
-            pipeline_summaries,
-        )
-
         await pipeline_summaries(transcript_id=input.transcript_id)
 
     ctx.log("final_summaries complete")
@@ -289,7 +276,6 @@ async def post_zulip(input: LivePostPipelineInput, ctx: Context) -> ZulipResult:
 
     async with fresh_db_connection():
         from reflector.db.transcripts import transcripts_controller  # noqa: PLC0415
-        from reflector.zulip import post_transcript_notification  # noqa: PLC0415
 
         transcript = await transcripts_controller.get_by_id(input.transcript_id)
         if transcript:
@@ -319,10 +305,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
 
     async with fresh_db_connection():
         from reflector.db.rooms import rooms_controller  # noqa: PLC0415
-        from reflector.utils.webhook import (  # noqa: PLC0415
-            fetch_transcript_webhook_payload,
-            send_webhook_request,
-        )
 
         room = await rooms_controller.get_by_id(input.room_id)
         if not room or not room.webhook_url:
@@ -338,8 +320,6 @@ async def send_webhook(input: LivePostPipelineInput, ctx: Context) -> WebhookRes
         ctx.log(f"send_webhook skipped (could not build payload): {payload}")
         return WebhookResult(webhook_sent=False, skipped=True)
 
-    import httpx  # noqa: PLC0415
-
     try:
         response = await send_webhook_request(
             url=room.webhook_url,
@@ -13,6 +13,8 @@ from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.constants import TIMEOUT_AUDIO
 from reflector.hatchet.workflows.models import PadTrackResult
 from reflector.logger import logger
+from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
+from reflector.storage import get_source_storage, get_transcripts_storage
 from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
 from reflector.utils.audio_padding import extract_stream_start_time_from_container
 
@@ -51,11 +53,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
     )
 
     try:
-        from reflector.storage import (  # noqa: PLC0415
-            get_source_storage,
-            get_transcripts_storage,
-        )
-
         # Source reads: use platform-specific credentials
         source_storage = get_source_storage(input.source_platform)
         source_url = await source_storage.get_file_url(
@@ -104,10 +101,6 @@ async def pad_track(input: PaddingInput, ctx: Context) -> PadTrackResult:
             expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
         )
 
-        from reflector.processors.audio_padding_auto import (  # noqa: PLC0415
-            AudioPaddingAutoProcessor,
-        )
-
         processor = AudioPaddingAutoProcessor()
         result = await processor.pad_track(
             track_url=source_url,
@@ -15,12 +15,14 @@ from pydantic import BaseModel
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_HEAVY
 from reflector.hatchet.workflows.models import SubjectSummaryResult
+from reflector.llm import LLM
 from reflector.logger import logger
 from reflector.processors.summary.prompts import (
     DETAILED_SUBJECT_PROMPT_TEMPLATE,
     PARAGRAPH_SUMMARY_PROMPT,
     build_participant_instructions,
 )
+from reflector.settings import settings
 
 
 class SubjectInput(BaseModel):
@@ -60,11 +62,6 @@ async def generate_detailed_summary(
         subject_index=input.subject_index,
     )
 
-    # Deferred imports: Hatchet workers fork processes, fresh imports ensure
-    # LLM HTTP connection pools aren't shared across forks
-    from reflector.llm import LLM  # noqa: PLC0415
-    from reflector.settings import settings  # noqa: PLC0415
-
     llm = LLM(settings=settings)
 
     participant_instructions = build_participant_instructions(input.participant_names)
@@ -18,9 +18,13 @@ from pydantic import BaseModel
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.constants import LLM_RATE_LIMIT_KEY, TIMEOUT_MEDIUM
 from reflector.hatchet.workflows.models import TopicChunkResult
+from reflector.llm import LLM
 from reflector.logger import logger
 from reflector.processors.prompts import TOPIC_PROMPT
+from reflector.processors.transcript_topic_detector import TopicResponse
 from reflector.processors.types import Word
+from reflector.settings import settings
+from reflector.utils.text import clean_title
 
 
 class TopicChunkInput(BaseModel):
@@ -64,15 +68,6 @@ async def detect_chunk_topic(input: TopicChunkInput, ctx: Context) -> TopicChunk
         text_length=len(input.chunk_text),
     )
 
-    # Deferred imports: Hatchet workers fork processes, fresh imports avoid
-    # sharing LLM HTTP connection pools across forks
-    from reflector.llm import LLM  # noqa: PLC0415
-    from reflector.processors.transcript_topic_detector import (  # noqa: PLC0415
-        TopicResponse,
-    )
-    from reflector.settings import settings  # noqa: PLC0415
-    from reflector.utils.text import clean_title  # noqa: PLC0415
-
     llm = LLM(settings=settings, temperature=0.9)
 
     prompt = TOPIC_PROMPT.format(text=input.chunk_text)
@@ -9,9 +9,9 @@ because Hatchet workflow DAGs are defined statically, but the number of tracks v
 at runtime. Child workflow spawning via `aio_run()` + `asyncio.gather()` is the
 standard pattern for dynamic fan-out. See `process_tracks` in daily_multitrack_pipeline.py.
 
-Note: This file uses deferred imports (inside tasks) intentionally.
+Note: DB imports (reflector.db.*) are kept inline (deferred) intentionally.
 Hatchet workers run in forked processes; fresh imports per task ensure
-storage/DB connections are not shared across forks.
+DB connections are not shared across forks.
 """
 
 from datetime import timedelta
@@ -24,6 +24,9 @@ from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.constants import TIMEOUT_AUDIO, TIMEOUT_HEAVY
 from reflector.hatchet.workflows.models import PadTrackResult, TranscribeTrackResult
 from reflector.logger import logger
+from reflector.pipelines.transcription_helpers import transcribe_file_with_processor
+from reflector.processors.audio_padding_auto import AudioPaddingAutoProcessor
+from reflector.storage import get_source_storage, get_transcripts_storage
 from reflector.utils.audio_constants import PRESIGNED_URL_EXPIRATION_SECONDS
 from reflector.utils.audio_padding import extract_stream_start_time_from_container
 
@@ -72,11 +75,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
     )
 
     try:
-        from reflector.storage import (  # noqa: PLC0415
-            get_source_storage,
-            get_transcripts_storage,
-        )
-
         # Source reads: use platform-specific credentials
         source_storage = get_source_storage(input.source_platform)
         source_url = await source_storage.get_file_url(
@@ -120,10 +118,6 @@ async def pad_track(input: TrackInput, ctx: Context) -> PadTrackResult:
             expires_in=PRESIGNED_URL_EXPIRATION_SECONDS,
         )
 
-        from reflector.processors.audio_padding_auto import (  # noqa: PLC0415
-            AudioPaddingAutoProcessor,
-        )
-
         processor = AudioPaddingAutoProcessor()
         result = await processor.pad_track(
             track_url=source_url,
@@ -179,11 +173,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
         raise ValueError("Missing padded_key from pad_track")
 
     # Presign URL on demand (avoids stale URLs on workflow replay)
-    from reflector.storage import (  # noqa: PLC0415
-        get_source_storage,
-        get_transcripts_storage,
-    )
-
     # If bucket_name is set, file is still in the platform's source bucket (no padding applied).
     # If bucket_name is None, padded file was written to our transcript storage.
     if bucket_name:
@@ -198,10 +187,6 @@ async def transcribe_track(input: TrackInput, ctx: Context) -> TranscribeTrackRe
             bucket=bucket_name,
         )
 
-    from reflector.pipelines.transcription_helpers import (  # noqa: PLC0415
-        transcribe_file_with_processor,
-    )
-
     transcript = await transcribe_file_with_processor(audio_url, input.language)
 
     # Tag all words with speaker index
@@ -38,6 +38,7 @@ from reflector.db.transcripts import (
     TranscriptWaveform,
     transcripts_controller,
 )
+from reflector.hatchet.client import HatchetClientManager
 from reflector.logger import logger
 from reflector.pipelines.runner import PipelineMessage, PipelineRunner
 from reflector.processors import (
@@ -814,8 +815,6 @@ async def pipeline_post(*, transcript_id: str, room_id: str | None = None):
     """
    Run the post pipeline via Hatchet.
     """
-    from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
-
     await HatchetClientManager.start_workflow(
         "LivePostProcessingPipeline",
         {
@@ -10,6 +10,7 @@ import os
 import tempfile
 
 import av
+import requests
 
 from reflector.logger import logger
 from reflector.processors.audio_padding import AudioPaddingProcessor, PaddingResponse
@@ -65,8 +66,6 @@ class AudioPaddingPyavProcessor(AudioPaddingProcessor):
         track_index: int,
     ) -> PaddingResponse:
         """Blocking padding work: download, pad with PyAV, upload."""
-        import requests
-
         log = logger.bind(track_index=track_index, padding_seconds=start_time_seconds)
         temp_dir = tempfile.mkdtemp()
         input_path = None
@@ -333,7 +333,9 @@ if __name__ == "__main__":
     if not s3_urls:
         parser.error("At least one S3 URL required for multitrack processing")
 
-    from reflector.tools.cli_multitrack import process_multitrack_cli
+    from reflector.tools.cli_multitrack import (
+        process_multitrack_cli,  # circular import
+    )
 
     asyncio.run(
         process_multitrack_cli(
@@ -5,6 +5,7 @@ This tools help to either create a pipeline from command line,
 or read a yaml description of a pipeline and run it.
 """
 
+import importlib
 import json
 
 from reflector.logger import logger
@@ -37,8 +38,6 @@ def get_jsonl(filename, filter_processor_name=None):
 
 
 def get_processor(name):
-    import importlib
-
     module_name = f"reflector.processors.{name}"
     class_name = snake_to_camel(name) + "Processor"
     module = importlib.import_module(module_name)
@@ -15,6 +15,7 @@ from reflector.dailyco_api import (
 from reflector.db.meetings import meetings_controller
 from reflector.logger import logger as _logger
 from reflector.settings import settings
+from reflector.storage import get_source_storage
 from reflector.video_platforms.factory import create_platform_client
 from reflector.worker.process import (
     poll_daily_room_presence_task,
@@ -226,8 +227,6 @@ async def _handle_recording_ready(event: RecordingReadyEvent):
     if video_track_keys:
         meeting = await meetings_controller.get_by_room_name(room_name)
         if meeting is not None and not meeting.store_video:
-            from reflector.storage import get_source_storage
-
             storage = get_source_storage("daily")
             for video_key in video_track_keys:
                 try:
@@ -17,6 +17,7 @@ from reflector.db.meetings import meetings_controller
 from reflector.livekit_api.webhooks import create_webhook_receiver, verify_webhook
 from reflector.logger import logger as _logger
 from reflector.settings import settings
+from reflector.storage import get_source_storage
 
 router = APIRouter()
 
@@ -189,8 +190,6 @@ async def _handle_egress_ended(event):
     filename = file_result.filename
     if filename and filename.endswith(".webm"):
         try:
-            from reflector.storage import get_source_storage  # noqa: PLC0415
-
             storage = get_source_storage("livekit")
             await storage.delete_file(filename)
             logger.info(
@@ -1,4 +1,6 @@
 import logging
+import re
+import uuid
 from datetime import datetime, timedelta, timezone
 from enum import Enum
 from typing import Annotated, Any, Literal, Optional
@@ -14,7 +16,7 @@ from reflector.db import get_database
 from reflector.db.calendar_events import calendar_events_controller
 from reflector.db.meetings import meetings_controller
 from reflector.db.rooms import rooms_controller
-from reflector.redis_cache import RedisAsyncLock
+from reflector.redis_cache import RedisAsyncLock, get_async_redis_client
 from reflector.schemas.platform import Platform
 from reflector.services.ics_sync import ics_sync_service
 from reflector.utils.url import add_query_param
@@ -606,9 +608,6 @@ async def rooms_join_meeting(
         meeting.room_url = add_query_param(meeting.room_url, "t", token)
 
     elif meeting.platform == "livekit":
-        import re
-        import uuid
-
         client = create_platform_client(meeting.platform)
         # Identity must be unique per participant to avoid S3 key collisions.
         # Format: {readable_name}-{short_uuid} ensures uniqueness even for same names.
@@ -631,8 +630,6 @@ async def rooms_join_meeting(
         # Store identity → Reflector user_id mapping for the pipeline
         # (so TranscriptParticipant.user_id can be set correctly)
         if user_id:
-            from reflector.redis_cache import get_async_redis_client  # noqa: PLC0415
-
             redis_client = await get_async_redis_client()
             mapping_key = f"livekit:participant_map:{meeting.room_name}"
             await redis_client.hset(mapping_key, participant_identity, user_id)
@@ -11,6 +11,7 @@ from reflector.events import subscribers_shutdown
 from reflector.logger import logger
 from reflector.pipelines.runner import PipelineRunner
 from reflector.settings import settings
+from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
 
 sessions = []
 router = APIRouter()
@@ -128,8 +129,6 @@ async def rtc_offer_base(
 
     # Rewrite ICE candidate IPs when running behind Docker bridge networking
     if settings.WEBRTC_HOST:
-        from reflector.webrtc_ports import resolve_webrtc_host, rewrite_sdp_host
-
         host_ip = resolve_webrtc_host(settings.WEBRTC_HOST)
         sdp = rewrite_sdp_host(sdp, host_ip)
 
@@ -4,6 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Request
 
 import reflector.auth as auth
 from reflector.db.transcripts import transcripts_controller
+from reflector.pipelines.main_live_pipeline import PipelineMainLive
 
 from .rtc_offer import RtcOffer, rtc_offer_base
 
@@ -28,8 +29,6 @@ async def transcript_record_webrtc(
         raise HTTPException(status_code=400, detail="Transcript is locked")
 
     # create a pipeline runner
-    from reflector.pipelines.main_live_pipeline import PipelineMainLive  # noqa: PLC0415
-
     pipeline_runner = PipelineMainLive(transcript_id=transcript_id)
 
     # FIXME do not allow multiple recording at the same time
@@ -11,6 +11,8 @@ This allows running the server in Docker with bridge networking
 import asyncio
 import socket
 
+import aioice.ice
+
 from reflector.logger import logger
 
 
@@ -36,9 +38,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
     Works by temporarily wrapping loop.create_datagram_endpoint() during
     aioice's get_component_candidates() to intercept bind(addr, 0) calls.
     """
-    import aioice.ice as _ice
-
-    _original = _ice.Connection.get_component_candidates
+    _original = aioice.ice.Connection.get_component_candidates
     _state = {"next_port": min_port}
 
     async def _patched_get_component_candidates(self, component, addresses, timeout=5):
@@ -78,7 +78,7 @@ def patch_aioice_port_range(min_port: int, max_port: int) -> None:
         finally:
             loop.create_datagram_endpoint = _orig_create
 
-    _ice.Connection.get_component_candidates = _patched_get_component_candidates
+    aioice.ice.Connection.get_component_candidates = _patched_get_component_candidates
     logger.info(
         "aioice patched for WebRTC port range",
         min_port=min_port,
@@ -102,8 +102,6 @@ def rewrite_sdp_host(sdp: str, target_ip: str) -> str:
     Replace container-internal IPs in SDP with target_ip so that
     ICE candidates advertise a routable address.
     """
-    import aioice.ice
-
     container_ips = aioice.ice.get_host_addresses(use_ipv4=True, use_ipv6=False)
     for ip in container_ips:
         if ip != "127.0.0.1" and ip != target_ip:
@@ -1,3 +1,4 @@
+import asyncio
 import json
 import os
 import re
@@ -26,16 +27,26 @@ from reflector.db.transcripts import (
     transcripts_controller,
 )
 from reflector.hatchet.client import HatchetClientManager
+from reflector.pipelines.topic_processing import EmptyPipeline
+from reflector.processors.audio_file_writer import AudioFileWriterProcessor
 from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
 from reflector.redis_cache import RedisAsyncLock
 from reflector.settings import settings
-from reflector.storage import get_transcripts_storage
+from reflector.storage import get_source_storage, get_transcripts_storage
 from reflector.utils.daily import (
     DailyRoomName,
     extract_base_room_name,
     filter_cam_audio_tracks,
     recording_lock_key,
 )
+from reflector.utils.livekit import (
+    extract_livekit_base_room_name,
+    filter_audio_tracks,
+    parse_livekit_track_filepath,
+)
+from reflector.utils.livekit import (
+    recording_lock_key as livekit_recording_lock_key,
+)
 from reflector.utils.string import NonEmptyString
 from reflector.video_platforms.factory import create_platform_client
 from reflector.video_platforms.whereby_utils import (
@@ -932,11 +943,6 @@ async def convert_audio_and_waveform(transcript) -> None:
         transcript_id=transcript.id,
     )
 
-    from reflector.pipelines.topic_processing import EmptyPipeline  # noqa: PLC0415
-    from reflector.processors.audio_file_writer import (
-        AudioFileWriterProcessor,  # noqa: PLC0415
-    )
-
     upload_path = transcript.data_path / "upload.webm"
     mp3_path = transcript.audio_mp3_filename
 
@@ -1215,17 +1221,13 @@ async def process_livekit_multitrack(
     Tracks are discovered via S3 listing (source of truth), not webhooks.
     Called from room_finished webhook (fast-path) or beat task (fallback).
     """
-    from reflector.utils.livekit import (  # noqa: PLC0415
-        recording_lock_key,
-    )
-
     logger.info(
         "Processing LiveKit multitrack recording",
         room_name=room_name,
         meeting_id=meeting_id,
     )
 
-    lock_key = recording_lock_key(room_name)
+    lock_key = livekit_recording_lock_key(room_name)
     async with RedisAsyncLock(
         key=lock_key,
         timeout=600,
@@ -1252,19 +1254,10 @@ async def _process_livekit_multitrack_inner(
     # 1. Discover tracks by listing S3 prefix.
     # Wait briefly for egress files to finish flushing to S3 — the room_finished
     # webhook fires after empty_timeout, but egress finalization may still be in progress.
-    import asyncio as _asyncio  # noqa: PLC0415
-
-    from reflector.storage import get_source_storage  # noqa: PLC0415
-    from reflector.utils.livekit import (  # noqa: PLC0415
-        extract_livekit_base_room_name,
-        filter_audio_tracks,
-        parse_livekit_track_filepath,
-    )
-
     EGRESS_FLUSH_DELAY = 10  # seconds — egress typically flushes within a few seconds
     EGRESS_RETRY_DELAY = 30  # seconds — retry if first listing finds nothing
 
-    await _asyncio.sleep(EGRESS_FLUSH_DELAY)
+    await asyncio.sleep(EGRESS_FLUSH_DELAY)
 
     storage = get_source_storage("livekit")
     s3_prefix = f"livekit/{room_name}/"
@@ -1280,7 +1273,7 @@ async def _process_livekit_multitrack_inner(
             room_name=room_name,
             retry_delay=EGRESS_RETRY_DELAY,
         )
-        await _asyncio.sleep(EGRESS_RETRY_DELAY)
+        await asyncio.sleep(EGRESS_RETRY_DELAY)
         all_keys = await storage.list_objects(prefix=s3_prefix)
         audio_keys = filter_audio_tracks(all_keys) if all_keys else []
 
@@ -1299,7 +1292,7 @@ async def _process_livekit_multitrack_inner(
             expected=expected_audio,
             found=len(audio_keys),
         )
-        await _asyncio.sleep(EGRESS_RETRY_DELAY)
+        await asyncio.sleep(EGRESS_RETRY_DELAY)
         all_keys = await storage.list_objects(prefix=s3_prefix)
         audio_keys = filter_audio_tracks(all_keys) if all_keys else []
 