From 34b15e9543ec32530e3737217efa1c3dba163bca Mon Sep 17 00:00:00 2001 From: Dustin Thomas Date: Fri, 31 Jan 2025 21:09:21 -0600 Subject: [PATCH] make I/O non-blocking and handle dropped connections --- server.py | 60 ++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 44 insertions(+), 16 deletions(-) diff --git a/server.py b/server.py index 6290464..7e21c39 100644 --- a/server.py +++ b/server.py @@ -1,3 +1,4 @@ +from datetime import datetime from typing import Dict import cv2 @@ -12,37 +13,64 @@ UDP_PORT = 5005 sock = socket.socket(socket.AF_INET, # Internet socket.SOCK_DGRAM) # UDP +sock.setblocking(False) sock.bind((UDP_IP, UDP_PORT)) HEIGHT = 480 WIDTH = 640 -frames: Dict[str, np.ndarray] = {} +class Client: + def __init__(self): + self.last_updated = datetime.now() + self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) + + def update(self, pkt: InterlacedPacket): + if pkt.even: + self.frame[y + 1:y + 32:2, x:x + 16] = arr + else: + self.frame[y:y + 32:2, x:x + 16] = arr + self.last_updated = datetime.now() + + def latency(self) -> float: + return (datetime.now() - self.last_updated).total_seconds() + + def read(self) -> np.ndarray: + return self.frame + + +frames: Dict[str, Client] = {} while True: # break the array down into 16-bit chunks, then transmit them as UDP packets - for i in range(0, HEIGHT, 16): - for j in range(0, WIDTH, 16): + for repeats in range(10000): + try: data, addr = sock.recvfrom(InterlacedPacket.size) # buffer size is 768 bytes # print("received packet from", addr) - pkt = from_bytes_int(data) + if data: + pkt = from_bytes_int(data) - uuid = str(pkt.uuid) - x = pkt.x - y = pkt.y - arr = pkt.array + uuid = str(pkt.uuid) + x = pkt.x + y = pkt.y + arr = pkt.array - if uuid not in frames.keys(): - frames[uuid] = frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) + if uuid not in frames.keys(): + print("New client acquired, naming %s", uuid) + frames[uuid] = Client() - if pkt.even: - frames[uuid][y+1:y+32:2, x:x+16] = arr - else: - frames[uuid][y:y + 32:2, x:x + 16] = arr + frames[uuid].update(pkt) + + except BlockingIOError: + pass # Display the resulting frame - for id in frames.keys(): - cv2.imshow(id, frames[id]) + for id in list(frames.keys()): + if frames[id].latency() >= 5: + print("Client likely lost connection, dropping %s", id) + cv2.destroyWindow(id) + frames.pop(id) + else: + cv2.imshow(id, frames[id].read()) # Break the loop if 'q' key is pressed if cv2.waitKey(1) == ord('q'):