Compare commits

...

13 Commits

23 changed files with 2327 additions and 38 deletions

8
.gitignore vendored
View File

@ -1,3 +1,9 @@
venv venv
__pycache__ __pycache__
cables cables
.vscode
output.log
*.webm
output.mp4
output.log
cables-sample.zip

BIN
badapple.mp4 Normal file

Binary file not shown.

166
banner_ivu_export.py Executable file
View File

@ -0,0 +1,166 @@
#!/usr/bin/env python3
import socket
from datetime import datetime
from time import sleep
from util import fprint
"""
from: https://github.com/MoisesBrito31/ve_data_log/blob/main/serverContagem/VE/drive.py
(no license)
(partially) adapted to English language & iVu camera instead of classic VE by Cole Deck
"""
def gravaLog(ip="1",tipo="Evento", msg="", file="log_imagem.txt"):
# removed full logging
fprint(msg)
class DriveImg():
HEADERSIZE = 100
ip = "192.168.0.1"
port = 32200
onLine = False
def __init__(self, ip, port, pasta = "media/"):
self.pasta = pasta
self.ip=ip
self.port = port
self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.trans.settimeout(5)
fprint("Trying to connect...")
try:
self.trans.connect((self.ip,self.port))
self.onLine = True
fprint("Camera Online")
#self.trans.close()
except:
self.onLine = False
fprint("Offline")
def read_img(self):
resposta = 'Falha'
try:
if not self.onLine:
#print(f'tentando Conectar camera {self.ip}...')
gravaLog(ip=self.ip,msg=f'Trying to connect...')
sleep(2)
try:
self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.trans.connect((self.ip,self.PORT))
self.onLine = True
gravaLog(ip=self.ip,msg=f'Connection established.')
except:
self.onLine = False
self.trans.close()
return resposta
ret = self.trans.recv(64)
try:
valida = str(ret[0:15].decode('UTF-8'))
#print(valida)
if valida.find("TC IMAGE")<0:
self.onLine = False
self.trans.close()
sleep(2)
gravaLog(ip=self.ip,tipo="Falha",msg=f'Unable to find TC IMAGE bookmark')
return "Error"
except Exception as ex:
self.onLine = False
self.trans.close()
sleep(2)
gravaLog(ip=self.ip,tipo="Falha",msg=f'Error - {str(ex)}')
return "Error"
if ret:
frame = int.from_bytes(ret[24:27],"little")
isJpeg = int.from_bytes(ret[32:33],"little")
img_size = int.from_bytes(ret[20:23],"little")
data = self.trans.recv(5000)
while img_size>len(data) and ret:
ret = self.trans.recv(10000)
if ret:
data = data+ret
#print(f'{len(ret)}b dados recebidos, total de: {len(data)+64}b')
else:
gravaLog(ip=self.ip,tipo="Falha",msg="Unable to recieve the image")
self.onLine = False
return "Unable to recieve the image"
hoje = datetime.now()
idcam = self.ip.split('.')
"""try:
nomeFile = f'{hoje.day}{hoje.month}{hoje.year}-{idcam[3]}-{frame}'
if isJpeg==1:
file = open(f'{self.pasta}{nomeFile}.jpg','wb')
nomeFile = f'{nomeFile}.jpg'
else:
file = open(f'{self.pasta}{nomeFile}.bmp','wb')
nomeFile = f'{nomeFile}.bmp'
file.write(data)
file.close()
except Exception as ex:
sleep(2)
gravaLog(ip=self.ip,tipo="Falha",msg=f'Error - {str(ex)}')
return "Falha"
"""
if isJpeg==1:
return "jpeg",data
else:
return "bmp",data
except Exception as ex:
gravaLog(ip=self.ip,tipo="Falha Generica",msg=f'Error - {str(ex)}')
#print(f'erro {str(ex)}')
self.onLine = False
self.trans.close()
sleep(2)
return resposta
class DriveData():
HEADERSIZE = 100
ip = "192.168.0.1"
port = 32100
onLine = False
def __init__(self, ip, port):
gravaLog(ip=self.ip,msg=f'iniciou drive',file="log_data.txt")
self.ip=ip
self.port = port
self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
try:
self.trans.connect((self.ip,self.port))
self.onLine = True
except:
self.onLine = False
def read_data(self):
resposta = 'falha'
try:
if not self.onLine:
#print(f'tentando Conectar...\n')
gravaLog(ip=self.ip,msg=f'tentando Conectar...',file="log_data.txt")
sleep(2)
try:
self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.trans.connect((self.ip,self.PORT))
self.onLine = True
gravaLog(ip=self.ip,msg=f'Conexão restabelecida...',file="log_data.txt")
except:
self.onLine = False
return resposta
resposta = self.trans.recv(self.HEADERSIZE).decode("utf-8")
resposta = str(resposta).split(',')
return resposta
except Exception as ex:
self.onLine = False
gravaLog(ip=self.ip,tipo="Falha Generica",msg=f'erro {str(ex)}',file="log_data.txt")
sleep(2)
return resposta
if __name__ == "__main__":
test = DriveImg("192.168.1.125", 32200)
x = 0
while x < 100:
x=x+1
imgtype, img = test.read_img()

425
config.yml Normal file
View File

@ -0,0 +1,425 @@
core:
mode: linuxserver
serverip: 172.26.178.114
clientip: 172.26.176.1
server: Hyper-Vd
arm:
ip: 192.168.1.145
#cable_map:
cameras:
banner:
ip: 192.168.1.125
port: 32200
led:
fps: 90
timeout: 0
controllers:
- universe: 9
ip: 192.168.68.131
ledstart: 0
ledend: 143
mode: rgb
- universe: 3
ip: 192.168.68.131
ledstart: 144
ledend: 287
mode: rgb
- universe: 2
ip: 192.168.68.131
ledstart: 288
ledend: 431
mode: rgb
- universe: 1
ip: 192.168.68.130
ledstart: 432
ledend: 575
mode: rgb
- universe: 4
ip: 192.168.68.131
ledstart: 576
ledend: 719
mode: rgb
- universe: 5
ip: 192.168.68.131
ledstart: 720
ledend: 863
mode: rgb
- universe: 6
ip: 192.168.68.131
ledstart: 864
ledend: 1007
mode: rgb
- universe: 7
ip: 192.168.68.131
ledstart: 1008
ledend: 1151
mode: rgb
- universe: 8
ip: 192.168.68.131
ledstart: 1152
ledend: 1295
mode: rgb
- universe: 0
ip: 192.168.68.130
ledstart: 1296
ledend: 1365
mode: rgbw
map:
# total for 54x rings: 1296 LEDs (0-1295), 24 ea
# controller 1
- type: circle
start: 0
size: 24
diameter: 63.5
angle: 0
pos: [0, 304.8]
- type: circle
start: 24
size: 24
diameter: 63.5
angle: 0
pos: [-65.991, 266.7]
- type: circle
start: 48
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, 228.6]
- type: circle
start: 72
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, 190.5]
- type: circle
start: 96
size: 24
diameter: 63.5
angle: 0
pos: [-263.965, 152.4]
- type: circle
start: 120
size: 24
diameter: 63.5
angle: 0
pos: [-263.965, 76.2]
# controller 2
- type: circle
start: 144
size: 24
diameter: 63.5
angle: 0
pos: [0, 228.6]
- type: circle
start: 168
size: 24
diameter: 63.5
angle: 0
pos: [-65.991, 190.5]
- type: circle
start: 192
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, 152.4]
- type: circle
start: 216
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, 114.3]
- type: circle
start: 240
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, 38.1]
- type: circle
start: 264
size: 24
diameter: 63.5
angle: 0
pos: [-263.965, 0]
# controller 3
- type: circle
start: 288
size: 24
diameter: 63.5
angle: 0
pos: [0, 152.4]
- type: circle
start: 312
size: 24
diameter: 63.5
angle: 0
pos: [-65.936, 114.3]
- type: circle
start: 336
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, 76.2]
- type: circle
start: 360
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, 0]
- type: circle
start: 384
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, -38.1]
- type: circle
start: 408
size: 24
diameter: 63.5
angle: 0
pos: [-263.965, -76.2]
# controller 4
- type: circle
start: 432
size: 24
diameter: 63.5
angle: 0
pos: [131.982, 76.2]
- type: circle
start: 456
size: 24
diameter: 63.5
angle: 0
pos: [131.982, 152.4]
- type: circle
start: 480
size: 24
diameter: 63.5
angle: 0
pos: [131.982, 228.6]
- type: circle
start: 504
size: 24
diameter: 63.5
angle: 0
pos: [65.991, 266.7]
- type: circle
start: 528
size: 24
diameter: 63.5
angle: 0
pos: [65.991, 190.5]
- type: circle
start: 552
size: 24
diameter: 63.5
angle: 0
pos: [65.991, 114.3]
# controller 5
- type: circle
start: 576
size: 24
diameter: 63.5
angle: 0
pos: [131.982, 0]
- type: circle
start: 600
size: 24
diameter: 63.5
angle: 0
pos: [197.973, 38.1]
- type: circle
start: 624
size: 24
diameter: 63.5
angle: 0
pos: [197.973, 114.3]
- type: circle
start: 648
size: 24
diameter: 63.5
angle: 0
pos: [197.973, 190.5]
- type: circle
start: 672
size: 24
diameter: 63.5
angle: 0
pos: [263.965, 152.4]
- type: circle
start: 696
size: 24
diameter: 63.5
angle: 0
pos: [263.965, 76.2]
# controller 6
- type: circle
start: 720
size: 24
diameter: 63.5
angle: 0
pos: [131.982, -76.2]
- type: circle
start: 744
size: 24
diameter: 63.5
angle: 0
pos: [197.973, -38.1]
- type: circle
start: 768
size: 24
diameter: 63.5
angle: 0
pos: [263.965, 0]
- type: circle
start: 792
size: 24
diameter: 63.5
angle: 0
pos: [263.965, -76.2]
- type: circle
start: 816
size: 24
diameter: 63.5
angle: 0
pos: [263.965, -152.4]
- type: circle
start: 840
size: 24
diameter: 63.5
angle: 0
pos: [197.973, -114.3]
# controller 7
- type: circle
start: 864
size: 24
diameter: 63.5
angle: 0
pos: [65.991, -114.3]
- type: circle
start: 888
size: 24
diameter: 63.5
angle: 0
pos: [0, -152.4]
- type: circle
start: 912
size: 24
diameter: 63.5
angle: 0
pos: [-65.991, -114.3]
- type: circle
start: 936
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, -76.2]
- type: circle
start: 960
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, -114.3]
- type: circle
start: 984
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, -152.4]
# controller 8
- type: circle
start: 1008
size: 24
diameter: 63.5
angle: 0
pos: [0, -228.6]
- type: circle
start: 1032
size: 24
diameter: 63.5
angle: 0
pos: [-65.991, -190.5]
- type: circle
start: 1056
size: 24
diameter: 63.5
angle: 0
pos: [-65.991, -266.7]
- type: circle
start: 1080
size: 24
diameter: 63.5
angle: 0
pos: [-131.982, -228.6]
- type: circle
start: 1104
size: 24
diameter: 63.5
angle: 0
pos: [-197.973, -190.5]
- type: circle
start: 1128
size: 24
diameter: 63.5
angle: 0
pos: [-263.965, -152.4]
# controller 9
- type: circle
start: 1152
size: 24
diameter: 63.5
angle: 0
pos: [0, -304.8]
- type: circle
start: 1176
size: 24
diameter: 63.5
angle: 0
pos: [65.991, -266.7]
- type: circle
start: 1200
size: 24
diameter: 63.5
angle: 0
pos: [131.982, -228.6]
- type: circle
start: 1224
size: 24
diameter: 63.5
angle: 0
pos: [197.973, -190.5]
- type: circle
start: 1248
size: 24
diameter: 63.5
angle: 0
pos: [131.982, -152.4]
- type: circle
start: 1272
size: 24
diameter: 63.5
angle: 0
pos: [65.991, -190.5]
# Strips
- type: strip
start: 1296
size: 70
length: 600
angle: 270 # down
pos: [300, 300]

