Edit on GitHub

Sync Read/Write Tutorial

NOTE: This tutorial is based on XL-430-W250 DYNAMIXEL motors and uses Protocol 2.0.

This section provides examples of how to write code in python to sync_read and sync_write data to DYNAMIXEL motors.
Sync Read/Write allows simultaneous access to the same address on multiple DYNAMIXEL motors.
We need two motors to operate simultaneously.

Make python file

  • Create a python file and open it in a text editor. In this case, we use visual studio code, but you can use any text editor you prefer.
    $ mkdir -p my_dxl_project/python
    $ cd my_dxl_project/python
    $ code my_sync_read_write.py
    

Make python file

  • Open the Visual Studio Code and create a python file in your workspace.

Source Code Description

Add Header Files

Initialize Handler Objects

  • Initialize the PortHandler,PacketHandler,GroupSyncWrite and GroupSyncRead. Set the port name and protocol version according to your DYNAMIXEL setup. The example below uses /dev/ttyUSB0 as the port name and 2.0 as the protocol version.
    portHandler = PortHandler('/dev/ttyUSB0')
    packetHandler = PacketHandler(2.0)
    
    goal_position_address = 116
    present_position_address = 132
    data_length_4byte = 4
    groupSyncWrite = GroupSyncWrite(portHandler, packetHandler, goal_position_address, data_length_4byte)
    groupSyncRead  = GroupSyncRead(portHandler, packetHandler, present_position_address, data_length_4byte)
    
  • Initialize the PortHandler,PacketHandler,GroupSyncWrite and GroupSyncRead. Set the port name and protocol version according to your DYNAMIXEL setup. The example below uses COM3 as the port name and 2.0 as the protocol version.
    portHandler = PortHandler('COM3')
    packetHandler = PacketHandler(2.0)
    
    goal_position_address = 116
    present_position_address = 132
    data_length_4byte = 4
    groupSyncWrite = GroupSyncWrite(portHandler, packetHandler, goal_position_address, data_length_4byte)
    groupSyncRead  = GroupSyncRead(portHandler, packetHandler, present_position_address, data_length_4byte)
    

Open Port and Set Baud Rate

ERROR LOGGING

openPort() and setBaudRate() functions return a boolean value indicating success or failure. If you want to check for failure, you can write like below to print an error message and exit the program.

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

Write data to enable torque

ERROR LOGGING

write1ByteTxRx() returns a communication result. If you want to check the communication result and error, you can write the code as shown below.

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

Add parameters to GroupSyncRead

ERROR LOGGING

addParam() returns a communication result as boolean. If you want to check the communication result and error, you can write the code as shown below.

dxl_addparam_result = groupSyncRead.addParam(dxl_id1)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id1)
    exit()

dxl_addparam_result = groupSyncRead.addParam(dxl_id2)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id2)
    exit()

Get User Input and Set Data

Add parameters to GroupSyncWrite

ERROR LOGGING

txPacket() and addParam() returns a communication result. If you want to check the communication result and error, you can write the code as shown below.

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id1, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id2, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupSyncWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

Read data to get current position

ERROR LOGGING

txRxPacket() and isAvailable() returns a communication result . If you want to check the communication result and error, you can write the code as shown below.

        dxl_comm_result = groupSyncRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

You can also check if the data is available in the GroupSyncRead by using the isAvailable() function. If the data is not available, you can print an error message and exit the program.

        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id2, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id2)
            quit()

Run the Code

  • Run the code using python3.
    $ python3 my_sync_read_write.py
    
  • Run the code through Visual Studio Code.

Full Source Code

#!/usr/bin/env python3

from dynamixel_sdk import *


portHandler = PortHandler('/dev/ttyUSB0')
packetHandler = PacketHandler(2.0)

goal_position_address = 116
present_position_address = 132
data_length_4byte = 4
groupSyncWrite = GroupSyncWrite(portHandler, packetHandler, goal_position_address, data_length_4byte)
groupSyncRead  = GroupSyncRead(portHandler, packetHandler, present_position_address, data_length_4byte)

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

dxl_id1 = 1
dxl_id2 = 2
torque_on_address = 64
data = 1
dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

dxl_addparam_result = groupSyncRead.addParam(dxl_id1)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id1)
    exit()

dxl_addparam_result = groupSyncRead.addParam(dxl_id2)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id2)
    exit()

