Add basic cable_map implementation

This commit is contained in:
Cole Deck 2024-03-27 20:33:21 -05:00
parent 1ec6d92cfa
commit dc1e568a96
3 changed files with 101 additions and 77 deletions

13
compose-search-only.yml Normal file
View File

@ -0,0 +1,13 @@
services:
meilisearch:
image: "getmeili/meilisearch:v1.6.2"
ports:
- "7700:7700"
environment:
MEILI_MASTER_KEY: fluffybunnyrabbit
MEILI_NO_ANALYTICS: true
volumes:
- "meili_data:/meili_data"
volumes:
meili_data:

157
run.py
View File

@ -97,7 +97,7 @@ def send_data(type, call, data, client_id="*"):
out["data"] = data out["data"] = data
to_server_queue.put((client_id, json.dumps(out))) to_server_queue.put((client_id, json.dumps(out)))
def start_server_socket(): def start_server_socket(cable_list):
global jbs global jbs
"""app = Flask(__name__) """app = Flask(__name__)
@ -146,80 +146,90 @@ def start_server_socket():
# if we get here, we have a "valid" data packet # if we get here, we have a "valid" data packet
data = decoded["data"] data = decoded["data"]
call = decoded["call"] call = decoded["call"]
match decoded["type"]: try:
case "log": match decoded["type"]:
fprint("log message") case "log":
if call == "send": fprint("log message")
fprint("webapp: " + str(data), sendqueue=to_server_queue) if call == "send":
elif call == "request": fprint("webapp: " + str(data), sendqueue=to_server_queue)
fprint("") elif call == "request":
case "cable_map": pass
fprint("cable_map message") case "cable_map":
if call == "send": fprint("cable_map message")
fprint("") if call == "send":
elif call == "request": pass
fprint("") elif call == "request":
case "ping": tmp = list()
fprint("Pong!!!") for idx in range(len(cable_list)):
# Lucas' notes if cable_list[idx] is not False:
# Add a ping pong :) response/handler tmp1 = {"part_number": cable_list[idx], "position": idx, "name": cable_list[idx], "brand": "Belden", "description": "Blah", "short_description": "Bla"}
# Add a get cable response/handler tmp.append(tmp1)
# this will tell the robot arm to move out = {"map": tmp}
# Call for turning off everything fprint(out)
# TODO Helper for converting Python Dictionaries to JSON send_data(decoded["type"], "send", out, client_id)
# make function: pythonData --> { { "type": "...", "call": "...", "data": pythonData } }
# to send: to_server_queue.put(("*", "JSON STRING HERE")) # replace * with UUID of client to send to one specific location case "ping":
fprint("Pong!!!")
# Lucas' notes
# Add a ping pong :) response/handler
# Add a get cable response/handler
# this will tell the robot arm to move
# Call for turning off everything
# TODO Helper for converting Python Dictionaries to JSON
# make function: pythonData --> { { "type": "...", "call": "...", "data": pythonData } }
case "cable_details": # to send: to_server_queue.put(("*", "JSON STRING HERE")) # replace * with UUID of client to send to one specific location
fprint("cable_details message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
dataout = dict()
dataout["cables"] = list()
print(data)
if "part_number" in data:
for part in data["part_number"]:
#print(part)
#print(jbs.get_partnum(part))
dataout["cables"].append(jbs.get_partnum(part)["fullspecs"])
if "position" in data:
for pos in data["position"]:
#print(pos)
#print(jbs.get_position(str(pos)))
dataout["cables"].append(jbs.get_position(str(pos))["fullspecs"])
send_data(decoded["type"], "send", dataout, client_id)
case "cable_search": case "cable_details":
fprint("cable_search message") fprint("cable_details message")
if call == "send": if call == "send":
fprint("") pass
elif call == "request": elif call == "request":
fprint("") dataout = dict()
case "keyboard": dataout["cables"] = list()
fprint("keyboard message") print(data)
if call == "send": if "part_number" in data:
fprint("") for part in data["part_number"]:
elif call == "request": #print(part)
fprint("") #print(jbs.get_partnum(part))
if data["enabled"] == True: dataout["cables"].append(jbs.get_partnum(part)["fullspecs"])
# todo : send this to client if "position" in data:
p = Process(target=run_cmd, args=("./keyboard-up.ps1",)) for pos in data["position"]:
p.start() #print(pos)
elif data["enabled"] == False: #print(jbs.get_position(str(pos)))
p = Process(target=run_cmd, args=("./keyboard-down.ps1",)) dataout["cables"].append(jbs.get_position(str(pos))["fullspecs"])
p.start() send_data(decoded["type"], "send", dataout, client_id)
case "machine_settings":
fprint("machine_settings message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
case _:
fprint("Unknown/unimplemented data type: " + decoded["type"])
case "cable_search":
fprint("cable_search message")
if call == "send":
pass
elif call == "request":
pass
case "keyboard":
fprint("keyboard message")
if call == "send":
pass
elif call == "request":
pass
if data["enabled"] == True:
# todo : send this to client
p = Process(target=run_cmd, args=("./keyboard-up.ps1",))
p.start()
elif data["enabled"] == False:
p = Process(target=run_cmd, args=("./keyboard-down.ps1",))
p.start()
case "machine_settings":
fprint("machine_settings message")
if call == "send":
pass
elif call == "request":
pass
case _:
fprint("Unknown/unimplemented data type: " + decoded["type"])
except Exception as e:
fprint(traceback.format_exc())
fprint(e)
@ -278,8 +288,7 @@ def setup_server(pool):
pool.apply_async(ledsys.init, callback=led_start_callback) pool.apply_async(ledsys.init, callback=led_start_callback)
#pool.apply_async(sensor_control.init, callback=sensor_start_callback) #pool.apply_async(sensor_control.init, callback=sensor_start_callback)
jbs = JukeboxSearch() jbs = JukeboxSearch()
serverproc = Process(target=start_server_socket)
serverproc.start()
if led_ready is False: if led_ready is False:
@ -445,6 +454,8 @@ def mainloop_server(pool):
fprint("All cables added to database.") fprint("All cables added to database.")
mode = "Idle" mode = "Idle"
serverproc = Process(target=start_server_socket, args=(cable_list,))
serverproc.start()
else: else:
# TODO: manual input # TODO: manual input
pass pass

View File

@ -6,7 +6,7 @@ from meilisearch.task import TaskInfo
from meilisearch.errors import MeilisearchApiError from meilisearch.errors import MeilisearchApiError
import time import time
DEFAULT_URL = "http://meilisearch:7700" DEFAULT_URL = "http://127.0.0.1:7700"
DEFAULT_APIKEY = "fluffybunnyrabbit" # I WOULD RECOMMEND SOMETHING MORE SECURE DEFAULT_APIKEY = "fluffybunnyrabbit" # I WOULD RECOMMEND SOMETHING MORE SECURE
DEFAULT_INDEX = "cables" DEFAULT_INDEX = "cables"
DEFAULT_FILTERABLE_ATTRS = ["partnum", "uuid", "position"] # default filterable attributes DEFAULT_FILTERABLE_ATTRS = ["partnum", "uuid", "position"] # default filterable attributes