#!/usr/bin/env python3
"""WebSocket server for communicating with all other nodes.

The server is meant to run in its own process (see
``start_websocket_server``) and is bridged to the parent process by two
``multiprocessing`` queues:

* ``to_server_queue``   -- messages the parent wants broadcast to every
  connected WebSocket client.
* ``from_server_queue`` -- messages received from any WebSocket client.
"""

import asyncio
import queue
from multiprocessing import Process, Queue

import websockets

# NOTE(review): "localhost" only accepts connections from the same machine.
# If other nodes must reach this server, callers probably need to pass
# host="0.0.0.0" -- confirm against the deployment.
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 9000

# Seconds to wait before polling ``to_server_queue`` again when it is empty.
_POLL_INTERVAL = 0.1

# Every currently open client connection; shared by the per-connection
# handler coroutines and the broadcaster running on the same event loop.
connected_clients = set()


async def handler(websocket, path, to_server_queue, from_server_queue):
    """Serve one client connection for as long as it stays open.

    The connection is registered in ``connected_clients`` so that broadcasts
    reach it, and every message it sends is put on ``from_server_queue``.

    Args:
        websocket: The connection object supplied by ``websockets``.
        path: Request path supplied by ``websockets`` (may be ``None``);
            not used.
        to_server_queue: Outgoing-message queue; not used here, kept for
            interface compatibility.
        from_server_queue: Queue that receives every incoming message.
    """
    connected_clients.add(websocket)
    try:
        async for message in websocket:
            from_server_queue.put(message)
    finally:
        # discard(), not remove(): the broadcaster may already have dropped
        # this connection after a failed send.
        connected_clients.discard(websocket)


async def _broadcast(message):
    """Send *message* to every connected client, dropping closed ones."""
    # Iterate over a snapshot: clients may connect or disconnect while we
    # are suspended in ``send``, and mutating a set during iteration raises
    # RuntimeError.
    for client in tuple(connected_clients):
        try:
            await client.send(message)
        except websockets.ConnectionClosed:
            # One dead client must not stop delivery to the others.
            connected_clients.discard(client)


async def send_messages(to_server_queue):
    """Forward messages from ``to_server_queue`` to all clients, forever.

    A message taken from the queue while no client is connected is dropped.

    Args:
        to_server_queue: ``multiprocessing`` queue of messages to broadcast.
    """
    while True:
        try:
            # Non-blocking read so the event loop is never stalled.
            message = to_server_queue.get_nowait()
        except queue.Empty:
            # Nothing pending: back off instead of busy-polling.
            await asyncio.sleep(_POLL_INTERVAL)
            continue
        await _broadcast(message)
        # Yield once per message so connection handlers still get to run
        # while a backlog is being drained.
        await asyncio.sleep(0)


async def _serve(to_server_queue, from_server_queue, host, port):
    """Run the WebSocket server and the broadcaster until cancelled."""

    async def on_connect(websocket, path=None):
        # ``path`` is optional because newer ``websockets`` releases call
        # the handler with the connection only.
        await handler(websocket, path, to_server_queue, from_server_queue)

    async with websockets.serve(on_connect, host, port):
        await send_messages(to_server_queue)


def websocket_server(to_server_queue, from_server_queue,
                     host=DEFAULT_HOST, port=DEFAULT_PORT):
    """Run the WebSocket server in the current process; blocks forever.

    Args:
        to_server_queue: Queue of messages to broadcast to all clients.
        from_server_queue: Queue that receives messages sent by clients.
        host: Interface to bind to.
        port: TCP port to listen on.
    """
    asyncio.run(_serve(to_server_queue, from_server_queue, host, port))


def start_websocket_server(to_server_queue, from_server_queue,
                           host=DEFAULT_HOST, port=DEFAULT_PORT):
    """Start the WebSocket server in a separate process.

    Args:
        to_server_queue: Queue of messages to broadcast to all clients.
        from_server_queue: Queue that receives messages sent by clients.
        host: Interface to bind to.
        port: TCP port to listen on.

    Returns:
        The started ``multiprocessing.Process`` running the server.
    """
    p = Process(
        target=websocket_server,
        args=(to_server_queue, from_server_queue, host, port),
    )
    p.start()
    return p