Merge branch 'dthomas_meilisearch'

This commit is contained in:
Camryn Thomas 2024-03-12 16:15:13 -05:00
commit 53638f72e1
Signed by: cptlobster
GPG Key ID: 6D341D688163A176
6 changed files with 148 additions and 145 deletions

14
.gitignore vendored

@@ -1,10 +1,18 @@
# python
venv
__pycache__
# cable data folder(s)
cables
cables-sample.zip
# meilisearch (mainly where I've put the data volume for the container)
meili_data
# IDE things
.vscode
output.log
.idea
# videos
*.webm
output.mp4
# log files
output.log
cables-sample.zip
*.png
# images
*.png

13
compose.yml Normal file

@@ -0,0 +1,13 @@
services:
  meilisearch:
    image: "getmeili/meilisearch:v1.6.2"
    ports:
      - "7700:7700"
    environment:
      MEILI_MASTER_KEY: fluffybunnyrabbit
      MEILI_NO_ANALYTICS: "true"
    volumes:
      - "meili_data:/meili_data"
volumes:
  meili_data:
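For a quick sanity check that the service defined above is reachable, the short sketch below uses the meilisearch Python client against the container; it assumes the compose stack is running locally and that the master key matches the one in compose.yml.

# Minimal connectivity check for the Meilisearch container defined above
# (assumes the stack is up at localhost:7700 with the master key from compose.yml).
from meilisearch import Client

client = Client("http://localhost:7700", "fluffybunnyrabbit")
print(client.health())       # expected: {'status': 'available'}
print(client.get_version())  # reports the running Meilisearch version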


@@ -1,140 +0,0 @@
"""This module contains functionality for interacting with a PostgreSQL database. It will automatically handle error
conditions (e.g. missing columns) without terminating the entire program. Use the :py:class:`DBConnector` class to
handle database interactions, either as a standalone object or in a context manager."""
from __future__ import annotations

import os

import psycopg2
from psycopg2 import DatabaseError, OperationalError
from psycopg2.errors import UndefinedColumn

DB_ADDRESS = os.getenv('DB_ADDRESS', 'localhost')
DB_PORT = os.getenv('DB_PORT', 5432)
DB_USER = os.getenv('DB_USER', 'postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
DB_NAME = os.getenv('DB_NAME', 'postgres')
DB_TABLE = os.getenv('DB_TABLE', 'cables')


class DBConnector:
    """Context-managed database class. Use with statements to automatically open and close the database connection,
    like so:

    .. code-block:: python

        with DBConnector() as db:
            db.read()
    """

    def _db_start(self):
        """Set up the database connection and cursor."""
        try:
            self.conn = psycopg2.connect(
                f"host={DB_ADDRESS} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
            self.cur = self.conn.cursor()
        except OperationalError as e:
            raise e

    def _db_stop(self):
        """Close the cursor and connection."""
        self.cur.close()
        self.conn.close()

    def __init__(self):
        self._db_start()

    def __del__(self):
        self._db_stop()

    def __enter__(self):
        self._db_start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self._db_stop()

    def _get_cols(self) -> set[str]:
        """Get the list of columns in the database.

        :return: A list of column names."""
        query = f"select column_name from information_schema.columns where table_name='{DB_TABLE}'"
        rows = {x["column_name"] for x in self._query(query)}
        return rows

    def _column_parity(self, columns: list[str] | set[str]) -> set[str]:
        """If the listed columns are not in the database, add them.

        :param columns: The columns we expect are in the database.
        :return: The list of columns in the database after querying."""
        cols = set(columns)
        existing = self._get_cols()
        needs = cols.difference(existing.intersection(cols))
        if len(needs) > 0:
            query = f"ALTER TABLE {DB_TABLE} {', '.join([f'ADD COLUMN {c}' for c in needs])}"
            self._query(query)
            existing = self._get_cols()
        return existing

    def _query(self, sql) -> list[dict]:
        """Basic function for running queries.

        :param sql: SQL query as plaintext.
        :return: Results of the query, or an empty list if none."""
        result = []
        try:
            self.cur.execute(sql)
            result = self._read_dict()
        except DatabaseError as e:
            print(f"ERROR {e.pgcode}: {e.pgerror}\n"
                  f"Caused by query: {sql}")
        finally:
            return result

    def _read_dict(self) -> list[dict]:
        """Read the cursor as a list of dictionaries. psycopg2 defaults to using a list of tuples, so we want to
        convert each row into a dictionary before we return it."""
        cols = [i.name for i in self.cur.description]
        results = []
        for row in self.cur:
            row_dict = {}
            for i in range(0, len(row)):
                if row[i]:
                    row_dict = {**row_dict, cols[i]: row[i]}
            results.append(row_dict)
        return results

    def read(self, **kwargs) -> list[dict]:
        """Read rows from the database that match the specified filters.

        :param kwargs: Column constraints; i.e. what value to filter by in what column.
        :returns: A list of dictionaries of all matching rows, or an empty list if no match."""
        args = []
        for kw in kwargs.keys():
            args.append(f"{kw} ILIKE {kwargs[kw]}")
        query = f"SELECT * FROM {DB_TABLE}"
        if len(args) > 0:
            query += f" WHERE {' AND '.join(args)}"
        return self._query(query)

    def write(self, **kwargs) -> dict:
        """Write a row to the database.

        :param kwargs: Values to write for each database column; specify each column separately!
        :returns: The row you just added."""
        self._column_parity(set(kwargs.keys()))
        values = []
        for val in kwargs.keys():
            values.append(kwargs[val])
        query = f"INSERT INTO {DB_TABLE} ({', '.join(kwargs.keys())}) VALUES ({', '.join(values)})"
        self._query(query)
        return kwargs

    def write_all(self, items: list[dict]) -> list[dict]:
        """Write multiple rows to the database.

        :param items: Rows to write, as a list of dictionaries.
        :returns: The rows that were added successfully."""
        successes = []
        for i in items:
            res0 = self.write(**i)
            if res0:
                successes.append(res0)
        return successes
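The module docstring above shows the context-manager form; as a minimal sketch of the keyword-argument interface that read() and write() describe (the column names and values here are hypothetical):

# Hypothetical usage of the DBConnector interface documented above, assuming a
# reachable PostgreSQL instance configured through the DB_* environment variables.
with DBConnector() as db:
    db.write(partnum="10GXS13", category="ethernet")  # missing columns are added automatically
    matches = db.read(partnum="10GXS13")              # case-insensitive ILIKE match on partnum
    print(matches)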


@@ -30,7 +30,7 @@ def parse(filename, output_dir, partnum, dstype):
    reader = PdfReader(filename)
    page = reader.pages[0]
    table_list = {}
    for table in tables:
        table.df.infer_objects(copy=False)
        table.df.replace('', np.nan, inplace=True)
@@ -253,6 +253,11 @@ def flatten(tables):
            out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
            #print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
            # if the item has at least one comma in it, split it into a list
            if tables[table][key].count(',') > 0:
                out[fullkeyname] = list(map(lambda x: x.strip(), tables[table][key].split(",")))
                print("\"" + keyname + "\":", "\"" + str(out[fullkeyname]) + "\",")
    #print("}")
    return out
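For reference, the branch added above turns any comma-separated cell value into a list of trimmed strings; a standalone sketch of the same transformation on a hypothetical value:

# Standalone illustration of the comma-splitting added to flatten() (sample value is hypothetical).
value = "polyethylene, PVC, nylon"
if value.count(',') > 0:
    value = list(map(lambda x: x.strip(), value.split(",")))
print(value)  # ['polyethylene', 'PVC', 'nylon']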


@@ -5,7 +5,7 @@ pypdf2==2.12.1
alive-progress
requests
git+https://github.com/Byeongdulee/python-urx.git
psycopg2-binary
meilisearch
pyyaml
Flask
selenium

117
search.py Normal file

@@ -0,0 +1,117 @@
"""Interactions with the Meilisearch API for adding and searching cables."""
from meilisearch import Client
from meilisearch.task import TaskInfo
from meilisearch.errors import MeilisearchApiError
import json

DEFAULT_URL = "http://localhost:7700"
DEFAULT_APIKEY = "fluffybunnyrabbit"  # I WOULD RECOMMEND SOMETHING MORE SECURE
DEFAULT_INDEX = "cables"
DEFAULT_FILTERABLE_ATTRS = ["partnum", "uuid", "position"]  # default filterable attributes


class JukeboxSearch:
    """Class for interacting with the Meilisearch API."""

    def __init__(self,
                 url: str = None,
                 api_key: str = None,
                 index: str = None,
                 filterable_attrs: list = None):
        """Connect to Meilisearch and perform first-run tasks as necessary.

        :param url: Address of the Meilisearch server. Defaults to ``http://localhost:7700`` if unspecified.
        :param api_key: API key used to authenticate with Meilisearch. It is highly recommended to set this to
            something secure if the endpoint is publicly accessible, but you can leave it unset and configure
            Meilisearch's master key as ``fluffybunnyrabbit`` (the default used here).
        :param index: The name of the index to configure. Defaults to ``cables`` if unspecified.
        :param filterable_attrs: List of all the attributes we want to filter by."""
        # connect to Meilisearch
        url = url or DEFAULT_URL
        api_key = api_key or DEFAULT_APIKEY
        filterable_attrs = filterable_attrs or DEFAULT_FILTERABLE_ATTRS
        self.index = index or DEFAULT_INDEX
        self.client = Client(url, api_key)
        # create the index if it does not exist already
        try:
            self.client.get_index(self.index)
        except MeilisearchApiError as _:
            self.client.create_index(self.index)
        # make a variable to easily reference the index
        self.idxref = self.client.index(self.index)
        # update filterable attributes if needed
        self.update_filterables(filterable_attrs)

    def add_document(self, document: dict) -> TaskInfo:
        """Add a cable to the Meilisearch index.

        :param document: Dictionary containing all the cable data.
        :returns: A TaskInfo object for the addition of the new document."""
        return self.idxref.add_documents([document])

    def add_documents(self, documents: list):
        """Add a list of cables to the Meilisearch index.

        :param documents: List of dictionaries containing all the cable data.
        :returns: A TaskInfo object for the last new document."""
        taskinfo = None
        for i in documents:
            taskinfo = self.add_document(i)
        return taskinfo

    def update_filterables(self, filterables: list):
        """Update filterable attributes and wait for the database to fully index. If the filterable attributes match
        the current attributes in the database, don't update (saves reindexing).

        :param filterables: List of all filterable attributes"""
        existing_filterables = self.idxref.get_filterable_attributes()
        if set(existing_filterables) != set(filterables):
            taskref = self.idxref.update_filterable_attributes(filterables)
            self.client.wait_for_task(taskref.task_uid)

    def search(self, query: str, filters: str = None):
        """Execute a search query on the Meilisearch index.

        :param query: Search query
        :param filters: A Meilisearch-compatible filter statement.
        :returns: The search results dict. Actual results are in a list under "hits", but the root element contains
            other useful values as well."""
        if filters:
            q = self.idxref.search(query, {"filter": filters})
        else:
            q = self.idxref.search(query)
        return q

    def _filter_one(self, filter: str):
        """Get the first item to match a filter.

        :param filter: A Meilisearch-compatible filter statement.
        :returns: A dict containing the result; if no results are found, an empty dict."""
        q = self.search("", filter)
        if q["estimatedTotalHits"] != 0:
            return q["hits"][0]
        else:
            return dict()

    def get_position(self, position: str):
        """Get a part by position.

        :param position: The position to search for."""
        return self._filter_one(f"position = {position}")

    def get_uuid(self, uuid: str):
        """Get a specific UUID.

        :param uuid: The UUID to search for."""
        return self._filter_one(f"uuid = {uuid}")

    def get_partnum(self, partnum: str):
        """Get a specific part number.

        :param partnum: The part number to search for."""
        return self._filter_one(f"partnum = {partnum}")


# entrypoint
if __name__ == "__main__":
    jbs = JukeboxSearch()
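A short end-to-end sketch of how JukeboxSearch can be used, assuming the Meilisearch container from compose.yml is running locally; the document fields below are hypothetical:

# Hypothetical usage of JukeboxSearch against a local Meilisearch instance.
jbs = JukeboxSearch()
task = jbs.add_documents([
    {"id": 1, "partnum": "10GXS13", "uuid": "a1b2c3", "position": "A1"},
    {"id": 2, "partnum": "7958A", "uuid": "d4e5f6", "position": "B2"},
])
jbs.client.wait_for_task(task.task_uid)  # indexing is asynchronous; wait before querying
print(jbs.search("10GXS13"))             # full results dict; matching documents are under "hits"
print(jbs.get_position("B2"))            # first document whose position attribute matches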