diff --git a/server/reflector/hatchet/broadcast.py b/server/reflector/hatchet/broadcast.py index ba56c078..58b05f11 100644 --- a/server/reflector/hatchet/broadcast.py +++ b/server/reflector/hatchet/broadcast.py @@ -8,8 +8,9 @@ decorator behavior. Events are broadcast to transcript rooms and user rooms. from typing import Any +import structlog + from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller -from reflector.logger import logger from reflector.utils.string import NonEmptyString from reflector.ws_manager import get_ws_manager @@ -18,14 +19,16 @@ USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"} async def broadcast_event( - transcript_id: NonEmptyString, event: TranscriptEvent + transcript_id: NonEmptyString, + event: TranscriptEvent, + logger: structlog.BoundLogger, ) -> None: """Broadcast a TranscriptEvent to WebSocket subscribers. Fire-and-forget: errors are logged but don't interrupt workflow execution. """ logger.info( - "[Hatchet Broadcast] Broadcasting event", + "Broadcasting event", transcript_id=transcript_id, event_type=event.event, ) @@ -37,7 +40,7 @@ async def broadcast_event( message=event.model_dump(mode="json"), ) logger.info( - "[Hatchet Broadcast] Event sent to transcript room", + "Event sent to transcript room", transcript_id=transcript_id, event_type=event.event, ) @@ -54,21 +57,25 @@ async def broadcast_event( ) except Exception as e: logger.warning( - "[Hatchet Broadcast] Failed to broadcast event", + "Failed to broadcast event", error=str(e), transcript_id=transcript_id, event_type=event.event, ) -async def set_status_and_broadcast(transcript_id: NonEmptyString, status: str) -> None: +async def set_status_and_broadcast( + transcript_id: NonEmptyString, + status: str, + logger: structlog.BoundLogger, +) -> None: """Set transcript status and broadcast to WebSocket. Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. """ event = await transcripts_controller.set_status(transcript_id, status) if event: - await broadcast_event(transcript_id, event) + await broadcast_event(transcript_id, event, logger=logger) async def append_event_and_broadcast( @@ -76,6 +83,7 @@ async def append_event_and_broadcast( transcript: Transcript, event_name: str, data: Any, + logger: structlog.BoundLogger, ) -> TranscriptEvent: """Append event to transcript and broadcast to WebSocket. @@ -86,5 +94,5 @@ async def append_event_and_broadcast( event=event_name, data=data, ) - await broadcast_event(transcript_id, event) + await broadcast_event(transcript_id, event, logger=logger) return event diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py index f5f5b4b4..dbbda268 100644 --- a/server/reflector/hatchet/workflows/diarization_pipeline.py +++ b/server/reflector/hatchet/workflows/diarization_pipeline.py @@ -121,7 +121,7 @@ async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool: """ try: async with fresh_db_connection(): - await set_status_and_broadcast(transcript_id, "error") + await set_status_and_broadcast(transcript_id, "error", logger=logger) return True except Exception as e: logger.critical( @@ -184,7 +184,9 @@ async def get_recording(input: PipelineInput, ctx: Context) -> RecordingResult: transcript = await transcripts_controller.get_by_id(input.transcript_id) if transcript: - await set_status_and_broadcast(input.transcript_id, "processing") + await set_status_and_broadcast( + input.transcript_id, "processing", logger=logger + ) ctx.log(f"Set transcript status to processing: {input.transcript_id}") if not settings.DAILY_API_KEY: @@ -485,7 +487,11 @@ async def generate_waveform(input: PipelineInput, ctx: Context) -> WaveformResul if transcript: waveform_data = TranscriptWaveform(waveform=waveform) await append_event_and_broadcast( - input.transcript_id, transcript, "WAVEFORM", waveform_data + input.transcript_id, + transcript, + "WAVEFORM", + waveform_data, + logger=logger, ) finally: @@ -535,7 +541,7 @@ async def detect_topics(input: PipelineInput, ctx: Context) -> TopicsResult: topic.id = data.id await transcripts_controller.upsert_topic(transcript, topic) await append_event_and_broadcast( - input.transcript_id, transcript, "TOPIC", topic + input.transcript_id, transcript, "TOPIC", topic, logger=logger ) topics = await topic_processing.detect_topics( @@ -586,7 +592,11 @@ async def generate_title(input: PipelineInput, ctx: Context) -> TitleResult: {"title": final_title.title}, ) await append_event_and_broadcast( - input.transcript_id, transcript, "FINAL_TITLE", final_title + input.transcript_id, + transcript, + "FINAL_TITLE", + final_title, + logger=logger, ) await topic_processing.generate_title( @@ -642,6 +652,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult: transcript, "FINAL_LONG_SUMMARY", final_long_summary, + logger=logger, ) async def on_short_summary_callback(data): @@ -659,6 +670,7 @@ async def generate_summary(input: PipelineInput, ctx: Context) -> SummaryResult: transcript, "FINAL_SHORT_SUMMARY", final_short_summary, + logger=logger, ) await topic_processing.generate_summaries( @@ -734,6 +746,7 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: text=merged_transcript.text, translation=merged_transcript.translation, ), + logger=logger, ) # Save duration and clear workflow_run_id (workflow completed successfully) @@ -748,10 +761,10 @@ async def finalize(input: PipelineInput, ctx: Context) -> FinalizeResult: duration_data = TranscriptDuration(duration=duration) await append_event_and_broadcast( - input.transcript_id, transcript, "DURATION", duration_data + input.transcript_id, transcript, "DURATION", duration_data, logger=logger ) - await set_status_and_broadcast(input.transcript_id, "ended") + await set_status_and_broadcast(input.transcript_id, "ended", logger=logger) ctx.log( f"finalize complete: transcript {input.transcript_id} status set to 'ended'"