""" Python library to control an UR robot through its TCP/IP interface Documentation from universal robots: http://support.universal-robots.com/URRobot/RemoteAccess """ import logging import numbers import collections from urx import urrtmon from urx import ursecmon __author__ = "Olivier Roulet-Dubonnet" __copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing" __license__ = "LGPLv3" class RobotException(Exception): pass class URRobot(object): """ Python interface to socket interface of UR robot. programs are send to port 3002 data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation) Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default The RT interfaces is only used for the get_force related methods Rmq: A program sent to the robot i executed immendiatly and any running program is stopped """ def __init__(self, host, use_rt=False): self.logger = logging.getLogger("urx") self.host = host self.csys = None self.logger.debug("Opening secondary monitor socket") self.secmon = ursecmon.SecondaryMonitor(self.host) # data from robot at 10Hz self.rtmon = None if use_rt: self.rtmon = self.get_realtime_monitor() # precision of joint movem used to wait for move completion # the value must be conservative! otherwise we may wait forever self.joinEpsilon = 0.01 # It seems URScript is limited in the character length of floats it accepts self.max_float_length = 6 # FIXME: check max length!!! self.secmon.wait() # make sure we get data from robot before letting clients access our methods def __repr__(self): return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"]) def __str__(self): return self.__repr__() def __enter__(self): return self def __exit__(self, exc_type, exc_value, traceback): self.close() def is_running(self): """ Return True if robot is running (not necessary running a program, it might be idle) """ return self.secmon.running def is_program_running(self): """ check if program is running. Warning!!!!!: After sending a program it might take several 10th of a second before the robot enters the running state """ return self.secmon.is_program_running() def send_program(self, prog): """ send a complete program using urscript to the robot the program is executed immediatly and any runnning program is interrupted """ self.logger.info("Sending program: " + prog) self.secmon.send_program(prog) def get_tcp_force(self, wait=True): """ return measured force in TCP if wait==True, waits for next packet before returning """ return self.rtmon.getTCFForce(wait) def get_force(self, wait=True): """ length of force vector returned by get_tcp_force if wait==True, waits for next packet before returning """ tcpf = self.get_tcp_force(wait) force = 0 for i in tcpf: force += i**2 return force**0.5 def set_tcp(self, tcp): """ set robot flange to tool tip transformation """ prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp) self.send_program(prog) def set_payload(self, weight, cog=None): """ set payload in Kg cog is a vector x,y,z if cog is not specified, then tool center point is used """ if cog: cog = list(cog) cog.insert(0, weight) prog = "set_payload({}, ({},{},{}))".format(*cog) else: prog = "set_payload(%s)" % weight self.send_program(prog) def set_gravity(self, vector): """ set direction of gravity """ prog = "set_gravity(%s)" % list(vector) self.send_program(prog) def send_message(self, msg): """ send message to the GUI log tab on the robot controller """ prog = "textmsg(%s)" % msg self.send_program(prog) def set_digital_out(self, output, val): """ set digital output. val is a bool """ if val in (True, 1): val = "True" else: val = "False" self.send_program('digital_out[%s]=%s' % (output, val)) def get_analog_inputs(self): """ get analog input """ return self.secmon.get_analog_inputs() def get_analog_in(self, nb, wait=False): """ get analog input """ return self.secmon.get_analog_in(nb, wait=wait) def get_digital_in_bits(self): """ get digital output """ return self.secmon.get_digital_in_bits() def get_digital_in(self, nb, wait=False): """ get digital output """ return self.secmon.get_digital_in(nb, wait) def get_digital_out(self, val, wait=False): """ get digital output """ return self.secmon.get_digital_out(val, wait=wait) def get_digital_out_bits(self, wait=False): """ get digital output as a byte """ return self.secmon.get_digital_out_bits(wait=wait) def set_analog_out(self, output, val): """ set analog output, val is a float """ prog = "set_analog_out(%s, %s)" % (output, val) self.send_program(prog) def set_tool_voltage(self, val): """ set voltage to be delivered to the tool, val is 0, 12 or 24 """ prog = "set_tool_voltage(%s)" % (val) self.send_program(prog) def _wait_for_move(self, target, threshold=None, timeout=5, joints=False): """ wait for a move to complete. Unfortunately there is no good way to know when a move has finished so for every received data from robot we compute a dist equivalent and when it is lower than 'threshold' we return. if threshold is not reached within timeout, an exception is raised """ self.logger.debug("Waiting for move completion using threshold %s and target %s", threshold, target) start_dist = self._get_dist(target, joints) if threshold is None: threshold = start_dist * 0.8 if threshold < 0.001: # roboten precision is limited threshold = 0.001 self.logger.debug("No threshold set, setting it to %s", threshold) count = 0 while True: if not self.is_running(): raise RobotException("Robot stopped") dist = self._get_dist(target, joints) self.logger.debug("distance to target is: %s, target dist is %s", dist, threshold) if not self.secmon.is_program_running(): if dist < threshold: self.logger.debug("we are threshold(%s) close to target, move has ended", threshold) return count += 1 if count > timeout * 10: raise RobotException("Goal not reached but no program has been running for {} seconds. dist is {}, threshold is {}, target is {}, current pose is {}".format(timeout, dist, threshold, target, URRobot.getl(self))) else: count = 0 def _get_dist(self, target, joints=False): if joints: return self._get_joints_dist(target) else: return self._get_lin_dist(target) def _get_lin_dist(self, target): # FIXME: we have an issue here, it seems sometimes the axis angle received from robot pose = URRobot.getl(self, wait=True) dist = 0 for i in range(3): dist += (target[i] - pose[i]) ** 2 for i in range(3, 6): dist += ((target[i] - pose[i]) / 5) ** 2 # arbitraty length like return dist ** 0.5 def _get_joints_dist(self, target): joints = self.getj(wait=True) dist = 0 for i in range(6): dist += (target[i] - joints[i]) ** 2 return dist ** 0.5 def getj(self, wait=False): """ get joints position """ jts = self.secmon.get_joint_data(wait) return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]] def speedx(self, command, velocities, acc, min_time): vels = [round(i, self.max_float_length) for i in velocities] vels.append(acc) vels.append(min_time) prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels) self.send_program(prog) def movej(self, joints, acc=0.1, vel=0.05, wait=True, relative=False, threshold=None): """ move in joint space """ if relative: l = self.getj() joints = [v + l[i] for i, v in enumerate(joints)] prog = self._format_move("movej", joints, acc, vel) self.send_program(prog) if wait: self._wait_for_move(joints[:6], threshold=threshold, joints=True) return self.getj() def movel(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None): """ Send a movel command to the robot. See URScript documentation. """ return self.movex("movel", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold) def movep(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None): """ Send a movep command to the robot. See URScript documentation. """ return self.movex("movep", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold) def servoc(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None): """ Send a servoc command to the robot. See URScript documentation. """ return self.movex("servoc", tpose, acc=acc, vel=vel, wait=wait, relative=relative, threshold=threshold) def _format_move(self, command, tpose, acc, vel, radius=0, prefix=""): tpose = [round(i, self.max_float_length) for i in tpose] tpose.append(acc) tpose.append(vel) tpose.append(radius) return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose) def movex(self, command, tpose, acc=0.01, vel=0.01, wait=True, relative=False, threshold=None): """ Send a move command to the robot. since UR robotene have several methods this one sends whatever is defined in 'command' string """ if relative: l = self.getl() tpose = [v + l[i] for i, v in enumerate(tpose)] prog = self._format_move(command, tpose, acc, vel, prefix="p") self.send_program(prog) if wait: self._wait_for_move(tpose[:6], threshold=threshold) return self.getl() def getl(self, wait=False, _log=True): """ get TCP position """ pose = self.secmon.get_cartesian_info(wait) if pose: pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]] if _log: self.logger.debug("Received pose from robot: %s", pose) return pose def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True, threshold=None): """ Move Circular: Move to position (circular in tool-space) see UR documentation """ pose_via = [round(i, self.max_float_length) for i in pose_via] pose_to = [round(i, self.max_float_length) for i in pose_to] prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, "0") self.send_program(prog) if wait: self._wait_for_move(pose_to, threshold=threshold) return self.getl() def movejs(self, joint_positions_list, acc=0.01, vel=0.01, radius=0.01, wait=True, threshold=None): """ Concatenate several movej commands and applies a blending radius joint_positions_list is a list of joint_positions. This method is usefull since any new command from python to robot make the robot stop """ return self.movexs("movej", joint_positions_list, acc, vel, radius, wait, threshold=threshold) def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True, threshold=None): """ Concatenate several movel commands and applies a blending radius pose_list is a list of pose. This method is usefull since any new command from python to robot make the robot stop """ return self.movexs("movel", pose_list, acc, vel, radius, wait, threshold=threshold) def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True, threshold=None): """ Concatenate several movex commands and applies a blending radius pose_list is a list of pose. This method is usefull since any new command from python to robot make the robot stop """ header = "def myProg():\n" end = "end\n" prog = header # Check if 'vel' is a single number or a sequence. if isinstance(vel, numbers.Number): # Make 'vel' a sequence vel = len(pose_list) * [vel] elif not isinstance(vel, collections.Sequence): raise RobotException( 'movexs: "vel" must be a single number or a sequence!') # Check for adequate number of speeds if len(vel) != len(pose_list): raise RobotException( 'movexs: "vel" must be a number or a list ' + 'of numbers the same length as "pose_list"!') # Check if 'radius' is a single number. if isinstance(radius, numbers.Number): # Make 'radius' a sequence radius = len(pose_list) * [radius] elif not isinstance(radius, collections.Sequence): raise RobotException( 'movexs: "radius" must be a single number or a sequence!') # Ensure that last pose a stopping pose. radius[-1] = 0.0 # Require adequate number of radii. if len(radius) != len(pose_list): raise RobotException( 'movexs: "radius" must be a number or a list ' + 'of numbers the same length as "pose_list"!') for idx, pose in enumerate(pose_list): prog += self._format_move(command, pose, acc, vel[idx], radius[idx], prefix="p") + "\n" prog += end self.send_program(prog) if wait: if command == 'movel': self._wait_for_move(target=pose_list[-1], threshold=threshold, joints=False) elif command == 'movej': self._wait_for_move(target=pose_list[-1], threshold=threshold, joints=True) return self.getl() def stopl(self, acc=0.5): self.send_program("stopl(%s)" % acc) def stopj(self, acc=1.5): self.send_program("stopj(%s)" % acc) def stop(self): self.stopj() def close(self): """ close connection to robot and stop internal thread """ self.logger.info("Closing sockets to robot") self.secmon.close() if self.rtmon: self.rtmon.stop() def set_freedrive(self, val, timeout=60): """ set robot in freedrive/backdrive mode where an operator can jog the robot to wished pose. Freedrive will timeout at 60 seconds. """ if val: self.send_program("def myProg():\n\tfreedrive_mode()\n\tsleep({})\nend".format(timeout)) else: # This is a non-existant program, but running it will stop freedrive self.send_program("def myProg():\n\tend_freedrive_mode()\nend") def set_simulation(self, val): if val: self.send_program("set sim") else: self.send_program("set real") def get_realtime_monitor(self): """ return a pointer to the realtime monitor object usefull to track robot position for example """ if not self.rtmon: self.logger.info("Opening real-time monitor socket") self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface self.rtmon.start() self.rtmon.set_csys(self.csys) return self.rtmon def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"): """ move tool in base coordinate, keeping orientation """ p = self.getl() p[0] += vect[0] p[1] += vect[1] p[2] += vect[2] return self.movex(command, p, vel=vel, acc=acc, wait=wait) def up(self, z=0.05, acc=0.01, vel=0.01): """ Move up in csys z """ p = self.getl() p[2] += z self.movel(p, acc=acc, vel=vel) def down(self, z=0.05, acc=0.01, vel=0.01): """ Move down in csys z """ self.up(-z, acc, vel)