#!/usr/bin/env python3
"""WebSocket server for communicating with all other nodes.

The server runs in its own process (see ``start_websocket_server``) and
talks to the parent through two queues:

* ``from_server_queue`` receives ``(client_id, message)`` tuples for every
  message a connected client sends.
* ``to_server_queue`` is polled for ``(client_id, message)`` tuples to send.
  A message whose ``client_id`` matches a connected client goes to that
  client only; any other ``client_id`` causes a broadcast to all clients.
"""

import asyncio
import uuid
from multiprocessing import Process, Queue

import websockets

from util import fprint

# Maps a per-connection UUID string to its websocket. It is only filled in
# by ``handler``, i.e. inside the process that runs the server.
connected_clients = {}


async def handler(websocket, path, to_server_queue, from_server_queue):
    """Register one connection and forward everything it sends.

    Args:
        websocket: The connection object handed over by ``websockets``.
        path: Request path of the connection; not used here.
        to_server_queue: Outbound queue; not used here (kept for the
            existing call signature).
        from_server_queue: Queue that receives a ``(client_id, message)``
            tuple for each incoming message.
    """
    client_id = str(uuid.uuid4())
    connected_clients[client_id] = websocket
    try:
        async for message in websocket:
            print(client_id)
            from_server_queue.put((client_id, message))
    finally:
        # The sender may already have dropped this entry after a failed
        # send, so tolerate a missing key.
        connected_clients.pop(client_id, None)
        print(f"Client {client_id} connection closed")


async def _send_or_drop(client_id, websocket, message):
    """Send ``message`` to one client, unregistering it if it has gone away."""
    try:
        await websocket.send(message)
    except websockets.exceptions.ConnectionClosed:
        # A closed peer must not take down the sender loop for everyone else.
        connected_clients.pop(client_id, None)
        print(f"Client {client_id} disconnected before message could be sent")


async def send_messages(to_server_queue):
    """Poll ``to_server_queue`` forever and deliver its messages.

    Each queue item is a ``(client_id, message)`` tuple. If ``client_id`` is
    currently connected the message is sent to that client only; otherwise
    it is broadcast to every connected client (and dropped when there are
    none).

    Args:
        to_server_queue: Queue of ``(client_id, message)`` tuples to send.
    """
    while True:
        if not to_server_queue.empty():
            client_id, message = to_server_queue.get()
            if client_id in connected_clients:
                targets = [(client_id, connected_clients[client_id])]
            else:
                # Snapshot the registry: clients can connect or disconnect
                # while a send is awaited, and mutating a dict during
                # iteration raises RuntimeError.
                targets = list(connected_clients.items())
            for target_id, client in targets:
                await _send_or_drop(target_id, client, message)
        # Yield to the event loop so connection handlers can run.
        await asyncio.sleep(0.001)


async def _serve(to_server_queue, from_server_queue):
    """Accept connections on 0.0.0.0:9000 while the sender loop runs."""

    def on_connect(websocket, path=None):
        # Depending on the installed websockets release the callback is
        # invoked with or without a path argument; accept both.
        if path is None:
            path = getattr(websocket, "path", None)
        return handler(websocket, path, to_server_queue, from_server_queue)

    async with websockets.serve(on_connect, "0.0.0.0", 9000):
        await send_messages(to_server_queue)


def websocket_server(to_server_queue, from_server_queue):
    """Run the WebSocket server on port 9000; blocks until the loop stops.

    Intended as the target of a separate process.

    Args:
        to_server_queue: Queue of ``(client_id, message)`` tuples to send.
        from_server_queue: Queue that receives ``(client_id, message)``
            tuples for incoming messages.
    """
    # asyncio.run creates and owns the event loop, so the server is built
    # inside a running loop instead of relying on the deprecated implicit
    # loop returned by asyncio.get_event_loop().
    asyncio.run(_serve(to_server_queue, from_server_queue))


def start_websocket_server(to_server_queue, from_server_queue):
    """Start ``websocket_server`` in a new process and return that process.

    Args:
        to_server_queue: Queue of ``(client_id, message)`` tuples to send.
        from_server_queue: Queue that receives ``(client_id, message)``
            tuples for incoming messages.

    Returns:
        The started ``multiprocessing.Process``.
    """
    p = Process(target=websocket_server, args=(to_server_queue, from_server_queue))
    p.start()
    return p