#!/usr/bin/env python3

# Parse Belden (100%) & Alphawire (75%) catalog techdata datasheets

import pandas as pd
from PyPDF2 import PdfReader
import camelot
import numpy as np
from PIL import Image
import io
import json
from util import fprint
import uuid
from util import run_cmd
from util import win32
import os
import glob
import sys
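
# parse() drives the whole pipeline: camelot pulls the spec tables out of the PDF,
# PyPDF2 recovers each table's printed heading and the page-1 images, tables that
# span a page break are stitched back together, and the results are written to
# <output_dir>/search.json (flattened) and <output_dir>/specs.json (full), along
# with brand.png / part.png and an empty "parsed" marker file.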


def touch(path):
    # Create the file if needed and update its modification time
    with open(path, 'a'):
        os.utime(path, None)


def find_data_file(filename):
    if getattr(sys, "frozen", False):
        # The application is frozen
        datadir = os.path.dirname(sys.executable)
    else:
        # The application is not frozen
        # Change this bit to match where you store your data files:
        datadir = os.path.dirname(__file__)
    return os.path.join(datadir, filename)
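

# extract_table_name() recovers the heading printed just above a camelot table.
# PyPDF2's extract_text() visitor callback receives the text matrix tm, whose
# sixth entry (tm[5]) is the text's y position with the PDF origin at the
# bottom-left of the page, so collecting fragments whose y falls in a narrow
# band around the table's top edge yields the table title. If nothing is found
# there, the numeric fallback name is used instead.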
def extract_table_name(table_start, searchpage, reader, dstype, fallbackname):
    if dstype == "Belden":
        ymin = table_start
        ymax = table_start + 10
    elif dstype == "Alphawire":
        ymin = table_start - 5
        ymax = table_start + 10
    page = reader.pages[searchpage - 1]
    parts = []

    def visitor_body(text, cm, tm, fontDict, fontSize):
        y = tm[5]
        if y > ymin and y < ymax:
            parts.append(text)

    page.extract_text(visitor_text=visitor_body)
    text_body = "".join(parts).strip('\n')
    if len(text_body) == 0:
        text_body = str(fallbackname)

    return text_body
    #fprint(text_body)
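

# parse() outline:
#   1. Run camelot in lattice mode (Ghostscript backend) over every page; if the
#      backend is missing, prompt for a Ghostscript install and bail out.
#   2. Drop empty rows/columns from each table and look up its printed heading
#      with extract_table_name().
#   3. Pull the brand logo and the 430x430 product photo from page 1.
#   4. Classify each table as vertical (key/value rows) or horizontal (header row
#      with data below) and convert it to a dict.
#   5. Merge tables that continue across a page break and rename tables whose
#      heading landed on the previous page.
#   6. Flatten everything and write search.json / specs.json.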
def parse(filename, output_dir, partnum, dstype):
    tables = []
    # Extract table data
    try:
        # Lattice mode renders the PDF via Ghostscript, hence the installer fallback below
        if dstype == "Belden":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=100, process_background=True, resolution=600, iterations=1, layout_kwargs={'detect_vertical': False, 'char_margin': 0.5}, shift_text=['r', 't'])
        elif dstype == "Alphawire":
            tables = camelot.read_pdf(filename, pages="1-end", flavor='lattice', backend="ghostscript", split_text=False, line_scale=50, process_background=True, resolution=600, iterations=1, layout_kwargs={'detect_vertical': True, 'char_margin': 0.5}, shift_text=['l', 't'])
    except (OSError, RuntimeError) as e:
        print(e)
        if win32:
            print("Ghostscript is not installed! Launching installer...")
            #subprocess.run([r".\\gs10030w64.exe"])
            os.system(r'''Powershell -Command "& { Start-Process \"''' + find_data_file("gs10030w64.exe") + r'''\" -Verb RunAs } " ''')
            # Will return once file launched...
            print("Once the install is completed, try again.")
            return False
        else:
            print("Ghostscript is not installed. You can install it with e.g. apt install ghostscript for Debian-based systems.")
            return False
    #fprint("Total tables extracted:", tables.n)
    n = 0
    #pagenum = 0
    reader = PdfReader(filename)
    page = reader.pages[0]
    table_list = {}
    table_list_raw = {}
    pd.set_option('future.no_silent_downcasting', True)
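
    # Each camelot table exposes its data as a pandas DataFrame (table.df) and its
    # detected grid as table.cells; cells[0][0].lt is the (x, y) top-left corner of
    # the top-left cell in PDF points, which extract_table_name() uses to find the
    # heading printed just above the table.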
    for table in tables:
        #with pd.options.context("future.no_silent_downcasting", True):
        table.df.infer_objects(copy=False)
        table.df = table.df.replace('', np.nan).infer_objects(copy=False)
        table.df.dropna(inplace=True, how="all")
        table.df.dropna(inplace=True, axis="columns", how="all")
        table.df = table.df.replace(np.nan, '').infer_objects(copy=False)

        if not table.df.empty:
            #fprint("\nTable " + str(n))
            # Extract table names
            table_start = table.cells[0][0].lt[1]  # Read top-left cell's top-left coordinate
            #fprint(table_start)

            text_body = extract_table_name(table_start, table.page, reader, dstype, n)

            table_list[text_body] = table.df
            if dstype == "Alphawire":
                table_list_raw[text_body] = table

            #table.to_html("table" + str(n) + ".html")

            #fprint(table.df)
            #camelot.plot(table, kind='grid').savefig("test" + str(n) + ".png")
        n = n + 1
    #camelot.plot(tables[0], kind='grid').savefig("test.png")

    #tables.export(output_dir + '/techdata.json', f='json')

    #fprint(table_list)

    # Extract basic details - part name & description, image, etc.

    reader = PdfReader(filename)
    page = reader.pages[0]
    count = 0
    skip = False
    for image_file_object in page.images:
        # The brand logo is normally img0; if it is a palette-mode ("P") image it is
        # skipped here and img1 is used for the brand logo instead (see below)
        if image_file_object.name == "img0.png" and skip == False:
            #fprint(Image.open(io.BytesIO(image_file_object.data)).mode)
            if Image.open(io.BytesIO(image_file_object.data)).mode == "P":
                skip = True
                continue
            with open(output_dir + "/brand.png", "wb") as fp:
                fp.write(image_file_object.data)
        # A 430x430 image on page 1 is taken as the product photo
        if Image.open(io.BytesIO(image_file_object.data)).size == (430, 430):
            with open(output_dir + "/part.png", "wb") as fp:
                fp.write(image_file_object.data)
    if skip:
        for image_file_object in page.images:
            if image_file_object.name == "img1.png":
                with open(output_dir + "/brand.png", "wb") as fp:
                    fp.write(image_file_object.data)
                count += 1
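
    # "Vertical" tables are key/value listings read row by row (first column holds
    # the key); "horizontal" tables have a header row with data running down each
    # column. Most shapes are decided by row/column counts alone; small 2-column
    # tables are ambiguous, so the first cell is inspected (a trailing ":" or an
    # Alphawire "1)"/"2)" style prefix indicates a vertical table).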

    # Table parsing and reordering
    tables = dict()
    torename = dict()
    previous_table = ""
    #print(table_list.keys())
    for table_name in table_list.keys():
        # determine shape: horizontal or vertical
        table = table_list[table_name]
        rows = table.shape[0]
        cols = table.shape[1]
        vertical = None
        #print(rows, cols, table_name)
        if rows > 2 and cols == 2:
            vertical = True
        elif cols == 1 and rows > 1:
            vertical = False
        elif rows == 1:
            vertical = True
        elif cols == 2:  # and rows <= 2
            # inconsistent
            if dstype == "Belden":
                if table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1:  # check if the last character is ":", indicating a vertical table
                    vertical = True
                else:
                    vertical = False
            elif dstype == "Alphawire":
                if table.iloc[0, 0].find(")") == 1 or table.iloc[0, 0].find(")") == 2 or table.iloc[0, 0].find(":") == len(table.iloc[0, 0]) - 1:  # a "1)"/"2)" style prefix or a trailing ":" indicates a vertical table
                    vertical = True
                else:
                    vertical = False

        elif cols > 2:  # and rows <= 2
            vertical = False
        elif rows > 2 and cols > 2:  # big table
            vertical = False
        else:  # 1 column, <= 2 rows
            vertical = False
        #print(vertical)

        # missing name check
        for table_name_2 in table_list.keys():
            if dstype == "Alphawire" and table_name_2.find("\n") >= 0:
                torename[table_name_2] = table_name_2[0:table_name_2.find("\n")]

            if dstype == "Alphawire" and table_name_2.find(table.iloc[-1, 0]) >= 0:
                # Name taken from table directly above - this table does not have a name
                torename[table_name_2] = "Specs " + str(len(tables))
                #table_list["Specs " + str(len(tables))] = table_list[table_name_2] # rename table to an arbitrary alternate name
                break

        if vertical:
            out = dict()
            if rows > 1:
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = row[1]
            else:
                for row in table.itertuples(index=False, name=None):
                    out[row[0].replace("\n", " ").replace(":", "")] = ""

        else:  # horizontal
            out = dict()
            for col in table.columns:
                col_data = tuple(table[col])
                out[col_data[0].replace("\n", " ")] = col_data[1:]

        tables[table_name] = out
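
        # A purely numeric table name means extract_table_name() fell back to the
        # table index, i.e. no heading was printed directly above this table. Page
        # coordinates decide what happened: a previous table ending near the bottom
        # of its page (y < 50) followed by this table starting near the top of its
        # page (y > 600) is treated as one table wrapped across the page break;
        # otherwise the heading is assumed to sit at the bottom of the previous
        # page and is re-read from there.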

        # multi-page table check, Alphawire
        if dstype == "Alphawire" and table_name.isdigit():
            # table continues from previous page or has name on previous page
            thistbl = table_list_raw[table_name]
            prevtbl = table_list_raw[previous_table]

            if prevtbl.cells[-1][0].lb[1] < 50 and thistbl.cells[0][0].lt[1] > 600:
                # wraparound
                #print("WRAP")
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                main_key = previous_table
                cont_key = table_name
                #print(vertical)
                if vertical == False:
                    main_keys = list(tables[main_key].keys())
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #print(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)

                    del tables[table_name]

                else:
                    #print(tables[cont_key].keys())
                    for key in tables[cont_key].keys():
                        #print(main_key, key, cont_key, key)
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]

            elif thistbl.cells[0][0].lt[1] > 600:
                # name on previous page (grrrr)
                #print("NAMEABOVE")
                #print("PREV TABLE", prevtbl.df)
                #print("THIS TABLE", thistbl.df)
                #print("PREV TABLE CORNER", prevtbl.cells[-1][0].lb[1])
                #print("THIS TABLE CORNER", thistbl.cells[0][0].lt[1])
                name = extract_table_name(50, prevtbl.page, reader, dstype, table_name).strip("\n").strip()
                #print("FOUND NAME:", name)
                torename[table_name] = name

        # multi-page table check, Belden
        if dstype == "Belden":
            if table_name.isdigit() and len(tables) > 1:
                #fprint(table_name)
                #fprint(previous_table)

                main_key = previous_table
                cont_key = table_name
                #fprint(tables)
                if vertical == False:
                    main_keys = list(tables[main_key].keys())
                    for i, (cont_key, cont_values) in enumerate(tables[cont_key].items()):
                        if i < len(main_keys):
                            #fprint(tables[main_key][main_keys[i]])
                            tables[main_key][main_keys[i]] = (tuple(tables[main_key][main_keys[i]]) + (cont_key,) + cont_values)

                    del tables[table_name]

                else:
                    #print(tables)
                    #print(main_key)
                    #print(cont_key)
                    for key in tables[cont_key].keys():
                        tables[main_key][key] = tables[cont_key][key]
                    del tables[table_name]
            else:
                previous_table = table_name
        else:
            previous_table = table_name

    # remove & rename tables
    #print(torename)
    for table_name in torename.keys():
        tables[torename[str(table_name)]] = tables[str(table_name)]
        del tables[table_name]

    # remove multi-line values that occasionally squeak through
    def replace_newlines_in_dict(d):
        for key, value in d.items():
            if isinstance(value, str):
                # Replace \n with " " if the value is a string
                d[key] = value.replace('\n', ' ')
            elif isinstance(value, dict):
                # Recursively call the function if the value is another dictionary
                replace_newlines_in_dict(value)
        return d

    tables = replace_newlines_in_dict(tables)

    # summary

    output_table = dict()
    output_table["partnum"] = partnum
    id = str(uuid.uuid4())
    output_table["id"] = id
    #output_table["position"] = id
    #output_table["brand"] = brand
    output_table["fullspecs"] = tables
    output_table["searchspecs"] = {"partnum": partnum, **flatten(tables)}

    output_table["searchspecs"]["id"] = id

    #print(output_table)

    # Remove any stale JSON output before writing fresh files
    #run_cmd("rm \"" + output_dir + "\"/*.json") # not reliable!
    pattern = os.path.join(output_dir, '*.json')
    json_files = glob.glob(pattern)
    for file_path in json_files:
        os.remove(file_path)
        #print(f"Deleted {file_path}")

    with open(output_dir + "/search.json", 'w') as json_file:
        json.dump(output_table["searchspecs"], json_file)
    with open(output_dir + "/specs.json", 'w') as json_file:
        json.dump(output_table["fullspecs"], json_file)

    #print(json.dumps(output_table, indent=2))
    touch(output_dir + "/parsed")  # mark as parsed
    return True
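

# flatten() collapses the nested {table: {column: value}} dict into a single level
# keyed "<table>: <column>" (keys truncated to 64 characters, periods stripped),
# converting numeric strings to int/float and comma-separated strings to lists.
# Illustrative shape only (hypothetical names and values):
#   {"Conductors: AWG": 24, "Jacket: Colors": ["Black", "Gray"]}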
def flatten(tables):
    def convert_to_number(s):
        try:
            # First, try converting to an integer.
            return int(s)
        except ValueError:
            # If that fails, try converting to a float.
            try:
                return float(s)
            except ValueError:
                # If it fails again, return the original string.
                return s

    out = dict()
    #print("{")
    for table in tables.keys():
        for key in tables[table].keys():
            if len(key) < 64:
                keyname = key
            else:
                keyname = key[0:64]

            fullkeyname = (table + ": " + keyname).replace(".", "")
            if type(tables[table][key]) is not tuple:
                if len(tables[table][key]) > 0:
                    out[fullkeyname] = convert_to_number(tables[table][key])
                    #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
            elif len(tables[table][key]) == 1:
                if len(tables[table][key][0]) > 0:
                    out[fullkeyname] = convert_to_number(tables[table][key][0])
                    #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
            else:
                tmp = []
                for x in range(len(tables[table][key])):
                    if len(tables[table][key][x]) > 0:
                        tmp.append(tables[table][key][x].strip())
                        #out[fullkeyname + " " + str(x+1)] = convert_to_number(tables[table][key][x])
                out[fullkeyname] = tmp
                # if the item has at least one comma in it, split it
                if tables[table][key].count(',') > 0:
                    out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
                    #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")

            # if the item has at least one comma in it, split it
            if tables[table][key].count(',') > 0:
                out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
                #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")

    #print("}")
    return out


if __name__ == "__main__":
    print(parse("cables/3050/datasheet.pdf", "cables/3050", "3050", "Alphawire"))