mirror of
https://github.com/Monadical-SAS/reflector.git
synced 2025-12-21 12:49:06 +00:00
move client files
This commit is contained in:
6
client-local/__init__.py
Normal file
6
client-local/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
import sys
|
||||
import os
|
||||
|
||||
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||
os.pardir))
|
||||
sys.path.append(parent_dir)
|
||||
72
client-local/client.py
Normal file
72
client-local/client.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import argparse
|
||||
import asyncio
|
||||
import signal
|
||||
|
||||
from aiortc.contrib.signaling import (add_signaling_arguments,
|
||||
create_signaling)
|
||||
|
||||
from ..utils.log_utils import logger
|
||||
from stream_client import StreamClient
|
||||
|
||||
|
||||
async def main():
|
||||
parser = argparse.ArgumentParser(description="Data channels ping/pong")
|
||||
|
||||
parser.add_argument(
|
||||
"--url", type=str, nargs="?", default="http://0.0.0.0:1250/offer"
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--ping-pong",
|
||||
help="Benchmark data channel with ping pong",
|
||||
type=eval,
|
||||
choices=[True, False],
|
||||
default="False",
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
"--play-from",
|
||||
type=str,
|
||||
default="",
|
||||
)
|
||||
add_signaling_arguments(parser)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
signaling = create_signaling(args)
|
||||
|
||||
async def shutdown(signal, loop):
|
||||
"""Cleanup tasks tied to the service's shutdown."""
|
||||
logger.info(f"Received exit signal {signal.name}...")
|
||||
logger.info("Closing database connections")
|
||||
logger.info("Nacking outstanding messages")
|
||||
tasks = [t for t in asyncio.all_tasks() if t is not
|
||||
asyncio.current_task()]
|
||||
|
||||
[task.cancel() for task in tasks]
|
||||
|
||||
logger.info(f"Cancelling {len(tasks)} outstanding tasks")
|
||||
await asyncio.gather(*tasks, return_exceptions=True)
|
||||
logger.info(f'{"Flushing metrics"}')
|
||||
loop.stop()
|
||||
|
||||
signals = (signal.SIGHUP, signal.SIGTERM, signal.SIGINT)
|
||||
loop = asyncio.get_event_loop()
|
||||
for s in signals:
|
||||
loop.add_signal_handler(
|
||||
s, lambda s=s: asyncio.create_task(shutdown(s, loop)))
|
||||
|
||||
# Init client
|
||||
sc = StreamClient(
|
||||
signaling=signaling,
|
||||
url=args.url,
|
||||
play_from=args.play_from,
|
||||
ping_pong=args.ping_pong
|
||||
)
|
||||
await sc.start()
|
||||
async for msg in sc.get_reader():
|
||||
print(msg)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
152
client-local/stream_client.py
Normal file
152
client-local/stream_client.py
Normal file
@@ -0,0 +1,152 @@
|
||||
import asyncio
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import httpx
|
||||
import pyaudio
|
||||
import requests
|
||||
import stamina
|
||||
from aiortc import (RTCPeerConnection, RTCSessionDescription)
|
||||
from aiortc.contrib.media import (MediaPlayer, MediaRelay)
|
||||
|
||||
from ..utils.log_utils import logger
|
||||
from ..utils.run_utils import config
|
||||
|
||||
|
||||
class StreamClient:
|
||||
def __init__(
|
||||
self,
|
||||
signaling,
|
||||
url="http://0.0.0.0:1250",
|
||||
play_from=None,
|
||||
ping_pong=False
|
||||
):
|
||||
self.signaling = signaling
|
||||
self.server_url = url
|
||||
self.play_from = play_from
|
||||
self.ping_pong = ping_pong
|
||||
self.paudio = pyaudio.PyAudio()
|
||||
|
||||
self.pc = RTCPeerConnection()
|
||||
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.relay = None
|
||||
self.pcs = set()
|
||||
self.time_start = None
|
||||
self.queue = asyncio.Queue()
|
||||
self.player = MediaPlayer(
|
||||
':' + str(config['DEFAULT']["AV_FOUNDATION_DEVICE_ID"]),
|
||||
format='avfoundation',
|
||||
options={'channels': '2'})
|
||||
|
||||
def stop(self):
|
||||
self.loop.run_until_complete(self.signaling.close())
|
||||
self.loop.run_until_complete(self.pc.close())
|
||||
# self.loop.close()
|
||||
|
||||
def create_local_tracks(self, play_from):
|
||||
if play_from:
|
||||
player = MediaPlayer(play_from)
|
||||
return player.audio, player.video
|
||||
else:
|
||||
if self.relay is None:
|
||||
self.relay = MediaRelay()
|
||||
return self.relay.subscribe(self.player.audio), None
|
||||
|
||||
def channel_log(self, channel, t, message):
|
||||
print("channel(%s) %s %s" % (channel.label, t, message))
|
||||
|
||||
def channel_send(self, channel, message):
|
||||
# self.channel_log(channel, ">", message)
|
||||
channel.send(message)
|
||||
|
||||
def current_stamp(self):
|
||||
if self.time_start is None:
|
||||
self.time_start = time.time()
|
||||
return 0
|
||||
else:
|
||||
return int((time.time() - self.time_start) * 1000000)
|
||||
|
||||
async def run_offer(self, pc, signaling):
|
||||
# microphone
|
||||
audio, video = self.create_local_tracks(self.play_from)
|
||||
pc_id = "PeerConnection(%s)" % uuid.uuid4()
|
||||
self.pcs.add(pc)
|
||||
|
||||
def log_info(msg, *args):
|
||||
logger.info(pc_id + " " + msg, *args)
|
||||
|
||||
@pc.on("connectionstatechange")
|
||||
async def on_connectionstatechange():
|
||||
print("Connection state is %s" % pc.connectionState)
|
||||
if pc.connectionState == "failed":
|
||||
await pc.close()
|
||||
self.pcs.discard(pc)
|
||||
|
||||
@pc.on("track")
|
||||
def on_track(track):
|
||||
print("Sending %s" % track.kind)
|
||||
self.pc.addTrack(track)
|
||||
|
||||
@track.on("ended")
|
||||
async def on_ended():
|
||||
log_info("Track %s ended", track.kind)
|
||||
|
||||
self.pc.addTrack(audio)
|
||||
|
||||
channel = pc.createDataChannel("data-channel")
|
||||
self.channel_log(channel, "-", "created by local party")
|
||||
|
||||
async def send_pings():
|
||||
while True:
|
||||
self.channel_send(channel, "ping %d" % self.current_stamp())
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@channel.on("open")
|
||||
def on_open():
|
||||
if self.ping_pong:
|
||||
asyncio.ensure_future(send_pings())
|
||||
|
||||
@channel.on("message")
|
||||
def on_message(message):
|
||||
self.queue.put_nowait(message)
|
||||
if self.ping_pong:
|
||||
self.channel_log(channel, "<", message)
|
||||
|
||||
if isinstance(message, str) and message.startswith("pong"):
|
||||
elapsed_ms = (self.current_stamp() - int(message[5:])) \
|
||||
/ 1000
|
||||
print(" RTT %.2f ms" % elapsed_ms)
|
||||
|
||||
await pc.setLocalDescription(await pc.createOffer())
|
||||
|
||||
sdp = {
|
||||
"sdp": pc.localDescription.sdp,
|
||||
"type": pc.localDescription.type
|
||||
}
|
||||
|
||||
@stamina.retry(on=httpx.HTTPError, attempts=5)
|
||||
def connect_to_server():
|
||||
response = requests.post(self.server_url, json=sdp, timeout=10)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
params = connect_to_server().json()
|
||||
answer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
|
||||
await pc.setRemoteDescription(answer)
|
||||
|
||||
self.reader = self.worker(f'{"worker"}', self.queue)
|
||||
|
||||
def get_reader(self):
|
||||
return self.reader
|
||||
|
||||
async def worker(self, name, queue):
|
||||
while True:
|
||||
msg = await self.queue.get()
|
||||
yield msg
|
||||
self.queue.task_done()
|
||||
|
||||
async def start(self):
|
||||
coro = self.run_offer(self.pc, self.signaling)
|
||||
task = asyncio.create_task(coro)
|
||||
await task
|
||||
Reference in New Issue
Block a user