from typing import Dict import cv2 import socket import numpy as np from datetime import datetime import asyncio from common import DoublyInterlacedPacket, from_bytes_dint # bind any IP address UDP_IP = "" UDP_PORT = 5005 HEIGHT = 480 WIDTH = 640 class Client: def __init__(self): self.last_updated = datetime.now() self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) def update(self, pkt: DoublyInterlacedPacket): x = pkt.x y = pkt.y arr = pkt.array self.frame[y + pkt.even_y:y + 32:2, x + pkt.even_x:x + 32:2] = arr self.last_updated = datetime.now() def latency(self) -> float: return (datetime.now() - self.last_updated).total_seconds() def read(self) -> np.ndarray: return self.frame frames: Dict[str, Client] = {} async def read_packet(): while True: for i in range(0, 1200): # break the array down into 16-bit chunks, then transmit them as UDP packets try: data, addr = sock.recvfrom(DoublyInterlacedPacket.size) # buffer size is 768 bytes # print("received packet from", addr) if data: pkt = from_bytes_dint(data) uuid = str(pkt.uuid) if uuid not in frames.keys(): print("New client acquired, naming %s", uuid) frames[uuid] = Client() frames[uuid].update(pkt) except BlockingIOError: pass await asyncio.sleep(0.001) async def show_frames(): while True: # Display the resulting frame for id in list(frames.keys()): if frames[id].latency() >= 5: print("Client likely lost connection, dropping %s", id) cv2.destroyWindow(id) frames.pop(id) else: cv2.imshow(id, frames[id].read()) cv2.waitKey(1) await asyncio.sleep(0.01) if __name__ == "__main__": sock = socket.socket(socket.AF_INET, # Internet socket.SOCK_DGRAM) # UDP sock.setblocking(False) sock.bind((UDP_IP, UDP_PORT)) loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.create_task(read_packet()) loop.create_task(show_frames()) try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() # Release the capture and close all windows cv2.destroyAllWindows()