From be34be2812a1b086731dfdbae7822db5044b879a Mon Sep 17 00:00:00 2001 From: Chris Gilmer Date: Thu, 2 Feb 2017 14:57:16 -0800 Subject: [PATCH] Add URScript class to compose urscripts. Update and clean robotiq code to use URScript class --- urx/robotiq_two_finger_gripper.py | 248 ++++++++++++++++++++++-------- urx/urscript.py | 147 ++++++++++++++++++ 2 files changed, 334 insertions(+), 61 deletions(-) create mode 100755 urx/urscript.py diff --git a/urx/robotiq_two_finger_gripper.py b/urx/robotiq_two_finger_gripper.py index 5612b20..a495c5d 100644 --- a/urx/robotiq_two_finger_gripper.py +++ b/urx/robotiq_two_finger_gripper.py @@ -1,87 +1,213 @@ +#! /usr/bin/env python + """ -Python library to control Robotiq Two Finger Gripper connected to UR robot via Python-URX +Python library to control Robotiq Two Finger Gripper connected to UR robot via +Python-URX Tested using a UR5 Version CB3 and Robotiq 2-Finger Gripper Version 85 SETUP -You must install the driver first (http://support.robotiq.com/pages/viewpage.action?pageId=5963876) and then power on the gripper from the gripper UI +You must install the driver first and then power on the gripper from the +gripper UI. The driver can be found here: + +http://support.robotiq.com/pages/viewpage.action?pageId=5963876 FAQ -Why does this class group all the commands together and run them as a single program as opposed to running each line seperately (like most of URX)? +Q: Why does this class group all the commands together and run them as a single +program as opposed to running each line seperately (like most of URX)? -- The gripper is controlled by connecting to the robot's computer (TCP/IP) and then communicating with the gripper via a socket (127.0.0.1:63352). The scope of the socket is at the program level. It will be automatically closed whenever a program finishes. Therefore it's important that we run all commands as a single program. +A: The gripper is controlled by connecting to the robot's computer (TCP/IP) and +then communicating with the gripper via a socket (127.0.0.1:63352). The scope +of the socket is at the program level. It will be automatically closed +whenever a program finishes. Therefore it's important that we run all commands +as a single program. DOCUMENTATION -- This code was developed by downloading the "gripper package" on http://support.robotiq.com/pages/viewpage.action?pageId=5963876 -- Open folder "robotiq_2f_gripper_programs_CB3" -- robotiq_2f_gripper_programs_CB3/advanced_template_test.script was referenced to create this class +- This code was developed by downloading the gripper package "DCU-1.0.10" from + http://support.robotiq.com/pages/viewpage.action?pageId=5963876. Or more + directly from http://support.robotiq.com/download/attachments/5963876/DCU-1.0.10.zip +- The file robotiq_2f_gripper_programs_CB3/rq_script.script was referenced to + create this class -Future Features +FUTURE FEATURES -- Though I haven't developed it yet if you look in robotiq_2f_gripper_programs_CB3/advanced_template_test.script and view function "rq_get_var" there is an example of how to determine the current state of the gripper and if it's holding an object. -""" +Though I haven't developed it yet, if you look in +robotiq_2f_gripper_programs_CB3/advanced_template_test.script and view function +"rq_get_var" there is an example of how to determine the current state of the +gripper and if it's holding an object. +""" # noqa import logging +import os +import time + +from urx.urscript import URScript + +# Gripper Variables +ACT = b"ACT" +GTO = b"GTO" +ATR = b"ATR" +ARD = b"ARD" +FOR = b"FOR" +SPE = b"SPE" +OBJ = b"OBJ" +STA = b"STA" +FLT = b"FLT" +POS = b"POS" + +SOCKET_HOST = b"127.0.0.1" +SOCKET_PORT = 63352 +SOCKET_NAME = b"gripper_socket" + + +class RobotiqScript(URScript): + + def __init__(self, + socket_host=SOCKET_HOST, + socket_port=SOCKET_PORT, + socket_name=SOCKET_NAME): + self.socket_host = socket_host + self.socket_port = socket_port + self.socket_name = socket_name + super(RobotiqScript, self).__init__() + + # Reset connection to gripper + self._socket_close(self.socket_name) + self._socket_open(self.socket_host, + self.socket_port, + self.socket_name) + + def _import_rq_script(self): + dir_path = os.path.dirname(os.path.realpath(__file__)) + rq_script = os.path.join(dir_path, 'rq_script.script') + with open(rq_script, 'rb') as f: + rq_script = f.read() + self.add_header_to_program(rq_script) + + def _rq_get_var(self, var_name, nbytes): + self._socket_send_string(b"GET {}".format(var_name)) + self._socket_read_byte_list(nbytes) + + def _get_gripper_fault(self): + self._rq_get_var(FLT, 2) + + def _get_gripper_object(self): + self._rq_get_var(OBJ, 1) + + def _get_gripper_status(self): + self._rq_get_var(STA, 1) + + def _set_gripper_activate(self): + self._socket_set_var(GTO, 1, self.socket_name) + + def _set_gripper_force(self, value): + """ + FOR is the variable + range is 0 - 255 + 0 is no force + 255 is full force + """ + value = self._constrain_unsigned_char(value) + self._socket_set_var(FOR, value, self.socket_name) + + def _set_gripper_position(self, value): + """ + SPE is the variable + range is 0 - 255 + 0 is no speed + 255 is full speed + """ + value = self._constrain_unsigned_char(value) + self._socket_set_var(POS, value, self.socket_name) + + def _set_gripper_speed(self, value): + """ + SPE is the variable + range is 0 - 255 + 0 is no speed + 255 is full speed + """ + value = self._constrain_unsigned_char(value) + self._socket_set_var(SPE, value, self.socket_name) + + def _set_robot_activate(self): + self._socket_set_var(ACT, 1, self.socket_name) class Robotiq_Two_Finger_Gripper(object): - complete_program = "" - header = "def myProg():" + b"\n" - end = b"\n" + "end" - logger = False - def __init__(self): - self.logger = logging.getLogger("urx") - self.reset() + def __init__(self, + robot, + payload=0.85, + speed=255, + force=50, + socket_host=SOCKET_HOST, + socket_port=SOCKET_PORT, + socket_name=SOCKET_NAME): + self.robot = robot + self.payload = payload + self.speed = speed + self.force = force + self.socket_host = socket_host + self.socket_port = socket_port + self.socket_name = socket_name + self.logger = logging.getLogger(u"robotiq") - def reset(self): - self.complete_program = "" - self.add_line_to_program("set_analog_inputrange(0, 0)") - self.add_line_to_program("set_analog_inputrange(1, 0)") - self.add_line_to_program("set_analog_inputrange(2, 0)") - self.add_line_to_program("set_analog_inputrange(3, 0)") - self.add_line_to_program("set_analog_outputdomain(0, 0)") - self.add_line_to_program("set_analog_outputdomain(1, 0)") - self.add_line_to_program("set_tool_voltage(0)") - self.add_line_to_program("set_runstate_outputs([])") - self.add_line_to_program("set_payload(0.85)") # 0.85 is the weight of the gripper in KG - self.add_line_to_program("socket_close(\"gripper_socket\")") - # self.add_line_to_program("sleep(1)") # in Robotiq's example they do a wait here... I haven't found it nec - self.add_line_to_program("socket_open(\"127.0.0.1\",63352,\"gripper_socket\")") - # self.add_line_to_program("sleep(1)") - self.add_line_to_program("socket_set_var(\"SEP\",255,\"gripper_socket\")") # Speed 0-255 is valid - self.add_line_to_program("sync()") - self.add_line_to_program("socket_set_var(\"FOR\",50,\"gripper_socket\")") # Force 0-255 is valid - self.add_line_to_program("sync()") - self.add_line_to_program("socket_set_var(\"ACT\",1,\"gripper_socket\")") # Activate robot - self.add_line_to_program("sync()") - self.add_line_to_program("socket_set_var(\"GTO\",1,\"gripper_socket\")") - self.add_line_to_program("sync()") + def _get_new_urscript(self): + """ + Set up a new URScript to communicate with gripper + """ + urscript = RobotiqScript(socket_host=self.socket_host, + socket_port=self.socket_port, + socket_name=self.socket_name) + + # Set input and output voltage ranges + urscript._set_analog_inputrange(0, 0) + urscript._set_analog_inputrange(1, 0) + urscript._set_analog_inputrange(2, 0) + urscript._set_analog_inputrange(3, 0) + urscript._set_analog_outputdomain(0, 0) + urscript._set_analog_outputdomain(1, 0) + urscript._set_tool_voltage(0) + urscript._set_runstate_outputs() + + # Set payload, speed and force + urscript._set_payload(self.payload) + urscript._set_gripper_speed(self.speed) + urscript._set_gripper_force(self.force) + + # Initialize the gripper + urscript._set_robot_activate() + urscript._set_gripper_activate() + + return urscript + + def gripper_action(self, value): + """ + Activate the gripper to a given value from 0 to 255 + + 0 is open + 255 is closed + """ + urscript = self._get_new_urscript() + + # Move to the position + sleep = 2.0 + urscript._set_gripper_position(value) + urscript._sleep(sleep) + + # Send the script + self.robot.send_program(urscript()) + + # sleep the code the same amount as the urscript to ensure that + # the action completes + time.sleep(sleep) def open_gripper(self): - self.add_line_to_program("socket_set_var(\"POS\",0,\"gripper_socket\")") # 0 is open; range is 0-255 - self.add_line_to_program("sync()") - self.add_line_to_program("sleep(2)") + self.gripper_action(0) def close_gripper(self): - self.add_line_to_program("socket_set_var(\"POS\",255,\"gripper_socket\")") # 255 is closed; range is 0-255 - self.add_line_to_program("sync()") - self.add_line_to_program("sleep(2)") - - def add_line_to_program(self, new_line): - if(self.complete_program != ""): - self.complete_program += b"\n" - self.complete_program += new_line - - def ret_program_to_run(self): - if(self.complete_program == ""): - self.logger.debug("robotiq_two_finger_gripper's program is empty") - return "" - - prog = self.header - prog += self.complete_program - prog += self.end - return prog + self.gripper_action(255) diff --git a/urx/urscript.py b/urx/urscript.py new file mode 100755 index 0000000..f1c4df5 --- /dev/null +++ b/urx/urscript.py @@ -0,0 +1,147 @@ +#! /usr/bin/env python + +import logging + + +# Controller Settings +CONTROLLER_PORTS = [0, 1] +CONTROLLER_VOLTAGE = [ + 0, # 0-5V + 2, # 0-10V +] + +# Tool Settings +TOOL_PORTS = [2, 3] +TOOL_VOLTAGE = [ + 0, # 0-5V + 1, # 0-10V + 2, # 4-20mA +] + +OUTPUT_DOMAIN_VOLTAGE = [ + 0, # 4-20mA + 1, # 0-10V +] + + +class URScript(object): + + def __init__(self): + self.logger = logging.getLogger(u"urscript") + # The header is code that is before and outside the myProg() method + self.header = b"" + # The program is code inside the myProg() method + self.program = b"" + + def __call__(self): + if(self.program == b""): + self.logger.debug(u"urscript program is empty") + return b"" + + # Construct the program + myprog = b"""def myProg():{}\nend""".format(self.program) + + # Construct the full script + script = b"" + if self.header: + script = b"{}\n\n".format(self.header) + script = b"{}{}".format(script, myprog) + return script + + def reset(self): + self.header = b"" + self.program = b"" + + def add_header_to_program(self, header_line): + self.header = b"{}\n{}".format(self.header, header_line) + + def add_line_to_program(self, new_line): + self.program = b"{}\n\t{}".format(self.program, new_line) + + def _constrain_unsigned_char(self, value): + """ + Ensure that unsigned char values are constrained + to between 0 and 255. + """ + assert(isinstance(value, int)) + if value < 0: + value = 0 + elif value > 255: + value = 255 + return value + + def _set_analog_inputrange(self, port, vrange): + if port in CONTROLLER_PORTS: + assert(vrange in CONTROLLER_VOLTAGE) + elif port in TOOL_PORTS: + assert(vrange in TOOL_VOLTAGE) + msg = b"set_analog_inputrange({},{})".format(port, vrange) + self.add_line_to_program(msg) + + def _set_analog_output(self, input_id, signal_level): + assert(input_id in [0, 1]) + assert(signal_level in [0, 1]) + msg = "set_analog_output({}, {})".format(input_id, signal_level) + self.add_line_to_program(msg) + + def _set_analog_outputdomain(self, port, domain): + assert(domain in OUTPUT_DOMAIN_VOLTAGE) + msg = b"set_analog_outputdomain({},{})".format(port, domain) + self.add_line_to_program(msg) + + def _set_payload(self, mass, cog=None): + msg = b"set_payload({}".format(mass) + if cog: + assert(len(cog) == 3) + msg = b"{},{}".format(msg, cog) + msg = b"{})".format(msg) + self.add_line_to_program(msg) + + def _set_runstate_outputs(self, outputs=None): + if not outputs: + outputs = [] + msg = b"set_runstate_outputs({})".format(outputs) + self.add_line_to_program(msg) + + def _set_tool_voltage(self, voltage): + assert(voltage in [0, 12, 24]) + msg = b"set_tool_voltage({})".format(voltage) + self.add_line_to_program(msg) + + def _sleep(self, value): + msg = b"sleep({})".format(value) + self.add_line_to_program(msg) + + def _socket_close(self, socket_name): + msg = b"socket_close(\"{}\")".format(socket_name) + self.add_line_to_program(msg) + + def _socket_get_var(self, var, socket_name): + msg = b"socket_get_var(\"{}\",\"{}\")".format(var, socket_name) + self.add_line_to_program(msg) + self._sync() + + def _socket_open(self, socket_host, socket_port, socket_name): + msg = b"socket_open(\"{}\",{},\"{}\")".format(socket_host, + socket_port, + socket_name) + self.add_line_to_program(msg) + + def _socket_read_byte_list(self, nbytes, socket_name): + msg = b"global var_value = socket_read_byte_list({},\"{}\")".format(nbytes, socket_name) # noqa + self.add_line_to_program(msg) + self._sync() + + def _socket_send_string(self, message, socket_name): + msg = b"socket_send_string(\"{}\",\"{}\")".format(message, socket_name) # noqa + self.add_line_to_program(msg) + self._sync() + + def _socket_set_var(self, var, value, socket_name): + msg = b"socket_set_var(\"{}\",{},\"{}\")".format(var, value, socket_name) # noqa + self.add_line_to_program(msg) + self._sync() + + def _sync(self): + msg = b"sync()" + self.add_line_to_program(msg)