11
download-vid.sh Executable file
View File

@ -0,0 +1,11 @@
#!/bin/bash
set -euxo pipefail
mkdir -p tmpdl
cd tmpdl
yt-dlp $1
ffmpeg -i * -vf "scale=800x480" -r $2 ../output.mp4
rm -rf tmpdl

View File

@ -8,6 +8,7 @@ import requests
#import time #import time
import json import json
import subprocess import subprocess
from util import fprint
bartext = "" bartext = ""
failed = [] failed = []
@ -33,16 +34,16 @@ def query_search(partnum):
search_url = "https://www.belden.com/coveo/rest/search" search_url = "https://www.belden.com/coveo/rest/search"
search_data ='{ "q": "' + str(partnum) + '", "sortCriteria": "relevancy", "numberOfResults": "250", "sortCriteria": "@catalogitemwebdisplaypriority ascending", "searchHub": "products-only-search", "pipeline": "Site Search", "maximumAge": "900000", "tab": "products-search", "locale": "en" }' search_data ='{ "q": "' + str(partnum) + '", "sortCriteria": "relevancy", "numberOfResults": "250", "sortCriteria": "@catalogitemwebdisplaypriority ascending", "searchHub": "products-only-search", "pipeline": "Site Search", "maximumAge": "900000", "tab": "products-search", "locale": "en" }'
#"aq": "", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))", "firstResult": "0", "categoryFacets": "[{\\"field\\":\\"@catalogitemcategories\\",\\"path\\":[],\\"injectionDepth\\":1000,\\"maximumNumberOfValues\\":6,\\"delimitingCharacter\\":\\"|\\"}]", "facetOptions": "{}", "groupBy": "" }' #"aq": "", "cq": "((@z95xlanguage==en) (@z95xlatestversion==1) (@source==\\"Coveo_web_index - rg-nc-prod-sitecore-prod\\")) OR (@source==(\\"website_001002_catalog_index-rg-nc-prod-sitecore-prod\\",\\"website_001002_Category_index-rg-nc-prod-sitecore-prod\\"))", "firstResult": "0", "categoryFacets": "[{\\"field\\":\\"@catalogitemcategories\\",\\"path\\":[],\\"injectionDepth\\":1000,\\"maximumNumberOfValues\\":6,\\"delimitingCharacter\\":\\"|\\"}]", "facetOptions": "{}", "groupBy": "" }'
#print(search_data) #fprint(search_data)
print(json.loads(search_data)) fprint(json.loads(search_data))
#search_data = '{ "q": "' + str(partnum) + '" }' #search_data = '{ "q": "' + str(partnum) + '" }'
print(search_data) fprint(search_data)
headers = headers = { headers = headers = {
'Authorization': f'Bearer {token}', 'Authorization': f'Bearer {token}',
'Content-Type': 'application/json' 'Content-Type': 'application/json'
} }
with requests.post(search_url, headers=headers, data=search_data) as r: with requests.post(search_url, headers=headers, data=search_data) as r:
print(r.text)""" fprint(r.text)"""
# TODO: Reimplement in python # TODO: Reimplement in python
# Bash script uses some crazy json formatting that I could not figure out # Bash script uses some crazy json formatting that I could not figure out
@ -52,7 +53,7 @@ def query_search(partnum):
command = ["./query-search.sh", partnum] command = ["./query-search.sh", partnum]
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
if result.returncode != 0: # error if result.returncode != 0: # error
print("No results found in search database for " + partnum + ". No hi-res part image available.", result.stderr) fprint("No results found in search database for " + partnum + ". No hi-res part image available.", result.stderr)
return False return False
else: else:
data_out = json.loads(result.stdout) data_out = json.loads(result.stdout)
@ -72,7 +73,7 @@ def get_multi(partnums):
sanitized_name = partnum.replace(" ", "") sanitized_name = partnum.replace(" ", "")
url = "https://catalog.belden.com/techdata/EN/" + sanitized_name + "_techdata.pdf" url = "https://catalog.belden.com/techdata/EN/" + sanitized_name + "_techdata.pdf"
#print(url) #fprint(url)
try: try:
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
#r.raise_for_status() #r.raise_for_status()
@ -89,10 +90,10 @@ def get_multi(partnums):
bartext = bartext + "." bartext = bartext + "."
bar.text = bartext bar.text = bartext
f.write(chunk) f.write(chunk)
#print("") #fprint("")
return output_dir + "/datasheet.pdf" return output_dir + "/datasheet.pdf"
except KeyboardInterrupt: except KeyboardInterrupt:
print("Quitting!") fprint("Quitting!")
os.remove(output_dir + "/datasheet.pdf") os.remove(output_dir + "/datasheet.pdf")
sys.exit() sys.exit()
@ -100,7 +101,7 @@ def get_multi(partnums):
def _download_datasheet(url, output_dir): # Download datasheet with known URL def _download_datasheet(url, output_dir): # Download datasheet with known URL
global bartext global bartext
#print(url) #fprint(url)
try: try:
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
#r.raise_for_status() #r.raise_for_status()
@ -117,10 +118,10 @@ def get_multi(partnums):
bartext = bartext + "." bartext = bartext + "."
bar.text = bartext bar.text = bartext
f.write(chunk) f.write(chunk)
#print("") #fprint("")
return output_dir + "/datasheet.pdf" return output_dir + "/datasheet.pdf"
except KeyboardInterrupt: except KeyboardInterrupt:
print("Quitting!") fprint("Quitting!")
os.remove(output_dir + "/datasheet.pdf") os.remove(output_dir + "/datasheet.pdf")
sys.exit() sys.exit()
@ -128,7 +129,7 @@ def get_multi(partnums):
def _download_image(url, output_dir): # Download datasheet with known URL def _download_image(url, output_dir): # Download datasheet with known URL
global bartext global bartext
#print(url) #fprint(url)
try: try:
with requests.get(url, stream=True) as r: with requests.get(url, stream=True) as r:
#r.raise_for_status() #r.raise_for_status()
@ -143,27 +144,27 @@ def get_multi(partnums):
bartext = bartext + "." bartext = bartext + "."
bar.text = bartext bar.text = bartext
f.write(chunk) f.write(chunk)
#print("") #fprint("")
return output_dir + "/part-hires." + url.split(".")[-1] return output_dir + "/part-hires." + url.split(".")[-1]
except KeyboardInterrupt: except KeyboardInterrupt:
print("Quitting!") fprint("Quitting!")
os.remove(partnum + "/datasheet.pdf") os.remove(partnum + "/datasheet.pdf")
sys.exit() sys.exit()
def __use_cached_datasheet(partnum, path, output_dir): def __use_cached_datasheet(partnum, path, output_dir):
print("Using cached datasheet for " + partnum, end='') fprint("Using cached datasheet for " + partnum)
bar.text = "Using cached datasheet for " + partnum bar.text = "Using cached datasheet for " + partnum
bar(skipped=True) bar(skipped=True)
print("Parsing Datasheet contents of " + partnum, end='') fprint("Parsing Datasheet contents of " + partnum)
bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..." bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..."
read_datasheet.parse(path, output_dir) read_datasheet.parse(path, output_dir)
bar(skipped=False) bar(skipped=False)
def __downloaded_datasheet(partnum, path, output_dir): def __downloaded_datasheet(partnum, path, output_dir):
print("Downloaded " + path, end='') fprint("Downloaded " + path)
bar.text = "Downloaded " + path bar.text = "Downloaded " + path
bar(skipped=False) bar(skipped=False)
print("Parsing Datasheet contents of " + partnum, end='') fprint("Parsing Datasheet contents of " + partnum)
bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..." bar.text = "Parsing Datasheet contents of " + partnum + ".pdf..."
read_datasheet.parse(path, output_dir) read_datasheet.parse(path, output_dir)
bar(skipped=False) bar(skipped=False)
@ -182,10 +183,10 @@ def get_multi(partnums):
# Download high resolution part image if available and needed # Download high resolution part image if available and needed
if not os.path.exists(output_dir + "/found_part_hires"): if not os.path.exists(output_dir + "/found_part_hires"):
if _download_image(search_result["image"], output_dir): if _download_image(search_result["image"], output_dir):
print("Downloaded hi-res part image for " + partnum) fprint("Downloaded hi-res part image for " + partnum)
touch(output_dir + "/found_part_hires") touch(output_dir + "/found_part_hires")
else: else:
print("Using cached hi-res part image for " + partnum) fprint("Using cached hi-res part image for " + partnum)
# Download datasheet from provided URL if needed # Download datasheet from provided URL if needed
if os.path.exists(path) and os.path.getsize(path) > 1: if os.path.exists(path) and os.path.getsize(path) > 1:
@ -203,7 +204,7 @@ def get_multi(partnums):
# Failed to download with search or guess :( # Failed to download with search or guess :(
else: else:
print("Failed to download datasheet for part " + partnum, end='') fprint("Failed to download datasheet for part " + partnum)
bar.text = "Failed to download datasheet for part " + partnum bar.text = "Failed to download datasheet for part " + partnum
failed.append(partnum) failed.append(partnum)
bar(skipped=True) bar(skipped=True)
@ -211,13 +212,13 @@ def get_multi(partnums):
# We already have a hi-res image and the datasheet - perfect! # We already have a hi-res image and the datasheet - perfect!
else: else:
print("Using cached hi-res part image for " + partnum) fprint("Using cached hi-res part image for " + partnum)
__use_cached_datasheet(partnum, path, output_dir) __use_cached_datasheet(partnum, path, output_dir)
if len(failed) > 0: if len(failed) > 0:
print("Failed to download:") fprint("Failed to download:")
for partnum in failed: for partnum in failed:
print(partnum) fprint(partnum)
return False # Go to manual review upload page return False # Go to manual review upload page
else: else:
return True # All cables downloaded; we are good to go return True # All cables downloaded; we are good to go
@ -238,7 +239,8 @@ if __name__ == "__main__":
"FDSD012A9", "FDSD012A9",
"FSSL024NG", "FSSL024NG",
"FISX006W0", "FISX006W0",
"FISX00103" "FISX00103",
"C6D1100007"
] ]
get_multi(partnums) get_multi(partnums)

