diff --git a/server/reflector/hatchet/broadcast.py b/server/reflector/hatchet/broadcast.py index e9439e61..ba56c078 100644 --- a/server/reflector/hatchet/broadcast.py +++ b/server/reflector/hatchet/broadcast.py @@ -1,5 +1,7 @@ """WebSocket broadcasting helpers for Hatchet workflows. +DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic. + Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets decorator behavior. Events are broadcast to transcript rooms and user rooms. """ @@ -8,13 +10,16 @@ from typing import Any from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller from reflector.logger import logger +from reflector.utils.string import NonEmptyString from reflector.ws_manager import get_ws_manager # Events that should also be sent to user room (matches Celery behavior) USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"} -async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: +async def broadcast_event( + transcript_id: NonEmptyString, event: TranscriptEvent +) -> None: """Broadcast a TranscriptEvent to WebSocket subscribers. Fire-and-forget: errors are logged but don't interrupt workflow execution. @@ -56,7 +61,7 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: ) -async def set_status_and_broadcast(transcript_id: str, status: str) -> None: +async def set_status_and_broadcast(transcript_id: NonEmptyString, status: str) -> None: """Set transcript status and broadcast to WebSocket. Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. @@ -67,7 +72,7 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None: async def append_event_and_broadcast( - transcript_id: str, + transcript_id: NonEmptyString, transcript: Transcript, event_name: str, data: Any, diff --git a/server/reflector/hatchet/workflows/diarization_pipeline.py b/server/reflector/hatchet/workflows/diarization_pipeline.py index e5aba59a..a137efa1 100644 --- a/server/reflector/hatchet/workflows/diarization_pipeline.py +++ b/server/reflector/hatchet/workflows/diarization_pipeline.py @@ -92,7 +92,11 @@ diarization_pipeline = hatchet.workflow( @asynccontextmanager async def fresh_db_connection(): - """Context manager for database connections in Hatchet workers.""" + """Context manager for database connections in Hatchet workers. + TECH DEBT: Made to make connection fork-aware without changing db code too much. + The real fix would be making the db module fork-aware instead of bypassing it. + Current pattern is acceptable given Hatchet's process model. + """ import databases # noqa: PLC0415 from reflector.db import _database_context # noqa: PLC0415 @@ -108,7 +112,7 @@ async def fresh_db_connection(): _database_context.set(None) -async def set_workflow_error_status(transcript_id: str) -> bool: +async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool: """Set transcript status to 'error' on workflow failure. Returns: diff --git a/server/reflector/processors/summary/summary_builder.py b/server/reflector/processors/summary/summary_builder.py index c81c7669..df348093 100644 --- a/server/reflector/processors/summary/summary_builder.py +++ b/server/reflector/processors/summary/summary_builder.py @@ -208,7 +208,7 @@ class SummaryBuilder: def _enhance_prompt_with_participants(self, prompt: str) -> str: """Add participant instructions to any prompt if participants are known.""" if self.participant_instructions: - self.logger.debug("Adding participant instructions to prompts") + self.logger.debug("Adding participant instructions to prompt") return f"{prompt}\n\n{self.participant_instructions}" return prompt