rewrite wait_for_move, split robot file, check test_all.py

This commit is contained in:
Olivier R-D 2015-06-12 10:34:42 +02:00
parent 06644d96c8
commit 5e4c8a0696
8 changed files with 498 additions and 495 deletions

View File

@ -1,11 +1,15 @@
import urx
from IPython import embed
import logging
if __name__ == "__main__":
try:
robot = urx.Robot("192.168.1.6")
logging.basicConfig(level=logging.INFO)
#robot = urx.Robot("192.168.1.6")
robot = urx.Robot("192.168.1.100")
#robot = urx.Robot("localhost")
r = robot
print("Robot object is available as robot or r")
embed()
finally:
robot.shutdown()
robot.close()

View File

@ -69,7 +69,8 @@ class Service(object):
if abs(val) < 0.2:
val = 0
tmp = "self.cmd.axis" + str(i) + " = " + str(val)
#print(tmp)
if val != 0:
print(tmp)
exec(tmp)
#get button state
@ -92,20 +93,20 @@ class Service(object):
speeds = [0, 0, 0, 0, 0, 0]
#get linear speed from joystick
speeds[0] = self.cmd.axis0 * self.linear_velocity
speeds[0] = -1 * self.cmd.axis0 * self.linear_velocity
speeds[1] = self.cmd.axis1 * self.linear_velocity
if self.cmd.btn4 and not self.cmd.btn6:
speeds[2] = self.linear_velocity
if self.cmd.btn6 and not self.cmd.btn4:
if self.cmd.btn4 and not self.cmd.axis2:
speeds[2] = -self.linear_velocity
if self.cmd.axis2 and not self.cmd.btn4:
speeds[2] = self.cmd.axis2 * self.linear_velocity
#get rotational speed from joystick
speeds[4] = -1 * self.cmd.axis2 * self.rotational_velocity
speeds[3] = self.cmd.axis3 * self.rotational_velocity
if self.cmd.btn1 and not self.cmd.btn2:
speeds[4] = -1 * self.cmd.axis3 * self.rotational_velocity
speeds[3] = -1 * self.cmd.axis4 * self.rotational_velocity
if self.cmd.btn5 and not self.cmd.axis5:
speeds[5] = self.rotational_velocity
if self.cmd.btn2 and not self.cmd.btn1:
speeds[5] = -self.rotational_velocity
if self.cmd.axis5 and not self.cmd.btn5:
speeds[5] = self.cmd.axis5 * -self.rotational_velocity
#for some reasons everything is inversed
speeds = [-i for i in speeds]
@ -117,7 +118,6 @@ class Service(object):
else:
self.robot.speedl(speeds, acc=self.acceleration, min_time=2)
time.sleep(0.12)#URX secondday port accepts command at 10Hz, no need to go faster
def close(self):
if self.joystick:
@ -125,12 +125,9 @@ class Service(object):
if __name__ == "__main__":
#create robot object
robot = urx.Robot('192.168.0.90')
#robot = urx.Robot('localhost')
#set a base transformation for robot (optional)
robot.set_tcp((0, 0, 0, 0, 0, 0))
robot.csys.orient.rotate_zb(-pi/4)
robot = urx.Robot("192.168.1.100")
r = robot
#start joystick service with given max speed and acceleration
service = Service(robot, linear_velocity=0.1, rotational_velocity=0.1, acceleration=0.1)

View File

@ -4,8 +4,8 @@ import urx
import logging
if __name__ == "__main__":
#rob = urx.Robot("192.168.1.6")
rob = urx.Robot("localhost")
rob = urx.Robot("192.168.1.100")
#rob = urx.Robot("localhost")
rob.set_tcp((0,0,0,0,0,0))
rob.set_payload(0.5, (0,0,0))
try:

View File

@ -5,8 +5,8 @@ import logging
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
#rob = urx.Robot("192.168.1.6")
rob = urx.Robot("localhost")
rob = urx.Robot("192.168.1.100")
#rob = urx.Robot("localhost")
rob.set_tcp((0,0,0,0,0,0))
rob.set_payload(0.5, (0,0,0))
try:

View File

