diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py
index 8a926f30..52e46c34 100644
--- a/server/reflector/processors/__init__.py
+++ b/server/reflector/processors/__init__.py
@@ -1,10 +1,10 @@
-from .base import Processor, ThreadedProcessor, Pipeline  # noqa: F401
-from .types import AudioFile, Transcript, Word, TitleSummary, FinalSummary  # noqa: F401
-from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_chunker import AudioChunkerProcessor  # noqa: F401
+from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_merge import AudioMergeProcessor  # noqa: F401
 from .audio_transcript import AudioTranscriptProcessor  # noqa: F401
 from .audio_transcript_auto import AudioTranscriptAutoProcessor  # noqa: F401
+from .base import Pipeline, PipelineEvent, Processor, ThreadedProcessor  # noqa: F401
+from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
 from .transcript_liner import TranscriptLinerProcessor  # noqa: F401
 from .transcript_topic_detector import TranscriptTopicDetectorProcessor  # noqa: F401
-from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
+from .types import AudioFile, FinalSummary, TitleSummary, Transcript, Word  # noqa: F401
diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py
index 35e836bc..0f93b14b 100644
--- a/server/reflector/processors/base.py
+++ b/server/reflector/processors/base.py
@@ -1,17 +1,25 @@
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import Any, Union
 from uuid import uuid4

+from pydantic import BaseModel
 from reflector.logger import logger


+class PipelineEvent(BaseModel):
+    processor: str
+    uid: str
+    data: Any
+
+
 class Processor:
     INPUT_TYPE: type = None
     OUTPUT_TYPE: type = None
     WARMUP_EVENT: str = "WARMUP_EVENT"

     def __init__(self, callback=None, custom_logger=None):
+        self.name = self.__class__.__name__
         self._processors = []
         self._callbacks = []
         if callback:
@@ -67,6 +75,10 @@ class Processor:
         return default

     async def emit(self, data):
+        if self.pipeline:
+            await self.pipeline.emit(
+                PipelineEvent(processor=self.name, uid=self.uid, data=data)
+            )
         for callback in self._callbacks:
             await callback(data)
         for processor in self._processors:
@@ -183,11 +195,72 @@ class ThreadedProcessor(Processor):
     def on(self, callback):
         self.processor.on(callback)

+    def off(self, callback):
+        self.processor.off(callback)
+
     def describe(self, level=0):
         super().describe(level)
         self.processor.describe(level + 1)


+class BroadcastProcessor(Processor):
+    """
+    A processor that broadcasts incoming data to every processor passed to
+    the constructor, in constructor order.
+
+    This processor does not guarantee that the output is in order.
+
+    It connects the output of every wrapped processor to the input of the
+    next processor in the pipeline, so the next processor must be able to
+    accept each of their output types.
+ """ + + def __init__(self, processors: Processor): + super().__init__() + self.processors = processors + self.INPUT_TYPE = processors[0].INPUT_TYPE + output_types = set([processor.OUTPUT_TYPE for processor in processors]) + self.OUTPUT_TYPE = Union[tuple(output_types)] + + def set_pipeline(self, pipeline: "Pipeline"): + super().set_pipeline(pipeline) + for processor in self.processors: + processor.set_pipeline(pipeline) + + async def _warmup(self): + for processor in self.processors: + await processor.warmup() + + async def _push(self, data): + for processor in self.processors: + await processor.push(data) + + async def _flush(self): + for processor in self.processors: + await processor.flush() + + def connect(self, processor: Processor): + for processor in self.processors: + processor.connect(processor) + + def disconnect(self, processor: Processor): + for processor in self.processors: + processor.disconnect(processor) + + def on(self, callback): + for processor in self.processors: + processor.on(callback) + + def off(self, callback): + for processor in self.processors: + processor.off(callback) + + def describe(self, level=0): + super().describe(level) + for processor in self.processors: + processor.describe(level + 1) + + class Pipeline(Processor): """ A pipeline of processors diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py index 68d49b0c..ae60d4a1 100644 --- a/server/reflector/tools/process.py +++ b/server/reflector/tools/process.py @@ -1,43 +1,45 @@ +import asyncio + import av from reflector.logger import logger from reflector.processors import ( - Pipeline, AudioChunkerProcessor, AudioMergeProcessor, AudioTranscriptAutoProcessor, + Pipeline, + PipelineEvent, + TranscriptFinalSummaryProcessor, TranscriptLinerProcessor, TranscriptTopicDetectorProcessor, - TranscriptFinalSummaryProcessor, ) -import asyncio -async def process_audio_file(filename, event_callback, only_transcript=False): - async def on_transcript(data): - await event_callback("transcript", data) - - async def on_topic(data): - await event_callback("topic", data) - - async def on_summary(data): - await event_callback("summary", data) - +async def process_audio_file( + filename, + event_callback, + only_transcript=False, + source_language="en", + target_language="en", +): # build pipeline for audio processing processors = [ AudioChunkerProcessor(), AudioMergeProcessor(), AudioTranscriptAutoProcessor.as_threaded(), - TranscriptLinerProcessor(callback=on_transcript), + TranscriptLinerProcessor(), ] if not only_transcript: processors += [ - TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic), - TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary), + TranscriptTopicDetectorProcessor.as_threaded(), + TranscriptFinalSummaryProcessor.as_threaded(), ] # transcription output pipeline = Pipeline(*processors) + pipeline.set_pref("audio:source_language", source_language) + pipeline.set_pref("audio:target_language", target_language) pipeline.describe() + pipeline.on(event_callback) # start processing audio logger.info(f"Opening {filename}") @@ -59,20 +61,35 @@ if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("source", help="Source file (mp3, wav, mp4...)") parser.add_argument("--only-transcript", "-t", action="store_true") + parser.add_argument("--source-language", default="en") + parser.add_argument("--target-language", default="en") + parser.add_argument("--output", "-o", help="Output file (output.jsonl)") args = parser.parse_args() - async 
diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py
index 68d49b0c..ae60d4a1 100644
--- a/server/reflector/tools/process.py
+++ b/server/reflector/tools/process.py
@@ -1,43 +1,45 @@
+import asyncio
+
 import av
 from reflector.logger import logger
 from reflector.processors import (
-    Pipeline,
     AudioChunkerProcessor,
     AudioMergeProcessor,
     AudioTranscriptAutoProcessor,
+    Pipeline,
+    PipelineEvent,
+    TranscriptFinalSummaryProcessor,
     TranscriptLinerProcessor,
     TranscriptTopicDetectorProcessor,
-    TranscriptFinalSummaryProcessor,
 )
-import asyncio


-async def process_audio_file(filename, event_callback, only_transcript=False):
-    async def on_transcript(data):
-        await event_callback("transcript", data)
-
-    async def on_topic(data):
-        await event_callback("topic", data)
-
-    async def on_summary(data):
-        await event_callback("summary", data)
-
+async def process_audio_file(
+    filename,
+    event_callback,
+    only_transcript=False,
+    source_language="en",
+    target_language="en",
+):
     # build pipeline for audio processing
     processors = [
         AudioChunkerProcessor(),
         AudioMergeProcessor(),
         AudioTranscriptAutoProcessor.as_threaded(),
-        TranscriptLinerProcessor(callback=on_transcript),
+        TranscriptLinerProcessor(),
     ]
     if not only_transcript:
         processors += [
-            TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
-            TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary),
+            TranscriptTopicDetectorProcessor.as_threaded(),
+            TranscriptFinalSummaryProcessor.as_threaded(),
         ]

     # transcription output
     pipeline = Pipeline(*processors)
+    pipeline.set_pref("audio:source_language", source_language)
+    pipeline.set_pref("audio:target_language", target_language)
     pipeline.describe()
+    pipeline.on(event_callback)

     # start processing audio
     logger.info(f"Opening {filename}")
@@ -59,20 +61,35 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
     parser.add_argument("--only-transcript", "-t", action="store_true")
+    parser.add_argument("--source-language", default="en")
+    parser.add_argument("--target-language", default="en")
+    parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
     args = parser.parse_args()

-    async def event_callback(event, data):
-        if event == "transcript":
-            print(f"Transcript[{data.human_timestamp}]: {data.text}")
-        elif event == "topic":
-            print(f"Topic[{data.human_timestamp}]: title={data.title}")
-            print(f"Topic[{data.human_timestamp}]: summary={data.summary}")
-        elif event == "summary":
-            print(f"Summary: duration={data.duration}")
-            print(f"Summary: summary={data.summary}")
+    output_fd = None
+    if args.output:
+        output_fd = open(args.output, "w")
+
+    async def event_callback(event: PipelineEvent):
+        processor = event.processor
+        # ignore some processors
+        if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
+            return
+        logger.info(f"Event: {event}")
+        if output_fd:
+            output_fd.write(event.model_dump_json())
+            output_fd.write("\n")

     asyncio.run(
         process_audio_file(
-            args.source, event_callback, only_transcript=args.only_transcript
+            args.source,
+            event_callback,
+            only_transcript=args.only_transcript,
+            source_language=args.source_language,
+            target_language=args.target_language,
         )
     )
+
+    if output_fd:
+        output_fd.close()
+        logger.info(f"Output written to {args.output}")
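# Sketch (not part of the patch): typical invocation of the updated tool,
# assuming the reflector package is importable; file names are illustrative.
#
#   python -m reflector.tools.process meeting.mp4 \
#       --source-language en --target-language en --output events.jsonl
#
# With --output, every PipelineEvent is serialized with model_dump_json(),
# one JSON object per line; the shape of "data" depends on the emitting
# processor's OUTPUT_TYPE (Transcript, TitleSummary, FinalSummary, ...):
#
#   {"processor": "TranscriptLinerProcessor", "uid": "...", "data": {...}}
#
# runpipeline.py (added below) reads such a file back and replays the "data"
# payloads through a single processor.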
parser.add_argument("--input", "-i", help="Input file (jsonl)") + parser.add_argument("--input-processor", "-f", help="Name of the processor to keep") + parser.add_argument("--output", "-o", help="Output file (jsonl)") + parser.add_argument("--pipeline", "-p", help="Pipeline description (yaml)") + parser.add_argument("--processor", help="Processor to run") + args = parser.parse_args() + + if args.output and args.output == args.input: + parser.error("Input and output cannot be the same") + sys.exit(1) + + if args.processor and args.pipeline: + parser.error("--processor and --pipeline are mutually exclusive") + sys.exit(1) + + if not args.processor and not args.pipeline: + parser.error("You need to specify either --processor or --pipeline") + sys.exit(1) + + if args.processor: + func = run_single_processor(args) + # elif args.pipeline: + # func = run_pipeline(args) + + asyncio.run(func) diff --git a/server/tests/test_processors_broadcast.py b/server/tests/test_processors_broadcast.py new file mode 100644 index 00000000..fcddf31c --- /dev/null +++ b/server/tests/test_processors_broadcast.py @@ -0,0 +1,46 @@ +import pytest + + +@pytest.mark.asyncio +async def test_processor_broadcast(): + from reflector.processors.base import Processor, BroadcastProcessor, Pipeline + + class TestProcessor(Processor): + INPUT_TYPE = str + OUTPUT_TYPE = str + + def __init__(self, name, **kwargs): + super().__init__(**kwargs) + self.name = name + + async def _push(self, data): + data = data + f":{self.name}" + await self.emit(data) + + processors = [ + TestProcessor("A"), + BroadcastProcessor( + processors=[ + TestProcessor("B"), + TestProcessor("C"), + ], + ), + ] + + events = [] + + async def on_event(event): + events.append(event) + + pipeline = Pipeline(*processors) + pipeline.on(on_event) + await pipeline.push("test") + await pipeline.flush() + + assert len(events) == 3 + assert events[0].processor == "A" + assert events[0].data == "test:A" + assert events[1].processor == "B" + assert events[1].data == "test:A:B" + assert events[2].processor == "C" + assert events[2].data == "test:A:C" diff --git a/server/tests/test_processors_pipeline.py b/server/tests/test_processors_pipeline.py index cc6a8574..1831e5bd 100644 --- a/server/tests/test_processors_pipeline.py +++ b/server/tests/test_processors_pipeline.py @@ -24,15 +24,12 @@ async def test_basic_process(event_loop): LLM.register("test", LLMTest) # event callback - marks = { - "transcript": 0, - "topic": 0, - "summary": 0, - } + marks = {} - async def event_callback(event, data): - print(f"{event}: {data}") - marks[event] += 1 + async def event_callback(event): + if event.processor not in marks: + marks[event.processor] = 0 + marks[event.processor] += 1 # invoke the process and capture events path = Path(__file__).parent / "records" / "test_mathieu_hello.wav" @@ -40,6 +37,6 @@ async def test_basic_process(event_loop): print(marks) # validate the events - assert marks["transcript"] == 5 - assert marks["topic"] == 1 - assert marks["summary"] == 1 + assert marks["TranscriptLinerProcessor"] == 5 + assert marks["TranscriptTopicDetectorProcessor"] == 1 + assert marks["TranscriptFinalSummaryProcessor"] == 1 diff --git a/www/app/(errors)/errorContext.tsx b/www/app/(errors)/errorContext.tsx new file mode 100644 index 00000000..d8a80c04 --- /dev/null +++ b/www/app/(errors)/errorContext.tsx @@ -0,0 +1,31 @@ +"use client"; +import React, { createContext, useContext, useState } from "react"; + +interface ErrorContextProps { + error: Error | null; + setError: 
diff --git a/server/tests/test_processors_broadcast.py b/server/tests/test_processors_broadcast.py
new file mode 100644
index 00000000..fcddf31c
--- /dev/null
+++ b/server/tests/test_processors_broadcast.py
@@ -0,0 +1,46 @@
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_processor_broadcast():
+    from reflector.processors.base import Processor, BroadcastProcessor, Pipeline
+
+    class TestProcessor(Processor):
+        INPUT_TYPE = str
+        OUTPUT_TYPE = str
+
+        def __init__(self, name, **kwargs):
+            super().__init__(**kwargs)
+            self.name = name
+
+        async def _push(self, data):
+            data = data + f":{self.name}"
+            await self.emit(data)
+
+    processors = [
+        TestProcessor("A"),
+        BroadcastProcessor(
+            processors=[
+                TestProcessor("B"),
+                TestProcessor("C"),
+            ],
+        ),
+    ]
+
+    events = []
+
+    async def on_event(event):
+        events.append(event)
+
+    pipeline = Pipeline(*processors)
+    pipeline.on(on_event)
+    await pipeline.push("test")
+    await pipeline.flush()
+
+    assert len(events) == 3
+    assert events[0].processor == "A"
+    assert events[0].data == "test:A"
+    assert events[1].processor == "B"
+    assert events[1].data == "test:A:B"
+    assert events[2].processor == "C"
+    assert events[2].data == "test:A:C"
diff --git a/server/tests/test_processors_pipeline.py b/server/tests/test_processors_pipeline.py
index cc6a8574..1831e5bd 100644
--- a/server/tests/test_processors_pipeline.py
+++ b/server/tests/test_processors_pipeline.py
@@ -24,15 +24,12 @@ async def test_basic_process(event_loop):
     LLM.register("test", LLMTest)

     # event callback
-    marks = {
-        "transcript": 0,
-        "topic": 0,
-        "summary": 0,
-    }
+    marks = {}

-    async def event_callback(event, data):
-        print(f"{event}: {data}")
-        marks[event] += 1
+    async def event_callback(event):
+        if event.processor not in marks:
+            marks[event.processor] = 0
+        marks[event.processor] += 1

     # invoke the process and capture events
     path = Path(__file__).parent / "records" / "test_mathieu_hello.wav"
@@ -40,6 +37,6 @@ async def test_basic_process(event_loop):
     print(marks)

     # validate the events
-    assert marks["transcript"] == 5
-    assert marks["topic"] == 1
-    assert marks["summary"] == 1
+    assert marks["TranscriptLinerProcessor"] == 5
+    assert marks["TranscriptTopicDetectorProcessor"] == 1
+    assert marks["TranscriptFinalSummaryProcessor"] == 1
diff --git a/www/app/(errors)/errorContext.tsx b/www/app/(errors)/errorContext.tsx
new file mode 100644
index 00000000..d8a80c04
--- /dev/null
+++ b/www/app/(errors)/errorContext.tsx
@@ -0,0 +1,31 @@
+"use client";
+import React, { createContext, useContext, useState } from "react";
+
+interface ErrorContextProps {
+  error: Error | null;
+  setError: React.Dispatch<React.SetStateAction<Error | null>>;
+}
+
+const ErrorContext = createContext<ErrorContextProps | undefined>(undefined);
+
+export const useError = () => {
+  const context = useContext(ErrorContext);
+  if (!context) {
+    throw new Error("useError must be used within an ErrorProvider");
+  }
+  return context;
+};
+
+interface ErrorProviderProps {
+  children: React.ReactNode;
+}
+
+export const ErrorProvider: React.FC<ErrorProviderProps> = ({ children }) => {
+  const [error, setError] = useState<Error | null>(null);
+
+  return (
+    <ErrorContext.Provider value={{ error, setError }}>
+      {children}
+    </ErrorContext.Provider>
+  );
+};
diff --git a/www/app/(errors)/errorMessage.tsx b/www/app/(errors)/errorMessage.tsx
new file mode 100644
index 00000000..d5109733
--- /dev/null
+++ b/www/app/(errors)/errorMessage.tsx
@@ -0,0 +1,34 @@
+"use client";
+import { useError } from "./errorContext";
+import { useEffect, useState } from "react";
+import * as Sentry from "@sentry/react";
+
+const ErrorMessage: React.FC = () => {
+  const { error, setError } = useError();
+  const [isVisible, setIsVisible] = useState(false);
+
+  useEffect(() => {
+    if (error) {
+      setIsVisible(true);
+      Sentry.captureException(error);
+      console.error("Error", error.message, error);
+    }
+  }, [error]);
+
+  if (!isVisible || !error) return null;
+
+  return (
+    <div
+      onClick={() => {
+        setIsVisible(false);
+        setError(null);
+      }}
+      className="max-w-xs z-50 fixed top-16 right-10 bg-red-100 border border-red-400 text-red-700 px-4 py-3 rounded transition-opacity duration-300 ease-out opacity-100 hover:opacity-75 cursor-pointer transform hover:scale-105"
+      role="alert"
+    >
+      {error?.message}
+    </div>
+  );
+};
+
+export default ErrorMessage;
diff --git a/www/app/layout.tsx b/www/app/layout.tsx
index fc7913f5..c15974a2 100644
--- a/www/app/layout.tsx
+++ b/www/app/layout.tsx
@@ -3,6 +3,8 @@ import { Roboto } from "next/font/google";
 import { Metadata } from "next";
 import FiefWrapper from "./(auth)/fiefWrapper";
 import UserInfo from "./(auth)/userInfo";
+import { ErrorProvider } from "./(errors)/errorContext";
+import ErrorMessage from "./(errors)/errorMessage";

 const roboto = Roboto({ subsets: ["latin"], weight: "400" });

@@ -55,19 +57,24 @@ export default function RootLayout({ children }) {
  [The body of this hunk was garbled during extraction: the JSX tags were stripped, leaving only the "Reflector" heading, the "Capture The Signal, Not The Noise" tagline, and {children}. Judging from the imports added above, the new markup wraps the existing page body in <ErrorProvider> and renders an <ErrorMessage /> component inside it.]
diff --git a/www/app/transcripts/useTranscript.ts b/www/app/transcripts/useTranscript.ts
index 07b614f4..f2188f62 100644
--- a/www/app/transcripts/useTranscript.ts
+++ b/www/app/transcripts/useTranscript.ts
@@ -1,19 +1,18 @@
 import { useEffect, useState } from "react";
 import { DefaultApi, V1TranscriptsCreateRequest } from "../api/apis/DefaultApi";
 import { GetTranscript } from "../api";
-import getApi from "../lib/getApi";
+import { useError } from "../(errors)/errorContext";

 type UseTranscript = {
   response: GetTranscript | null;
   loading: boolean;
-  error: string | null;
   createTranscript: () => void;
 };

 const useTranscript = (api: DefaultApi): UseTranscript => {
   const [response, setResponse] = useState<GetTranscript | null>(null);
   const [loading, setLoading] = useState(false);
-  const [error, setError] = useState<string | null>(null);
+  const { setError } = useError();

   const createTranscript = () => {
     setLoading(true);
@@ -37,10 +36,7 @@ const useTranscript = (api: DefaultApi): UseTranscript => {
         console.debug("New transcript created:", result);
       })
       .catch((err) => {
-        const errorString = err.response || err.message || "Unknown error";
-        setError(errorString);
-        setLoading(false);
-        console.error("Error creating transcript:", errorString);
+        setError(err);
       });
   };

@@ -48,7 +44,7 @@ const useTranscript = (api: DefaultApi): UseTranscript => {
     createTranscript();
   }, []);

-  return { response, loading, error, createTranscript };
+  return { response, loading, createTranscript };
 };

 export default useTranscript;
diff --git a/www/app/transcripts/useWebRTC.ts b/www/app/transcripts/useWebRTC.ts
index 62694de8..b99382ab 100644
--- a/www/app/transcripts/useWebRTC.ts
+++ b/www/app/transcripts/useWebRTC.ts
@@ -4,7 +4,7 @@ import {
   DefaultApi,
   V1TranscriptRecordWebrtcRequest,
 } from "../api/apis/DefaultApi";
-import { Configuration } from "../api/runtime";
+import { useError } from "../(errors)/errorContext";

 const useWebRTC = (
   stream: MediaStream | null,
@@ -12,13 +12,25 @@ const useWebRTC = (
   api: DefaultApi,
 ): Peer => {
   const [peer, setPeer] = useState<Peer | null>(null);
+  const { setError } = useError();

   useEffect(() => {
     if (!stream || !transcriptId) {
       return;
     }

-    let p: Peer = new Peer({ initiator: true, stream: stream });
+    let p: Peer;
+
+    try {
+      p = new Peer({ initiator: true, stream: stream });
+    } catch (error) {
+      setError(error);
+      return;
+    }
+
+    p.on("error", (err) => {
+      setError(new Error(`WebRTC error: ${err}`));
+    });

     p.on("signal", (data: any) => {
       if ("sdp" in data) {
@@ -33,10 +45,14 @@ const useWebRTC = (
         api
           .v1TranscriptRecordWebrtc(requestParameters)
           .then((answer) => {
-            p.signal(answer);
+            try {
+              p.signal(answer);
+            } catch (error) {
+              setError(error);
+            }
           })
-          .catch((err) => {
-            console.error("WebRTC signaling error:", err);
+          .catch((error) => {
+            setError(error);
           });
       }
     });
diff --git a/www/app/transcripts/useWebSockets.ts b/www/app/transcripts/useWebSockets.ts
index 726aed22..7bde6d9b 100644
--- a/www/app/transcripts/useWebSockets.ts
+++ b/www/app/transcripts/useWebSockets.ts
@@ -1,5 +1,6 @@
 import { useEffect, useState } from "react";
 import { Topic, FinalSummary, Status } from "./webSocketTypes";
+import { useError } from "../(errors)/errorContext";

 type UseWebSockets = {
   transcriptText: string;
@@ -15,6 +16,7 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
     summary: "",
   });
   const [status, setStatus] = useState<Status>({ value: "disconnected" });
+  const { setError } = useError();

   useEffect(() => {
     document.onkeyup = (e) => {
@@ -77,41 +79,53 @@ export const useWebSockets = (transcriptId: string | null): UseWebSockets => {
     ws.onmessage = (event) => {
       const message = JSON.parse(event.data);

-      switch (message.event) {
-        case "TRANSCRIPT":
-          if (message.data.text) {
-            setTranscriptText((message.data.text ?? "").trim());
-            console.debug("TRANSCRIPT event:", message.data);
-          }
-          break;
+      try {
+        switch (message.event) {
+          case "TRANSCRIPT":
+            if (message.data.text) {
+              setTranscriptText((message.data.text ?? "").trim());
+              console.debug("TRANSCRIPT event:", message.data);
+            }
+            break;

-        case "TOPIC":
-          setTopics((prevTopics) => [...prevTopics, message.data]);
-          console.debug("TOPIC event:", message.data);
-          break;
+          case "TOPIC":
+            setTopics((prevTopics) => [...prevTopics, message.data]);
+            console.debug("TOPIC event:", message.data);
+            break;

-        case "FINAL_SUMMARY":
-          if (message.data) {
-            setFinalSummary(message.data);
-            console.debug("FINAL_SUMMARY event:", message.data);
-          }
-          break;
+          case "FINAL_SUMMARY":
+            if (message.data) {
+              setFinalSummary(message.data);
+              console.debug("FINAL_SUMMARY event:", message.data);
+            }
+            break;

-        case "STATUS":
-          setStatus(message.data);
-          break;
+          case "STATUS":
+            setStatus(message.data);
+            break;

-        default:
-          console.error("Unknown event:", message.event);
+          default:
+            setError(
+              new Error(`Received unknown WebSocket event: ${message.event}`),
+            );
+        }
+      } catch (error) {
+        setError(error);
       }
     };

     ws.onerror = (error) => {
       console.error("WebSocket error:", error);
+      setError(new Error("A WebSocket error occurred."));
     };

-    ws.onclose = () => {
+    ws.onclose = (event) => {
       console.debug("WebSocket connection closed");
+      if (event.code !== 1000) {
+        setError(
+          new Error(`WebSocket closed unexpectedly with code: ${event.code}`),
+        );
+      }
     };

     return () => {