server: update process tools to save all events into a jsonl file

This commit is contained in:
2023-08-30 19:20:46 +02:00
committed by Mathieu Virbel
parent df078f7bd6
commit bdf7fe6ebc
3 changed files with 59 additions and 30 deletions

View File

@@ -1,10 +1,10 @@
from .base import Processor, ThreadedProcessor, Pipeline # noqa: F401
from .types import AudioFile, Transcript, Word, TitleSummary, FinalSummary # noqa: F401
from .audio_file_writer import AudioFileWriterProcessor # noqa: F401
from .audio_chunker import AudioChunkerProcessor # noqa: F401 from .audio_chunker import AudioChunkerProcessor # noqa: F401
from .audio_file_writer import AudioFileWriterProcessor # noqa: F401
from .audio_merge import AudioMergeProcessor # noqa: F401 from .audio_merge import AudioMergeProcessor # noqa: F401
from .audio_transcript import AudioTranscriptProcessor # noqa: F401 from .audio_transcript import AudioTranscriptProcessor # noqa: F401
from .audio_transcript_auto import AudioTranscriptAutoProcessor # noqa: F401 from .audio_transcript_auto import AudioTranscriptAutoProcessor # noqa: F401
from .base import Pipeline, PipelineEvent, Processor, ThreadedProcessor # noqa: F401
from .transcript_final_summary import TranscriptFinalSummaryProcessor # noqa: F401
from .transcript_liner import TranscriptLinerProcessor # noqa: F401 from .transcript_liner import TranscriptLinerProcessor # noqa: F401
from .transcript_topic_detector import TranscriptTopicDetectorProcessor # noqa: F401 from .transcript_topic_detector import TranscriptTopicDetectorProcessor # noqa: F401
from .transcript_final_summary import TranscriptFinalSummaryProcessor # noqa: F401 from .types import AudioFile, FinalSummary, TitleSummary, Transcript, Word # noqa: F401

View File

@@ -3,15 +3,23 @@ from concurrent.futures import ThreadPoolExecutor
from typing import Any from typing import Any
from uuid import uuid4 from uuid import uuid4
from pydantic import BaseModel
from reflector.logger import logger from reflector.logger import logger
class PipelineEvent(BaseModel):
processor: str
uid: str
data: Any
class Processor: class Processor:
INPUT_TYPE: type = None INPUT_TYPE: type = None
OUTPUT_TYPE: type = None OUTPUT_TYPE: type = None
WARMUP_EVENT: str = "WARMUP_EVENT" WARMUP_EVENT: str = "WARMUP_EVENT"
def __init__(self, callback=None, custom_logger=None): def __init__(self, callback=None, custom_logger=None):
self.name = self.__class__.__name__
self._processors = [] self._processors = []
self._callbacks = [] self._callbacks = []
if callback: if callback:
@@ -67,6 +75,10 @@ class Processor:
return default return default
async def emit(self, data): async def emit(self, data):
if self.pipeline:
await self.pipeline.emit(
PipelineEvent(processor=self.name, uid=self.uid, data=data)
)
for callback in self._callbacks: for callback in self._callbacks:
await callback(data) await callback(data)
for processor in self._processors: for processor in self._processors:

View File

@@ -1,43 +1,45 @@
import asyncio
import av import av
from reflector.logger import logger from reflector.logger import logger
from reflector.processors import ( from reflector.processors import (
Pipeline,
AudioChunkerProcessor, AudioChunkerProcessor,
AudioMergeProcessor, AudioMergeProcessor,
AudioTranscriptAutoProcessor, AudioTranscriptAutoProcessor,
Pipeline,
PipelineEvent,
TranscriptFinalSummaryProcessor,
TranscriptLinerProcessor, TranscriptLinerProcessor,
TranscriptTopicDetectorProcessor, TranscriptTopicDetectorProcessor,
TranscriptFinalSummaryProcessor,
) )
import asyncio
async def process_audio_file(filename, event_callback, only_transcript=False): async def process_audio_file(
async def on_transcript(data): filename,
await event_callback("transcript", data) event_callback,
only_transcript=False,
async def on_topic(data): source_language="en",
await event_callback("topic", data) target_language="en",
):
async def on_summary(data):
await event_callback("summary", data)
# build pipeline for audio processing # build pipeline for audio processing
processors = [ processors = [
AudioChunkerProcessor(), AudioChunkerProcessor(),
AudioMergeProcessor(), AudioMergeProcessor(),
AudioTranscriptAutoProcessor.as_threaded(), AudioTranscriptAutoProcessor.as_threaded(),
TranscriptLinerProcessor(callback=on_transcript), TranscriptLinerProcessor(),
] ]
if not only_transcript: if not only_transcript:
processors += [ processors += [
TranscriptTopicDetectorProcessor.as_threaded(callback=on_topic), TranscriptTopicDetectorProcessor.as_threaded(),
TranscriptFinalSummaryProcessor.as_threaded(callback=on_summary), TranscriptFinalSummaryProcessor.as_threaded(),
] ]
# transcription output # transcription output
pipeline = Pipeline(*processors) pipeline = Pipeline(*processors)
pipeline.set_pref("audio:source_language", source_language)
pipeline.set_pref("audio:target_language", target_language)
pipeline.describe() pipeline.describe()
pipeline.on(event_callback)
# start processing audio # start processing audio
logger.info(f"Opening {filename}") logger.info(f"Opening {filename}")
@@ -59,20 +61,35 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser() parser = argparse.ArgumentParser()
parser.add_argument("source", help="Source file (mp3, wav, mp4...)") parser.add_argument("source", help="Source file (mp3, wav, mp4...)")
parser.add_argument("--only-transcript", "-t", action="store_true") parser.add_argument("--only-transcript", "-t", action="store_true")
parser.add_argument("--source-language", default="en")
parser.add_argument("--target-language", default="en")
parser.add_argument("--output", "-o", help="Output file (output.jsonl)")
args = parser.parse_args() args = parser.parse_args()
async def event_callback(event, data): output_fd = None
if event == "transcript": if args.output:
print(f"Transcript[{data.human_timestamp}]: {data.text}") output_fd = open(args.output, "w")
elif event == "topic":
print(f"Topic[{data.human_timestamp}]: title={data.title}") async def event_callback(event: PipelineEvent):
print(f"Topic[{data.human_timestamp}]: summary={data.summary}") processor = event.processor
elif event == "summary": # ignore some processor
print(f"Summary: duration={data.duration}") if processor in ("AudioChunkerProcessor", "AudioMergeProcessor"):
print(f"Summary: summary={data.summary}") return
logger.info(f"Event: {event}")
if output_fd:
output_fd.write(event.model_dump_json())
output_fd.write("\n")
asyncio.run( asyncio.run(
process_audio_file( process_audio_file(
args.source, event_callback, only_transcript=args.only_transcript args.source,
event_callback,
only_transcript=args.only_transcript,
source_language=args.source_language,
target_language=args.target_language,
) )
) )
if output_fd:
output_fd.close()
logger.info(f"Output written to {args.output}")