@ -14,17 +14,18 @@ if sys.version_info[0] < 3: # support python v2
def wait():
if do_wait:
print("Click enter to continue")
input()
if __name__ == "__main__":
logging.basicConfig(level=logging.WARN)
logging.basicConfig(level=logging.INFO)
do_wait = True
if len(sys.argv) > 1:
do_wait = False
#rob = urx.Robot("192.168.1.6")
rob = urx.Robot("localhost")
rob = urx.Robot("192.168.1.100")
#rob = urx.Robot("localhost")
rob.set_tcp((0, 0, 0, 0, 0, 0))
rob.set_payload(0.5, (0, 0, 0))
try:
@ -49,8 +50,8 @@ if __name__ == "__main__":
wait()
pose[2] += l
rob.movel(pose, acc=a, vel=v, wait=False)
print("Waiting for end move")
rob.wait_for_move(0, pose)
print("Waiting 2s for end move")
time.sleep(2)
print("Moving through several points with a radius")
wait()
@ -77,22 +78,14 @@ if __name__ == "__main__":
print("stop robot")
rob.stopj()
print("Test movep, it will fail on older controllers")
wait()
init = rob.get_pose()
pose = init.copy()
for _ in range(3):
pose.pos[0] += l
rob.movep(pose, acc=a, vel=v, radius=r)
rob.movep(init, acc=a, vel=v, radius=r) # back to start
print("Test movec")
wait()
via = init.copy()
pose = rob.get_pose()
via = pose.copy()
via.pos[0] += l
to = via.copy()
to.pos[1] += l
rob.movec(via, to, acc=a, vel=v, radius=r)
rob.movec(via, to, acc=a, vel=v)
print("Sending robot back to original position")
rob.movej(initj, acc=0.8, vel=0.2)

View File

@ -4,4 +4,10 @@ Python library to control an UR robot through its TCP/IP interface
__version__ = "0.9.0"
from urx.robot import Robot, RobotException, URRobot
from urx.urrobot import RobotException, URRobot
try:
from urx.robot import Robot
except ImportError as ex:
print("Exception while importing math3d base robot, disabling use of matrices", ex)
Robot = URRobot

View File

