mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 04:39:06 +00:00
server: add Broadcast processor
This commit is contained in:
@@ -195,11 +195,68 @@ class ThreadedProcessor(Processor):
|
|||||||
def on(self, callback):
|
def on(self, callback):
|
||||||
self.processor.on(callback)
|
self.processor.on(callback)
|
||||||
|
|
||||||
|
def off(self, callback):
|
||||||
|
self.processor.off(callback)
|
||||||
|
|
||||||
def describe(self, level=0):
|
def describe(self, level=0):
|
||||||
super().describe(level)
|
super().describe(level)
|
||||||
self.processor.describe(level + 1)
|
self.processor.describe(level + 1)
|
||||||
|
|
||||||
|
|
||||||
|
class BroadcastProcessor(Processor):
|
||||||
|
"""
|
||||||
|
A processor that broadcasts data to multiple processors, in the order
|
||||||
|
they were passed to the constructor
|
||||||
|
|
||||||
|
This processor does not guarantee that the output is in order.
|
||||||
|
|
||||||
|
This processor connect all the output of the processors to the input of
|
||||||
|
the next processor.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, processors: Processor):
|
||||||
|
super().__init__()
|
||||||
|
self.processors = processors
|
||||||
|
|
||||||
|
def set_pipeline(self, pipeline: "Pipeline"):
|
||||||
|
super().set_pipeline(pipeline)
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.set_pipeline(pipeline)
|
||||||
|
|
||||||
|
async def _warmup(self):
|
||||||
|
for processor in self.processors:
|
||||||
|
await processor.warmup()
|
||||||
|
|
||||||
|
async def _push(self, data):
|
||||||
|
for processor in self.processors:
|
||||||
|
await processor.push(data)
|
||||||
|
|
||||||
|
async def _flush(self):
|
||||||
|
for processor in self.processors:
|
||||||
|
await processor.flush()
|
||||||
|
|
||||||
|
def connect(self, processor: Processor):
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.connect(processor)
|
||||||
|
|
||||||
|
def disconnect(self, processor: Processor):
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.disconnect(processor)
|
||||||
|
|
||||||
|
def on(self, callback):
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.on(callback)
|
||||||
|
|
||||||
|
def off(self, callback):
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.off(callback)
|
||||||
|
|
||||||
|
def describe(self, level=0):
|
||||||
|
super().describe(level)
|
||||||
|
for processor in self.processors:
|
||||||
|
processor.describe(level + 1)
|
||||||
|
|
||||||
|
|
||||||
class Pipeline(Processor):
|
class Pipeline(Processor):
|
||||||
"""
|
"""
|
||||||
A pipeline of processors
|
A pipeline of processors
|
||||||
|
|||||||
Reference in New Issue
Block a user