From add68f78ef99a26317cdda2037ff53e9f3579caa Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Wed, 17 Dec 2025 12:27:02 +0000 Subject: [PATCH 01/11] Avoid allocating memory during parsing Creating new objects on the heap isn't really RT-capable and undeterministic. This adds the possibility to parse incoming data packages into a pre-allocated package to avoid having to create new objects on the fly. To achieve this, the pipeline in the RTDE client has been replaced by a double buffer with two pre-allocated objects. --- CMakeLists.txt | 1 + include/ur_client_library/comm/package.h | 6 +- include/ur_client_library/comm/parser.h | 19 +- include/ur_client_library/comm/pipeline.h | 11 ++ include/ur_client_library/comm/producer.h | 111 ++++++----- .../primary/primary_parser.h | 32 +++- include/ur_client_library/rtde/rtde_client.h | 86 +++++++-- include/ur_client_library/rtde/rtde_package.h | 9 + include/ur_client_library/rtde/rtde_parser.h | 101 +++------- include/ur_client_library/ur/ur_driver.h | 43 ++++- src/rtde/data_package.cpp | 16 +- src/rtde/rtde_client.cpp | 162 +++++++++++----- src/rtde/rtde_parser.cpp | 176 ++++++++++++++++++ src/ur/ur_driver.cpp | 9 +- 14 files changed, 584 insertions(+), 198 deletions(-) create mode 100644 src/rtde/rtde_parser.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ab44d1cb3..50eab4edb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ add_library(urcl src/rtde/get_urcontrol_version.cpp src/rtde/request_protocol_version.cpp src/rtde/rtde_package.cpp + src/rtde/rtde_parser.cpp src/rtde/text_message.cpp src/rtde/rtde_client.cpp src/ur/ur_driver.cpp diff --git a/include/ur_client_library/comm/package.h b/include/ur_client_library/comm/package.h index f014a37e8..8668acf1f 100644 --- a/include/ur_client_library/comm/package.h +++ b/include/ur_client_library/comm/package.h @@ -35,9 +35,9 @@ namespace urcl namespace comm { /*! - * \brief The URPackage a parent class. From that two implementations are inherited, - * one for the primary, one for the rtde interface (primary_interface::primaryPackage; - * rtde_interface::rtdePackage). The URPackage makes use of the template HeaderT. + * \brief The URPackage is an abstract class. From that two implementations are inherited, + * one for the primary, one for the RTDE interface (primary_interface::PrimaryPackage; + * rtde_interface::RTDEPackage). The URPackage makes use of the template HeaderT. */ template class URPackage diff --git a/include/ur_client_library/comm/parser.h b/include/ur_client_library/comm/parser.h index 66d98d19a..08a868948 100644 --- a/include/ur_client_library/comm/parser.h +++ b/include/ur_client_library/comm/parser.h @@ -41,13 +41,28 @@ class Parser virtual ~Parser() = default; /*! - * \brief Declares the parse function. + * \brief Parses data from a binary parser into a vector of packages. + * + * This will create new packages and push them to the results vector, thus new memory will be + * allocated for each package. Do not use this in a real-time context. * * \param bp Instance of class binaryParser - * \param results A unique pointer + * \param results A vector of unique pointers to packages */ virtual bool parse(BinParser& bp, std::vector>& results) = 0; + /*! + * \brief Parses data from a binary parser into a single package. + * + * This implementation may try to store data in the passed package instance to avoid memory + * allocations. Thus, this function may be used in a real-time context. Refer to the specific + * implementation and parser for details. + * + * \param bp Instance of class binaryParser + * \param result A unique pointer to a package where parsed data should be stored. + */ + virtual bool parse(BinParser& bp, std::unique_ptr& result) = 0; + private: typename T::HeaderType header_; // URProducer producer_; diff --git a/include/ur_client_library/comm/pipeline.h b/include/ur_client_library/comm/pipeline.h index aafd0f754..3b55faab0 100644 --- a/include/ur_client_library/comm/pipeline.h +++ b/include/ur_client_library/comm/pipeline.h @@ -245,6 +245,17 @@ class IProducer * \returns Success of the package production. */ virtual bool tryGet(std::vector>& products) = 0; + + /*! + * \brief Reads a single package from some source and produces the corresponding object. + * + * If possible this function should try to reuse existing memory in the passed unique pointer. + * + * \param product Unique pointer to be set to the produced package. + * + * \returns Success of the package production. + */ + virtual bool tryGet(std::unique_ptr& product) = 0; }; /*! diff --git a/include/ur_client_library/comm/producer.h b/include/ur_client_library/comm/producer.h index 19a929a70..221622071 100644 --- a/include/ur_client_library/comm/producer.h +++ b/include/ur_client_library/comm/producer.h @@ -47,6 +47,57 @@ class URProducer : public IProducer bool running_; + template + bool tryGetImpl(ProductT& product) + { + // TODO This function has become really ugly! That should be refactored! + + // 4KB should be enough to hold any packet received from UR + uint8_t buf[4096]; + size_t read = 0; + // expoential backoff reconnects + while (true) + { + if (stream_.read(buf, sizeof(buf), read)) + { + // reset sleep amount + timeout_ = std::chrono::seconds(1); + BinParser bp(buf, read); + return parser_.parse(bp, product); + } + + if (!running_) + return true; + + if (stream_.getState() == SocketState::Connected) + { + continue; + } + + if (stream_.closed()) + return false; + + if (on_reconnect_cb_) + { + URCL_LOG_WARN("Failed to read from stream, invoking on reconnect callback and stopping the producer"); + on_reconnect_cb_(); + return false; + } + + URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count()); + std::this_thread::sleep_for(timeout_); + + if (stream_.connect()) + continue; + + auto next = timeout_ * 2; + if (next <= std::chrono::seconds(120)) + timeout_ = next; + } + + return false; + } + public: /*! * \brief Creates a URProducer object, registering a stream and a parser. @@ -106,52 +157,22 @@ class URProducer : public IProducer */ bool tryGet(std::vector>& products) override { - // TODO This function has become really ugly! That should be refactored! - - // 4KB should be enough to hold any packet received from UR - uint8_t buf[4096]; - size_t read = 0; - // expoential backoff reconnects - while (true) - { - if (stream_.read(buf, sizeof(buf), read)) - { - // reset sleep amount - timeout_ = std::chrono::seconds(1); - BinParser bp(buf, read); - return parser_.parse(bp, products); - } - - if (!running_) - return true; - - if (stream_.getState() == SocketState::Connected) - { - continue; - } - - if (stream_.closed()) - return false; - - if (on_reconnect_cb_) - { - URCL_LOG_WARN("Failed to read from stream, invoking on reconnect callback and stopping the producer"); - on_reconnect_cb_(); - return false; - } - - URCL_LOG_WARN("Failed to read from stream, reconnecting in %ld seconds...", timeout_.count()); - std::this_thread::sleep_for(timeout_); - - if (stream_.connect()) - continue; - - auto next = timeout_ * 2; - if (next <= std::chrono::seconds(120)) - timeout_ = next; - } + return tryGetImpl(products); + } - return false; + /*! + * \brief Attempts to read byte stream from the robot and parse it as a URPackage. + * + * If supported by the parser, this function will try to reuse existing memory in the passed + * unique pointer. + * + * \param product Unique pointer to hold the produced package + * + * \returns Success of reading and parsing the package + */ + bool tryGet(std::unique_ptr& product) override + { + return tryGetImpl(product); } /*! diff --git a/include/ur_client_library/primary/primary_parser.h b/include/ur_client_library/primary/primary_parser.h index d707f7588..b7058d975 100644 --- a/include/ur_client_library/primary/primary_parser.h +++ b/include/ur_client_library/primary/primary_parser.h @@ -55,7 +55,7 @@ class PrimaryParser : public comm::Parser * \returns True, if the byte stream could successfully be parsed as primary packages, false * otherwise */ - bool parse(comm::BinParser& bp, std::vector>& results) + bool parse(comm::BinParser& bp, std::vector>& results) override { int32_t packet_size; RobotPackageType type; @@ -147,6 +147,36 @@ class PrimaryParser : public comm::Parser return true; } + /** + * \brief Uses the given BinParser to create a single package object from the contained serialization. + * + * Note: This function assumes that the byte stream contains exactly one primary package. For + * packages with sub-packages this will return false. + * + * \param bp A BinParser holding one serialized primary package + * \param results A unique pointer to hold the created primary package object + * + * \returns True, if the byte stream could successfully be parsed as a primary package, false + * otherwise + */ + bool parse(comm::BinParser& bp, std::unique_ptr& results) override + { + std::vector> packages; + if (!parse(bp, packages)) + { + return false; + } + + if (packages.size() != 1) + { + URCL_LOG_ERROR("Expected exactly one primary package, but received %zu", packages.size()); + return false; + } + + results = std::move(packages[0]); + return true; + } + private: RobotState* stateFromType(RobotStateType type) { diff --git a/include/ur_client_library/rtde/rtde_client.h b/include/ur_client_library/rtde/rtde_client.h index 5ebc76ce3..c135395fb 100644 --- a/include/ur_client_library/rtde/rtde_client.h +++ b/include/ur_client_library/rtde/rtde_client.h @@ -31,21 +31,14 @@ #include -#include "ur_client_library/comm/pipeline.h" -#include "ur_client_library/rtde/package_header.h" -#include "ur_client_library/rtde/rtde_package.h" -#include "ur_client_library/comm/stream.h" -#include "ur_client_library/rtde/rtde_parser.h" #include "ur_client_library/comm/producer.h" +#include "ur_client_library/comm/stream.h" #include "ur_client_library/rtde/data_package.h" -#include "ur_client_library/rtde/request_protocol_version.h" -#include "ur_client_library/rtde/control_package_setup_outputs.h" -#include "ur_client_library/rtde/control_package_start.h" -#include "ur_client_library/log.h" +#include "ur_client_library/rtde/rtde_package.h" +#include "ur_client_library/rtde/rtde_parser.h" #include "ur_client_library/rtde/rtde_writer.h" static const int UR_RTDE_PORT = 30004; -static const std::string PIPELINE_NAME = "RTDE Data Pipeline"; namespace urcl { @@ -96,11 +89,11 @@ class RTDEClient public: RTDEClient() = delete; /*! - * \brief Creates a new RTDEClient object, including a used URStream and Pipeline to handle the + * \brief Creates a new RTDEClient object, including a used URStream and Producer to handle the * communication with the robot. * * \param robot_ip The IP of the robot - * \param notifier The notifier to use in the pipeline + * \param notifier The notifier to notify of start and stop events * \param output_recipe_file Path to the file containing the output recipe * \param input_recipe_file Path to the file containing the input recipe * \param target_frequency Frequency to run at. Defaults to 0.0 which means maximum frequency. @@ -112,11 +105,11 @@ class RTDEClient bool ignore_unavailable_outputs = false); /*! - * \brief Creates a new RTDEClient object, including a used URStream and Pipeline to handle the + * \brief Creates a new RTDEClient object, including a used URStream and Producer to handle the * communication with the robot. * * \param robot_ip The IP of the robot - * \param notifier The notifier to use in the pipeline + * \param notifier The notifier to notify of start and stop events * \param output_recipe Vector containing the output recipe * \param input_recipe Vector containing the input recipe * \param target_frequency Frequency to run at. Defaults to 0.0 which means maximum frequency. @@ -127,6 +120,7 @@ class RTDEClient const std::vector& input_recipe, double target_frequency = 0.0, bool ignore_unavailable_outputs = false); ~RTDEClient(); + /*! * \brief Sets up RTDE communication with the robot. The handshake includes negotiation of the * used protocol version and setting of input and output recipes. @@ -149,17 +143,31 @@ class RTDEClient /*! * \brief Triggers the robot to start sending RTDE data packages in the negotiated format. * + * \param read_packages_in_background Whether to start a background thread to read packages from. + * If packages are read in the background, getDataPackage() will return the latest one received. + * If packages aren't read in the background, the application is required to call + * getDataPackageBlocking() frequently. + * * \returns Success of the requested start */ - bool start(); + bool start(const bool read_packages_in_background = true); + /*! * \brief Pauses RTDE data package communication * * \returns Whether the RTDE data package communication was paused successfully */ bool pause(); + /*! - * \brief Reads the pipeline to fetch the next data package. + * \brief Return the latest data package received + * + * When packages are read from the background thread, this will return the latest data package + * received from the robot. When no new data has been received since the last call to this + * function, it will wait for the time specified in the \p timeout parameter. + * + * When packages are not read from the background thread, this function will return nullptr and + * print an error message. * * \param timeout Time to wait if no data package is currently in the queue * @@ -167,6 +175,21 @@ class RTDEClient */ std::unique_ptr getDataPackage(std::chrono::milliseconds timeout); + /*! + * \brief Blocking call to get the next data package received from the robot. + * + * This function will block until a new data package is received from the robot and return it. + * + * \param data_package Reference to a unique ptr where the received data package will be stored. + * For optimal performance, the data package pointer should contain a pre-allocated data package + * that was initialized with the same output recipe as used in this RTDEClient. If it is not an + * initialized data package, a new one will be allocated internally which will have a negative + * performance impact and print a warning. + * + * \returns Whether a data package was received successfully + */ + bool getDataPackageBlocking(std::unique_ptr& data_package); + /*! * \brief Getter for the maximum frequency the robot can publish RTDE data packages with. * @@ -232,9 +255,27 @@ class RTDEClient return input_recipe_; } - // Reads output or input recipe from a file + /// Reads output or input recipe from a file and parses it into a vector of strings where each + /// string is a line from the file. static std::vector readRecipe(const std::string& recipe_file); + /*! \brief Starts a background thread to read data packages from the robot. + * + * After calling this function, getDataPackage() can be used to get the latest data package + * received from the robot. + */ + void startBackgroundRead(); + + /*! \brief Stops the background thread reading data packages from the robot. + * + * After calling this function, getDataPackageBlocking() must be used to get data packages + * from the robot. + * + * \note When getDataPackageBlocking() is not called frequently enough, the internal buffer + * of received data packages might fill up, causing the robot to shutdown the RTDE connection. + */ + void stopBackgroundRead(); + private: comm::URStream stream_; std::vector output_recipe_; @@ -243,7 +284,6 @@ class RTDEClient RTDEParser parser_; std::unique_ptr> prod_; comm::INotifier notifier_; - std::unique_ptr> pipeline_; RTDEWriter writer_; std::atomic reconnecting_; std::atomic stop_reconnection_; @@ -255,6 +295,14 @@ class RTDEClient double max_frequency_; double target_frequency_; + std::unique_ptr data_buffer0_; + std::unique_ptr data_buffer1_; + std::mutex read_mutex_; + std::mutex write_mutex_; + std::atomic new_data_ = false; + std::atomic background_read_running_ = false; + std::thread background_read_thread_; + ClientState client_state_; constexpr static const double CB3_MAX_FREQUENCY = 125.0; @@ -310,6 +358,8 @@ class RTDEClient */ void reconnect(); void reconnectCallback(); + + void backgroundReadThreadFunc(); }; } // namespace rtde_interface diff --git a/include/ur_client_library/rtde/rtde_package.h b/include/ur_client_library/rtde/rtde_package.h index 23156b990..08019aa13 100644 --- a/include/ur_client_library/rtde/rtde_package.h +++ b/include/ur_client_library/rtde/rtde_package.h @@ -61,6 +61,7 @@ class RTDEPackage : public comm::URPackage * \returns True, if the package was parsed successfully, false otherwise */ virtual bool parseWith(comm::BinParser& bp); + /*! * \brief Produces a human readable representation of the package object. * @@ -68,6 +69,14 @@ class RTDEPackage : public comm::URPackage */ virtual std::string toString() const; + /*! + * \brief Returns the type of the RTDE package. + */ + PackageType getType() const + { + return type_; + } + protected: std::unique_ptr buffer_; size_t buffer_length_; diff --git a/include/ur_client_library/rtde/rtde_parser.h b/include/ur_client_library/rtde/rtde_parser.h index 7ea4d131b..416dddd4a 100644 --- a/include/ur_client_library/rtde/rtde_parser.h +++ b/include/ur_client_library/rtde/rtde_parser.h @@ -56,6 +56,21 @@ class RTDEParser : public comm::Parser } virtual ~RTDEParser() = default; + /*! + * \brief Uses the given BinParser to fill single package object from the contained serialization. + * + * + * \param bp A BinParser holding a serialized RTDE package + * \param result A pointer to the created RTDE package object. Ideally, the passed \p result is a pre-allocated + * package of the type expected to be read. For example, when RTDE communication has been setup it enters the data + * communication phase, where the expected package is a DataPackage. If the package content inside the \p bp object + * being doesn't match the result package's type or if the \p result is a nullptr, a new package will be allocated. + * + * \returns True, if the byte stream could successfully be parsed as an RTDE package, false + * otherwise + */ + bool parse(comm::BinParser& bp, std::unique_ptr& result) override; + /*! * \brief Uses the given BinParser to create package objects from the contained serialization. * @@ -65,93 +80,23 @@ class RTDEParser : public comm::Parser * \returns True, if the byte stream could successfully be parsed as RTDE packages, false * otherwise */ - bool parse(comm::BinParser& bp, std::vector>& results) + bool parse(comm::BinParser& bp, std::vector>& results) override; + void setProtocolVersion(uint16_t protocol_version) { - PackageHeader::_package_size_type size; - PackageType type; - bp.parse(size); - bp.parse(type); - - if (!bp.checkSize(size - sizeof(size) - sizeof(type))) - { - URCL_LOG_ERROR("Buffer len shorter than expected packet length"); - return false; - } - - switch (type) - { - case PackageType::RTDE_DATA_PACKAGE: - { - std::unique_ptr package(new DataPackage(recipe_, protocol_version_)); - - if (!package->parseWith(bp)) - { - URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); - return false; - } - results.push_back(std::move(package)); - break; - } - default: - { - std::unique_ptr package(packageFromType(type)); - if (!package->parseWith(bp)) - { - URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); - return false; - } - - results.push_back(std::move(package)); - break; - } - } - if (!bp.empty()) - { - URCL_LOG_ERROR("Package of type %d was not parsed completely!", static_cast(type)); - bp.debug(); - return false; - } - - return true; + protocol_version_ = protocol_version; } - void setProtocolVersion(uint16_t protocol_version) + uint16_t getProtocolVersion() const { - protocol_version_ = protocol_version; + return protocol_version_; } private: std::vector recipe_; - RTDEPackage* packageFromType(PackageType type) - { - switch (type) - { - case PackageType::RTDE_TEXT_MESSAGE: - return new TextMessage(protocol_version_); - break; - case PackageType::RTDE_GET_URCONTROL_VERSION: - return new GetUrcontrolVersion; - break; - case PackageType::RTDE_REQUEST_PROTOCOL_VERSION: - return new RequestProtocolVersion; - break; - case PackageType::RTDE_CONTROL_PACKAGE_PAUSE: - return new ControlPackagePause; - break; - case PackageType::RTDE_CONTROL_PACKAGE_SETUP_INPUTS: - return new ControlPackageSetupInputs; - break; - case PackageType::RTDE_CONTROL_PACKAGE_SETUP_OUTPUTS: - return new ControlPackageSetupOutputs(protocol_version_); - break; - case PackageType::RTDE_CONTROL_PACKAGE_START: - return new ControlPackageStart; - break; - default: - return new RTDEPackage(type); - } - } + PackageType getPackageTypeFromHeader(comm::BinParser& bp) const; + RTDEPackage* createNewPackageFromType(PackageType type) const; + uint16_t protocol_version_; }; diff --git a/include/ur_client_library/ur/ur_driver.h b/include/ur_client_library/ur/ur_driver.h index e61a8e077..5e450dfce 100644 --- a/include/ur_client_library/ur/ur_driver.h +++ b/include/ur_client_library/ur/ur_driver.h @@ -449,6 +449,21 @@ class UrDriver */ std::unique_ptr getDataPackage(); + /*! + * \brief Blocking call to get the next data package received from the robot. + * + * This function will block until a new data package is received from the robot and return it. + * + * \param data_package Reference to a unique ptr where the received data package will be stored. + * For optimal performance, the data package pointer should contain a pre-allocated data package + * that was initialized with the same output recipe as used in this RTDEClient. If it is not an + * initialized data package, a new one will be allocated internally which will have a negative + * performance impact and print a warning. + * + * \returns Whether a data package was received successfully + */ + bool getDataPackageBlocking(std::unique_ptr& data_package); + uint32_t getControlFrequency() const { return rtde_client_->getTargetFrequency(); @@ -778,8 +793,12 @@ class UrDriver * * After initialization, the cyclic RTDE communication is not started automatically, so that data * consumers can be started also at a later point. + * + * \param read_packages_in_background If true, RTDE packages will be read in a background thread. + * In this case use getDataPackage() to receive the latest package. If set to false, + * getDataPackageBlocking() has to be called frequently. */ - void startRTDECommunication(); + void startRTDECommunication(const bool read_packages_in_background = true); /*! * \brief Sends a stop command to the socket interface which will signal the program running on @@ -1004,6 +1023,26 @@ class UrDriver return primary_client_; } + /*! \brief Enable or disable background reading of RTDE packages. + * + * When enabled, RTDE packages will be read in a background thread. + * In this case use getDataPackage() to receive the latest package. If set to false, + * getDataPackageBlocking() has to be called frequently if RTDE communication is active. + * + * \param enabled Whether background reading should be enabled or disabled. + */ + void setRTDEBackgroundReadEnabled(const bool enabled) + { + if (enabled) + { + rtde_client_->startBackgroundRead(); + } + else + { + rtde_client_->stopBackgroundRead(); + } + } + private: void init(const UrDriverConfiguration& config); @@ -1044,4 +1083,4 @@ class UrDriver VersionInformation robot_version_; }; } // namespace urcl -#endif // ifndef UR_CLIENT_LIBRARY_UR_UR_DRIVER_H_INCLUDED \ No newline at end of file +#endif // ifndef UR_CLIENT_LIBRARY_UR_UR_DRIVER_H_INCLUDED diff --git a/src/rtde/data_package.cpp b/src/rtde/data_package.cpp index a92ae14e6..6581e8cc2 100644 --- a/src/rtde/data_package.cpp +++ b/src/rtde/data_package.cpp @@ -480,9 +480,10 @@ bool rtde_interface::DataPackage::parseWith(comm::BinParser& bp) { if (g_type_list.find(item) != g_type_list.end()) { - _rtde_type_variant entry = g_type_list[item]; - std::visit([&bp](auto&& arg) { bp.parse(arg); }, entry); - data_[item] = entry; + // _rtde_type_variant entry = g_type_list[item]; + // _rtde_type_variant existing_entry = data_[item]; + + std::visit([&bp](auto&& arg) { bp.parse(arg); }, data_[item]); } else { @@ -498,7 +499,14 @@ std::string rtde_interface::DataPackage::toString() const for (auto& item : data_) { ss << item.first << ": "; - std::visit([&ss](auto&& arg) { ss << arg; }, item.second); + if (std::holds_alternative(item.second)) + { + ss << int(std::get(item.second)); + } + else + { + std::visit([&ss](auto&& arg) { ss << arg; }, item.second); + } ss << std::endl; } return ss.str(); diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index 4ed2f3cd6..ee399616a 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -44,7 +44,6 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st , parser_(output_recipe_) , prod_(std::make_unique>(stream_, parser_)) , notifier_(notifier) - , pipeline_(std::make_unique>(*prod_, PIPELINE_NAME, notifier, true)) , writer_(&stream_, input_recipe_) , reconnecting_(false) , stop_reconnection_(false) @@ -69,7 +68,6 @@ RTDEClient::RTDEClient(std::string robot_ip, comm::INotifier& notifier, const st , parser_(output_recipe_) , prod_(std::make_unique>(stream_, parser_)) , notifier_(notifier) - , pipeline_(std::make_unique>(*prod_, PIPELINE_NAME, notifier, true)) , writer_(&stream_, input_recipe_) , reconnecting_(false) , stop_reconnection_(false) @@ -87,6 +85,7 @@ RTDEClient::~RTDEClient() { reconnecting_thread_.join(); } + stopBackgroundRead(); disconnect(); } @@ -121,8 +120,6 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m URCL_LOG_ERROR("Failed to initialize RTDE client, retrying in %d seconds", initialization_timeout.count() / 1000); std::this_thread::sleep_for(initialization_timeout); } - // Stop pipeline again - pipeline_->stop(); client_state_ = ClientState::INITIALIZED; // Set reconnection callback after we are initialized to ensure that a disconnect during initialization doesn't // trigger a reconnect @@ -132,19 +129,8 @@ bool RTDEClient::init(const size_t max_connection_attempts, const std::chrono::m bool RTDEClient::setupCommunication(const size_t max_num_tries, const std::chrono::milliseconds reconnection_time) { - // The state initializing is used inside disconnect to stop the pipeline again. client_state_ = ClientState::INITIALIZING; - // A running pipeline is needed inside setup. - try - { - pipeline_->init(max_num_tries, reconnection_time); - } - catch (const UrException& exc) - { - URCL_LOG_ERROR("Caught exception %s, while trying to initialize pipeline", exc.what()); - return false; - } - pipeline_->run(); + prod_->setupProducer(max_num_tries, reconnection_time); uint16_t protocol_version = negotiateProtocolVersion(); // Protocol version must be above zero @@ -193,7 +179,7 @@ uint16_t RTDEClient::negotiateProtocolVersion() while (num_retries < MAX_REQUEST_RETRIES) { std::unique_ptr package; - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("failed to get package from RTDE interface"); return 0; @@ -243,7 +229,7 @@ bool RTDEClient::queryURControlVersion() std::unique_ptr package; while (num_retries < MAX_REQUEST_RETRIES) { - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("No answer to urcontrol version query was received from robot"); return false; @@ -253,13 +239,14 @@ bool RTDEClient::queryURControlVersion() dynamic_cast(package.get())) { urcontrol_version_ = tmp_urcontrol_version->version_information_; + URCL_LOG_INFO("Received URControl version %s", urcontrol_version_.toString().c_str()); return true; } else { std::stringstream ss; ss << "Did not receive URControl version from robot. Message received instead: " << std::endl - << package->toString() << ". Retrying..."; + << package->toString() << "\n Retrying..."; num_retries++; URCL_LOG_WARN("%s", ss.str().c_str()); } @@ -296,13 +283,8 @@ void RTDEClient::resetOutputRecipe(const std::vector new_recipe) output_recipe_.assign(new_recipe.begin(), new_recipe.end()); - // Reset pipeline first otherwise we will segfault, if the producer object no longer exists, when destroying the - // pipeline - pipeline_.reset(); - parser_ = RTDEParser(output_recipe_); prod_ = std::make_unique>(stream_, parser_); - pipeline_ = std::make_unique>(*prod_, PIPELINE_NAME, notifier_, true); } bool RTDEClient::setupOutputs(const uint16_t protocol_version) @@ -338,7 +320,7 @@ bool RTDEClient::setupOutputs(const uint16_t protocol_version) } std::unique_ptr package; - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("Did not receive confirmation on RTDE output recipe"); return false; @@ -432,7 +414,7 @@ bool RTDEClient::setupInputs() while (num_retries < MAX_REQUEST_RETRIES) { std::unique_ptr package; - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("Did not receive confirmation on RTDE input recipe"); return false; @@ -490,7 +472,6 @@ void RTDEClient::disconnect() } if (client_state_ >= ClientState::INITIALIZING) { - pipeline_->stop(); } if (client_state_ > ClientState::UNINITIALIZED) { @@ -498,6 +479,8 @@ void RTDEClient::disconnect() writer_.stop(); } client_state_ = ClientState::UNINITIALIZED; + prod_->stopProducer(); + notifier_.stopped("RTDE communication stopped"); } bool RTDEClient::isRobotBooted() @@ -507,7 +490,9 @@ bool RTDEClient::isRobotBooted() if (!sendStart()) return false; - std::unique_ptr package; + std::unique_ptr package = std::make_unique(output_recipe_); + dynamic_cast(package.get())->initEmpty(); + double timestamp = 0; int reading_count = 0; // During bootup the RTDE interface gets restarted once. If we connect to the RTDE interface before that happens, we @@ -519,8 +504,7 @@ bool RTDEClient::isRobotBooted() while (timestamp < 40 && reading_count < target_frequency_ * 2) { // Set timeout based on target frequency, to make sure that reading doesn't timeout - int timeout = static_cast((1 / target_frequency_) * 1000) * 10; - if (pipeline_->getLatestProduct(package, std::chrono::milliseconds(timeout))) + if (prod_->tryGet(package)) { rtde_interface::DataPackage* tmp_input = dynamic_cast(package.get()); tmp_input->getData("timestamp", timestamp); @@ -539,7 +523,7 @@ bool RTDEClient::isRobotBooted() return true; } -bool RTDEClient::start() +bool RTDEClient::start(const bool read_packages_in_background) { if (client_state_ == ClientState::RUNNING) return true; @@ -550,11 +534,15 @@ bool RTDEClient::start() return false; } - pipeline_->run(); - if (sendStart()) { + prod_->startProducer(); + if (read_packages_in_background) + { + startBackgroundRead(); + } client_state_ = ClientState::RUNNING; + notifier_.started("RTDE communication started"); return true; } else @@ -573,6 +561,7 @@ bool RTDEClient::pause() return false; } + stopBackgroundRead(); if (sendPause()) { client_state_ = ClientState::PAUSED; @@ -600,7 +589,7 @@ bool RTDEClient::sendStart() unsigned int num_retries = 0; while (num_retries < MAX_REQUEST_RETRIES) { - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("Could not get response to RTDE communication start request from robot"); return false; @@ -649,7 +638,7 @@ bool RTDEClient::sendPause() int seconds = 5; while (std::chrono::steady_clock::now() - start < std::chrono::seconds(seconds)) { - if (!pipeline_->getLatestProduct(package, std::chrono::milliseconds(1000))) + if (!prod_->tryGet(package)) { URCL_LOG_ERROR("Could not get response to RTDE communication pause request from robot"); return false; @@ -712,16 +701,27 @@ std::unique_ptr RTDEClient::getDataPackage(std::chr // Cannot get data packages while reconnecting as we could end up getting some of the configuration packages if (reconnect_mutex_.try_lock()) { - std::unique_ptr urpackage; - if (pipeline_->getLatestProduct(urpackage, timeout)) + if (background_read_running_) { - rtde_interface::DataPackage* tmp = dynamic_cast(urpackage.get()); - if (tmp != nullptr) + auto start = std::chrono::steady_clock::now(); + + do { - urpackage.release(); - reconnect_mutex_.unlock(); - return std::unique_ptr(tmp); - } + if (new_data_.load()) + { + std::lock_guard guard(read_mutex_); + reconnect_mutex_.unlock(); + DataPackage* temp = dynamic_cast(data_buffer0_.get()); + new_data_.store(false); + return std::make_unique(*temp); + } + + } while ((std::chrono::steady_clock::now() - start) <= timeout); + } + else + { + URCL_LOG_ERROR("Background reading is not running, cannot get data package. Please either start background " + "reading or use getDataPackageBlocking(...)."); } reconnect_mutex_.unlock(); } @@ -735,6 +735,28 @@ std::unique_ptr RTDEClient::getDataPackage(std::chr return std::unique_ptr(nullptr); } +bool RTDEClient::getDataPackageBlocking(std::unique_ptr& data_package) +{ + // Cannot get data packages while reconnecting as we could end up getting some of the configuration packages + if (reconnect_mutex_.try_lock()) + { + if (prod_->tryGet(data_package)) + { + reconnect_mutex_.unlock(); + return true; + } + reconnect_mutex_.unlock(); + } + else + { + URCL_LOG_WARN("Unable to get data package while reconnecting to the RTDE interface"); + auto period = std::chrono::duration(1.0 / target_frequency_); + std::this_thread::sleep_for(period); + } + + return false; +} + std::string RTDEClient::getIP() const { return stream_.getIP(); @@ -857,5 +879,59 @@ void RTDEClient::reconnectCallback() reconnecting_thread_ = std::thread(&RTDEClient::reconnect, this); } +void RTDEClient::startBackgroundRead() +{ + if (background_read_running_) + { + URCL_LOG_WARN("Requested to start RTDEClient's background read, while it is already running. Doing nothing."); + return; + } + background_read_running_ = true; + data_buffer0_ = std::make_unique(output_recipe_); + data_buffer1_ = std::make_unique(output_recipe_); + dynamic_cast(data_buffer0_.get())->initEmpty(); + dynamic_cast(data_buffer1_.get())->initEmpty(); + + background_read_thread_ = std::thread(&RTDEClient::backgroundReadThreadFunc, this); +} + +void RTDEClient::stopBackgroundRead() +{ + background_read_running_ = false; + if (background_read_thread_.joinable()) + { + background_read_thread_.join(); + } +} + +void RTDEClient::backgroundReadThreadFunc() +{ + while (background_read_running_) + { + if (prod_->tryGet(data_buffer1_)) + { + rtde_interface::DataPackage* data_pkg = dynamic_cast(data_buffer1_.get()); + if (data_pkg != nullptr) + { + { + std::scoped_lock lock(read_mutex_, write_mutex_); + std::swap(data_buffer0_, data_buffer1_); + } + + new_data_.store(true); + } + else if (data_buffer1_->getType() == PackageType::RTDE_TEXT_MESSAGE) + { + URCL_LOG_INFO(data_buffer1_->toString().c_str()); + } + } + else + { + auto period = std::chrono::duration(1.0 / target_frequency_); + std::this_thread::sleep_for(period); + } + } +} + } // namespace rtde_interface } // namespace urcl diff --git a/src/rtde/rtde_parser.cpp b/src/rtde/rtde_parser.cpp new file mode 100644 index 000000000..93e616ab1 --- /dev/null +++ b/src/rtde/rtde_parser.cpp @@ -0,0 +1,176 @@ +/* + * Copyright 2025, Universal Robots A/S (refactor) + * + * Copyright 2019, FZI Forschungszentrum Informatik (refactor) + * + * Copyright 2017, 2018 Simon Rasmussen (refactor) + * + * Copyright 2015, 2016 Thomas Timm Andersen (original version) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ur_client_library/rtde/rtde_parser.h" +#include "ur_client_library/rtde/package_header.h" +#include "ur_client_library/rtde/rtde_package.h" + +namespace urcl +{ +namespace rtde_interface +{ + +bool RTDEParser::parse(comm::BinParser& bp, std::vector>& results) +{ + PackageType type; + try + { + type = getPackageTypeFromHeader(bp); + } + catch (const UrException& e) + { + URCL_LOG_ERROR("Exception during RTDE package parsing: %s", e.what()); + return false; + } + + switch (type) + { + case PackageType::RTDE_DATA_PACKAGE: + { + std::unique_ptr package(new DataPackage(recipe_, protocol_version_)); + dynamic_cast(package.get())->initEmpty(); + + if (!package->parseWith(bp)) + { + URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); + return false; + } + results.push_back(std::move(package)); + break; + } + default: + { + std::unique_ptr package(createNewPackageFromType(type)); + if (!package->parseWith(bp)) + { + URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); + return false; + } + + results.push_back(std::move(package)); + break; + } + } + if (!bp.empty()) + { + URCL_LOG_ERROR("Package of type %d was not parsed completely!", static_cast(type)); + bp.debug(); + return false; + } + + return true; +} + +bool RTDEParser::parse(comm::BinParser& bp, std::unique_ptr& result) +{ + PackageType type = getPackageTypeFromHeader(bp); + + switch (type) + { + case PackageType::RTDE_DATA_PACKAGE: + { + if (result == nullptr || result->getType() != PackageType::RTDE_DATA_PACKAGE) + { + result = std::make_unique(recipe_, protocol_version_); + dynamic_cast(result.get())->initEmpty(); + URCL_LOG_WARN("The passed result pointer is empty or does not contain a DataPackage. A new DataPackage will " + "have to be allocated. Please pass a pre-allocated DataPackage if you expect a DataPackage " + "would be sent."); + } + + if (!dynamic_cast(result.get())->parseWith(bp)) + { + URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); + return false; + } + break; + } + default: + { + if (result == nullptr || result->getType() != type) + { + result = std::unique_ptr(createNewPackageFromType(type)); + } + if (!result->parseWith(bp)) + { + URCL_LOG_ERROR("Package parsing of type %d failed!", static_cast(type)); + return false; + } + + break; + } + } + if (!bp.empty()) + { + URCL_LOG_ERROR("Package of type %d was not parsed completely!", static_cast(type)); + bp.debug(); + return false; + } + + return true; +} + +PackageType RTDEParser::getPackageTypeFromHeader(comm::BinParser& bp) const +{ + PackageHeader::_package_size_type size; + PackageType type; + bp.parse(size); + bp.parse(type); + + if (!bp.checkSize(size - sizeof(size) - sizeof(type))) + { + throw UrException("Buffer len shorter than expected packet length"); + } + return type; +} + +RTDEPackage* RTDEParser::createNewPackageFromType(PackageType type) const +{ + switch (type) + { + case PackageType::RTDE_TEXT_MESSAGE: + return new TextMessage(protocol_version_); + break; + case PackageType::RTDE_GET_URCONTROL_VERSION: + return new GetUrcontrolVersion; + break; + case PackageType::RTDE_REQUEST_PROTOCOL_VERSION: + return new RequestProtocolVersion; + break; + case PackageType::RTDE_CONTROL_PACKAGE_PAUSE: + return new ControlPackagePause; + break; + case PackageType::RTDE_CONTROL_PACKAGE_SETUP_INPUTS: + return new ControlPackageSetupInputs; + break; + case PackageType::RTDE_CONTROL_PACKAGE_SETUP_OUTPUTS: + return new ControlPackageSetupOutputs(protocol_version_); + break; + case PackageType::RTDE_CONTROL_PACKAGE_START: + return new ControlPackageStart; + break; + default: + return new RTDEPackage(type); + } +} + +} // namespace rtde_interface +} // namespace urcl diff --git a/src/ur/ur_driver.cpp b/src/ur/ur_driver.cpp index 3f5271f52..9a8a150ad 100644 --- a/src/ur/ur_driver.cpp +++ b/src/ur/ur_driver.cpp @@ -198,6 +198,11 @@ std::unique_ptr urcl::UrDriver::getDataPackage() return rtde_client_->getDataPackage(timeout); } +bool UrDriver::getDataPackageBlocking(std::unique_ptr& data_package) +{ + return rtde_client_->getDataPackageBlocking(data_package); +} + bool UrDriver::writeJointCommand(const vector6d_t& values, const comm::ControlMode control_mode, const RobotReceiveTimeout& robot_receive_timeout) { @@ -580,9 +585,9 @@ bool UrDriver::writeKeepalive(const RobotReceiveTimeout& robot_receive_timeout) return reverse_interface_->write(fake, comm::ControlMode::MODE_IDLE, robot_receive_timeout); } -void UrDriver::startRTDECommunication() +void UrDriver::startRTDECommunication(const bool read_packages_in_background) { - rtde_client_->start(); + rtde_client_->start(read_packages_in_background); } bool UrDriver::stopControl() From 616ffd83eedb234eccca14c081c3a64f950f602c Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 11 Dec 2025 13:15:48 +0100 Subject: [PATCH 02/11] Use a vector instead of unordered map for data storage This should be a lot faster in accessing/storing objects in the data package --- include/ur_client_library/rtde/data_package.h | 26 ++++++++++++++----- src/rtde/data_package.cpp | 22 +++++----------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/include/ur_client_library/rtde/data_package.h b/include/ur_client_library/rtde/data_package.h index ce2b991d2..a052d010c 100644 --- a/include/ur_client_library/rtde/data_package.h +++ b/include/ur_client_library/rtde/data_package.h @@ -144,9 +144,13 @@ class DataPackage : public RTDEPackage template bool getData(const std::string& name, T& val) const { - if (data_.find(name) != data_.end()) + const auto it = + std::find_if(data_.begin(), data_.end(), [&name](const std::pair& element) { + return element.first == name; + }); + if (it != data_.end()) { - val = std::get(data_.at(name)); + val = std::get(it->second); } else { @@ -170,9 +174,13 @@ class DataPackage : public RTDEPackage { static_assert(sizeof(T) * 8 >= N, "Bitset is too large for underlying variable"); - if (data_.find(name) != data_.end()) + const auto it = + std::find_if(data_.begin(), data_.end(), [&name](const std::pair& element) { + return element.first == name; + }); + if (it != data_.end()) { - val = std::bitset(std::get(data_.at(name))); + val = std::bitset(std::get(it->second)); } else { @@ -194,9 +202,13 @@ class DataPackage : public RTDEPackage template bool setData(const std::string& name, T& val) { - if (data_.find(name) != data_.end()) + const auto it = + std::find_if(data_.begin(), data_.end(), [&name](const std::pair& element) { + return element.first == name; + }); + if (it != data_.end()) { - data_.at(name) = val; + it->second = val; } else { @@ -219,7 +231,7 @@ class DataPackage : public RTDEPackage // Const would be better here static std::unordered_map g_type_list; uint8_t recipe_id_; - std::unordered_map data_; + std::vector> data_; std::vector recipe_; uint16_t protocol_version_; }; diff --git a/src/rtde/data_package.cpp b/src/rtde/data_package.cpp index 6581e8cc2..205744e98 100644 --- a/src/rtde/data_package.cpp +++ b/src/rtde/data_package.cpp @@ -460,12 +460,14 @@ std::unordered_map DataPackage::g_ void rtde_interface::DataPackage::initEmpty() { + data_.clear(); + data_.reserve(recipe_.size()); for (auto& item : recipe_) { if (g_type_list.find(item) != g_type_list.end()) { _rtde_type_variant entry = g_type_list[item]; - data_[item] = entry; + data_.push_back({ item, entry }); } } } @@ -476,19 +478,9 @@ bool rtde_interface::DataPackage::parseWith(comm::BinParser& bp) { bp.parse(recipe_id_); } - for (auto& item : recipe_) + for (size_t i = 0; i < recipe_.size(); ++i) { - if (g_type_list.find(item) != g_type_list.end()) - { - // _rtde_type_variant entry = g_type_list[item]; - // _rtde_type_variant existing_entry = data_[item]; - - std::visit([&bp](auto&& arg) { bp.parse(arg); }, data_[item]); - } - else - { - return false; - } + std::visit([&bp](auto&& arg) { bp.parse(arg); }, data_[i].second); } return true; } @@ -523,11 +515,11 @@ size_t rtde_interface::DataPackage::serializePackage(uint8_t* buffer) size_t size = 0; size += PackageHeader::serializeHeader(buffer, PackageType::RTDE_DATA_PACKAGE, payload_size); size += comm::PackageSerializer::serialize(buffer + size, recipe_id_); - for (auto& item : recipe_) + for (size_t i = 0; i < data_.size(); ++i) { size += std::visit( [&buffer, &size](auto&& arg) -> size_t { return comm::PackageSerializer::serialize(buffer + size, arg); }, - data_[item]); + data_[i].second); } return size; From 5ab6731ad0c669bbbe99327854bc6c2700c6e8fa Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 18 Dec 2025 11:49:59 +0100 Subject: [PATCH 03/11] Remove double package initialization --- include/ur_client_library/rtde/data_package.h | 1 + src/rtde/rtde_client.cpp | 3 --- src/rtde/rtde_parser.cpp | 2 -- src/rtde/rtde_writer.cpp | 4 ++-- tests/test_rtde_data_package.cpp | 6 ------ 5 files changed, 3 insertions(+), 13 deletions(-) diff --git a/include/ur_client_library/rtde/data_package.h b/include/ur_client_library/rtde/data_package.h index a052d010c..471c11613 100644 --- a/include/ur_client_library/rtde/data_package.h +++ b/include/ur_client_library/rtde/data_package.h @@ -97,6 +97,7 @@ class DataPackage : public RTDEPackage DataPackage(const std::vector& recipe, const uint16_t& protocol_version = 2) : RTDEPackage(PackageType::RTDE_DATA_PACKAGE), recipe_(recipe), protocol_version_(protocol_version) { + initEmpty(); } virtual ~DataPackage() = default; diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index ee399616a..6a4230fb2 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -491,7 +491,6 @@ bool RTDEClient::isRobotBooted() return false; std::unique_ptr package = std::make_unique(output_recipe_); - dynamic_cast(package.get())->initEmpty(); double timestamp = 0; int reading_count = 0; @@ -889,8 +888,6 @@ void RTDEClient::startBackgroundRead() background_read_running_ = true; data_buffer0_ = std::make_unique(output_recipe_); data_buffer1_ = std::make_unique(output_recipe_); - dynamic_cast(data_buffer0_.get())->initEmpty(); - dynamic_cast(data_buffer1_.get())->initEmpty(); background_read_thread_ = std::thread(&RTDEClient::backgroundReadThreadFunc, this); } diff --git a/src/rtde/rtde_parser.cpp b/src/rtde/rtde_parser.cpp index 93e616ab1..5a6257075 100644 --- a/src/rtde/rtde_parser.cpp +++ b/src/rtde/rtde_parser.cpp @@ -46,7 +46,6 @@ bool RTDEParser::parse(comm::BinParser& bp, std::vector package(new DataPackage(recipe_, protocol_version_)); - dynamic_cast(package.get())->initEmpty(); if (!package->parseWith(bp)) { @@ -90,7 +89,6 @@ bool RTDEParser::parse(comm::BinParser& bp, std::unique_ptr& result if (result == nullptr || result->getType() != PackageType::RTDE_DATA_PACKAGE) { result = std::make_unique(recipe_, protocol_version_); - dynamic_cast(result.get())->initEmpty(); URCL_LOG_WARN("The passed result pointer is empty or does not contain a DataPackage. A new DataPackage will " "have to be allocated. Please pass a pre-allocated DataPackage if you expect a DataPackage " "would be sent."); diff --git a/src/rtde/rtde_writer.cpp b/src/rtde/rtde_writer.cpp index 2df149505..95cd49d4b 100644 --- a/src/rtde/rtde_writer.cpp +++ b/src/rtde/rtde_writer.cpp @@ -33,8 +33,9 @@ namespace urcl namespace rtde_interface { RTDEWriter::RTDEWriter(comm::URStream* stream, const std::vector& recipe) - : stream_(stream), recipe_(recipe), recipe_id_(0), queue_{ 32 }, running_(false), package_(recipe_) + : stream_(stream), recipe_id_(0), queue_{ 32 }, running_(false), package_(recipe_) { + setInputRecipe(recipe); } void RTDEWriter::setInputRecipe(const std::vector& recipe) @@ -52,7 +53,6 @@ void RTDEWriter::init(uint8_t recipe_id) return; } recipe_id_ = recipe_id; - package_.initEmpty(); running_ = true; writer_thread_ = std::thread(&RTDEWriter::run, this); } diff --git a/tests/test_rtde_data_package.cpp b/tests/test_rtde_data_package.cpp index e1c34298e..0ccec48fc 100644 --- a/tests/test_rtde_data_package.cpp +++ b/tests/test_rtde_data_package.cpp @@ -36,7 +36,6 @@ TEST(rtde_data_package, serialize_pkg) { std::vector recipe{ "speed_slider_mask" }; rtde_interface::DataPackage package(recipe); - package.initEmpty(); uint32_t value = 1; package.setData("speed_slider_mask", value); @@ -59,7 +58,6 @@ TEST(rtde_data_package, parse_pkg_protocolv2) { std::vector recipe{ "timestamp", "actual_q" }; rtde_interface::DataPackage package(recipe); - package.initEmpty(); uint8_t data_package[] = { 0x01, 0x40, 0xd0, 0x75, 0x8c, 0x49, 0xba, 0x5e, 0x35, 0xbf, 0xf9, 0x9c, 0x77, 0xd1, 0x10, 0xb4, 0x60, 0xbf, 0xfb, 0xa2, 0x33, 0xd1, 0x10, 0xb4, 0x60, 0xc0, 0x01, 0x9f, 0xbe, 0x68, @@ -93,7 +91,6 @@ TEST(rtde_data_package, parse_pkg_protocolv1) { std::vector recipe{ "timestamp", "actual_q" }; rtde_interface::DataPackage package(recipe, 1); - package.initEmpty(); uint8_t data_package[] = { 0x40, 0xd0, 0x75, 0x8c, 0x49, 0xba, 0x5e, 0x35, 0xbf, 0xf9, 0x9c, 0x77, 0xd1, 0x10, 0xb4, 0x60, 0xbf, 0xfb, 0xa2, 0x33, 0xd1, 0x10, 0xb4, 0x60, 0xc0, 0x01, 0x9f, 0xbe, @@ -126,7 +123,6 @@ TEST(rtde_data_package, get_data_not_part_of_recipe) { std::vector recipe{ "timestamp", "actual_q" }; rtde_interface::DataPackage package(recipe); - package.initEmpty(); uint32_t speed_slider_mask; EXPECT_FALSE(package.getData("speed_slider_mask", speed_slider_mask)); @@ -136,7 +132,6 @@ TEST(rtde_data_package, set_data_not_part_of_recipe) { std::vector recipe{ "timestamp", "actual_q" }; rtde_interface::DataPackage package(recipe); - package.initEmpty(); uint32_t speed_slider_mask = 1; EXPECT_FALSE(package.setData("speed_slider_mask", speed_slider_mask)); @@ -146,7 +141,6 @@ TEST(rtde_data_package, parse_and_get_bitset_data) { std::vector recipe{ "robot_status_bits" }; rtde_interface::DataPackage package(recipe); - package.initEmpty(); uint8_t data_package[] = { 0x01, 0x00, 0x00, 0x00, 0x00, 0x40, 0xb2, 0x3d, 0xa9, 0xfb, 0xe7, 0x6c, 0x8b }; comm::BinParser bp(data_package, sizeof(data_package)); From 2ac36b561d2cb0dacfeff20b8c7ec208f250edd9 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Fri, 16 Jan 2026 20:39:14 +0100 Subject: [PATCH 04/11] Avoid memory allocations in RTDEWriter Before that change we were basically allocating strings and a new DataPackage on each send operation. This change will preallocate a data package and re-use that for future send operations. The necessary RTDE dictionary keys are also preallocated. --- include/ur_client_library/rtde/rtde_writer.h | 21 +- src/rtde/rtde_writer.cpp | 245 ++++++++++--------- 2 files changed, 145 insertions(+), 121 deletions(-) diff --git a/include/ur_client_library/rtde/rtde_writer.h b/include/ur_client_library/rtde/rtde_writer.h index d5af8e5df..24cec82da 100644 --- a/include/ur_client_library/rtde/rtde_writer.h +++ b/include/ur_client_library/rtde/rtde_writer.h @@ -33,8 +33,8 @@ #include "ur_client_library/rtde/rtde_package.h" #include "ur_client_library/rtde/data_package.h" #include "ur_client_library/comm/stream.h" -#include "ur_client_library/queue/readerwriterqueue.h" #include "ur_client_library/ur/datatypes.h" +#include #include #include @@ -181,15 +181,26 @@ class RTDEWriter bool sendExternalForceTorque(const vector6d_t& external_force_torque); private: + void resetMasks(const std::shared_ptr& buffer); + uint8_t pinToMask(uint8_t pin); comm::URStream* stream_; std::vector recipe_; uint8_t recipe_id_; - moodycamel::BlockingReaderWriterQueue> queue_; + std::shared_ptr data_buffer0_; + std::shared_ptr data_buffer1_; + std::shared_ptr current_store_buffer_; + std::shared_ptr current_send_buffer_; + std::vector used_masks_; std::thread writer_thread_; - bool running_; - DataPackage package_; - std::mutex package_mutex_; + std::atomic running_; + std::mutex store_mutex_; + std::atomic new_data_available_; + std::condition_variable data_available_cv_; + + static std::vector g_preallocated_input_bit_register_keys; + static std::vector g_preallocated_input_int_register_keys; + static std::vector g_preallocated_input_double_register_keys; }; } // namespace rtde_interface diff --git a/src/rtde/rtde_writer.cpp b/src/rtde/rtde_writer.cpp index 95cd49d4b..7277f117d 100644 --- a/src/rtde/rtde_writer.cpp +++ b/src/rtde/rtde_writer.cpp @@ -27,32 +27,79 @@ //---------------------------------------------------------------------- #include "ur_client_library/rtde/rtde_writer.h" +#include +#include "ur_client_library/log.h" namespace urcl { namespace rtde_interface { +std::vector RTDEWriter::g_preallocated_input_bit_register_keys; +std::vector RTDEWriter::g_preallocated_input_int_register_keys; +std::vector RTDEWriter::g_preallocated_input_double_register_keys; + RTDEWriter::RTDEWriter(comm::URStream* stream, const std::vector& recipe) - : stream_(stream), recipe_id_(0), queue_{ 32 }, running_(false), package_(recipe_) + : stream_(stream), recipe_id_(0), running_(false) { setInputRecipe(recipe); + + g_preallocated_input_bit_register_keys.resize(128); + for (size_t i = 0; i < 128; ++i) + { + g_preallocated_input_bit_register_keys[i] = "input_bit_register_" + std::to_string(i); + } + + g_preallocated_input_int_register_keys.resize(48); + for (size_t i = 0; i < 48; ++i) + { + g_preallocated_input_int_register_keys[i] = "input_int_register_" + std::to_string(i); + } + + g_preallocated_input_double_register_keys.resize(48); + for (size_t i = 0; i < 48; ++i) + { + g_preallocated_input_double_register_keys[i] = "input_double_register_" + std::to_string(i); + } + + for (const auto& field : recipe) + { + if (field.size() >= 5 && field.substr(field.size() - 5) == "_mask") + { + used_masks_.push_back(field); + } + } } void RTDEWriter::setInputRecipe(const std::vector& recipe) { - std::lock_guard guard(package_mutex_); + if (running_) + { + throw UrException("Requesting to change the input recipe while the RTDEWriter is running. The writer has to be " + "stopped before setting the recipe."); + } + std::lock_guard lock_guard(store_mutex_); recipe_ = recipe; - package_ = DataPackage(recipe_); - package_.initEmpty(); + data_buffer0_ = std::make_shared(recipe_); + data_buffer1_ = std::make_shared(recipe_); + + current_store_buffer_ = data_buffer0_; + current_send_buffer_ = data_buffer1_; } void RTDEWriter::init(uint8_t recipe_id) { if (running_) { - return; + throw UrException("Requesting to init a RTDEWriter while it is running. The writer has to be " + "stopped before initializing it."); + } + { + std::lock_guard lock_guard(store_mutex_); + data_buffer0_->setRecipeID(recipe_id); + data_buffer1_->setRecipeID(recipe_id); } recipe_id_ = recipe_id; + new_data_available_ = false; running_ = true; writer_thread_ = std::thread(&RTDEWriter::run, this); } @@ -62,14 +109,21 @@ void RTDEWriter::run() uint8_t buffer[4096]; size_t size; size_t written; - std::unique_ptr package; while (running_) { - if (queue_.waitDequeTimed(package, 1000000)) + std::unique_lock lock(store_mutex_); + auto res = data_available_cv_.wait_for(lock, std::chrono::milliseconds(1000)); + if (res != std::cv_status::timeout) { - package->setRecipeID(recipe_id_); - size = package->serializePackage(buffer); - stream_->write(buffer, size, written); + if (new_data_available_.load() == true && running_.load()) + { + std::swap(current_store_buffer_, current_send_buffer_); + new_data_available_ = false; + lock.unlock(); + size = current_send_buffer_->serializePackage(buffer); + stream_->write(buffer, size, written); + resetMasks(current_send_buffer_); + } } } URCL_LOG_DEBUG("Write thread ended."); @@ -95,21 +149,16 @@ bool RTDEWriter::sendSpeedSlider(double speed_slider_fraction) return false; } - std::lock_guard guard(package_mutex_); + static const std::string key = "speed_slider_fraction"; + static const std::string mask_key = "speed_slider_mask"; + std::lock_guard guard(store_mutex_); uint32_t mask = 1; bool success = true; - success = package_.setData("speed_slider_mask", mask); - success = success && package_.setData("speed_slider_fraction", speed_slider_fraction); + success = current_store_buffer_->setData(mask_key, mask); + success = success && current_store_buffer_->setData(key, speed_slider_fraction); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } - mask = 0; - success = package_.setData("speed_slider_mask", mask); return success; } @@ -123,7 +172,9 @@ bool RTDEWriter::sendStandardDigitalOutput(uint8_t output_pin, bool value) return false; } - std::lock_guard guard(package_mutex_); + static const std::string key_mask = "standard_digital_output_mask"; + static const std::string key_output = "standard_digital_output"; + std::lock_guard guard(store_mutex_); uint8_t mask = pinToMask(output_pin); bool success = true; uint8_t digital_output; @@ -135,18 +186,11 @@ bool RTDEWriter::sendStandardDigitalOutput(uint8_t output_pin, bool value) { digital_output = 0; } - success = package_.setData("standard_digital_output_mask", mask); - success = success && package_.setData("standard_digital_output", digital_output); + success = current_store_buffer_->setData(key_mask, mask); + success = success && current_store_buffer_->setData(key_output, digital_output); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } - mask = 0; - success = package_.setData("standard_digital_output_mask", mask); return success; } @@ -161,7 +205,10 @@ bool RTDEWriter::sendConfigurableDigitalOutput(uint8_t output_pin, bool value) return false; } - std::lock_guard guard(package_mutex_); + URCL_LOG_INFO("Write thread started."); + static const std::string key_mask = "configurable_digital_output_mask"; + static const std::string key_output = "configurable_digital_output"; + std::lock_guard guard(store_mutex_); uint8_t mask = pinToMask(output_pin); bool success = true; uint8_t digital_output; @@ -173,18 +220,11 @@ bool RTDEWriter::sendConfigurableDigitalOutput(uint8_t output_pin, bool value) { digital_output = 0; } - success = package_.setData("configurable_digital_output_mask", mask); - success = success && package_.setData("configurable_digital_output", digital_output); + success = current_store_buffer_->setData(key_mask, mask); + success = success && current_store_buffer_->setData(key_output, digital_output); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } - mask = 0; - success = package_.setData("configurable_digital_output_mask", mask); return success; } @@ -198,7 +238,9 @@ bool RTDEWriter::sendToolDigitalOutput(uint8_t output_pin, bool value) return false; } - std::lock_guard guard(package_mutex_); + static const std::string key_mask = "tool_digital_output_mask"; + static const std::string key_output = "tool_digital_output"; + std::lock_guard guard(store_mutex_); uint8_t mask = pinToMask(output_pin); bool success = true; uint8_t digital_output; @@ -210,18 +252,11 @@ bool RTDEWriter::sendToolDigitalOutput(uint8_t output_pin, bool value) { digital_output = 0; } - success = package_.setData("tool_digital_output_mask", mask); - success = success && package_.setData("tool_digital_output", digital_output); + success = current_store_buffer_->setData(key_mask, mask); + success = success && current_store_buffer_->setData(key_output, digital_output); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } - mask = 0; - success = package_.setData("tool_digital_output_mask", mask); return success; } @@ -242,29 +277,26 @@ bool RTDEWriter::sendStandardAnalogOutput(uint8_t output_pin, double value, cons return false; } - std::lock_guard guard(package_mutex_); + std::lock_guard guard(store_mutex_); uint8_t mask = pinToMask(output_pin); bool success = true; - success = package_.setData("standard_analog_output_mask", mask); + static const std::string key_mask = "standard_analog_output_mask"; + success = current_store_buffer_->setData(key_mask, mask); if (type != AnalogOutputType::SET_ON_TEACH_PENDANT) { + static const std::string key_type = "standard_analog_output_type"; auto output_type_bits = [](const uint8_t pin, const uint8_t type) { return type << pin; }; uint8_t output_type = output_type_bits(output_pin, toUnderlying(type)); - success = success && package_.setData("standard_analog_output_type", output_type); + success = success && current_store_buffer_->setData(key_type, output_type); } - success = success && package_.setData("standard_analog_output_0", value); - success = success && package_.setData("standard_analog_output_1", value); + static const std::string key_output_0 = "standard_analog_output_0"; + success = success && current_store_buffer_->setData(key_output_0, value); + static const std::string key_output_1 = "standard_analog_output_1"; + success = success && current_store_buffer_->setData(key_output_1, value); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } - mask = 0; - success = package_.setData("standard_analog_output_mask", mask); return success; } @@ -288,19 +320,11 @@ bool RTDEWriter::sendInputBitRegister(uint32_t register_id, bool value) return false; } - std::lock_guard guard(package_mutex_); - std::stringstream ss; - ss << "input_bit_register_" << register_id; - - bool success = package_.setData(ss.str(), value); + std::lock_guard guard(store_mutex_); + bool success = current_store_buffer_->setData(g_preallocated_input_bit_register_keys[register_id], value); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } return success; } @@ -314,19 +338,11 @@ bool RTDEWriter::sendInputIntRegister(uint32_t register_id, int32_t value) return false; } - std::lock_guard guard(package_mutex_); - std::stringstream ss; - ss << "input_int_register_" << register_id; + std::lock_guard guard(store_mutex_); + bool success = current_store_buffer_->setData(g_preallocated_input_int_register_keys[register_id], value); + new_data_available_ = true; + data_available_cv_.notify_one(); - bool success = package_.setData(ss.str(), value); - - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } return success; } @@ -340,34 +356,31 @@ bool RTDEWriter::sendInputDoubleRegister(uint32_t register_id, double value) return false; } - std::lock_guard guard(package_mutex_); - std::stringstream ss; - ss << "input_double_register_" << register_id; - - bool success = package_.setData(ss.str(), value); + std::lock_guard guard(store_mutex_); + bool success = current_store_buffer_->setData(g_preallocated_input_double_register_keys[register_id], value); + new_data_available_ = true; + data_available_cv_.notify_one(); - if (success) - { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } - } return success; } bool RTDEWriter::sendExternalForceTorque(const vector6d_t& external_force_torque) { - std::lock_guard guard(package_mutex_); - bool success = package_.setData("external_force_torque", external_force_torque); - if (success) + static const std::string key = "external_force_torque"; + std::lock_guard guard(store_mutex_); + bool success = current_store_buffer_->setData(key, external_force_torque); + new_data_available_ = true; + data_available_cv_.notify_one(); + return success; +} + +void RTDEWriter::resetMasks(const std::shared_ptr& buffer) +{ + for (const auto& mask_name : used_masks_) { - if (!queue_.tryEnqueue(std::unique_ptr(new DataPackage(package_)))) - { - return false; - } + uint8_t mask = 0; + buffer->setData(mask_name, mask); } - return success; } } // namespace rtde_interface From 3287341900d6dfa11966a5d70b81cfdfb5d1a8e3 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Mon, 19 Jan 2026 10:36:15 +0100 Subject: [PATCH 05/11] Use synchronous RTDE client in example --- examples/rtde_client.cpp | 16 ++++++++++------ src/rtde/rtde_writer.cpp | 15 +++++++++++---- tests/test_rtde_writer.cpp | 2 -- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/examples/rtde_client.cpp b/examples/rtde_client.cpp index 0972aafc7..9f3b6a380 100644 --- a/examples/rtde_client.cpp +++ b/examples/rtde_client.cpp @@ -39,7 +39,6 @@ using namespace urcl; const std::string DEFAULT_ROBOT_IP = "192.168.56.101"; const std::string OUTPUT_RECIPE = "examples/resources/rtde_output_recipe.txt"; const std::string INPUT_RECIPE = "examples/resources/rtde_input_recipe.txt"; -const std::chrono::milliseconds READ_TIMEOUT{ 100 }; void printFraction(const double fraction, const std::string& label, const size_t width = 20) { @@ -84,7 +83,10 @@ int main(int argc, char* argv[]) // Once RTDE communication is started, we have to make sure to read from the interface buffer, as // otherwise we will get pipeline overflows. Therefor, do this directly before starting your main // loop. - my_client.start(); + my_client.start(false); + const std::string key = "target_speed_fraction"; + std::unique_ptr data_pkg = + std::make_unique(my_client.getOutputRecipe()); auto start_time = std::chrono::steady_clock::now(); while (second_to_run <= 0 || @@ -96,13 +98,13 @@ int main(int argc, char* argv[]) // communication doesn't work in which case the user will be notified. // In a real-world application this thread should be scheduled with real-time priority in order // to ensure that this is called in time. - std::unique_ptr data_pkg = my_client.getDataPackage(READ_TIMEOUT); - if (data_pkg) + bool success = my_client.getDataPackageBlocking(data_pkg); + if (success) { // Data fields in the data package are accessed by their name. Only names present in the // output recipe can be accessed. Otherwise this function will return false. - data_pkg->getData("target_speed_fraction", target_speed_fraction); - printFraction(target_speed_fraction, "target_speed_fraction"); + dynamic_cast(data_pkg.get())->getData(key, target_speed_fraction); + printFraction(target_speed_fraction, key); } else { @@ -139,5 +141,7 @@ int main(int argc, char* argv[]) // Resetting the speedslider back to 100% my_client.getWriter().sendSpeedSlider(1); + URCL_LOG_INFO("Exiting RTDE read/write example."); + return 0; } diff --git a/src/rtde/rtde_writer.cpp b/src/rtde/rtde_writer.cpp index 7277f117d..4de13284b 100644 --- a/src/rtde/rtde_writer.cpp +++ b/src/rtde/rtde_writer.cpp @@ -112,10 +112,9 @@ void RTDEWriter::run() while (running_) { std::unique_lock lock(store_mutex_); - auto res = data_available_cv_.wait_for(lock, std::chrono::milliseconds(1000)); - if (res != std::cv_status::timeout) + data_available_cv_.wait(lock, [this] { return new_data_available_.load() || !running_.load(); }); { - if (new_data_available_.load() == true && running_.load()) + if (new_data_available_.load() && running_.load()) { std::swap(current_store_buffer_, current_send_buffer_); new_data_available_ = false; @@ -124,6 +123,10 @@ void RTDEWriter::run() stream_->write(buffer, size, written); resetMasks(current_send_buffer_); } + else + { + lock.unlock(); + } } } URCL_LOG_DEBUG("Write thread ended."); @@ -131,7 +134,11 @@ void RTDEWriter::run() void RTDEWriter::stop() { - running_ = false; + { + std::lock_guard lock(store_mutex_); + running_ = false; + data_available_cv_.notify_one(); + } if (writer_thread_.joinable()) { writer_thread_.join(); diff --git a/tests/test_rtde_writer.cpp b/tests/test_rtde_writer.cpp index e0b7e76e6..cad7affe1 100644 --- a/tests/test_rtde_writer.cpp +++ b/tests/test_rtde_writer.cpp @@ -262,8 +262,6 @@ TEST_F(RTDEWriterTest, send_tool_digital_output) TEST_F(RTDEWriterTest, send_standard_analog_output_unknown_domain) { - waitForMessageCallback(1000); - uint8_t expected_standard_analog_output_mask = 1; uint8_t pin = 0; From 2071a75ae0d4293111bfd13d596c86202daa51ba Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 22 Jan 2026 07:46:04 +0100 Subject: [PATCH 06/11] Add method to send full DataPackage at once --- include/ur_client_library/rtde/rtde_writer.h | 13 +++++++ src/rtde/rtde_writer.cpp | 40 +++++++++++--------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/include/ur_client_library/rtde/rtde_writer.h b/include/ur_client_library/rtde/rtde_writer.h index 24cec82da..950e116fd 100644 --- a/include/ur_client_library/rtde/rtde_writer.h +++ b/include/ur_client_library/rtde/rtde_writer.h @@ -89,6 +89,18 @@ class RTDEWriter */ void stop(); + /*! + * \brief Sends a complete RTDEPackage to the robot. + * + * Use this if multiple values need to be sent at once. When using the other provided functions, + * an RTDE data package will be sent each time. + * + * \param package The package to send + * + * \returns Success of the package creation + */ + bool sendPackage(const DataPackage& package); + /*! * \brief Creates a package to request setting a new value for the speed slider. * @@ -182,6 +194,7 @@ class RTDEWriter private: void resetMasks(const std::shared_ptr& buffer); + void markStorageToBeSent(); uint8_t pinToMask(uint8_t pin); comm::URStream* stream_; diff --git a/src/rtde/rtde_writer.cpp b/src/rtde/rtde_writer.cpp index 4de13284b..7306f8025 100644 --- a/src/rtde/rtde_writer.cpp +++ b/src/rtde/rtde_writer.cpp @@ -145,6 +145,13 @@ void RTDEWriter::stop() } } +bool RTDEWriter::sendPackage(const DataPackage& package) +{ + *current_store_buffer_ = package; + markStorageToBeSent(); + return true; +} + bool RTDEWriter::sendSpeedSlider(double speed_slider_fraction) { if (speed_slider_fraction > 1.0 || speed_slider_fraction < 0.0) @@ -163,8 +170,7 @@ bool RTDEWriter::sendSpeedSlider(double speed_slider_fraction) bool success = true; success = current_store_buffer_->setData(mask_key, mask); success = success && current_store_buffer_->setData(key, speed_slider_fraction); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -195,8 +201,7 @@ bool RTDEWriter::sendStandardDigitalOutput(uint8_t output_pin, bool value) } success = current_store_buffer_->setData(key_mask, mask); success = success && current_store_buffer_->setData(key_output, digital_output); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -229,8 +234,7 @@ bool RTDEWriter::sendConfigurableDigitalOutput(uint8_t output_pin, bool value) } success = current_store_buffer_->setData(key_mask, mask); success = success && current_store_buffer_->setData(key_output, digital_output); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -261,8 +265,7 @@ bool RTDEWriter::sendToolDigitalOutput(uint8_t output_pin, bool value) } success = current_store_buffer_->setData(key_mask, mask); success = success && current_store_buffer_->setData(key_output, digital_output); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -301,8 +304,7 @@ bool RTDEWriter::sendStandardAnalogOutput(uint8_t output_pin, double value, cons success = success && current_store_buffer_->setData(key_output_0, value); static const std::string key_output_1 = "standard_analog_output_1"; success = success && current_store_buffer_->setData(key_output_1, value); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -329,8 +331,7 @@ bool RTDEWriter::sendInputBitRegister(uint32_t register_id, bool value) std::lock_guard guard(store_mutex_); bool success = current_store_buffer_->setData(g_preallocated_input_bit_register_keys[register_id], value); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -347,8 +348,7 @@ bool RTDEWriter::sendInputIntRegister(uint32_t register_id, int32_t value) std::lock_guard guard(store_mutex_); bool success = current_store_buffer_->setData(g_preallocated_input_int_register_keys[register_id], value); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -365,8 +365,7 @@ bool RTDEWriter::sendInputDoubleRegister(uint32_t register_id, double value) std::lock_guard guard(store_mutex_); bool success = current_store_buffer_->setData(g_preallocated_input_double_register_keys[register_id], value); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -376,8 +375,7 @@ bool RTDEWriter::sendExternalForceTorque(const vector6d_t& external_force_torque static const std::string key = "external_force_torque"; std::lock_guard guard(store_mutex_); bool success = current_store_buffer_->setData(key, external_force_torque); - new_data_available_ = true; - data_available_cv_.notify_one(); + markStorageToBeSent(); return success; } @@ -390,5 +388,11 @@ void RTDEWriter::resetMasks(const std::shared_ptr& buffer) } } +void RTDEWriter::markStorageToBeSent() +{ + new_data_available_ = true; + data_available_cv_.notify_one(); +} + } // namespace rtde_interface } // namespace urcl From dbad867e7b665e385cfc36bb0f4cd239fb795759 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 22 Jan 2026 08:27:49 +0100 Subject: [PATCH 07/11] Use a pointer to a DataPackage when receiving the data package --- examples/rtde_client.cpp | 10 ++++++---- include/ur_client_library/rtde/rtde_client.h | 20 +++++++++++++++++++- include/ur_client_library/ur/ur_driver.h | 2 +- src/rtde/rtde_client.cpp | 16 ++++++++++++++-- src/ur/ur_driver.cpp | 2 +- 5 files changed, 41 insertions(+), 9 deletions(-) diff --git a/examples/rtde_client.cpp b/examples/rtde_client.cpp index 9f3b6a380..b9b5dfe4e 100644 --- a/examples/rtde_client.cpp +++ b/examples/rtde_client.cpp @@ -40,6 +40,9 @@ const std::string DEFAULT_ROBOT_IP = "192.168.56.101"; const std::string OUTPUT_RECIPE = "examples/resources/rtde_output_recipe.txt"; const std::string INPUT_RECIPE = "examples/resources/rtde_input_recipe.txt"; +// Preallocation of string to avoid allocation in main loop +const std::string TARGET_SPEED_FRACTION = "target_speed_fraction"; + void printFraction(const double fraction, const std::string& label, const size_t width = 20) { std::cout << "\r" << label << ": ["; @@ -84,8 +87,7 @@ int main(int argc, char* argv[]) // otherwise we will get pipeline overflows. Therefor, do this directly before starting your main // loop. my_client.start(false); - const std::string key = "target_speed_fraction"; - std::unique_ptr data_pkg = + std::unique_ptr data_pkg = std::make_unique(my_client.getOutputRecipe()); auto start_time = std::chrono::steady_clock::now(); @@ -103,8 +105,8 @@ int main(int argc, char* argv[]) { // Data fields in the data package are accessed by their name. Only names present in the // output recipe can be accessed. Otherwise this function will return false. - dynamic_cast(data_pkg.get())->getData(key, target_speed_fraction); - printFraction(target_speed_fraction, key); + data_pkg->getData(TARGET_SPEED_FRACTION, target_speed_fraction); + printFraction(target_speed_fraction, TARGET_SPEED_FRACTION); } else { diff --git a/include/ur_client_library/rtde/rtde_client.h b/include/ur_client_library/rtde/rtde_client.h index c135395fb..732124993 100644 --- a/include/ur_client_library/rtde/rtde_client.h +++ b/include/ur_client_library/rtde/rtde_client.h @@ -175,6 +175,24 @@ class RTDEClient */ std::unique_ptr getDataPackage(std::chrono::milliseconds timeout); + /*! + * \brief Return the latest data package received + * + * When packages are read from the background thread, the latest data package + * received from the robot can be fetched with this. When no new data has been received since the last call to this + * function, it will wait for the time specified in the \p timeout parameter. + * + * When packages are not read from the background thread, this function will return false and + * print an error message. + * + * \param data_package Reference to a DataPackage where the received data package will be stored + * if a package was fetched successfully. + * \param timeout Time to wait if no data package is currently in the queue + * + * \returns Whether a data package was received successfully + */ + bool getDataPackage(DataPackage& data_package, std::chrono::milliseconds timeout); + /*! * \brief Blocking call to get the next data package received from the robot. * @@ -188,7 +206,7 @@ class RTDEClient * * \returns Whether a data package was received successfully */ - bool getDataPackageBlocking(std::unique_ptr& data_package); + bool getDataPackageBlocking(std::unique_ptr& data_package); /*! * \brief Getter for the maximum frequency the robot can publish RTDE data packages with. diff --git a/include/ur_client_library/ur/ur_driver.h b/include/ur_client_library/ur/ur_driver.h index 5e450dfce..4f9b99f08 100644 --- a/include/ur_client_library/ur/ur_driver.h +++ b/include/ur_client_library/ur/ur_driver.h @@ -462,7 +462,7 @@ class UrDriver * * \returns Whether a data package was received successfully */ - bool getDataPackageBlocking(std::unique_ptr& data_package); + bool getDataPackageBlocking(std::unique_ptr& data_package); uint32_t getControlFrequency() const { diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index 6a4230fb2..e734b10fa 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -734,14 +734,26 @@ std::unique_ptr RTDEClient::getDataPackage(std::chr return std::unique_ptr(nullptr); } -bool RTDEClient::getDataPackageBlocking(std::unique_ptr& data_package) +bool RTDEClient::getDataPackage(DataPackage& data_package, std::chrono::milliseconds timeout) +{ + if (auto package = getDataPackage(timeout)) + { + data_package = *dynamic_cast(package.get()); + return true; + } + return false; +} + +bool RTDEClient::getDataPackageBlocking(std::unique_ptr& data_package) { // Cannot get data packages while reconnecting as we could end up getting some of the configuration packages + std::unique_ptr base_package(data_package.release()); if (reconnect_mutex_.try_lock()) { - if (prod_->tryGet(data_package)) + if (prod_->tryGet(base_package)) { reconnect_mutex_.unlock(); + data_package.reset(dynamic_cast(base_package.release())); return true; } reconnect_mutex_.unlock(); diff --git a/src/ur/ur_driver.cpp b/src/ur/ur_driver.cpp index 9a8a150ad..566d6911a 100644 --- a/src/ur/ur_driver.cpp +++ b/src/ur/ur_driver.cpp @@ -198,7 +198,7 @@ std::unique_ptr urcl::UrDriver::getDataPackage() return rtde_client_->getDataPackage(timeout); } -bool UrDriver::getDataPackageBlocking(std::unique_ptr& data_package) +bool UrDriver::getDataPackageBlocking(std::unique_ptr& data_package) { return rtde_client_->getDataPackageBlocking(data_package); } From c8f7382d7110ed32a46997740b36d292996c6586 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 22 Jan 2026 10:12:35 +0100 Subject: [PATCH 08/11] Retry starting RTDE client on input conflicts This is specific to testing since we restart the RTDE client in very quick succession. While we shutdown the connection from our end, the robot might not have freed the resources, yet. --- include/ur_client_library/exceptions.h | 46 +++++++++++++++++ src/rtde/rtde_client.cpp | 23 ++++++--- ...test_deprecated_ur_driver_construction.cpp | 50 +++++++++++-------- 3 files changed, 92 insertions(+), 27 deletions(-) diff --git a/include/ur_client_library/exceptions.h b/include/ur_client_library/exceptions.h index 2f70aa85a..72b10bc49 100644 --- a/include/ur_client_library/exceptions.h +++ b/include/ur_client_library/exceptions.h @@ -272,5 +272,51 @@ class UnexpectedResponse : public UrException private: std::string text_; }; + +class RTDEInvalidKeyException : public UrException +{ +public: + explicit RTDEInvalidKeyException() : std::runtime_error("RTDE Invalid Key Exception") + { + } + + explicit RTDEInvalidKeyException(const std::string& text) : std::runtime_error(text) + { + } + + virtual ~RTDEInvalidKeyException() = default; + + virtual const char* what() const noexcept override + { + return std::runtime_error::what(); + } +}; + +class RTDEInputConflictException : public UrException +{ +public: + explicit RTDEInputConflictException() : std::runtime_error("RTDE Output Conflict Exception") + { + } + + explicit RTDEInputConflictException(const std::string& key) + : std::runtime_error("RTDE Output Conflict for key: " + key), key_(key) + { + message_ = "Variable '" + key_ + + "' is currently controlled by another RTDE client. The input recipe can't be used as " + "configured"; + } + + virtual ~RTDEInputConflictException() = default; + + virtual const char* what() const noexcept override + { + return message_.c_str(); + } + +private: + std::string key_; + std::string message_; +}; } // namespace urcl #endif // ifndef UR_CLIENT_LIBRARY_EXCEPTIONS_H_INCLUDED diff --git a/src/rtde/rtde_client.cpp b/src/rtde/rtde_client.cpp index e734b10fa..2252492b5 100644 --- a/src/rtde/rtde_client.cpp +++ b/src/rtde/rtde_client.cpp @@ -153,7 +153,21 @@ bool RTDEClient::setupCommunication(const size_t max_num_tries, const std::chron if (input_recipe_.size() > 0) { - is_rtde_comm_setup = is_rtde_comm_setup && setupInputs(); + try + { + is_rtde_comm_setup = is_rtde_comm_setup && setupInputs(); + } + catch (const RTDEInputConflictException& exc) + { + /* + * If we are starting and shutting down the driver in quick succession, the robot might still + * have some old RTDE connections open. In this case conflicts might occur when we try reserve + * the same input fields again. To mitigate this, we try to setup communication again if + * that error occurs. + */ + URCL_LOG_ERROR("Caught exception %s, while trying to setup RTDE inputs.", exc.what()); + return false; + } } return is_rtde_comm_setup; } @@ -432,14 +446,11 @@ bool RTDEClient::setupInputs() { std::string message = "Variable '" + input_recipe_[i] + "' not recognized by the robot. Probably your input recipe contains errors"; - throw UrException(message); + throw RTDEInvalidKeyException(message); } else if (variable_types[i] == "IN_USE") { - std::string message = "Variable '" + input_recipe_[i] + - "' is currently controlled by another RTDE client. The input recipe can't be used as " - "configured"; - throw UrException(message); + throw RTDEInputConflictException(input_recipe_[i]); } } writer_.init(tmp_input->input_recipe_id_); diff --git a/tests/test_deprecated_ur_driver_construction.cpp b/tests/test_deprecated_ur_driver_construction.cpp index 08b55e65f..3f6ff2936 100644 --- a/tests/test_deprecated_ur_driver_construction.cpp +++ b/tests/test_deprecated_ur_driver_construction.cpp @@ -30,6 +30,7 @@ #include #include +#include #include "ur_client_library/ur/ur_driver.h" const std::string SCRIPT_FILE = "../resources/external_control.urscript"; @@ -44,46 +45,53 @@ void handleRobotProgramState(bool program_running) std::cout << "\033[1;32mProgram running: " << std::boolalpha << program_running << "\033[0m\n" << std::endl; } -TEST(UrDriverTestDeprecatedConstructor, sigA) +void startDriver(std::function()> constructor_fun) { - std::unique_ptr tool_comm_setup; - auto driver = std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, - std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, - std::move(tool_comm_setup)); + auto driver = constructor_fun(); driver->checkCalibration(CALIBRATION_CHECKSUM); auto version = driver->getVersion(); ASSERT_TRUE(version.major > 0); } +TEST(UrDriverTestDeprecatedConstructor, sigA) +{ + std::unique_ptr tool_comm_setup; + startDriver([&tool_comm_setup]() { + return std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, + std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, + std::move(tool_comm_setup)); + }); +} + TEST(UrDriverTestDeprecatedConstructor, sigB) { std::unique_ptr tool_comm_setup; - auto driver = std::make_shared( - g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, std::bind(&handleRobotProgramState, std::placeholders::_1), - g_HEADLESS, std::move(tool_comm_setup), 50001, 50002, 2000, 0.03, false, "", 50003, 50004, 0.025, 0.5); - driver->checkCalibration(CALIBRATION_CHECKSUM); - auto version = driver->getVersion(); - ASSERT_TRUE(version.major > 0); + startDriver([&tool_comm_setup]() { + return std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, + std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, + std::move(tool_comm_setup), 50001, 50002, 2000, 0.03, false, "", 50003, + 50004, 0.025, 0.5); + }); } TEST(UrDriverTestDeprecatedConstructor, sigC) { std::unique_ptr tool_comm_setup; - auto driver = std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, - std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, - std::move(tool_comm_setup), CALIBRATION_CHECKSUM); - auto version = driver->getVersion(); - ASSERT_TRUE(version.major > 0); + startDriver([&tool_comm_setup]() { + return std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, + std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, + std::move(tool_comm_setup), CALIBRATION_CHECKSUM); + }); } TEST(UrDriverTestDeprecatedConstructor, sigD) { std::unique_ptr tool_comm_setup; - auto driver = std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, - std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, - CALIBRATION_CHECKSUM); - auto version = driver->getVersion(); - ASSERT_TRUE(version.major > 0); + startDriver([]() { + return std::make_shared(g_ROBOT_IP, SCRIPT_FILE, OUTPUT_RECIPE, INPUT_RECIPE, + std::bind(&handleRobotProgramState, std::placeholders::_1), g_HEADLESS, + CALIBRATION_CHECKSUM); + }); } int main(int argc, char* argv[]) From ec6ab167b4893e7345030dc40d444145530c10e3 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 22 Jan 2026 10:41:03 +0100 Subject: [PATCH 09/11] Update RTDEClient example documentation --- doc/examples/rtde_client.rst | 51 +++++++++++++++++++++++++++--------- examples/rtde_client.cpp | 12 ++++----- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/doc/examples/rtde_client.rst b/doc/examples/rtde_client.rst index bf47d97bf..c71bbeadb 100644 --- a/doc/examples/rtde_client.rst +++ b/doc/examples/rtde_client.rst @@ -25,11 +25,6 @@ to initialize the RTDE client. :start-at: const std::string OUTPUT_RECIPE :end-at: const std::string INPUT_RECIPE - -Internally, the RTDE client uses the same producer / consumer architecture as show in the -:ref:`primary_pipeline_example` example. However, it doesn't have a consumer thread, so data has to -be read by the user to avoid the pipeline's queue from overflowing. - Creating an RTDE Client ----------------------- @@ -45,25 +40,48 @@ omitted, RTDE communication will be established at the robot's control frequency :start-at: comm::INotifier notifier; :end-at: my_client.init(); -An RTDE data package containing every key-value pair from the output recipe can be fetched using -the ``getDataPackage()`` method. This method will block until a new package is available. - Reading data from the RTDE client --------------------------------- -Once the RTDE client is initialized, we'll have to start communication separately. As mentioned -above, we'll have to read data from the client once communication is started, hence we start -communication right before a loop reading data. +To read data received by the RTDE client, it has to be polled. For this two modes are available: + +- **Background read**: When background read is enabled (default), the RTDE client will start a + background thread that continuously reads data from the robot. The latest data package can be + fetched using the ``getDataPackage()`` method. This method returns immediately with the latest + data package received from the robot. If no data has been received since last calling this + function, it will block for a specified timeout waiting for new data to arrive. + + .. note:: This methods allocates a new data package on each call. We recommend using the blocking + read method explained below. +- **Blocking synchronous read**: When background read is not enabled, data can (and has to be) + fetched using the ``getDataPackageBlocking()`` method. This call waits for a new data package to + arrive and parses that into the passed ``DataPackage`` object. This has to be called with the + RTDE control frequency, as the robot will shutdown RTDE communication if data is not read by the + client. + +Which of the above strategies is used can be specified when starting RTDE communication using the +``start()`` method. In our example, we do not use background read and instead fetch data +synchronously. Hence, we pass ``false`` to the ``start()`` method. .. literalinclude:: ../../examples/rtde_client.cpp :language: c++ :caption: examples/rtde_client.cpp :linenos: :lineno-match: - :start-at: // Once RTDE communication is started + :start-at: std::unique_ptr data_pkg = :end-before: // Change the speed slider +In our main loop, we wait for a new data package to arrive using the blocking read method. Once +received, data from the received package can be accessed using the ``getData()`` method of the +``DataPackage`` object. This method takes the key of the data to be accessed as a parameter and +returns the corresponding value. + +.. note:: The key used to access data has to be part of the output recipe used to initialize the RTDE + client. Passing a string literal, e.g. ``"actual_q"``, is possible but not recommended as it is + converted to an ``std::string`` automatically, causing heap allocations which should be avoided + in Real-Time contexts. + Writing Data to the RTDE client ------------------------------- @@ -84,6 +102,13 @@ initialize the RTDE client has to contain the keys necessary to send that specif :end-at: } -.. note:: Many RTDE inputs require setting up the data key and a mask key. See the `RTDE guide +.. note:: Many RTDE inputs require setting up the data key and a mask key. That is done + internally, but the mask keys have to be part of the input recipe, as well. See the `RTDE guide `_ for more information. + +.. note:: Every ``send...`` call to the RTDEWriter triggers a package sent to the robot. If you + want to modify more than one input at a time, it is recommended to use the ``sendPackage()`` + method. That allows setting up the complete data package with its input recipe and sending that + to the robot at once. + diff --git a/examples/rtde_client.cpp b/examples/rtde_client.cpp index b9b5dfe4e..9e0dcffca 100644 --- a/examples/rtde_client.cpp +++ b/examples/rtde_client.cpp @@ -83,28 +83,26 @@ int main(int argc, char* argv[]) double target_speed_fraction = 1.0; double speed_slider_increment = 0.01; + std::unique_ptr data_pkg = + std::make_unique(my_client.getOutputRecipe()); // Once RTDE communication is started, we have to make sure to read from the interface buffer, as // otherwise we will get pipeline overflows. Therefor, do this directly before starting your main // loop. my_client.start(false); - std::unique_ptr data_pkg = - std::make_unique(my_client.getOutputRecipe()); auto start_time = std::chrono::steady_clock::now(); while (second_to_run <= 0 || std::chrono::duration_cast(std::chrono::steady_clock::now() - start_time).count() < second_to_run) { - // Read latest RTDE package. This will block for READ_TIMEOUT, so the - // robot will effectively be in charge of setting the frequency of this loop unless RTDE - // communication doesn't work in which case the user will be notified. - // In a real-world application this thread should be scheduled with real-time priority in order - // to ensure that this is called in time. + // Wait for a DataPackage. In a real-world application this thread should be scheduled with real-time priority in + // order to ensure that this is called in time. bool success = my_client.getDataPackageBlocking(data_pkg); if (success) { // Data fields in the data package are accessed by their name. Only names present in the // output recipe can be accessed. Otherwise this function will return false. + // We preallocated the string TARGET_SPEED_FRACTION to avoid allocations in the main loop. data_pkg->getData(TARGET_SPEED_FRACTION, target_speed_fraction); printFraction(target_speed_fraction, TARGET_SPEED_FRACTION); } From fa3fa4d9063d96573f088a5efd000896c99383d8 Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Thu, 22 Jan 2026 14:29:24 +0100 Subject: [PATCH 10/11] Use blocking read in frequency test --- tests/test_rtde_client.cpp | 21 +++++---------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/tests/test_rtde_client.cpp b/tests/test_rtde_client.cpp index 37d1296bf..d9c3121c7 100644 --- a/tests/test_rtde_client.cpp +++ b/tests/test_rtde_client.cpp @@ -191,27 +191,16 @@ TEST_F(RTDEClientTest, set_target_frequency) double expected_target_frequency = 1; EXPECT_EQ(client_->getTargetFrequency(), expected_target_frequency); - EXPECT_TRUE(client_->start()); + EXPECT_TRUE(client_->start(false)); // Test that we receive packages with a frequency of 2 Hz - const std::chrono::milliseconds read_timeout{ 10000 }; - std::unique_ptr data_pkg = client_->getDataPackage(read_timeout); - if (data_pkg == nullptr) - { - std::cout << "Failed to get data package from robot" << std::endl; - GTEST_FAIL(); - } + auto data_pkg = std::make_unique(client_->getOutputRecipe()); + ASSERT_TRUE(client_->getDataPackageBlocking(data_pkg)); double first_time_stamp = 0.0; data_pkg->getData("timestamp", first_time_stamp); - data_pkg = client_->getDataPackage(read_timeout); - if (data_pkg == nullptr) - { - std::cout << "Failed to get data package from robot" << std::endl; - GTEST_FAIL(); - } - + ASSERT_TRUE(client_->getDataPackageBlocking(data_pkg)); double second_time_stamp = 0.0; data_pkg->getData("timestamp", second_time_stamp); @@ -534,4 +523,4 @@ int main(int argc, char* argv[]) } return RUN_ALL_TESTS(); -} \ No newline at end of file +} From d3ffb061339cf0b447bee025b23d12b2644c787f Mon Sep 17 00:00:00 2001 From: Felix Exner Date: Fri, 23 Jan 2026 11:29:29 +0100 Subject: [PATCH 11/11] Always upload coverage info --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 860384b24..186c354b4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,7 +57,6 @@ jobs: env: URSIM_VERSION: ${{matrix.env.URSIM_VERSION}} - name: Upload test results to Codecov - if: ${{ !cancelled() }} uses: codecov/test-results-action@v1 with: token: ${{ secrets.CODECOV_TOKEN }}