mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
code style updates
This commit is contained in:
64
server.py
64
server.py
@@ -6,20 +6,21 @@ import os
|
||||
import uuid
|
||||
import wave
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any
|
||||
from typing import Any, NoReturn
|
||||
|
||||
import aiohttp_cors
|
||||
import av
|
||||
import requests
|
||||
from aiohttp import web
|
||||
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription
|
||||
from aiortc.contrib.media import MediaRelay
|
||||
from av import AudioFifo
|
||||
from faster_whisper import WhisperModel
|
||||
from loguru import logger
|
||||
from sortedcontainers import SortedDict
|
||||
|
||||
from reflector_dataclasses import FinalSummaryResponse, ParseLLMResult, TitleSummaryInput, TitleSummaryOutput, \
|
||||
TranscriptionInput, TranscriptionOutput
|
||||
from reflector_dataclasses import FinalSummaryResult, ParseLLMResult,\
|
||||
TitleSummaryInput, TitleSummaryOutput, TranscriptionInput,\
|
||||
TranscriptionOutput
|
||||
from utils.run_utils import config, run_in_executor
|
||||
|
||||
pcs = set()
|
||||
@@ -31,25 +32,21 @@ model = WhisperModel("tiny", device="cpu",
|
||||
|
||||
CHANNELS = 2
|
||||
RATE = 48000
|
||||
audio_buffer = AudioFifo()
|
||||
audio_buffer = av.AudioFifo()
|
||||
executor = ThreadPoolExecutor()
|
||||
transcription_text = ""
|
||||
last_transcribed_time = 0.0
|
||||
LLM_MACHINE_IP = config["DEFAULT"]["LLM_MACHINE_IP"]
|
||||
LLM_MACHINE_PORT = config["DEFAULT"]["LLM_MACHINE_PORT"]
|
||||
LLM_MACHINE_IP = config["LLM"]["LLM_MACHINE_IP"]
|
||||
LLM_MACHINE_PORT = config["LLM"]["LLM_MACHINE_PORT"]
|
||||
LLM_URL = f"http://{LLM_MACHINE_IP}:{LLM_MACHINE_PORT}/api/v1/generate"
|
||||
incremental_responses = []
|
||||
sorted_transcripts = SortedDict()
|
||||
|
||||
blacklisted_messages = [" Thank you.", " See you next time!",
|
||||
" Thank you for watching!", " Bye!",
|
||||
" And that's what I'm talking about."]
|
||||
|
||||
|
||||
def parse_llm_output(param: TitleSummaryInput, response: requests.Response) -> Any[None, ParseLLMResult]:
|
||||
try:
|
||||
output = json.loads(response.json()["results"][0]["text"])
|
||||
return ParseLLMResult(param, output).get_result()
|
||||
return ParseLLMResult(param, output)
|
||||
except Exception as e:
|
||||
logger.info("Exception" + str(e))
|
||||
return None
|
||||
@@ -65,33 +62,35 @@ def get_title_and_summary(param: TitleSummaryInput) -> Any[None, TitleSummaryOut
|
||||
json=param.data)
|
||||
output = parse_llm_output(param, response)
|
||||
if output:
|
||||
incremental_responses.append(output)
|
||||
return TitleSummaryOutput(incremental_responses).get_response()
|
||||
result = output.get_result()
|
||||
incremental_responses.append(result)
|
||||
return TitleSummaryOutput(incremental_responses)
|
||||
except Exception as e:
|
||||
logger.info("Exception" + str(e))
|
||||
return None
|
||||
|
||||
|
||||
def channel_log(channel, t, message):
|
||||
def channel_log(channel, t: str, message: str) -> NoReturn:
|
||||
logger.info("channel(%s) %s %s" % (channel.label, t, message))
|
||||
|
||||
|
||||
def channel_send(channel, message):
|
||||
def channel_send(channel, message: str) -> NoReturn:
|
||||
if channel:
|
||||
channel.send(message)
|
||||
|
||||
|
||||
def channel_send_increment(channel, message):
|
||||
if channel and message:
|
||||
def channel_send_increment(channel, param: Any[FinalSummaryResult, TitleSummaryOutput]) -> NoReturn:
|
||||
if channel and param:
|
||||
message = param.get_result()
|
||||
channel.send(json.dumps(message))
|
||||
|
||||
|
||||
def channel_send_transcript(channel):
|
||||
def channel_send_transcript(channel) -> NoReturn:
|
||||
# channel_log(channel, ">", message)
|
||||
if channel:
|
||||
try:
|
||||
least_time = sorted_transcripts.keys()[0]
|
||||
message = sorted_transcripts[least_time]
|
||||
message = sorted_transcripts[least_time].get_result()
|
||||
if message:
|
||||
del sorted_transcripts[least_time]
|
||||
if message["text"] not in blacklisted_messages:
|
||||
@@ -157,19 +156,19 @@ def get_transcription(input_frames: TranscriptionInput) -> Any[None, Transcripti
|
||||
logger.info("Exception" + str(e))
|
||||
pass
|
||||
|
||||
result = TranscriptionOutput(result_text).get_response()
|
||||
result = TranscriptionOutput(result_text)
|
||||
sorted_transcripts[input_frames.frames[0].time] = result
|
||||
return result
|
||||
|
||||
|
||||
def get_final_summary_response() -> Any[None, FinalSummaryResponse]:
|
||||
def get_final_summary_response() -> FinalSummaryResult:
|
||||
final_summary = ""
|
||||
|
||||
# Collate inc summaries
|
||||
for topic in incremental_responses:
|
||||
final_summary += topic["description"]
|
||||
|
||||
response = FinalSummaryResponse(final_summary, last_transcribed_time).get_response()
|
||||
response = FinalSummaryResult(final_summary, last_transcribed_time)
|
||||
|
||||
with open("./artefacts/meeting_titles_and_summaries.txt", "a") as f:
|
||||
f.write(json.dumps(incremental_responses))
|
||||
@@ -188,7 +187,7 @@ class AudioStreamTrack(MediaStreamTrack):
|
||||
super().__init__()
|
||||
self.track = track
|
||||
|
||||
async def recv(self):
|
||||
async def recv(self) -> av.audio.frame.AudioFrame:
|
||||
global transcription_text
|
||||
frame = await self.track.recv()
|
||||
audio_buffer.write(frame)
|
||||
@@ -222,7 +221,7 @@ class AudioStreamTrack(MediaStreamTrack):
|
||||
return frame
|
||||
|
||||
|
||||
async def offer(request):
|
||||
async def offer(request: requests.Request) -> web.Response:
|
||||
params = await request.json()
|
||||
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
|
||||
|
||||
@@ -230,40 +229,39 @@ async def offer(request):
|
||||
pc_id = "PeerConnection(%s)" % uuid.uuid4()
|
||||
pcs.add(pc)
|
||||
|
||||
def log_info(msg, *args):
|
||||
def log_info(msg, *args) -> NoReturn:
|
||||
logger.info(pc_id + " " + msg, *args)
|
||||
|
||||
log_info("Created for " + request.remote)
|
||||
|
||||
@pc.on("datachannel")
|
||||
def on_datachannel(channel):
|
||||
def on_datachannel(channel) -> NoReturn:
|
||||
global data_channel
|
||||
data_channel = channel
|
||||
channel_log(channel, "-", "created by remote party")
|
||||
|
||||
@channel.on("message")
|
||||
def on_message(message):
|
||||
def on_message(message: str) -> NoReturn:
|
||||
channel_log(channel, "<", message)
|
||||
if json.loads(message)["cmd"] == "STOP":
|
||||
# Place holder final summary
|
||||
# Placeholder final summary
|
||||
response = get_final_summary_response()
|
||||
channel_send_increment(data_channel, response)
|
||||
# To-do Add code to stop connection from server side here
|
||||
# But have to handshake with client once
|
||||
# pc.close()
|
||||
|
||||
if isinstance(message, str) and message.startswith("ping"):
|
||||
channel_send(channel, "pong" + message[4:])
|
||||
|
||||
@pc.on("connectionstatechange")
|
||||
async def on_connectionstatechange():
|
||||
async def on_connectionstatechange() -> NoReturn:
|
||||
log_info("Connection state is " + pc.connectionState)
|
||||
if pc.connectionState == "failed":
|
||||
await pc.close()
|
||||
pcs.discard(pc)
|
||||
|
||||
@pc.on("track")
|
||||
def on_track(track):
|
||||
def on_track(track) -> NoReturn:
|
||||
log_info("Track " + track.kind + " received")
|
||||
pc.addTrack(AudioStreamTrack(relay.subscribe(track)))
|
||||
|
||||
@@ -280,7 +278,7 @@ async def offer(request):
|
||||
)
|
||||
|
||||
|
||||
async def on_shutdown(app):
|
||||
async def on_shutdown(app) -> NoReturn:
|
||||
coros = [pc.close() for pc in pcs]
|
||||
await asyncio.gather(*coros)
|
||||
pcs.clear()
|
||||
|
||||
Reference in New Issue
Block a user