Edit on GitHub

Bulk Read/Write Tutorial

NOTE: This tutorial is based on XL-430-W250 DYNAMIXEL motors and uses Protocol 2.0.

This section provides examples of how to write code in python to bulk_read and bulk_write data to DYNAMIXEL motors.
Bulk Read/Write enables simultaneous control of multiple motors.
Unlike Sync Read/Write, which can only access the same address across multiple motors, Bulk Read/Write can access different addresses on multiple motors in a single instruction.
In this example, we need two motors to operate simultaneously.

Make python file

  • Create a python file and open it in a text editor. In this case, we use visual studio code, but you can use any text editor you prefer.
    $ mkdir -p my_dxl_project/python
    $ cd my_dxl_project/python
    $ code my_bulk_read_write.py
    

Make python file

  • Open the Visual Studio Code and create a python file in your workspace.

Source Code Description

Add Header Files

Initialize Handler Objects

  • Initialize the PortHandler,PacketHandler,GroupBulkWrite and GroupBulkRead. Set the port name and protocol version according to your DYNAMIXEL setup. The example below uses /dev/ttyUSB0 as the port name and 2.0 as the protocol version.
    portHandler = PortHandler('/dev/ttyUSB0')
    packetHandler = PacketHandler(2.0)
    
    goal_position_address = 116
    present_position_address = 132
    groupBulkWrite = GroupBulkWrite(portHandler, packetHandler)
    groupBulkRead = GroupBulkRead(portHandler, packetHandler)
    
  • Initialize the PortHandler,PacketHandler,GroupBulkWrite and GroupBulkRead. Set the port name and protocol version according to your DYNAMIXEL setup. The example below uses COM3 as the port name and 2.0 as the protocol version.
    portHandler = PortHandler('COM3')
    packetHandler = PacketHandler(2.0)
    
    goal_position_address = 116
    present_position_address = 132
    groupBulkWrite = GroupBulkWrite(portHandler, packetHandler)
    groupBulkRead = GroupBulkRead(portHandler, packetHandler)
    

Open Port and Set Baud Rate

ERROR LOGGING

openPort() and setBaudRate() functions return a boolean value indicating success or failure. If you want to check for failure, you can write like below to print an error message and exit the program.

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

Write data to enable torque

ERROR LOGGING

write1ByteTxRx() returns a communication result. If you want to check the communication result and error, you can write the code as shown below.

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

Add parameters to GroupBulkRead

ERROR LOGGING

addParam() returns a communication result as boolean. If you want to check the communication result and error, you can write the code as shown below.

dxl_addparam_result = groupBulkRead.addParam(dxl_id1, present_position_address, data_length_4byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id1)
    exit()

dxl_addparam_result = groupBulkRead.addParam(dxl_id2, led_address, data_length_1byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id2)
    exit()

Get User Input and Set Data

Add parameters to GroupBulkWrite

ERROR LOGGING

txPacket() and addParam() returns a communication result. If you want to check the communication result and error, you can write the code as shown below.

    dxl_addparam_result = groupBulkWrite.addParam(dxl_id1, goal_position_address, data_length_4byte, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupBulkWrite.addParam(dxl_id2, led_address, data_length_1byte, led_data)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupBulkWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

Read data to get current position

ERROR LOGGING

txRxPacket() and isAvailable() returns a communication result . If you want to check the communication result and error, you can write the code as shown below.

        dxl_comm_result = groupBulkRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

You can also check if the data is available in the GroupBulkRead by using the isAvailable() function. If the data is not available, you can print an error message and exit the program.

        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id2, led_address, data_length_1byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id2)
            quit()

Run the Code

  • Run the code using python3.
    $ python3 my_bulk_read_write.py
    
  • Run the code through Visual Studio Code.

Full Source Code

#!/usr/bin/env python3

from dynamixel_sdk import *


portHandler = PortHandler('/dev/ttyUSB0')
packetHandler = PacketHandler(2.0)

goal_position_address = 116
present_position_address = 132
groupBulkWrite = GroupBulkWrite(portHandler, packetHandler)
groupBulkRead = GroupBulkRead(portHandler, packetHandler)

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

dxl_id1 = 1
dxl_id2 = 2
torque_on_address = 64
data = 1
dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

present_position_address = 132
data_length_4byte = 4
dxl_addparam_result = groupBulkRead.addParam(dxl_id1, present_position_address, data_length_4byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id1)
    exit()

