From 5f143fe3640875dcb56c26694254a93189281d17 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Mon, 15 Sep 2025 10:49:20 -0600 Subject: [PATCH] fix: zulip and consent handler on the file pipeline (#645) --- .../reflector/pipelines/main_file_pipeline.py | 46 +++++++++++++------ 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/server/reflector/pipelines/main_file_pipeline.py b/server/reflector/pipelines/main_file_pipeline.py index 5c57dddb..ce9d000e 100644 --- a/server/reflector/pipelines/main_file_pipeline.py +++ b/server/reflector/pipelines/main_file_pipeline.py @@ -12,7 +12,7 @@ from pathlib import Path import av import structlog -from celery import shared_task +from celery import chain, shared_task from reflector.asynctask import asynctask from reflector.db.rooms import rooms_controller @@ -26,6 +26,8 @@ from reflector.logger import logger from reflector.pipelines.main_live_pipeline import ( PipelineMainBase, broadcast_to_sockets, + task_cleanup_consent, + task_pipeline_post_to_zulip, ) from reflector.processors import ( AudioFileWriterProcessor, @@ -379,6 +381,28 @@ class PipelineMainFile(PipelineMainBase): await processor.flush() +@shared_task +@asynctask +async def task_send_webhook_if_needed(*, transcript_id: str): + """Send webhook if this is a room recording with webhook configured""" + transcript = await transcripts_controller.get_by_id(transcript_id) + if not transcript: + return + + if transcript.source_kind == SourceKind.ROOM and transcript.room_id: + room = await rooms_controller.get_by_id(transcript.room_id) + if room and room.webhook_url: + logger.info( + "Dispatching webhook", + transcript_id=transcript_id, + room_id=room.id, + webhook_url=room.webhook_url, + ) + send_transcript_webhook.delay( + transcript_id, room.id, event_id=uuid.uuid4().hex + ) + + @shared_task @asynctask async def task_pipeline_file_process(*, transcript_id: str): @@ -406,16 +430,10 @@ async def task_pipeline_file_process(*, transcript_id: str): await pipeline.set_status(transcript_id, "error") raise - # Trigger webhook if this is a room recording with webhook configured - if transcript.source_kind == SourceKind.ROOM and transcript.room_id: - room = await rooms_controller.get_by_id(transcript.room_id) - if room and room.webhook_url: - logger.info( - "Dispatching webhook task", - transcript_id=transcript_id, - room_id=room.id, - webhook_url=room.webhook_url, - ) - send_transcript_webhook.delay( - transcript_id, room.id, event_id=uuid.uuid4().hex - ) + # Run post-processing chain: consent cleanup -> zulip -> webhook + post_chain = chain( + task_cleanup_consent.si(transcript_id=transcript_id), + task_pipeline_post_to_zulip.si(transcript_id=transcript_id), + task_send_webhook_if_needed.si(transcript_id=transcript_id), + ) + post_chain.delay()