Files
reflector/server/reflector/tools/runpipeline.py
Mathieu Virbel 9265d201b5 fix: restore previous behavior on live pipeline + audio downscaler (#561)
This commit restores the original behavior with frame cutting. While
Silero is used on our GPU for files, it does not seem to work well on
the live pipeline. To be investigated, but at the moment, what we keep
is:

- refactored to extract the downscaling for further processing in the
pipeline
- removed any downscale implementation from audio_chunker and audio_merge
- removed batching from audio_merge as well, for now
2025-08-22 10:49:26 -06:00


"""
# Run a pipeline of processor
This tools help to either create a pipeline from command line,
or read a yaml description of a pipeline and run it.
"""

import json

from reflector.logger import logger
from reflector.processors import Pipeline, PipelineEvent


def camel_to_snake(s):
    return "".join(["_" + c.lower() if c.isupper() else c for c in s]).lstrip("_")


def snake_to_camel(s):
    return "".join([c.capitalize() for c in s.split("_")])


def get_jsonl(filename, filter_processor_name=None):
    """Yield records from a JSONL file, optionally keeping a single processor."""
    logger.info(f"Opening {filename}")
    if filter_processor_name is not None:
        filter_processor_name = snake_to_camel(filter_processor_name) + "Processor"
        logger.info(f"Filtering on {filter_processor_name}")
    with open(filename, encoding="utf8") as f:
        for line in f:
            data = json.loads(line)
            if (
                filter_processor_name is not None
                and data["processor"] != filter_processor_name
            ):
                continue
            yield data
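

# Shape assumed for each JSONL record read by get_jsonl above; only the
# "processor" and "data" keys are used here, and the payload under "data"
# depends on the processor that emitted the event:
#
#   {"processor": "AudioMergeProcessor", "data": {...}}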


def get_processor(name):
    import importlib

    module_name = f"reflector.processors.{name}"
    class_name = snake_to_camel(name) + "Processor"
    module = importlib.import_module(module_name)
    return getattr(module, class_name)
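

# For example (assuming such a module exists under reflector.processors):
#   get_processor("audio_merge") imports reflector.processors.audio_merge
#   and returns its AudioMergeProcessor class.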


async def run_single_processor(args):
    """Replay JSONL events from the input file through a single processor."""
    output_fd = None
    if args.output:
        output_fd = open(args.output, "w")

    async def event_callback(event: PipelineEvent):
        processor = event.processor
        # ignore some processors
        if processor in ("AudioChunkerAutoProcessor", "AudioMergeProcessor"):
            return
        print(f"Event: {event}")
        if output_fd:
            output_fd.write(event.model_dump_json())
            output_fd.write("\n")

    processor = get_processor(args.processor)()
    pipeline = Pipeline(processor)
    pipeline.on(event_callback)
    input_type = pipeline.INPUT_TYPE
    logger.info(f"Converting to {input_type.__name__} type")

    for data in get_jsonl(args.input, filter_processor_name=args.input_processor):
        obj = input_type(**data["data"])
        await pipeline.push(obj)
    await pipeline.flush()

    if output_fd:
        output_fd.close()
        logger.info(f"Output written to {args.output}")


if __name__ == "__main__":
    import argparse
    import asyncio

    parser = argparse.ArgumentParser(description="Run a pipeline of processors")
    parser.add_argument("--input", "-i", help="Input file (jsonl)")
    parser.add_argument("--input-processor", "-f", help="Name of the processor to keep")
    parser.add_argument("--output", "-o", help="Output file (jsonl)")
    parser.add_argument("--pipeline", "-p", help="Pipeline description (yaml)")
    parser.add_argument("--processor", help="Processor to run")
    args = parser.parse_args()

    # parser.error() prints the message and exits on its own
    if args.output and args.output == args.input:
        parser.error("Input and output cannot be the same")
    if args.processor and args.pipeline:
        parser.error("--processor and --pipeline are mutually exclusive")
    if not args.processor and not args.pipeline:
        parser.error("You need to specify either --processor or --pipeline")

    if args.processor:
        func = run_single_processor(args)
    # elif args.pipeline:
    #     func = run_pipeline(args)
    asyncio.run(func)