better log webhook events

This commit is contained in:
Igor Loskutov
2025-12-18 14:22:03 -05:00
parent af425e6dfd
commit 84c1a57c83
2 changed files with 36 additions and 15 deletions

View File

@@ -8,8 +8,9 @@ decorator behavior. Events are broadcast to transcript rooms and user rooms.
from typing import Any from typing import Any
import structlog
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.logger import logger
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
from reflector.ws_manager import get_ws_manager from reflector.ws_manager import get_ws_manager
@@ -18,14 +19,16 @@ USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event( async def broadcast_event(
transcript_id: NonEmptyString, event: TranscriptEvent transcript_id: NonEmptyString,
event: TranscriptEvent,
logger: structlog.BoundLogger,
) -> None: ) -> None:
"""Broadcast a TranscriptEvent to WebSocket subscribers. """Broadcast a TranscriptEvent to WebSocket subscribers.
Fire-and-forget: errors are logged but don't interrupt workflow execution. Fire-and-forget: errors are logged but don't interrupt workflow execution.
""" """
logger.info( logger.info(
"[Hatchet Broadcast] Broadcasting event", "Broadcasting event",
transcript_id=transcript_id, transcript_id=transcript_id,
event_type=event.event, event_type=event.event,
) )
@@ -37,7 +40,7 @@ async def broadcast_event(
message=event.model_dump(mode="json"), message=event.model_dump(mode="json"),
) )
logger.info( logger.info(
"[Hatchet Broadcast] Event sent to transcript room", "Event sent to transcript room",
transcript_id=transcript_id, transcript_id=transcript_id,
event_type=event.event, event_type=event.event,
) )
@@ -54,21 +57,25 @@ async def broadcast_event(
) )
except Exception as e: except Exception as e:
logger.warning( logger.warning(
"[Hatchet Broadcast] Failed to broadcast event", "Failed to broadcast event",
error=str(e), error=str(e),
transcript_id=transcript_id, transcript_id=transcript_id,
event_type=event.event, event_type=event.event,
) )
async def set_status_and_broadcast(transcript_id: NonEmptyString, status: str) -> None: async def set_status_and_broadcast(
transcript_id: NonEmptyString,
status: str,
logger: structlog.BoundLogger,
) -> None:
"""Set transcript status and broadcast to WebSocket. """Set transcript status and broadcast to WebSocket.
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
""" """
event = await transcripts_controller.set_status(transcript_id, status) event = await transcripts_controller.set_status(transcript_id, status)
if event: if event:
await broadcast_event(transcript_id, event) await broadcast_event(transcript_id, event, logger=logger)
async def append_event_and_broadcast( async def append_event_and_broadcast(
@@ -76,6 +83,7 @@ async def append_event_and_broadcast(
transcript: Transcript, transcript: Transcript,
event_name: str, event_name: str,
data: Any, data: Any,
logger: structlog.BoundLogger,
) -> TranscriptEvent: ) -> TranscriptEvent:
"""Append event to transcript and broadcast to WebSocket. """Append event to transcript and broadcast to WebSocket.
@@ -86,5 +94,5 @@ async def append_event_and_broadcast(
event=event_name, event=event_name,
data=data, data=data,
) )
await broadcast_event(transcript_id, event) await broadcast_event(transcript_id, event, logger=logger)
return event return event

View File

@@ -121,7 +121,7 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
""" """
try: try:
async with fresh_db_connection(): async with fresh_db_connection():
await set_status_and_broadcast(transcript_id, "error") await set_status_and_broadcast(transcript_id, "error", logger=logger)
return True return True
except Exception as e: except Exception as e:
logger.critical( logger.critical(
@@ -184,7 +184,9 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult:
transcript = await transcripts_controller.get_by_id(input.transcript_id) transcript = await transcripts_controller.get_by_id(input.transcript_id)
if transcript: if transcript:
await set_status_and_broadcast(input.transcript_id, "processing") await set_status_and_broadcast(
input.transcript_id, "processing", logger=logger
)
ctx.log(f"Set transcript status to processing: {input.transcript_id}") ctx.log(f"Set transcript status to processing: {input.transcript_id}")
if not settings.DAILY_API_KEY: if not settings.DAILY_API_KEY:
@@ -485,7 +487,11 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul
if transcript: if transcript:
waveform_data = TranscriptWaveform(waveform=waveform) waveform_data = TranscriptWaveform(waveform=waveform)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, transcript, "WAVEFORM", waveform_data input.transcript_id,
transcript,
"WAVEFORM",
waveform_data,
logger=logger,
) )
finally: finally:
@@ -535,7 +541,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult:
topic.id = data.id topic.id = data.id
await transcripts_controller.upsert_topic(transcript, topic) await transcripts_controller.upsert_topic(transcript, topic)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, transcript, "TOPIC", topic input.transcript_id, transcript, "TOPIC", topic, logger=logger
) )
topics = await topic_processing.detect_topics( topics = await topic_processing.detect_topics(
@@ -586,7 +592,11 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult:
{"title": final_title.title}, {"title": final_title.title},
) )
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, transcript, "FINAL_TITLE", final_title input.transcript_id,
transcript,
"FINAL_TITLE",
final_title,
logger=logger,
) )
await topic_processing.generate_title( await topic_processing.generate_title(
@@ -642,6 +652,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
transcript, transcript,
"FINAL_LONG_SUMMARY", "FINAL_LONG_SUMMARY",
final_long_summary, final_long_summary,
logger=logger,
) )
async def on_short_summary_callback(data): async def on_short_summary_callback(data):
@@ -659,6 +670,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult:
transcript, transcript,
"FINAL_SHORT_SUMMARY", "FINAL_SHORT_SUMMARY",
final_short_summary, final_short_summary,
logger=logger,
) )
await topic_processing.generate_summaries( await topic_processing.generate_summaries(
@@ -734,6 +746,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
text=merged_transcript.text, text=merged_transcript.text,
translation=merged_transcript.translation, translation=merged_transcript.translation,
), ),
logger=logger,
) )
# Save duration and clear workflow_run_id (workflow completed successfully) # Save duration and clear workflow_run_id (workflow completed successfully)
@@ -748,10 +761,10 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult:
duration_data = TranscriptDuration(duration=duration) duration_data = TranscriptDuration(duration=duration)
await append_event_and_broadcast( await append_event_and_broadcast(
input.transcript_id, transcript, "DURATION", duration_data input.transcript_id, transcript, "DURATION", duration_data, logger=logger
) )
await set_status_and_broadcast(input.transcript_id, "ended") await set_status_and_broadcast(input.transcript_id, "ended", logger=logger)
ctx.log( ctx.log(
f"finalize complete: transcript {input.transcript_id} status set to 'ended'" f"finalize complete: transcript {input.transcript_id} status set to 'ended'"