This commit is contained in:
BlueOceanWave 2024-03-14 01:49:19 -05:00
commit 25ceb6c133
19 changed files with 1108 additions and 253 deletions

13
.gitignore vendored

@ -1,9 +1,18 @@
# python
venv
__pycache__
# cable data folder(s)
cables
cables-sample.zip
# meilisearch (mainly where I've put the data volume for the container)
meili_data
# IDE things
.vscode
output.log
.idea
# videos
*.webm
output.mp4
# log files
output.log
cables-sample.zip
# images
*.png

13
compose.yml Normal file

@ -0,0 +1,13 @@
services:
meilisearch:
image: "getmeili/meilisearch:v1.6.2"
ports:
- "7700:7700"
environment:
MEILI_MASTER_KEY: fluffybunnyrabbit
MEILI_NO_ANALYTICS: true
volumes:
- "meili_data:/meili_data"
volumes:
meili_data:

@ -159,7 +159,7 @@ led:
size: 24
diameter: 63.5
angle: 0
pos: [-65.936, 114.3]
pos: [-65.991, 114.3]
- type: circle
start: 336
size: 24
@ -420,6 +420,116 @@ led:
size: 70
length: 600
angle: 270 # down
pos: [300, 300]
pos: [375, 300]
global_position_offset: [0,0] # default coordinate spce below as center of arm at 0,0 - adjust if necessary
animation_time: 40
position_map:
- index: 0
pos: [-152.4, 263.965]
- index: 1
pos: [-76.2, 263.965]
- index: 2
pos: [0, 263.965]
- index: 3
pos: [76.2, 263.965]
- index: 4
pos: [152.4, 263.965]
- index: 5
pos: [-190.5, 197.973]
- index: 6
pos: [-114.3, 197.973]
- index: 7
pos: [-38.1, 197.973]
- index: 8
pos: [38.1, 197.973]
- index: 9
pos: [114.3, 197.973]
- index: 10
pos: [190.5, 197.973]
- index: 11
pos: [-228.6, 131.982]
- index: 12
pos: [-152.4, 131.982]
- index: 13
pos: [-76.2, 131.982]
- index: 14
pos: [0, 131.982]
- index: 15
pos: [76.2, 131.982]
- index: 16
pos: [152.4, 131.982]
- index: 17
pos: [228.6, 131.982]
- index: 18
pos: [-266.7, 65.991]
- index: 19
pos: [-190.5, 65.991]
- index: 20
pos: [-114.3, 65.991]
- index: 21
pos: [114.3, 65.991]
- index: 22
pos: [190.5, 65.991]
- index: 23
pos: [266.7, 65.991]
- index: 24
pos: [-304.8, 0]
- index: 25
pos: [-228.6, 0]
- index: 26
pos: [-152.4, 0]
- index: 27
pos: [152.4, 0]
- index: 28
pos: [228.6, 0]
- index: 29
pos: [304.8, 0]
- index: 30
pos: [-266.7, -65.991]
- index: 31
pos: [-190.5, -65.991]
- index: 32
pos: [-114.3, -65.991]
- index: 33
pos: [114.3, -65.991]
- index: 34
pos: [190.5, -65.991]
- index: 35
pos: [266.7, -65.991]
- index: 36
pos: [-228.6, -131.982]
- index: 37
pos: [-152.4, -131.982]
- index: 38
pos: [-76.2, -131.982]
- index: 39
pos: [0, -131.982]
- index: 40
pos: [76.2, -131.982]
- index: 41
pos: [152.4, -131.982]
- index: 42
pos: [228.6, -131.982]
- index: 43
pos: [-190.5, -197.973]
- index: 44
pos: [-114.3, -197.973]
- index: 45
pos: [-38.1, -197.973]
- index: 46
pos: [38.1, -197.973]
- index: 47
pos: [114.3, -197.973]
- index: 48
pos: [190.5, -197.973]
- index: 49
pos: [-152.4, -263.965]
- index: 50
pos: [-76.2, -263.965]
- index: 51
pos: [0, -263.965]
- index: 52
pos: [76.2, -263.965]
- index: 53
pos: [152.4, -263.965]

