jukebox-software/read_datasheet.py

488 lines
21 KiB
Python
Executable File

#!/usr/bin/env python3
# Parse Belden (100%) & Alphawire (75%) catalog techdata datasheets
import pandas as pd
from PyPDF2 import PdfReader
import camelot
import numpy as np
from PIL import Image
import io
import json
from util import fprint
import uuid
from util import run_cmd
from util import win32
import os
import glob
import sys
from PIL import Image
import segno
def touch(path):
    """Create *path* if it is missing and reset its access/modification times to now."""
    # Append mode creates the file when absent and never truncates existing content.
    with open(path, mode="a"):
        os.utime(path, times=None)
def find_data_file(filename):
    """Return the path of *filename* inside the application's data directory.

    When ``sys.frozen`` is set the data directory is the one holding ``sys.executable``;
    otherwise it is the directory of this source file.

    :param filename: Name of the data file to locate.
    :return: The data directory joined with *filename*.
    """
    is_frozen = getattr(sys, "frozen", False)
    # Change the non-frozen location to match where you store your data files.
    anchor = sys.executable if is_frozen else __file__
    return os.path.join(os.path.dirname(anchor), filename)
def extract_table_name(table_start, searchpage, reader, dstype, fallbackname):
    """Collect the text located in a vertical band around a table's top edge.

    :param table_start: Y coordinate of the table's top edge.
    :param searchpage: 1-based number of the page to search.
    :param reader: Object exposing ``pages``; the selected page must provide
        ``extract_text(visitor_text=...)``.
    :param dstype: ``"Belden"`` or ``"Alphawire"``; selects the band around *table_start*.
    :param fallbackname: Returned (converted with ``str``) when the band holds no text.
    :return: The concatenated text with leading/trailing newlines stripped, or the fallback.
    """
    # Both band limits are exclusive. No band is defined for any other dstype.
    if dstype == "Belden":
        ymin, ymax = table_start, table_start + 10
    elif dstype == "Alphawire":
        ymin, ymax = table_start - 5, table_start + 20

    fragments = []

    def visitor_body(text, cm, tm, fontDict, fontSize):
        # The sixth entry of the text matrix is used as the fragment's vertical position.
        y = tm[5]
        if ymin < y < ymax:
            fragments.append(text)

    target_page = reader.pages[searchpage - 1]
    target_page.extract_text(visitor_text=visitor_body)

    name = "".join(fragments).strip('\n')
    if not name:
        return str(fallbackname)
    return name
def find_file_noext(directory, prefix="part-hires"):
    """
    List the regular files in a directory that are named ``<prefix>.<extension>``.

    The part before the dot must equal the prefix exactly, and names containing more
    than one dot (or none) are ignored.
    :param directory: The directory to search in.
    :param prefix: The file name without its extension.
    :return: A list of matching file names (without the directory part).
    """
    matches = []
    for entry in os.listdir(directory):
        # Sub-directories and other non-file entries never match
        if not os.path.isfile(os.path.join(directory, entry)):
            continue
        pieces = entry.split('.')
        if len(pieces) == 2 and pieces[0] == prefix:
            matches.append(entry)
    return matches
def rotate_and_crop_image(path, image_name, force_rotate=False, partnum=""):
    """Write a square thumbnail of ``path/image_name`` to ``path/thumbnail-<image_name>``.

    :param path: Directory containing the source image; the thumbnail is saved there too.
    :param image_name: File name of the source image.
    :param force_rotate: Rotate the image even when it is not noticeably wider than tall.
    :param partnum: Part number, used only in the progress message.
    """
    fprint("Generating thumbnail image for part " + partnum)
    source_path = path + "/" + image_name
    thumbnail_path = path + "/" + "thumbnail-" + image_name
    with Image.open(source_path) as img:
        # Images more than 1.2x wider than tall (or any image on request) are turned
        # 90 degrees counter-clockwise.
        if force_rotate or img.width > img.height * 1.2:
            img = img.rotate(90, expand=True)
        # The square's side is the shorter side of the (possibly rotated) image.
        side = min(img.width, img.height)
        # A wide image is cropped around its horizontal centre; otherwise the square
        # is taken from the top-left corner.
        left = (img.width - img.height) / 2 if img.height < img.width else 0
        thumbnail = img.crop((left, 0, side + left, side))
        thumbnail.save(thumbnail_path)
