make I/O non-blocking and handle dropped connections
This commit is contained in:
		
							
								
								
									
										60
									
								
								server.py
									
									
									
									
									
								
							
							
						
						
									
										60
									
								
								server.py
									
									
									
									
									
								
							@@ -1,3 +1,4 @@
 | 
			
		||||
from datetime import datetime
 | 
			
		||||
from typing import Dict
 | 
			
		||||
 | 
			
		||||
import cv2
 | 
			
		||||
@@ -12,37 +13,64 @@ UDP_PORT = 5005
 | 
			
		||||
 | 
			
		||||
sock = socket.socket(socket.AF_INET, # Internet
 | 
			
		||||
                     socket.SOCK_DGRAM) # UDP
 | 
			
		||||
sock.setblocking(False)
 | 
			
		||||
sock.bind((UDP_IP, UDP_PORT))
 | 
			
		||||
 | 
			
		||||
HEIGHT = 480
 | 
			
		||||
WIDTH = 640
 | 
			
		||||
 | 
			
		||||
frames: Dict[str, np.ndarray] = {}
 | 
			
		||||
class Client:
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self.last_updated = datetime.now()
 | 
			
		||||
        self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8)
 | 
			
		||||
 | 
			
		||||
    def update(self, pkt: InterlacedPacket):
 | 
			
		||||
        if pkt.even:
 | 
			
		||||
            self.frame[y + 1:y + 32:2, x:x + 16] = arr
 | 
			
		||||
        else:
 | 
			
		||||
            self.frame[y:y + 32:2, x:x + 16] = arr
 | 
			
		||||
        self.last_updated = datetime.now()
 | 
			
		||||
 | 
			
		||||
    def latency(self) -> float:
 | 
			
		||||
        return (datetime.now() - self.last_updated).total_seconds()
 | 
			
		||||
 | 
			
		||||
    def read(self) -> np.ndarray:
 | 
			
		||||
        return self.frame
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
frames: Dict[str, Client] = {}
 | 
			
		||||
 | 
			
		||||
while True:
 | 
			
		||||
    # break the array down into 16-bit chunks, then transmit them as UDP packets
 | 
			
		||||
    for i in range(0, HEIGHT, 16):
 | 
			
		||||
        for j in range(0, WIDTH, 16):
 | 
			
		||||
    for repeats in range(10000):
 | 
			
		||||
        try:
 | 
			
		||||
            data, addr = sock.recvfrom(InterlacedPacket.size)  # buffer size is 768 bytes
 | 
			
		||||
            # print("received packet from", addr)
 | 
			
		||||
            pkt = from_bytes_int(data)
 | 
			
		||||
            if data:
 | 
			
		||||
                pkt = from_bytes_int(data)
 | 
			
		||||
 | 
			
		||||
            uuid = str(pkt.uuid)
 | 
			
		||||
            x = pkt.x
 | 
			
		||||
            y = pkt.y
 | 
			
		||||
            arr = pkt.array
 | 
			
		||||
                uuid = str(pkt.uuid)
 | 
			
		||||
                x = pkt.x
 | 
			
		||||
                y = pkt.y
 | 
			
		||||
                arr = pkt.array
 | 
			
		||||
 | 
			
		||||
            if uuid not in frames.keys():
 | 
			
		||||
                frames[uuid] = frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8)
 | 
			
		||||
                if uuid not in frames.keys():
 | 
			
		||||
                    print("New client acquired, naming %s", uuid)
 | 
			
		||||
                    frames[uuid] = Client()
 | 
			
		||||
 | 
			
		||||
            if pkt.even:
 | 
			
		||||
                frames[uuid][y+1:y+32:2, x:x+16] = arr
 | 
			
		||||
            else:
 | 
			
		||||
                frames[uuid][y:y + 32:2, x:x + 16] = arr
 | 
			
		||||
                frames[uuid].update(pkt)
 | 
			
		||||
 | 
			
		||||
        except BlockingIOError:
 | 
			
		||||
            pass
 | 
			
		||||
 | 
			
		||||
    # Display the resulting frame
 | 
			
		||||
    for id in frames.keys():
 | 
			
		||||
        cv2.imshow(id, frames[id])
 | 
			
		||||
    for id in list(frames.keys()):
 | 
			
		||||
        if frames[id].latency() >= 5:
 | 
			
		||||
            print("Client likely lost connection, dropping %s", id)
 | 
			
		||||
            cv2.destroyWindow(id)
 | 
			
		||||
            frames.pop(id)
 | 
			
		||||
        else:
 | 
			
		||||
            cv2.imshow(id, frames[id].read())
 | 
			
		||||
 | 
			
		||||
    # Break the loop if 'q' key is pressed
 | 
			
		||||
    if cv2.waitKey(1) == ord('q'):
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user