95 lines
3.3 KiB
Python
Executable File

#!/usr/bin/env python3
# WebSocket server for communicating with all other nodes
"""import websockets
import asyncio
from util import fprint
class WebSocketServer:
def __init__(self):
self.start_server = websockets.serve(self.handler, "0.0.0.0", 9000)
self.received_messages = asyncio.Queue()
self.connected_clients = set()
async def handler(self, websocket, path):
self.connected_clients.add(websocket)
fprint(self.connected_clients)
try:
async for message in websocket:
fprint(message)
await self.received_messages.put(message)
finally:
self.connected_clients.remove(websocket)
async def send_message(self, message):
disconnected_clients = set()
for websocket in self.connected_clients:
try:
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
disconnected_clients.add(websocket)
self.connected_clients.difference_update(disconnected_clients)
def run(self):
fprint("Starting WebSocket server...")
asyncio.get_event_loop().run_until_complete(self.start_server)
#asyncio.run(self.start_server)
asyncio.get_event_loop().run_forever()
async def get_received_message(self):
if not self.received_messages.empty():
return await self.received_messages.get()
return None
# Function to be used by multiprocessing to run the server
def run_server():
server = WebSocketServer()
asyncio.run(server.run())"""
import asyncio
import websockets
from multiprocessing import Process, Queue
from util import fprint
import uuid
# Registry of currently connected clients: maps a client id (a uuid4 string
# generated in handler()) to that client's websocket object. handler() adds
# and removes entries; send_messages() looks recipients up here.
connected_clients = {}
async def handler(websocket, path, to_server_queue, from_server_queue):
    """Serve one websocket connection for its whole lifetime.

    The connection is registered in ``connected_clients`` under a freshly
    generated uuid4 string. Every message received on it is pushed onto
    ``from_server_queue`` as a ``(client_id, message)`` tuple. When the
    receive loop ends, for any reason, the connection is unregistered again.

    Args:
        websocket: The connection object; iterated asynchronously for
            incoming messages.
        path: Not used by this function.
        to_server_queue: Not used by this function.
        from_server_queue: Queue that receives ``(client_id, message)``
            tuples for every incoming message.
    """
    session_id = str(uuid.uuid4())
    connected_clients[session_id] = websocket
    try:
        async for payload in websocket:
            print(session_id)
            from_server_queue.put((session_id, payload))
    finally:
        # Only announce the close when this call actually removed the entry.
        try:
            del connected_clients[session_id]
        except KeyError:
            pass
        else:
            print(f"Client {session_id} connection closed")
async def send_messages(to_server_queue):
    """Forward queued outbound messages to connected websocket clients.

    Runs forever, polling ``to_server_queue`` roughly every millisecond.
    Each queue item is a ``(client_id, message)`` pair:

    * if ``client_id`` is a key of ``connected_clients``, the message is
      sent to that client only;
    * otherwise the message is broadcast to every connected client (and
      dropped when nobody is connected).

    NOTE(review): a message addressed to a client that has already
    disconnected also takes the broadcast path, because its id is no longer
    registered. Confirm with callers that this is intended.

    Args:
        to_server_queue: Queue yielding ``(client_id, message)`` tuples.
    """
    while True:
        if not to_server_queue.empty():
            client_id, message = to_server_queue.get()
            if client_id in connected_clients:
                recipients = [connected_clients[client_id]]
            else:
                # Snapshot the registry: handler() may add or remove entries
                # while we are suspended in send(), and iterating the live
                # dict view would then raise RuntimeError.
                recipients = list(connected_clients.values())
            for client in recipients:
                try:
                    await client.send(message)
                except websockets.exceptions.ConnectionClosed:
                    # The peer went away between lookup and send. Skip it so
                    # one dead connection cannot kill this task; handler()
                    # unregisters the client in its own finally block.
                    print("Skipped send: client connection already closed")
        # Yield to the event loop so connections keep being serviced.
        await asyncio.sleep(0.001)
async def _run_websocket_server(to_server_queue, from_server_queue):
    """Start the listener and the outbound sender, then wait forever."""

    async def on_connect(websocket, path=None):
        # ``path`` is optional so this callback works whether the installed
        # websockets release passes (websocket, path) or only the connection.
        await handler(websocket, path, to_server_queue, from_server_queue)

    await websockets.serve(on_connect, "0.0.0.0", 9000)
    # Keep a strong reference: the event loop only holds weak references to
    # tasks, so an unreferenced task may be garbage collected mid-flight.
    sender_task = asyncio.create_task(send_messages(to_server_queue))
    # Never resolved; keeps the loop (and therefore the server) alive.
    await asyncio.Future()
    return sender_task


def websocket_server(to_server_queue, from_server_queue):
    """Run the websocket server on 0.0.0.0:9000 until the process is stopped.

    Blocks forever. Incoming connections are served by ``handler`` and
    outbound messages are pumped by ``send_messages``. Uses ``asyncio.run``
    instead of ``asyncio.get_event_loop()``, which is deprecated when no
    event loop is running.

    Args:
        to_server_queue: Queue of ``(client_id, message)`` tuples to send
            out to clients.
        from_server_queue: Queue that receives ``(client_id, message)``
            tuples for every message a client sends.
    """
    asyncio.run(_run_websocket_server(to_server_queue, from_server_queue))
def start_websocket_server(to_server_queue, from_server_queue):
    """Launch ``websocket_server`` in a child process.

    Args:
        to_server_queue: Queue handed to the child as its first argument.
        from_server_queue: Queue handed to the child as its second argument.

    Returns:
        The already-started ``multiprocessing.Process`` object.
    """
    queue_pair = (to_server_queue, from_server_queue)
    worker = Process(target=websocket_server, args=queue_pair)
    worker.start()
    return worker