from typing import Dict import cv2 import socket import numpy as np from datetime import datetime import asyncio import argparse from common import Packet, DoublyInterlacedPacket, from_bytes_dint class Client: """Class for tracking client state, including current frame data and time since last update.""" def __init__(self): self.last_updated = datetime.now() self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) def update(self, pkt: Packet): """Apply a packet to the client frame. Update last client update to current time.""" self.frame = pkt.apply(self.frame) self.last_updated = datetime.now() def latency(self) -> float: """Return the time since the last client update.""" return (datetime.now() - self.last_updated).total_seconds() def read(self) -> np.ndarray: """Return the current frame.""" return self.frame # Dictionary of client states stored by UUID frames: Dict[str, Client] = {} async def read_packet(): """Asynchronous coroutine to read UDP packets from the client(s).""" while True: # we repeat this a ton of times all at once to hopefully capture all of the image data for i in range(0, 1200): try: data, addr = sock.recvfrom(DoublyInterlacedPacket.size) # packet buffer size based on the packet size # print("received packet from", addr) if data: pkt = from_bytes_dint(data) uuid = str(pkt.uuid) # if this is a new client, give it a new image if uuid not in frames.keys(): print("New client acquired, naming %s", uuid) frames[uuid] = Client() frames[uuid].update(pkt) except BlockingIOError: pass # this is necessary to allow asyncio to swap between reading packets and rendering frames await asyncio.sleep(0.001) async def show_frames(): """Asynchronous coroutine to display frames in OpenCV debug windows.""" while True: # drop clients that have not sent packets for > 5 seconds for id in list(frames.keys()): if frames[id].latency() >= 5: print("Client likely lost connection, dropping %s", id) cv2.destroyWindow(id) frames.pop(id) else: # show the latest available frame cv2.imshow(id, frames[id].read()) cv2.waitKey(1) # this is necessary to allow asyncio to swap between reading packets and rendering frames await asyncio.sleep(0.01) if __name__ == "__main__": # argument parser parser = argparse.ArgumentParser(description="Proof-of-concept server for sauron-cv") parser.add_argument("-p", "--port", type=int, default=5005) parser.add_argument("-l", "--listen", type=str, default="") parser.add_argument("-W", "--width", type=int, default=640) parser.add_argument("-H", "--height", type=int, default=480) args = parser.parse_args() # assign constants based on argument parser UDP_IP = args.listen UDP_PORT = args.port HEIGHT = args.height WIDTH = args.width # create the UDP socket sock = socket.socket(socket.AF_INET, # Internet socket.SOCK_DGRAM) # UDP sock.setblocking(False) sock.bind((UDP_IP, UDP_PORT)) # create the async event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # create async tasks for reading network packets, displaying windows loop.create_task(read_packet()) loop.create_task(show_frames()) try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() # Release the capture and close all windows cv2.destroyAllWindows()