from typing import Dict import cv2 import socket import numpy as np from datetime import datetime import asyncio import asyncudp import argparse from rich.console import Console from common import Packet, DoublyInterlacedPacket, from_bytes_dint class Client: """Class for tracking client state, including current frame data and time since last update.""" def __init__(self): self.last_updated = datetime.now() self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) def update(self, pkt: Packet): """Apply a packet to the client frame. Update last client update to current time.""" self.frame = pkt.apply(self.frame) self.last_updated = datetime.now() def latency(self) -> float: """Return the time since the last client update.""" return (datetime.now() - self.last_updated).total_seconds() def read(self) -> np.ndarray: """Return the current frame.""" return self.frame # Dictionary of client states stored by UUID frames: Dict[str, Client] = {} async def show_frames(): """Asynchronous coroutine to display frames in OpenCV debug windows.""" while True: # drop clients that have not sent packets for > 5 seconds for id in list(frames.keys()): if frames[id].latency() >= 5: console.log(f"Client likely lost connection, dropping [bold red]{id}[/bold red]") cv2.destroyWindow(id) frames.pop(id) else: # show the latest available frame cv2.imshow(id, frames[id].read()) cv2.waitKey(1) # this is necessary to allow asyncio to swap between reading packets and rendering frames await asyncio.sleep(0.05) async def listen(ip: str, port: int): """Asynchronous coroutine to listen for / read client connections.""" sock = await asyncudp.create_socket(local_addr=(ip, port)) console.log("Ready to accept connections.", style="bold green") while True: # receive packets data, addr = await sock.recvfrom() if data: # convert the byte string into a packet object pkt = from_bytes_dint(data) uuid = str(pkt.uuid) # if this is a new client, give it a new image if uuid not in frames.keys(): console.log(f"New client acquired, naming [bold cyan]{uuid}[bold cyan]") frames[uuid] = Client() frames[uuid].update(pkt) if __name__ == "__main__": # argument parser parser = argparse.ArgumentParser(description="Proof-of-concept server for sauron-cv") parser.add_argument("-p", "--port", type=int, default=5005) parser.add_argument("-l", "--listen", type=str, default="0.0.0.0") parser.add_argument("-W", "--width", type=int, default=640) parser.add_argument("-H", "--height", type=int, default=480) args = parser.parse_args() # console console = Console() # assign constants based on argument parser UDP_IP = args.listen UDP_PORT = args.port HEIGHT = args.height WIDTH = args.width # create the async event loop loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # create async tasks for reading network packets, displaying windows loop.create_task(listen(UDP_IP, UDP_PORT)) loop.create_task(show_frames()) try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() # Release the capture and close all windows cv2.destroyAllWindows()