95 lines
3.3 KiB
Python
Executable File
95 lines
3.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# WebSocket server for communicating with all other nodes
|
|
|
|
"""import websockets
|
|
import asyncio
|
|
from util import fprint
|
|
|
|
class WebSocketServer:
|
|
def __init__(self):
|
|
self.start_server = websockets.serve(self.handler, "0.0.0.0", 9000)
|
|
self.received_messages = asyncio.Queue()
|
|
self.connected_clients = set()
|
|
|
|
async def handler(self, websocket, path):
|
|
self.connected_clients.add(websocket)
|
|
fprint(self.connected_clients)
|
|
try:
|
|
async for message in websocket:
|
|
fprint(message)
|
|
await self.received_messages.put(message)
|
|
finally:
|
|
self.connected_clients.remove(websocket)
|
|
|
|
async def send_message(self, message):
|
|
disconnected_clients = set()
|
|
for websocket in self.connected_clients:
|
|
try:
|
|
await websocket.send(message)
|
|
except websockets.exceptions.ConnectionClosed:
|
|
disconnected_clients.add(websocket)
|
|
self.connected_clients.difference_update(disconnected_clients)
|
|
|
|
def run(self):
|
|
fprint("Starting WebSocket server...")
|
|
asyncio.get_event_loop().run_until_complete(self.start_server)
|
|
#asyncio.run(self.start_server)
|
|
asyncio.get_event_loop().run_forever()
|
|
|
|
async def get_received_message(self):
|
|
if not self.received_messages.empty():
|
|
return await self.received_messages.get()
|
|
return None
|
|
|
|
# Function to be used by multiprocessing to run the server
|
|
def run_server():
|
|
server = WebSocketServer()
|
|
asyncio.run(server.run())"""
|
|
|
|
import asyncio
|
|
import websockets
|
|
from multiprocessing import Process, Queue
|
|
from util import fprint
|
|
import uuid
|
|
|
|
# Registry of currently open websocket connections, keyed by the UUID string
# that handler() generates for each connection; the value is that connection's
# websocket object. handler() adds an entry when a client connects and removes
# it when the connection ends; send_messages() reads it to pick recipients.
connected_clients = {}
|
|
|
|
async def handler(websocket, path, to_server_queue, from_server_queue):
    """Serve one websocket connection from registration to teardown.

    A fresh UUID string is generated for the connection and used as its key
    in ``connected_clients``. Every message received on the socket is
    forwarded to ``from_server_queue`` as a ``(client_id, message)`` tuple.
    When the receive loop ends, normally or through an exception, the
    connection is removed from the registry again.

    Args:
        websocket: The connection object; iterated asynchronously for
            incoming messages.
        path: Unused here; accepted because the caller passes it.
        to_server_queue: Unused here; accepted because the caller passes it.
        from_server_queue: Queue receiving ``(client_id, message)`` tuples.
    """
    client_id = str(uuid.uuid4())
    connected_clients[client_id] = websocket
    try:
        async for incoming in websocket:
            # Debug trace of which client just sent something.
            print(client_id)
            from_server_queue.put((client_id, incoming))
    finally:
        # Drop the registry entry; announce the close only if the entry was
        # still present.
        was_registered = connected_clients.pop(client_id, None) is not None
        if was_registered:
            print(f"Client {client_id} connection closed")
|
|
|
|
async def send_messages(to_server_queue):
    """Forward queued outbound messages to websocket clients, forever.

    Polls ``to_server_queue`` roughly once per millisecond for
    ``(client_id, message)`` tuples. If ``client_id`` is a key of
    ``connected_clients`` the message is sent to that client only; otherwise
    it is sent to every connected client. With no clients connected the
    message is dropped.

    NOTE(review): because any unknown id triggers a broadcast, a message
    addressed to a client that has just disconnected is delivered to everyone
    else. Confirm with the producers that this fallback is intended.

    Args:
        to_server_queue: Queue of ``(client_id, message)`` tuples. Must
            provide ``get_nowait()`` raising ``queue.Empty`` (as
            ``multiprocessing.Queue`` and ``queue.Queue`` do).
    """
    # Local import keeps this block self-contained; Empty is what
    # get_nowait() raises when nothing is queued.
    from queue import Empty

    while True:
        try:
            # Non-blocking fetch: a blocking get() would stall the event loop.
            client_id, message = to_server_queue.get_nowait()
        except Empty:
            pass
        else:
            target = connected_clients.get(client_id)
            if target is not None:
                recipients = [target]
            else:
                # Snapshot the registry: handler() may add or remove entries
                # while we are suspended in send(), and mutating a dict during
                # iteration raises RuntimeError.
                recipients = list(connected_clients.values())

            for client in recipients:
                try:
                    await client.send(message)
                except websockets.ConnectionClosed:
                    # The peer went away mid-send. Skip it so one dead
                    # connection cannot terminate this task; handler()
                    # unregisters the client when its receive loop ends.
                    continue

        # Yield to the event loop so connections can be serviced between polls.
        await asyncio.sleep(0.001)
|
|
|
|
def websocket_server(to_server_queue, from_server_queue):
    """Run the websocket server on 0.0.0.0:9000; blocks until the loop stops.

    Starts the listening server, schedules ``send_messages`` to drain
    ``to_server_queue``, and then runs the event loop forever. Intended as a
    process entry point (see ``start_websocket_server``).

    Args:
        to_server_queue: Queue of outbound ``(client_id, message)`` tuples,
            consumed by ``send_messages``.
        from_server_queue: Queue that ``handler`` fills with inbound
            ``(client_id, message)`` tuples.
    """
    # Create and install the loop explicitly. Relying on
    # asyncio.get_event_loop() to create one implicitly is deprecated and
    # fails on recent Python versions; a fresh loop also avoids reusing any
    # loop state inherited from the parent process.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    start_server = websockets.serve(
        lambda ws, path: handler(ws, path, to_server_queue, from_server_queue),
        "0.0.0.0",
        9000,
    )
    loop.run_until_complete(start_server)

    # Hold a reference to the task for the lifetime of the loop, as the
    # asyncio docs recommend, so it cannot be garbage-collected mid-run.
    sender_task = loop.create_task(send_messages(to_server_queue))

    loop.run_forever()
|
|
|
|
def start_websocket_server(to_server_queue, from_server_queue):
    """Launch ``websocket_server`` in a separate process and return it.

    Args:
        to_server_queue: Passed through as the server's first argument.
        from_server_queue: Passed through as the server's second argument.

    Returns:
        The already-started ``Process`` object running the server.
    """
    queues = (to_server_queue, from_server_queue)
    server_process = Process(target=websocket_server, args=queues)
    server_process.start()
    return server_process