Compare commits

..

6 Commits

Author SHA1 Message Date
dabf7251db chore(main): release 0.22.2 (#756) 2025-12-01 23:39:32 -05:00
Igor Monadical
b51b7aa917 fix: Skip mixdown for multitrack (#760)
* multitrack mixdown optimisation

* skip mixdown for multitrack

* skip mixdown for multitrack

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-12-01 23:35:12 -05:00
Igor Monadical
a8983b4e7e daily auth hotfix (#757)
Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
2025-11-28 14:52:59 -05:00
Igor Monadical
fe47c46489 fix: daily auto refresh fix (#755)
* daily auto refresh fix

* Update www/app/lib/AuthProvider.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* Update www/app/[roomName]/components/DailyRoom.tsx

Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>

* fix bot lint

---------

Co-authored-by: Igor Loskutov <igor.loskutoff@gmail.com>
Co-authored-by: pr-agent-monadical[bot] <198624643+pr-agent-monadical[bot]@users.noreply.github.com>
2025-11-27 18:31:03 -05:00
a2bb6a27d6 chore(main): release 0.22.1 (#750) 2025-11-27 16:55:08 +01:00
7f0b728991 fix: participants update from daily (#749)
* Fix participants update from daily

* Use track keys from params
2025-11-27 16:53:26 +01:00
9 changed files with 209 additions and 140 deletions

View File

@@ -1,5 +1,20 @@
# Changelog # Changelog
## [0.22.2](https://github.com/Monadical-SAS/reflector/compare/v0.22.1...v0.22.2) (2025-12-02)
### Bug Fixes
* daily auto refresh fix ([#755](https://github.com/Monadical-SAS/reflector/issues/755)) ([fe47c46](https://github.com/Monadical-SAS/reflector/commit/fe47c46489c5aa0cc538109f7559cc9accb35c01))
* Skip mixdown for multitrack ([#760](https://github.com/Monadical-SAS/reflector/issues/760)) ([b51b7aa](https://github.com/Monadical-SAS/reflector/commit/b51b7aa9176c1a53ba57ad99f5e976c804a1e80c))
## [0.22.1](https://github.com/Monadical-SAS/reflector/compare/v0.22.0...v0.22.1) (2025-11-27)
### Bug Fixes
* participants update from daily ([#749](https://github.com/Monadical-SAS/reflector/issues/749)) ([7f0b728](https://github.com/Monadical-SAS/reflector/commit/7f0b728991c1b9f9aae702c96297eae63b561ef5))
## [0.22.0](https://github.com/Monadical-SAS/reflector/compare/v0.21.0...v0.22.0) (2025-11-26) ## [0.22.0](https://github.com/Monadical-SAS/reflector/compare/v0.21.0...v0.22.0) (2025-11-26)

View File

@@ -68,7 +68,7 @@ class MeetingParticipant(BaseModel):
Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants Reference: https://docs.daily.co/reference/rest-api/meetings/get-meeting-participants
""" """
user_id: NonEmptyString = Field(description="User identifier") user_id: NonEmptyString | None = Field(None, description="User identifier")
participant_id: NonEmptyString = Field(description="Participant session identifier") participant_id: NonEmptyString = Field(description="Participant session identifier")
user_name: NonEmptyString | None = Field(None, description="User display name") user_name: NonEmptyString | None = Field(None, description="User display name")
join_time: int = Field(description="Join timestamp (Unix epoch seconds)") join_time: int = Field(description="Join timestamp (Unix epoch seconds)")

View File

@@ -9,7 +9,10 @@ from av.audio.resampler import AudioResampler
from celery import chain, shared_task from celery import chain, shared_task
from reflector.asynctask import asynctask from reflector.asynctask import asynctask
from reflector.dailyco_api import MeetingParticipantsResponse
from reflector.db.transcripts import ( from reflector.db.transcripts import (
Transcript,
TranscriptParticipant,
TranscriptStatus, TranscriptStatus,
TranscriptWaveform, TranscriptWaveform,
transcripts_controller, transcripts_controller,
@@ -28,8 +31,14 @@ from reflector.processors import AudioFileWriterProcessor
from reflector.processors.audio_waveform_processor import AudioWaveformProcessor from reflector.processors.audio_waveform_processor import AudioWaveformProcessor
from reflector.processors.types import TitleSummary from reflector.processors.types import TitleSummary
from reflector.processors.types import Transcript as TranscriptType from reflector.processors.types import Transcript as TranscriptType
from reflector.settings import settings
from reflector.storage import Storage, get_transcripts_storage from reflector.storage import Storage, get_transcripts_storage
from reflector.utils.daily import (
filter_cam_audio_tracks,
parse_daily_recording_filename,
)
from reflector.utils.string import NonEmptyString from reflector.utils.string import NonEmptyString
from reflector.video_platforms.factory import create_platform_client
# Audio encoding constants # Audio encoding constants
OPUS_STANDARD_SAMPLE_RATE = 48000 OPUS_STANDARD_SAMPLE_RATE = 48000
@@ -494,6 +503,90 @@ class PipelineMainMultitrack(PipelineMainBase):
transcript=transcript, event="WAVEFORM", data=waveform transcript=transcript, event="WAVEFORM", data=waveform
) )
async def update_participants_from_daily(
self, transcript: Transcript, track_keys: list[str]
) -> None:
"""Update transcript participants with user_id and names from Daily.co API."""
if not transcript.recording_id:
return
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(
transcript.recording_id
)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(
mtg_session_id
)
)
for p in payload.data:
pid = p.participant_id
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
self.logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
self.logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=transcript.recording_id,
)
except Exception as e:
self.logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=transcript.recording_id,
exc_info=True,
)
return
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
self.logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(
transcript, participant
)
except Exception as e:
self.logger.warning(
"Failed to map participant names", error=str(e), exc_info=True
)
async def process(self, bucket_name: str, track_keys: list[str]): async def process(self, bucket_name: str, track_keys: list[str]):
transcript = await self.get_transcript() transcript = await self.get_transcript()
async with self.transaction(): async with self.transaction():
@@ -502,9 +595,12 @@ class PipelineMainMultitrack(PipelineMainBase):
{ {
"events": [], "events": [],
"topics": [], "topics": [],
"participants": [],
}, },
) )
await self.update_participants_from_daily(transcript, track_keys)
source_storage = get_transcripts_storage() source_storage = get_transcripts_storage()
transcript_storage = source_storage transcript_storage = source_storage
@@ -536,11 +632,21 @@ class PipelineMainMultitrack(PipelineMainBase):
transcript.data_path.mkdir(parents=True, exist_ok=True) transcript.data_path.mkdir(parents=True, exist_ok=True)
if settings.SKIP_MIXDOWN:
self.logger.warning(
"SKIP_MIXDOWN enabled: Skipping mixdown and waveform generation. "
"UI will have no audio playback or waveform.",
num_tracks=len(padded_track_urls),
transcript_id=transcript.id,
)
else:
mp3_writer = AudioFileWriterProcessor( mp3_writer = AudioFileWriterProcessor(
path=str(transcript.audio_mp3_filename), path=str(transcript.audio_mp3_filename),
on_duration=self.on_duration, on_duration=self.on_duration,
) )
await self.mixdown_tracks(padded_track_urls, mp3_writer, offsets_seconds=None) await self.mixdown_tracks(
padded_track_urls, mp3_writer, offsets_seconds=None
)
await mp3_writer.flush() await mp3_writer.flush()
if not transcript.audio_mp3_filename.exists(): if not transcript.audio_mp3_filename.exists():
@@ -555,7 +661,9 @@ class PipelineMainMultitrack(PipelineMainBase):
await transcript_storage.put_file(storage_path, mp3_file) await transcript_storage.put_file(storage_path, mp3_file)
mp3_url = await transcript_storage.get_file_url(storage_path) mp3_url = await transcript_storage.get_file_url(storage_path)
await transcripts_controller.update(transcript, {"audio_location": "storage"}) await transcripts_controller.update(
transcript, {"audio_location": "storage"}
)
self.logger.info( self.logger.info(
f"Uploaded mixed audio to storage", f"Uploaded mixed audio to storage",

View File

@@ -138,6 +138,14 @@ class Settings(BaseSettings):
DAILY_WEBHOOK_UUID: str | None = ( DAILY_WEBHOOK_UUID: str | None = (
None # Webhook UUID for this environment. Not used by production code None # Webhook UUID for this environment. Not used by production code
) )
# Multitrack processing
# SKIP_MIXDOWN: When True, skips audio mixdown and waveform generation.
# Transcription still works using individual tracks. Useful for:
# - Diagnosing OOM issues in mixdown
# - Fast processing when audio playback is not needed
# Note: UI will have no audio playback or waveform when enabled.
SKIP_MIXDOWN: bool = True
# Platform Configuration # Platform Configuration
DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM DEFAULT_VIDEO_PLATFORM: Platform = WHEREBY_PLATFORM

View File

@@ -12,7 +12,7 @@ from celery import shared_task
from celery.utils.log import get_task_logger from celery.utils.log import get_task_logger
from pydantic import ValidationError from pydantic import ValidationError
from reflector.dailyco_api import MeetingParticipantsResponse, RecordingResponse from reflector.dailyco_api import RecordingResponse
from reflector.db.daily_participant_sessions import ( from reflector.db.daily_participant_sessions import (
DailyParticipantSession, DailyParticipantSession,
daily_participant_sessions_controller, daily_participant_sessions_controller,
@@ -22,7 +22,6 @@ from reflector.db.recordings import Recording, recordings_controller
from reflector.db.rooms import rooms_controller from reflector.db.rooms import rooms_controller
from reflector.db.transcripts import ( from reflector.db.transcripts import (
SourceKind, SourceKind,
TranscriptParticipant,
transcripts_controller, transcripts_controller,
) )
from reflector.pipelines.main_file_pipeline import task_pipeline_file_process from reflector.pipelines.main_file_pipeline import task_pipeline_file_process
@@ -40,7 +39,6 @@ from reflector.utils.daily import (
DailyRoomName, DailyRoomName,
extract_base_room_name, extract_base_room_name,
filter_cam_audio_tracks, filter_cam_audio_tracks,
parse_daily_recording_filename,
recording_lock_key, recording_lock_key,
) )
from reflector.video_platforms.factory import create_platform_client from reflector.video_platforms.factory import create_platform_client
@@ -275,15 +273,7 @@ async def _process_multitrack_recording_inner(
# else: Recording already exists; metadata set at creation time # else: Recording already exists; metadata set at creation time
transcript = await transcripts_controller.get_by_recording_id(recording.id) transcript = await transcripts_controller.get_by_recording_id(recording.id)
if transcript: if not transcript:
await transcripts_controller.update(
transcript,
{
"topics": [],
"participants": [],
},
)
else:
transcript = await transcripts_controller.add( transcript = await transcripts_controller.add(
"", "",
source_kind=SourceKind.ROOM, source_kind=SourceKind.ROOM,
@@ -296,77 +286,6 @@ async def _process_multitrack_recording_inner(
room_id=room.id, room_id=room.id,
) )
try:
async with create_platform_client("daily") as daily_client:
id_to_name = {}
id_to_user_id = {}
try:
rec_details = await daily_client.get_recording(recording_id)
mtg_session_id = rec_details.mtgSessionId
if mtg_session_id:
try:
payload: MeetingParticipantsResponse = (
await daily_client.get_meeting_participants(mtg_session_id)
)
for p in payload.data:
pid = p.participant_id
assert (
pid is not None
), "panic! participant id cannot be None"
name = p.user_name
user_id = p.user_id
if name:
id_to_name[pid] = name
if user_id:
id_to_user_id[pid] = user_id
except Exception as e:
logger.warning(
"Failed to fetch Daily meeting participants",
error=str(e),
mtg_session_id=mtg_session_id,
exc_info=True,
)
else:
logger.warning(
"No mtgSessionId found for recording; participant names may be generic",
recording_id=recording_id,
)
except Exception as e:
logger.warning(
"Failed to fetch Daily recording details",
error=str(e),
recording_id=recording_id,
exc_info=True,
)
cam_audio_keys = filter_cam_audio_tracks(track_keys)
for idx, key in enumerate(cam_audio_keys):
try:
parsed = parse_daily_recording_filename(key)
participant_id = parsed.participant_id
except ValueError as e:
logger.error(
"Failed to parse Daily recording filename",
error=str(e),
key=key,
exc_info=True,
)
continue
default_name = f"Speaker {idx}"
name = id_to_name.get(participant_id, default_name)
user_id = id_to_user_id.get(participant_id)
participant = TranscriptParticipant(
id=participant_id, speaker=idx, name=name, user_id=user_id
)
await transcripts_controller.upsert_participant(transcript, participant)
except Exception as e:
logger.warning("Failed to map participant names", error=str(e), exc_info=True)
task_pipeline_multitrack_process.delay( task_pipeline_multitrack_process.delay(
transcript_id=transcript.id, transcript_id=transcript.id,
bucket_name=bucket_name, bucket_name=bucket_name,

View File

@@ -117,15 +117,6 @@ export default function TranscriptDetails(details: TranscriptDetails) {
return <Modal title="Loading" text={"Loading transcript..."} />; return <Modal title="Loading" text={"Loading transcript..."} />;
} }
if (mp3.error) {
return (
<Modal
title="Transcription error"
text={`There was an error loading the recording. Error: ${mp3.error}`}
/>
);
}
return ( return (
<> <>
<Grid <Grid
@@ -147,7 +138,12 @@ export default function TranscriptDetails(details: TranscriptDetails) {
/> />
) : !mp3.loading && (waveform.error || mp3.error) ? ( ) : !mp3.loading && (waveform.error || mp3.error) ? (
<Box p={4} bg="red.100" borderRadius="md"> <Box p={4} bg="red.100" borderRadius="md">
<Text>Error loading this recording</Text> <Text>
Error loading{" "}
{[waveform.error && "waveform", mp3.error && "mp3"]
.filter(Boolean)
.join(" and ")}
</Text>
</Box> </Box>
) : ( ) : (
<Skeleton h={14} /> <Skeleton h={14} />

View File

@@ -11,6 +11,7 @@ import {
recordingTypeRequiresConsent, recordingTypeRequiresConsent,
} from "../../lib/consent"; } from "../../lib/consent";
import { useRoomJoinMeeting } from "../../lib/apiHooks"; import { useRoomJoinMeeting } from "../../lib/apiHooks";
import { assertExists } from "../../lib/utils";
type Meeting = components["schemas"]["Meeting"]; type Meeting = components["schemas"]["Meeting"];
@@ -22,16 +23,15 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
const router = useRouter(); const router = useRouter();
const params = useParams(); const params = useParams();
const auth = useAuth(); const auth = useAuth();
const status = auth.status; const authLastUserId = auth.lastUserId;
const containerRef = useRef<HTMLDivElement>(null); const containerRef = useRef<HTMLDivElement>(null);
const joinMutation = useRoomJoinMeeting(); const joinMutation = useRoomJoinMeeting();
const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null); const [joinedMeeting, setJoinedMeeting] = useState<Meeting | null>(null);
const roomName = params?.roomName as string; const roomName = params?.roomName as string;
// Always call /join to get a fresh token with user_id
useEffect(() => { useEffect(() => {
if (status === "loading" || !meeting?.id || !roomName) return; if (authLastUserId === undefined || !meeting?.id || !roomName) return;
const join = async () => { const join = async () => {
try { try {
@@ -50,18 +50,17 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
}; };
join(); join();
}, [meeting?.id, roomName, status]); }, [meeting?.id, roomName, authLastUserId]);
const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url; const roomUrl = joinedMeeting?.host_room_url || joinedMeeting?.room_url;
const isLoading =
status === "loading" || joinMutation.isPending || !joinedMeeting;
const handleLeave = useCallback(() => { const handleLeave = useCallback(() => {
router.push("/browse"); router.push("/browse");
}, [router]); }, [router]);
useEffect(() => { useEffect(() => {
if (isLoading || !roomUrl || !containerRef.current) return; if (authLastUserId === undefined || !roomUrl || !containerRef.current)
return;
let frame: DailyCall | null = null; let frame: DailyCall | null = null;
let destroyed = false; let destroyed = false;
@@ -90,9 +89,14 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
frame.on("left-meeting", handleLeave); frame.on("left-meeting", handleLeave);
// TODO this method must not ignore no-recording option
// TODO this method is here to make dev environments work in some cases (not examined which)
frame.on("joined-meeting", async () => { frame.on("joined-meeting", async () => {
try { try {
await frame.startRecording({ type: "raw-tracks" }); assertExists(
frame,
"frame object got lost somewhere after frame.on was called",
).startRecording({ type: "raw-tracks" });
} catch (error) { } catch (error) {
console.error("Failed to start recording:", error); console.error("Failed to start recording:", error);
} }
@@ -104,7 +108,9 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
} }
}; };
createAndJoin(); createAndJoin().catch((error) => {
console.error("Failed to create and join meeting:", error);
});
return () => { return () => {
destroyed = true; destroyed = true;
@@ -114,9 +120,9 @@ export default function DailyRoom({ meeting }: DailyRoomProps) {
}); });
} }
}; };
}, [roomUrl, isLoading, handleLeave]); }, [roomUrl, authLastUserId, handleLeave]);
if (isLoading) { if (authLastUserId === undefined) {
return ( return (
<Center width="100vw" height="100vh"> <Center width="100vw" height="100vh">
<Spinner size="xl" /> <Spinner size="xl" />

View File

@@ -1,6 +1,6 @@
"use client"; "use client";
import { createContext, useContext } from "react"; import { createContext, useContext, useRef } from "react";
import { useSession as useNextAuthSession } from "next-auth/react"; import { useSession as useNextAuthSession } from "next-auth/react";
import { signOut, signIn } from "next-auth/react"; import { signOut, signIn } from "next-auth/react";
import { configureApiAuth } from "./apiClient"; import { configureApiAuth } from "./apiClient";
@@ -25,6 +25,9 @@ type AuthContextType = (
update: () => Promise<Session | null>; update: () => Promise<Session | null>;
signIn: typeof signIn; signIn: typeof signIn;
signOut: typeof signOut; signOut: typeof signOut;
// TODO probably rename isLoading to isReloading and make THIS field "isLoading"
// undefined is "not known", null is "is certainly logged out"
lastUserId: CustomSession["user"]["id"] | null | undefined;
}; };
const AuthContext = createContext<AuthContextType | undefined>(undefined); const AuthContext = createContext<AuthContextType | undefined>(undefined);
@@ -41,10 +44,15 @@ const noopAuthContext: AuthContextType = {
signOut: async () => { signOut: async () => {
throw new Error("signOut not supposed to be called"); throw new Error("signOut not supposed to be called");
}, },
lastUserId: undefined,
}; };
export function AuthProvider({ children }: { children: React.ReactNode }) { export function AuthProvider({ children }: { children: React.ReactNode }) {
const { data: session, status, update } = useNextAuthSession(); const { data: session, status, update } = useNextAuthSession();
// referential comparison done in component, must be primitive /or cached
const lastUserId = useRef<CustomSession["user"]["id"] | null | undefined>(
null,
);
const contextValue: AuthContextType = isAuthEnabled const contextValue: AuthContextType = isAuthEnabled
? { ? {
@@ -73,11 +81,16 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
case "authenticated": { case "authenticated": {
const customSession = assertCustomSession(session); const customSession = assertCustomSession(session);
if (customSession?.error === REFRESH_ACCESS_TOKEN_ERROR) { if (customSession?.error === REFRESH_ACCESS_TOKEN_ERROR) {
// warning: call order-dependent
lastUserId.current = null;
// token had expired but next auth still returns "authenticated" so show user unauthenticated state // token had expired but next auth still returns "authenticated" so show user unauthenticated state
return { return {
status: "unauthenticated" as const, status: "unauthenticated" as const,
}; };
} else if (customSession?.accessToken) { } else if (customSession?.accessToken) {
// updates anyways with updated properties below
// warning! execution order conscience, must be ran before reading lastUserId.current below
lastUserId.current = customSession.user.id;
return { return {
status, status,
accessToken: customSession.accessToken, accessToken: customSession.accessToken,
@@ -92,6 +105,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
} }
} }
case "unauthenticated": { case "unauthenticated": {
// warning: call order-dependent
lastUserId.current = null;
return { status: "unauthenticated" as const }; return { status: "unauthenticated" as const };
} }
default: { default: {
@@ -103,6 +118,8 @@ export function AuthProvider({ children }: { children: React.ReactNode }) {
update, update,
signIn, signIn,
signOut, signOut,
// for optimistic cases when we assume "loading" doesn't immediately invalidate the user
lastUserId: lastUserId.current,
} }
: noopAuthContext; : noopAuthContext;

View File

@@ -148,7 +148,7 @@ export const authOptions = (): AuthOptions =>
}, },
async session({ session, token }) { async session({ session, token }) {
const extendedToken = token as JWTWithAccessToken; const extendedToken = token as JWTWithAccessToken;
console.log("extendedToken", extendedToken);
const userId = await getUserId(extendedToken.accessToken); const userId = await getUserId(extendedToken.accessToken);
return { return {