use asyncudp to (hopefully) accelerate networking, also add a tiled image packet for testing

This commit is contained in:
Camryn Thomas 2025-02-01 13:24:42 -06:00
parent 1ad3ded60d
commit 8ba524087d
Signed by: cptlobster
GPG Key ID: 33D607425C830B4C
4 changed files with 92 additions and 56 deletions

View File

@ -5,7 +5,8 @@ import socket
import numpy as np import numpy as np
import uuid import uuid
from common import StdPacket, InterlacedPacket, DoublyInterlacedPacket from common import StdPacket, InterlacedPacket, DoublyInterlacedPacket, TiledImagePacket
def send_packet(sock, packet): def send_packet(sock, packet):
sock.sendto(packet, (UDP_IP, UDP_PORT)) sock.sendto(packet, (UDP_IP, UDP_PORT))
@ -48,6 +49,18 @@ def breakdown_image_dint(frame):
pkt = DoublyInterlacedPacket(uuid, j, i, j_even, i_even, frame[i + i_even:i + 32:2, j + j_even:j + 32:2]) pkt = DoublyInterlacedPacket(uuid, j, i, j_even, i_even, frame[i + i_even:i + 32:2, j + j_even:j + 32:2])
send_packet(sock, pkt.to_bytestr()) send_packet(sock, pkt.to_bytestr())
def breakdown_image_tiled(frame):
(cols, rows, colors) = frame.shape
# break the array into 16x32 chunks. we'll split those further into odd and even rows
# and send each as UDP packets. this should make packet loss less obvious
xslice = cols // 16
yslice = rows // 16
for i in range(0, xslice):
for j in range(0, yslice):
# print("Sending frame segment (%d, %d)", i, j)
pkt = TiledImagePacket(uuid, j, i, rows, cols, frame[i:cols:xslice, j:rows:yslice])
send_packet(sock, pkt.to_bytestr())
if __name__ == '__main__': if __name__ == '__main__':
# argument parser # argument parser
parser = argparse.ArgumentParser(description="Proof-of-concept client for sauron-cv") parser = argparse.ArgumentParser(description="Proof-of-concept client for sauron-cv")

View File

@ -133,4 +133,39 @@ def from_bytes_dint(b: bytes) -> DoublyInterlacedPacket:
even_y = bool.from_bytes(b[26:28]) even_y = bool.from_bytes(b[26:28])
array = np.frombuffer(b[28:], np.uint8).reshape(16, 16, 3) array = np.frombuffer(b[28:], np.uint8).reshape(16, 16, 3)
return DoublyInterlacedPacket(uuid, x, y, even_x, even_y, array) return DoublyInterlacedPacket(uuid, x, y, even_x, even_y, array)
class TiledImagePacket(Packet):
"""Distributed selection from image."""
size = 16 + 4 + 4 + 768
def __init__(self, uuid: UUID, x: int, y: int, width: int, height: int, array: np.ndarray):
super().__init__(uuid, x, y, array)
self.width = width
self.height = height
self.xslice = width // 16
self.yslice = height // 16
def to_bytestr(self) -> bytes:
bytestr = b""
bytestr += self.uuid.bytes
bytestr += self.x.to_bytes(length=4, signed=False)
bytestr += self.y.to_bytes(length=4, signed=False)
bytestr += self.array.tobytes()
return bytestr
def apply(self, image: np.ndarray) -> np.ndarray:
x = self.x
y = self.y
arr = self.array
image[y:self.height:self.yslice, x:self.width:self.xslice] = arr
return image
def from_bytes_tiled(b: bytes) -> TiledImagePacket:
"""Convert a byte string obtained via UDP into a packet object."""
uuid = UUID(bytes = b[0:16])
x = int.from_bytes(b[16:20], signed = False)
y = int.from_bytes(b[20:24], signed = False)
array = np.frombuffer(b[24:], np.uint8).reshape(16, 16, 3)
return TiledImagePacket(uuid, x, y, 640, 480, array)

View File

@ -1,3 +1,4 @@
opencv-python opencv-python
numpy numpy
rich rich
asyncudp

View File

