add support to realtime interface from firmware 5.1 and methods to get joint temperature, joint voltage, joint current, main voltage, robot voltage and robot current

This commit is contained in:
mustaffxx 2021-04-09 11:19:42 -03:00 committed by oroulet
parent 18fed61168
commit ddc511ea74
4 changed files with 187 additions and 15 deletions

View File

@ -0,0 +1,43 @@
import urx
import time
import logging
r = urx.Robot("192.168.111.134", use_rt=True, urFirm=5.1)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
while 1:
try:
j_temp = r.get_joint_temperature()
j_voltage = r.get_joint_voltage()
j_current = r.get_joint_current()
main_voltage = r.get_main_voltage()
robot_voltage = r.get_robot_voltage()
robot_current = r.get_robot_current()
print("JOINT TEMPERATURE")
print(j_temp)
print("JOINT VOLTAGE")
print(j_voltage)
print("JOINT CURRENT")
print(j_current)
print("MAIN VOLTAGE")
print(main_voltage)
print("ROBOT VOLTAGE")
print(robot_voltage)
print("ROBOT CURRENT")
print(robot_current)
print("##########\t##########\t##########\t##########")
time.sleep(1)
except:
pass
r.close()

View File

@ -23,8 +23,8 @@ class Robot(URRobot):
and includes support for setting a reference coordinate system
"""
def __init__(self, host, use_rt=False):
URRobot.__init__(self, host, use_rt)
def __init__(self, host, use_rt=False, urFirm=None):
URRobot.__init__(self, host, use_rt, urFirm)
self.csys = m3d.Transform()
def _get_lin_dist(self, target):

View File

@ -31,9 +31,10 @@ class URRobot(object):
Rmq: A program sent to the robot i executed immendiatly and any running program is stopped
"""
def __init__(self, host, use_rt=False):
def __init__(self, host, use_rt=False, urFirm=None):
self.logger = logging.getLogger("urx")
self.host = host
self.urFirm = urFirm
self.csys = None
self.logger.debug("Opening secondary monitor socket")
@ -104,12 +105,55 @@ class URRobot(object):
force += i**2
return force**0.5
def get_joint_temperature(self, wait=True):
"""
return measured joint temperature
if wait==True, waits for next packet before returning
"""
return self.rtmon.getJOINTTemperature(wait)
def get_joint_voltage(self, wait=True):
"""
return measured joint voltage
if wait==True, waits for next packet before returning
"""
return self.rtmon.getJOINTVoltage(wait)
def get_joint_current(self, wait=True):
"""
return measured joint current
if wait==True, waits for next packet before returning
"""
return self.rtmon.getJOINTCurrent(wait)
def get_main_voltage(self, wait=True):
"""
return measured Safety Control Board: Main voltage
if wait==True, waits for next packet before returning
"""
return self.rtmon.getMAINVoltage(wait)
def get_robot_voltage(self, wait=True):
"""
return measured Safety Control Board: Robot voltage (48V)
if wait==True, waits for next packet before returning
"""
return self.rtmon.getROBOTVoltage(wait)
def get_robot_current(self, wait=True):
"""
return measured Safety Control Board: Robot current
if wait==True, waits for next packet before returning
"""
return self.rtmon.getROBOTCurrent(wait)
def get_all_rt_data(self, wait=True):
"""
return all data parsed from robot real-time interace as a dict
if wait==True, waits for next packet before returning
"""
return self.rtmon.getALLData(wait)
def set_tcp(self, tcp):
"""
@ -468,7 +512,7 @@ class URRobot(object):
"""
if not self.rtmon:
self.logger.info("Opening real-time monitor socket")
self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface
self.rtmon = urrtmon.URRTMonitor(self.host, self.urFirm) # som information is only available on rt interface
self.rtmon.start()
self.rtmon.set_csys(self.csys)
return self.rtmon

View File

