mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
fix: zulip and consent handler on the file pipeline (#645)
This commit is contained in:
@@ -12,7 +12,7 @@ from pathlib import Path
|
|||||||
|
|
||||||
import av
|
import av
|
||||||
import structlog
|
import structlog
|
||||||
from celery import shared_task
|
from celery import chain, shared_task
|
||||||
|
|
||||||
from reflector.asynctask import asynctask
|
from reflector.asynctask import asynctask
|
||||||
from reflector.db.rooms import rooms_controller
|
from reflector.db.rooms import rooms_controller
|
||||||
@@ -26,6 +26,8 @@ from reflector.logger import logger
|
|||||||
from reflector.pipelines.main_live_pipeline import (
|
from reflector.pipelines.main_live_pipeline import (
|
||||||
PipelineMainBase,
|
PipelineMainBase,
|
||||||
broadcast_to_sockets,
|
broadcast_to_sockets,
|
||||||
|
task_cleanup_consent,
|
||||||
|
task_pipeline_post_to_zulip,
|
||||||
)
|
)
|
||||||
from reflector.processors import (
|
from reflector.processors import (
|
||||||
AudioFileWriterProcessor,
|
AudioFileWriterProcessor,
|
||||||
@@ -379,6 +381,28 @@ class PipelineMainFile(PipelineMainBase):
|
|||||||
await processor.flush()
|
await processor.flush()
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task
|
||||||
|
@asynctask
|
||||||
|
async def task_send_webhook_if_needed(*, transcript_id: str):
|
||||||
|
"""Send webhook if this is a room recording with webhook configured"""
|
||||||
|
transcript = await transcripts_controller.get_by_id(transcript_id)
|
||||||
|
if not transcript:
|
||||||
|
return
|
||||||
|
|
||||||
|
if transcript.source_kind == SourceKind.ROOM and transcript.room_id:
|
||||||
|
room = await rooms_controller.get_by_id(transcript.room_id)
|
||||||
|
if room and room.webhook_url:
|
||||||
|
logger.info(
|
||||||
|
"Dispatching webhook",
|
||||||
|
transcript_id=transcript_id,
|
||||||
|
room_id=room.id,
|
||||||
|
webhook_url=room.webhook_url,
|
||||||
|
)
|
||||||
|
send_transcript_webhook.delay(
|
||||||
|
transcript_id, room.id, event_id=uuid.uuid4().hex
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
@asynctask
|
@asynctask
|
||||||
async def task_pipeline_file_process(*, transcript_id: str):
|
async def task_pipeline_file_process(*, transcript_id: str):
|
||||||
@@ -406,16 +430,10 @@ async def task_pipeline_file_process(*, transcript_id: str):
|
|||||||
await pipeline.set_status(transcript_id, "error")
|
await pipeline.set_status(transcript_id, "error")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# Trigger webhook if this is a room recording with webhook configured
|
# Run post-processing chain: consent cleanup -> zulip -> webhook
|
||||||
if transcript.source_kind == SourceKind.ROOM and transcript.room_id:
|
post_chain = chain(
|
||||||
room = await rooms_controller.get_by_id(transcript.room_id)
|
task_cleanup_consent.si(transcript_id=transcript_id),
|
||||||
if room and room.webhook_url:
|
task_pipeline_post_to_zulip.si(transcript_id=transcript_id),
|
||||||
logger.info(
|
task_send_webhook_if_needed.si(transcript_id=transcript_id),
|
||||||
"Dispatching webhook task",
|
|
||||||
transcript_id=transcript_id,
|
|
||||||
room_id=room.id,
|
|
||||||
webhook_url=room.webhook_url,
|
|
||||||
)
|
|
||||||
send_transcript_webhook.delay(
|
|
||||||
transcript_id, room.id, event_id=uuid.uuid4().hex
|
|
||||||
)
|
)
|
||||||
|
post_chain.delay()
|
||||||
|
|||||||
Reference in New Issue
Block a user