diff --git a/server/reflector/hatchet/run_workers.py b/server/reflector/hatchet/run_workers.py index 96eb6bfa..6f15b476 100644 --- a/server/reflector/hatchet/run_workers.py +++ b/server/reflector/hatchet/run_workers.py @@ -30,9 +30,13 @@ def main() -> None: debug=settings.HATCHET_DEBUG, ) - # Import workflows to register them - from reflector.hatchet.client import HatchetClientManager - from reflector.hatchet.workflows import diarization_pipeline, track_workflow + # Import here (not top-level) - workflow imports trigger HatchetClientManager.get_client() + # which requires HATCHET_CLIENT_TOKEN; must validate settings first + from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 + from reflector.hatchet.workflows import ( # noqa: PLC0415 + diarization_pipeline, + track_workflow, + ) hatchet = HatchetClientManager.get_client() diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py index ed0a13b8..fb9534b3 100644 --- a/server/reflector/hatchet/workflows/diarization_pipeline.py +++ b/server/reflector/hatchet/workflows/diarization_pipeline.py @@ -10,13 +10,17 @@ import functools import tempfile from contextlib import asynccontextmanager from datetime import timedelta +from fractions import Fraction from pathlib import Path from typing import Callable import av +import httpx +from av.audio.resampler import AudioResampler from hatchet_sdk import Context from pydantic import BaseModel +from reflector.dailyco_api.client import DailyApiClient from reflector.hatchet.client import HatchetClientManager from reflector.hatchet.progress import emit_progress_async from reflector.hatchet.workflows.models import ( @@ -36,6 +40,23 @@ from reflector.hatchet.workflows.models import ( ) from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow from reflector.logger import logger +from reflector.pipelines import topic_processing +from reflector.processors import 
AudioFileWriterProcessor +from reflector.processors.types import ( + TitleSummary, + Word, +) +from reflector.processors.types import ( + Transcript as TranscriptType, +) +from reflector.settings import settings +from reflector.storage.storage_aws import AwsStorage +from reflector.utils.audio_waveform import get_audio_waveform +from reflector.utils.daily import ( + filter_cam_audio_tracks, + parse_daily_recording_filename, +) +from reflector.zulip import post_transcript_notification # Audio constants OPUS_STANDARD_SAMPLE_RATE = 48000 @@ -74,7 +95,6 @@ async def fresh_db_connection(): import databases from reflector.db import _database_context - from reflector.settings import settings _database_context.set(None) db = databases.Database(settings.DATABASE_URL) @@ -116,9 +136,6 @@ async def set_workflow_error_status(transcript_id: str) -> bool: def _get_storage(): """Create fresh storage instance.""" - from reflector.settings import settings - from reflector.storage.storage_aws import AwsStorage - return AwsStorage( aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME, aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION, @@ -198,9 +215,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult: transcript_id=input.transcript_id, ) - from reflector.dailyco_api.client import DailyApiClient - from reflector.settings import settings - if not input.recording_id: # No recording_id in reprocess path - return minimal data await emit_progress_async( @@ -257,13 +271,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe recording_data = _to_dict(ctx.task_output(get_recording)) mtg_session_id = recording_data.get("mtg_session_id") - from reflector.dailyco_api.client import DailyApiClient - from reflector.settings import settings - from reflector.utils.daily import ( - filter_cam_audio_tracks, - parse_daily_recording_filename, - ) - # Get transcript and reset events/topics/participants async with fresh_db_connection(): 
from reflector.db.transcripts import ( @@ -488,12 +495,6 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult: padded_urls.append(url) # Use PipelineMainMultitrack.mixdown_tracks which uses PyAV filter graph - from fractions import Fraction - - from av.audio.resampler import AudioResampler - - from reflector.processors import AudioFileWriterProcessor - valid_urls = [url for url in padded_urls if url] if not valid_urls: raise ValueError("No valid padded tracks to mixdown") @@ -688,10 +689,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul input.transcript_id, "generate_waveform", "in_progress", ctx.workflow_run_id ) - import httpx - from reflector.db.transcripts import TranscriptWaveform, transcripts_controller - from reflector.utils.audio_waveform import get_audio_waveform # Cleanup temporary padded S3 files (deferred until after mixdown) track_data = _to_dict(ctx.task_output(process_tracks)) @@ -779,12 +777,9 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult: target_language = track_data.get("target_language", "en") from reflector.db.transcripts import TranscriptTopic, transcripts_controller - from reflector.pipelines import topic_processing from reflector.processors.types import ( TitleSummaryWithId as TitleSummaryWithIdProcessorType, ) - from reflector.processors.types import Transcript as TranscriptType - from reflector.processors.types import Word # Convert word dicts to Word objects word_objects = [Word(**w) for w in words] @@ -850,8 +845,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult: TranscriptFinalTitle, transcripts_controller, ) - from reflector.pipelines import topic_processing - from reflector.processors.types import TitleSummary topic_objects = [TitleSummary(**t) for t in topics] @@ -913,8 +906,6 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult: TranscriptFinalShortSummary, 
transcripts_controller, ) - from reflector.pipelines import topic_processing - from reflector.processors.types import TitleSummary topic_objects = [TitleSummary(**t) for t in topics] @@ -1100,8 +1091,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult: input.transcript_id, "post_zulip", "in_progress", ctx.workflow_run_id ) - from reflector.settings import settings - if not settings.ZULIP_REALM: logger.info("[Hatchet] post_zulip skipped (Zulip not configured)") await emit_progress_async( @@ -1109,8 +1098,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult: ) return ZulipResult(zulip_message_id=None, skipped=True) - from reflector.zulip import post_transcript_notification - async with fresh_db_connection(): from reflector.db.transcripts import transcripts_controller @@ -1155,8 +1142,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult: transcript = await transcripts_controller.get_by_id(input.transcript_id) if room and room.webhook_url and transcript: - import httpx - webhook_payload = { "event": "transcript.completed", "transcript_id": input.transcript_id, diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 2890e069..0e9b620a 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -15,7 +15,8 @@ from hatchet_sdk.clients.rest.exceptions import ApiException from hatchet_sdk.clients.rest.models import V1TaskStatus from reflector.db.recordings import recordings_controller -from reflector.db.transcripts import Transcript +from reflector.db.rooms import rooms_controller +from reflector.db.transcripts import Transcript, transcripts_controller from reflector.hatchet.client import HatchetClientManager from reflector.logger import logger from reflector.pipelines.main_file_pipeline import task_pipeline_file_process @@ -180,9 +181,6 @@ async def dispatch_transcript_processing( 
Returns AsyncResult for Celery tasks, None for Hatchet workflows. """ - from reflector.db.rooms import rooms_controller - from reflector.db.transcripts import transcripts_controller - if isinstance(config, MultitrackProcessingConfig): # Check if room has use_hatchet=True (overrides env vars) room_forces_hatchet = False diff --git a/server/reflector/tools/process_transcript.py b/server/reflector/tools/process_transcript.py index 8d8b8d04..84876e49 100644 --- a/server/reflector/tools/process_transcript.py +++ b/server/reflector/tools/process_transcript.py @@ -17,7 +17,9 @@ from typing import Callable from celery.result import AsyncResult from hatchet_sdk.clients.rest.models import V1TaskStatus +from reflector.db import get_database from reflector.db.transcripts import Transcript, transcripts_controller +from reflector.hatchet.client import HatchetClientManager from reflector.services.transcript_process import ( FileProcessingConfig, MultitrackProcessingConfig, @@ -55,8 +57,6 @@ async def process_transcript( sync: If True, wait for task completion. If False, dispatch and exit. force: If True, cancel old workflow and start new (latest code). If False, replay failed workflow. 
""" - from reflector.db import get_database - database = get_database() await database.connect() @@ -96,8 +96,6 @@ async def process_transcript( if result is None: # Hatchet workflow dispatched if sync: - from reflector.hatchet.client import HatchetClientManager - # Re-fetch transcript to get workflow_run_id transcript = await transcripts_controller.get_by_id(transcript_id) if not transcript or not transcript.workflow_run_id: diff --git a/server/reflector/worker/process.py b/server/reflector/worker/process.py index 19ef9909..9a1ad9f6 100644 --- a/server/reflector/worker/process.py +++ b/server/reflector/worker/process.py @@ -24,6 +24,7 @@ from reflector.db.transcripts import ( SourceKind, transcripts_controller, ) +from reflector.hatchet.client import HatchetClientManager from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_live_pipeline import asynctask from reflector.pipelines.main_multitrack_pipeline import ( @@ -298,8 +299,6 @@ async def _process_multitrack_recording_inner( ) if use_hatchet: - from reflector.hatchet.client import HatchetClientManager # noqa: PLC0415 - workflow_id = await HatchetClientManager.start_workflow( workflow_name="DiarizationPipeline", input_data={
diff --git a/server/reflector/zulip.py b/server/reflector/zulip.py
index 02e146c0..99a98627 100644
--- a/server/reflector/zulip.py
+++ b/server/reflector/zulip.py
@@ -3,7 +3,9 @@ from urllib.parse import urlparse
 
 import httpx
 
-from reflector.db.transcripts import Transcript
+from reflector.db.rooms import rooms_controller
+from reflector.db.transcripts import Transcript, transcripts_controller
+from reflector.logger import logger
 from reflector.settings import settings
 
 
@@ -113,6 +115,55 @@ def get_zulip_message(transcript: Transcript, include_topics: bool):
     return message
 
 
+async def post_transcript_notification(transcript: Transcript) -> int | None:
+    """Post or update transcript notification in Zulip.
+
+    Uses transcript.room_id directly (Hatchet flow).
+    Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
+
+    Returns the id of the updated or newly sent Zulip message, or None when
+    the room is missing, has no Zulip stream or auto-post is disabled, or
+    the send response carries no "id".
+    """
+    if not transcript.room_id:
+        return None
+
+    room = await rooms_controller.get_by_id(transcript.room_id)
+    if not room or not room.zulip_stream or not room.zulip_auto_post:
+        return None
+
+    message = get_zulip_message(transcript=transcript, include_topics=True)
+
+    if transcript.zulip_message_id:
+        try:
+            await update_zulip_message(
+                transcript.zulip_message_id,
+                room.zulip_stream,
+                room.zulip_topic,
+                message,
+            )
+        except Exception:
+            # Best effort: fall through and send a fresh message, but leave a
+            # trace of the failed update instead of swallowing it silently.
+            logger.warning(
+                "Failed to update Zulip message, sending a new one",
+                exc_info=True,
+            )
+        else:
+            return transcript.zulip_message_id
+
+    response = await send_message_to_zulip(
+        room.zulip_stream, room.zulip_topic, message
+    )
+    message_id = response.get("id")
+    if message_id:
+        # Persist the new id so later runs update this message in place.
+        await transcripts_controller.update(
+            transcript, {"zulip_message_id": message_id}
+        )
+    return message_id
+
+
 def extract_domain(url: str) -> str:
     return urlparse(url).netloc
 