Merge branch 'main' of github.com:Monadical-SAS/reflector into llm-modal
@@ -1,10 +1,10 @@
-from .base import Processor, ThreadedProcessor, Pipeline  # noqa: F401
-from .types import AudioFile, Transcript, Word, TitleSummary, FinalSummary  # noqa: F401
-from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_chunker import AudioChunkerProcessor  # noqa: F401
+from .audio_file_writer import AudioFileWriterProcessor  # noqa: F401
 from .audio_merge import AudioMergeProcessor  # noqa: F401
 from .audio_transcript import AudioTranscriptProcessor  # noqa: F401
 from .audio_transcript_auto import AudioTranscriptAutoProcessor  # noqa: F401
+from .base import Pipeline, PipelineEvent, Processor, ThreadedProcessor  # noqa: F401
+from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
 from .transcript_liner import TranscriptLinerProcessor  # noqa: F401
 from .transcript_topic_detector import TranscriptTopicDetectorProcessor  # noqa: F401
-from .transcript_final_summary import TranscriptFinalSummaryProcessor  # noqa: F401
+from .types import AudioFile, FinalSummary, TitleSummary, Transcript, Word  # noqa: F401
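The reorder above looks like an import-sorting pass, but it also adds PipelineEvent to the package's re-exports. The import surface after this change, sketched from the lines above:

from reflector.processors import (
    AudioChunkerProcessor,
    Pipeline,
    PipelineEvent,
    TranscriptFinalSummaryProcessor,
)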
@@ -1,17 +1,25 @@
 import asyncio
 from concurrent.futures import ThreadPoolExecutor
-from typing import Any
+from typing import Any, Union
 from uuid import uuid4
 
+from pydantic import BaseModel
 from reflector.logger import logger
 
 
+class PipelineEvent(BaseModel):
+    processor: str
+    uid: str
+    data: Any
+
+
 class Processor:
     INPUT_TYPE: type = None
     OUTPUT_TYPE: type = None
+    WARMUP_EVENT: str = "WARMUP_EVENT"
 
     def __init__(self, callback=None, custom_logger=None):
         self.name = self.__class__.__name__
         self._processors = []
         self._callbacks = []
         if callback:
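PipelineEvent is a plain pydantic model, so every event a pipeline emits can be serialized to JSON; that is what the jsonl tooling further down relies on. A minimal sketch (the field values are made up):

from reflector.processors import PipelineEvent

# hypothetical event, shaped like what Processor.emit() builds below
event = PipelineEvent(processor="TranscriptLinerProcessor", uid="1234", data="hello")
print(event.model_dump_json())
# {"processor":"TranscriptLinerProcessor","uid":"1234","data":"hello"}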
@@ -67,6 +75,10 @@ class Processor:
         return default
 
     async def emit(self, data):
+        if self.pipeline:
+            await self.pipeline.emit(
+                PipelineEvent(processor=self.name, uid=self.uid, data=data)
+            )
         for callback in self._callbacks:
             await callback(data)
         for processor in self._processors:
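emit() now publishes each payload to the owning pipeline, wrapped in a PipelineEvent, before invoking the per-processor callbacks and pushing to connected processors. A sketch of a custom processor built on this, modeled on the TestProcessor in the new test file below (the class itself is hypothetical):

from reflector.processors.base import Processor


class UppercaseProcessor(Processor):
    INPUT_TYPE = str
    OUTPUT_TYPE = str

    async def _push(self, data):
        # emit() notifies pipeline.on() subscribers and self._callbacks,
        # then pushes into any connected processors
        await self.emit(data.upper())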
@@ -183,11 +195,72 @@ class ThreadedProcessor(Processor):
     def on(self, callback):
         self.processor.on(callback)
 
     def off(self, callback):
         self.processor.off(callback)
 
     def describe(self, level=0):
         super().describe(level)
         self.processor.describe(level + 1)
 
 
+class BroadcastProcessor(Processor):
+    """
+    A processor that broadcasts data to multiple processors, in the order
+    they were passed to the constructor.
+
+    This processor does not guarantee that the output is in order.
+
+    This processor connects the output of all wrapped processors to the input
+    of the next processor, so the next processor must be able to accept the
+    different types of input.
+    """
+
+    def __init__(self, processors: list[Processor]):
+        super().__init__()
+        self.processors = processors
+        self.INPUT_TYPE = processors[0].INPUT_TYPE
+        output_types = {processor.OUTPUT_TYPE for processor in processors}
+        self.OUTPUT_TYPE = Union[tuple(output_types)]
+
+    def set_pipeline(self, pipeline: "Pipeline"):
+        super().set_pipeline(pipeline)
+        for processor in self.processors:
+            processor.set_pipeline(pipeline)
+
+    async def _warmup(self):
+        for processor in self.processors:
+            await processor.warmup()
+
+    async def _push(self, data):
+        for processor in self.processors:
+            await processor.push(data)
+
+    async def _flush(self):
+        for processor in self.processors:
+            await processor.flush()
+
+    def connect(self, processor: Processor):
+        for p in self.processors:
+            p.connect(processor)
+
+    def disconnect(self, processor: Processor):
+        for p in self.processors:
+            p.disconnect(processor)
+
+    def on(self, callback):
+        for processor in self.processors:
+            processor.on(callback)
+
+    def off(self, callback):
+        for processor in self.processors:
+            processor.off(callback)
+
+    def describe(self, level=0):
+        super().describe(level)
+        for processor in self.processors:
+            processor.describe(level + 1)
+
+
 class Pipeline(Processor):
     """
     A pipeline of processors
 
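BroadcastProcessor derives its own OUTPUT_TYPE from the wrapped processors, leaning on typing.Union accepting a tuple of types and collapsing duplicates; a quick illustration of that trick:

from typing import Union

assert Union[(str, int)] == Union[str, int]  # a tuple expands to multiple args
assert Union[(str,)] is str                  # a single type collapses to itself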
@@ -1,43 +1,45 @@
+import asyncio
+
 import av
 from reflector.logger import logger
 from reflector.processors import (
-    Pipeline,
     AudioChunkerProcessor,
     AudioMergeProcessor,
     AudioTranscriptAutoProcessor,
+    Pipeline,
+    PipelineEvent,
+    TranscriptFinalSummaryProcessor,
     TranscriptLinerProcessor,
     TranscriptTopicDetectorProcessor,
-    TranscriptFinalSummaryProcessor,
 )
-import asyncio
 
 
-async def process_audio_file(filename, event_callback, only_transcript=False):
-    async def on_transcript(data):
-        await event_callback("transcript", data)
-
-    async def on_topic(data):
-        await event_callback("topic", data)
-
-    async def on_summary(data):
-        await event_callback("summary", data)
-
+async def process_audio_file(
+    filename,
+    event_callback,
+    only_transcript=False,
+    source_language="en",
+    target_language="en",
+):
     # build pipeline for audio processing
     processors = [
         AudioChunkerProcessor(),
         AudioMergeProcessor(),
         AudioTranscriptAutoProcessor.as_threaded(),
-        TranscriptLinerProcessor(callback=on_transcript),
+        TranscriptLinerProcessor(),
     ]
     if not only_transcript:
        processors += [
-            TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic),
-            TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary),
+            TranscriptTopicDetectorProcessor.as_threaded(),
+            TranscriptFinalSummaryProcessor.as_threaded(),
        ]
 
     # transcription output
     pipeline = Pipeline(*processors)
+    pipeline.set_pref("audio:source_language", source_language)
+    pipeline.set_pref("audio:target_language", target_language)
     pipeline.describe()
+    pipeline.on(event_callback)
 
     # start processing audio
     logger.info(f"Opening {filename}")
@@ -59,20 +61,35 @@ if __name__ == "__main__":
     parser = argparse.ArgumentParser()
     parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
     parser.add_argument("--only-transcript", "-t", action="store_true")
+    parser.add_argument("--source-language", default="en")
+    parser.add_argument("--target-language", default="en")
+    parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
     args = parser.parse_args()
 
-    async def event_callback(event, data):
-        if event == "transcript":
-            print(f"Transcript[{data.human_timestamp}]: {data.text}")
-        elif event == "topic":
-            print(f"Topic[{data.human_timestamp}]: title={data.title}")
-            print(f"Topic[{data.human_timestamp}]: summary={data.summary}")
-        elif event == "summary":
-            print(f"Summary: duration={data.duration}")
-            print(f"Summary: summary={data.summary}")
+    output_fd = None
+    if args.output:
+        output_fd = open(args.output, "w")
+
+    async def event_callback(event: PipelineEvent):
+        processor = event.processor
+        # ignore some processors
+        if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
+            return
+        logger.info(f"Event: {event}")
+        if output_fd:
+            output_fd.write(event.model_dump_json())
+            output_fd.write("\n")
 
     asyncio.run(
         process_audio_file(
-            args.source, event_callback, only_transcript=args.only_transcript
+            args.source,
+            event_callback,
+            only_transcript=args.only_transcript,
+            source_language=args.source_language,
+            target_language=args.target_language,
        )
    )
+
+    if output_fd:
+        output_fd.close()
+        logger.info(f"Output written to {args.output}")
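Driving the updated process_audio_file programmatically now takes a single event callback instead of three. A minimal sketch; the tool's module path is assumed here, and meeting.mp4 is a placeholder input:

import asyncio

from reflector.processors import PipelineEvent
from reflector.tools.processaudio import process_audio_file  # module path assumed


async def on_event(event: PipelineEvent):
    print(event.processor, event.data)


asyncio.run(
    process_audio_file(
        "meeting.mp4",
        on_event,
        only_transcript=True,
        source_language="en",
        target_language="fr",
    )
)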
server/reflector/tools/runpipeline.py (new file, 110 lines)
@@ -0,0 +1,110 @@
+"""
+# Run a pipeline of processors
+
+This tool helps either create a pipeline from the command line,
+or read a YAML description of a pipeline and run it.
+"""
+
+import json
+
+from reflector.logger import logger
+from reflector.processors import Pipeline, PipelineEvent
+
+
+def camel_to_snake(s):
+    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")
+
+
+def snake_to_camel(s):
+    return "".join([c.capitalize() for c in s.split("_")])
+
+
+def get_jsonl(filename, filter_processor_name=None):
+    logger.info(f"Opening {filename}")
+    if filter_processor_name is not None:
+        filter_processor_name = snake_to_camel(filter_processor_name) + "Processor"
+        logger.info(f"Filtering on {filter_processor_name}")
+
+    with open(filename, encoding="utf8") as f:
+        for line in f:
+            data = json.loads(line)
+            if (
+                filter_processor_name is not None
+                and data["processor"] != filter_processor_name
+            ):
+                continue
+            yield data
+
+
+def get_processor(name):
+    import importlib
+
+    module_name = f"reflector.processors.{name}"
+    class_name = snake_to_camel(name) + "Processor"
+    module = importlib.import_module(module_name)
+    return getattr(module, class_name)
+
+
+async def run_single_processor(args):
+    output_fd = None
+    if args.output:
+        output_fd = open(args.output, "w")
+
+    async def event_callback(event: PipelineEvent):
+        processor = event.processor
+        # ignore some processors
+        if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
+            return
+        print(f"Event: {event}")
+        if output_fd:
+            output_fd.write(event.model_dump_json())
+            output_fd.write("\n")
+
+    processor = get_processor(args.processor)()
+    pipeline = Pipeline(processor)
+    pipeline.on(event_callback)
+    input_type = pipeline.INPUT_TYPE
+
+    logger.info(f"Converting to {input_type.__name__} type")
+
+    for data in get_jsonl(args.input, filter_processor_name=args.input_processor):
+        obj = input_type(**data["data"])
+        await pipeline.push(obj)
+    await pipeline.flush()
+
+    if output_fd:
+        output_fd.close()
+        logger.info(f"Output written to {args.output}")
+
+
+if __name__ == "__main__":
+    import argparse
+    import asyncio
+    import sys
+
+    parser = argparse.ArgumentParser(description="Run a pipeline of processors")
+    parser.add_argument("--input", "-i", help="Input file (jsonl)")
+    parser.add_argument("--input-processor", "-f", help="Name of the processor to keep")
+    parser.add_argument("--output", "-o", help="Output file (jsonl)")
+    parser.add_argument("--pipeline", "-p", help="Pipeline description (yaml)")
+    parser.add_argument("--processor", help="Processor to run")
+    args = parser.parse_args()
+
+    if args.output and args.output == args.input:
+        parser.error("Input and output cannot be the same")
+        sys.exit(1)
+
+    if args.processor and args.pipeline:
+        parser.error("--processor and --pipeline are mutually exclusive")
+        sys.exit(1)
+
+    if not args.processor and not args.pipeline:
+        parser.error("You need to specify either --processor or --pipeline")
+        sys.exit(1)
+
+    if args.processor:
+        func = run_single_processor(args)
+    # elif args.pipeline:
+    #     func = run_pipeline(args)
+
+    asyncio.run(func)
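The snake/camel helpers are what let --processor transcript_topic_detector resolve to reflector.processors.transcript_topic_detector.TranscriptTopicDetectorProcessor. A sketch of the mapping, plus a typical replay invocation (module path and file names assumed):

from reflector.tools.runpipeline import camel_to_snake, snake_to_camel

assert snake_to_camel("transcript_topic_detector") == "TranscriptTopicDetector"
assert camel_to_snake("TranscriptTopicDetector") == "transcript_topic_detector"

# replay TranscriptLinerProcessor events from a previous run through the
# topic detector, e.g.:
#   python reflector/tools/runpipeline.py -i events.jsonl \
#       -f transcript_liner --processor transcript_topic_detector -o topics.jsonl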
server/tests/test_processors_broadcast.py (new file, 46 lines)
@@ -0,0 +1,46 @@
+import pytest
+
+
+@pytest.mark.asyncio
+async def test_processor_broadcast():
+    from reflector.processors.base import Processor, BroadcastProcessor, Pipeline
+
+    class TestProcessor(Processor):
+        INPUT_TYPE = str
+        OUTPUT_TYPE = str
+
+        def __init__(self, name, **kwargs):
+            super().__init__(**kwargs)
+            self.name = name
+
+        async def _push(self, data):
+            data = data + f":{self.name}"
+            await self.emit(data)
+
+    processors = [
+        TestProcessor("A"),
+        BroadcastProcessor(
+            processors=[
+                TestProcessor("B"),
+                TestProcessor("C"),
+            ],
+        ),
+    ]
+
+    events = []
+
+    async def on_event(event):
+        events.append(event)
+
+    pipeline = Pipeline(*processors)
+    pipeline.on(on_event)
+    await pipeline.push("test")
+    await pipeline.flush()
+
+    assert len(events) == 3
+    assert events[0].processor == "A"
+    assert events[0].data == "test:A"
+    assert events[1].processor == "B"
+    assert events[1].data == "test:A:B"
+    assert events[2].processor == "C"
+    assert events[2].data == "test:A:C"
@@ -24,15 +24,12 @@ async def test_basic_process(event_loop):
     LLM.register("test", LLMTest)
 
     # event callback
-    marks = {
-        "transcript": 0,
-        "topic": 0,
-        "summary": 0,
-    }
+    marks = {}
 
-    async def event_callback(event, data):
-        print(f"{event}: {data}")
-        marks[event] += 1
+    async def event_callback(event):
+        if event.processor not in marks:
+            marks[event.processor] = 0
+        marks[event.processor] += 1
 
     # invoke the process and capture events
     path = Path(__file__).parent / "records" / "test_mathieu_hello.wav"
@@ -40,6 +37,6 @@ async def test_basic_process(event_loop):
     print(marks)
 
     # validate the events
-    assert marks["transcript"] == 5
-    assert marks["topic"] == 1
-    assert marks["summary"] == 1
+    assert marks["TranscriptLinerProcessor"] == 5
+    assert marks["TranscriptTopicDetectorProcessor"] == 1
+    assert marks["TranscriptFinalSummaryProcessor"] == 1