#!/usr/bin/env python3
# Parse Belden (100%) & Alphawire (75%) catalog techdata datasheets

import pandas as pd
from PyPDF2 import PdfReader
import camelot
import numpy as np
from PIL import Image
import io
import json
from util import fprint
import uuid
from util import run_cmd
from util import win32
import os
import glob
import sys
import segno


def touch(path):
    with open(path, 'a'):
        os.utime(path, None)


def find_data_file(filename):
    if getattr(sys, "frozen", False):
        # The application is frozen
        datadir = os.path.dirname(sys.executable)
    else:
        # The application is not frozen
        # Change this bit to match where you store your data files:
        datadir = os.path.dirname(__file__)
    return os.path.join(datadir, filename)
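
# Example (paths are illustrative): when running from source, find_data_file("gs10030w64.exe")
# resolves next to this script; in a frozen build (e.g. cx_Freeze/PyInstaller, which set
# sys.frozen) it resolves next to the generated executable instead.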


def extract_table_name(table_start, searchpage, reader, dstype, fallbackname):
    if dstype == "Belden":
        ymin = table_start
        ymax = table_start + 10
    elif dstype == "Alphawire":
        ymin = table_start - 5
        ymax = table_start + 20
    page = reader.pages[searchpage - 1]
    parts = []

    def visitor_body(text, cm, tm, fontDict, fontSize):
        y = tm[5]
        if y > ymin and y < ymax:
            parts.append(text)

    page.extract_text(visitor_text=visitor_body)
    text_body = "".join(parts).strip('\n')
    if len(text_body) == 0:
        text_body = str(fallbackname)

    return text_body
    #fprint(text_body)
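
# Example (hypothetical values): pull the heading printed just above a table whose top edge sits
# at y=500 on page 2 of a Belden datasheet, returning the fallback value as a string if no text
# lies in that band:
#   reader = PdfReader("datasheet.pdf")
#   name = extract_table_name(500, 2, reader, "Belden", 0)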


def find_file_noext(directory, prefix="part-hires"):
    """
    Find files in the specified directory that start with the given prefix and have any extension.

    :param directory: The directory to search in.
    :param prefix: The prefix to search for.
    :return: A list of matching file names.
    """
    # Get all files and directories in the specified directory
    entries = os.listdir(directory)
    # Filter files that match 'filename.EXTENSION'
    matching_files = [file for file in entries if os.path.isfile(os.path.join(directory, file)) and file.split('.')[0] == prefix and len(file.split('.')) == 2]
    #print(directory, matching_files)
    return matching_files
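
# Example (hypothetical directory contents): if output_dir holds "part.png" and
# "thumbnail-part.png", then find_file_noext(output_dir, prefix="part") returns ["part.png"];
# only exact "prefix.extension" names match, so the thumbnail is not included.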


def rotate_and_crop_image(path, image_name, force_rotate=False, partnum=""):
    # Open the image file
    fprint("Generating thumbnail image for part " + partnum)
    image_path = path + "/" + image_name
    with Image.open(image_path) as img:
        # Check if the image is wider than it is tall
        if force_rotate or img.width > img.height * 1.2:
            # Rotate the image by 90 degrees counter-clockwise
            img = img.rotate(90, expand=True)

        # Determine the size of the square (the length of the shorter side of the image)
        square_size = min(img.width, img.height)
        if img.height < img.width:
            offset = (img.width - img.height)/2
            img_cropped = img.crop((offset, 0, square_size+offset, square_size))
        else:
            # Crop the image to a square from the top
            img_cropped = img.crop((0, 0, square_size, square_size))

        # Save or display the image
        img_cropped.save(path + "/" + "thumbnail-" + image_name) # Save the cropped image
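
# Example (hypothetical file): rotate_and_crop_image("cables/3050", "part.png", partnum="3050")
# writes "cables/3050/thumbnail-part.png" - clearly landscape images (or all images, with
# force_rotate=True) are rotated to portrait first, then a square crop is saved.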


def parse(filename, output_dir, partnum, dstype, weburl, extra):
    tables = []
    # Extract table data
    try:
        if dstype == "Belden":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=100, process_background=True, resolution=600, iterations=1, layout_kwargs={'detect_vertical': False, 'char_margin': 0.5}, shift_text=['r', 't'])
        elif dstype == "Alphawire":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=50, process_background=True, resolution=600, iterations=1, layout_kwargs={'detect_vertical': True, 'char_margin': 0.5}, shift_text=['l', 'b'])
    except (OSError, RuntimeError) as e:
        print(e)
        if win32:
            print("Ghostscript is not installed! Launching installer...")
            #subprocess.run([r".\\gs10030w64.exe"])
            os.system(r'''Powershell -Command "& { Start-Process \"''' + find_data_file("gs10030w64.exe") + r'''\" -Verb RunAs } " ''')
            # Will return once file launched...
            print("Once the install is completed, try again.")
            return False
        else:
            print("Ghostscript is not installed. You can install it with e.g. apt install ghostscript for Debian-based systems.")
            return False

    #fprint("Total tables extracted:", tables.n)
    n = 0
    #pagenum = 0
    reader = PdfReader(filename)
    page = reader.pages[0]
    table_list = {}
    table_list_raw = {}
    pd.set_option('future.no_silent_downcasting', True)
    for table in tables:
        #with pd.options.context("future.no_silent_downcasting", True):
        table.df.infer_objects(copy=False)
        table.df = table.df.replace('', np.nan).infer_objects(copy=False)
        table.df.dropna(inplace=True, how="all")
        table.df.dropna(inplace=True, axis="columns", how="all")
        table.df = table.df.replace(np.nan, '').infer_objects(copy=False)

        if not table.df.empty:
            #fprint("\nTable " + str(n))
            # Extract table names
            table_start = table.cells[0][0].lt[1] # Read top-left cell's top-left coordinate
            #fprint(table_start)

            text_body = extract_table_name(table_start, table.page, reader, dstype, n)
            #print(text_body)
            table_list[text_body] = table.df
            #print(table_list[text_body])
            if dstype == "Alphawire":

                def reorder_row(row):
                    # Filter out NaNs and compute the original non-NaN values
                    non_nans = row[~row.isnull()]
                    # Create a new row with NaNs filled at the end
                    new_row = pd.Series(index=row.index)
                    new_row[:len(non_nans)] = non_nans
                    return new_row

                # Apply the function to each row and return a new DataFrame
                #table_list[text_body] = table.df.apply(reorder_row, axis=1)
                #print(table_list[text_body])

            table_list_raw[text_body] = table
            #print(tbl)
            #table.to_html("table" + str(n) + ".html")

            #fprint(table.df)
            #camelot.plot(table, kind='grid').savefig("test" + str(n) + ".png")
        n = n + 1
    #camelot.plot(tables[0], kind='grid').savefig("test.png")

    #tables.export(output_dir + '/techdata.json', f='json')

    #fprint(table_list)

    # Extract basic details - part name & description, image, etc.

    reader = PdfReader(filename)
    page = reader.pages[0]
    count = 0
    skip = False
    for image_file_object in page.images:
        if image_file_object.name == "img0.png" and not skip:
            #fprint(Image.open(io.BytesIO(image_file_object.data)).mode)
            if Image.open(io.BytesIO(image_file_object.data)).mode == "P":
                skip = True
                continue
            with open(output_dir + "/brand.png", "wb") as fp:
                fp.write(image_file_object.data)
            if Image.open(io.BytesIO(image_file_object.data)).size == (430, 430):
                with open(output_dir + "/part.png", "wb") as fp:
                    fp.write(image_file_object.data)
        if skip:
            for image_file_object in page.images:
                if image_file_object.name == "img1.png":
                    with open(output_dir + "/brand.png", "wb") as fp:
                        fp.write(image_file_object.data)
        count += 1
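
    # Note on the heuristic above: "img0.png" is normally the brand logo, and a 430x430 img0 is
    # assumed to be the product photo as well; a palette-mode ("P") img0 indicates a different
    # page layout, in which case "img1.png" is used as the brand image instead.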

    if os.path.exists(output_dir + "/found_part_hires"):
        rotate_and_crop_image(output_dir, find_file_noext(output_dir, prefix="part-hires")[0], force_rotate=(dstype == "Alphawire"), partnum=partnum)
        img = weburl + find_file_noext(output_dir, prefix="thumbnail-part-hires")[0]
    elif len(find_file_noext(output_dir, prefix="part")) > 0:
        rotate_and_crop_image(output_dir, find_file_noext(output_dir, prefix="part")[0], force_rotate=(dstype == "Alphawire"), partnum=partnum)
        img = weburl + find_file_noext(output_dir, prefix="thumbnail-part")[0]
    else:
        img = None

    fprint("Making QR code for part " + partnum)
    partnumqr = partnum.replace(" ", "%20")
    if dstype == "Alphawire":
        partnumqr = "AW" + partnumqr
    if dstype == "Belden":
        partnumqr = "BL" + partnumqr
    qrcode = segno.make('HTTPS://BLDN.APP/' + partnumqr, micro=False, boost_error=False, error="L", mask=3)
    #out = io.BytesIO()
    qrx, _ = qrcode.symbol_size(1, 0)
    qrcode.save(output_dir + "/qrcode.png", scale=500.0/qrx, kind="PNG", border=0, light="#00000000")
    qrpath = weburl + find_file_noext(output_dir, prefix="qrcode")[0]
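
    # Note: symbol_size(1, 0) returns the QR symbol's width in modules at scale 1 with no border,
    # so scale=500.0/qrx renders qrcode.png at roughly 500x500 pixels on a transparent background.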

    # Table parsing and reordering
    tables = dict()
    torename = dict()
    previous_table = ""
    #print(table_list.keys())
    for table_name in table_list.keys():
        # determine shape: horizontal or vertical
        #print(table_name)
        table = table_list[table_name]
        rows = table.shape[0]
        cols = table.shape[1]
        vertical = None
        #print(rows, cols, table_name)
        if rows > 2 and cols == 2:
            vertical = True
        elif cols == 1 and rows > 1:
            vertical = False
        elif rows == 1:
            vertical = True
        elif cols == 2: # and rows <= 2
            # inconsistent
            if dstype == "Belden":
                if table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1: # check if last character is ":" indicating a vertical table
                    vertical = True
                else:
                    vertical = False
            elif dstype == "Alphawire":
                if table.iloc[0, 0].find(")") == 1 or table.iloc[0, 0].find(")") == 2 or table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1: # check if last character is ":" indicating a vertical table
                    vertical = True
                else:
                    vertical = False

        elif cols > 2: # and rows <= 2
            vertical = False
        elif rows > 2 and cols > 2: # big table
            vertical = False
        else: # 1 column, <= 2 rows
            vertical = False
        #print(vertical)
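
        # In short: tall two-column tables and single-row tables are treated as vertical key/value
        # listings; wide or single-column tables are treated as horizontal; the ambiguous
        # two-column, <=2-row case falls back to vendor-specific cues (a trailing ":" for Belden,
        # a trailing ":" or a ")" near the start of the first cell for Alphawire).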

        # missing name check
        for table_name_2 in table_list.keys():
            if dstype == "Alphawire" and table_name_2.find("\n") >= 0:
                torename[table_name_2] = table_name_2[0:table_name_2.find("\n")]

            # if dstype == "Alphawire" and table_name_2.find(table.iloc[-1, 0]) >= 0:
            #     # Name taken from table directly above - this table does not have a name
            #     torename[table_name_2] = "Specs " + str(len(tables))
            #     #table_list["Specs " + str(len(tables))] = table_list[table_name_2] # rename table to arbitrary alternate name
            #     break

        if vertical:
            out = dict()
            if rows > 1:
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = row[1]
            else:
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = ""

        else: # horizontal
            out = dict()
            for col in table.columns:
                col_data = tuple(table[col])
                out[col_data[0].replace("\n", " ")] = col_data[1:]

        tables[table_name] = out
        #print(out)

        # multi-page table check, Alphawire
        if dstype == "Alphawire" and table_name.isdigit() and previous_table != "":
            # table continues from previous page or has name on previous page
            thistbl = table_list_raw[table_name]
            prevtbl = table_list_raw[previous_table]

            if prevtbl.cells[-1][0].lb[1] < 50 and thistbl.cells[0][0].lt[1] > 600:
                # wraparound
                #print("WRAP")
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                main_key = previous_table
                cont_key = table_name
                #print(vertical)
                if vertical == False:
                    main_keys = list(tables[main_key].keys())
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #print(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)

                    del tables[table_name]

                else:
                    #print(tables[cont_key].keys())
                    for key in tables[cont_key].keys():
                        #print(main_key, key, cont_key, key)
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]

            elif thistbl.cells[0][0].lt[1] > 600:
                # name on previous page (grrrr)
                #print("NAMEABOVE")
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                name = extract_table_name(50, prevtbl.page, reader, dstype, table_name).strip("\n").strip()
                #print("FOUND NAME:", name)
                torename[table_name] = name

        # multi-page table check, Belden
        if dstype == "Belden":
            if table_name.isdigit() and len(tables) > 1:
                #fprint(table_name)
                #fprint(previous_table)

                main_key = previous_table
                cont_key = table_name
                #fprint(tables)
                if vertical == False:
                    main_keys = list(tables[main_key].keys())
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #fprint(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)

                    del tables[table_name]

                else:
                    #print(tables)
                    #print(main_key)
                    #print(cont_key)
                    for key in tables[cont_key].keys():
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]
            else:
                previous_table = table_name
        else:
            previous_table = table_name
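
        # To summarize the two blocks above: a purely numeric table name means no heading was found
        # above the table (the fallback index was used), which typically happens when a table
        # continues onto a new page or its title sits on the previous page; such tables are merged
        # into the previous table or queued for renaming, with previous_table as the merge target.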

    # remove & rename tables
    #print(torename)
    for table_name in torename.keys():
        tables[torename[str(table_name)]] = tables[str(table_name)]
        del tables[table_name]

    # remove multi-line values that occasionally squeak through
    def replace_newlines_in_dict(d):
        for key, value in d.items():
            if isinstance(value, str):
                # Replace \n with " " if the value is a string
                d[key] = value.replace('\n', ' ')
            elif isinstance(value, dict):
                # Recursively call the function if the value is another dictionary
                replace_newlines_in_dict(value)
        return d

    tables = replace_newlines_in_dict(tables)

    # summary
    #print(tables)
    output_table = dict()
    output_table["partnum"] = partnum
    id = str(uuid.uuid4())
    output_table["id"] = id
    #output_table["position"] = id
    if "brand" in extra:
        output_table["brand"] = extra["brand"]
    else:
        output_table["brand"] = dstype
    output_table["datasheet"] = weburl + "datasheet.pdf"
    output_table["qrcode"] = qrpath
    if img is not None:
        output_table["image"] = img
        output_table["fullspecs"] = {"partnum": partnum, "id": id, "brand": output_table["brand"], "image": img, "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **tables}
        output_table["searchspecs"] = {"partnum": partnum, "brand": output_table["brand"], "image": img, "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **flatten(tables)}
    else:
        output_table["fullspecs"] = {"partnum": partnum, "id": id, "brand": output_table["brand"], "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **tables}
        output_table["searchspecs"] = {"partnum": partnum, "brand": output_table["brand"], "datasheet": weburl + "datasheet.pdf", "qrcode": qrpath, **flatten(tables)}

    if "short_description" in extra:
        output_table["short_description"] = extra["short_description"]
        output_table["fullspecs"]["short_description"] = extra["short_description"]
        output_table["searchspecs"]["short_description"] = extra["short_description"]
    if "description" in extra:
        output_table["description"] = extra["description"]
        output_table["fullspecs"]["description"] = extra["description"]
        output_table["searchspecs"]["description"] = extra["description"]
    if "application" in extra:
        output_table["application"] = extra["application"]
        output_table["fullspecs"]["application"] = extra["application"]
        output_table["searchspecs"]["application"] = extra["application"]
    if "category" in extra:
        output_table["category"] = extra["category"]
        output_table["fullspecs"]["category"] = extra["category"]
        output_table["searchspecs"]["category"] = extra["category"]

    output_table["searchspecs"]["id"] = id

    #print(output_table)

    #run_cmd("rm \"" + output_dir + "\"/*.json") # not reliable!
    # pattern = os.path.join(output_dir, '*.json')
    # json_files = glob.glob(pattern)
    # for file_path in json_files:
    #     os.remove(file_path)
    #     #print(f"Deleted {file_path}")
    with open(output_dir + "/search.json", 'w') as json_file:
        json.dump(output_table["searchspecs"], json_file)
    with open(output_dir + "/specs.json", 'w') as json_file:
        json.dump(output_table["fullspecs"], json_file)

    fprint("Datasheet values parsed and saved for " + partnum)
    #print(json.dumps(output_table, indent=2))
    touch(output_dir + "/parsed") # mark as parsed
    return True


def flatten(tables):
    def convert_to_number(s):
        try:
            # First, try converting to an integer.
            return int(s)
        except ValueError:
            # If that fails, try converting to a float.
            try:
                return float(s)
            except ValueError:
                # If it fails again, return the original string.
                return s

    out = dict()
    #print("{")
    for table in tables.keys():
        for key in tables[table].keys():
            if len(key) < 64:
                keyname = key
            else:
                keyname = key[0:64]

            fullkeyname = (table + ": " + keyname).replace(".", "")
            if type(tables[table][key]) is not tuple:
                # Single string value
                if len(tables[table][key]) > 0:
                    out[fullkeyname] = convert_to_number(tables[table][key])
                    #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
                # if the item contains a comma, split it into a list
                if tables[table][key].count(',') > 0:
                    out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
            elif len(tables[table][key]) == 1:
                # Single-element tuple
                if len(tables[table][key][0]) > 0:
                    out[fullkeyname] = convert_to_number(tables[table][key][0])
                    #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
            else:
                # Multi-element tuple: keep the non-empty entries as a list
                tmp = []
                for x in range(len(tables[table][key])):
                    if len(tables[table][key][x]) > 0:
                        tmp.append(tables[table][key][x].strip())
                        #out[fullkeyname + " " + str(x+1)] = convert_to_number(tables[table][key][x])
                out[fullkeyname] = tmp

    #print("}")
    return out
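
# Example of the flattening behavior (hypothetical input):
#   flatten({"Physical Characteristics": {"AWG": "24", "Colors": "Red, Green"}})
#   -> {"Physical Characteristics: AWG": 24,
#       "Physical Characteristics: Colors": ["Red", "Green"]}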


if __name__ == "__main__":
    # Standalone test run; the empty weburl and extra dict below are placeholder values.
    print(parse("cables/3050/datasheet-new.pdf", "cables/3050", "3050", "Alphawire", "", {}))
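    # A fuller invocation might look like this (values are illustrative only):
    # print(parse("cables/3050/datasheet-new.pdf", "cables/3050", "3050", "Alphawire",
    #             "https://example.com/cables/3050/", {"brand": "Alphawire", "category": "Cable"}))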