Compare commits
	
		
			4 Commits
		
	
	
		
			275cbd027e
			...
			fb31ab0d73
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| fb31ab0d73 | |||
| 302d275c64 | |||
| 43392fa3ca | |||
| 660fe29236 | 
| @@ -40,13 +40,16 @@ class DriveImg(): | ||||
|             self.onLine = False  | ||||
|             fprint("Offline") | ||||
|  | ||||
|     def close(self): | ||||
|         self.trans.close() | ||||
|  | ||||
|     def read_img(self): | ||||
|         resposta = 'Falha' | ||||
|         try: | ||||
|             if not self.onLine: | ||||
|                 #print(f'tentando Conectar camera {self.ip}...') | ||||
|                 gravaLog(ip=self.ip,msg=f'Trying to connect...') | ||||
|                 sleep(2) | ||||
|                 #sleep(2) | ||||
|                 try: | ||||
|                     self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | ||||
|                     self.trans.connect((self.ip,self.PORT)) | ||||
| @@ -63,13 +66,13 @@ class DriveImg(): | ||||
|                 if valida.find("TC IMAGE")<0: | ||||
|                     self.onLine = False | ||||
|                     self.trans.close() | ||||
|                     sleep(2) | ||||
|                     #sleep(2) | ||||
|                     gravaLog(ip=self.ip,tipo="Falha",msg=f'Unable to find TC IMAGE bookmark') | ||||
|                     return "Error" | ||||
|             except Exception as ex: | ||||
|                 self.onLine = False | ||||
|                 self.trans.close() | ||||
|                 sleep(2) | ||||
|                 #sleep(2) | ||||
|                 gravaLog(ip=self.ip,tipo="Falha",msg=f'Error - {str(ex)}') | ||||
|                 return "Error" | ||||
|             if ret: | ||||
| @@ -113,7 +116,7 @@ class DriveImg(): | ||||
|             #print(f'erro {str(ex)}') | ||||
|             self.onLine = False | ||||
|             self.trans.close() | ||||
|             sleep(2) | ||||
|             #sleep(2) | ||||
|             return resposta | ||||
|  | ||||
| class DriveData(): | ||||
| @@ -139,7 +142,7 @@ class DriveData(): | ||||
|             if not self.onLine: | ||||
|                 #print(f'tentando Conectar...\n') | ||||
|                 gravaLog(ip=self.ip,msg=f'tentando Conectar...',file="log_data.txt") | ||||
|                 sleep(2) | ||||
|                 #sleep(2) | ||||
|                 try: | ||||
|                     self.trans = socket.socket(socket.AF_INET,socket.SOCK_STREAM) | ||||
|                     self.trans.connect((self.ip,self.PORT)) | ||||
| @@ -154,7 +157,7 @@ class DriveData(): | ||||
|         except Exception as ex: | ||||
|             self.onLine = False  | ||||
|             gravaLog(ip=self.ip,tipo="Falha Generica",msg=f'erro {str(ex)}',file="log_data.txt") | ||||
|             sleep(2) | ||||
|             #sleep(2) | ||||
|             return resposta | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -3,6 +3,7 @@ core: | ||||
|   serverip: 172.26.178.114 | ||||
|   clientip: 172.26.176.1 | ||||
|   server: Hyper-Vd | ||||
|   loopspeed: 60 # fps | ||||
|  | ||||
| arm: | ||||
|   ip: 192.168.1.145 | ||||
|   | ||||
| @@ -221,13 +221,14 @@ class LEDSystem(): | ||||
|                 self.data.append((0,0,0)) | ||||
|         self.sendall(self.data) | ||||
|         #time.sleep(50000)     | ||||
|         fprint("Running start-up test sequence...") | ||||
|         for y in range(1): | ||||
|             for x in range(len(self.leds)): | ||||
|                 self.setpixel(0,60,144,x) | ||||
|             self.sendall(self.data) | ||||
|             #time.sleep(2) | ||||
|         self.alloffsmooth() | ||||
|         # fprint("Running start-up test sequence...") | ||||
|         # for y in range(1): | ||||
|         #     for x in range(len(self.leds)): | ||||
|         #         self.setpixel(0,60,144,x) | ||||
|         #     self.sendall(self.data) | ||||
|         #     #time.sleep(2) | ||||
|         # self.alloffsmooth() | ||||
|         self.startup_animation(show=False) | ||||
|  | ||||
|     def sendall(self, datain): | ||||
|         # send all LED data to all controllers | ||||
|   | ||||
| @@ -8,13 +8,18 @@ from util import fprint | ||||
| class qr_reader(): | ||||
|     camera = None | ||||
|     def __init__(self, ip, port): | ||||
|         self.camera = banner_ivu_export.DriveImg(ip, port) | ||||
|         self.ip = ip | ||||
|         self.port = port | ||||
|         #self.camera = banner_ivu_export.DriveImg(ip, port) | ||||
|  | ||||
|     def read_qr(self, tries=1): | ||||
|         print("Trying " + str(tries) + " frames.") | ||||
|         self.camera = banner_ivu_export.DriveImg(self.ip, self.port) | ||||
|         for x in range(tries): | ||||
|             try: | ||||
|             print(str(x) + " ", end="", flush=True) | ||||
|             imgtype, img = self.camera.read_img() | ||||
|                  | ||||
|             if True:     | ||||
|                 #fprint(imgtype) | ||||
|                 image_array = np.frombuffer(img, np.uint8) | ||||
|                 img = cv2.imdecode(image_array, cv2.IMREAD_COLOR) | ||||
| @@ -22,9 +27,13 @@ class qr_reader(): | ||||
|                 #cv2.waitKey(1) | ||||
|                 detect = cv2.QRCodeDetector() | ||||
|                 value, points, straight_qrcode = detect.detectAndDecode(img) | ||||
|                  | ||||
|                 if value != "": | ||||
|                     self.camera.close() | ||||
|                     return value | ||||
|             except: | ||||
|                 continue | ||||
|                      | ||||
|  | ||||
|         self.camera.close() | ||||
|         return False | ||||
|          | ||||
|  | ||||
| @@ -41,5 +50,7 @@ class video_streamer(): | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     test = qr_reader("192.168.1.125", 32200) | ||||
|     import time | ||||
|     while True: | ||||
|         fprint(test.read_qr(5)) | ||||
|         fprint(test.read_qr(300)) | ||||
|         time.sleep(1) | ||||
| @@ -4,6 +4,7 @@ opencv-python | ||||
| pypdf2==2.12.1 | ||||
| alive-progress | ||||
| requests | ||||
| math3d==4.0.0 | ||||
| git+https://github.com/Byeongdulee/python-urx.git | ||||
| meilisearch | ||||
| pyyaml | ||||
|   | ||||
							
								
								
									
										234
									
								
								run.py
									
									
									
									
									
								
							
							
						
						
									
										234
									
								
								run.py
									
									
									
									
									
								
							| @@ -27,8 +27,11 @@ import process_video | ||||
