diff --git a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py index 0726cfd6..2d1ab194 100644 --- a/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py +++ b/server/reflector/hatchet/workflows/daily_multitrack_pipeline.py @@ -1149,8 +1149,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: transcript, "TRANSCRIPT", TranscriptText( - text=merged_transcript.text, - translation=merged_transcript.translation, + text="", + translation=None, ), logger=logger, ) @@ -1347,14 +1347,34 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult: f"participants={len(payload.transcript.participants)})" ) - response = await send_webhook_request( - url=room.webhook_url, - payload=payload, - event_type="transcript.completed", - webhook_secret=room.webhook_secret, - timeout=30.0, - ) + try: + response = await send_webhook_request( + url=room.webhook_url, + payload=payload, + event_type="transcript.completed", + webhook_secret=room.webhook_secret, + timeout=30.0, + ) - ctx.log(f"send_webhook complete: status_code={response.status_code}") + ctx.log(f"send_webhook complete: status_code={response.status_code}") + return WebhookResult(webhook_sent=True, response_code=response.status_code) - return WebhookResult(webhook_sent=True, response_code=response.status_code) + except httpx.HTTPStatusError as e: + ctx.log( + f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway" + ) + return WebhookResult( + webhook_sent=False, response_code=e.response.status_code + ) + + except httpx.ConnectError as e: + ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}") + return WebhookResult(webhook_sent=False) + + except httpx.TimeoutException as e: + ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}") + return WebhookResult(webhook_sent=False) + + except Exception as e: + ctx.log(f"send_webhook unexpected error, continuing anyway: {e}") + return WebhookResult(webhook_sent=False) diff --git a/server/reflector/services/transcript_process.py b/server/reflector/services/transcript_process.py index 0341304f..6a7fdbea 100644 --- a/server/reflector/services/transcript_process.py +++ b/server/reflector/services/transcript_process.py @@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never import celery from celery.result import AsyncResult -from hatchet_sdk.clients.rest.exceptions import ApiException +from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException from hatchet_sdk.clients.rest.models import V1TaskStatus from reflector.db.recordings import recordings_controller @@ -212,24 +212,39 @@ async def dispatch_transcript_processing( ) return None else: - # Workflow exists but can't replay (CANCELLED, COMPLETED, etc.) + # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted) # Log and proceed to start new workflow - status = await HatchetClientManager.get_workflow_run_status( - transcript.workflow_run_id - ) - logger.info( - "Old workflow not replayable, starting new", - old_workflow_id=transcript.workflow_run_id, - old_status=status.value, - ) + try: + status = await HatchetClientManager.get_workflow_run_status( + transcript.workflow_run_id + ) + logger.info( + "Old workflow not replayable, starting new", + old_workflow_id=transcript.workflow_run_id, + old_status=status.value, + ) + except NotFoundException: + # Workflow deleted from Hatchet but ID still in DB + logger.info( + "Old workflow not found in Hatchet, starting new", + old_workflow_id=transcript.workflow_run_id, + ) # Force: cancel old workflow if exists if force and transcript and transcript.workflow_run_id: - await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) - logger.info( - "Cancelled old workflow (--force)", - workflow_id=transcript.workflow_run_id, - ) + try: + await HatchetClientManager.cancel_workflow( + transcript.workflow_run_id + ) + logger.info( + "Cancelled old workflow (--force)", + workflow_id=transcript.workflow_run_id, + ) + except NotFoundException: + logger.info( + "Old workflow already deleted (--force)", + workflow_id=transcript.workflow_run_id, + ) await transcripts_controller.update( transcript, {"workflow_run_id": None} )