@ -5,6 +5,7 @@ import socket
import numpy as np import numpy as np
from datetime import datetime from datetime import datetime
import asyncio import asyncio
import asyncudp
import argparse import argparse
from rich.console import Console from rich.console import Console
@ -34,33 +35,6 @@ class Client:
# Dictionary of client states stored by UUID # Dictionary of client states stored by UUID
frames: Dict[str, Client] = {} frames: Dict[str, Client] = {}
async def read_packet():
"""Asynchronous coroutine to read UDP packets from the client(s)."""
while True:
# we repeat this a ton of times all at once to hopefully capture all of the image data
for i in range(0, 1600):
try:
data, addr = sock.recvfrom(DoublyInterlacedPacket.size) # packet buffer size based on the packet size
# print("received packet from", addr)
if data:
pkt = from_bytes_dint(data)
uuid = str(pkt.uuid)
# if this is a new client, give it a new image
if uuid not in frames.keys():
console.log(f"New client acquired, naming [bold cyan]{uuid}[bold cyan]")
frames[uuid] = Client()
stat.update(f"[bold yellow]{len(frames.keys())}[/bold yellow] clients connected.")
frames[uuid].update(pkt)
except BlockingIOError:
pass
# this is necessary to allow asyncio to swap between reading packets and rendering frames
await asyncio.sleep(0.001)
async def show_frames(): async def show_frames():
"""Asynchronous coroutine to display frames in OpenCV debug windows.""" """Asynchronous coroutine to display frames in OpenCV debug windows."""
while True: while True:
@ -70,20 +44,43 @@ async def show_frames():
console.log(f"Client likely lost connection, dropping [bold red]{id}[/bold red]") console.log(f"Client likely lost connection, dropping [bold red]{id}[/bold red]")
cv2.destroyWindow(id) cv2.destroyWindow(id)
frames.pop(id) frames.pop(id)
stat.update(f"[bold yellow]{len(frames.keys())}[/bold yellow] clients connected.")
else: else:
# show the latest available frame # show the latest available frame
cv2.imshow(id, frames[id].read()) cv2.imshow(id, frames[id].read())
cv2.waitKey(1) cv2.waitKey(1)
# this is necessary to allow asyncio to swap between reading packets and rendering frames # this is necessary to allow asyncio to swap between reading packets and rendering frames
await asyncio.sleep(0.01) await asyncio.sleep(0.05)
async def listen(ip: str, port: int):
"""Asynchronous coroutine to listen for / read client connections."""
sock = await asyncudp.create_socket(local_addr=(ip, port))
console.log("Ready to accept connections.", style="bold green")
while True:
# receive packets
data, addr = await sock.recvfrom()
if data:
# convert the byte string into a packet object
pkt = from_bytes_dint(data)
uuid = str(pkt.uuid)
# if this is a new client, give it a new image
if uuid not in frames.keys():
console.log(f"New client acquired, naming [bold cyan]{uuid}[bold cyan]")
frames[uuid] = Client()
frames[uuid].update(pkt)
if __name__ == "__main__": if __name__ == "__main__":
# argument parser # argument parser
parser = argparse.ArgumentParser(description="Proof-of-concept server for sauron-cv") parser = argparse.ArgumentParser(description="Proof-of-concept server for sauron-cv")
parser.add_argument("-p", "--port", type=int, default=5005) parser.add_argument("-p", "--port", type=int, default=5005)
parser.add_argument("-l", "--listen", type=str, default="") parser.add_argument("-l", "--listen", type=str, default="0.0.0.0")
parser.add_argument("-W", "--width", type=int, default=640) parser.add_argument("-W", "--width", type=int, default=640)
parser.add_argument("-H", "--height", type=int, default=480) parser.add_argument("-H", "--height", type=int, default=480)
args = parser.parse_args() args = parser.parse_args()
@ -98,28 +95,18 @@ if __name__ == "__main__":
HEIGHT = args.height HEIGHT = args.height
WIDTH = args.width WIDTH = args.width
# create the UDP socket # create the async event loop
sock = socket.socket(socket.AF_INET, # Internet loop = asyncio.new_event_loop()
socket.SOCK_DGRAM) # UDP asyncio.set_event_loop(loop)
sock.setblocking(False)
sock.bind((UDP_IP, UDP_PORT))
console.log("Ready to accept connections.", style="bold green") # create async tasks for reading network packets, displaying windows
loop.create_task(listen(UDP_IP, UDP_PORT))
loop.create_task(show_frames())
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
with console.status("[bold yellow]0[/bold yellow] clients connected.", spinner="pong") as stat: # Release the capture and close all windows
cv2.destroyAllWindows()
# create the async event loop
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# create async tasks for reading network packets, displaying windows
loop.create_task(read_packet())
loop.create_task(show_frames())
try:
loop.run_forever()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
# Release the capture and close all windows
cv2.destroyAllWindows()