| import search | ||||
| from search import JukeboxSearch | ||||
| #multiprocessing.set_start_method('spawn', True) | ||||
| from pyModbusTCP.client import ModbusClient | ||||
| from uptime import uptime | ||||
|  | ||||
|  | ||||
| mbconn = None | ||||
| config = None | ||||
| keeprunning = True | ||||
| arm_ready = False | ||||
| @@ -45,6 +48,7 @@ ledsys = None | ||||
| arm = None | ||||
| to_server_queue = Queue() | ||||
| from_server_queue = Queue() | ||||
| mainloop_get = Queue() | ||||
| mode = "Startup" | ||||
| counter = 0 | ||||
| jbs = None | ||||
| @@ -52,11 +56,17 @@ scan_value = None | ||||
| arm_state = None | ||||
| cable_list = list() | ||||
| parse_res = None | ||||
| cable_list_state = list() | ||||
| just_placed = -1 | ||||
| ring_animation = None | ||||
| led_set_mode = None | ||||
|  | ||||
| def arm_start_callback(res): | ||||
|     fprint("Arm action complete.") | ||||
|     global arm_ready | ||||
|     arm_ready = True | ||||
|      | ||||
|  | ||||
| def led_start_callback(res): | ||||
|     global led_ready | ||||
|     led_ready = True | ||||
| @@ -229,6 +239,19 @@ def start_server_socket(cable_list): | ||||
|                             pass | ||||
|                         elif call == "request": | ||||
|                             pass | ||||
|  | ||||
|                     case "cable_get": | ||||
|                         fprint("cable_get message") | ||||
|                         if call == "send": | ||||
|                             global mainloop_get | ||||
|                             if "part_number" in data: | ||||
|                                 for cableidx in range(len(cable_list)): | ||||
|                                     cable = cable_list[cableidx] | ||||
|                                     if cable == data["part_number"]: | ||||
|                                         mainloop_get.put(("pickup", cableidx)) | ||||
|                             elif "position" in data: | ||||
|                                 mainloop_get.put(("pickup", data["position"])) | ||||
|                      | ||||
|                     case _: | ||||
|                         fprint("Unknown/unimplemented data type: " + decoded["type"]) | ||||
|             except Exception as e: | ||||
| @@ -285,11 +308,12 @@ def setup_server(pool): | ||||
|     global camera | ||||
|     global arm | ||||
|     global jbs | ||||
|  | ||||
|     arm = Rob(config) | ||||
|     pool.apply_async(arm.init_arm, callback=arm_start_callback) | ||||
|     pool.apply_async(ur5_control.powerup_arm, (arm,), callback=arm_start_callback, error_callback=handle_error) | ||||
|     global ledsys | ||||
|     ledsys = LEDSystem() | ||||
|     pool.apply_async(ledsys.init, callback=led_start_callback) | ||||
|     #pool.apply_async(ledsys.init, callback=led_start_callback) | ||||
|     #pool.apply_async(sensor_control.init, callback=sensor_start_callback) | ||||
|     jbs = JukeboxSearch() | ||||
|      | ||||
| @@ -297,38 +321,85 @@ def setup_server(pool): | ||||
|      | ||||
|     if led_ready is False: | ||||
|         fprint("waiting for " + "LED controller initialization" + " to complete...", sendqueue=to_server_queue) | ||||
|         while led_ready is False: | ||||
|             sleep(0.1) | ||||
|         ledsys.init() | ||||
|         led_ready = True | ||||
|     fprint("LED controllers initialized.", sendqueue=to_server_queue) | ||||
|     #to_server_queue.put("[log] LED controllers initialized.") | ||||
|     sensor_ready = True | ||||
|  | ||||
|     if sensor_ready is False: | ||||
|         fprint("waiting for " + "Sensor Initialization" + " to complete...", sendqueue=to_server_queue) | ||||
|         while sensor_ready is False: | ||||
|             sleep(0.1) | ||||
|         global mbconn | ||||
|         mbconn = ModbusClient(host="localhost", port=502, unit_id=1, auto_open=True, auto_close=True) | ||||
|     fprint("Sensors initialized.", sendqueue=to_server_queue) | ||||
|  | ||||
|     if camera_ready is False: | ||||
|         fprint("waiting for " + "Camera initilization" + " to complete...", sendqueue=to_server_queue) | ||||
|         camera = process_video.qr_reader(config["cameras"]["banner"]["ip"], config["cameras"]["banner"]["port"]) | ||||
|         camera = process_video.qr_reader(config["cameras"]["banner"]["ip"], int(config["cameras"]["banner"]["port"])) | ||||
|  | ||||
|     fprint("Camera initialized.", sendqueue=to_server_queue) | ||||
|  | ||||
|     arm_ready = True | ||||
|     #arm_ready = True | ||||
|     if arm_ready is False: | ||||
|         fprint("waiting for " + "UR5 initilization" + " to complete...", sendqueue=to_server_queue) | ||||
|         fprint("waiting for " + "UR5 powerup" + " to complete...", sendqueue=to_server_queue) | ||||
|         while arm_ready is False: | ||||
|             sleep(0.1) | ||||
|  | ||||
|     ur5_control.init_arm(arm) | ||||
|     fprint("Arm initialized.", sendqueue=to_server_queue) | ||||
|  | ||||
|  | ||||
|      | ||||
|     return True | ||||
|  | ||||
| def handle_error(error): | ||||
| 	print(error, flush=True) | ||||
|  | ||||
| def get_sensors(): | ||||
|     global mbconn | ||||
|     global sensors | ||||
|     oldsens = sensors | ||||
|     """ | ||||
|     port 1: 256 | ||||
|     port 2: 272 | ||||
|     port 3: 288 | ||||
|     port 4: 304 | ||||
|     port 5: 320 | ||||
|     port 6: 336 | ||||
|     port 7: 352 | ||||
|     port 8: 368 | ||||
|      | ||||
|     """ | ||||
|     out = list() | ||||
|     for reg in [352, 288, 304, 368]: | ||||
|         val = mbconn.read_holding_registers(reg) | ||||
|         if val == 1: | ||||
|             out.append(1) | ||||
|         else: | ||||
|             out.append(0) | ||||
|  | ||||
|     sensors = out | ||||
|  | ||||
|     for x in range(len(oldsens)): | ||||
|         if oldsens[x] == 0 and out[x] == 1: | ||||
|             # cable newly detected on tray | ||||
|             return x | ||||
|          | ||||
|     return -1 | ||||
| def get_open_spot(sensordata): | ||||
|     for x in range(len(sensordata)): | ||||
|         sens = sensordata[x] | ||||
|         if not sens: | ||||
|             return x | ||||
|          | ||||
|     # if we get here, every spot is full | ||||
|          | ||||
|     return False | ||||
|      | ||||
|  | ||||
| def mainloop_server(pool): | ||||
|     # NON-blocking loop | ||||
|     global ring_animation | ||||
|     global led_set_mode | ||||
|     global just_placed | ||||
|     global config | ||||
|     global counter | ||||
|     global killme | ||||
| @@ -342,26 +413,30 @@ def mainloop_server(pool): | ||||
|     global camera_ready | ||||
|     global cable_search_ready | ||||
|     global cable_list | ||||
|     global mainloop_get | ||||
|     global cable_list_state | ||||
|  | ||||
|     if killme.value > 0: | ||||
|         killall() | ||||
|  | ||||
|     if mode == "Startup": | ||||
|         counter = 54 | ||||
|         #counter = 54 # remove for demo | ||||
|         if counter < 54: | ||||
|             # scanning cables | ||||
|              | ||||
|             ring_animation = counter | ||||
|             led_set_mode = "GrabA" | ||||
|             if arm_state is None: | ||||
|                 #pool.apply_async(arm.get cable to camera, callback=arm_start_callback) | ||||
|                 #ur5_control.goto_holder_index(arm) | ||||
|                 #ur5 get item | ||||
|                 # ur5 bring to camera | ||||
|                 #pool.apply_async(arm_start_callback, ("",)) | ||||
|                 arm_ready = False | ||||
|                 pool.apply_async(ur5_control.to_camera, (arm,counter), callback=arm_start_callback, error_callback=handle_error) | ||||
|                 fprint("Getting cable index " + str(counter) + " and scanning...") | ||||
|                 arm_state = "GET" | ||||
|                 #ur5_control.to_camera(arm, counter) | ||||
|                 #arm_ready = True | ||||
|  | ||||
|             elif arm_ready and arm_state == "GET": | ||||
|                 fprint("Looking for QR code...") | ||||
|                 pool.apply_async(camera.read_qr, (30,), callback=camera_start_callback) | ||||
|                 pool.apply_async(camera.read_qr, (10,), callback=camera_start_callback, error_callback=handle_error) | ||||
|                 arm_ready = False | ||||
|  | ||||
|             elif camera_ready: | ||||
| @@ -374,7 +449,9 @@ def mainloop_server(pool): | ||||
|                 else: | ||||
|                     cable_list.append(scan_value) | ||||
|                 fprint(scan_value) | ||||
|                 #pool.apply_async(arm.return cable, callback=arm_start_callback) | ||||
|                 pool.apply_async(ur5_control.return_camera, (arm,counter), callback=arm_start_callback, error_callback=handle_error) | ||||
|                 #ur5_control.return_camera(arm, counter) | ||||
|                 #arm_ready = True | ||||
|                 arm_state = "RETURN" | ||||
|                 camera_ready = False | ||||
|  | ||||
| @@ -386,6 +463,8 @@ def mainloop_server(pool): | ||||
|                 pass | ||||
|         else: | ||||
|             # scanned everything | ||||
|             ring_animation = None | ||||
|             led_set_mode == "idle" | ||||
|             tmp = [ | ||||
|                     # Actual cables in Jukebox | ||||
|                     "BLTF-1LF-006-RS5", | ||||
| @@ -418,8 +497,13 @@ def mainloop_server(pool): | ||||
|                 ] | ||||
|             while len(tmp) < 54: | ||||
|                 tmp.append(False) # must have 54 entries | ||||
|             cable_list = tmp # remove for real demo | ||||
|             pool.apply_async(get_specs.get_multi, (tmp, 0.3), callback=cable_search_callback) | ||||
|  | ||||
|             #cable_list = tmp # comment out for real demo | ||||
|                  | ||||
|             for idx in range(len(cable_list)): | ||||
|                 cable_list_state.append(True) | ||||
|  | ||||
|             pool.apply_async(get_specs.get_multi, (cable_list, 0.3), callback=cable_search_callback, error_callback=handle_error) | ||||
|             mode = "Parsing" | ||||
|             fprint("All cables scanned. Finding & parsing datasheets...") | ||||
|     if mode == "Parsing": | ||||
| @@ -458,7 +542,7 @@ def mainloop_server(pool): | ||||
|                      | ||||
|                     fprint("All cables added to database.") | ||||
|                     mode = "Idle" | ||||
|                     serverproc = Process(target=start_server_socket, args=(cable_list,)) | ||||
|                     serverproc = Process(target=start_server_socket, args=(cable_list,), error_callback=handle_error) | ||||
|                     serverproc.start() | ||||
|                 else: | ||||
|                     # TODO: manual input | ||||
| @@ -468,13 +552,104 @@ def mainloop_server(pool): | ||||
|     if mode == "Idle": | ||||
|         # do nothing | ||||
|         if arm_ready is False: | ||||
|             pool.apply_async(ur5_control.move_to_home, (arm,), callback=arm_start_callback) | ||||
|             arm_ready = True | ||||
|             pool.apply_async(ur5_control.move_to_home, (arm,), callback=arm_start_callback, error_callback=handle_error) | ||||
|             #arm_ready = True | ||||
|  | ||||
|         else: | ||||
|             global mainloop_get | ||||
|             newtube = get_sensors() | ||||
|             if newtube >= 0 and newtube != just_placed: | ||||
|                 # need to return a cable | ||||
|                 mainloop_get.put(("return", newtube)) | ||||
|                 just_placed = -1 | ||||
|              | ||||
|             if not mainloop_get.empty(): | ||||
|                 action, get_cable = mainloop_get.get() | ||||
|                 if get_cable > -1: | ||||
|                     global sensors | ||||
|                     if action == "pickup": | ||||
|                         spot = get_open_spot(sensors) | ||||
|                         if spot is not False: | ||||
|                             arm_ready = False | ||||
|                             pool.apply_async(ur5_control.pick_up_routine, (arm, get_cable, True, spot), callback=arm_start_callback, error_callback=handle_error) | ||||
|                             mode = "Pickup" | ||||
|                             cable_list_state[get_cable] = False # mark as removed | ||||
|                             get_sensors(sensors,flag=get_open_spot(sensors)) | ||||
|  | ||||
|                     if action == "return": | ||||
|                         arm_ready = False | ||||
|                         pool.apply_async(ur5_control.return_routine, (arm, get_cable), callback=arm_start_callback, error_callback=handle_error) | ||||
|                         mode = "ReturnC" | ||||
|             else: | ||||
|                 # LED idle anim | ||||
|                 pass | ||||
|  | ||||
|     if mode == "Pickup": | ||||
|         # complete | ||||
|         if arm_ready == True: | ||||
|             mode = "Idle" | ||||
|             arm_ready = False | ||||
|              | ||||
|         else: | ||||
|             # getting cable and bringing to tray | ||||
|             # led animation | ||||
|             pass | ||||
|      | ||||
|     if mode == "ReturnC": | ||||
|         # complete | ||||
|         if arm_ready == True: | ||||
|             mode = "Scan" | ||||
|             arm_ready = False | ||||
|             camera_ready = False | ||||
|             pool.apply_async(camera.read_qr, (10,), callback=camera_start_callback, error_callback=handle_error) | ||||
|              | ||||
|         else: | ||||
|             # getting cable from and bringing to camera | ||||
|             # led animation | ||||
|             pass | ||||
|  | ||||
|     if mode == "Scan": | ||||
|         if camera_ready == True: | ||||
|             global scan_value | ||||
|             if scan_value is False: | ||||
|                 # unable to scan ???? not good | ||||
|                 fprint("Unable to scan cable. Gonna retry.") | ||||
|                 camera_ready = False | ||||
|                 pool.apply_async(camera.read_qr, (10,), callback=camera_start_callback, error_callback=handle_error) | ||||
|                 pass | ||||
|             elif scan_value.find("bldn.app/") > -1: | ||||
|                 scan_value = scan_value[scan_value.find("bldn.app/")+9:] | ||||
|              | ||||
|             fprint("Got cable: " + scan_value) | ||||
|  | ||||
|             for idx in range(len(cable_list)): | ||||
|                 cable = cable_list[idx] | ||||
|                 if cable == scan_value and cable_list_state[idx] == False: | ||||
|                     cable_list_state[idx] = True # mark cable as returned | ||||
|                     arm_ready = False | ||||
|                     pool.apply_async(ur5_control.return_camera, (arm, idx), callback=arm_start_callback, error_callback=handle_error) | ||||
|                     mode = "Return" | ||||
|                     break | ||||
|  | ||||
|              | ||||
|  | ||||
|     if mode == "Return": | ||||
|         if arm_ready == True: | ||||
|             mode = "Idle" | ||||
|             arm_ready = False | ||||
|             # movement finished | ||||
|  | ||||
|         else:  | ||||
|             # cable going from camera to holder | ||||
|             # led animation | ||||
|             pass | ||||
|              | ||||
|     if True: | ||||
|         # do every loop! | ||||
|         if ring_animation is not None and ledsys.mode != "idle": | ||||
|             ledsys.mainloop(None, ring_animation) | ||||
|         elif ring_animation is not None: | ||||
|             ledsys.mainloop(led_set_mode, ring_animation) | ||||
|          | ||||
|  | ||||
|  | ||||
| @@ -584,14 +759,15 @@ class LogExceptions(object): | ||||
|         return result | ||||
|  | ||||
| class LoggingPool(Pool): | ||||
|     def apply_async(self, func, args=(), kwds={}, callback=None): | ||||
|         return Pool.apply_async(self, LogExceptions(func), args, kwds, callback) | ||||
|     def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): | ||||
|         return Pool.apply_async(self, LogExceptions(func), args, kwds, callback, error_callback) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     #sys.stdout = Logger(filename="output.log") | ||||
|     #sys.stderr = Logger(filename="output.log") | ||||
|     #log_to_stderr(logging.DEBUG) | ||||
|      | ||||
|     fprint("Starting Jukebox control system...") | ||||
|     with open('config.yml', 'r') as fileread: | ||||
|         #global config | ||||
| @@ -622,7 +798,13 @@ if __name__ == "__main__": | ||||
|             fprint("Starting in server mode.") | ||||
|             if setup_server(pool): | ||||
|                 fprint("Entering main loop...") | ||||
|                 start = 0 | ||||
|                 speed = config["loopspeed"] | ||||
|                 while(keeprunning): | ||||
|                     start = uptime() | ||||
|                     mainloop_server(pool) | ||||
|                     # limit to certain "framerate" | ||||
|                     while start + 1.0/speed < uptime(): | ||||
|                         pass | ||||
|  | ||||
|  | ||||
|   | ||||
							
								
								
									
										44
									
								
								test.py
									
									
									
									
									
								
							
							
						
						
									
										44
									
								
								test.py
									
									
									
									
									
								
							| @@ -1,4 +1,46 @@ | ||||