led_address = 65
data_length_1byte = 1
dxl_addparam_result = groupBulkRead.addParam(dxl_id2, led_address, data_length_1byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id2)
    exit()

dxl2_led_value_read = 0
while True:
    try:
        target_position = int(input("Enter target position (0 ~ 4095, -1 to exit): "))
    except ValueError:
        print("Please enter an integer.")
        continue

    if target_position == -1:
        break
    elif target_position < 0 or target_position > 4095:
        print("Position must be between 0 and 4095.")
        continue

    param_goal_position = [
        DXL_LOBYTE(DXL_LOWORD(target_position)),
        DXL_HIBYTE(DXL_LOWORD(target_position)),
        DXL_LOBYTE(DXL_HIWORD(target_position)),
        DXL_HIBYTE(DXL_HIWORD(target_position))
    ]

    if (dxl2_led_value_read == 0):
        led_data = [1]
    else:
        led_data = [0]


    dxl_addparam_result = groupBulkWrite.addParam(dxl_id1, goal_position_address, data_length_4byte, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupBulkWrite.addParam(dxl_id2, led_address, data_length_1byte, led_data)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupBulkWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
    groupBulkWrite.clearParam()

    while True:
        dxl_comm_result = groupBulkRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id2, led_address, data_length_1byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id2)
            quit()
        dxl1_present_position = groupBulkRead.getData(dxl_id1, present_position_address, data_length_4byte)
        dxl2_led_value_read = groupBulkRead.getData(dxl_id2, led_address, data_length_1byte)
        print("[ID:%03d] Present Position : %d \t [ID:%03d] LED Value: %d" % (dxl_id1, dxl1_present_position, dxl_id2, dxl2_led_value_read))
        if abs(target_position - dxl1_present_position) <= 10:
            break
portHandler.closePort()
#!/usr/bin/env python3

from dynamixel_sdk import *


portHandler = PortHandler('COM3')
packetHandler = PacketHandler(2.0)

goal_position_address = 116
present_position_address = 132
groupBulkWrite = GroupBulkWrite(portHandler, packetHandler)
groupBulkRead = GroupBulkRead(portHandler, packetHandler)

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

dxl_id1 = 1
dxl_id2 = 2
torque_on_address = 64
data = 1
dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

present_position_address = 132
data_length_4byte = 4
dxl_addparam_result = groupBulkRead.addParam(dxl_id1, present_position_address, data_length_4byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id1)
    exit()

led_address = 65
data_length_1byte = 1
dxl_addparam_result = groupBulkRead.addParam(dxl_id2, led_address, data_length_1byte)
if dxl_addparam_result != True:
    print("[ID:%03d] groupBulkRead addparam failed" % dxl_id2)
    exit()

dxl2_led_value_read = 0
while True:
    try:
        target_position = int(input("Enter target position (0 ~ 4095, -1 to exit): "))
    except ValueError:
        print("Please enter an integer.")
        continue

    if target_position == -1:
        break
    elif target_position < 0 or target_position > 4095:
        print("Position must be between 0 and 4095.")
        continue

    param_goal_position = [
        DXL_LOBYTE(DXL_LOWORD(target_position)),
        DXL_HIBYTE(DXL_LOWORD(target_position)),
        DXL_LOBYTE(DXL_HIWORD(target_position)),
        DXL_HIBYTE(DXL_HIWORD(target_position))
    ]

    if (dxl2_led_value_read == 0):
        led_data = [1]
    else:
        led_data = [0]


    dxl_addparam_result = groupBulkWrite.addParam(dxl_id1, goal_position_address, data_length_4byte, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupBulkWrite.addParam(dxl_id2, led_address, data_length_1byte, led_data)
    if not dxl_addparam_result:
        print("[ID:%03d] groupBulkWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupBulkWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
    groupBulkWrite.clearParam()

    while True:
        dxl_comm_result = groupBulkRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupBulkRead.isAvailable(dxl_id2, led_address, data_length_1byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupBulkRead getdata failed" % dxl_id2)
            quit()
        dxl1_present_position = groupBulkRead.getData(dxl_id1, present_position_address, data_length_4byte)
        dxl2_led_value_read = groupBulkRead.getData(dxl_id2, led_address, data_length_1byte)
        print("[ID:%03d] Present Position : %d \t [ID:%03d] LED Value: %d" % (dxl_id1, dxl1_present_position, dxl_id2, dxl2_led_value_read))
        if abs(target_position - dxl1_present_position) <= 10:
            break
portHandler.closePort()