From 3e7ba62d3a0238f47feb1a5f3cfe2f1b46ff9950 Mon Sep 17 00:00:00 2001 From: Dustin Thomas Date: Fri, 31 Jan 2025 21:58:09 -0600 Subject: [PATCH] doubly interlace video and make server asynchronous --- client.py | 17 ++++++++-- common.py | 33 +++++++++++++++++- server.py | 100 +++++++++++++++++++++++++++++++----------------------- 3 files changed, 104 insertions(+), 46 deletions(-) diff --git a/client.py b/client.py index 704475c..f0618a2 100644 --- a/client.py +++ b/client.py @@ -5,7 +5,7 @@ import uuid uuid = uuid.uuid4() -from common import StdPacket, InterlacedPacket +from common import StdPacket, InterlacedPacket, DoublyInterlacedPacket UDP_IP = "127.0.0.1" UDP_PORT = 5005 @@ -48,6 +48,19 @@ def breakdown_image_interlaced(frame): pkt = InterlacedPacket(uuid, j, i, True, frame[i + 1:i + 32:2, j:j + 16]) send_packet(sock, pkt.to_bytestr()) +def breakdown_image_dint(frame): + (cols, rows, colors) = frame.shape + # break the array into 16x32 chunks. we'll split those further into odd and even rows + # and send each as UDP packets. this should make packet loss less obvious + for l in range(0, 4): + for i in range(0, cols, 32): + for j in range(0, rows, 32): + # print("Sending frame segment (%d, %d)", i, j) + i_even = l % 2 == 0 + j_even = l >= 2 + pkt = DoublyInterlacedPacket(uuid, j, i, j_even, i_even, frame[i + i_even:i + 32:2, j + j_even:j + 32:2]) + send_packet(sock, pkt.to_bytestr()) + while True: # Capture frame-by-frame ret, frame = cap.read() @@ -57,7 +70,7 @@ while True: print("Can't receive frame (stream end?). Exiting ...") break - breakdown_image_interlaced(frame) + breakdown_image_dint(frame) # Release the capture and close all windows cap.release() \ No newline at end of file diff --git a/common.py b/common.py index b8bbef4..b64a0eb 100644 --- a/common.py +++ b/common.py @@ -52,4 +52,35 @@ def from_bytes_int(b: bytes) -> InterlacedPacket: even = bool.from_bytes(b[24:28]) array = np.frombuffer(b[28:], np.uint8).reshape(16, 16, 3) - return InterlacedPacket(uuid, x, y, even, array) \ No newline at end of file + return InterlacedPacket(uuid, x, y, even, array) + +class DoublyInterlacedPacket: + size = 16 + 4 + 4 + 4 + 768 + def __init__(self, uuid: UUID, x: int, y: int, even_x: bool, even_y: bool, array: np.ndarray): + self.uuid = uuid + self.x = x + self.y = y + self.even_x = even_x + self.even_y = even_y + self.array = array + + def to_bytestr(self) -> bytes: + bytestr = b"" + bytestr += self.uuid.bytes + bytestr += self.x.to_bytes(length=4, signed=False) + bytestr += self.y.to_bytes(length=4, signed=False) + bytestr += self.even_x.to_bytes(length=2) + bytestr += self.even_y.to_bytes(length=2) + bytestr += self.array.tobytes() + return bytestr + + +def from_bytes_dint(b: bytes) -> DoublyInterlacedPacket: + uuid = UUID(bytes=b[0:16]) + x = int.from_bytes(b[16:20], signed=False) + y = int.from_bytes(b[20:24], signed=False) + even_x = bool.from_bytes(b[24:26]) + even_y = bool.from_bytes(b[26:28]) + array = np.frombuffer(b[28:], np.uint8).reshape(16, 16, 3) + + return DoublyInterlacedPacket(uuid, x, y, even_x, even_y, array) \ No newline at end of file diff --git a/server.py b/server.py index 7e21c39..43aa8d5 100644 --- a/server.py +++ b/server.py @@ -1,21 +1,17 @@ -from datetime import datetime from typing import Dict import cv2 import socket import numpy as np +from datetime import datetime +import asyncio -from common import InterlacedPacket, from_bytes_int +from common import DoublyInterlacedPacket, from_bytes_dint # bind any IP address UDP_IP = "" UDP_PORT = 5005 -sock = socket.socket(socket.AF_INET, # Internet - socket.SOCK_DGRAM) # UDP -sock.setblocking(False) -sock.bind((UDP_IP, UDP_PORT)) - HEIGHT = 480 WIDTH = 640 @@ -24,11 +20,12 @@ class Client: self.last_updated = datetime.now() self.frame = np.ndarray((HEIGHT, WIDTH, 3), dtype=np.uint8) - def update(self, pkt: InterlacedPacket): - if pkt.even: - self.frame[y + 1:y + 32:2, x:x + 16] = arr - else: - self.frame[y:y + 32:2, x:x + 16] = arr + def update(self, pkt: DoublyInterlacedPacket): + x = pkt.x + y = pkt.y + arr = pkt.array + self.frame[y + pkt.even_y:y + 32:2, x + pkt.even_x:x + 32:2] = arr + self.last_updated = datetime.now() def latency(self) -> float: @@ -40,41 +37,58 @@ class Client: frames: Dict[str, Client] = {} -while True: - # break the array down into 16-bit chunks, then transmit them as UDP packets - for repeats in range(10000): - try: - data, addr = sock.recvfrom(InterlacedPacket.size) # buffer size is 768 bytes - # print("received packet from", addr) - if data: - pkt = from_bytes_int(data) +async def read_packet(): + while True: + for i in range(0, 1200): + # break the array down into 16-bit chunks, then transmit them as UDP packets + try: + data, addr = sock.recvfrom(DoublyInterlacedPacket.size) # buffer size is 768 bytes + # print("received packet from", addr) + if data: + pkt = from_bytes_dint(data) - uuid = str(pkt.uuid) - x = pkt.x - y = pkt.y - arr = pkt.array + uuid = str(pkt.uuid) - if uuid not in frames.keys(): - print("New client acquired, naming %s", uuid) - frames[uuid] = Client() + if uuid not in frames.keys(): + print("New client acquired, naming %s", uuid) + frames[uuid] = Client() - frames[uuid].update(pkt) + frames[uuid].update(pkt) - except BlockingIOError: - pass + except BlockingIOError: + pass - # Display the resulting frame - for id in list(frames.keys()): - if frames[id].latency() >= 5: - print("Client likely lost connection, dropping %s", id) - cv2.destroyWindow(id) - frames.pop(id) - else: - cv2.imshow(id, frames[id].read()) + await asyncio.sleep(0.001) - # Break the loop if 'q' key is pressed - if cv2.waitKey(1) == ord('q'): - break +async def show_frames(): + while True: + # Display the resulting frame + for id in list(frames.keys()): + if frames[id].latency() >= 5: + print("Client likely lost connection, dropping %s", id) + cv2.destroyWindow(id) + frames.pop(id) + else: + cv2.imshow(id, frames[id].read()) -# Release the capture and close all windows -cv2.destroyAllWindows() \ No newline at end of file + cv2.waitKey(1) + await asyncio.sleep(0.01) + +if __name__ == "__main__": + sock = socket.socket(socket.AF_INET, # Internet + socket.SOCK_DGRAM) # UDP + sock.setblocking(False) + sock.bind((UDP_IP, UDP_PORT)) + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.create_task(read_packet()) + loop.create_task(show_frames()) + try: + loop.run_forever() + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) + loop.close() + + # Release the capture and close all windows + cv2.destroyAllWindows() \ No newline at end of file