while True:
    try:
        target_position = int(input("Enter target position (0 ~ 4095, -1 to exit): "))
    except ValueError:
        print("Please enter an integer.")
        continue

    if target_position == -1:
        break
    elif target_position < 0 or target_position > 4095:
        print("Position must be between 0 and 4095.")
        continue

    param_goal_position = [
        DXL_LOBYTE(DXL_LOWORD(target_position)),
        DXL_HIBYTE(DXL_LOWORD(target_position)),
        DXL_LOBYTE(DXL_HIWORD(target_position)),
        DXL_HIBYTE(DXL_HIWORD(target_position))
    ]

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id1, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id2, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupSyncWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

    groupSyncWrite.clearParam()
    while True:
        dxl_comm_result = groupSyncRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id2, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id2)
            quit()
        dxl1_present_position = groupSyncRead.getData(dxl_id1, present_position_address, data_length_4byte)
        dxl2_present_position = groupSyncRead.getData(dxl_id2, present_position_address, data_length_4byte)
        print("[ID:%03d] GoalPos:%03d  PresPos:%03d\t[ID:%03d] GoalPos:%03d  PresPos:%03d" % (dxl_id1, target_position, dxl1_present_position, dxl_id2, target_position, dxl2_present_position))
        if abs(target_position - dxl1_present_position) <= 10 and abs(target_position - dxl2_present_position) <= 10:
            break
portHandler.closePort()
#!/usr/bin/env python3

from dynamixel_sdk import *


portHandler = PortHandler('COM3')
packetHandler = PacketHandler(2.0)

goal_position_address = 116
present_position_address = 132
data_length_4byte = 4
groupSyncWrite = GroupSyncWrite(portHandler, packetHandler, goal_position_address, data_length_4byte)
groupSyncRead  = GroupSyncRead(portHandler, packetHandler, present_position_address, data_length_4byte)

if portHandler.openPort():
  print("Succeeded to open the port!")
else:
  print("Failed to open the port!")
  exit()

if portHandler.setBaudRate(57600):
  print("Succeeded to change the baudrate!")
else:
  print("Failed to change the baudrate!")
  exit()

dxl_id1 = 1
dxl_id2 = 2
torque_on_address = 64
data = 1
dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id1, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#1 has been successfully connected")

dxl_comm_result, dxl_error = packetHandler.write1ByteTxRx(portHandler, dxl_id2, torque_on_address, data)
if dxl_comm_result != COMM_SUCCESS:
    print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
elif dxl_error != 0:
    print("%s" % packetHandler.getRxPacketError(dxl_error))
else:
    print("Dynamixel#2 has been successfully connected")

dxl_addparam_result = groupSyncRead.addParam(dxl_id1)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id1)
    exit()

dxl_addparam_result = groupSyncRead.addParam(dxl_id2)
if dxl_addparam_result != True:
    print("[ID:%03d] groupSyncRead addparam failed" % dxl_id2)
    exit()

while True:
    try:
        target_position = int(input("Enter target position (0 ~ 4095, -1 to exit): "))
    except ValueError:
        print("Please enter an integer.")
        continue

    if target_position == -1:
        break
    elif target_position < 0 or target_position > 4095:
        print("Position must be between 0 and 4095.")
        continue

    param_goal_position = [
        DXL_LOBYTE(DXL_LOWORD(target_position)),
        DXL_HIBYTE(DXL_LOWORD(target_position)),
        DXL_LOBYTE(DXL_HIWORD(target_position)),
        DXL_HIBYTE(DXL_HIWORD(target_position))
    ]

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id1, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id1)
        exit()

    dxl_addparam_result = groupSyncWrite.addParam(dxl_id2, param_goal_position)
    if not dxl_addparam_result:
        print("[ID:%03d] groupSyncWrite addparam failed" % dxl_id2)
        exit()

    dxl_comm_result = groupSyncWrite.txPacket()
    if dxl_comm_result != COMM_SUCCESS:
        print("%s" % packetHandler.getTxRxResult(dxl_comm_result))

    groupSyncWrite.clearParam()
    while True:
        dxl_comm_result = groupSyncRead.txRxPacket()
        if dxl_comm_result != COMM_SUCCESS:
            print("%s" % packetHandler.getTxRxResult(dxl_comm_result))
        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id1, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id1)
            quit()

        dxl_getdata_result = groupSyncRead.isAvailable(dxl_id2, present_position_address, data_length_4byte)
        if dxl_getdata_result != True:
            print("[ID:%03d] groupSyncRead getdata failed" % dxl_id2)
            quit()
        dxl1_present_position = groupSyncRead.getData(dxl_id1, present_position_address, data_length_4byte)
        dxl2_present_position = groupSyncRead.getData(dxl_id2, present_position_address, data_length_4byte)
        print("[ID:%03d] GoalPos:%03d  PresPos:%03d\t[ID:%03d] GoalPos:%03d  PresPos:%03d" % (dxl_id1, target_position, dxl1_present_position, dxl_id2, target_position, dxl2_present_position))
        if abs(target_position - dxl1_present_position) <= 10 and abs(target_position - dxl2_present_position) <= 10:
            break
portHandler.closePort()