@ -1,140 +0,0 @@
"""This module contains functionality for interacting with a PostgreSQL database. It will automatically handle error
conditions (i.e. missing columns) without terminating the entire program. Use the :py:class:`DBConnector` class to
handle database interactions, either as a standalone object or in a context manager."""
from __future__ import annotations
import os
import psycopg2
from psycopg2 import DatabaseError, OperationalError
from psycopg2.errors import UndefinedColumn
DB_ADDRESS = os.getenv('DB_ADDRESS', 'localhost')
DB_PORT = os.getenv('DB_PORT', 5432)
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
DB_NAME = os.getenv('DB_NAME', 'postgres')
DB_TABLE = os.getenv('DB_TABLE', 'cables')
class DBConnector:
"""Context managed database class. Use with statements to automatically open and close the database connection, like
so:
.. code-block:: python
with DBConnector() as db:
db.read()
"""
def _db_start(self):
"""Setup the database connection and cursor."""
try:
self.conn = psycopg2.connect(
f"host={DB_ADDRESS} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
self.cur = self.conn.cursor()
except OperationalError as e:
raise e
def _db_stop(self):
"""Close the cursor and connection."""
self.cur.close()
self.conn.close()
def __init__(self):
self._db_start()
def __del__(self):
self._db_stop()
def __enter__(self):
self._db_start()
def __exit__(self):
self._db_stop()
def _get_cols(self) -> set[str]:
"""Get the list of columns in the database.
:return: A list of column names."""
query = f"select COLUMN_NAME from information_schema.columns where table_name={DB_TABLE}"
rows = {x["COLUMN_NAME"] for x in self._query(query)}
return rows
def _column_parity(self, columns: list[str] | set[str]) -> set[str]:
"""If the listed columns are not in the database, add them.
:param columns: The columns we expect are in the database.
:return: The list of columns in the database after querying."""
cols = set(columns)
existing = self._get_cols()
needs = cols.difference(existing.intersection(cols))
if len(needs) > 0:
query = f"ALTER TABLE {DB_TABLE} {', '.join([f'ADD COLUMN {c}' for c in needs])}"
self._query(query)
existing = self._get_cols()
return existing
def _query(self, sql) -> list[dict]:
"""Basic function for running queries.
:param sql: SQL query as plaintext.
:return: Results of the query, or an empty list if none."""
result = []
try:
self.cur.execute(sql)
result = self._read_dict()
except DatabaseError as e:
print(f"ERROR {e.pgcode}: {e.pgerror}\n"
f"Caused by query: {sql}")
finally:
return result
def _read_dict(self) -> list[dict]:
"""Read the cursor as a list of dictionaries. psycopg2 defaults to using a list of tuples, so we want to convert
each row into a dictionary before we return it."""
cols = [i.name for i in self.cur.description]
results = []
for row in self.cur:
row_dict = {}
for i in range(0, len(row)):
if row[i]:
row_dict = {**row_dict, cols[i]: row[i]}
results.append(row_dict)
return results
def read(self, **kwargs) -> list[dict]:
"""Read rows from a database that match the specified filters.
:param kwargs: Column constraints; i.e. what value to filter by in what column.
:returns: A list of dictionaries of all matching rows, or an empty list if no match."""
args = []
for kw in kwargs.keys():
args.append(f"{kw} ILIKE {kwargs['kw']}")
query = f"SELECT * FROM {DB_TABLE}"
if len(args) > 0:
query += f" WHERE {' AND '.join(args)}"
return self._query(query)
def write(self, **kwargs) -> dict:
"""Write a row to the database.
:param kwargs: Values to write for each database; specify each column separately!
:returns: The row you just added."""
self._column_parity(set(kwargs.keys()))
values = []
for val in kwargs.keys():
values.append(kwargs[val])
query = f"INSERT INTO {DB_TABLE} ({', '.join(kwargs.keys())}) VALUES ({', '.join(values)})"
self._query(query)
return kwargs
def write_all(self, items: list[dict]) -> list[dict]:
"""Write multiple rows to the database.
:param items: Rows to write, as a list of dictionaries.
:returns: The rows that were added successfully."""
successes = []
for i in items:
res0 = self.write(**i)
if res0:
successes.append(res0)
return successes

@ -5,7 +5,7 @@ import sys
import read_datasheet
from alive_progress import alive_bar
import requests
#import time
import time
import json
import subprocess
from util import fprint
@ -27,29 +27,78 @@ def check_internet(url='https://belden.com', timeout=5):
def query_search(partnum, source):
"""token_url = "https://www.belden.com/coveo/rest/token?t=" + str(int(time.time()))
with requests.get(token_url) as r:
out = json.loads(r.content)
token = out["token"]
search_url = "https://www.belden.com/coveo/rest/search"
search_data ='{ "q": "' + str(partnum) + '", "sortCriteria": "relevancy", "numberOfResults": "250", "sortCriteria": "@catalogitemwebdisplaypriority ascending", "searchHub": "products-only-search", "pipeline": "Site Search", "maximumAge": "900000", "tab": "products-search", "locale": "en" }'
#"aq": "", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))", "firstResult": "0", "categoryFacets": "[{\\"field\\":\\"@catalogitemcategories\\",\\"path\\":[],\\"injectionDepth\\":1000,\\"maximumNumberOfValues\\":6,\\"delimitingCharacter\\":\\"|\\"}]", "facetOptions": "{}", "groupBy": "" }'
#fprint(search_data)
fprint(json.loads(search_data))
#search_data = '{ "q": "' + str(partnum) + '" }'
fprint(search_data)
headers = headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
with requests.post(search_url, headers=headers, data=search_data) as r:
fprint(r.text)"""
# TODO: Reimplement in python
# Bash script uses some crazy json formatting that I could not figure out
# Despite the fact that I wrote it
# So I'll just leave it, becuase it works.
if source == "Belden":
token_url = "https://www.belden.com/coveo/rest/token?t=" + str(int(time.time()))
with requests.get(token_url) as r:
out = json.loads(r.content)
token = out["token"]
search_url = "https://www.belden.com/coveo/rest/search"
# Ridiculous search parameters extracted from website. Do not touch
search_data = r"""{ "q": "{QUERY}", "sortCriteria": "relevancy", "numberOfResults": "250", "sortCriteria": "@catalogitemwebdisplaypriority ascending", "searchHub": "products-only-search", "pipeline": "Site Search", "maximumAge": "900000", "tab": "products-search", "locale": "en", "aq": "(NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B)) ((@syssource==\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\" @catalogitemprimarycategorypublished==true)) ((@catalogitemregionavailable=Global) (@z95xlanguage==en))", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\"Coveo_web_index - rg-nc-prod-sitecore-prod\")) OR (@source==(\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\",\"website_001002_Category_index-rg-nc-prod-sitecore-prod\"))", "firstResult": "0" }, "categoryFacets": "[{\"field\":\"@catalogitemcategories\",\"path\":[],\"injectionDepth\":1000,\"maximumNumberOfValues\":6,\"delimitingCharacter\":\"|\"}]", "facetOptions": "{}", "groupBy": " [{\"field\":\"@contenttype\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[\"Products\"],\"queryOverride\":\"{QUERY}\",\"advancedQueryOverride\":\"(NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B)) ((((((((@z95xpath=3324AF2D58F64C0FB725521052F679D2 @z95xid<>3324AF2D58F64C0FB725521052F679D2) ((@z95xpath=C292F3A37B3A4E6BAB345DF87ADDE516 @z95xid<>C292F3A37B3A4E6BAB345DF87ADDE516) @z95xtemplate==E4EFEB787BDC4B1A908EFC64D56CB2A4)) OR ((@z95xpath=723501A864754FEEB8AE377E4C710271 @z95xid<>723501A864754FEEB8AE377E4C710271) ((@z95xpath=600114EAB0E5407A84AAA9F0985B6575 @z95xid<>600114EAB0E5407A84AAA9F0985B6575) @z95xtemplate==2BE4FD6B3B2C49EBBD9E1F6C92238B05))) OR (@syssource==\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\" @catalogitemprimarycategorypublished==true)) OR ((@z95xpath=3324AF2D58F64C0FB725521052F679D2 @z95xid<>3324AF2D58F64C0FB725521052F679D2) @z95xpath<>C292F3A37B3A4E6BAB345DF87ADDE516)) OR @syssource==\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\") NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B))) ((@catalogitemregionavailable=Global) (@z95xlanguage==en) OR (@contenttype=(Blogs,Resources,Other)) (NOT @ez120xcludefromcoveo==1))\",\"constantQueryOverride\":\"((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))\"},{\"field\":\"@catalogitembrand\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@catalogitemenvironment\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@catalogitemregionalavailability\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@prez45xtez120xt\",\"maximumNumberOfValues\":5,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@tags\",\"maximumNumberOfValues\":4,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetassettype\",\"maximumNumberOfValues\":3,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetbrand\",\"maximumNumberOfValues\":3,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetmarket\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetsolution\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetsearchcontentpagetype\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]}]" }"""
search_data = search_data.replace(r"{QUERY}", partnum)
#"aq": "", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))", "firstResult": "0", "categoryFacets": "[{\\"field\\":\\"@catalogitemcategories\\",\\"path\\":[],\\"injectionDepth\\":1000,\\"maximumNumberOfValues\\":6,\\"delimitingCharacter\\":\\"|\\"}]", "facetOptions": "{}", "groupBy": "" }'
#fprint(search_data)
#fprint(json.loads(search_data))
#search_data = '{ "q": "' + str(partnum) + '" }'
#fprint(search_data)
headers = headers = {
'Authorization': f'Bearer {token}',
'Content-Type': 'application/json'
}
try:
with requests.post(search_url, headers=headers, data=search_data) as r:
a = r.text
a = json.loads(a)
idx = -1
name = ""
for partid in range(len(a["results"])):
name = a["results"][partid]["title"]
if name != partnum:
if name.find(partnum) >= 0:
idx = partid
break
elif partnum.find(name) >= 0:
idx = partid
break
else:
idx = partid
break
if idx < 0:
fprint("Could not find part in API: " + partnum)
return False
fprint("Search result found: result " + str(idx) + ", for ID " + name)
#urlname = a["results"][0]["raw"]["catalogitemurlname"]
img = a["results"][idx]["raw"]["catalogitemimageurl"]
img = img[0:img.index("?")]
uri = a["results"][idx]["raw"]["clickableuri"]
dsid = a["results"][idx]["raw"]["catalogitemdatasheetid"]
brand = a["results"][idx]["raw"]["catalogitembrand"]
desc = a["results"][idx]["raw"]["catalogitemlongdesc"]
shortdesc = a["results"][idx]["raw"]["catalogitemshortdesc"]
a = json.dumps(a["results"][idx], indent=2)
#print(a, urlname, img, uri, dsurl)
out = dict()
out["url"] = "https://www.belden.com/products/" + uri
out["datasheet"] = "https://catalog.belden.com/techdata/EN/" + dsid + "_techdata.pdf"
out["brand"] = brand
out["name"] = shortdesc
out["description"] = desc
out["image"] = "https://www.belden.com" + img
out["partnum"] = name
#print(out)
return out
except:
print("falied to search with API. Falling back to datasheet lookup.")
return False
# Original bash script
# superceded by above
if source == "Belden_shell":
command = ["./query-search.sh", partnum]
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0: # error
@ -66,27 +115,28 @@ def query_search(partnum, source):
#print(data)
try:
if data["Count"] > 0:
print(data["Results"][0]["Url"])
result = data["Results"][0]
if result["Url"].split("/")[-1] == partnum:
#print(partnum)
print(result["Html"])
try:
imgidx = result["Html"].index("<img src=") + 10
imgidx2 = result["Html"].index("?", imgidx)
output["image"] = result["Html"][imgidx:imgidx2]
if output["image"].index("http") != 0:
output["image"] = ""
#print(data["Results"][0]["Url"])
for result in data["Results"]:
if result["Url"].split("/")[-1] == partnum:
#print(partnum)
#print(result["Html"])
try:
imgidx = result["Html"].index("<img src=") + 10
imgidx2 = result["Html"].index("?", imgidx)
output["image"] = result["Html"][imgidx:imgidx2]
if output["image"].index("http") != 0:
output["image"] = ""
print("No cable image found.")
except:
print("No cable image found.")
except:
print("No cable image found.")
dsidx = result["Html"].index("<a href=\"/disteAPI/") + 9
dsidx2 = result["Html"].index(partnum, dsidx) + len(partnum)
output["datasheet"] = "https://www.alphawire.com" + result["Html"][dsidx:dsidx2]
#"test".index()
print(output)
return output
dsidx = result["Html"].index("<a href=\"/disteAPI/") + 9
dsidx2 = result["Html"].index(partnum, dsidx) + len(partnum)
output["datasheet"] = "https://www.alphawire.com" + result["Html"][dsidx:dsidx2]
output["partnum"] = partnum
#"test".index()
#print(output)
return output
except:
@ -100,7 +150,7 @@ def touch(path):
def get_multi(partnums):
def get_multi(partnums, delay=0.25):
with alive_bar(len(partnums) * 2, dual_line=True, calibrate=30, bar="classic2", spinner="classic") as bar:
def _try_download_datasheet(partnum, output_dir): # Guess datasheet URL
@ -190,10 +240,17 @@ def get_multi(partnums):
fprint("Using cached datasheet for " + partnum)
bar.text = "Using cached datasheet for " + partnum
bar(skipped=True)
fprint("Parsing Datasheet contents of " + partnum)
bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..."
read_datasheet.parse(path, output_dir, partnum, dstype)
bar(skipped=False)
if not os.path.exists(output_dir + "/parsed"):
fprint("Parsing Datasheet contents of " + partnum)
bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..."
read_datasheet.parse(path, output_dir, partnum, dstype)
bar(skipped=False)
else:
fprint("Datasheet already parsed for " + partnum)
bar.text = "Datasheet already parsed for " + partnum + ".pdf"
bar(skipped=True)
def __downloaded_datasheet(partnum, path, output_dir, dstype):
fprint("Downloaded " + path)
@ -204,13 +261,7 @@ def get_multi(partnums):
read_datasheet.parse(path, output_dir, partnum, dstype)
bar(skipped=False)
for fullpartnum in partnums:
if fullpartnum[0:2] == "BL": # catalog.belden.com entry\
partnum = fullpartnum[2:]
dstype = "Belden"
elif fullpartnum[0:2] == "AW":
partnum = fullpartnum[2:]
dstype = "Alphawire"
def run_search(partnum):
output_dir = "cables/" + partnum
path = output_dir + "/datasheet.pdf"
bartext = "Downloading files for part " + partnum
@ -218,10 +269,16 @@ def get_multi(partnums):
#
if (not os.path.exists(output_dir + "/found_part_hires")) or not (os.path.exists(path) and os.path.getsize(path) > 1):
# Use query
search_result = query_search(partnum.replace(" ", ""), dstype)
search_result = query_search(partnum, dstype)
# Try to use belden.com search
if search_result is not False:
# Download high resolution part image if available and needed
partnum = search_result["partnum"]
output_dir = "cables/" + partnum
path = output_dir + "/datasheet.pdf"
bartext = "Downloading files for part " + partnum
bar.text = bartext
if not os.path.exists(output_dir + "/found_part_hires"):
if _download_image(search_result["image"], output_dir):
fprint("Downloaded hi-res part image for " + partnum)
@ -245,17 +302,48 @@ def get_multi(partnums):
# Failed to download with search or guess :(
else:
fprint("Failed to download datasheet for part " + partnum)
bar.text = "Failed to download datasheet for part " + partnum
failed.append(partnum)
bar(skipped=True)
bar(skipped=True)
return False
return True
# We already have a hi-res image and the datasheet - perfect!
else:
fprint("Using cached hi-res part image for " + partnum)
__use_cached_datasheet(partnum, path, output_dir, dstype)
return True
for fullpartnum in partnums:
if fullpartnum[0:2] == "BL": # catalog.belden.com entry
partnum = fullpartnum[2:]
dstype = "Belden"
elif fullpartnum[0:2] == "AW":
partnum = fullpartnum[2:]
dstype = "Alphawire"
else:
dstype = "Belden" # guess
partnum = fullpartnum
if not run_search(partnum):
success = False
if len(partnum.split(" ")) > 1:
for name in partnum.split(" "):
fprint("Retrying with alternate name: " + name)
if(run_search(name)):
success = True
break
time.sleep(delay)
if not success:
namestripped = partnum.strip(" ")
fprint("Retrying with alternate name: " + namestripped)
if(run_search(namestripped)):
success = True
time.sleep(delay)
if not success:
fprint("Failed to download datasheet for part " + partnum)
bar.text = "Failed to download datasheet for part " + partnum
failed.append(partnum)
bar(skipped=True)
bar(skipped=True)
time.sleep(delay)
if len(failed) > 0:
fprint("Failed to download:")
for partnum in failed:
@ -268,22 +356,73 @@ def get_multi(partnums):
if __name__ == "__main__":
partnums = ["BL7958A", "BL10GXS12", "BLRST 5L-RKT 5L-949",
"BL10GXS13",
"BL10GXW12",
"BL10GXW13",
"BL2412",
"BL2413",
"BLOSP6AU",
"BLFI4D024P9",
"BLFISD012R9",
"BLFDSD012A9",
"BLFSSL024NG",
"BLFISX006W0",
"BLFISX00103",
"BLC6D1100007"
# partnums = ["BLFISX012W0", "BL7958A", "BL10GXS12", "BLRST 5L-RKT 5L-949",
# "BL10GXS13",
# "BL10GXW12",
# "BL10GXW13",
# "BL2412",
# "BL2413",
# "BLOSP6AU",
# "BLFI4D024P9",
# "BLFISD012R9",
# "BLFDSD012A9",
# "BLFSSL024NG",
# "BLFISX006W0",
# "BLFISX00103",
# "BLC6D1100007"
# ]
partnums = [
# Actual cables in Jukebox
"AW86104CY",
"AW3050",
"AW6714",
"AW1172C",
"AW2211/4",
"BLTF-1LF-006-RS5N",
"BLTF-SD9-006-RI5N",
"BLTT-SLG-024-HTNN",
"BLFISX012W0",
"BLFI4X012W0",
"BLSPE101 006Q",
"BLSPE102 006Q",
"BL7922A 010Q",
"BL7958A 008Q",
"BLIOP6U 010Q",
"BL10GXW13 D15Q",
"BL10GXW53 D15Q",
"BL29501F 010Q",
"BL29512 010Q",
"BL3106A 010Q",
"BL9841 060Q",
"BL3105A 010Q",
"BL3092A 010Q",
"BL8760 060Q",
"BL6300UE 008Q",
"BL6300FE 009Q",
"BLRA500P 006Q",
# Some ones I picked, including some invalid ones
"BL10GXS12",
"BLRST 5L-RKT 5L-949",
"BL10GXS13",
"BL10GXW12",
"BL10GXW13",
"BL2412",
"BL2413",
"BLOSP6AU",
"BLFI4D024P9",
"BLFISD012R9",
"BLFDSD012A9",
"BLFSSL024NG",
"BLFISX006W0",
"BLFISX00103",
"BLC6D1100007"
]
get_multi(partnums)
#query_search("3248", "Alphawire")
#query_search("86104CY", "Alphawire")
get_multi(partnums, 0.25)
#query_search("10GXS13", "Belden")

@ -1,4 +1,5 @@
#!/bin/sh
# change this to #!/bin/bash for windows
if ! [ -d "venv" ]; then
./venv-setup.sh

@ -22,6 +22,13 @@ leds_size = None
leds_normalized = None
controllers = None
data = None
exactdata = None
rings = None
ringstatus = None
mode = "Startup"
firstrun = True
changecount = 0
animation_time = 0
start = uptime()
def ping(host):
@ -48,30 +55,50 @@ def map():
global leds_size
global leds_normalized
global controllers
global rings
global ringstatus
global animation_time
with open('config.yml', 'r') as fileread:
#global config
config = yaml.safe_load(fileread)
animation_time = config["animation_time"]
leds = list()
leds_size = list()
controllers = list()
rings = list(range(len(config["position_map"])))
ringstatus = list(range(len(config["position_map"])))
#print(rings)
#fprint(config["led"]["map"])
generate_map = False
map = list()
for shape in config["led"]["map"]:
if shape["type"] == "circle":
if generate_map:
map.append((shape["pos"][1],shape["pos"][0]))
#fprint(shape["pos"])
anglediv = 360.0 / shape["size"]
angle = 0
radius = shape["diameter"] / 2
lednum = shape["start"]
for item in config['position_map']:
# Check if the current item's position matches the target position
#print(item['pos'],(shape["pos"][1],shape["pos"][0]))
if tuple(item['pos']) == (shape["pos"][1],shape["pos"][0]):
rings[item["index"]] = (shape["pos"][1],shape["pos"][0],lednum,lednum+shape["size"]) # rings[index] = x, y, startpos, endpos
ringstatus[item["index"]] = [None, None]
break
if len(leds) < lednum + shape["size"]:
for x in range(lednum + shape["size"] - len(leds)):
leds.append(None)
leds_size.append(None)
while angle < 359.999:
tmpangle = angle + shape["angle"]
x = math.cos(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][0]
y = math.sin(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][1]
x = math.cos(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][1] # flip by 90 degress when we changed layout
y = math.sin(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][0]
leds[lednum] = (x,y)
lednum = lednum + 1
angle = angle + anglediv
@ -97,7 +124,23 @@ def map():
dist += distdiv
lednum = lednum + 1
if generate_map:
map = sorted(map, key=lambda x: (-x[1], x[0]))
print(map)
import matplotlib.pyplot as plt
plt.axis('equal')
x, y = zip(*map)
plt.scatter(x, y, s=12)
#plt.plot(x, y, marker='o')
#plt.scatter(*zip(*leds), s=3)
for i, (x_pos, y_pos) in enumerate(map):
plt.text(x_pos, y_pos, str(i), color="red", fontsize=12)
plt.savefig("map2.png", dpi=600, bbox_inches="tight")
data = {"map": [{"index": i, "pos": str(list(pos))} for i, pos in enumerate(map)]}
yaml_str = yaml.dump(data, default_flow_style=False)
print(yaml_str)
print(rings)
flag = 0
for x in leds:
if x is None:
@ -148,9 +191,10 @@ def init():
global leds_size
global controllers
global data
global exactdata
sender = sacn.sACNsender(fps=config["led"]["fps"], universeDiscovery=False)
sender.start() # start the sending thread
for x in range(len(controllers)):
"""for x in range(len(controllers)):
print("Waiting for the controller at", controllers[x][2], "to be online...", end="")
count = 0
while not ping(controllers[x][2]):
@ -159,7 +203,7 @@ def init():
fprint(" ERROR: controller still offline after " + str(count) + " seconds, continuing...")
break
if count < config["led"]["timeout"]:
fprint(" done")
fprint(" done")"""
for x in range(len(controllers)):
print("Activating controller", x, "at", controllers[x][2], "with", controllers[x][1]-controllers[x][0], "LEDs.")
sender.activate_output(x+1) # start sending out data
@ -168,22 +212,26 @@ def init():
# initialize global pixel data list
data = list()
exactdata = list()
for x in range(len(leds)):
if leds_size[x] == 3:
exactdata.append(None)
data.append((20,20,127))
elif leds_size[x] == 4:
exactdata.append(None)
data.append((50,50,255,0))
else:
exactdata.append(None)
data.append((0,0,0))
sendall(data)
#time.sleep(50000)
fprint("Running start-up test sequence...")
for y in range(100):
for y in range(1):
for x in range(len(leds)):
setpixel(0,0,150,x)
setpixel(0,60,144,x)
sendall(data)
#time.sleep(2)
#alloffsmooth()
alloffsmooth()
def sendall(datain):
# send all LED data to all controllers
@ -260,6 +308,209 @@ def setpixelnow(r, g, b, num):
setpixel(r,g,b,num)
senduniverse(data, num)
def setmode(stmode, r=0,g=0,b=0):
global mode
global firstrun
if stmode is not None:
if mode != stmode:
firstrun = True
mode = stmode
def setring(r,g,b,idx):
ring = rings[idx]
for pixel in range(ring[2],ring[3]):
setpixel(r,g,b,pixel)
#global data
#senduniverse(data, ring[2])
def runmodes(ring = -1, speed = 1):
global mode
global firstrun
global changecount
fprint("Mode: " + str(mode))
if mode == "Startup":
# loading animation. cable check
if firstrun:
changecount = animation_time * 3
firstrun = False
for x in range(len(ringstatus)):
ringstatus[x] = [True, animation_time]
if changecount > 0:
fprint(changecount)
changecount = fadeorder(0,len(leds), changecount, 0,50,100)
else:
setmode("Startup2")
elif mode == "Startup2":
if firstrun:
firstrun = False
else:
for x in range(len(ringstatus)):
if ringstatus[x][0]:
setring(0, 50, 100, x)
else:
ringstatus[x][1] = fadeall(rings[x][2],rings[x][3], ringstatus[x][1], 100,0,0) # not ready
elif mode == "StartupCheck":
if firstrun:
firstrun = False
for x in range(len(ringstatus)):
ringstatus[x] = [False, animation_time]
else:
for x in range(len(ringstatus)):
if ringstatus[x][0]:
ringstatus[x][1] = fadeall(rings[x][2],rings[x][3], ringstatus[x][1], 0,50,100) # ready
else:
setring(100, 0, 0, x)
elif mode == "GrabA":
if firstrun:
firstrun = False
changecount = animation_time # 100hz
if changecount > 0:
changecount = fadeall(rings[ring][2],rings[ring][3], changecount, 100,0,0)
else:
setring(100,0,0,ring)
setmode("GrabB")
elif mode == "GrabB":
if firstrun:
firstrun = False
changecount = animation_time # 100hz
if changecount > 0:
changecount = fadeorder(rings[ring][2],rings[ring][3], changecount, 0,100,0)
else:
setring(0,100,0,ring)
setmode("idle")
elif mode == "GrabC":
if firstrun:
firstrun = False
changecount = animation_time # 100hz
if changecount > 0:
changecount = fadeall(rings[ring][2],rings[ring][3], changecount, 0,50,100)
else:
setring(0,50,100,ring)
setmode("idle")
elif mode == "idle":
time.sleep(0)
sendall(data)
def fadeall(idxa,idxb,sizerem,r,g,b):
if sizerem < 1:
return 0
global exactdata
sum = 0
for x in range(idxa,idxb):
if exactdata[x] is None:
exactdata[x] = data[x]
old = exactdata[x]
dr = (r - old[0])/sizerem
sum += abs(dr)
dr += old[0]
dg = (g - old[1])/sizerem
sum += abs(dg)
dg += old[1]
db = (b - old[2])/sizerem
db += old[2]
sum += abs(db)
exactdata[x] = (dr, dg, db)
#print(new)
setpixel(dr, dg, db, x)
if sizerem == 1:
exactdata[x] = None
if sum == 0 and sizerem > 2:
sizerem = 2
return sizerem - 1
def fadeorder(idxa,idxb,sizerem,r,g,b):
if sizerem < 1:
return 0
global exactdata
drs = 0
dgs = 0
dbs = 0
sum = 0
for x in range(idxa,idxb):
if exactdata[x] is None:
exactdata[x] = data[x]
old = exactdata[x]
dr = (r - old[0])
dg = (g - old[1])
db = (b - old[2])
drs += dr
dgs += dg
dbs += db
drs /= sizerem
dgs /= sizerem
dbs /= sizerem
sum += abs(drs) + abs(dgs) + abs(dbs)
print(drs,dgs,dbs)
for x in range(idxa,idxb):
old = exactdata[x]
new = list(old)
if drs > 0:
if old[0] + drs > r:
new[0] = r
drs -= r - old[0]
else:
new[0] = old[0] + drs
drs = 0
if dgs > 0:
if old[1] + dgs > g:
new[1] = g
dgs -= g - old[1]
else:
new[1] = old[1] + dgs
dgs = 0
if dbs > 0:
if old[2] + dbs > b:
new[2] = b
dbs -= b - old[2]
else:
new[2] = old[2] + dbs
dbs = 0
if drs < 0:
if old[0] + drs < r:
new[0] = r
drs -= r - old[0]
else:
new[0] = old[0] + drs
drs = 0
if dgs < 0:
if old[1] + dgs < g:
new[1] = g
dgs -= g - old[1]
else:
new[1] = old[1] + dgs
dgs = 0
if dbs < 0:
if old[2] + dbs < b:
new[2] = b
dbs -= b - old[2]
else:
new[2] = old[2] + dbs
dbs = 0
if drs != 0 or dgs != 0 or dbs != 0:
exactdata[x] = new
setpixel(new[0],new[1],new[2],x)
if sizerem == 1:
exactdata[x] = None
if sum == 0 and sizerem > 2:
sizerem = 2
return sizerem - 1
def setpixel(r, g, b, num):
global data
global leds_size
@ -328,16 +579,105 @@ def mapimage(image, fps=90):
global data
fastsendall(data)
def mainloop(stmode, ring = -1, fps = 100, preview = False):
global start
while uptime() - start < 1/fps:
time.sleep(0.00001)
fprint(1 / (uptime() - start))
start = uptime()
if mode is not None:
setmode(stmode)
runmodes(ring)
if preview:
drawdata()
def drawdata():
#tmp = list()
#for x in len(leds):
# led = leds[x]
# tmp.append((led[0], led[1], data[x]))
x = [led[0] for led in leds]
y = [led[1] for led in leds]
colors = data
colors_normalized = [(x[0]/255, x[1]/255, x[2]/255) for x in colors]
# Plot the points
plt.scatter(x, y, c=colors_normalized)
# Optional: add grid, title, and labels
plt.grid(True)
plt.title('Colored Points')
plt.xlabel('X')
plt.ylabel('Y')
plt.show()
plt.savefig("map3.png", dpi=50, bbox_inches="tight")
plt.clf()
def startup_animation(show):
stmode = "Startup"
mainloop(stmode, preview=show)
while mode == "Startup":
mainloop(None, preview=show)
for x in range(54):
ringstatus[x][0] = False
mainloop(None, preview=show)
for x in range(animation_time):
mainloop(None, preview=show)
clear_animations()
stmode = "StartupCheck"
mainloop(stmode, preview=show)
clear_animations()
def clear_animations():
for x in range(len(leds)):
exactdata[x] = None
def do_animation(stmode, ring=-1):
mainloop(stmode, ring, preview=show)
wait_for_animation(ring)
def start_animation(stmode, ring=-1):
mainloop(stmode, ring, preview=show)
def wait_for_animation(ring=-1):
while mode != "idle":
mainloop(None, ring, preview=show)
if __name__ == "__main__":
init()
cap = cv2.VideoCapture('output.mp4')
import matplotlib.pyplot as plt
"""cap = cv2.VideoCapture('badapple.mp4')
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
mapimage(frame)
mapimage(frame, fps=30)"""
show = True
ring = 1
startup_animation(show)
for x in range(54):
ringstatus[x][0] = True
mainloop(None, preview=show)
for x in range(animation_time):
mainloop(None, preview=show)
do_animation("GrabA", 1)
time.sleep(1)
do_animation("GrabA", 5)
start_animation("GrabC", 1)
wait_for_animation(1)
do_animation("GrabC", 5)
close()
#sys.exit(0)
#sys.exit(0)
# blue : default
# green : target
# yellow : crosshair
# red : missing
# uninitialized : red/purple?

BIN
map.png

Binary file not shown.

Before

(image error) Size: 381 KiB

After

(image error) Size: 372 KiB

BIN
map2.png Normal file

Binary file not shown.

After

(image error) Size: 184 KiB

BIN
map3.png Normal file

Binary file not shown.

After

(image error) Size: 44 KiB

@ -2,6 +2,8 @@
# Parse Belden catalog techdata datasheets
import pandas as pd
pd.set_option('future.no_silent_downcasting', True)
from PyPDF2 import PdfReader
import camelot
import numpy as np
@ -11,6 +13,11 @@ import json
from util import fprint
import uuid
from util import run_cmd
import os
def touch(path):
with open(path, 'a'):
os.utime(path, None)
def parse(filename, output_dir, partnum, dstype):
@ -23,6 +30,7 @@ def parse(filename, output_dir, partnum, dstype):
reader = PdfReader(filename)
page = reader.pages[0]
table_list = {}
for table in tables:
table.df.infer_objects(copy=False)
table.df.replace('', np.nan, inplace=True)
@ -90,6 +98,7 @@ def parse(filename, output_dir, partnum, dstype):
# Table parsing and reordring
tables = dict()
torename = dict()
previous_table = ""
for table_name in table_list.keys():
# determine shape: horizontal or vertical
@ -121,7 +130,8 @@ def parse(filename, output_dir, partnum, dstype):
for table_name_2 in table_list.keys():
if table_name_2.find(table.iloc[-1, 0]) >= 0:
# Name taken from table directly above - this table does not have a name
table_list["Specs " + str(len(tables))] = table_list.pop(table_name_2, None) # rename table to arbitrary altername name
torename[table_name_2] = "Specs " + str(len(tables))
#table_list["Specs " + str(len(tables))] = table_list[table_name_2] # rename table to arbitrary altername name
break
if vertical:
@ -142,21 +152,21 @@ def parse(filename, output_dir, partnum, dstype):
# multi-page table check
if dstype == "Belden":
if table_name.isdigit() and len(tables) > 1:
fprint(table_name)
fprint(previous_table)
#fprint(table_name)
#fprint(previous_table)
main_key = previous_table
cont_key = table_name
fprint(tables)
#fprint(tables)
if vertical == False:
main_keys = list(tables[main_key].keys())
for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
if i < len(main_keys):
fprint(tables[main_key][main_keys[i]])
tables[main_key][main_keys[i]] = (tables[main_key][main_keys[i]] + (cont_key,) + cont_values)
#fprint(tables[main_key][main_keys[i]])
tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)
del tables[table_name]
@ -167,6 +177,10 @@ def parse(filename, output_dir, partnum, dstype):
previous_table = table_name
# remove renamed tables
for table_name in torename.keys():
tables[torename[table_name]] = tables[table_name]
del tables[table_name]
# remove multi-line values that occasionally squeak through
def replace_newlines_in_dict(d):
for key, value in d.items():
@ -195,12 +209,12 @@ def parse(filename, output_dir, partnum, dstype):
print(output_table)
#print(output_table)
run_cmd("rm " + output_dir + "/*.json") # not reliable!
run_cmd("rm \"" + output_dir + "\"/*.json") # not reliable!
with open(output_dir + "/" + output_table["searchspecs"]["id"] + ".json", 'w') as json_file:
json.dump(output_table["searchspecs"], json_file)
touch(output_dir + "/parsed")
return output_table
@ -217,7 +231,7 @@ def flatten(tables):
# If it fails again, return the original string.
return s
out = dict()
print("{")
#print("{")
for table in tables.keys():
for key in tables[table].keys():
if len(key) < 64:
@ -228,13 +242,24 @@ def flatten(tables):
fullkeyname = (table + ": " + keyname).replace(".","")
if type(tables[table][key]) is not tuple:
out[fullkeyname] = convert_to_number(tables[table][key])
print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
#print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
elif len(tables[table][key]) == 1:
out[fullkeyname] = convert_to_number(tables[table][key][0])
#print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
# if the item has at least two commas in it, split it
if tables[table][key].count(',') > 0:
out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
#print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
# if the item has at least two commas in it, split it
if tables[table][key].count(',') > 0:
out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
print("}")
#print("}")
return out

@ -5,7 +5,7 @@ pypdf2==2.12.1
alive-progress
requests
git+https://github.com/Byeongdulee/python-urx.git
psycopg2-binary
meilisearch
pyyaml
Flask
selenium

26
run.py

@ -89,6 +89,9 @@ def start_server_socket():
while True:
#print("HI")
# Handeling Server Requests Loop, will run forever
if not from_server_queue.empty():
client_id, message = from_server_queue.get()
fprint(f"Message from client {client_id}: {message}")
@ -113,7 +116,7 @@ def start_server_socket():
case "log":
fprint("log message")
if call == "send":
fprint("webapp: " + data)
fprint("webapp: " + str(data), sendqueue=to_server_queue)
elif call == "request":
fprint("")
@ -124,6 +127,21 @@ def start_server_socket():
elif call == "request":
fprint("")
case "ping":
fprint("Pong!!!")
# Lucas' notes
# Add a ping pong :) response/handler
# Add a get cable response/handler
# this will tell the robot arm to move
# Call for turning off everything
# TODO Helper for converting Python Dictionaries to JSON
# make function: pythonData --> { { "type": "...", "call": "...", "data": pythonData } }
# to send: to_server_queue.put(("*", "JSON STRING HERE")) # replace * with UUID of client to send to one specific location
case "cable_details":
fprint("cable_details message")
if call == "send":
@ -235,7 +253,7 @@ def setup_server(pool):
if camera_ready is False:
fprint("waiting for " + "Camera initilization" + " to complete...", sendqueue=to_server_queue)
camera = process_video.qr_reader(config["cameras"]["banner"]["ip"], config["cameras"]["banner"]["port"])
# camera = process_video.qr_reader(config["cameras"]["banner"]["ip"], config["cameras"]["banner"]["port"])
fprint("Camera initialized.", sendqueue=to_server_queue)
@ -262,8 +280,8 @@ def mainloop_server(pool):
killall()
counter = counter + 1
fprint("Looking for QR code...")
print(camera.read_qr(30))
# fprint("Looking for QR code...")
# print(camera.read_qr(30))
def run_loading_app():

117
search.py Normal file

@ -0,0 +1,117 @@
"""Interactions with the Meilisearch API for adding and searching cables."""
from meilisearch import Client
from meilisearch.task import TaskInfo
from meilisearch.errors import MeilisearchApiError
import json
DEFAULT_URL = "http://localhost:7700"
DEFAULT_APIKEY = "fluffybunnyrabbit" # I WOULD RECOMMEND SOMETHING MORE SECURE
DEFAULT_INDEX = "cables"
DEFAULT_FILTERABLE_ATTRS = ["partnum", "uuid", "position"] # default filterable attributes
class JukeboxSearch:
"""Class for interacting with the Meilisearch API."""
def __init__(self,
url: str = None,
api_key: str = None,
index: str = None,
filterable_attrs: list = None):
"""Connect to Meilisearch and perform first-run tasks as necessary.
:param url: Address of the Meilisearch server. Defaults to ``http://localhost:7700`` if unspecified.
:param api_key: API key used to authenticate with Meilisearch. It is highly recommended to set this as something
secure if you can access this endpoint publicly, but you can ignore this and set Meilisearch's default API key
to ``fluffybunnyrabbit``.
:param index: The name of the index to configure. Defaults to ``cables`` if unspecified.
:param filterable_attrs: List of all the attributes we want to filter by."""
# connect to Meilisearch
url = url or DEFAULT_URL
api_key = api_key or DEFAULT_APIKEY
filterable_attrs = filterable_attrs or DEFAULT_FILTERABLE_ATTRS
self.index = index or DEFAULT_INDEX
self.client = Client(url, api_key)
# create the index if it does not exist already
try:
self.client.get_index(self.index)
except MeilisearchApiError as _:
self.client.create_index(self.index)
# make a variable to easily reference the index
self.idxref = self.client.index(self.index)
# update filterable attributes if needed
self.update_filterables(filterable_attrs)
def add_document(self, document: dict) -> TaskInfo:
"""Add a cable to the Meilisearch index.
:param document: Dictionary containing all the cable data.
:returns: A TaskInfo object for the addition of the new document."""
return self.idxref.add_documents(document)
def add_documents(self, documents: list):
"""Add a list of cables to the Meilisearch index.
:param documents: List of dictionaries containing all the cable data.
:returns: A TaskInfo object for the last new document."""
taskinfo = None
for i in documents:
taskinfo = self.add_document(i)
return taskinfo
def update_filterables(self, filterables: list):
"""Update filterable attributes and wait for database to fully index. If the filterable attributes matches the
current attributes in the database, don't update (saves reindexing).
:param filterables: List of all filterable attributes"""
existing_filterables = self.idxref.get_filterable_attributes()
if len(set(existing_filterables).difference(set(filterables))) > 0:
taskref = self.idxref.update_filterable_attributes(filterables)
self.client.wait_for_task(taskref.index_uid)
def search(self, query: str, filters: str = None):
"""Execute a search query on the Meilisearch index.
:param query: Seach query
:param filters: A meilisearch compatible filter statement.
:returns: The search results dict. Actual results are in a list under "hits", but there are other nice values that are useful in the root element."""
if filters:
q = self.idxref.search(query, {"filter": filters})
else:
q = self.idxref.search(query)
return q
def _filter_one(self, filter: str):
"""Get the first item to match a filter.
:param filter: A meilisearch compatible filter statement.
:returns: A dict containing the results; If no results found, an empty dict."""
q = self.search("", filter)
if q["estimatedTotalHits"] != 0:
return ["hits"][0]
else:
return dict()
def get_position(self, position: str):
"""Get a part by position.
:param partnum: The position to search for."""
return self._filter_one(f"position = {position}")
def get_uuid(self, uuid: str):
"""Get a specific UUID.
:param uuid: The UUID to search for."""
return self._filter_one(f"uuid = {uuid}")
def get_partnum(self, partnum: str):
"""Get a specific part number.
:param partnum: The part number to search for."""
return self._filter_one(f"partnum = {partnum}")
# entrypoint
if __name__ == "__main__":
jbs = JukeboxSearch()

1
source.fish Normal file

@ -0,0 +1 @@
source venv/bin/activate.fish

1
source.sh Normal file

@ -0,0 +1 @@
source venv/bin/activate

106
test.py Normal file

@ -0,0 +1,106 @@
print("\u001b[37m")
class Ring:
def __init__(self) -> None:
self.leds = [0] * 24
self.id = 0
self.dirty = False
def __iter__(self) -> iter:
yield from self.leds
def __repr__(self) -> str:
return f"Ring<id={self.id}, led_state={' '.join(list(map(lambda x: str(x+1), self.leds)))}, dirty={self.dirty}>"
def __add__(self, other):
self.leds.extend(other)
return self
def __bool__(self):
return self.dirty
def __getitem__(self, index):
return self.leds[index]
def __setitem__(self, index, value):
ivalue = self.leds[index]
if ivalue != value:
self.dirty = True
self.leds[index] = value
def __getattr__(self, name):
import word2num
name = int(word2num.word2num(name))
print(name)
if 0 <= name < len(self.leds):
return self.leds[name]
a = Ring()
print(a)
b = Ring()
b.leds[2] = 3
print(a + b)
b.active = True
if b:
print("Bexist")
c = [a, b, b, a, a]
d = list(filter(lambda x: bool(x), c))
print(d)
for i, ring in enumerate(c):
ring[0] = i
print(ring)
print(a, b)
print(f"\u001b[32m{a}")
print(f"\u001b[37ma")
print(getattr(a, "twenty two"))
# eval(f"getattr(a,\"{input()}\")")
# a = r"wow this string is cursed; for example \n"
# SEARCHDATA=r"""{ "q": "{QUERY}", "sortCriteria": "relevancy", "numberOfResults": "250", "sortCriteria": "@catalogitemwebdisplaypriority ascending", "searchHub": "products-only-search", "pipeline": "Site Search", "maximumAge": "900000", "tab": "products-search", "locale": "en", "aq": "(NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B)) ((@syssource==\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\" @catalogitemprimarycategorypublished==true)) ((@catalogitemregionavailable=Global) (@z95xlanguage==en))", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\"Coveo_web_index - rg-nc-prod-sitecore-prod\")) OR (@source==(\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\",\"website_001002_Category_index-rg-nc-prod-sitecore-prod\"))", "firstResult": "0" }, "categoryFacets": "[{\"field\":\"@catalogitemcategories\",\"path\":[],\"injectionDepth\":1000,\"maximumNumberOfValues\":6,\"delimitingCharacter\":\"|\"}]", "facetOptions": "{}", "groupBy": " [{\"field\":\"@contenttype\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[\"Products\"],\"queryOverride\":\"{QUERY}\",\"advancedQueryOverride\":\"(NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B)) ((((((((@z95xpath=3324AF2D58F64C0FB725521052F679D2 @z95xid<>3324AF2D58F64C0FB725521052F679D2) ((@z95xpath=C292F3A37B3A4E6BAB345DF87ADDE516 @z95xid<>C292F3A37B3A4E6BAB345DF87ADDE516) @z95xtemplate==E4EFEB787BDC4B1A908EFC64D56CB2A4)) OR ((@z95xpath=723501A864754FEEB8AE377E4C710271 @z95xid<>723501A864754FEEB8AE377E4C710271) ((@z95xpath=600114EAB0E5407A84AAA9F0985B6575 @z95xid<>600114EAB0E5407A84AAA9F0985B6575) @z95xtemplate==2BE4FD6B3B2C49EBBD9E1F6C92238B05))) OR (@syssource==\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\" @catalogitemprimarycategorypublished==true)) OR ((@z95xpath=3324AF2D58F64C0FB725521052F679D2 @z95xid<>3324AF2D58F64C0FB725521052F679D2) @z95xpath<>C292F3A37B3A4E6BAB345DF87ADDE516)) OR @syssource==\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\") NOT @z95xtemplate==(ADB6CA4F03EF4F47B9AC9CE2BA53FF97,FE5DD82648C6436DB87A7C4210C7413B))) ((@catalogitemregionavailable=Global) (@z95xlanguage==en) OR (@contenttype=(Blogs,Resources,Other)) (NOT @ez120xcludefromcoveo==1))\",\"constantQueryOverride\":\"((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))\"},{\"field\":\"@catalogitembrand\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@catalogitemenvironment\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@catalogitemregionalavailability\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@prez45xtez120xt\",\"maximumNumberOfValues\":5,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@tags\",\"maximumNumberOfValues\":4,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetassettype\",\"maximumNumberOfValues\":3,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetbrand\",\"maximumNumberOfValues\":3,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetmarket\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetsolution\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]},{\"field\":\"@facetsearchcontentpagetype\",\"maximumNumberOfValues\":6,\"sortCriteria\":\"occurrences\",\"injectionDepth\":1000,\"completeFacetWithStandardValues\":true,\"allowedValues\":[]}]" }"""
# QUERY = "AAAAAAAAAAAA"
# b = SEARCHDATA.replace(r"{QUERY}", QUERY)
q = [i * 2 for i in range(10)]
d = {a : b for a,b in enumerate(q)}
print(q)
print(d)
def stalin_sort(a):
b = sum(a)
b /= len(a)
return [b for _ in range(len(a))]
def mao_sort(a):
i = 0
while i < len(a) - 1:
if a[i+1] < a[i]:
del a[i]
else:
i += 1
return a
print(stalin_sort(list(range(10))))
print(mao_sort([1, 3, 2, 4, 5, 8, 7, 6, 9]))
# i l

@ -1,6 +1,7 @@
#!/bin/sh
# change this to #!/bin/bash for windows
python -m venv ./venv
python3 -m venv ./venv
source ./venv/bin/activate
pip install --upgrade pip

@ -1,15 +1,61 @@
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
<style>
footer {
background-color: #333;
color: #fff;
text-align: center;
padding: 10px 0;
position: fixed;
bottom: 0;
width: 100%;
display: flex;
justify-content: space-around;
}
.service-box {
width: 150px;
padding: 10px;
border-radius: 5px;
}
.service-up {
background-color: green;
}
.service-down {
background-color: red;
}
</style>
<script>
document.addEventListener("DOMContentLoaded", function() {
// class Service {
// constructor(name, status) {
// this.name = name
// this.status = status
// }
// }
var updatedTime = new Date();
// Initial status of services
// var serviceA = new Service("234234", 'down');
// var serviceBStatus = 'down';
// var serviceCStatus = 'down';
document.addEventListener("DOMContentLoaded", function () {
// Create WebSocket connection.
const socket = new WebSocket('ws://localhost:9000');
// Connection opened
socket.addEventListener('open', function (event) {
console.log("Connected to WebSocket server");
updatedTime = new Date();
});
// Listen for messages
@ -19,6 +65,7 @@
let message = document.createElement('li');
message.textContent = "Received: " + event.data;
messages.appendChild(message);
updatedTime = new Date();
});
// Send a message to the server
@ -28,15 +75,82 @@
console.log('Message sent', message);
}
// This function just sends a ping to make sure the server is there and it is able to responds
function ping() {
let message = `{ "call": "send", "type": "log", "data": "This is a ping!!" }`;
socket.send(message);
console.log('Message sent', message);
}
setInterval(ping, 1500);
// setInterval(() => {
// updateServiceStatus('serviceA', 'down');
// }, 2000);
// Bind send message function to button click
document.getElementById('sendMessage').addEventListener('click', sendMessage);
});
</script>
</head>
<body>
<h2>WebSocket Test</h2>
<input type="text" id="messageInput" placeholder="Type a message...">
<textarea rows="4" cols="50" id="messageInput" placeholder="Type a message..."> </textarea>
<button id="sendMessage">Send Message</button>
<p>Example JSON</p>
<p>{ "type": "cable_map", "call": "request", "data": { } }</p>
<p>{ "type": "log", "call": "send", "data": "123123" }</p>
<p>Messages/Logs</p>
<ul id="messages"></ul>
<footer>
<!-- <div id="serviceA" class="service-box"></div>
<div id="serviceB" class="service-box"></div>
<div id="serviceC" class="service-box"></div> -->
<div id="clock"></div>
</footer>
<script>
// // Function to update service status
// function updateServiceStatus(service) {
// // serviceId, status
// var serviceElement = document.getElementById(service.serviceId);
// // updateClock();
// if (service.status === 'up') {
// serviceElement.innerHTML = '<h3>' + service.serviceId + '</h3><p>Running</p>';
// serviceElement.classList.remove('service-down');
// serviceElement.classList.add('service-up');
// } else {
// serviceElement.innerHTML = '<h3>' + service.serviceId + '</h3><p>Down</p>';
// serviceElement.classList.remove('service-up');
// serviceElement.classList.add('service-down');
// }
// }
// // Update service statuses
// updateServiceStatus('node.js (for this page)', serviceAStatus);
// updateServiceStatus('Python WebSocket', serviceBStatus);
// updateServiceStatus('serviceC', serviceCStatus);
// Function to update clock
function updateClock() {
var now = new Date();
now = now.getTime() - updatedTime.getTime();
// console.log(now)
document.getElementById('clock').textContent = 'Milliseconds Since Update: ' + now.toString().padStart(6, '0');
}
// Update clock every second
setInterval(updateClock, 100);
</script>
</body>
</html>
</html>