diff --git a/server/reflector/auth/auth_none.py b/server/reflector/auth/auth_none.py index 3ef7c923..4806a560 100644 --- a/server/reflector/auth/auth_none.py +++ b/server/reflector/auth/auth_none.py @@ -1,11 +1,5 @@ -from typing import Annotated - -from fastapi import Depends -from fastapi.security import OAuth2PasswordBearer from pydantic import BaseModel -oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token", auto_error=False) - class UserInfo(BaseModel): sub: str @@ -15,13 +9,13 @@ class AccessTokenInfo(BaseModel): pass -def authenticated(token: Annotated[str, Depends(oauth2_scheme)]): +def authenticated(): return None -def current_user(token: Annotated[str, Depends(oauth2_scheme)]): +def current_user(): return None -def current_user_optional(token: Annotated[str, Depends(oauth2_scheme)]): +def current_user_optional(): return None diff --git a/server/reflector/ws_manager.py b/server/reflector/ws_manager.py index fc3653bb..48f178f5 100644 --- a/server/reflector/ws_manager.py +++ b/server/reflector/ws_manager.py @@ -48,7 +48,15 @@ class RedisPubSubManager: if not self.redis_connection: await self.connect() message = json.dumps(message) - await self.redis_connection.publish(room_id, message) + try: + await self.redis_connection.publish(room_id, message) + except RuntimeError: + # Celery workers run each task in a new event loop (asyncio.run), + # which closes the previous loop. Cached Redis connection is dead. + # Reconnect on the current loop and retry. + self.redis_connection = None + await self.connect() + await self.redis_connection.publish(room_id, message) async def subscribe(self, room_id: str) -> redis.Redis: await self.pubsub.subscribe(room_id) diff --git a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx index 0b7affaf..c8ac418e 100644 --- a/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx +++ b/www/app/(app)/transcripts/[transcriptId]/processing/page.tsx @@ -11,6 +11,7 @@ import { import { useRouter } from "next/navigation"; import { useTranscriptGet } from "../../../../lib/apiHooks"; import { parseNonEmptyString } from "../../../../lib/utils"; +import { useWebSockets } from "../../useWebSockets"; type TranscriptProcessing = { params: Promise<{ @@ -24,6 +25,7 @@ export default function TranscriptProcessing(details: TranscriptProcessing) { const router = useRouter(); const transcript = useTranscriptGet(transcriptId); + useWebSockets(transcriptId); useEffect(() => { const status = transcript.data?.status;