diff --git a/server/reflector/processors/__init__.py b/server/reflector/processors/__init__.py
index 8a926f30..52e46c34 100644
--- a/server/reflector/processors/__init__.py
+++ b/server/reflector/processors/__init__.py
@@ -1,10 +1,10 @@
-from .base import Processor, ThreadedProcessor, Pipeline  # noqa: F401
-from .types import AudioFile, Transcript, Word, TitleSummary, FinalSummary  # noqa: F401
-from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_chunker import AudioChunkerProcessor  # noqa: F401
+from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_merge import AudioMergeProcessor  # noqa: F401
 from .audio_transcript import AudioTranscriptProcessor  # noqa: F401
 from .audio_transcript_auto import AudioTranscriptAutoProcessor  # noqa: F401
+from .base import Pipeline, PipelineEvent, Processor, ThreadedProcessor  # noqa: F401
+from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
 from .transcript_liner import TranscriptLinerProcessor  # noqa: F401
 from .transcript_topic_detector import TranscriptTopicDetectorProcessor  # noqa: F401
-from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
+from .types import AudioFile, FinalSummary, TitleSummary, Transcript, Word  # noqa: F401
diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py
index 35e836bc..78913d1c 100644
--- a/server/reflector/processors/base.py
+++ b/server/reflector/processors/base.py
@@ -3,15 +3,23 @@ from concurrent.futures import ThreadPoolExecutor
 from typing import Any
 from uuid import uuid4
 
+from pydantic import BaseModel
 from reflector.logger import logger
 
 
+class PipelineEvent(BaseModel):
+    processor: str
+    uid: str
+    data: Any
+
+
 class Processor:
     INPUT_TYPE: type = None
     OUTPUT_TYPE: type = None
     WARMUP_EVENT: str = "WARMUP_EVENT"
 
     def __init__(self, callback=None, custom_logger=None):
+        self.name = self.__class__.__name__
         self._processors = []
         self._callbacks = []
         if callback:
@@ -67,6 +75,10 @@ class Processor:
         return default
 
     async def emit(self, data):
+        if self.pipeline:
+            await self.pipeline.emit(
+                PipelineEvent(processor=self.name, uid=self.uid, data=data)
+            )
         for callback in self._callbacks:
             await callback(data)
         for processor in self._processors:
diff --git a/server/reflector/tools/process.py b/server/reflector/tools/process.py
index 68d49b0c..ae60d4a1 100644
--- a/server/reflector/tools/process.py
+++ b/server/reflector/tools/process.py
@@ -1,43 +1,45 @@
+import asyncio
+
 import av
 from reflector.logger import logger
 from reflector.processors import (
-    Pipeline,
     AudioChunkerProcessor,
     AudioMergeProcessor,
     AudioTranscriptAutoProcessor,
+    Pipeline,
+    PipelineEvent,
+    TranscriptFinalSummaryProcessor,
     TranscriptLinerProcessor,
     TranscriptTopicDetectorProcessor,
-    TranscriptFinalSummaryProcessor,
 )
-import asyncio
 
 
-async def process_audio_file(filename, event_callback, only_transcript=False):
-    async def on_transcript(data):
-        await event_callback("transcript", data)
-
-    async def on_topic(data):
-        await event_callback("topic", data)
-
-    async def on_summary(data):
-        await event_callback("summary", data)
-
+async def process_audio_file(
+    filename,
+    event_callback,
+    only_transcript=False,
+    source_language="en",
+    target_language="en",
+):
     # build pipeline for audio processing
     processors = [
         AudioChunkerProcessor(),
         AudioMergeProcessor(),
         AudioTranscriptAutoProcessor.as_threaded(),
-        TranscriptLinerProcessor(callback=on_transcript),
+        TranscriptLinerProcessor(),
     ]
     if not only_transcript:
         processors += [
-            TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
-            TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary),
+            TranscriptTopicDetectorProcessor.as_threaded(),
+            TranscriptFinalSummaryProcessor.as_threaded(),
         ]
 
     # transcription output
     pipeline = Pipeline(*processors)
+    pipeline.set_pref("audio:source_language", source_language)
+    pipeline.set_pref("audio:target_language", target_language)
     pipeline.describe()
+    pipeline.on(event_callback)
 
     # start processing audio
     logger.info(f"Opening {filename}")
@@ -59,20 +61,35 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
     parser.add_argument("--only-transcript", "-t", action="store_true")
+    parser.add_argument("--source-language", default="en")
+    parser.add_argument("--target-language", default="en")
+    parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
     args = parser.parse_args()
 
-    async def event_callback(event, data):
-        if event == "transcript":
-            print(f"Transcript[{data.human_timestamp}]: {data.text}")
-        elif event == "topic":
-            print(f"Topic[{data.human_timestamp}]: title={data.title}")
-            print(f"Topic[{data.human_timestamp}]: summary={data.summary}")
-        elif event == "summary":
-            print(f"Summary: duration={data.duration}")
-            print(f"Summary: summary={data.summary}")
+    output_fd = None
+    if args.output:
+        output_fd = open(args.output, "w")
+
+    async def event_callback(event: PipelineEvent):
+        processor = event.processor
+        # ignore some processor
+        if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
+            return
+        logger.info(f"Event: {event}")
+        if output_fd:
+            output_fd.write(event.model_dump_json())
+            output_fd.write("\n")
 
     asyncio.run(
         process_audio_file(
-            args.source, event_callback, only_transcript=args.only_transcript
+            args.source,
+            event_callback,
+            only_transcript=args.only_transcript,
+            source_language=args.source_language,
+            target_language=args.target_language,
         )
     )
+
+    if output_fd:
+        output_fd.close()
+        logger.info(f"Output written to {args.output}")