From 7bbc069f8c01296d3b0a96d88ab01887a784b77e Mon Sep 17 00:00:00 2001 From: Olivier R-D Date: Tue, 8 Jan 2013 11:26:54 +0100 Subject: [PATCH] inital cleanup for publishing --- README | 5 + debian/compat | 1 + debian/control | 21 ++ debian/python-ur.pyinstall | 1 + debian/python3-ur.install | 1 + debian/rules | 14 + make_deb.py | 45 +++ setup.py | 19 ++ urx/urparser.py | 170 ++++++++++ urx/urx.py | 647 +++++++++++++++++++++++++++++++++++++ 10 files changed, 924 insertions(+) create mode 100644 README create mode 100644 debian/compat create mode 100644 debian/control create mode 100644 debian/python-ur.pyinstall create mode 100644 debian/python3-ur.install create mode 100644 debian/rules create mode 100755 make_deb.py create mode 100644 setup.py create mode 100644 urx/urparser.py create mode 100644 urx/urx.py diff --git a/README b/README new file mode 100644 index 0000000..4f6a55e --- /dev/null +++ b/README @@ -0,0 +1,5 @@ +High level driver to control a Universal Robot robot from python through its TCP/IP interface +urx.py is the main file in this repository. It is a python library to control an UR robot from python. Internally it uses URSCript. +urx can optionally use the math3d library to receive and send transformation matrices to the robot +urparser.py is only the parser for the binary data frmo the UR +urrtmon.py is a parser for the "real-time" interface of the robot. It is hardcoded for some controller versions and therefore disabled by default diff --git a/debian/compat b/debian/compat new file mode 100644 index 0000000..ec63514 --- /dev/null +++ b/debian/compat @@ -0,0 +1 @@ +9 diff --git a/debian/control b/debian/control new file mode 100644 index 0000000..770bc92 --- /dev/null +++ b/debian/control @@ -0,0 +1,21 @@ +Source: python-urx +Section: python +Priority: extra +Maintainer: Olivier Roulet-Dubonnet +Build-Depends: debhelper (>= 8.0.0) +Standards-Version: 3.9.3 +Homepage: http://launchpad.net/XXX +X-Python-Version: >= 2.6 +X-Python3-Version: >= 3.1 + +Package: python-urx +Architecture: all +Depends: ${python:Depends}, ${misc:Depends} +Recommends: python-math3d +Description: Python library to control a robot from Universal Robots + +Package: python3-urx +Architecture: all +Depends: ${python3:Depends}, ${misc:Depends} +Recommends: python3-math3d +Description: Python library to control a robot from Universal Robots diff --git a/debian/python-ur.pyinstall b/debian/python-ur.pyinstall new file mode 100644 index 0000000..1782c54 --- /dev/null +++ b/debian/python-ur.pyinstall @@ -0,0 +1 @@ +urx/*.py diff --git a/debian/python3-ur.install b/debian/python3-ur.install new file mode 100644 index 0000000..22b35a2 --- /dev/null +++ b/debian/python3-ur.install @@ -0,0 +1 @@ +urx /usr/lib/python3/dist-packages diff --git a/debian/rules b/debian/rules new file mode 100644 index 0000000..82bc03c --- /dev/null +++ b/debian/rules @@ -0,0 +1,14 @@ +#!/usr/bin/make -f +# -*- makefile -*- +# Sample debian/rules that uses debhelper. +# This file was originally written by Joey Hess and Craig Small. +# As a special exception, when this file is copied by dh-make into a +# dh-make output file, you may use that output file without restriction. +# This special exception was added by Craig Small in version 0.37 of dh-make. + +# Uncomment this to turn on verbose mode. +export DH_VERBOSE=1 + +%: + dh $@ --with python2,python3 + diff --git a/make_deb.py b/make_deb.py new file mode 100755 index 0000000..1d75a4f --- /dev/null +++ b/make_deb.py @@ -0,0 +1,45 @@ +#!/usr/bin/python3 +""" +hackish file to crreate deb from setup.py +""" + +import subprocess +from email.utils import formatdate + +DEBVERSION = "0.5" + +rev = subprocess.check_output("bzr version-info --check-clean --custom --template='{revno}'", shell=True) +bzrstring = "bzr" + str(rev).replace("'","") + + +def get_changelog(progname, version, changelog, date): + """ + return a dummy changelog acceptable by debian script engine + """ + return """%s (%s) unstable; urgency=low + + %s + + -- Olivier R-D %s """ % (progname, version, changelog, date) + + + +def check_deb(name): + print("checking if %s is installed" % name) + subprocess.check_call("dpkg -s %s > /dev/null" % name, shell=True) + +if __name__ == "__main__": + check_deb("build-essential") + f = open("debian/changelog", "w") + f.write(get_changelog("python-ur", DEBVERSION + bzrstring, "Updated to last changes in bzr repository", formatdate())) + f.close() + + #now build package + #subprocess.check_call("dpkg-buildpackage -rfakeroot -uc -us -b", shell=True) + subprocess.check_call("fakeroot dh binary --with python3,python2 --without pysupport,pycentral", shell=True) + subprocess.check_call("dh clean", shell=True) + + + + + diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..15fe476 --- /dev/null +++ b/setup.py @@ -0,0 +1,19 @@ +from distutils.core import setup +from distutils.command.install_data import install_data + + +import glob +import os + +import make_deb + +setup (name = "python-ur", + version = make_deb.bzrstring, + description = "Python library to control an UR robot", + author = "Olivier Roulet-Dubonnet", + url = 'http://launchpad.net/XXX', + py_modules=["urparser", "urx"], + license = "GNU General Public License", + ) + + diff --git a/urx/urparser.py b/urx/urparser.py new file mode 100644 index 0000000..8b20487 --- /dev/null +++ b/urx/urparser.py @@ -0,0 +1,170 @@ +""" +parse data from UR socket client +REMARK +Only the last connected socket on 3001 is the primary client !!!! so it is unreliable to rely on it +http://support.universal-robots.com/Technical/PrimaryAndSecondaryClientInterface +""" + +__author__ = "Olivier Roulet-Dubonnet" +__copyright__ = "Copyright 2011-2012, Olivier Roulet-Dubonnet" +__credits__ = ["Olivier Roulet-Dubonnet"] +__license__ = "GPLv3" +__version__ = "0.3" +__status__ = "Development" + + +import struct +from copy import copy + +class ParsingException(Exception): + def __init__(self, *args): + Exception.__init__(self, *args) + + +def _get_data(data, fmt, names): + """ + fill data into a dictionary + data is data from robot packet + fmt is struct format, but with added A for arrays and no support for numerical in fmt + names args are strings used to store values + """ + + tmpdata = copy(data) + fmt = fmt.strip() # space may confuse us + d = dict() + i = 0 + j = 0 + while j < len(fmt) and i < len(names): + f = fmt[j] + if f in (" ", "!", ">", "<"): + j += 1 + elif f == "A": #we got an array + # first we need to find its size + if j == len(fmt) - 2: # we are last element, size is the rest of data in packet + arraysize = len(tmpdata) + else: # size should be given in last element + asn = names[i-1] + if not asn.endswith("Size"): + raise ParsingException("Error, array without size ! %s %s" % (asn, i)) + else: + arraysize = d[asn] + d[names[i]] = tmpdata[0:arraysize] + #print "Array is ", names[i], d[names[i]] + tmpdata = tmpdata[arraysize:] + j += 2 + i += 1 + else: + fmtsize = struct.calcsize(fmt[j]) + #print "reading ", f , i, j, fmtsize, len(tmpdata) + if len(tmpdata) < fmtsize: #seems to happen on windows + raise ParsingException("Error, length of data smaller than advertized: ", len(tmpdata), fmtsize, "for names ", names, f, i, j) + d[names[i]] = struct.unpack("!" + f, tmpdata[0:fmtsize])[0] + #print names[i], d[names[i]] + tmpdata = tmpdata[fmtsize:] + j += 1 + i += 1 + + return d + +def get_header(data): + return struct.unpack("!iB", data[0:5]) + +def analyze_header(data): + """ + read first 5 bytes and return complete packet + """ + if not len(data) >= 5: + raise ParsingException("Packet size %s smaller than header size (5 bytes)" % len(data)) + else: + psize, ptype = get_header(data) + if psize < 5: + raise ParsingException("Error, declared length of data smaller than its own header(5): ", psize) + elif psize > len(data): + raise ParsingException("Error, length of data smaller (%s) than declared (%s)" %( len(data), psize)) + return psize, ptype, data[:psize], data[psize:] + + +def find_first_packet(data): + """ + find the first complete packet in a string + returns None if none found + """ + counter = 0 + while True: + if len(data) >= 5: + psize, ptype = get_header(data) + if psize < 5 or psize > 500 or ptype != 16: + data = data[1:] + elif len(data) > psize: + if counter: + print("Had to remove % bytes of garbage at begining of packet" % counter) + #ok we we have somehting which looks like a packet" + return (data[:psize], data[psize:]) + else: + #packet is not complete + return None + else: + return None + + + +def parse(data): + """ + parse a packet from the UR socket and return a dictionary with the data + """ + allData = {} + #print "Total size ", len(data) + while data: + psize, ptype, pdata, data = analyze_header(data) + #print "We got packet with size %i and type %s" % (psize, ptype) + if ptype == 16: + allData["SecondaryClientData"] = _get_data(pdata, "!iB", ("size", "type")) + data = (pdata + data)[5:] # This is the total size so we resend data to parser + elif ptype == 0: + allData["RobotModeData"] = _get_data(pdata, "!iBQ???????Bd", ("size", "type", "timestamp", "isRobotConnected", "isRealRobotEnabled", "isPowerOnRobot", "isEmergencyStopped", "isSecurityStopped", "isProgramRunning", "isProgramPaused", "robotMode", "speedFraction")) + elif ptype == 1: + tmpstr = ["size", "type"] + for i in range(0, 6): + tmpstr += ["q_actual%s" % i, "q_target%s" % i, "qd_actual%s" % i, "I_actual%s" % i, "V_actual%s" % i, "T_motor%s" % i ,"T_micro%s" % i, "jointMode%s" % i] + + allData["JointData"] = _get_data(pdata, "!iB dddffffB dddffffB dddffffB dddffffB dddffffB dddffffB", tmpstr) + + elif ptype == 4: + allData["CartesianInfo"] = _get_data(pdata, "iBdddddd", ("size", "type", "X", "Y", "Z", "Rx", "Ry", "Rz")) + elif ptype == 5: + allData["LaserPointer(OBSOLETE)"] = _get_data(pdata, "iBddd" , ("size", "type")) + elif ptype == 3: + allData["MasterBoardData"] = _get_data(pdata, "iBhhbbddbbddffffBBb" , ("size", "type", "digitalInputBits", "digitalOutputBits", "analogInputRange0", "analogInputRange1", "analogInput0", "analogInput1", "analogInputDomain0", "analogInputDomain1", "analogOutput0", "analogOutput1", "masterBoardTemperature", "robotVoltage48V", "robotCurrent", "masterIOCurrent"))#, "masterSafetyState" ,"masterOnOffState", "euromap67InterfaceInstalled" )) + elif ptype == 2: + allData["ToolData"] = _get_data(pdata, "iBbbddfBffB" , ("size", "type", "analoginputRange2", "analoginputRange3", "analogInput2", "analogInput3", "toolVoltage48V", "toolOutputVoltage", "toolCurrent", "toolTemperature", "toolMode" )) + + elif ptype == 20: + tmp = _get_data(pdata, "!iB Qbb", ("size", "type", "timestamp", "source", "robotMessageType")) + if tmp["robotMessageType"] == 3: + allData["VersionMessage"] = _get_data(pdata, "!iBQbb bAbBBiAb", ("size", "type", "timestamp", "source", "robotMessageType", "projectNameSize", "projectName", "majorVersion", "minorVersion", "svnRevision", "buildDate")) + elif tmp["robotMessageType"] == 6: + allData["robotCommMessage"] = _get_data(pdata, "!iBQbb iiAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "messageText")) + elif tmp["robotMessageType"] == 1: + allData["labelMessage"] = _get_data(pdata, "!iBQbb iAc", ("size", "type", "timestamp", "source", "robotMessageType", "id", "messageText")) + elif tmp["robotMessageType"] == 2: + allData["popupMessage"] = _get_data(pdata, "!iBQbb ??BAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "warning", "error", "titleSize", "messageTitle", "messageText")) + elif tmp["robotMessageType"] == 0: + allData["messageText"] = _get_data(pdata, "!iBQbb Ac", ("size", "type", "timestamp", "source", "robotMessageType", "messageText")) + elif tmp["robotMessageType"] == 8: + allData["varMessage"] = _get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText")) + elif tmp["robotMessageType"] == 7: + allData["keyMessage"] = _get_data(pdata, "!iBQbb iiBAcAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "titleSize", "messageTitle", "messageText")) + elif tmp["robotMessageType"] == 5: + allData["keyMessage"] = _get_data(pdata, "!iBQbb iiAc", ("size", "type", "timestamp", "source", "robotMessageType", "code", "argument", "messageText")) + else: + print("Message type parser not implemented ", tmp) + else: + print("Uknown packet type %s with size %s" % (ptype, psize)) + + return allData + + + + + + diff --git a/urx/urx.py b/urx/urx.py new file mode 100644 index 0000000..1dce667 --- /dev/null +++ b/urx/urx.py @@ -0,0 +1,647 @@ +""" +Python library to control an UR robot through its secondary port interface + +import urx + +rob = urx.robot(192.168.0.100) +rob.set_tcp((x=01, z=0.232)) +rob.movej((1,2,3,4,5,6), a, v) +rob.movel((x,y,z,a,b,c), a, v) +print "Current tool pose is: ", rob.getl() +rob.movelrel((0.1, 0, 0, 0, 0, 0), a, v) +rob.stopj(a) + +robot.movel(x,y,z,a,b,c), wait=False) +while True : + sleep(0.1) #sleep first since the information may be outdated + if robot.isProgramRunning(): + break + +robot.movel(x,y,z,a,b,c), wait=False) +while.robot.getForce() < 50: + sleep(0.01) +robot.stopl() + +try: + robot.movelrel(0,0,0.1,0,0,0) +except RobotError, ex: + print "Robot could not execute move (emergency stop for example), do somethhing", ex + +Using matrices: + +robot = Robot("192.168.1.1") +robot.set_tcp((0,0,0.23,0,0,0) +calib = mathd3d.Transform() +calib.orient.rotate_zb(pi/4) #just an example +robot.set_calibration_matrix(calib) + +trans = robot.get_transform() # get current transformation matrix (tool to base) +trans.orient.rotate_yt(pi/2) +robot.apply_transform(trans) +trans.pos += math3d.Vector(0,0,0.3) +robot.apply_transform(trans) + + +#or only work with orientation part +o = robot.get_orientation() +o.rotate_yb(pi) +robot.orient(o) + +TODO: + add more methods + movec + +DOC LINK +http://support.universal-robots.com/URScript/RemoteAccess +""" + +__author__ = "Olivier Roulet-Dubonnet" +__copyright__ = "Copyright 2011-2012, Olivier Roulet-Dubonnet" +__credits__ = ["Olivier Roulet-Dubonnet"] +__license__ = "GPLv3" +__version__ = "0.3" +__status__ = "Development" + + +from threading import Thread, Lock, Condition +import socket +import time + + +MATH3D = True +try: + import math3d +except ImportError: + MATH3D = False + print("pymath3d library could not be found on this computer, disabling use of matrices") + +#import urrtmon #temproarely disabled +import urparser + + +class RobotException(Exception): + pass + + +class SecondaryMonitor(Thread): + """ + Monitor data from secondry port and send programs to robot + """ + def __init__(self, host): + Thread.__init__(self) + self._lock = Lock() + self._s_secondary = None + self._dict = {} + self._dictLock = Lock() + self.host = host + secondary_port = 30002 # Secondary client interface on Universal Robots + self._s_secondary = socket.create_connection((self.host, secondary_port), timeout=0.5) + self._queue = [] + self._dataqueue = bytes() + self._trystop = False # to stop thread + self.running = False #True when robot is on and listening + self._dataEvent = Condition() + + self.start() + with self._dataEvent: + self._dataEvent.wait() # make sure we got some data before someone calls us + + def sendProgram(self, prog): + """ + send program to robot in URScript format + If another program is send while a program is running the first program is aborded. + """ + + with self._lock: + prog.strip() + self._queue.append(prog.encode() + b"\n") + + + def run(self): + """ + check program execution status in the secondary client data packet we get from the robot + This interface uses only data from the secondary client interface (see UR doc) + Only the last connected client is the primary client, + so this is not guaranted and we cannot rely on information only + send to the primary client(like program execution start ?!?!?) + """ + + while not self._trystop: + with self._lock: + if len(self._queue) > 0: + prog = self._queue.pop(0) + self._s_secondary.send(prog) + + data = self._get_data() + try: + with self._dictLock: + self._dict = urparser.parse(data) + except urparser.ParsingException as ex: + self._log("Error parsing data from urrobot: " + str(ex) ) + continue + + if "RobotModeData" not in self._dict: + self._log( "Got a packet from robot without RobotModeData, strange ...") + continue + + #print data["RobotModeData"] + + if self._dict["RobotModeData"]["robotMode"] == 0 \ + and self._dict["RobotModeData"]["isRealRobotEnabled"] == True \ + and self._dict["RobotModeData"]["isEmergencyStopped"] == False \ + and self._dict["RobotModeData"]["isSecurityStopped"] == False \ + and self._dict["RobotModeData"]["isRobotConnected"] == True \ + and self._dict["RobotModeData"]["isPowerOnRobot"] == True: + self.running = True + else: + if self.running == True: + print("Robot not running: ", self._dict["RobotModeData"]) + self.running = False + with self._dataEvent: + self._dataEvent.notifyAll() + + + def _get_data(self): + """ + returns something that looks like a packet, nothing is guaranted + """ + while True: + ans = urparser.find_first_packet(self._dataqueue[:]) + if ans: + self._dataqueue = ans[1] + return ans[0] + else: + tmp = self._s_secondary.recv(1024) + self._dataqueue += tmp + + def getCartesianInfo(self): + with self._dictLock: + if "CartesianInfo" in self._dict: + return self._dict["CartesianInfo"] + else: + return None + + def getAllData(self): + """ + return last data obtained from robot in dictionnary format + """ + with self._dictLock: + return self._dict.copy() + + + def getJointData(self): + with self._dictLock: + if "JointData" in self._dict: + return self._dict["JointData"] + else: + return None + + def isProgramRunning(self): + """ + return True if robot is executing a program + Rmq: The refresh rate is only 10Hz so the information may be outdated + """ + with self._dictLock: + return self._dict["RobotModeData"]["isProgramRunning"] + + + def cleanup(self): + self._trystop = True + self.join() + if self._s_secondary: + with self._lock: + self._s_secondary.close() + + def _log(self, msg): + print(self.__class__.__name__, ": ", msg) + + +class SimpleRobot(object): + """ + Python interface to socket interface of UR robot. + programs are send to port 3002 + data is read from secondary intergace and realtime interface (called Matlab interface in documentation) + Since parsing the RT interface uses som CPU, and does not support all robots versions, it is disabled by default + The RT interfaces is only used for the getForce related methods + Rmq: A program sent to the robot i executed immendiatly and any running program is stopped + """ + def __init__(self, host, useRTInterface=False): + self.host = host + + self.secmon = SecondaryMonitor(self.host) #data from robot at 10Hz + + if useRTInterface: + self.rtmon = urrtmon.URRTMonitor(self.host)# som information is only available on rt interface + else: + self.rtmon = None + #the next 3 values must be conservative! otherwise we may wait forever + self.jointEpsilon = 0.05 # precision of joint movem used to wait for move completion + self.linearEpsilon = 0.0005 # precision of linear movement, used to wait for move completion + self.radialEpsilon = 0.05 # precision of radial movement, used to wait for move completion + if useRTInterface: + self.rtmon.start() + + self.start_pose = [1.57, -1.77, 1.57, -1.8, -1.57, -1.57] + + + def __repr__(self): + return "Robot Object (IP=%s, state=%s)" % (self.host, self.secmon.getAllData()["RobotModeData"]) + + def __str__(self): + return self.__repr__() + + def isRunning(self): # legacy + return self.secmon.running + + def isProgramRunning(self): + return self.secmon.isProgramRunning() + + def sendProgram(self, prog): + self.secmon.sendProgram(prog) + + def getTCPForce(self, wait=True): + """ + return measured force in TCP + if wait==True, waits for next packet before returning + """ + return self.rtmon.getTCFForce(wait) + + def getForce(self, wait=True): + """ + length of force vector returned by getTCPForce + if wait==True, waits for next packet before returning + """ + tcpf = self.getTCPForce( wait) + force = 0 + for i in tcpf: + force += i**2 + return force**0.5 + + def moveToStartPose(self): + """ + move to pos defined in self.start_pose attribute + """ + self.movej(self.start_pose) + + def setTcp(self, x=0, y=0, z=0, a=0, b=0, c=0): + """ + """ + if type(x) in (list, tuple): + if len(x) != 6: + raise Exception("Tcp is a 6 values list") + else: + arg = x + else: + arg = (x, y, z, a, b, c) + prog = "set_tcp(p[%s, %s, %s, %s, %s, %s])" % arg + self.secmon.sendProgram(prog) + + def setPayload(self, weight): + """ + set payload in Kg + """ + prog = "set_payload(%s)" % weight + self.secmon.sendProgram(prog) + + def setGravity(self, vector): + """ + set direction of gravity + """ + prog = "set_gravity(%s)" % list(vector) + self.secmon.sendProgram(prog) + + def sendMessage(self, msg): + """ + send message to the GUI log tab on the robot controller + """ + prog = "textmsg(%s)" % msg + self.secmon.sendProgram(prog) + + def setDigitalOut(self, output, val): + """ + set digital output. val is a bool + """ + if val in (True, 1): + val = "True" + else: + val = "False" + self.secmon.sendProgram('digital_out[%s]=%s' % (output, val)) + + def getAnalogInputs(self): + """ + get analog input + """ + data = self.secmon.getAllData() + return data["MasterBoardData"]["analogInput0"], data["MasterBoardData"]["analogInput1"] + + def getDigitalOutputBits(self): + """ + get digital output + """ + data = self.secmon.getAllData() + return data["MasterBoardData"]["digitalOutputBits"] + + def getDigitalOutput(self, val): + """ + get digital output + """ + data = self.secmon.getAllData() + output = data["MasterBoardData"]["digitalOutputBits"] + mask = 1 << val + if (output & mask): + return 1 + else: + return 0 + + + + + def setAnalogOut(self, output, val): + """ + set analog output, val is a float + """ + prog = "set_analog_output(%s, %s)" % (output, val) + self.secmon.sendProgram(prog) + + + def setToolVoltage(self, val): + """ + set voltage to be delivered to the tool, val is 0, 12 or 24 + """ + prog = "set_tool_voltage(%s)" % (val) + self.secmon.sendProgram(prog) + + def movejrel(self, joints, acc=0.1, vel=0.05, wait=True): + """ + relative joint move + """ + l = self.getj() + newl = [v + l[i] for i, v in enumerate(joints)] + return self.movej(newl, acc, vel, wait ) + + def movej(self, joints, acc=0.1, vel=0.05, wait=True): + """ + wrapper around the movej command in URScript + """ + #todo: should check joints input and velocity + prog = "movej(%s, a=%s, v=%s)" % (list(joints), acc, vel) + self.sendProgram(prog) + if not wait: + return None + else: + while True: + time.sleep(0.05) + if not self.isRunning():#FIXME add more tests + raise RobotException("Robot stopped") + currentjoints = self.getj() + if self._eq(currentjoints, joints) and not self.secmon.isProgramRunning(): + return currentjoints + + def getj(self): + """ + get joints position + """ + jts = self.secmon.getJointData() + return [jts["q_actual0"], jts["q_actual1"], jts["q_actual2"], jts["q_actual3"], jts["q_actual4"], jts["q_actual5"]] + + + def movelrel(self, tpose, acc=0.01, vel=0.01, wait=True): + """ + relative move in cartesian coordinate + """ + l = self.getl() + newl = [v + l[i] for i, v in enumerate(tpose)] + #print "going to: ", newl + return self.movel(newl, acc, vel, wait ) + + def movel(self, tpose, acc=0.01, vel=0.01, wait=True): + """ + move in tool corrdinate system + wrapper around the movel command in URScript + """ + #todo: should check joints input and velocity + tpose = [round(i, 2) for i in tpose] + prog = "movel(p%s, a=%s, v=%s)" % (tpose, acc, vel) + #print(prog) + self.sendProgram(prog) + if not wait: + return None + else: + while True: + time.sleep(0.05) + if not self.isRunning():#FIXME add more tests + raise RobotException("Robot stopped") + pose = self.getl() + if self._eqpose(pose, tpose) and not self.secmon.isProgramRunning(): + return pose + + def getl(self): + """ + get TCP position + """ + pose = self.secmon.getCartesianInfo() + if pose: + return [pose["X"], pose["Y"], pose["Z"], pose["Rx"], pose["Ry"], pose["Rz"]] + else: + return None + + def movels(self, joints, acc, vel , radius, wait=True): + """ + where joints is a list of list. dvs: several movel commands must be send as one program in order for radius blending to work. + This is could easily be implemented in movel by detecting type of the joint variable + """ + raise NotImplementedError + + def stopl(self, acc = 0.5): + self.sendProgram("stopl(%s)" % acc) + + def stopj(self, acc = 1.5): + self.sendProgram("stopj(%s)" % acc) + + def stop(self): + self.stopj() + + def _eq(self, l1, l2): + """ + robot joints precision is 0.01, donot give anything smaller!!! + """ + for i in range(0, len(l1)): + if abs(l1[i] -l2[i]) > self.jointEpsilon: + return False + return True + + def _eqpose(self, l1, l2): + """ + epsilonl is for x,y,z + epsilonr is for a,b,c + robot joints precision is 0.01, do not give anything smaller!!! + """ + for i in range(0, 3): + if abs(l1[i] - l2[i]) > self.linearEpsilon: + #print "param: ", i, "val: ", l1[i], "-", l2[i] , "=", abs(l1[i] -l2[i]), " is not under ", self.linearEpsilon + return False + for i in range(3, 6): + if abs(l1[i] - l2[i]) > self.radialEpsilon: + #print "param: ", i, "val: ", l1[i], "-", l2[i] , "=", abs(l1[i] -l2[i]), " is not under ", self.radialEpsilon + return False + return True + + def _log(self, msg): + print("urrobot: ", self.__class__.__name__, ": ", msg) + + def cleanup(self): + self.secmon.cleanup() + if self.rtmon: + self.rtmon.stop() + + + +class Robot(object): + """ + Python interface to socket interface of UR robot. + Compare to the SimpleRobot class, this class adds the possibilty to work directly with matrices + and includes support for calibrating the robot coordinate system + and style portet to PEP 8 + """ + def __init__(self, host, useRTInterface=False): + self.robot = SimpleRobot(host, useRTInterface)#since there are many depcreated methods in SimpleRobot it is cleaner not to inherit it + self.default_linear_acceleration = 0.01 + self.default_linear_velocity = 0.01 + + self.calibration = math3d.Transform() #identity + self.inverse = self.calibration.inverse() + + def set_tcp(self, tcp): + if type(tcp) == math3d.Transform: + tcp = tcp.pose_vector + self.robot.setTcp(tcp) + + def set_calibration_matrix(self, matrix): + self.calibration = matrix + self.inverse = self.calibration.inverse() + + def orient(self, orient, acc=None, vel=None, wait=True): + if type(orient) != math3d.Orientation: + orient = math3d.Orientation(orient) + trans = self.get_transform() + trans.orient = orient + self.apply_transform(trans, acc, vel, wait) + + def set_orientation(self, orient, acc=None, vel=None, wait=True): + self.orient(orient, acc, vel, wait) + + def translate(self, vect, acc=None, vel=None, wait=True): + trans = self.get_transform() + trans.pos += math3d.Vector(vect) + return self.apply_transform(trans, acc, vel, wait) + + def set_pos(self, vect, acc=None, vel=None, wait=True): + trans = math3d.Transform(self.get_orientation(), math3d.Vector(vect)) + return self.apply_transform(trans, acc, vel, wait) + + def stop(self): + self.robot.stop() + + def apply_transform(self, trans, acc=None, vel=None, wait=True): + if not acc: + acc = self.default_linear_acceleration + if not vel: + vel = self.default_linear_velocity + t = self.calibration * trans + pose = self.robot.movel(t.pose_vector, acc, vel, wait) + if pose: #movel does not return anything when wait is False + return self.inverse * math3d.Transform(pose) + + def add_transform(self, trans, acc=None, vel=None, wait=True): + pose = self.get_transform() + return self.apply_transform(trans * pose, acc, vel, wait) + + def get_transform(self): + pose = self.robot.getl() + trans = self.inverse * math3d.Transform(pose) + return trans + + def get_orientation(self): + trans = self.get_transform() + return trans.orient + + def get_pos(self): + trans = self.get_transform() + return trans.pos + + def movel(self, pose, acc=None, vel=None, wait=True): + t = math3d.Transform(pose) + self.apply_transform(self.inverse * t, acc, vel, wait) + + def movelrel(self, pose, acc=None, vel=None, wait=True): + self.apply_transform(math3d.Transform(pose), acc, vel, wait) + + def movel_cnc(self, pose, acc=None, vel=None, wait=True): + pass + + def getl(self): + t = self.get_transform() + return t.pose_vector + + def is_running(self): + return self.robot.isRunning() + + def is_program_running(self): + return self.robot.isProgramRunning() + + def set_payload(self, weight): + return self.robot.setPayload(weight) + + def set_gravity(self, vector): + if type(vector) == math3d.Vector: + vector = vector.list + return self.robot.setGravity(vector) + + def set_digital_out(self, output, val): + self.robot.setDigitalOut(output, val) + + def movej(self, joints, acc=0.1, vel=0.05, wait=True): + """ + wrapper around the movej command in URScript + """ + self.robot.movej(joints, acc, vel, wait) + + def getj(self): + return self.robot.getj() + + def movejrel(self, joints, acc=0.1, vel=0.05, wait=True): + """ + relative joint move + """ + self.robot.movejrel(joints, acc, vel, wait) + + def cleanup(self): + self.robot.cleanup() + + + +if not MATH3D: + Robot = SimpleRobot + +if __name__ == "__main__": + try: + #robot = Robot( '192.168.1.6') + robot = Robot( '192.168.1.5') + r = robot + + #robot = Robot( '192.168.1.2', useRTInterface=False) + #robot = Robot( '192.168.1.216') + #robot = Robot( '192.168.1.216') + #p = r.sendProgram(open("examples/example.prog").read(), wait=False) + + from IPython.frontend.terminal.embed import InteractiveShellEmbed + ipshell = InteractiveShellEmbed( banner1="\nStarting IPython shell, robot object is available\n") + ipshell(local_ns=locals()) + + finally: + if "robot" in dir(): + robot.cleanup() + + + +