diff --git a/examples/get_robot.py b/examples/get_robot.py index 3e9052b..eea1000 100644 --- a/examples/get_robot.py +++ b/examples/get_robot.py @@ -1,11 +1,15 @@ import urx from IPython import embed +import logging if __name__ == "__main__": try: - robot = urx.Robot("192.168.1.6") + logging.basicConfig(level=logging.INFO) + #robot = urx.Robot("192.168.1.6") + robot = urx.Robot("192.168.1.100") + #robot = urx.Robot("localhost") r = robot print("Robot object is available as robot or r") embed() finally: - robot.shutdown() + robot.close() diff --git a/examples/joystick_control.py b/examples/joystick_control.py index 82fa0bf..72af2c8 100644 --- a/examples/joystick_control.py +++ b/examples/joystick_control.py @@ -69,7 +69,8 @@ class Service(object): if abs(val) < 0.2: val = 0 tmp = "self.cmd.axis" + str(i) + " = " + str(val) - #print(tmp) + if val != 0: + print(tmp) exec(tmp) #get button state @@ -92,20 +93,20 @@ class Service(object): speeds = [0, 0, 0, 0, 0, 0] #get linear speed from joystick - speeds[0] = self.cmd.axis0 * self.linear_velocity + speeds[0] = -1 * self.cmd.axis0 * self.linear_velocity speeds[1] = self.cmd.axis1 * self.linear_velocity - if self.cmd.btn4 and not self.cmd.btn6: - speeds[2] = self.linear_velocity - if self.cmd.btn6 and not self.cmd.btn4: + if self.cmd.btn4 and not self.cmd.axis2: speeds[2] = -self.linear_velocity + if self.cmd.axis2 and not self.cmd.btn4: + speeds[2] = self.cmd.axis2 * self.linear_velocity #get rotational speed from joystick - speeds[4] = -1 * self.cmd.axis2 * self.rotational_velocity - speeds[3] = self.cmd.axis3 * self.rotational_velocity - if self.cmd.btn1 and not self.cmd.btn2: + speeds[4] = -1 * self.cmd.axis3 * self.rotational_velocity + speeds[3] = -1 * self.cmd.axis4 * self.rotational_velocity + if self.cmd.btn5 and not self.cmd.axis5: speeds[5] = self.rotational_velocity - if self.cmd.btn2 and not self.cmd.btn1: - speeds[5] = -self.rotational_velocity + if self.cmd.axis5 and not self.cmd.btn5: + speeds[5] = self.cmd.axis5 * -self.rotational_velocity #for some reasons everything is inversed speeds = [-i for i in speeds] @@ -117,7 +118,6 @@ class Service(object): else: self.robot.speedl(speeds, acc=self.acceleration, min_time=2) - time.sleep(0.12)#URX secondday port accepts command at 10Hz, no need to go faster def close(self): if self.joystick: @@ -125,13 +125,10 @@ class Service(object): if __name__ == "__main__": - #create robot object - robot = urx.Robot('192.168.0.90') - #robot = urx.Robot('localhost') - #set a base transformation for robot (optional) - robot.set_tcp((0, 0, 0, 0, 0, 0)) - robot.csys.orient.rotate_zb(-pi/4) - + robot = urx.Robot("192.168.1.100") + r = robot + + #start joystick service with given max speed and acceleration service = Service(robot, linear_velocity=0.1, rotational_velocity=0.1, acceleration=0.1) service.init_joystick() diff --git a/examples/matrices.py b/examples/matrices.py index 691fdb6..0b7c0a4 100644 --- a/examples/matrices.py +++ b/examples/matrices.py @@ -4,8 +4,8 @@ import urx import logging if __name__ == "__main__": - #rob = urx.Robot("192.168.1.6") - rob = urx.Robot("localhost") + rob = urx.Robot("192.168.1.100") + #rob = urx.Robot("localhost") rob.set_tcp((0,0,0,0,0,0)) rob.set_payload(0.5, (0,0,0)) try: diff --git a/examples/simple.py b/examples/simple.py index c8a9759..2432d33 100644 --- a/examples/simple.py +++ b/examples/simple.py @@ -5,8 +5,8 @@ import logging if __name__ == "__main__": logging.basicConfig(level=logging.WARN) - #rob = urx.Robot("192.168.1.6") - rob = urx.Robot("localhost") + rob = urx.Robot("192.168.1.100") + #rob = urx.Robot("localhost") rob.set_tcp((0,0,0,0,0,0)) rob.set_payload(0.5, (0,0,0)) try: diff --git a/examples/test_all.py b/examples/test_all.py index dabef9a..2324e24 100644 --- a/examples/test_all.py +++ b/examples/test_all.py @@ -14,17 +14,18 @@ if sys.version_info[0] < 3: # support python v2 def wait(): if do_wait: + print("Click enter to continue") input() if __name__ == "__main__": - logging.basicConfig(level=logging.WARN) + logging.basicConfig(level=logging.INFO) do_wait = True if len(sys.argv) > 1: do_wait = False - #rob = urx.Robot("192.168.1.6") - rob = urx.Robot("localhost") + rob = urx.Robot("192.168.1.100") + #rob = urx.Robot("localhost") rob.set_tcp((0, 0, 0, 0, 0, 0)) rob.set_payload(0.5, (0, 0, 0)) try: @@ -49,8 +50,8 @@ if __name__ == "__main__": wait() pose[2] += l rob.movel(pose, acc=a, vel=v, wait=False) - print("Waiting for end move") - rob.wait_for_move(0, pose) + print("Waiting 2s for end move") + time.sleep(2) print("Moving through several points with a radius") wait() @@ -77,22 +78,14 @@ if __name__ == "__main__": print("stop robot") rob.stopj() - print("Test movep, it will fail on older controllers") - wait() - init = rob.get_pose() - pose = init.copy() - for _ in range(3): - pose.pos[0] += l - rob.movep(pose, acc=a, vel=v, radius=r) - rob.movep(init, acc=a, vel=v, radius=r) # back to start - print("Test movec") wait() - via = init.copy() + pose = rob.get_pose() + via = pose.copy() via.pos[0] += l to = via.copy() to.pos[1] += l - rob.movec(via, to, acc=a, vel=v, radius=r) + rob.movec(via, to, acc=a, vel=v) print("Sending robot back to original position") rob.movej(initj, acc=0.8, vel=0.2) diff --git a/urx/__init__.py b/urx/__init__.py index 8b54f9c..7589a92 100644 --- a/urx/__init__.py +++ b/urx/__init__.py @@ -4,4 +4,10 @@ Python library to control an UR robot through its TCP/IP interface __version__ = "0.9.0" -from urx.robot import Robot, RobotException, URRobot +from urx.urrobot import RobotException, URRobot + +try: + from urx.robot import Robot +except ImportError as ex: + print("Exception while importing math3d base robot, disabling use of matrices", ex) + Robot = URRobot diff --git a/urx/robot.py b/urx/robot.py index 50bcfb9..c8d36c0 100644 --- a/urx/robot.py +++ b/urx/robot.py @@ -8,426 +8,10 @@ __author__ = "Olivier Roulet-Dubonnet" __copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing" __license__ = "GPLv3" -import logging - - -MATH3D = True -try: - import math3d as m3d - import numpy as np -except ImportError: - MATH3D = False - print("python-math3d library could not be found on this computer, disabling use of matrices and path blending") - -from urx import urrtmon -from urx import ursecmon - - -class RobotException(Exception): - pass - - -class URRobot(object): - - """ - Python interface to socket interface of UR robot. - programs are send to port 3002 - data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation) - Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default - The RT interfaces is only used for the get_force related methods - Rmq: A program sent to the robot i executed immendiatly and any running program is stopped - """ - - def __init__(self, host, use_rt=False): - self.logger = logging.getLogger("urx") - self.host = host - self.csys = None - - self.logger.debug("Opening secondary monitor socket") - self.secmon = ursecmon.SecondaryMonitor(self.host) # data from robot at 10Hz - - self.rtmon = None - if use_rt: - self.rtmon = self.get_realtime_monitor() - # precision of joint movem used to wait for move completion - # the value must be conservative! otherwise we may wait forever - self.joinEpsilon = 0.01 - # It seems URScript is limited in the character length of floats it accepts - self.max_float_length = 6 # FIXME: check max length!!! - - self.secmon.wait() # make sure we get data from robot before letting clients access our methods - - def __repr__(self): - return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"]) - - def __str__(self): - return self.__repr__() - - def is_running(self): - """ - Return True if robot is running (not - necessary running a program, it might be idle) - """ - return self.secmon.running - - def is_program_running(self): - """ - check if program is running. - Warning!!!!!: After sending a program it might take several 10th of - a second before the robot enters the running state - """ - return self.secmon.is_program_running() - - def send_program(self, prog): - """ - send a complete program using urscript to the robot - the program is executed immediatly and any runnning - program is interrupted - """ - self.logger.info("Sending program: " + prog) - self.secmon.send_program(prog) - - def get_tcp_force(self, wait=True): - """ - return measured force in TCP - if wait==True, waits for next packet before returning - """ - return self.rtmon.getTCFForce(wait) - - def get_force(self, wait=True): - """ - length of force vector returned by get_tcp_force - if wait==True, waits for next packet before returning - """ - tcpf = self.get_tcp_force(wait) - force = 0 - for i in tcpf: - force += i**2 - return force**0.5 - - def set_tcp(self, tcp): - """ - set robot flange to tool tip transformation - """ - prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp) - self.send_program(prog) - - def set_payload(self, weight, cog=None): - """ - set payload in Kg - cog is a vector x,y,z - if cog is not specified, then tool center point is used - """ - if cog: - cog = list(cog) - cog.insert(0, weight) - prog = "set_payload({}, ({},{},{}))".format(*cog) - else: - prog = "set_payload(%s)" % weight - self.send_program(prog) - - def set_gravity(self, vector): - """ - set direction of gravity - """ - prog = "set_gravity(%s)" % list(vector) - self.send_program(prog) - - def send_message(self, msg): - """ - send message to the GUI log tab on the robot controller - """ - prog = "textmsg(%s)" % msg - self.send_program(prog) - - def set_digital_out(self, output, val): - """ - set digital output. val is a bool - """ - if val in (True, 1): - val = "True" - else: - val = "False" - self.send_program('digital_out[%s]=%s' % (output, val)) - - def get_analog_inputs(self): - """ - get analog input - """ - return self.secmon.get_analog_inputs() - - def get_analog_in(self, nb, wait=False): - """ - get analog input - """ - return self.secmon.get_analog_in(nb, wait=wait) - - def get_digital_in_bits(self): - """ - get digital output - """ - return self.secmon.get_digital_in_bits() - - def get_digital_in(self, nb, wait=False): - """ - get digital output - """ - return self.secmon.get_digital_in(nb, wait) - - def get_digital_out(self, val, wait=False): - """ - get digital output - """ - return self.secmon.get_digital_out(val, wait=wait) - - def set_analog_out(self, output, val): - """ - set analog output, val is a float - """ - prog = "set_analog_output(%s, %s)" % (output, val) - self.send_program(prog) - - def set_tool_voltage(self, val): - """ - set voltage to be delivered to the tool, val is 0, 12 or 24 - """ - prog = "set_tool_voltage(%s)" % (val) - self.send_program(prog) - - #def wait_for_move(self, radius=0, target=None): - def wait_for_move(self, radius, target): - """ - wait until a move is completed - radius and target args are ignored - """ - try: - self._wait_for_move(radius, target) - except Exception as ex: - self.logger.exception("Exception:") - self.stopj() - raise ex - - def _wait_for_move(self, radius, target): - self.logger.debug("Waiting for move completion with radius %s and target %s", radius, target) - # it is necessary to wait since robot may takes a while to get into running state, - for _ in range(3): - self.secmon.wait() - while True: - if not self.is_running(): - raise RobotException("Robot stopped") - jts = self.secmon.get_joint_data(wait=True) - finished = True - for i in range(0, 6): - # Rmq: q_target is an interpolated target we have no control over - if abs(jts["q_actual%s" % i] - jts["q_target%s" % i]) > self.joinEpsilon: - self.logger.debug("Waiting for end move, q_actual is %s, q_target is %s, diff is %s, epsilon is %s", jts["q_actual%s" % i], jts["q_target%s" % i], jts["q_actual%s" % i] - jts["q_target%s" % i], self.joinEpsilon) - finished = False - break - if finished and not self.secmon.is_program_running(): - self.logger.debug("move has ended") - return - - def getj(self, wait=False): - """ - get joints position - """ - jts = self.secmon.get_joint_data(wait) - return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]] - - def speedx(self, command, velocities, acc, min_time): - vels = [round(i, self.max_float_length) for i in velocities] - vels.append(acc) - vels.append(min_time) - prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels) - self.send_program(prog) - - def speedl(self, velocities, acc, min_time): - """ - move at given velocities until minimum min_time seconds - """ - return self.speedx("speedl", velocities, acc, min_time) - - def speedj(self, velocities, acc, min_time): - """ - move at given joint velocities until minimum min_time seconds - """ - return self.speedx("speedj", velocities, acc, min_time) - - def movej(self, joints, acc=0.1, vel=0.05, radius=0, wait=True, relative=False): - """ - move in joint space - """ - if relative: - l = self.getj() - joints = [v + l[i] for i, v in enumerate(joints)] - prog = self._format_move("movej", joints, acc, vel, radius) - self.send_program(prog) - if wait: - self.wait_for_move(radius, joints[:6]) - return self.getj() - - def movel(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False): - """ - Send a movel command to the robot. See URScript documentation. - """ - return self.movex("movel", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative) - - def movep(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False): - """ - Send a movep command to the robot. See URScript documentation. - """ - return self.movex("movep", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative) - - def servoc(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False): - """ - Send a servoc command to the robot. See URScript documentation. - """ - return self.movex("servoc", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative) - - def _format_move(self, command, tpose, acc, vel, radius, prefix=""): - tpose = [round(i, self.max_float_length) for i in tpose] - tpose.append(acc) - tpose.append(vel) - tpose.append(radius) - return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose) - - def movex(self, command, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False): - """ - Send a move command to the robot. since UR robotene have several methods this one - sends whatever is defined in 'command' string - """ - if relative: - l = self.getl() - tpose = [v + l[i] for i, v in enumerate(tpose)] - prog = self._format_move(command, tpose, acc, vel, radius, prefix="p") - self.send_program(prog) - if wait: - self.wait_for_move(radius, tpose[:6]) - return self.getl() - - def getl(self, wait=False, _log=True): - """ - get TCP position - """ - pose = self.secmon.get_cartesian_info(wait) - if pose: - pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]] - if _log: - self.logger.debug("Received pose from robot: %s", pose) - return pose - - def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True): - """ - Move Circular: Move to position (circular in tool-space) - see UR documentation - """ - pose_via = [round(i, self.max_float_length) for i in pose_via] - pose_to = [round(i, self.max_float_length) for i in pose_to] - prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, radius) - self.send_program(prog) - if wait: - self.wait_for_move(radius, pose_to) - return self.getl() - - def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True): - """ - Concatenate several movel commands and applies a blending radius - pose_list is a list of pose. - This method is usefull since any new command from python - to robot make the robot stop - """ - return self.movexs("movel", pose_list, acc, vel, radius, wait) - - def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True): - """ - Concatenate several movex commands and applies a blending radius - pose_list is a list of pose. - This method is usefull since any new command from python - to robot make the robot stop - """ - header = "def myProg():\n" - end = "end\n" - prog = header - for idx, pose in enumerate(pose_list): - if idx == (len(pose_list) - 1): - radius = 0 - prog += self._format_move(command, pose, acc, vel, radius, prefix="p") + "\n" - prog += end - self.send_program(prog) - if wait: - self.wait_for_move(radius=0, target=pose_list[-1]) - return self.getl() - - def stopl(self, acc=0.5): - self.send_program("stopl(%s)" % acc) - - def stopj(self, acc=1.5): - self.send_program("stopj(%s)" % acc) - - def stop(self): - self.stopj() - - def close(self): - """ - close connection to robot and stop internal thread - """ - self.logger.info("Closing sockets to robot") - self.secmon.close() - if self.rtmon: - self.rtmon.stop() - - def set_freedrive(self, val): - """ - set robot in freedrive/brackdrive mode where an operator can jogg - the robot to wished pose - """ - if val: - self.send_program("set robotmode freedrive") - else: - self.send_program("set robotmode run") - - def set_simulation(self, val): - if val: - self.send_program("set sim") - else: - self.send_program("set real") - - def get_realtime_monitor(self): - """ - return a pointer to the realtime monitor object - usefull to track robot position for example - """ - if not self.rtmon: - self.logger.info("Opening real-time monitor socket") - self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface - self.rtmon.start() - self.rtmon.set_csys(self.csys) - return self.rtmon - - def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"): - """ - move tool in base coordinate, keeping orientation - """ - p = self.getl() - p[0] += vect[0] - p[1] += vect[1] - p[2] += vect[2] - return self.movex(command, p, vel=vel, acc=acc, wait=wait) - - def up(self, z=0.05, acc=0.01, vel=0.01): - """ - Move up in csys z - """ - p = self.getl() - p[2] += z - self.movel(p, acc=acc, vel=vel) - - def down(self, z=0.05, acc=0.01, vel=0.01): - """ - Move down in csys z - """ - self.up(-z, acc, vel) +import math3d as m3d +import numpy as np +from urx.urrobot import URRobot class Robot(URRobot): @@ -456,7 +40,7 @@ class Robot(URRobot): """ self.csys = transform - def set_orientation(self, orient, acc=0.01, vel=0.01, radius=0, wait=True): + def set_orientation(self, orient, acc=0.01, vel=0.01, wait=True): """ set tool orientation using a orientation matric from math3d or a orientation vector @@ -465,9 +49,9 @@ class Robot(URRobot): orient = m3d.Orientation(orient) trans = self.get_pose() trans.orient = orient - self.set_pose(trans, acc, vel, radius, wait=wait) + self.set_pose(trans, acc, vel, wait=wait) - def translate_tool(self, vect, acc=0.01, vel=0.01, radius=0, wait=True): + def translate_tool(self, vect, acc=0.01, vel=0.01, wait=True): """ move tool in tool coordinate, keeping orientation """ @@ -475,7 +59,7 @@ class Robot(URRobot): if not isinstance(vect, m3d.Vector): vect = m3d.Vector(vect) t.pos += vect - return self.add_pose_tool(t, acc, vel, radius, wait=wait) + return self.add_pose_tool(t, acc, vel, wait=wait) def back(self, z=0.05, acc=0.01, vel=0.01): """ @@ -483,27 +67,27 @@ class Robot(URRobot): """ self.translate_tool((0, 0, -z), acc=acc, vel=vel) - def set_pos(self, vect, acc=0.01, vel=0.01, radius=0, wait=True): + def set_pos(self, vect, acc=0.01, vel=0.01, wait=True): """ set tool to given pos, keeping constant orientation """ if not isinstance(vect, m3d.Vector): vect = m3d.Vector(vect) trans = m3d.Transform(self.get_orientation(), m3d.Vector(vect)) - return self.set_pose(trans, acc, vel, radius, wait=wait) + return self.set_pose(trans, acc, vel, wait=wait) - def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True): + def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True): """ Move Circular: Move to position (circular in tool-space) see UR documentation """ pose_via = self.csys * m3d.Transform(pose_via) pose_to = self.csys * m3d.Transform(pose_to) - pose = URRobot.movec(self, pose_via.pose_vector, pose_to.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius) + pose = URRobot.movec(self, pose_via.pose_vector, pose_to.pose_vector, acc=acc, vel=vel, wait=wait) if pose is not None: return self.csys.inverse * m3d.Transform(pose) - def set_pose(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"): + def set_pose(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"): """ move tcp to point and orientation defined by a transformation UR robots have several move commands, by default movel is used but it can be changed @@ -511,23 +95,23 @@ class Robot(URRobot): """ self.logger.debug("Setting pose to %s", trans.pose_vector) t = self.csys * trans - pose = URRobot.movex(self, command, t.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius) + pose = URRobot.movex(self, command, t.pose_vector, acc=acc, vel=vel, wait=wait) if pose is not None: return self.csys.inverse * m3d.Transform(pose) - def add_pose_base(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"): + def add_pose_base(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"): """ Add transform expressed in base coordinate """ pose = self.get_pose() - return self.set_pose(trans * pose, acc, vel, radius, wait=wait, command=command) + return self.set_pose(trans * pose, acc, vel, wait=wait, command=command) - def add_pose_tool(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"): + def add_pose_tool(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"): """ Add transform expressed in tool coordinate """ pose = self.get_pose() - return self.set_pose(pose * trans, acc, vel, radius, wait=wait, command=command) + return self.set_pose(pose * trans, acc, vel, wait=wait, command=command) def get_pose(self, wait=False, _log=True): """ @@ -571,7 +155,7 @@ class Robot(URRobot): w = self.csys.orient * pose.orient * m3d.Vector(velocities[3:]) URRobot.speedl(self, np.concatenate((v.array, w.array)), acc, min_time) - def movex(self, command, pose, acc=0.01, vel=0.01, wait=True, relative=False, radius=0): + def movex(self, command, pose, acc=0.01, vel=0.01, wait=True, relative=False): """ Send a move command to the robot. since UR robotene have several methods this one sends whatever is defined in 'command' string @@ -580,7 +164,7 @@ class Robot(URRobot): if relative: return self.add_pose_base(t, acc, vel, wait=wait, command=command) else: - return self.set_pose(t, acc, vel, radius, wait=wait, command=command) + return self.set_pose(t, acc, vel, wait=wait, command=command) def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True): """ @@ -618,23 +202,4 @@ class Robot(URRobot): vector = vector.list return URRobot.set_gravity(self, vector) - def _wait_for_move(self, radius, target): - self.logger.debug("Waiting for move completion with math3d using raidus %s and target %s", radius, target) - # it is necessary to wait since robot may takes a while to get into running state, - for _ in range(3): - self.secmon.wait() - target = m3d.Transform(target) - while True: - if not self.is_running(): - raise RobotException("Robot stopped") - pose = self.get_pose(wait=True, _log=False) - dist = pose.dist(target) - self.logger.debug("distance to target is: %s, target dist is %s", dist, radius) - if (dist < radius) or not self.secmon.is_program_running(): - #if (dist < radius): - self.logger.debug("move has ended") - return - -if not MATH3D: - Robot = URRobot diff --git a/urx/urrobot.py b/urx/urrobot.py new file mode 100644 index 0000000..c4051e8 --- /dev/null +++ b/urx/urrobot.py @@ -0,0 +1,438 @@ +""" +Python library to control an UR robot through its TCP/IP interface +Documentation from universal robots: +http://support.universal-robots.com/URRobot/RemoteAccess +""" + +__author__ = "Olivier Roulet-Dubonnet" +__copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing" +__license__ = "GPLv3" + +import logging + +from urx import urrtmon +from urx import ursecmon + + +class RobotException(Exception): + pass + + +class URRobot(object): + + """ + Python interface to socket interface of UR robot. + programs are send to port 3002 + data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation) + Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default + The RT interfaces is only used for the get_force related methods + Rmq: A program sent to the robot i executed immendiatly and any running program is stopped + """ + + def __init__(self, host, use_rt=False): + self.logger = logging.getLogger("urx") + self.host = host + self.csys = None + + self.logger.debug("Opening secondary monitor socket") + self.secmon = ursecmon.SecondaryMonitor(self.host) # data from robot at 10Hz + + self.rtmon = None + if use_rt: + self.rtmon = self.get_realtime_monitor() + # precision of joint movem used to wait for move completion + # the value must be conservative! otherwise we may wait forever + self.joinEpsilon = 0.01 + # It seems URScript is limited in the character length of floats it accepts + self.max_float_length = 6 # FIXME: check max length!!! + + self.secmon.wait() # make sure we get data from robot before letting clients access our methods + + def __repr__(self): + return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"]) + + def __str__(self): + return self.__repr__() + + def is_running(self): + """ + Return True if robot is running (not + necessary running a program, it might be idle) + """ + return self.secmon.running + + def is_program_running(self): + """ + check if program is running. + Warning!!!!!: After sending a program it might take several 10th of + a second before the robot enters the running state + """ + return self.secmon.is_program_running() + + def send_program(self, prog): + """ + send a complete program using urscript to the robot + the program is executed immediatly and any runnning + program is interrupted + """ + self.logger.info("Sending program: " + prog) + self.secmon.send_program(prog) + + def get_tcp_force(self, wait=True): + """ + return measured force in TCP + if wait==True, waits for next packet before returning + """ + return self.rtmon.getTCFForce(wait) + + def get_force(self, wait=True): + """ + length of force vector returned by get_tcp_force + if wait==True, waits for next packet before returning + """ + tcpf = self.get_tcp_force(wait) + force = 0 + for i in tcpf: + force += i**2 + return force**0.5 + + def set_tcp(self, tcp): + """ + set robot flange to tool tip transformation + """ + prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp) + self.send_program(prog) + + def set_payload(self, weight, cog=None): + """ + set payload in Kg + cog is a vector x,y,z + if cog is not specified, then tool center point is used + """ + if cog: + cog = list(cog) + cog.insert(0, weight) + prog = "set_payload({}, ({},{},{}))".format(*cog) + else: + prog = "set_payload(%s)" % weight + self.send_program(prog) + + def set_gravity(self, vector): + """ + set direction of gravity + """ + prog = "set_gravity(%s)" % list(vector) + self.send_program(prog) + + def send_message(self, msg): + """ + send message to the GUI log tab on the robot controller + """ + prog = "textmsg(%s)" % msg + self.send_program(prog) + + def set_digital_out(self, output, val): + """ + set digital output. val is a bool + """ + if val in (True, 1): + val = "True" + else: + val = "False" + self.send_program('digital_out[%s]=%s' % (output, val)) + + def get_analog_inputs(self): + """ + get analog input + """ + return self.secmon.get_analog_inputs() + + def get_analog_in(self, nb, wait=False): + """ + get analog input + """ + return self.secmon.get_analog_in(nb, wait=wait) + + def get_digital_in_bits(self): + """ + get digital output + """ + return self.secmon.get_digital_in_bits() + + def get_digital_in(self, nb, wait=False): + """ + get digital output + """ + return self.secmon.get_digital_in(nb, wait) + + def get_digital_out(self, val, wait=False): + """ + get digital output + """ + return self.secmon.get_digital_out(val, wait=wait) + + def set_analog_out(self, output, val): + """ + set analog output, val is a float + """ + prog = "set_analog_output(%s, %s)" % (output, val) + self.send_program(prog) + + def set_tool_voltage(self, val): + """ + set voltage to be delivered to the tool, val is 0, 12 or 24 + """ + prog = "set_tool_voltage(%s)" % (val) + self.send_program(prog) + + def _wait_for_move(self, target, radius=0.001, timeout=2): + """ + wait for a move to complete. Unfortunately there is no good way to know when a move has finished + so for every received data from robot we compute a dist equivalent and when it is lower than + 'radius' we return. + if radius is not reached within timeout, and exceptino is raised + """ + self.logger.info("Waiting for move completion using radius %s and target %s", radius, target) + count = 0 + while True: + if not self.is_running(): + raise RobotException("Robot stopped") + dist = self._get_dist(target) + self.logger.debug("distance to target is: %s, target dist is %s", dist, radius) + if dist < radius: + self.logger.debug("move has ended") + return + count += 1 + if count > timeout * 10: + raise RobotException("Goal not reached but no program has been running for {} seconds. dist is {}, target is {}, current pose is {}".format(timeout, dist, target, URRobot.getl(self))) + if self.secmon.is_program_running(): + count = 0 + + def _get_dist(self, target): + pose = URRobot.getl(self, wait=True) + dist = 0 + for i in range(3): + dist += (target[i] - pose[i]) ** 2 + for i in range(3, 6): + dist += ((target[i] - pose[i]) / 5) ** 2 # arbitraty length like + return dist ** 0.5 + + def _wait_for_move_joints(self, radius, target): + #FIXME raise exception if target not reached after x seconds + self.logger.debug("Waiting for joint move completion with radius %s and target %s", radius, target) + while True: + if not self.is_running(): + raise RobotException("Robot stopped") + joints = self.getj(wait=True) + dist = 0 + for i in range(6): + dist += (target[i] - joints[i]) ** 2 + dist = dist ** 0.5 + self.logger.debug("distance to target is: %s, target dist is %s", dist, radius) + if dist < radius: + self.logger.debug("move has ended") + return + + def getj(self, wait=False): + """ + get joints position + """ + jts = self.secmon.get_joint_data(wait) + return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]] + + def speedx(self, command, velocities, acc, min_time): + vels = [round(i, self.max_float_length) for i in velocities] + vels.append(acc) + vels.append(min_time) + prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels) + self.send_program(prog) + + def speedl(self, velocities, acc, min_time): + """ + move at given velocities until minimum min_time seconds + """ + return self.speedx("speedl", velocities, acc, min_time) + + def speedj(self, velocities, acc, min_time): + """ + move at given joint velocities until minimum min_time seconds + """ + return self.speedx("speedj", velocities, acc, min_time) + + def movej(self, joints, acc=0.1, vel=0.05, wait=True, relative=False): + """ + move in joint space + """ + if relative: + l = self.getj() + joints = [v + l[i] for i, v in enumerate(joints)] + prog = self._format_move("movej", joints, acc, vel) + self.send_program(prog) + if wait: + self._wait_for_move_joints(0.001, joints[:6]) + return self.getj() + + def movel(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False): + """ + Send a movel command to the robot. See URScript documentation. + """ + return self.movex("movel", tpose, acc=acc, vel=vel, wait=wait, relative=relative) + + def movep(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False): + """ + Send a movep command to the robot. See URScript documentation. + """ + return self.movex("movep", tpose, acc=acc, vel=vel, wait=wait, relative=relative) + + def servoc(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False): + """ + Send a servoc command to the robot. See URScript documentation. + """ + return self.movex("servoc", tpose, acc=acc, vel=vel, wait=wait, relative=relative) + + def _format_move(self, command, tpose, acc, vel, radius=0, prefix=""): + tpose = [round(i, self.max_float_length) for i in tpose] + tpose.append(acc) + tpose.append(vel) + tpose.append(radius) + return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose) + + def movex(self, command, tpose, acc=0.01, vel=0.01, wait=True, relative=False): + """ + Send a move command to the robot. since UR robotene have several methods this one + sends whatever is defined in 'command' string + """ + if relative: + l = self.getl() + tpose = [v + l[i] for i, v in enumerate(tpose)] + prog = self._format_move(command, tpose, acc, vel, prefix="p") + self.send_program(prog) + if wait: + self._wait_for_move(tpose[:6]) + return self.getl() + + def getl(self, wait=False, _log=True): + """ + get TCP position + """ + pose = self.secmon.get_cartesian_info(wait) + if pose: + pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]] + if _log: + self.logger.debug("Received pose from robot: %s", pose) + return pose + + def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True): + """ + Move Circular: Move to position (circular in tool-space) + see UR documentation + """ + pose_via = [round(i, self.max_float_length) for i in pose_via] + pose_to = [round(i, self.max_float_length) for i in pose_to] + prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, "0") + self.send_program(prog) + if wait: + self._wait_for_move(pose_to) + return self.getl() + + def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True): + """ + Concatenate several movel commands and applies a blending radius + pose_list is a list of pose. + This method is usefull since any new command from python + to robot make the robot stop + """ + return self.movexs("movel", pose_list, acc, vel, radius, wait) + + def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True): + """ + Concatenate several movex commands and applies a blending radius + pose_list is a list of pose. + This method is usefull since any new command from python + to robot make the robot stop + """ + header = "def myProg():\n" + end = "end\n" + prog = header + for idx, pose in enumerate(pose_list): + if idx == (len(pose_list) - 1): + radius = 0 + prog += self._format_move(command, pose, acc, vel, radius, prefix="p") + "\n" + prog += end + self.send_program(prog) + if wait: + self._wait_for_move(target=pose_list[-1]) + return self.getl() + + def stopl(self, acc=0.5): + self.send_program("stopl(%s)" % acc) + + def stopj(self, acc=1.5): + self.send_program("stopj(%s)" % acc) + + def stop(self): + self.stopj() + + def close(self): + """ + close connection to robot and stop internal thread + """ + self.logger.info("Closing sockets to robot") + self.secmon.close() + if self.rtmon: + self.rtmon.stop() + + def set_freedrive(self, val): + """ + set robot in freedrive/brackdrive mode where an operator can jogg + the robot to wished pose + """ + if val: + self.send_program("set robotmode freedrive") + else: + self.send_program("set robotmode run") + + def set_simulation(self, val): + if val: + self.send_program("set sim") + else: + self.send_program("set real") + + def get_realtime_monitor(self): + """ + return a pointer to the realtime monitor object + usefull to track robot position for example + """ + if not self.rtmon: + self.logger.info("Opening real-time monitor socket") + self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface + self.rtmon.start() + self.rtmon.set_csys(self.csys) + return self.rtmon + + def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"): + """ + move tool in base coordinate, keeping orientation + """ + p = self.getl() + p[0] += vect[0] + p[1] += vect[1] + p[2] += vect[2] + return self.movex(command, p, vel=vel, acc=acc, wait=wait) + + def up(self, z=0.05, acc=0.01, vel=0.01): + """ + Move up in csys z + """ + p = self.getl() + p[2] += z + self.movel(p, acc=acc, vel=vel) + + def down(self, z=0.05, acc=0.01, vel=0.01): + """ + Move down in csys z + """ + self.up(-z, acc, vel) + + +