@ -30,7 +30,9 @@ class URRTMonitor(threading.Thread):
# pose is not included!
rtstruct540 = struct.Struct('>d6d6d6d6d6d6d6d6d18d')
def __init__(self, urHost):
rtstruct5_1 = struct.Struct('>d1d6d6d6d6d6d6d6d6d6d6d6d6d6d6d1d6d1d1d1d6d1d6d3d6d1d1d1d1d1d1d1d6d1d1d3d3d')
def __init__(self, urHost, urFirm=None):
threading.Thread.__init__(self)
self.logger = logging.getLogger(self.__class__.__name__)
self.daemon = True
@ -40,6 +42,7 @@ class URRTMonitor(threading.Thread):
self._rtSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._rtSock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
self._urHost = urHost
self.urFirm = urFirm
# Package data variables
self._timestamp = None
self._ctrlTimestamp = None
@ -47,7 +50,12 @@ class URRTMonitor(threading.Thread):
self._qTarget = None
self._tcp = None
self._tcp_force = None
self._joint_temperature = None
self._joint_voltage = None
self._joint_current = None
self._main_voltage = None
self._robot_voltage = None
self._robot_current = None
self.__recvTime = 0
self._last_ctrl_ts = 0
# self._last_ts = 0
@ -141,6 +149,30 @@ class URRTMonitor(threading.Thread):
return tcf_force
getTCFForce = tcf_force
def joint_temperature(self, wait=False, timestamp=False):
""" Get the joint temperature."""
if wait:
self.wait()
with self._dataAccess:
joint_temperature = self._joint_temperature
if timestamp:
return self._timestamp, joint_temperature
else:
return joint_temperature
getJOINTTemperature = joint_temperature
def joint_voltage(self, wait=False, timestamp=False):
""" Get the joint voltage."""
if wait:
self.wait()
with self._dataAccess:
joint_voltage = self._joint_voltage
if timestamp:
return self._timestamp, joint_voltage
else:
return joint_voltage
getJOINTVoltage = joint_voltage
def joint_current(self, wait=False, timestamp=False):
""" Get the joint current."""
if wait:
@ -153,6 +185,42 @@ class URRTMonitor(threading.Thread):
return joint_current
getJOINTCurrent = joint_current
def main_voltage(self, wait=False, timestamp=False):
""" Get the Safety Control Board: Main voltage."""
if wait:
self.wait()
with self._dataAccess:
main_voltage = self._main_voltage
if timestamp:
return self._timestamp, main_voltage
else:
return main_voltage
getMAINVoltage = main_voltage
def robot_voltage(self, wait=False, timestamp=False):
""" Get the Safety Control Board: Robot voltage (48V)."""
if wait:
self.wait()
with self._dataAccess:
robot_voltage = self._robot_voltage
if timestamp:
return self._timestamp, robot_voltage
else:
return robot_voltage
getROBOTVoltage = robot_voltage
def robot_current(self, wait=False, timestamp=False):
""" Get the Safety Control Board: Robot current."""
if wait:
self.wait()
with self._dataAccess:
robot_current = self._robot_current
if timestamp:
return self._timestamp, robot_current
else:
return robot_current
getROBOTCurrent = robot_current
def __recv_rt_data(self):
head = self.__recv_bytes(4)
# Record the timestamp for this logical package
@ -162,15 +230,20 @@ class URRTMonitor(threading.Thread):
'Received header telling that package is %s bytes long',
pkgsize)
payload = self.__recv_bytes(pkgsize - 4)
if pkgsize >= 692:
unp = self.rtstruct692.unpack(payload[:self.rtstruct692.size])
elif pkgsize >= 540:
unp = self.rtstruct540.unpack(payload[:self.rtstruct540.size])
if self.urFirm is not None:
if self.urFirm == 5.1:
unp = self.rtstruct5_1.unpack(payload[:self.rtstruct5_1.size])
else:
self.logger.warning(
'Error, Received packet of length smaller than 540: %s ',
pkgsize)
return
if pkgsize >= 692:
unp = self.rtstruct692.unpack(payload[:self.rtstruct692.size])
elif pkgsize >= 540:
unp = self.rtstruct540.unpack(payload[:self.rtstruct540.size])
else:
self.logger.warning(
'Error, Received packet of length smaller than 540: %s ',
pkgsize)
return
with self._dataAccess:
self._timestamp = timestamp
@ -190,8 +263,14 @@ class URRTMonitor(threading.Thread):
self._qdActual = np.array(unp[37:43])
self._qTarget = np.array(unp[1:7])
self._tcp_force = np.array(unp[67:73])
self._tcp = np.array(unp[73:79])
self._tcp = np.array(unp[73:79])
self._joint_current = np.array(unp[43:49])
if self.urFirm >= 3.1:
self._joint_temperature = np.array(unp[86:92])
self._joint_voltage = np.array(unp[124:130])
self._main_voltage = unp[121]
self._robot_voltage = unp[122]
self._robot_current = unp[123]
if self._csys:
with self._csys_lock:
@ -260,7 +339,13 @@ class URRTMonitor(threading.Thread):
qTarget=self._qTarget,
tcp=self._tcp,
tcp_force=self._tcp_force,
joint_current=self._joint_current)
joint_temperature=self._joint_temperature,
joint_voltage=self._joint_voltage,
joint_current=self._joint_current,
main_voltage=self._main_voltage,
robot_voltage=self._robot_voltage,
robot_current=self._robot_current)
getALLData = get_all_data
def stop(self):
# print(self.__class__.__name__+': Stopping')