From 9ed26030a52f36898a622892b7c0eca7bbbfa5c3 Mon Sep 17 00:00:00 2001 From: Mathieu Virbel Date: Thu, 31 Aug 2023 11:03:39 +0200 Subject: [PATCH] server: add Broadcast processor --- server/reflector/processors/base.py | 57 +++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/server/reflector/processors/base.py b/server/reflector/processors/base.py index 78913d1c..219bc3c4 100644 --- a/server/reflector/processors/base.py +++ b/server/reflector/processors/base.py @@ -195,11 +195,68 @@ class ThreadedProcessor(Processor): def on(self, callback): self.processor.on(callback) + def off(self, callback): + self.processor.off(callback) + def describe(self, level=0): super().describe(level) self.processor.describe(level + 1) +class BroadcastProcessor(Processor): + """ + A processor that broadcasts data to multiple processors, in the order + they were passed to the constructor + + This processor does not guarantee that the output is in order. + + This processor connect all the output of the processors to the input of + the next processor. + """ + + def __init__(self, processors: Processor): + super().__init__() + self.processors = processors + + def set_pipeline(self, pipeline: "Pipeline"): + super().set_pipeline(pipeline) + for processor in self.processors: + processor.set_pipeline(pipeline) + + async def _warmup(self): + for processor in self.processors: + await processor.warmup() + + async def _push(self, data): + for processor in self.processors: + await processor.push(data) + + async def _flush(self): + for processor in self.processors: + await processor.flush() + + def connect(self, processor: Processor): + for processor in self.processors: + processor.connect(processor) + + def disconnect(self, processor: Processor): + for processor in self.processors: + processor.disconnect(processor) + + def on(self, callback): + for processor in self.processors: + processor.on(callback) + + def off(self, callback): + for processor in self.processors: + processor.off(callback) + + def describe(self, level=0): + super().describe(level) + for processor in self.processors: + processor.describe(level + 1) + + class Pipeline(Processor): """ A pipeline of processors