diff --git a/nexus_transporter/CMakeLists.txt b/nexus_transporter/CMakeLists.txt index 9d87129..edda70e 100644 --- a/nexus_transporter/CMakeLists.txt +++ b/nexus_transporter/CMakeLists.txt @@ -93,7 +93,7 @@ target_compile_features(nexus_transporter_component INTERFACE cxx_std_17) rclcpp_components_register_node(nexus_transporter_component PLUGIN "nexus_transporter::TransporterNode" EXECUTABLE nexus_transporter_node - EXECUTOR MultiThreadedExecutor) + EXECUTOR SingleThreadedExecutor) #=============================================================================== diff --git a/nexus_transporter/src/TransporterNode.cpp b/nexus_transporter/src/TransporterNode.cpp index f85bd3d..cef732f 100644 --- a/nexus_transporter/src/TransporterNode.cpp +++ b/nexus_transporter/src/TransporterNode.cpp @@ -18,6 +18,8 @@ #include "TransporterNode.hpp" +#include + #include //============================================================================== @@ -29,7 +31,8 @@ TransporterNode::Data::Data() transporter(nullptr), availability_srv(nullptr), action_srv(nullptr), - tf_broadcaster(nullptr) + tf_broadcaster(nullptr), + ongoing_register(std::nullopt) { // Do nothing. } @@ -114,10 +117,7 @@ auto TransporterNode::on_configure(const State& /*previous_state*/) response->available = true; response->transporter = itinerary->transporter_name(); response->estimated_finish_time = itinerary->estimated_finish_time(); - }, - rclcpp::ServicesQoS(), - _data->cb_group - ); + }); // Load transporter plugin if (_data->transporter_plugin_name.empty()) @@ -163,14 +163,15 @@ auto TransporterNode::on_configure(const State& /*previous_state*/) return CallbackReturn::FAILURE; } + // Create the client for registration. + this->_data->register_client = this->create_client( + RegisterTransporterService::service_name()); + //Timer for registering with the system orchestrator - this->_register_timer = this->create_wall_timer(std::chrono::seconds{1}, + this->_data->register_timer = this->create_wall_timer(std::chrono::seconds{1}, [this]() { - if (this->registration_callback()) - { - this->_register_timer.reset(); - } + this->_data->_register(); }); // Setup Transport action server @@ -365,10 +366,7 @@ auto TransporterNode::on_configure(const State& /*previous_state*/) } }); - }, - rcl_action_server_get_default_options(), - _data->cb_group - ); + }); RCLCPP_INFO(this->get_logger(), "Successfully configured."); return CallbackReturn::SUCCESS; @@ -379,7 +377,7 @@ auto TransporterNode::on_cleanup(const State& /*previous_state*/) -> CallbackReturn { RCLCPP_INFO(this->get_logger(), "Cleaning up..."); - this->_register_timer.reset(); + _data->register_timer.reset(); _data->transporter.reset(); _data->availability_srv.reset(); _data->action_srv.reset(); @@ -436,9 +434,6 @@ TransporterNode::TransporterNode(const rclcpp::NodeOptions& options) _data = std::make_shared(); - _data->cb_group = this->create_callback_group( - rclcpp::CallbackGroupType::MutuallyExclusive); - _data->transporter_plugin_name = this->declare_parameter( "transporter_plugin", std::string("")); @@ -458,42 +453,69 @@ TransporterNode::TransporterNode(const rclcpp::NodeOptions& options) ); } -bool TransporterNode::registration_callback() +void TransporterNode::Data::_register() { - RCLCPP_INFO(this->get_logger(), "Registering with system orchestrator..."); - auto client = this->create_client( - RegisterTransporterService::service_name(), - rclcpp::ServicesQoS(), - _data->cb_group - ); - - if (!client->wait_for_service(_data->connection_timeout)) + auto node = w_node.lock(); + if (node == nullptr) { - RCLCPP_ERROR(this->get_logger(), "Could not find system orchestrator!"); - return false; + return; } - auto req = std::make_shared(); - req->description.workcell_id = this->get_name(); - auto fut = client->async_send_request(req); - - if (fut.wait_for(_data->connection_timeout) != std::future_status::ready) + if (this->ongoing_register) { RCLCPP_ERROR( - this->get_logger(), "Could not register with system orchestrator"); - return false; + node->get_logger(), + "Failed to register: No response from system orchestrator."); + if (!this->register_client->remove_pending_request(*this-> + ongoing_register)) + { + RCLCPP_WARN(node->get_logger(), + "Unable to remove pending request during transporter registration."); + } } - auto resp = fut.get(); - if (!resp->success) + + RCLCPP_INFO(node->get_logger(), "Registering with system orchestrator..."); + auto register_cb = + [this, node](rclcpp::Client::SharedFuture future) + { + this->ongoing_register = std::nullopt; + auto resp = future.get(); + if (!resp->success) + { + switch (resp->error_code) + { + case RegisterTransporter::Response::ERROR_NOT_READY: + RCLCPP_ERROR( + node->get_logger(), + "Error while registering with system orchestrator, retrying again... [%s]", + resp->message.c_str()); + break; + default: + RCLCPP_FATAL(node->get_logger(), + "Failed to register with system orchestrator! [%s]", + resp->message.c_str()); + throw std::runtime_error(resp->message); + } + return; + } + RCLCPP_INFO(node->get_logger(), + "Successfully registered with system orchestrator"); + this->register_timer->cancel(); + // TODO(luca) reintroduce once https://github.com/ros2/rclcpp/issues/2652 is fixed and released + // this->register_timer.reset(); + }; + + if (!this->register_client->wait_for_service(connection_timeout)) { - RCLCPP_ERROR( - this->get_logger(), "Failed to register with system orchestrator (%s)", - resp->message.c_str()); - return false; + RCLCPP_ERROR(node->get_logger(), "Could not find system orchestrator!"); + // timer is not cancelled so it will run again. + return; } - RCLCPP_INFO(this->get_logger(), - "Successfully registered with system orchestrator"); - return true; + + auto req = std::make_shared(); + req->description.workcell_id = node->get_name(); + this->ongoing_register = + this->register_client->async_send_request(req, register_cb); } } // namespace nexus_transporter diff --git a/nexus_transporter/src/TransporterNode.hpp b/nexus_transporter/src/TransporterNode.hpp index ce084bb..76744ac 100644 --- a/nexus_transporter/src/TransporterNode.hpp +++ b/nexus_transporter/src/TransporterNode.hpp @@ -31,6 +31,7 @@ #include #include +#include //============================================================================== namespace nexus_transporter { @@ -53,9 +54,6 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode /// Constructor TransporterNode(const rclcpp::NodeOptions& options = rclcpp::NodeOptions()); - /// Register the transporter to the system orchestrator. - bool registration_callback(); - /// rclcpp_lifecycle::LifecycleNode functions to override CallbackReturn on_configure(const State& previous_state) override; CallbackReturn on_cleanup(const State& previous_state) override; @@ -65,7 +63,6 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode CallbackReturn on_error(const State& previous_state) override; private: - // Bundling all member variables into a data struct and capturing a shared_ptr // of this struct within lambdas of various callbacks will guarantee thread // safety over capturing "this" raw ptr by reference. @@ -73,6 +70,9 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode // multithreaded executor. struct Data { + /// Register the transporter to the system orchestrator. + void _register(); + /// pluginlib clasloader to dynamically load the transporter plugin. pluginlib::ClassLoader transporter_loader; /// The loaded transporter plugin. @@ -90,10 +90,6 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode rclcpp_action::Server::SharedPtr action_srv; - /// A separate callback group to prevent deadlock during state transitions - /// and other callbacks that need to execute. - rclcpp::CallbackGroup::SharedPtr cb_group; - // Map itinerary id to GoalData. std::unordered_map> itineraries; @@ -104,12 +100,22 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode // Weakptr to lifecycle node. std::weak_ptr w_node; + // Client for registration. + rclcpp::Client::SharedPtr register_client; + + // Asynchronously monitor whether the transport has registered with the + // system orchestrator. + std::optional:: + SharedFutureAndRequestId> ongoing_register; + + rclcpp::TimerBase::SharedPtr register_timer; + + Data(); }; std::shared_ptr _data; - rclcpp::TimerBase::SharedPtr _register_timer; }; } // namespace nexus_transporter