hatchet processing resilence several fixes (#831)

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
This commit is contained in:
2026-01-22 19:03:33 -05:00
committed by GitHub
parent 8fc8d8bf4a
commit 6e786b7631
2 changed files with 61 additions and 26 deletions

View File

@@ -1149,8 +1149,8 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
transcript, transcript,
"TRANSCRIPT", "TRANSCRIPT",
TranscriptText( TranscriptText(
text=merged_transcript.text, text="",
translation=merged_transcript.translation, translation=None,
), ),
logger=logger, logger=logger,
) )
@@ -1347,14 +1347,34 @@ async def send_webhook(input: PipelineInput, ctx: Context) -> WebhookResult:
f"participants={len(payload.transcript.participants)})" f"participants={len(payload.transcript.participants)})"
) )
response = await send_webhook_request( try:
url=room.webhook_url, response = await send_webhook_request(
payload=payload, url=room.webhook_url,
event_type="transcript.completed", payload=payload,
webhook_secret=room.webhook_secret, event_type="transcript.completed",
timeout=30.0, webhook_secret=room.webhook_secret,
) timeout=30.0,
)
ctx.log(f"send_webhook complete: status_code={response.status_code}") ctx.log(f"send_webhook complete: status_code={response.status_code}")
return WebhookResult(webhook_sent=True, response_code=response.status_code)
return WebhookResult(webhook_sent=True, response_code=response.status_code) except httpx.HTTPStatusError as e:
ctx.log(
f"send_webhook failed (HTTP {e.response.status_code}), continuing anyway"
)
return WebhookResult(
webhook_sent=False, response_code=e.response.status_code
)
except httpx.ConnectError as e:
ctx.log(f"send_webhook failed (connection error), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except httpx.TimeoutException as e:
ctx.log(f"send_webhook failed (timeout), continuing anyway: {e}")
return WebhookResult(webhook_sent=False)
except Exception as e:
ctx.log(f"send_webhook unexpected error, continuing anyway: {e}")
return WebhookResult(webhook_sent=False)

View File

@@ -11,7 +11,7 @@ from typing import Literal, Union, assert_never
import celery import celery
from celery.result import AsyncResult from celery.result import AsyncResult
from hatchet_sdk.clients.rest.exceptions import ApiException from hatchet_sdk.clients.rest.exceptions import ApiException, NotFoundException
from hatchet_sdk.clients.rest.models import V1TaskStatus from hatchet_sdk.clients.rest.models import V1TaskStatus
from reflector.db.recordings import recordings_controller from reflector.db.recordings import recordings_controller
@@ -212,24 +212,39 @@ async def dispatch_transcript_processing(
) )
return None return None
else: else:
# Workflow exists but can't replay (CANCELLED, COMPLETED, etc.) # Workflow can't replay (CANCELLED, COMPLETED, or 404 deleted)
# Log and proceed to start new workflow # Log and proceed to start new workflow
status = await HatchetClientManager.get_workflow_run_status( try:
transcript.workflow_run_id status = await HatchetClientManager.get_workflow_run_status(
) transcript.workflow_run_id
logger.info( )
"Old workflow not replayable, starting new", logger.info(
old_workflow_id=transcript.workflow_run_id, "Old workflow not replayable, starting new",
old_status=status.value, old_workflow_id=transcript.workflow_run_id,
) old_status=status.value,
)
except NotFoundException:
# Workflow deleted from Hatchet but ID still in DB
logger.info(
"Old workflow not found in Hatchet, starting new",
old_workflow_id=transcript.workflow_run_id,
)
# Force: cancel old workflow if exists # Force: cancel old workflow if exists
if force and transcript and transcript.workflow_run_id: if force and transcript and transcript.workflow_run_id:
await HatchetClientManager.cancel_workflow(transcript.workflow_run_id) try:
logger.info( await HatchetClientManager.cancel_workflow(
"Cancelled old workflow (--force)", transcript.workflow_run_id
workflow_id=transcript.workflow_run_id, )
) logger.info(
"Cancelled old workflow (--force)",
workflow_id=transcript.workflow_run_id,
)
except NotFoundException:
logger.info(
"Old workflow already deleted (--force)",
workflow_id=transcript.workflow_run_id,
)
await transcripts_controller.update( await transcripts_controller.update(
transcript, {"workflow_run_id": None} transcript, {"workflow_run_id": None}
) )