1
index.html Normal file
View File

@ -0,0 +1 @@
<html> <head> <title>RGB Controller Configuration</title> <style> body { background-color: #cccccc; font-family: Arial, Helvetica, Sans-Serif; Color: #000088; } </style> </head> <body> <h1>RGB Controller Configuration</h1><br> <h2>Set IP address</h2> Needs reboot to apply<br> Set to 0.0.0.0 for DHCP <form method="post" enctype="application/x-www-form-urlencoded" action="/postform/"> <input type="text" name="ipa" value="0" size="3">. <input type="text" name="ipb" value="0" size="3">. <input type="text" name="ipc" value="0" size="3">. <input type="text" name="ipd" value="0" size="3"> <input type="submit" value="Set"> </form><br> <h2>Set Hostname</h2> Needs reboot to apply<br> Max 64 characters <form method="post" enctype="application/x-www-form-urlencoded" action="/postform/"> <input type="text" name="hostname" value="RGBController" size="20"> <input type="submit" value="Set"> </form><br> <h2>DMX512 Start Universe</h2> Applies immediately<br> Between (inclusive) 1-65000 <form method="post" enctype="application/x-www-form-urlencoded" action="/postform/"> <input type="text" name="universe" value="1" size="5"> <input type="submit" value="Set"> </form><br> <form method="post" enctype="application/x-www-form-urlencoded" action="/postform/"> <input type="submit" name="reboot" value="Reboot"> </form><br> </body></html>

94
keyboard-down.ps1 Normal file
View File

@ -0,0 +1,94 @@
# A script to toggle the Touch Keyboard of Windows 11,
# compatible with both Windows PowerShell and PowerShell 7.
# Based on code by @torvin (https://stackoverflow.com/users/332528/torvin): https://stackoverflow.com/a/40921638
# Based on code by @Andrea S. (https://stackoverflow.com/users/5887913/andrea-s): https://stackoverflow.com/a/55513524
# Warning: Relies on undocumented behaviour of the Windows Shell
# and may break with any update.
# Last tested on Windows 11 Home 22000.978.
Add-Type -ReferencedAssemblies $(if ($PSVersionTable.PSEdition -eq "Desktop") {"System.Drawing.dll"} else {$null}) -Language CSharp -TypeDefinition @'
using System;
using System.Diagnostics;
using System.Drawing;
using System.Runtime.InteropServices;
public class TouchKeyboardController
{
public static void ToggleTouchKeyboard()
{
try
{
UIHostNoLaunch uiHostNoLaunch = new UIHostNoLaunch();
((ITipInvocation)uiHostNoLaunch).Toggle(GetDesktopWindow());
Marshal.ReleaseComObject(uiHostNoLaunch);
}
catch (COMException exc)
{
if (exc.HResult == unchecked((int)0x80040154)) // REGDB_E_CLASSNOTREG
{
ProcessStartInfo processStartInfo = new ProcessStartInfo("TabTip.exe")
{
UseShellExecute = true
};
using (Process process = Process.Start(processStartInfo))
{
}
}
else
{
throw;
}
}
}
[ComImport, Guid("4ce576fa-83dc-4F88-951c-9d0782b4e376")]
class UIHostNoLaunch
{
}
[ComImport, Guid("37c994e7-432b-4834-a2f7-dce1f13b834b")]
[InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface ITipInvocation
{
void Toggle(IntPtr hwnd);
}
[DllImport("user32.dll", SetLastError = false)]
static extern IntPtr GetDesktopWindow();
public static bool IsInputPaneOpen()
{
FrameworkInputPane frameworkInputPane = new FrameworkInputPane();
Rectangle rect;
((IFrameworkInputPane)frameworkInputPane).Location(out rect);
Marshal.ReleaseComObject(frameworkInputPane);
return !rect.IsEmpty;
}
[ComImport, Guid("d5120aa3-46ba-44c5-822d-ca8092c1fc72")]
public class FrameworkInputPane
{
}
[ComImport, Guid("5752238b-24f0-495a-82f1-2fd593056796")]
[InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
public interface IFrameworkInputPane
{
int Advise([MarshalAs(UnmanagedType.IUnknown)] object pWindow, [MarshalAs(UnmanagedType.IUnknown)] object pHandler, out int pdwCookie);
int AdviseWithHWND(IntPtr hwnd, [MarshalAs(UnmanagedType.IUnknown)] object pHandler, out int pdwCookie);
int Unadvise(int pdwCookie);
int Location(out Rectangle prcInputPaneScreenLocation);
}
}
'@
# Toggle the Touch Keyboard regardless of whether it is currently shown or not.
#[TouchKeyboardController]::ToggleTouchKeyboard()
# Alternatively, if you only want to show the Touch Keyboard
# and not hide it if it is already active:
if ([TouchKeyboardController]::IsInputPaneOpen()) {
[TouchKeyboardController]::ToggleTouchKeyboard()
}

94
keyboard-up.ps1 Normal file
View File

@ -0,0 +1,94 @@
# A script to toggle the Touch Keyboard of Windows 11,
# compatible with both Windows PowerShell and PowerShell 7.
# Based on code by @torvin (https://stackoverflow.com/users/332528/torvin): https://stackoverflow.com/a/40921638
# Based on code by @Andrea S. (https://stackoverflow.com/users/5887913/andrea-s): https://stackoverflow.com/a/55513524
# Warning: Relies on undocumented behaviour of the Windows Shell
# and may break with any update.
# Last tested on Windows 11 Home 22000.978.
Add-Type -ReferencedAssemblies $(if ($PSVersionTable.PSEdition -eq "Desktop") {"System.Drawing.dll"} else {$null}) -Language CSharp -TypeDefinition @'
using System;
using System.Diagnostics;
using System.Drawing;
using System.Runtime.InteropServices;
public class TouchKeyboardController
{
public static void ToggleTouchKeyboard()
{
try
{
UIHostNoLaunch uiHostNoLaunch = new UIHostNoLaunch();
((ITipInvocation)uiHostNoLaunch).Toggle(GetDesktopWindow());
Marshal.ReleaseComObject(uiHostNoLaunch);
}
catch (COMException exc)
{
if (exc.HResult == unchecked((int)0x80040154)) // REGDB_E_CLASSNOTREG
{
ProcessStartInfo processStartInfo = new ProcessStartInfo("TabTip.exe")
{
UseShellExecute = true
};
using (Process process = Process.Start(processStartInfo))
{
}
}
else
{
throw;
}
}
}
[ComImport, Guid("4ce576fa-83dc-4F88-951c-9d0782b4e376")]
class UIHostNoLaunch
{
}
[ComImport, Guid("37c994e7-432b-4834-a2f7-dce1f13b834b")]
[InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
interface ITipInvocation
{
void Toggle(IntPtr hwnd);
}
[DllImport("user32.dll", SetLastError = false)]
static extern IntPtr GetDesktopWindow();
public static bool IsInputPaneOpen()
{
FrameworkInputPane frameworkInputPane = new FrameworkInputPane();
Rectangle rect;
((IFrameworkInputPane)frameworkInputPane).Location(out rect);
Marshal.ReleaseComObject(frameworkInputPane);
return !rect.IsEmpty;
}
[ComImport, Guid("d5120aa3-46ba-44c5-822d-ca8092c1fc72")]
public class FrameworkInputPane
{
}
[ComImport, Guid("5752238b-24f0-495a-82f1-2fd593056796")]
[InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]
public interface IFrameworkInputPane
{
int Advise([MarshalAs(UnmanagedType.IUnknown)] object pWindow, [MarshalAs(UnmanagedType.IUnknown)] object pHandler, out int pdwCookie);
int AdviseWithHWND(IntPtr hwnd, [MarshalAs(UnmanagedType.IUnknown)] object pHandler, out int pdwCookie);
int Unadvise(int pdwCookie);
int Location(out Rectangle prcInputPaneScreenLocation);
}
}
'@
# Toggle the Touch Keyboard regardless of whether it is currently shown or not.
#[TouchKeyboardController]::ToggleTouchKeyboard()
# Alternatively, if you only want to show the Touch Keyboard
# and not hide it if it is already active:
if (-not [TouchKeyboardController]::IsInputPaneOpen()) {
[TouchKeyboardController]::ToggleTouchKeyboard()
}

343
led_control.py Executable file
View File

@ -0,0 +1,343 @@
#!/usr/bin/env python3
import sacn
import time
import sys
import yaml
import math
import random
from util import fprint
import platform # For getting the operating system name
import subprocess # For executing a shell command
from util import win32
import cv2
import numpy as np
from uptime import uptime
sender = None
debug = True
config = None
leds = None
leds_size = None
leds_normalized = None
controllers = None
data = None
start = uptime()
def ping(host):
#Returns True if host (str) responds to a ping request.
# Option for the number of packets as a function of
if win32:
param1 = '-n'
param2 = '-w'
param3 = '250'
else:
param1 = '-c'
param2 = '-W'
param3 = '0.25'
# Building the command. Ex: "ping -c 1 google.com"
command = ['ping', param1, '1', param2, param3, host]
return subprocess.call(command, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT) == 0
def map():
global config
global leds
global leds_size
global leds_normalized
global controllers
with open('config.yml', 'r') as fileread:
#global config
config = yaml.safe_load(fileread)
leds = list()
leds_size = list()
controllers = list()
#fprint(config["led"]["map"])
for shape in config["led"]["map"]:
if shape["type"] == "circle":
#fprint(shape["pos"])
anglediv = 360.0 / shape["size"]
angle = 0
radius = shape["diameter"] / 2
lednum = shape["start"]
if len(leds) < lednum + shape["size"]:
for x in range(lednum + shape["size"] - len(leds)):
leds.append(None)
leds_size.append(None)
while angle < 359.999:
tmpangle = angle + shape["angle"]
x = math.cos(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][0]
y = math.sin(tmpangle * (math.pi / 180.0)) * radius + shape["pos"][1]
leds[lednum] = (x,y)
lednum = lednum + 1
angle = angle + anglediv
elif shape["type"] == "strip":
angle = shape["angle"]
lednum = shape["start"]
length = shape["length"]
distdiv = length / shape["size"]
dist = distdiv / 2
xmov = math.cos(angle * (math.pi / 180.0)) * distdiv
ymov = math.sin(angle * (math.pi / 180.0)) * distdiv
pos = shape["pos"]
if len(leds) < lednum + shape["size"]:
for x in range(lednum + shape["size"] - len(leds)):
leds.append(None)
leds_size.append(None)
while dist < length:
leds[lednum] = (pos[0], pos[1])
pos[0] += xmov
pos[1] += ymov
dist += distdiv
lednum = lednum + 1
flag = 0
for x in leds:
if x is None:
flag = flag + 1
if flag > 0:
fprint("Warning: Imperfect LED map ordering. Hiding undefined lights.")
for x in range(len(leds)):
if leds[x] is None:
leds[x] = (0, 0)
#leds = tmpleds.reverse()
#fprint(leds)
# controller mapping
for ctrl in config["led"]["controllers"]:
if len(controllers) < ctrl["universe"]+1:
for x in range(ctrl["universe"]+1 - len(controllers)):
controllers.append(None)
controllers[ctrl["universe"]] = (ctrl["ledstart"],ctrl["ledend"]+1,ctrl["ip"])
for x in range(ctrl["ledstart"],ctrl["ledend"]+1):
leds_size[x] = len(ctrl["mode"])
#fprint(controllers)
if(debug):
import matplotlib.pyplot as plt
plt.axis('equal')
for ctrl in controllers:
plt.scatter(*zip(*leds[ctrl[0]:ctrl[1]]), s=2)
#plt.scatter(*zip(*leds), s=3)
plt.savefig("map.png", dpi=600, bbox_inches="tight")
leds_adj = [(x-min([led[0] for led in leds]), # push to zero start
y-min([led[1] for led in leds]) )
for x, y in leds]
leds_normalized = [(x / max([led[0] for led in leds_adj]),
y / max([led[1] for led in leds_adj]))
for x, y in leds_adj]
#return leds, controllers
def init():
map()
global sender
global config
global leds
global leds_size
global controllers
global data
sender = sacn.sACNsender(fps=config["led"]["fps"], universeDiscovery=False)
sender.start() # start the sending thread
for x in range(len(controllers)):
print("Waiting for the controller at", controllers[x][2], "to be online...", end="")
count = 0
while not ping(controllers[x][2]):
count = count + 1
if count >= config["led"]["timeout"]:
fprint(" ERROR: controller still offline after " + str(count) + " seconds, continuing...")
break
if count < config["led"]["timeout"]:
fprint(" done")
for x in range(len(controllers)):
print("Activating controller", x, "at", controllers[x][2], "with", controllers[x][1]-controllers[x][0], "LEDs.")
sender.activate_output(x+1) # start sending out data
sender[x+1].destination = controllers[x][2]
sender.manual_flush = True
# initialize global pixel data list
data = list()
for x in range(len(leds)):
if leds_size[x] == 3:
data.append((20,20,127))
elif leds_size[x] == 4:
data.append((50,50,255,0))
else:
data.append((0,0,0))
sendall(data)
#time.sleep(50000)
fprint("Running start-up test sequence...")
for y in range(1):
for x in range(len(leds)):
setpixel(5,5,5,x)
sendall(data)
#time.sleep(2)
#alloffsmooth()
def sendall(datain):
# send all LED data to all controllers
# data must have all LED data in it as [(R,G,B,)] tuples in an array, 1 tuple per pixel
global controllers
global sender
sender.manual_flush = True
for x in range(len(controllers)):
sender[x+1].dmx_data = list(sum(datain[controllers[x][0]:controllers[x][1]] , ())) # flatten the subsection of the data array
sender.flush()
time.sleep(0.002)
#sender.flush() # 100% reliable with 2 flushes, often fails with 1
#time.sleep(0.002)
#sender.flush()
def fastsendall(datain):
# send all LED data to all controllers
# data must have all LED data in it as [(R,G,B,)] tuples in an array, 1 tuple per pixel
global controllers
global sender
sender.manual_flush = False
print(datain[controllers[0][0]:controllers[0][1]])
for x in range(len(controllers)):
sender[x+1].dmx_data = list(sum(datain[controllers[x][0]:controllers[x][1]] , ())) # flatten the subsection of the data array
sender.flush()
def senduniverse(datain, lednum):
# send all LED data for 1 controller/universe
# data must have all LED data in it as [(R,G,B,)] tuples in an array, 1 tuple per pixel
global controllers
global sender
for x in range(len(controllers)):
if lednum >= controllers[x][0] and lednum < controllers[x][1]:
sender[x+1].dmx_data = list(sum(datain[controllers[x][0]:controllers[x][1]] , ())) # flatten the subsection of the data array
sender.flush()
time.sleep(0.004)
#sender.flush() # 100% reliable with 2 flushes, often fails with 1
#time.sleep(0.002)
#sender.flush()
def alloff():
tmpdata = list()
for x in range(len(leds)):
if leds_size[x] == 3:
tmpdata.append((0,0,0))
elif leds_size[x] == 4:
tmpdata.append((0,0,0,0))
else:
tmpdata.append((0,0,0))
sendall(tmpdata)
#sendall(tmpdata)
#sendall(tmpdata) #definitely make sure it's off
def allon():
global data
sendall(data)
def alloffsmooth():
tmpdata = data
for x in range(256):
for x in range(len(data)):
setpixel(tmpdata[x][0]-1,tmpdata[x][1]-1,tmpdata[x][2]-1, x)
sendall(tmpdata)
alloff()
def setpixelnow(r, g, b, num):
# slight optimization: send only changed universe
# unfortunately no way to manual flush data packets to only 1 controller with this sACN library
global data
setpixel(r,g,b,num)
senduniverse(data, num)
def setpixel(r, g, b, num):
global data
global leds_size
# constrain values
if r < 0:
r = 0
elif r > 255:
r = 255
if g < 0:
g = 0
elif g > 255:
g = 255
if b < 0:
b = 0
elif b > 255:
b = 255
if leds_size[num] == 3:
data[num] = (int(r), int(g), int(b))
elif leds_size[num] == 4: # cut out matching white and turn on white pixel instead
data[num] = (( int(r) - int(min(r,g,b)), int(g) - int(min(r,g,b)), int(b) - int(min(r,g,b)), int(min(r,g,b))) )
else:
data[num] = (int(r), int(g), int(b))
def close():
global sender
time.sleep(0.5)
sender.stop()
def mapimage(image, fps=60):
global start
while uptime() - start < 1/fps:
time.sleep(0.00001)
fprint(1 / (uptime() - start))
start = uptime()
minsize = min(image.shape[0:2])
leds_normalized2 = [(x * minsize,
y * minsize)
for x, y in leds_normalized]
cv2.imshow("video", image)
cv2.waitKey(1)
#im_rgb = image #cv2.cvtColor(image, cv2.COLOR_BGR2RGB) # OpenCV uses BGR format by default
avgx = 0
avgy = 0
for xx in range(len(leds_normalized2)):
led = leds_normalized2[xx]
x, y = int(round(led[0])), int(round(led[1]))
if x < image.shape[1] and y < image.shape[0]:
#avgx += x
#avgy += y
color = tuple(image[y, x])
setpixel(color[2]/2,color[1]/2,color[0]/2,xx) # swap b & r
#print(color)
else:
#avgx += x
#avgy += y
setpixel(0,0,0,xx)
#avgx /= len(leds)
#avgy /= len(leds)
#print((avgx,avgy, max([led[0] for led in leds_adj]), max([led[1] for led in leds_adj]) , min(image.shape[0:2]) ))
global data
fastsendall(data)
if __name__ == "__main__":
init()
cap = cv2.VideoCapture('output.mp4')
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
mapimage(frame)
time.sleep(1)
close()
#sys.exit(0)

BIN
map.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 381 KiB

45
process_video.py Executable file
View File

@ -0,0 +1,45 @@
#!/usr/bin/env python3
import cv2
import banner_ivu_export
import numpy as np
from util import fprint
class qr_reader():
camera = None
def __init__(self, ip, port):
self.camera = banner_ivu_export.DriveImg(ip, port)
def read_qr(self, tries=1):
print("Trying " + str(tries) + " frames.")
for x in range(tries):
try:
imgtype, img = self.camera.read_img()
#fprint(imgtype)
image_array = np.frombuffer(img, np.uint8)
img = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
#cv2.imshow('Image', img)
#cv2.waitKey(1)
detect = cv2.QRCodeDetector()
value, points, straight_qrcode = detect.detectAndDecode(img)
return value
except:
continue
return False
class video_streamer():
camera = None
def __init__(self, ip, port):
self.camera = banner_ivu_export.DriveImg(ip, port)
def get_frame(self):
try:
return self.camera.read_img()
except:
return False
if __name__ == "__main__":
test = qr_reader("192.168.1.125", 32200)
while True:
fprint(test.read_qr(5))

View File

@ -7,13 +7,15 @@ import camelot
import numpy as np import numpy as np
from PIL import Image from PIL import Image
import io import io
import json
from util import fprint
def parse(filename, output_dir): def parse(filename, output_dir):
# Extract table data # Extract table data
tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="poppler", split_text=False, line_scale=100, process_background=True, resolution=600, interations=1, layout_kwargs={'detect_vertical': False, 'char_margin': 0.5}, shift_text=['r', 't']) tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="poppler", split_text=False, line_scale=100, process_background=True, resolution=600, interations=1, layout_kwargs={'detect_vertical': False, 'char_margin': 0.5}, shift_text=['r', 't'])
#print("Total tables extracted:", tables.n) #fprint("Total tables extracted:", tables.n)
n = 0 n = 0
pagenum = 0 pagenum = 0
reader = PdfReader(filename) reader = PdfReader(filename)
@ -26,10 +28,10 @@ def parse(filename, output_dir):
table.df.replace(np.nan, '', inplace=True) table.df.replace(np.nan, '', inplace=True)
if not table.df.empty: if not table.df.empty:
#print("\nTable " + str(n)) #fprint("\nTable " + str(n))
# Extract table names # Extract table names
table_start = table.cells[0][0].lt[1] # Read top-left cell's top-left coordinate table_start = table.cells[0][0].lt[1] # Read top-left cell's top-left coordinate
#print(table_start) #fprint(table_start)
ymin = table_start ymin = table_start
ymax = table_start + 10 ymax = table_start + 10
if pagenum != table.page - 1: if pagenum != table.page - 1:
@ -43,21 +45,22 @@ def parse(filename, output_dir):
page.extract_text(visitor_text=visitor_body) page.extract_text(visitor_text=visitor_body)
text_body = "".join(parts).strip('\n') text_body = "".join(parts).strip('\n')
#print(text_body) if len(text_body) == 0:
text_body = str(n)
#fprint(text_body)
table_list[text_body] = table.df table_list[text_body] = table.df
#table.to_html("table" + str(n) + ".html") #table.to_html("table" + str(n) + ".html")
#print(table.df) #fprint(table.df)
#camelot.plot(table, kind='grid').savefig("test" + str(n) + ".png") #camelot.plot(table, kind='grid').savefig("test" + str(n) + ".png")
n=n+1 n=n+1
#camelot.plot(tables[0], kind='grid').savefig("test.png") #camelot.plot(tables[0], kind='grid').savefig("test.png")
tables.export(output_dir + '/techdata.json', f='json') #tables.export(output_dir + '/techdata.json', f='json')
# print(table_list) # fprint(table_list)
# Extract Basic details - part name & description, image, etc # Extract Basic details - part name & description, image, etc
reader = PdfReader(filename) reader = PdfReader(filename)
@ -66,7 +69,7 @@ def parse(filename, output_dir):
skip = False skip = False
for image_file_object in page.images: for image_file_object in page.images:
if image_file_object.name == "img0.png" and skip == False: if image_file_object.name == "img0.png" and skip == False:
#print(Image.open(io.BytesIO(image_file_object.data)).mode) #fprint(Image.open(io.BytesIO(image_file_object.data)).mode)
if Image.open(io.BytesIO(image_file_object.data)).mode == "P": if Image.open(io.BytesIO(image_file_object.data)).mode == "P":
skip = True skip = True
continue continue
@ -81,7 +84,95 @@ def parse(filename, output_dir):
with open(output_dir + "/brand.png", "wb") as fp: with open(output_dir + "/brand.png", "wb") as fp:
fp.write(image_file_object.data) fp.write(image_file_object.data)
count += 1 count += 1
return table_list
# Table parsing and reordring
tables = dict()
previous_table = ""
for table_name in table_list.keys():
# determine shape: horizontal or vertical
table = table_list[table_name]
rows = table.shape[0]
cols = table.shape[1]
vertical = None
if rows > 2 and cols == 2:
vertical = True
elif cols == 1:
vertical = False
elif rows == 1:
vertical = True
elif cols == 2: # and rows <= 2
# inconsistent
if table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1: # check if last character is ":" indicating a vertical table
vertical = True
else:
vertical = False
elif cols > 2: # and rows <= 2
vertical = False
elif rows > 2 and cols > 2: # big table
vertical = False
else: # 1 column, <= 2 rows
vertical = False
# missing name check
for table_name_2 in table_list.keys():
if table_name_2.find(table.iloc[-1, 0]) >= 0:
# Name taken from table directly above - this table does not have a name
table_list["Specs " + str(len(tables))] = table_list.pop(table_name_2, None) # rename table to arbitrary altername name
break
if vertical:
out = dict()
for row in table.itertuples(index=False, name=None):
out[row[0].replace("\n", " ").replace(":", "")] = row[1]
else: # horizontal
out = dict()
for col in table.columns:
col_data = tuple(table[col])
out[col_data[0].replace("\n", " ")] = col_data[1:]
tables[table_name] = out
# multi-page table check
if table_name.isdigit() and len(tables) > 1:
fprint(table_name)
fprint(previous_table)
main_key = previous_table
cont_key = table_name
fprint(tables)
if vertical == False:
main_keys = list(tables[main_key].keys())
for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
if i < len(main_keys):
fprint(tables[main_key][main_keys[i]])
tables[main_key][main_keys[i]] = (tables[main_key][main_keys[i]] + (cont_key,) + cont_values)
del tables[table_name]
else:
for key in tables[cont_key].keys():
tables[main_key][key] = tables[cont_key][key]
del tables[table_name]
previous_table = table_name
fprint(tables)
with open(output_dir + "/tables.json", 'w') as json_file:
json.dump(tables, json_file)
return tables

View File

@ -4,7 +4,14 @@ opencv-python
pypdf2==2.12.1 pypdf2==2.12.1
alive-progress alive-progress
requests requests
git+https://github.com/Byeongdulee/python-urx.git
psycopg2 psycopg2
pyyaml
Flask
selenium
sacn
uptime
websockets
# Development # Development
matplotlib matplotlib

408
run.py Executable file
View File

@ -0,0 +1,408 @@
#!/usr/bin/env python3
import get_specs
import traceback
#import logging
import yaml
from multiprocessing import Process, Manager, Pool, TimeoutError, active_children, log_to_stderr, Pipe, Queue
from multiprocessing.pool import Pool
import multiprocessing
from time import sleep
from util import fprint
from util import run_cmd
import sys
import ur5_control
import os
import signal
import socket
from flask import Flask, render_template, request
import requests
import led_control
import server
import asyncio
import json
import process_video
config = None
keeprunning = True
arm_ready = False
led_ready = False
camera_ready = False
sensor_ready = False
vm_ready = False
killme = None
#pool = None
serverproc = None
camera = None
to_server_queue = Queue()
from_server_queue = Queue()
def arm_start_callback(res):
global arm_ready
arm_ready = True
def led_start_callback(res):
global led_ready
led_ready = True
def camera_start_callback(res):
global camera_ready
camera_ready = True
def sensor_start_callback(res):
global sensor_ready
sensor_ready = True
def vm_start_callback(res):
global vm_ready
vm_ready = True
def wait_for(val, name):
#global val
if val is False:
fprint("waiting for " + name + " to complete...")
while val is False:
sleep(0.1)
def start_server_socket():
"""app = Flask(__name__)
@app.route('/report_ip', methods=['POST'])
def report_ip():
client_ip = request.json.get('ip')
fprint(f"Received IP: {client_ip}")
# You can store or process the IP address as needed
return "IP Received", 200
app.run(host='0.0.0.0', port=5000)"""
global to_server_queue
global from_server_queue
fprint("Starting WebSocket server...")
websocket_process = server.start_websocket_server(to_server_queue, from_server_queue)
# Example
#to_server_queue.put("Hello, WebSocket clients!")
while True:
#print("HI")
if not from_server_queue.empty():
client_id, message = from_server_queue.get()
fprint(f"Message from client {client_id}: {message}")
# Message handler
try:
decoded = json.loads(message)
if "type" not in decoded:
fprint("Missing \"type\" field.")
continue
if "call" not in decoded:
fprint("Missing \"call\" field.")
continue
if "data" not in decoded:
fprint("Missing \"data\" field.")
continue
# if we get here, we have a "valid" data packet
data = decoded["data"]
call = decoded["call"]
match decoded["type"]:
case "log":
fprint("log message")
if call == "send":
fprint("webapp: " + data)
elif call == "request":
fprint("")
case "cable_map":
fprint("cable_map message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
case "cable_details":
fprint("cable_details message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
case "cable_search":
fprint("cable_search message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
case "keyboard":
fprint("keyboard message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
if data["enabled"] == True:
# todo : send this to client
p = Process(target=run_cmd, args=("./keyboard-up.ps1",))
p.start()
elif data["enabled"] == False:
p = Process(target=run_cmd, args=("./keyboard-down.ps1",))
p.start()
case "machine_settings":
fprint("machine_settings message")
if call == "send":
fprint("")
elif call == "request":
fprint("")
case _:
fprint("Unknown/unimplemented data type: " + decoded["type"])
except:
fprint("Non-JSON message recieved")
continue
sleep(0.001) # Sleep to prevent tight loop
def start_client_socket():
app = Flask(__name__)
@app.route('/control_client', methods=['POST'])
def message_from_server():
# Handle message from server
data = request.json
fprint(f"Message from server: {data.get('message')}")
return "Message received", 200
app.run(host='0.0.0.0', port=6000)
def check_server_online(serverip, clientip):
def send_ip_to_server(server_url, client_ip):
try:
response = requests.post(server_url, json={'ip': client_ip}, timeout=1)
fprint(f"Server response: {response.text}")
return True
except requests.exceptions.RequestException as e:
fprint(f"Error sending IP to server: {e}")
return False
server_url = 'http://' + serverip + ':5000/report_ip'
while not send_ip_to_server(server_url, clientip):
sleep(1)
fprint("Successfully connected to server.")
return True
def setup_server(pool):
# linux server setup
global config
global counter
global sensor_ready
global camera_ready
global led_ready
global arm_ready
global serverproc
global camera
pool.apply_async(ur5_control.init, (config["arm"]["ip"],), callback=arm_start_callback)
pool.apply_async(led_control.init, callback=led_start_callback)
#pool.apply_async(sensor_control.init, callback=sensor_start_callback)
serverproc = Process(target=start_server_socket)
serverproc.start()
if led_ready is False:
fprint("waiting for " + "LED controller initialization" + " to complete...", sendqueue=to_server_queue)
while led_ready is False:
sleep(0.1)
fprint("LED controllers initialized.", sendqueue=to_server_queue)
#to_server_queue.put("[log] LED controllers initialized.")
sensor_ready = True
if sensor_ready is False:
fprint("waiting for " + "Sensor Initialization" + " to complete...", sendqueue=to_server_queue)
while sensor_ready is False:
sleep(0.1)
fprint("Sensors initialized.", sendqueue=to_server_queue)
if camera_ready is False:
fprint("waiting for " + "Camera initilization" + " to complete...", sendqueue=to_server_queue)
camera = process_video.qr_reader(config["cameras"]["banner"]["ip"], config["cameras"]["banner"]["port"])
fprint("Camera initialized.", sendqueue=to_server_queue)
arm_ready = True
if arm_ready is False:
fprint("waiting for " + "UR5 initilization" + " to complete...", sendqueue=to_server_queue)
while arm_ready is False:
sleep(0.1)
fprint("Arm initialized.", sendqueue=to_server_queue)
return True
def mainloop_server(pool):
global config
global counter
global killme
if killme.value > 0:
killall()
counter = counter + 1
fprint("Looking for QR code...")
print(camera.read_qr(30))
def run_loading_app():
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
app.run(debug=True, use_reloader=False, port=7000)
def setup_client(pool):
# Windows client setup
fprint("Opening browser...")
firefox = webdriver.Firefox()
firefox.fullscreen_window()
global config
global vm_ready
global serverproc
# Open loading wepage
p = Process(target=run_loading_app)
p.start()
firefox.get('http://localhost:7000')
# start Linux server VM
if config["core"]["server"] == "Hyper-V":
run_cmd("Start-VM -Name Jukebox*") # any and all VMs starting with "Jukebox"
# Wait for VM to start and be reachable over the network
serverproc = Process(target=start_client_socket)
serverproc.start()
pool.apply_async(check_server_online, (config["core"]["serverip"],config["core"]["clientip"]), callback=vm_start_callback)
#wait_for(vm_ready, "VM Startup")
#global vm_ready
if vm_ready is False:
fprint("waiting for " + "VM Startup" + " to complete...")
while vm_ready is False:
sleep(0.1)
p.terminate()
firefox.get("http://" + config["core"]["serverip"] + ":8000")
return True
def mainloop_client(pool):
sleep(0.1)
# listen for & act on commands from VM, if needed
# mainly just shut down, possibly connect to wifi or something
"""class Logger(object):
def __init__(self, filename="output.log"):
self.log = open(filename, "a")
self.terminal = sys.stdout
def write(self, message):
self.log.write(message)
#close(filename)
#self.log = open(filename, "a")
try:
self.terminal.write(message)
except:
sleep(0)
def flush(self):
print("",end="")"""
def killall():
procs = active_children()
for proc in procs:
proc.kill()
fprint("All child processes killed")
os.kill(os.getpid(), 9) # dirty kill of self
def killall_signal(a, b):
global config
if config["core"]["server"] == "Hyper-V":
run_cmd("Stop-VM -Name Jukebox*") # any and all VMs starting with "Jukebox"
killall()
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
if __name__ == "__main__":
#sys.stdout = Logger(filename="output.log")
#sys.stderr = Logger(filename="output.log")
#log_to_stderr(logging.DEBUG)
fprint("Starting Jukebox control system...")
with open('config.yml', 'r') as fileread:
#global config
config = yaml.safe_load(fileread)
with Manager() as manager:
fprint("Spawning threads...")
pool = LoggingPool(processes=10)
counter = 0
killme = manager.Value('d', 0)
signal.signal(signal.SIGINT, killall_signal)
if config["core"]["mode"] == "winclient":
fprint("Starting in client mode.")
from selenium import webdriver
if setup_client(pool):
fprint("Entering main loop...")
while(keeprunning):
mainloop_client(pool)
elif config["core"]["mode"] == "linuxserver":
fprint("Starting in server mode.")
if setup_server(pool):
fprint("Entering main loop...")
while(keeprunning):
mainloop_server(pool)

95
server.py Executable file
View File

@ -0,0 +1,95 @@
#!/usr/bin/env python3
# WebSocket server for communicating with all other nodes
"""import websockets
import asyncio
from util import fprint
class WebSocketServer:
def __init__(self):
self.start_server = websockets.serve(self.handler, "0.0.0.0", 9000)
self.received_messages = asyncio.Queue()
self.connected_clients = set()
async def handler(self, websocket, path):
self.connected_clients.add(websocket)
fprint(self.connected_clients)
try:
async for message in websocket:
fprint(message)
await self.received_messages.put(message)
finally:
self.connected_clients.remove(websocket)
async def send_message(self, message):
disconnected_clients = set()
for websocket in self.connected_clients:
try:
await websocket.send(message)
except websockets.exceptions.ConnectionClosed:
disconnected_clients.add(websocket)
self.connected_clients.difference_update(disconnected_clients)
def run(self):
fprint("Starting WebSocket server...")
asyncio.get_event_loop().run_until_complete(self.start_server)
#asyncio.run(self.start_server)
asyncio.get_event_loop().run_forever()
async def get_received_message(self):
if not self.received_messages.empty():
return await self.received_messages.get()
return None
# Function to be used by multiprocessing to run the server
def run_server():
server = WebSocketServer()
asyncio.run(server.run())"""
import asyncio
import websockets
from multiprocessing import Process, Queue
from util import fprint
import uuid
connected_clients = {}
async def handler(websocket, path, to_server_queue, from_server_queue):
# Register websocket connection
client_id = str(uuid.uuid4())
connected_clients[client_id] = websocket
try:
# Handle incoming messages
async for message in websocket:
#print(f"Received message: {message}")
print(client_id)
from_server_queue.put((client_id, message))
finally:
# Unregister websocket connection
if client_id in connected_clients:
del connected_clients[client_id]
print(f"Client {client_id} connection closed")
async def send_messages(to_server_queue):
while True:
if not to_server_queue.empty():
client_id, message = to_server_queue.get()
if client_id in connected_clients: # Send message to specific client
await connected_clients[client_id].send(message)
elif len(connected_clients) > 0: # Broadcast message to all clients
for client in connected_clients.values():
await client.send(message)
await asyncio.sleep(0.001)
def websocket_server(to_server_queue, from_server_queue):
start_server = websockets.serve(lambda ws, path: handler(ws, path, to_server_queue, from_server_queue), "localhost", 9000)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().create_task(send_messages(to_server_queue))
asyncio.get_event_loop().run_forever()
def start_websocket_server(to_server_queue, from_server_queue):
p = Process(target=websocket_server, args=(to_server_queue, from_server_queue))
p.start()
return p

24
static/style.css Normal file
View File

@ -0,0 +1,24 @@
body {
text-align: center;
padding-top: 100px;
}
.spinner {
border: 16px solid #f3f3f3;
border-top: 16px solid #002554;
border-radius: 50%;
width: 120px;
height: 120px;
animation: spin 2s linear infinite;
margin: auto;
}
@keyframes spin {
0% { transform: rotate(0deg); }
100% { transform: rotate(360deg); }
}
p {
margin-top: 20px;
font-size: 20px;
}

13
templates/index.html Normal file
View File

@ -0,0 +1,13 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Loading</title>
<link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}">
</head>
<body>
<div class="spinner"></div>
<p>Waiting for backend server to start...</p>
</body>
</html>

98
test.html Normal file

File diff suppressed because one or more lines are too long

58
udp_send_test.py Executable file
View File

@ -0,0 +1,58 @@
#!/usr/bin/env python3
import sacn
import time
ipaddr = "192.168.68.130"
sender = None
data = None
start = time.time()
def init():
global sender
global data
sender = sacn.sACNsender(universeDiscovery=False)
sender.start() # start the sending thread
sender.activate_output(2) # start sending out data
sender[2].destination = ipaddr
sender.manual_flush = True
# initialize global pixel data list
data = list()
for x in range(170):
data.append((1,2,3)) # some random data
def fastsendall(datain):
# send all LED data to all controllers
# data must have all LED data in it as [(R,G,B,)] tuples in an array, 1 tuple per pixel
global sender
sender[2].dmx_data = list(sum(datain[0:170] , ())) # flatten the subsection of the data array
sender.flush()
def close():
global sender
time.sleep(0.5)
sender.stop()
def send_fps(fps=140):
global start
while time.time() - start < 1/fps:
time.sleep(0.00001)
print("FPS:", 1 / (time.time() - start))
start = time.time()
global data
fastsendall(data)
if __name__ == "__main__":
init()
while True:
send_fps()
time.sleep(1)
close()
#sys.exit(0)

101
ur5_control.py Executable file
View File

@ -0,0 +1,101 @@
import urx
import math3d as m3d
import math
import time
import os
import logging
from urx.robotiq_two_finger_gripper import Robotiq_Two_Finger_Gripper
import sys
from util import fprint
rob = None
def init(ip):
global rob
#sys.stdout = Logger()
fprint("Starting UR5 power up...")
# power up robot here
# wait for power up (this function runs async)
# trigger auto-initialize
# wait for auto-initialize
# init urx
fprint("Connecting to arm at " + ip)
trying = True
while trying:
try:
rob = urx.Robot(ip)
trying = False
except:
time.sleep(1)
robotiqgrip = Robotiq_Two_Finger_Gripper(rob)
# Sets robot arm endpoint offset (x,y,z,rx,ry,rz)
rob.set_tcp((0, 0, 0.15, 0, 0, 0))
# Set weight
rob.set_payload(2, (0, 0, 0.1))
#rob.set_payload(2, (0, 0, 0.1))
time.sleep(0.2)
fprint("UR5 ready.")
def set_pos_abs(x, y, z, xb, yb, zb):
global rob
new_orientation = m3d.Transform()
new_orientation.orient.rotate_xb(xb) # Replace rx with the desired rotation around X-axis
new_orientation.orient.rotate_yb(yb) # Replace ry with the desired rotation around Y-axis
new_orientation.orient.rotate_zb(zb) # Replace rz with the desired rotation around Z-axis
# Get the current pose
trans = rob.getl()
# Apply the new orientation while keeping the current position
new_trans = m3d.Transform(new_orientation.orient, m3d.Vector(trans[0:3]))
new_trans.pos.x = x
new_trans.pos.y = y
new_trans.pos.z = z
#rob.speedj(0.2, 0.5, 99999)
rob.set_pose(new_trans, acc=2, vel=2, command="movej") # apply the new pose
def set_pos_rel_rot_abs(x, y, z, xb, yb, zb):
global rob
new_orientation = m3d.Transform()
new_orientation.orient.rotate_xb(xb) # Replace rx with the desired rotation around X-axis
new_orientation.orient.rotate_yb(yb) # Replace ry with the desired rotation around Y-axis
new_orientation.orient.rotate_zb(zb) # Replace rz with the desired rotation around Z-axis
# Get the current pose
trans = rob.getl()
# Apply the new orientation while keeping the current position
new_trans = m3d.Transform(new_orientation.orient, m3d.Vector(trans[0:3]))
new_trans.pos.x += x
new_trans.pos.y += y
new_trans.pos.z += z
#rob.speedj(0.2, 0.5, 99999)
rob.set_pose(new_trans, acc=0.1, vel=0.4, command="movej") # apply the new pose
if __name__ == "__main__":
#rob.movej((0, 0, 0, 0, 0, 0), 0.1, 0.2)
#rob.movel((x, y, z, rx, ry, rz), a, v)
init("192.168.1.145")
fprint("Current tool pose is: ", rob.getl())
#set_pos_rel_rot_abs(0, 0, -0.2, math.pi, 0, -math.pi)
set_pos_abs(0.3, -0.2, 0.5, math.pi, 0, -math.pi)
set_pos_abs(0, 0.2, 0.6, math.pi, 0, -math.pi)
set_pos_abs(-0.5, -0.2, 0.4, math.pi, 0, -math.pi)
#set_pos_rel_rot_abs(0, 0, 0, math.pi, 0, -math.pi)
fprint("Current tool pose is: ", rob.getl())
rob.stop()
os.kill(os.getpid(), 9) # dirty kill of self
sys.exit(0)

165
util.py Executable file
View File

@ -0,0 +1,165 @@
import inspect
import sys
import subprocess
import os
from sys import platform
import time as t
from time import sleep
import uuid
import csv
win32 = platform == "win32"
linux = platform == "linux" or platform == "linux2"
macos = platform == "darwin"
datafile = ""
logMsg = ""
logCont = ""
settings = None
if win32:
sysid = hex(uuid.getnode())
# Python is running as Administrator (so netstat can get filename, to block, etc),
# so we use this to see who is actually logged in
# it's very hacky
startupinfo = subprocess.STARTUPINFO()
#if not getattr(sys, "frozen", False):
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # hide powershell window
res = subprocess.check_output(["WMIC", "ComputerSystem", "GET", "UserName"], universal_newlines=True, startupinfo=startupinfo)
_, username = res.strip().rsplit("\n", 1)
userid, sysdom = username.rsplit("\\", 1)
if linux or macos:
sysid = hex(uuid.getnode())
#fprint(sysid)
res = subprocess.check_output(["who",], universal_newlines=True)
userid = res.strip().split(" ")[0]
#sysdom = subprocess.check_output(["hostname",], universal_newlines=True).strip()
#fprint(sysdom)
#fprint("d")
def time():
return int(t.time())
def kill(pid):
setup_child()
try:
if pid > 4:
fprint("Killing PID " + str(pid), settings)
os.kill(int(pid), 9)
fprint("Signal 9 sent to PID " + str(pid), settings)
except:
fprint("Unable to kill " + str(pid), settings)
def fprint(msg, settings = None, sendqueue = None):
#if not getattr(sys, "frozen", False):
setup_child()
try:
frm = inspect.stack()[1]
mod = inspect.getmodule(frm[0])
logMsg = '[' + mod.__name__ + ":" + frm.function + ']:' + str(msg)
print(logMsg)
if (sendqueue is not None):
sendqueue.put(("*", "{ \"type\": \"log\", \"call\":\"send\", \"data\": \"" + logMsg + "\" }"))
if (settings is not None):
tmpList = settings["logMsg"]
tmpList.append(logMsg)
settings["logMsg"] = tmpList
except Exception as e:
try:
print('[????:' + frm.function + ']:', str(msg))
print('[util:fprint]: ' + str(e))
except:
print('[????]:', str(msg))
# else:
#print(msg)
def find_data_file(filename):
if getattr(sys, "frozen", False):
# The application is frozen
datadir = os.path.dirname(sys.executable)
else:
# The application is not frozen
# Change this bit to match where you store your data files:
datadir = os.path.dirname(__file__)
return os.path.join(datadir, filename)
def run_cmd(cmd):
if win32:
startupinfo = subprocess.STARTUPINFO()
#print("DICKS")
#if not getattr(sys, "frozen", False):
# print("test")
#
#completed = subprocess.run(["powershell", "-Command", cmd], capture_output=True, startupinfo=startupinfo)
#else:
# print("alt")
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW # , "-WindowStyle", "hidden"
fprint("running PS command: " + cmd, settings)
completed = subprocess.run(["powershell", "-Command", cmd], capture_output=True, startupinfo=startupinfo)
fprint("ran PS command successfully", settings)
#completed = subprocess.run(["powershell", "-WindowStyle", "hidden", "-Command", cmd], capture_output=True, startupinfo=startupinfo)
return completed
if linux or macos:
fprint("running sh command: " + cmd, settings)
completed = subprocess.run(["sh", "-c", cmd], capture_output=True)
fprint("ran sh command successfully", settings)
return completed
def setup_child(sets=None):
if not getattr(sys, "frozen", False):
sys.stdout = Logger(filename=find_data_file("output.log"))
sys.stderr = Logger(filename=find_data_file("output.log"))
if sets is not None:
settings = sets
class Logger(object):
def __init__(self, filename="output.log"):
self.log = open(filename, "a")
self.terminal = sys.stdout
def write(self, message):
self.log.write(message)
#close(filename)
#self.log = open(filename, "a")
try:
self.terminal.write(message)
except:
sleep(0)
def flush(self):
print("", end="")
def write_stats(stats):
fprint("Writing stats", settings)
tmp = list()
tmp.append(["connections blocked", "connections allowed", "data uploaded", "data recieved", "block ratio"])
tmp.append(stats)
with open(find_data_file("stats.csv"), "w", newline="") as f:
writer = csv.writer(f)
writer.writerows(tmp)
fprint("Done writing stats", settings)
def read_stats():
with open(find_data_file("stats.csv"), newline='') as csvfile:
csvreader = csv.reader(csvfile, delimiter=',', quotechar='|')
header = True
fprint(csvreader, settings)
data = list()
for line in csvreader:
fprint(line, settings)
if header:
header = False
continue
data = line
for idx in range(len(data) - 1):
data[idx] = int(data[idx])
data[len(data) - 1] = float(data[len(data) - 1])
return data

42
websocket_test.html Normal file
View File

@ -0,0 +1,42 @@
<!DOCTYPE html>
<html>
<head>
<title>WebSocket Test</title>
<script>
document.addEventListener("DOMContentLoaded", function() {
// Create WebSocket connection.
const socket = new WebSocket('ws://localhost:9000');
// Connection opened
socket.addEventListener('open', function (event) {
console.log("Connected to WebSocket server");
});
// Listen for messages
socket.addEventListener('message', function (event) {
console.log('Message from server', event.data);
let messages = document.getElementById('messages');
let message = document.createElement('li');
message.textContent = "Received: " + event.data;
messages.appendChild(message);
});
// Send a message to the server
function sendMessage() {
let message = document.getElementById('messageInput').value;
socket.send(message);
console.log('Message sent', message);
}
// Bind send message function to button click
document.getElementById('sendMessage').addEventListener('click', sendMessage);
});
</script>
</head>
<body>
<h2>WebSocket Test</h2>
<input type="text" id="messageInput" placeholder="Type a message...">
<button id="sendMessage">Send Message</button>
<ul id="messages"></ul>
</body>
</html>