74 lines
2.3 KiB
Python
74 lines
2.3 KiB
Python
"""This module contains functionality for interacting with a PostgreSQL database. It will automatically handle error
|
|
conditions (i.e. missing columns) without terminating the entire program. Use the :py:class:`DBConnector` class to
|
|
handle database interactions, either as a standalone object or in a context manager."""
|
|
import os
|
|
import psycopg2
|
|
from psycopg2 import DatabaseError
|
|
|
|
DB_ADDRESS = os.getenv('DB_ADDRESS', 'localhost')
|
|
DB_PORT = os.getenv('DB_PORT', 5432)
|
|
DB_USER = os.getenv('DB_USER', 'postgres')
|
|
DB_PASSWORD = os.getenv('DB_PASSWORD', '')
|
|
DB_NAME = os.getenv('DB_NAME', 'postgres')
|
|
DB_TABLE = os.getenv('DB_TABLE', 'cables')
|
|
|
|
|
|
class DBConnector:
|
|
"""Context managed database class. Use with statements to automatically open and close the database connection, like
|
|
so:
|
|
|
|
.. code-block:: python
|
|
with DBConnector() as db:
|
|
db.read()
|
|
"""
|
|
def _db_start(self):
|
|
try:
|
|
self.conn = psycopg2.connect(
|
|
f"host={DB_ADDRESS} port={DB_PORT} dbname={DB_NAME} user={DB_USER} password={DB_PASSWORD}")
|
|
self.cur = self.conn.cursor()
|
|
except DatabaseError as e:
|
|
|
|
|
|
def _db_stop(self):
|
|
self.cur.close()
|
|
self.conn.close()
|
|
|
|
def __init__(self):
|
|
self._db_start()
|
|
|
|
def __del__(self):
|
|
self._db_stop()
|
|
|
|
def __enter__(self):
|
|
self._db_start()
|
|
|
|
def __exit__(self):
|
|
self._db_stop()
|
|
|
|
def _query(self, sql):
|
|
try:
|
|
self.cur.execute(sql)
|
|
result = self.cur.fetchall()
|
|
except DatabaseError as e:
|
|
print(f"DB ERROR [{e.pgcode}]: {e.pgerror}")
|
|
result = []
|
|
return result
|
|
|
|
def read(self, **kwargs):
|
|
"""Read rows from a database that match the specified filters.
|
|
|
|
:param kwargs: Column constraints; i.e. what value to filter by in what column.
|
|
:returns: A list of dictionaries of all matching rows, or an empty list if no match."""
|
|
args = []
|
|
for kw in kwargs.keys():
|
|
args.append(f"{kw} ILIKE {kwargs['kw']}")
|
|
query = f"SELECT * FROM {DB_TABLE}"
|
|
if len(args) > 0:
|
|
query += f" WHERE {' AND '.join(args)}"
|
|
return self._query(query)
|
|
|
|
def write(self, **kwargs):
|
|
"""Write a row to the database.
|
|
|
|
:param kwargs: Values to write for each database; specify each column separately!"""
|
|
pass |