hatchet: restore zulip report

Igor Loskutov
2025-12-17 11:06:27 -05:00
parent 298abe8656
commit 7a29c742c5
6 changed files with 77 additions and 50 deletions

View File

@@ -30,9 +30,13 @@ def main() -> None:
         debug=settings.HATCHET_DEBUG,
     )

-    # Import workflows to register them
-    from reflector.hatchet.client import HatchetClientManager
-    from reflector.hatchet.workflows import diarization_pipeline, track_workflow
+    # Import here (not top-level) - workflow imports trigger HatchetClientManager.get_client()
+    # which requires HATCHET_CLIENT_TOKEN; must validate settings first
+    from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
+    from reflector.hatchet.workflows import (  # noqa: PLC0415
+        diarization_pipeline,
+        track_workflow,
+    )

     hatchet = HatchetClientManager.get_client()

View File

@@ -10,13 +10,17 @@ import functools
 import tempfile
 from contextlib import asynccontextmanager
 from datetime import timedelta
+from fractions import Fraction
 from pathlib import Path
 from typing import Callable

 import av
+import httpx
+from av.audio.resampler import AudioResampler
 from hatchet_sdk import Context
 from pydantic import BaseModel

+from reflector.dailyco_api.client import DailyApiClient
 from reflector.hatchet.client import HatchetClientManager
 from reflector.hatchet.progress import emit_progress_async
 from reflector.hatchet.workflows.models import (
@@ -36,6 +40,23 @@ from reflector.hatchet.workflows.models import (
 )
 from reflector.hatchet.workflows.track_processing import TrackInput, track_workflow
 from reflector.logger import logger
+from reflector.pipelines import topic_processing
+from reflector.processors import AudioFileWriterProcessor
+from reflector.processors.types import (
+    TitleSummary,
+    Word,
+)
+from reflector.processors.types import (
+    Transcript as TranscriptType,
+)
+from reflector.settings import settings
+from reflector.storage.storage_aws import AwsStorage
+from reflector.utils.audio_waveform import get_audio_waveform
+from reflector.utils.daily import (
+    filter_cam_audio_tracks,
+    parse_daily_recording_filename,
+)
+from reflector.zulip import post_transcript_notification

 # Audio constants
 OPUS_STANDARD_SAMPLE_RATE = 48000
@@ -74,7 +95,6 @@ async def fresh_db_connection():
     import databases

     from reflector.db import _database_context
-    from reflector.settings import settings

     _database_context.set(None)
     db = databases.Database(settings.DATABASE_URL)
@@ -116,9 +136,6 @@ async def set_workflow_error_status(transcript_id: str) -> bool:
 def _get_storage():
     """Create fresh storage instance."""
-    from reflector.settings import settings
-    from reflector.storage.storage_aws import AwsStorage
-
     return AwsStorage(
         aws_bucket_name=settings.TRANSCRIPT_STORAGE_AWS_BUCKET_NAME,
         aws_region=settings.TRANSCRIPT_STORAGE_AWS_REGION,
@@ -198,9 +215,6 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
         transcript_id=input.transcript_id,
     )

-    from reflector.dailyco_api.client import DailyApiClient
-    from reflector.settings import settings
-
     if not input.recording_id:
         # No recording_id in reprocess path - return minimal data
         await emit_progress_async(
@@ -257,13 +271,6 @@ async def get_participants(input: PipelineInput, ctx: Context) -> ParticipantsRe
     recording_data = _to_dict(ctx.task_output(get_recording))
     mtg_session_id = recording_data.get("mtg_session_id")

-    from reflector.dailyco_api.client import DailyApiClient
-    from reflector.settings import settings
-    from reflector.utils.daily import (
-        filter_cam_audio_tracks,
-        parse_daily_recording_filename,
-    )
-
     # Get transcript and reset events/topics/participants
     async with fresh_db_connection():
         from reflector.db.transcripts import (
@@ -488,12 +495,6 @@ async def mixdown_tracks(input: PipelineInput, ctx: Context) -> MixdownResult:
             padded_urls.append(url)

     # Use PipelineMainMultitrack.mixdown_tracks which uses PyAV filter graph
-    from fractions import Fraction
-    from av.audio.resampler import AudioResampler
-    from reflector.processors import AudioFileWriterProcessor
-
     valid_urls = [url for url in padded_urls if url]
     if not valid_urls:
         raise ValueError("No valid padded tracks to mixdown")
@@ -688,10 +689,7 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
input.transcript_id, "generate_waveform", "in_progress", ctx.workflow_run_id input.transcript_id, "generate_waveform", "in_progress", ctx.workflow_run_id
) )
import httpx
from reflector.db.transcripts import TranscriptWaveform, transcripts_controller from reflector.db.transcripts import TranscriptWaveform, transcripts_controller
from reflector.utils.audio_waveform import get_audio_waveform
# Cleanup temporary padded S3 files (deferred until after mixdown) # Cleanup temporary padded S3 files (deferred until after mixdown)
track_data = _to_dict(ctx.task_output(process_tracks)) track_data = _to_dict(ctx.task_output(process_tracks))
@@ -779,12 +777,9 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
     target_language = track_data.get("target_language", "en")

     from reflector.db.transcripts import TranscriptTopic, transcripts_controller
-    from reflector.pipelines import topic_processing
     from reflector.processors.types import (
         TitleSummaryWithId as TitleSummaryWithIdProcessorType,
     )
-    from reflector.processors.types import Transcript as TranscriptType
-    from reflector.processors.types import Word

     # Convert word dicts to Word objects
     word_objects = [Word(**w) for w in words]
@@ -850,8 +845,6 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
         TranscriptFinalTitle,
         transcripts_controller,
     )
-    from reflector.pipelines import topic_processing
-    from reflector.processors.types import TitleSummary

     topic_objects = [TitleSummary(**t) for t in topics]
@@ -913,8 +906,6 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
         TranscriptFinalShortSummary,
         transcripts_controller,
     )
-    from reflector.pipelines import topic_processing
-    from reflector.processors.types import TitleSummary

     topic_objects = [TitleSummary(**t) for t in topics]
@@ -1100,8 +1091,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
input.transcript_id, "post_zulip", "in_progress", ctx.workflow_run_id input.transcript_id, "post_zulip", "in_progress", ctx.workflow_run_id
) )
from reflector.settings import settings
if not settings.ZULIP_REALM: if not settings.ZULIP_REALM:
logger.info("[Hatchet] post_zulip skipped (Zulip not configured)") logger.info("[Hatchet] post_zulip skipped (Zulip not configured)")
await emit_progress_async( await emit_progress_async(
@@ -1109,8 +1098,6 @@ async def post_zulip(input: PipelineInput, ctx: Context) -> ZulipResult:
         )
         return ZulipResult(zulip_message_id=None, skipped=True)

-    from reflector.zulip import post_transcript_notification
-
     async with fresh_db_connection():
         from reflector.db.transcripts import transcripts_controller
@@ -1155,8 +1142,6 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
         transcript = await transcripts_controller.get_by_id(input.transcript_id)

     if room and room.webhook_url and transcript:
-        import httpx
-
         webhook_payload = {
             "event": "transcript.completed",
             "transcript_id": input.transcript_id,

View File

@@ -15,7 +15,8 @@ from hatchet_sdk.clients.rest.exceptions import ApiException
 from hatchet_sdk.clients.rest.models import V1TaskStatus

 from reflector.db.recordings import recordings_controller
-from reflector.db.transcripts import Transcript
+from reflector.db.rooms import rooms_controller
+from reflector.db.transcripts import Transcript, transcripts_controller
 from reflector.hatchet.client import HatchetClientManager
 from reflector.logger import logger
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
@@ -180,9 +181,6 @@ async def dispatch_transcript_processing(
     Returns AsyncResult for Celery tasks, None for Hatchet workflows.
     """
-    from reflector.db.rooms import rooms_controller
-    from reflector.db.transcripts import transcripts_controller
-
     if isinstance(config, MultitrackProcessingConfig):
         # Check if room has use_hatchet=True (overrides env vars)
         room_forces_hatchet = False

View File

@@ -17,7 +17,9 @@ from typing import Callable
 from celery.result import AsyncResult
 from hatchet_sdk.clients.rest.models import V1TaskStatus

+from reflector.db import get_database
 from reflector.db.transcripts import Transcript, transcripts_controller
+from reflector.hatchet.client import HatchetClientManager
 from reflector.services.transcript_process import (
     FileProcessingConfig,
     MultitrackProcessingConfig,
@@ -55,8 +57,6 @@ async def process_transcript(
         sync: If True, wait for task completion. If False, dispatch and exit.
         force: If True, cancel old workflow and start new (latest code). If False, replay failed workflow.
     """
-    from reflector.db import get_database
-
     database = get_database()
     await database.connect()
@@ -96,8 +96,6 @@ async def process_transcript(
     if result is None:
         # Hatchet workflow dispatched
         if sync:
-            from reflector.hatchet.client import HatchetClientManager
-
             # Re-fetch transcript to get workflow_run_id
             transcript = await transcripts_controller.get_by_id(transcript_id)
             if not transcript or not transcript.workflow_run_id:

View File

@@ -24,6 +24,7 @@ from reflector.db.transcripts import (
     SourceKind,
     transcripts_controller,
 )
+from reflector.hatchet.client import HatchetClientManager
 from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
 from reflector.pipelines.main_live_pipeline import asynctask
 from reflector.pipelines.main_multitrack_pipeline import (
@@ -298,8 +299,6 @@ async def _process_multitrack_recording_inner(
     )

     if use_hatchet:
-        from reflector.hatchet.client import HatchetClientManager  # noqa: PLC0415
-
         workflow_id = await HatchetClientManager.start_workflow(
             workflow_name="DiarizationPipeline",
             input_data={

View File

@@ -3,7 +3,8 @@ from urllib.parse import urlparse
 import httpx

-from reflector.db.transcripts import Transcript
+from reflector.db.rooms import rooms_controller
+from reflector.db.transcripts import Transcript, transcripts_controller
 from reflector.settings import settings
@@ -113,6 +114,48 @@ def get_zulip_message(transcript: Transcript, include_topics: bool):
     return message


+async def post_transcript_notification(transcript: Transcript) -> int | None:
+    """Post or update transcript notification in Zulip.
+
+    Uses transcript.room_id directly (Hatchet flow).
+    Celery's pipeline_post_to_zulip uses recording→meeting→room path instead.
+    """
+    if not transcript.room_id:
+        return None
+
+    room = await rooms_controller.get_by_id(transcript.room_id)
+    if not room or not room.zulip_stream or not room.zulip_auto_post:
+        return None
+
+    message = get_zulip_message(transcript=transcript, include_topics=True)
+
+    message_updated = False
+    if transcript.zulip_message_id:
+        try:
+            await update_zulip_message(
+                transcript.zulip_message_id,
+                room.zulip_stream,
+                room.zulip_topic,
+                message,
+            )
+            message_updated = True
+        except Exception:
+            pass
+
+    if not message_updated:
+        response = await send_message_to_zulip(
+            room.zulip_stream, room.zulip_topic, message
+        )
+        message_id = response.get("id")
+        if message_id:
+            await transcripts_controller.update(
+                transcript, {"zulip_message_id": message_id}
+            )
+        return message_id
+
+    return transcript.zulip_message_id
+
+
 def extract_domain(url: str) -> str:
     return urlparse(url).netloc
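
Illustrative usage (not part of this commit): a minimal sketch of how the restored helper could be called from the Hatchet post_zulip task, based only on the imports and signature shown in the diff above; the _notify_zulip wrapper name is hypothetical.

from reflector.db.transcripts import transcripts_controller
from reflector.zulip import post_transcript_notification


async def _notify_zulip(transcript_id: str) -> int | None:
    # Hypothetical wrapper: fetch the transcript, then post or update its Zulip message.
    transcript = await transcripts_controller.get_by_id(transcript_id)
    if transcript is None:
        return None
    # Returns the Zulip message id, or None when the room has auto-post disabled.
    return await post_transcript_notification(transcript)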