@ -8,426 +8,10 @@ __author__ = "Olivier Roulet-Dubonnet"
__copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing"
__license__ = "GPLv3"
import logging
MATH3D = True
try:
import math3d as m3d
import numpy as np
except ImportError:
MATH3D = False
print("python-math3d library could not be found on this computer, disabling use of matrices and path blending")
from urx import urrtmon
from urx import ursecmon
class RobotException(Exception):
pass
class URRobot(object):
"""
Python interface to socket interface of UR robot.
programs are send to port 3002
data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation)
Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default
The RT interfaces is only used for the get_force related methods
Rmq: A program sent to the robot i executed immendiatly and any running program is stopped
"""
def __init__(self, host, use_rt=False):
self.logger = logging.getLogger("urx")
self.host = host
self.csys = None
self.logger.debug("Opening secondary monitor socket")
self.secmon = ursecmon.SecondaryMonitor(self.host) # data from robot at 10Hz
self.rtmon = None
if use_rt:
self.rtmon = self.get_realtime_monitor()
# precision of joint movem used to wait for move completion
# the value must be conservative! otherwise we may wait forever
self.joinEpsilon = 0.01
# It seems URScript is limited in the character length of floats it accepts
self.max_float_length = 6 # FIXME: check max length!!!
self.secmon.wait() # make sure we get data from robot before letting clients access our methods
def __repr__(self):
return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"])
def __str__(self):
return self.__repr__()
def is_running(self):
"""
Return True if robot is running (not
necessary running a program, it might be idle)
"""
return self.secmon.running
def is_program_running(self):
"""
check if program is running.
Warning!!!!!: After sending a program it might take several 10th of
a second before the robot enters the running state
"""
return self.secmon.is_program_running()
def send_program(self, prog):
"""
send a complete program using urscript to the robot
the program is executed immediatly and any runnning
program is interrupted
"""
self.logger.info("Sending program: " + prog)
self.secmon.send_program(prog)
def get_tcp_force(self, wait=True):
"""
return measured force in TCP
if wait==True, waits for next packet before returning
"""
return self.rtmon.getTCFForce(wait)
def get_force(self, wait=True):
"""
length of force vector returned by get_tcp_force
if wait==True, waits for next packet before returning
"""
tcpf = self.get_tcp_force(wait)
force = 0
for i in tcpf:
force += i**2
return force**0.5
def set_tcp(self, tcp):
"""
set robot flange to tool tip transformation
"""
prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp)
self.send_program(prog)
def set_payload(self, weight, cog=None):
"""
set payload in Kg
cog is a vector x,y,z
if cog is not specified, then tool center point is used
"""
if cog:
cog = list(cog)
cog.insert(0, weight)
prog = "set_payload({}, ({},{},{}))".format(*cog)
else:
prog = "set_payload(%s)" % weight
self.send_program(prog)
def set_gravity(self, vector):
"""
set direction of gravity
"""
prog = "set_gravity(%s)" % list(vector)
self.send_program(prog)
def send_message(self, msg):
"""
send message to the GUI log tab on the robot controller
"""
prog = "textmsg(%s)" % msg
self.send_program(prog)
def set_digital_out(self, output, val):
"""
set digital output. val is a bool
"""
if val in (True, 1):
val = "True"
else:
val = "False"
self.send_program('digital_out[%s]=%s' % (output, val))
def get_analog_inputs(self):
"""
get analog input
"""
return self.secmon.get_analog_inputs()
def get_analog_in(self, nb, wait=False):
"""
get analog input
"""
return self.secmon.get_analog_in(nb, wait=wait)
def get_digital_in_bits(self):
"""
get digital output
"""
return self.secmon.get_digital_in_bits()
def get_digital_in(self, nb, wait=False):
"""
get digital output
"""
return self.secmon.get_digital_in(nb, wait)
def get_digital_out(self, val, wait=False):
"""
get digital output
"""
return self.secmon.get_digital_out(val, wait=wait)
def set_analog_out(self, output, val):
"""
set analog output, val is a float
"""
prog = "set_analog_output(%s, %s)" % (output, val)
self.send_program(prog)
def set_tool_voltage(self, val):
"""
set voltage to be delivered to the tool, val is 0, 12 or 24
"""
prog = "set_tool_voltage(%s)" % (val)
self.send_program(prog)
#def wait_for_move(self, radius=0, target=None):
def wait_for_move(self, radius, target):
"""
wait until a move is completed
radius and target args are ignored
"""
try:
self._wait_for_move(radius, target)
except Exception as ex:
self.logger.exception("Exception:")
self.stopj()
raise ex
def _wait_for_move(self, radius, target):
self.logger.debug("Waiting for move completion with radius %s and target %s", radius, target)
# it is necessary to wait since robot may takes a while to get into running state,
for _ in range(3):
self.secmon.wait()
while True:
if not self.is_running():
raise RobotException("Robot stopped")
jts = self.secmon.get_joint_data(wait=True)
finished = True
for i in range(0, 6):
# Rmq: q_target is an interpolated target we have no control over
if abs(jts["q_actual%s" % i] - jts["q_target%s" % i]) > self.joinEpsilon:
self.logger.debug("Waiting for end move, q_actual is %s, q_target is %s, diff is %s, epsilon is %s", jts["q_actual%s" % i], jts["q_target%s" % i], jts["q_actual%s" % i] - jts["q_target%s" % i], self.joinEpsilon)
finished = False
break
if finished and not self.secmon.is_program_running():
self.logger.debug("move has ended")
return
def getj(self, wait=False):
"""
get joints position
"""
jts = self.secmon.get_joint_data(wait)
return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]]
def speedx(self, command, velocities, acc, min_time):
vels = [round(i, self.max_float_length) for i in velocities]
vels.append(acc)
vels.append(min_time)
prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels)
self.send_program(prog)
def speedl(self, velocities, acc, min_time):
"""
move at given velocities until minimum min_time seconds
"""
return self.speedx("speedl", velocities, acc, min_time)
def speedj(self, velocities, acc, min_time):
"""
move at given joint velocities until minimum min_time seconds
"""
return self.speedx("speedj", velocities, acc, min_time)
def movej(self, joints, acc=0.1, vel=0.05, radius=0, wait=True, relative=False):
"""
move in joint space
"""
if relative:
l = self.getj()
joints = [v + l[i] for i, v in enumerate(joints)]
prog = self._format_move("movej", joints, acc, vel, radius)
self.send_program(prog)
if wait:
self.wait_for_move(radius, joints[:6])
return self.getj()
def movel(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
"""
Send a movel command to the robot. See URScript documentation.
"""
return self.movex("movel", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative)
def movep(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
"""
Send a movep command to the robot. See URScript documentation.
"""
return self.movex("movep", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative)
def servoc(self, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
"""
Send a servoc command to the robot. See URScript documentation.
"""
return self.movex("servoc", tpose, acc=acc, vel=vel, radius=radius, wait=wait, relative=relative)
def _format_move(self, command, tpose, acc, vel, radius, prefix=""):
tpose = [round(i, self.max_float_length) for i in tpose]
tpose.append(acc)
tpose.append(vel)
tpose.append(radius)
return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose)
def movex(self, command, tpose, acc=0.01, vel=0.01, radius=0, wait=True, relative=False):
"""
Send a move command to the robot. since UR robotene have several methods this one
sends whatever is defined in 'command' string
"""
if relative:
l = self.getl()
tpose = [v + l[i] for i, v in enumerate(tpose)]
prog = self._format_move(command, tpose, acc, vel, radius, prefix="p")
self.send_program(prog)
if wait:
self.wait_for_move(radius, tpose[:6])
return self.getl()
def getl(self, wait=False, _log=True):
"""
get TCP position
"""
pose = self.secmon.get_cartesian_info(wait)
if pose:
pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]]
if _log:
self.logger.debug("Received pose from robot: %s", pose)
return pose
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True):
"""
Move Circular: Move to position (circular in tool-space)
see UR documentation
"""
pose_via = [round(i, self.max_float_length) for i in pose_via]
pose_to = [round(i, self.max_float_length) for i in pose_to]
prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, radius)
self.send_program(prog)
if wait:
self.wait_for_move(radius, pose_to)
return self.getl()
def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True):
"""
Concatenate several movel commands and applies a blending radius
pose_list is a list of pose.
This method is usefull since any new command from python
to robot make the robot stop
"""
return self.movexs("movel", pose_list, acc, vel, radius, wait)
def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True):
"""
Concatenate several movex commands and applies a blending radius
pose_list is a list of pose.
This method is usefull since any new command from python
to robot make the robot stop
"""
header = "def myProg():\n"
end = "end\n"
prog = header
for idx, pose in enumerate(pose_list):
if idx == (len(pose_list) - 1):
radius = 0
prog += self._format_move(command, pose, acc, vel, radius, prefix="p") + "\n"
prog += end
self.send_program(prog)
if wait:
self.wait_for_move(radius=0, target=pose_list[-1])
return self.getl()
def stopl(self, acc=0.5):
self.send_program("stopl(%s)" % acc)
def stopj(self, acc=1.5):
self.send_program("stopj(%s)" % acc)
def stop(self):
self.stopj()
def close(self):
"""
close connection to robot and stop internal thread
"""
self.logger.info("Closing sockets to robot")
self.secmon.close()
if self.rtmon:
self.rtmon.stop()
def set_freedrive(self, val):
"""
set robot in freedrive/brackdrive mode where an operator can jogg
the robot to wished pose
"""
if val:
self.send_program("set robotmode freedrive")
else:
self.send_program("set robotmode run")
def set_simulation(self, val):
if val:
self.send_program("set sim")
else:
self.send_program("set real")
def get_realtime_monitor(self):
"""
return a pointer to the realtime monitor object
usefull to track robot position for example
"""
if not self.rtmon:
self.logger.info("Opening real-time monitor socket")
self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface
self.rtmon.start()
self.rtmon.set_csys(self.csys)
return self.rtmon
def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"):
"""
move tool in base coordinate, keeping orientation
"""
p = self.getl()
p[0] += vect[0]
p[1] += vect[1]
p[2] += vect[2]
return self.movex(command, p, vel=vel, acc=acc, wait=wait)
def up(self, z=0.05, acc=0.01, vel=0.01):
"""
Move up in csys z
"""
p = self.getl()
p[2] += z
self.movel(p, acc=acc, vel=vel)
def down(self, z=0.05, acc=0.01, vel=0.01):
"""
Move down in csys z
"""
self.up(-z, acc, vel)
import math3d as m3d
import numpy as np
from urx.urrobot import URRobot
class Robot(URRobot):
@ -456,7 +40,7 @@ class Robot(URRobot):
"""
self.csys = transform
def set_orientation(self, orient, acc=0.01, vel=0.01, radius=0, wait=True):
def set_orientation(self, orient, acc=0.01, vel=0.01, wait=True):
"""
set tool orientation using a orientation matric from math3d
or a orientation vector
@ -465,9 +49,9 @@ class Robot(URRobot):
orient = m3d.Orientation(orient)
trans = self.get_pose()
trans.orient = orient
self.set_pose(trans, acc, vel, radius, wait=wait)
self.set_pose(trans, acc, vel, wait=wait)
def translate_tool(self, vect, acc=0.01, vel=0.01, radius=0, wait=True):
def translate_tool(self, vect, acc=0.01, vel=0.01, wait=True):
"""
move tool in tool coordinate, keeping orientation
"""
@ -475,7 +59,7 @@ class Robot(URRobot):
if not isinstance(vect, m3d.Vector):
vect = m3d.Vector(vect)
t.pos += vect
return self.add_pose_tool(t, acc, vel, radius, wait=wait)
return self.add_pose_tool(t, acc, vel, wait=wait)
def back(self, z=0.05, acc=0.01, vel=0.01):
"""
@ -483,27 +67,27 @@ class Robot(URRobot):
"""
self.translate_tool((0, 0, -z), acc=acc, vel=vel)
def set_pos(self, vect, acc=0.01, vel=0.01, radius=0, wait=True):
def set_pos(self, vect, acc=0.01, vel=0.01, wait=True):
"""
set tool to given pos, keeping constant orientation
"""
if not isinstance(vect, m3d.Vector):
vect = m3d.Vector(vect)
trans = m3d.Transform(self.get_orientation(), m3d.Vector(vect))
return self.set_pose(trans, acc, vel, radius, wait=wait)
return self.set_pose(trans, acc, vel, wait=wait)
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, radius=0, wait=True):
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True):
"""
Move Circular: Move to position (circular in tool-space)
see UR documentation
"""
pose_via = self.csys * m3d.Transform(pose_via)
pose_to = self.csys * m3d.Transform(pose_to)
pose = URRobot.movec(self, pose_via.pose_vector, pose_to.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius)
pose = URRobot.movec(self, pose_via.pose_vector, pose_to.pose_vector, acc=acc, vel=vel, wait=wait)
if pose is not None:
return self.csys.inverse * m3d.Transform(pose)
def set_pose(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"):
def set_pose(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"):
"""
move tcp to point and orientation defined by a transformation
UR robots have several move commands, by default movel is used but it can be changed
@ -511,23 +95,23 @@ class Robot(URRobot):
"""
self.logger.debug("Setting pose to %s", trans.pose_vector)
t = self.csys * trans
pose = URRobot.movex(self, command, t.pose_vector, acc=acc, vel=vel, wait=wait, radius=radius)
pose = URRobot.movex(self, command, t.pose_vector, acc=acc, vel=vel, wait=wait)
if pose is not None:
return self.csys.inverse * m3d.Transform(pose)
def add_pose_base(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"):
def add_pose_base(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"):
"""
Add transform expressed in base coordinate
"""
pose = self.get_pose()
return self.set_pose(trans * pose, acc, vel, radius, wait=wait, command=command)
return self.set_pose(trans * pose, acc, vel, wait=wait, command=command)
def add_pose_tool(self, trans, acc=0.01, vel=0.01, radius=0, wait=True, command="movel"):
def add_pose_tool(self, trans, acc=0.01, vel=0.01, wait=True, command="movel"):
"""
Add transform expressed in tool coordinate
"""
pose = self.get_pose()
return self.set_pose(pose * trans, acc, vel, radius, wait=wait, command=command)
return self.set_pose(pose * trans, acc, vel, wait=wait, command=command)
def get_pose(self, wait=False, _log=True):
"""
@ -571,7 +155,7 @@ class Robot(URRobot):
w = self.csys.orient * pose.orient * m3d.Vector(velocities[3:])
URRobot.speedl(self, np.concatenate((v.array, w.array)), acc, min_time)
def movex(self, command, pose, acc=0.01, vel=0.01, wait=True, relative=False, radius=0):
def movex(self, command, pose, acc=0.01, vel=0.01, wait=True, relative=False):
"""
Send a move command to the robot. since UR robotene have several methods this one
sends whatever is defined in 'command' string
@ -580,7 +164,7 @@ class Robot(URRobot):
if relative:
return self.add_pose_base(t, acc, vel, wait=wait, command=command)
else:
return self.set_pose(t, acc, vel, radius, wait=wait, command=command)
return self.set_pose(t, acc, vel, wait=wait, command=command)
def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True):
"""
@ -618,23 +202,4 @@ class Robot(URRobot):
vector = vector.list
return URRobot.set_gravity(self, vector)
def _wait_for_move(self, radius, target):
self.logger.debug("Waiting for move completion with math3d using raidus %s and target %s", radius, target)
# it is necessary to wait since robot may takes a while to get into running state,
for _ in range(3):
self.secmon.wait()
target = m3d.Transform(target)
while True:
if not self.is_running():
raise RobotException("Robot stopped")
pose = self.get_pose(wait=True, _log=False)
dist = pose.dist(target)
self.logger.debug("distance to target is: %s, target dist is %s", dist, radius)
if (dist < radius) or not self.secmon.is_program_running():
#if (dist < radius):
self.logger.debug("move has ended")
return
if not MATH3D:
Robot = URRobot

438
urx/urrobot.py Normal file
View File

@ -0,0 +1,438 @@
"""
Python library to control an UR robot through its TCP/IP interface
Documentation from universal robots:
http://support.universal-robots.com/URRobot/RemoteAccess
"""
__author__ = "Olivier Roulet-Dubonnet"
__copyright__ = "Copyright 2011-2015, Sintef Raufoss Manufacturing"
__license__ = "GPLv3"
import logging
from urx import urrtmon
from urx import ursecmon
class RobotException(Exception):
pass
class URRobot(object):
"""
Python interface to socket interface of UR robot.
programs are send to port 3002
data is read from secondary interface(10Hz?) and real-time interface(125Hz) (called Matlab interface in documentation)
Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default
The RT interfaces is only used for the get_force related methods
Rmq: A program sent to the robot i executed immendiatly and any running program is stopped
"""
def __init__(self, host, use_rt=False):
self.logger = logging.getLogger("urx")
self.host = host
self.csys = None
self.logger.debug("Opening secondary monitor socket")
self.secmon = ursecmon.SecondaryMonitor(self.host) # data from robot at 10Hz
self.rtmon = None
if use_rt:
self.rtmon = self.get_realtime_monitor()
# precision of joint movem used to wait for move completion
# the value must be conservative! otherwise we may wait forever
self.joinEpsilon = 0.01
# It seems URScript is limited in the character length of floats it accepts
self.max_float_length = 6 # FIXME: check max length!!!
self.secmon.wait() # make sure we get data from robot before letting clients access our methods
def __repr__(self):
return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.get_all_data()["RobotModeData"])
def __str__(self):
return self.__repr__()
def is_running(self):
"""
Return True if robot is running (not
necessary running a program, it might be idle)
"""
return self.secmon.running
def is_program_running(self):
"""
check if program is running.
Warning!!!!!: After sending a program it might take several 10th of
a second before the robot enters the running state
"""
return self.secmon.is_program_running()
def send_program(self, prog):
"""
send a complete program using urscript to the robot
the program is executed immediatly and any runnning
program is interrupted
"""
self.logger.info("Sending program: " + prog)
self.secmon.send_program(prog)
def get_tcp_force(self, wait=True):
"""
return measured force in TCP
if wait==True, waits for next packet before returning
"""
return self.rtmon.getTCFForce(wait)
def get_force(self, wait=True):
"""
length of force vector returned by get_tcp_force
if wait==True, waits for next packet before returning
"""
tcpf = self.get_tcp_force(wait)
force = 0
for i in tcpf:
force += i**2
return force**0.5
def set_tcp(self, tcp):
"""
set robot flange to tool tip transformation
"""
prog = "set_tcp(p[{}, {}, {}, {}, {}, {}])".format(*tcp)
self.send_program(prog)
def set_payload(self, weight, cog=None):
"""
set payload in Kg
cog is a vector x,y,z
if cog is not specified, then tool center point is used
"""
if cog:
cog = list(cog)
cog.insert(0, weight)
prog = "set_payload({}, ({},{},{}))".format(*cog)
else:
prog = "set_payload(%s)" % weight
self.send_program(prog)
def set_gravity(self, vector):
"""
set direction of gravity
"""
prog = "set_gravity(%s)" % list(vector)
self.send_program(prog)
def send_message(self, msg):
"""
send message to the GUI log tab on the robot controller
"""
prog = "textmsg(%s)" % msg
self.send_program(prog)
def set_digital_out(self, output, val):
"""
set digital output. val is a bool
"""
if val in (True, 1):
val = "True"
else:
val = "False"
self.send_program('digital_out[%s]=%s' % (output, val))
def get_analog_inputs(self):
"""
get analog input
"""
return self.secmon.get_analog_inputs()
def get_analog_in(self, nb, wait=False):
"""
get analog input
"""
return self.secmon.get_analog_in(nb, wait=wait)
def get_digital_in_bits(self):
"""
get digital output
"""
return self.secmon.get_digital_in_bits()
def get_digital_in(self, nb, wait=False):
"""
get digital output
"""
return self.secmon.get_digital_in(nb, wait)
def get_digital_out(self, val, wait=False):
"""
get digital output
"""
return self.secmon.get_digital_out(val, wait=wait)
def set_analog_out(self, output, val):
"""
set analog output, val is a float
"""
prog = "set_analog_output(%s, %s)" % (output, val)
self.send_program(prog)
def set_tool_voltage(self, val):
"""
set voltage to be delivered to the tool, val is 0, 12 or 24
"""
prog = "set_tool_voltage(%s)" % (val)
self.send_program(prog)
def _wait_for_move(self, target, radius=0.001, timeout=2):
"""
wait for a move to complete. Unfortunately there is no good way to know when a move has finished
so for every received data from robot we compute a dist equivalent and when it is lower than
'radius' we return.
if radius is not reached within timeout, and exceptino is raised
"""
self.logger.info("Waiting for move completion using radius %s and target %s", radius, target)
count = 0
while True:
if not self.is_running():
raise RobotException("Robot stopped")
dist = self._get_dist(target)
self.logger.debug("distance to target is: %s, target dist is %s", dist, radius)
if dist < radius:
self.logger.debug("move has ended")
return
count += 1
if count > timeout * 10:
raise RobotException("Goal not reached but no program has been running for {} seconds. dist is {}, target is {}, current pose is {}".format(timeout, dist, target, URRobot.getl(self)))
if self.secmon.is_program_running():
count = 0
def _get_dist(self, target):
pose = URRobot.getl(self, wait=True)
dist = 0
for i in range(3):
dist += (target[i] - pose[i]) ** 2
for i in range(3, 6):
dist += ((target[i] - pose[i]) / 5) ** 2 # arbitraty length like
return dist ** 0.5
def _wait_for_move_joints(self, radius, target):
#FIXME raise exception if target not reached after x seconds
self.logger.debug("Waiting for joint move completion with radius %s and target %s", radius, target)
while True:
if not self.is_running():
raise RobotException("Robot stopped")
joints = self.getj(wait=True)
dist = 0
for i in range(6):
dist += (target[i] - joints[i]) ** 2
dist = dist ** 0.5
self.logger.debug("distance to target is: %s, target dist is %s", dist, radius)
if dist < radius:
self.logger.debug("move has ended")
return
def getj(self, wait=False):
"""
get joints position
"""
jts = self.secmon.get_joint_data(wait)
return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]]
def speedx(self, command, velocities, acc, min_time):
vels = [round(i, self.max_float_length) for i in velocities]
vels.append(acc)
vels.append(min_time)
prog = "{}([{},{},{},{},{},{}], a={}, t_min={})".format(command, *vels)
self.send_program(prog)
def speedl(self, velocities, acc, min_time):
"""
move at given velocities until minimum min_time seconds
"""
return self.speedx("speedl", velocities, acc, min_time)
def speedj(self, velocities, acc, min_time):
"""
move at given joint velocities until minimum min_time seconds
"""
return self.speedx("speedj", velocities, acc, min_time)
def movej(self, joints, acc=0.1, vel=0.05, wait=True, relative=False):
"""
move in joint space
"""
if relative:
l = self.getj()
joints = [v + l[i] for i, v in enumerate(joints)]
prog = self._format_move("movej", joints, acc, vel)
self.send_program(prog)
if wait:
self._wait_for_move_joints(0.001, joints[:6])
return self.getj()
def movel(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False):
"""
Send a movel command to the robot. See URScript documentation.
"""
return self.movex("movel", tpose, acc=acc, vel=vel, wait=wait, relative=relative)
def movep(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False):
"""
Send a movep command to the robot. See URScript documentation.
"""
return self.movex("movep", tpose, acc=acc, vel=vel, wait=wait, relative=relative)
def servoc(self, tpose, acc=0.01, vel=0.01, wait=True, relative=False):
"""
Send a servoc command to the robot. See URScript documentation.
"""
return self.movex("servoc", tpose, acc=acc, vel=vel, wait=wait, relative=relative)
def _format_move(self, command, tpose, acc, vel, radius=0, prefix=""):
tpose = [round(i, self.max_float_length) for i in tpose]
tpose.append(acc)
tpose.append(vel)
tpose.append(radius)
return "{}({}[{},{},{},{},{},{}], a={}, v={}, r={})".format(command, prefix, *tpose)
def movex(self, command, tpose, acc=0.01, vel=0.01, wait=True, relative=False):
"""
Send a move command to the robot. since UR robotene have several methods this one
sends whatever is defined in 'command' string
"""
if relative:
l = self.getl()
tpose = [v + l[i] for i, v in enumerate(tpose)]
prog = self._format_move(command, tpose, acc, vel, prefix="p")
self.send_program(prog)
if wait:
self._wait_for_move(tpose[:6])
return self.getl()
def getl(self, wait=False, _log=True):
"""
get TCP position
"""
pose = self.secmon.get_cartesian_info(wait)
if pose:
pose = [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]]
if _log:
self.logger.debug("Received pose from robot: %s", pose)
return pose
def movec(self, pose_via, pose_to, acc=0.01, vel=0.01, wait=True):
"""
Move Circular: Move to position (circular in tool-space)
see UR documentation
"""
pose_via = [round(i, self.max_float_length) for i in pose_via]
pose_to = [round(i, self.max_float_length) for i in pose_to]
prog = "movec(p%s, p%s, a=%s, v=%s, r=%s)" % (pose_via, pose_to, acc, vel, "0")
self.send_program(prog)
if wait:
self._wait_for_move(pose_to)
return self.getl()
def movels(self, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True):
"""
Concatenate several movel commands and applies a blending radius
pose_list is a list of pose.
This method is usefull since any new command from python
to robot make the robot stop
"""
return self.movexs("movel", pose_list, acc, vel, radius, wait)
def movexs(self, command, pose_list, acc=0.01, vel=0.01, radius=0.01, wait=True):
"""
Concatenate several movex commands and applies a blending radius
pose_list is a list of pose.
This method is usefull since any new command from python
to robot make the robot stop
"""
header = "def myProg():\n"
end = "end\n"
prog = header
for idx, pose in enumerate(pose_list):
if idx == (len(pose_list) - 1):
radius = 0
prog += self._format_move(command, pose, acc, vel, radius, prefix="p") + "\n"
prog += end
self.send_program(prog)
if wait:
self._wait_for_move(target=pose_list[-1])
return self.getl()
def stopl(self, acc=0.5):
self.send_program("stopl(%s)" % acc)
def stopj(self, acc=1.5):
self.send_program("stopj(%s)" % acc)
def stop(self):
self.stopj()
def close(self):
"""
close connection to robot and stop internal thread
"""
self.logger.info("Closing sockets to robot")
self.secmon.close()
if self.rtmon:
self.rtmon.stop()
def set_freedrive(self, val):
"""
set robot in freedrive/brackdrive mode where an operator can jogg
the robot to wished pose
"""
if val:
self.send_program("set robotmode freedrive")
else:
self.send_program("set robotmode run")
def set_simulation(self, val):
if val:
self.send_program("set sim")
else:
self.send_program("set real")
def get_realtime_monitor(self):
"""
return a pointer to the realtime monitor object
usefull to track robot position for example
"""
if not self.rtmon:
self.logger.info("Opening real-time monitor socket")
self.rtmon = urrtmon.URRTMonitor(self.host) # som information is only available on rt interface
self.rtmon.start()
self.rtmon.set_csys(self.csys)
return self.rtmon
def translate(self, vect, acc=0.01, vel=0.01, wait=True, command="movel"):
"""
move tool in base coordinate, keeping orientation
"""
p = self.getl()
p[0] += vect[0]
p[1] += vect[1]
p[2] += vect[2]
return self.movex(command, p, vel=vel, acc=acc, wait=wait)
def up(self, z=0.05, acc=0.01, vel=0.01):
"""
Move up in csys z
"""
p = self.getl()
p[2] += z
self.movel(p, acc=acc, vel=vel)
def down(self, z=0.05, acc=0.01, vel=0.01):
"""
Move down in csys z
"""
self.up(-z, acc, vel)