| print("\u001b[37m") | ||||
| from pyModbusTCP.client import ModbusClient | ||||
|  | ||||
|  | ||||
|  | ||||
| def get_sensors(): | ||||
|     mbconn = ModbusClient(host="192.168.1.20", port=502, auto_open=True, auto_close=True) | ||||
|     """ | ||||
|     port 1: 256 | ||||
|     port 2: 272 | ||||
|     port 3: 288 | ||||
|     port 4: 304 | ||||
|     port 5: 320 | ||||
|     port 6: 336 | ||||
|     port 7: 352 | ||||
|     port 8: 368 | ||||
|     """ | ||||
|     out = list() | ||||
|     for reg in [352, 288, 304, 368]: | ||||
|         val = mbconn.read_holding_registers(reg)[0] # read only one register | ||||
|         print(val) | ||||
|         if val == 1: | ||||
|             out.append(True) | ||||
|         else: | ||||
|             out.append(False) | ||||
|  | ||||
|     return out | ||||
|  | ||||
| def get_open_spot(sensordata): | ||||
|     for x in range(len(sensordata)): | ||||
|         sens = sensordata[x] | ||||
|         if not sens: | ||||
|             return x | ||||
|          | ||||
|     # if we get here, every spot is full | ||||
|          | ||||
|     return False | ||||
|  | ||||
|  | ||||
| testmb = get_sensors() | ||||
| print(testmb) | ||||
| print("Spot open", get_open_spot(testmb)) | ||||
|  | ||||
| exit() | ||||
|  | ||||
| class Ring: | ||||
|     def __init__(self) -> None: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user