def parse(filename, output_dir, partnum, dstype, weburl, extra):
    """Parse one datasheet PDF and write its specs, images and QR code into *output_dir*.

    Steps, in order:

    1. Read every table of the PDF with camelot (lattice flavor; settings depend on *dstype*).
    2. Name each non-empty table from the text near its top edge (``extract_table_name``),
       falling back to a running number.
    3. Save images found on the first PDF page as ``brand.png`` / ``part.png`` and, when a
       part image file is present in *output_dir*, build a thumbnail from it.
    4. Generate ``qrcode.png``.
    5. Convert each table to a dict, merge continuation tables and apply renames.
    6. Write ``search.json`` and ``specs.json`` and create the ``parsed`` marker file.

    :param filename: Path of the datasheet PDF.
    :param output_dir: Directory that receives every generated file.
    :param partnum: Part number; stored in the output and encoded in the QR code.
    :param dstype: ``"Belden"`` or ``"Alphawire"``. For any other value no tables are read.
    :param weburl: Prefix prepended to generated file names to form the URLs in the output.
    :param extra: Mapping with the optional keys ``brand``, ``short_description``,
        ``description``, ``application`` and ``category``; present keys are copied to the output.
    :return: ``True`` once the JSON files are written; ``False`` when reading the tables
        raises ``OSError`` or ``RuntimeError``.
    """
    tables = []
    # Extract table data
    try:
        # NOTE(review): the keyword below is spelled "interations"; presumably "iterations" was
        # meant - confirm how camelot treats the misspelled name before changing it.
        if dstype == "Belden":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=100, process_background=True, resolution=600, interations=1, layout_kwargs={'detect_vertical': False, 'char_margin': 0.5}, shift_text=['r', 't'])
        elif dstype == "Alphawire":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=50, process_background=True, resolution=600, interations=1, layout_kwargs={'detect_vertical': True, 'char_margin': 0.5}, shift_text=['l', 'b'])
    except (OSError, RuntimeError) as e:
        # Every OSError/RuntimeError from camelot is reported as a missing Ghostscript install.
        print(e)
        if win32:
            print("Ghostscript is not installed! Launching installer...")
            #subprocess.run([r".\\gs10030w64.exe"])
            os.system(r'''Powershell -Command "& { Start-Process \"''' + find_data_file("gs10030w64.exe") + r'''\" -Verb RunAs } " ''')
            # Will return once file launched...
            print("Once the install is completed, try again.")
            return False
        else:
            print("Ghostscript is not installed. You can install it with e.g. apt install ghostscript for Debian-based systems.")
            return False
    #fprint("Total tables extracted:", tables.n)
    n = 0  # running index, used as the fallback name of a table without a heading
    #pagenum = 0
    reader = PdfReader(filename)
    page = reader.pages[0]  # NOTE(review): not read before being reassigned further down
    table_list = {}  # table name -> cleaned DataFrame
    table_list_raw = {}  # table name -> camelot table object (cell coordinates are needed later)
    pd.set_option('future.no_silent_downcasting', True)
    for table in tables:
        #with pd.options.context("future.no_silent_downcasting", True):
        # NOTE(review): the result of this first infer_objects() call is discarded.
        table.df.infer_objects(copy=False)
        # Turn empty strings into NaN so that all-empty rows and columns can be dropped,
        # then turn the remaining NaN cells back into empty strings.
        table.df = table.df.replace('', np.nan).infer_objects(copy=False)
        table.df.dropna(inplace=True, how="all")
        table.df.dropna(inplace=True, axis="columns", how="all")
        table.df = table.df.replace(np.nan, '').infer_objects(copy=False)
        if not table.df.empty:
            #fprint("\nTable " + str(n))
            # Extract table names
            table_start = table.cells[0][0].lt[1] # Read top-left cell's top-left coordinate
            #fprint(table_start)
            text_body = extract_table_name(table_start, table.page, reader, dstype, n)
            # Tables that resolve to the same name overwrite each other here.
            table_list[text_body] = table.df
            if dstype == "Alphawire":
                # Currently unused: the apply() call that would use it is commented out below.
                def reorder_row(row):
                    # Filter out NaNs and compute the original non-NaN values
                    non_nans = row[~row.isnull()]
                    # Create a new row with NaNs filled at the end
                    new_row = pd.Series(index=row.index)
                    new_row[:len(non_nans)] = non_nans
                    return new_row
                # Apply the function to each row and return a new DataFrame
                #table_list[text_body] = table.df.apply(reorder_row, axis=1)
            table_list_raw[text_body] = table
            #table.to_html("table" + str(n) + ".html")
            #fprint(table.df)
            #camelot.plot(table, kind='grid').savefig("test" + str(n) + ".png")
            n=n+1
    #camelot.plot(tables[0], kind='grid').savefig("test.png")
    #tables.export(output_dir + '/techdata.json', f='json')
    #fprint(table_list)
    # Extract Basic details - part name & description, image, etc
    reader = PdfReader(filename)
    page = reader.pages[0]
    count = 0  # NOTE(review): incremented below but never read
    skip = False  # set when img0.png is a palette ("P" mode) image; img1.png is then used as brand image
    for image_file_object in page.images:
        if image_file_object.name == "img0.png" and skip == False:
            #fprint(Image.open(io.BytesIO(image_file_object.data)).mode)
            if Image.open(io.BytesIO(image_file_object.data)).mode == "P":
                skip = True
                continue
            with open(output_dir + "/brand.png", "wb") as fp:
                fp.write(image_file_object.data)
        # Any 430x430 image on the first page is stored as the part image.
        if Image.open(io.BytesIO(image_file_object.data)).size == (430, 430):
            with open(output_dir + "/part.png", "wb") as fp:
                fp.write(image_file_object.data)
    if skip:
        for image_file_object in page.images:
            if image_file_object.name == "img1.png":
                with open(output_dir + "/brand.png", "wb") as fp:
                    fp.write(image_file_object.data)
                count += 1
    # Build the thumbnail from part-hires.* when the "found_part_hires" marker file exists,
    # otherwise from part.* when such a file exists.
    # NOTE(review): the [0] lookups assume the matching file is present - confirm the marker
    # file always implies a part-hires.* file.
    if os.path.exists(output_dir + "/found_part_hires"):
        rotate_and_crop_image(output_dir, find_file_noext(output_dir, prefix="part-hires")[0], force_rotate=(dstype == "Alphawire"), partnum=partnum)
        img = weburl + find_file_noext(output_dir, prefix="thumbnail-part-hires")[0]
    elif len(find_file_noext(output_dir, prefix="part")) > 0:
        rotate_and_crop_image(output_dir, find_file_noext(output_dir, prefix="part")[0], force_rotate=(dstype == "Alphawire"), partnum=partnum)
        img = weburl + find_file_noext(output_dir, prefix="thumbnail-part")[0]
    else:
        img = None
    fprint("Making QR code for part " + partnum)
    # The QR payload is the part number with spaces percent-encoded and a two-letter brand prefix.
    partnumqr = partnum.replace(" ", "%20")
    if dstype == "Alphawire":
        partnumqr = "AW" + partnumqr
    if dstype == "Belden":
        partnumqr = "BL" + partnumqr
    qrcode = segno.make('HTTPS://BLDN.APP/' + partnumqr,micro=False,boost_error=False,error="L",mask=3)
    #out = io.BytesIO()
    # The scale is 500.0 divided by the width reported for scale 1 / border 0.
    # NOTE(review): light="#00000000" presumably yields a transparent background - confirm.
    qrx, _ = qrcode.symbol_size(1,0)
    qrcode.save(output_dir + "/qrcode.png", scale=500.0/qrx, kind="PNG", border=0, light="#00000000")
    qrpath = weburl + find_file_noext(output_dir, prefix="qrcode")[0]
    # Table parsing and reordering
    tables = dict()  # rebinds "tables": from here on it maps table name -> dict of parsed values
    torename = dict()  # old table name -> new table name, applied after the loop
    previous_table = ""
    #print(table_list.keys())
    for table_name in table_list.keys():
        # determine shape: horizontal or vertical
        # vertical: first column holds the keys, second column the values
        # horizontal: first row holds the keys, the rows below the values
        table = table_list[table_name]
        rows = table.shape[0]
        cols = table.shape[1]
        vertical = None
        #print(rows, cols, table_name)
        if rows > 2 and cols == 2:
            vertical = True
        elif cols == 1 and rows > 1:
            vertical = False
        elif rows == 1:
            vertical = True
        elif cols == 2: # and rows <= 2
            # inconsistent
            # NOTE(review): an empty first cell also passes the ":" test below, because
            # find() returns -1 and len("") - 1 is -1 as well.
            if dstype == "Belden":
                if table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1: # check if last character is ":" indicating a vertical table
                    vertical = True
                else:
                    vertical = False
            elif dstype == "Alphawire":
                # Also vertical when ")" is the second or third character of the first cell
                if table.iloc[0, 0].find(")") == 1 or table.iloc[0, 0].find(")") == 2 or table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1: # check if last character is ":" indicating a vertical table
                    vertical = True
                else:
                    vertical = False
        elif cols > 2: # and rows <= 2
            vertical = False
        elif rows > 2 and cols > 2: # big table
            # NOTE(review): unreachable - every cols > 2 case is taken by the branch above.
            vertical = False
        else: # 1 column, <= 2 rows
            vertical = False
        #print(vertical)
        # missing name check
        # Alphawire names spanning several lines are shortened to their first line.
        # This scans all table names again on every iteration of the outer loop.
        for table_name_2 in table_list.keys():
            if dstype == "Alphawire" and table_name_2.find("\n") >= 0:
                torename[table_name_2] = table_name_2[0:table_name_2.find("\n")]
            # if dstype == "Alphawire" and table_name_2.find(table.iloc[-1, 0]) >= 0:
            # # Name taken from table directly above - this table does not have a name
            # torename[table_name_2] = "Specs " + str(len(tables))
            # #table_list["Specs " + str(len(tables))] = table_list[table_name_2] # rename table to arbitrary altername name
            # break
        if vertical:
            out = dict()
            if rows > 1:
                # key = first column (newlines -> spaces, colons removed), value = second column
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = row[1]
            else:
                # single-row table: only the first cell is kept, as a key with an empty value
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = ""
        else: # horizontal
            out = dict()
            # key = first cell of each column, value = tuple of the remaining cells
            for col in table.columns:
                col_data = tuple(table[col])
                out[col_data[0].replace("\n", " ")] = col_data[1:]
        tables[table_name] = out
        #print(out)
        # multi-page table check, Alphawire
        # An all-digit name means no heading text was found (the fallback index was used).
        if dstype == "Alphawire" and table_name.isdigit() and previous_table != "":
            # table continues from previous page or has name on previous page
            thistbl = table_list_raw[table_name]
            prevtbl = table_list_raw[previous_table]
            # NOTE(review): 50 and 600 look like page-coordinate thresholds for "previous table
            # ends near the page bottom" / "this table starts near the page top" - confirm.
            if prevtbl.cells[-1][0].lb[1] < 50 and thistbl.cells[0][0].lt[1] > 600:
                # wraparound
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                main_key = previous_table
                cont_key = table_name
                #print(vertical)
                if vertical == False:
                    # Horizontal continuation: this table's "header" cells are really data, so
                    # each column (key and values) is appended to the matching column, by
                    # position, of the previous table.
                    main_keys = list(tables[main_key].keys())
                    # The loop below rebinds cont_key; the items() view was created beforehand.
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #print(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)
                    del tables[table_name]
                else:
                    # Vertical continuation: copy the key/value pairs into the previous table.
                    #print(tables[cont_key].keys())
                    for key in tables[cont_key].keys():
                        #print(main_key, key, cont_key, key)
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]
            elif thistbl.cells[0][0].lt[1] > 600:
                # name on previous page (grrrr)
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                # Look for the heading around y = 50 on the previous table's page.
                name = extract_table_name(50, prevtbl.page,reader,dstype,table_name).strip("\n").strip()
                #print("FOUND NAME:", name)
                torename[table_name] = name
        # multi-page table check, Belden
        if dstype == "Belden":
            # A digit-named table is always merged into the last named table;
            # previous_table only advances when the current table is not merged.
            if table_name.isdigit() and len(tables) > 1:
                #fprint(table_name)
                #fprint(previous_table)
                main_key = previous_table
                cont_key = table_name
                #fprint(tables)
                if vertical == False:
                    main_keys = list(tables[main_key].keys())
                    # Same positional column merge as in the Alphawire case above.
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #fprint(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)
                    del tables[table_name]
                else:
                    #print(tables)
                    #print(main_key)
                    #print(cont_key)
                    for key in tables[cont_key].keys():
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]
            else:
                previous_table = table_name
        else:
            previous_table = table_name
    # remove & rename tables
    #print(torename)
    for table_name in torename.keys():
        tables[torename[str(table_name)]] = tables[str(table_name)]
        del tables[table_name]
    # remove multi-line values that occasionally squeak through
    def replace_newlines_in_dict(d):
        # Works in place and returns the same dict. Tuple values are left untouched.
        for key, value in d.items():
            if isinstance(value, str):
                # Replace \n with " " if the value is a string
                d[key] = value.replace('\n', ' ')
            elif isinstance(value, dict):
                # Recursively call the function if the value is another dictionary
                replace_newlines_in_dict(value)
        return d
    tables = replace_newlines_in_dict(tables)
    # summary
    #print(tables)
    output_table = dict()
    output_table["partnum"] = partnum
    id = str(uuid.uuid4())  # NOTE(review): shadows the builtin id() for the rest of the function
    output_table["id"] = id
    #output_table["position"] = id
    if "brand" in extra:
        output_table["brand"] = extra["brand"]
    else:
        output_table["brand"] = dstype
    output_table["datasheet"] = weburl + "datasheet.pdf"
    output_table["qrcode"] = qrpath
    # "fullspecs" keeps the nested tables; "searchspecs" holds the flattened version.
    # The "image" key is only present when a thumbnail was produced.
    if img is not None:
        output_table["image"] = img
        output_table["fullspecs"] = {"partnum": partnum, "id": id, "brand": output_table["brand"], "image": img, "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **tables}
        output_table["searchspecs"] = {"partnum": partnum, "brand": output_table["brand"], "image": img, "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **flatten(tables)}
    else:
        output_table["fullspecs"] = {"partnum": partnum, "id": id, "brand": output_table["brand"], "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **tables}
        output_table["searchspecs"] = {"partnum": partnum, "brand": output_table["brand"], "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **flatten(tables)}
    # Optional descriptive fields are copied to the top level and to both spec dicts.
    if "short_description" in extra:
        output_table["short_description"] = extra["short_description"]
        output_table["fullspecs"]["short_description"] = extra["short_description"]
        output_table["searchspecs"]["short_description"] = extra["short_description"]
    if "description" in extra:
        output_table["description"] = extra["description"]
        output_table["fullspecs"]["description"] = extra["description"]
        output_table["searchspecs"]["description"] = extra["description"]
    if "application" in extra:
        output_table["application"] = extra["application"]
        output_table["fullspecs"]["application"] = extra["application"]
        output_table["searchspecs"]["application"] = extra["application"]
    if "category" in extra:
        output_table["category"] = extra["category"]
        output_table["fullspecs"]["category"] = extra["category"]
        output_table["searchspecs"]["category"] = extra["category"]
    output_table["searchspecs"]["id"] = id
    #print(output_table)
    #run_cmd("rm \"" + output_dir + "\"/*.json") # not reliable!
    # pattern = os.path.join(output_dir, '*.json')
    # json_files = glob.glob(pattern)
    # for file_path in json_files:
    # os.remove(file_path)
    #print(f"Deleted {file_path}")
    with open(output_dir + "/search.json", 'w') as json_file:
        json.dump(output_table["searchspecs"], json_file)
    with open(output_dir + "/specs.json", 'w') as json_file:
        json.dump(output_table["fullspecs"], json_file)
    fprint("Datasheet values parsed and saved for " + partnum)
    #print(json.dumps(output_table, indent=2))
    touch(output_dir + "/parsed") # mark as parsed
    return True
def flatten(tables):
    """Flatten nested spec tables into one single-level dict.

    Each entry is stored under the key ``"<table name>: <field name>"``; the field name is
    cut to 64 characters and every period is removed from the combined key.

    Values are converted as follows:

    * non-empty string: int if it parses as one, else float if it parses as one, else the
      string itself. A string containing at least one comma is stored instead as the list
      of its comma-separated parts, each stripped of surrounding whitespace.
    * one-element tuple: its element, converted like a string but never comma-split.
    * any other tuple: the list of its non-empty elements, each stripped.
    * empty strings (alone or as the only tuple element) produce no entry.

    :param tables: Mapping of table name to a mapping of field name to ``str`` or tuple of ``str``.
    :return: The flattened dict.
    """
    def convert_to_number(s):
        # Try the narrowest numeric type first; fall back to the unchanged input.
        for cast in (int, float):
            try:
                return cast(s)
            except ValueError:
                continue
        return s

    out = dict()
    for table, fields in tables.items():
        for key, value in fields.items():
            # Slicing leaves keys shorter than 64 characters untouched.
            fullkeyname = (table + ": " + key[0:64]).replace(".", "")
            if isinstance(value, tuple):
                if len(value) == 1:
                    if len(value[0]) > 0:
                        out[fullkeyname] = convert_to_number(value[0])
                else:
                    out[fullkeyname] = [item.strip() for item in value if len(item) > 0]
                # Tuples are never comma-split: they have no split(), so attempting it
                # raised AttributeError whenever an element was exactly ",".
                continue
            if len(value) > 0:
                out[fullkeyname] = convert_to_number(value)
            # A string with at least one comma is stored as the list of its parts,
            # replacing whatever was stored above.
            if isinstance(value, str) and "," in value:
                out[fullkeyname] = [part.strip() for part in value.split(",")]
    return out
if __name__ == "__main__":
    # Manual smoke test against a locally stored datasheet.
    # parse() takes six positional arguments; the former four-argument call raised a
    # TypeError before any parsing started. An empty weburl keeps the generated links
    # as bare file names, and an empty extra mapping adds no optional fields.
    print(parse("cables/3050/datasheet-new.pdf", "cables/3050", "3050", "Alphawire", "", {}))