more NES instead of str

This commit is contained in:
Igor Loskutov
2025-12-17 15:46:59 -05:00
parent cb41e9e779
commit 557073850e
3 changed files with 15 additions and 6 deletions

View File

@@ -1,5 +1,7 @@
"""WebSocket broadcasting helpers for Hatchet workflows. """WebSocket broadcasting helpers for Hatchet workflows.
DUPLICATION NOTE: To be kept when Celery is deprecated. Currently dupes Celery logic.
Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets Provides WebSocket broadcasting for Hatchet that matches Celery's @broadcast_to_sockets
decorator behavior. Events are broadcast to transcript rooms and user rooms. decorator behavior. Events are broadcast to transcript rooms and user rooms.
""" """
@@ -8,13 +10,16 @@ from typing import Any
from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller from reflector.db.transcripts import Transcript, TranscriptEvent, transcripts_controller
from reflector.logger import logger from reflector.logger import logger
from reflector.utils.string import NonEmptyString
from reflector.ws_manager import get_ws_manager from reflector.ws_manager import get_ws_manager
# Events that should also be sent to user room (matches Celery behavior) # Events that should also be sent to user room (matches Celery behavior)
USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"} USER_ROOM_EVENTS = {"STATUS", "FINAL_TITLE", "DURATION"}
async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None: async def broadcast_event(
transcript_id: NonEmptyString, event: TranscriptEvent
) -> None:
"""Broadcast a TranscriptEvent to WebSocket subscribers. """Broadcast a TranscriptEvent to WebSocket subscribers.
Fire-and-forget: errors are logged but don't interrupt workflow execution. Fire-and-forget: errors are logged but don't interrupt workflow execution.
@@ -56,7 +61,7 @@ async def broadcast_event(transcript_id: str, event: TranscriptEvent) -> None:
) )
async def set_status_and_broadcast(transcript_id: str, status: str) -> None: async def set_status_and_broadcast(transcript_id: NonEmptyString, status: str) -> None:
"""Set transcript status and broadcast to WebSocket. """Set transcript status and broadcast to WebSocket.
Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting. Wrapper around transcripts_controller.set_status that adds WebSocket broadcasting.
@@ -67,7 +72,7 @@ async def set_status_and_broadcast(transcript_id: str, status: str) -> None:
async def append_event_and_broadcast( async def append_event_and_broadcast(
transcript_id: str, transcript_id: NonEmptyString,
transcript: Transcript, transcript: Transcript,
event_name: str, event_name: str,
data: Any, data: Any,

View File

@@ -92,7 +92,11 @@ diarization_pipeline = hatchet.workflow(
@asynccontextmanager @asynccontextmanager
async def fresh_db_connection(): async def fresh_db_connection():
"""Context manager for database connections in Hatchet workers.""" """Context manager for database connections in Hatchet workers.
TECH DEBT: Made to make connection fork-aware without changing db code too much.
The real fix would be making the db module fork-aware instead of bypassing it.
Current pattern is acceptable given Hatchet's process model.
"""
import databases # noqa: PLC0415 import databases # noqa: PLC0415
from reflector.db import _database_context # noqa: PLC0415 from reflector.db import _database_context # noqa: PLC0415
@@ -108,7 +112,7 @@ async def fresh_db_connection():
_database_context.set(None) _database_context.set(None)
async def set_workflow_error_status(transcript_id: str) -> bool: async def set_workflow_error_status(transcript_id: NonEmptyString) -> bool:
"""Set transcript status to 'error' on workflow failure. """Set transcript status to 'error' on workflow failure.
Returns: Returns:

View File

@@ -208,7 +208,7 @@ class SummaryBuilder:
def _enhance_prompt_with_participants(self, prompt: str) -> str: def _enhance_prompt_with_participants(self, prompt: str) -> str:
"""Add participant instructions to any prompt if participants are known.""" """Add participant instructions to any prompt if participants are known."""
if self.participant_instructions: if self.participant_instructions:
self.logger.debug("Adding participant instructions to prompts") self.logger.debug("Adding participant instructions to prompt")
return f"{prompt}\n\n{self.participant_instructions}" return f"{prompt}\n\n{self.participant_instructions}"
return prompt return prompt