Skip to content

Commit

Permalink
Swtich to single threaded executor for transporter node
Browse files Browse the repository at this point in the history
Signed-off-by: Yadunund <yadunund@gmail.com>
  • Loading branch information
Yadunund committed Jan 27, 2025
1 parent d2bf84c commit 7c70644
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 55 deletions.
2 changes: 1 addition & 1 deletion nexus_transporter/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ target_compile_features(nexus_transporter_component INTERFACE cxx_std_17)
rclcpp_components_register_node(nexus_transporter_component
PLUGIN "nexus_transporter::TransporterNode"
EXECUTABLE nexus_transporter_node
EXECUTOR MultiThreadedExecutor)
EXECUTOR SingleThreadedExecutor)


#===============================================================================
Expand Down
112 changes: 67 additions & 45 deletions nexus_transporter/src/TransporterNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "TransporterNode.hpp"

#include <stdexcept>

#include <geometry_msgs/msg/transform_stamped.hpp>

//==============================================================================
Expand All @@ -29,7 +31,8 @@ TransporterNode::Data::Data()
transporter(nullptr),
availability_srv(nullptr),
action_srv(nullptr),
tf_broadcaster(nullptr)
tf_broadcaster(nullptr),
ongoing_register(std::nullopt)
{
// Do nothing.
}
Expand Down Expand Up @@ -114,10 +117,7 @@ auto TransporterNode::on_configure(const State& /*previous_state*/)
response->available = true;
response->transporter = itinerary->transporter_name();
response->estimated_finish_time = itinerary->estimated_finish_time();
},
rclcpp::ServicesQoS(),
_data->cb_group
);
});

// Load transporter plugin
if (_data->transporter_plugin_name.empty())
Expand Down Expand Up @@ -163,14 +163,15 @@ auto TransporterNode::on_configure(const State& /*previous_state*/)
return CallbackReturn::FAILURE;
}

// Create the client for registration.
this->_data->register_client = this->create_client<RegisterTransporter>(
RegisterTransporterService::service_name());

//Timer for registering with the system orchestrator
this->_register_timer = this->create_wall_timer(std::chrono::seconds{1},
this->_data->register_timer = this->create_wall_timer(std::chrono::seconds{1},
[this]()
{
if (this->registration_callback())
{
this->_register_timer.reset();
}
this->_data->_register();
});

// Setup Transport action server
Expand Down Expand Up @@ -365,10 +366,7 @@ auto TransporterNode::on_configure(const State& /*previous_state*/)
}
});

},
rcl_action_server_get_default_options(),
_data->cb_group
);
});

RCLCPP_INFO(this->get_logger(), "Successfully configured.");
return CallbackReturn::SUCCESS;
Expand All @@ -379,7 +377,7 @@ auto TransporterNode::on_cleanup(const State& /*previous_state*/)
-> CallbackReturn
{
RCLCPP_INFO(this->get_logger(), "Cleaning up...");
this->_register_timer.reset();
_data->register_timer.reset();
_data->transporter.reset();
_data->availability_srv.reset();
_data->action_srv.reset();
Expand Down Expand Up @@ -436,9 +434,6 @@ TransporterNode::TransporterNode(const rclcpp::NodeOptions& options)

_data = std::make_shared<Data>();

_data->cb_group = this->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive);

_data->transporter_plugin_name = this->declare_parameter(
"transporter_plugin",
std::string(""));
Expand All @@ -458,42 +453,69 @@ TransporterNode::TransporterNode(const rclcpp::NodeOptions& options)
);
}

bool TransporterNode::registration_callback()
void TransporterNode::Data::_register()
{
RCLCPP_INFO(this->get_logger(), "Registering with system orchestrator...");
auto client = this->create_client<RegisterTransporter>(
RegisterTransporterService::service_name(),
rclcpp::ServicesQoS(),
_data->cb_group
);

if (!client->wait_for_service(_data->connection_timeout))
auto node = w_node.lock();
if (node == nullptr)
{
RCLCPP_ERROR(this->get_logger(), "Could not find system orchestrator!");
return false;
return;
}

auto req = std::make_shared<RegisterTransporter::Request>();
req->description.workcell_id = this->get_name();
auto fut = client->async_send_request(req);

if (fut.wait_for(_data->connection_timeout) != std::future_status::ready)
if (this->ongoing_register)
{
RCLCPP_ERROR(
this->get_logger(), "Could not register with system orchestrator");
return false;
node->get_logger(),
"Failed to register: No response from system orchestrator.");
if (!this->register_client->remove_pending_request(*this->
ongoing_register))
{
RCLCPP_WARN(node->get_logger(),
"Unable to remove pending request during transporter registration.");
}
}
auto resp = fut.get();
if (!resp->success)

RCLCPP_INFO(node->get_logger(), "Registering with system orchestrator...");
auto register_cb =
[this, node](rclcpp::Client<RegisterTransporter>::SharedFuture future)
{
this->ongoing_register = std::nullopt;
auto resp = future.get();
if (!resp->success)
{
switch (resp->error_code)
{
case RegisterTransporter::Response::ERROR_NOT_READY:
RCLCPP_ERROR(
node->get_logger(),
"Error while registering with system orchestrator, retrying again... [%s]",
resp->message.c_str());
break;
default:
RCLCPP_FATAL(node->get_logger(),
"Failed to register with system orchestrator! [%s]",
resp->message.c_str());
throw std::runtime_error(resp->message);
}
return;
}
RCLCPP_INFO(node->get_logger(),
"Successfully registered with system orchestrator");
this->register_timer->cancel();
// TODO(luca) reintroduce once https://github.com/ros2/rclcpp/issues/2652 is fixed and released
// this->register_timer.reset();
};

if (!this->register_client->wait_for_service(connection_timeout))
{
RCLCPP_ERROR(
this->get_logger(), "Failed to register with system orchestrator (%s)",
resp->message.c_str());
return false;
RCLCPP_ERROR(node->get_logger(), "Could not find system orchestrator!");
// timer is not cancelled so it will run again.
return;
}
RCLCPP_INFO(this->get_logger(),
"Successfully registered with system orchestrator");
return true;

auto req = std::make_shared<RegisterTransporter::Request>();
req->description.workcell_id = node->get_name();
this->ongoing_register =
this->register_client->async_send_request(req, register_cb);
}

} // namespace nexus_transporter
Expand Down
24 changes: 15 additions & 9 deletions nexus_transporter/src/TransporterNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <tf2_ros/transform_broadcaster.h>

#include <memory>
#include <optional>

//==============================================================================
namespace nexus_transporter {
Expand All @@ -53,9 +54,6 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode
/// Constructor
TransporterNode(const rclcpp::NodeOptions& options = rclcpp::NodeOptions());

/// Register the transporter to the system orchestrator.
bool registration_callback();

/// rclcpp_lifecycle::LifecycleNode functions to override
CallbackReturn on_configure(const State& previous_state) override;
CallbackReturn on_cleanup(const State& previous_state) override;
Expand All @@ -65,14 +63,16 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode
CallbackReturn on_error(const State& previous_state) override;

private:

// Bundling all member variables into a data struct and capturing a shared_ptr
// of this struct within lambdas of various callbacks will guarantee thread
// safety over capturing "this" raw ptr by reference.
// TODO(Yadunund): Add an internal node and get rid of dependency on
// multithreaded executor.
struct Data
{
/// Register the transporter to the system orchestrator.
void _register();

/// pluginlib clasloader to dynamically load the transporter plugin.
pluginlib::ClassLoader<Transporter> transporter_loader;
/// The loaded transporter plugin.
Expand All @@ -90,10 +90,6 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode
rclcpp_action::Server<ActionType>::SharedPtr
action_srv;

/// A separate callback group to prevent deadlock during state transitions
/// and other callbacks that need to execute.
rclcpp::CallbackGroup::SharedPtr cb_group;

// Map itinerary id to GoalData.
std::unordered_map<rclcpp_action::GoalUUID, std::unique_ptr<Itinerary>>
itineraries;
Expand All @@ -104,12 +100,22 @@ class TransporterNode : public rclcpp_lifecycle::LifecycleNode
// Weakptr to lifecycle node.
std::weak_ptr<rclcpp_lifecycle::LifecycleNode> w_node;

// Client for registration.
rclcpp::Client<RegisterTransporter>::SharedPtr register_client;

// Asynchronously monitor whether the transport has registered with the
// system orchestrator.
std::optional<rclcpp::Client<RegisterTransporter>::
SharedFutureAndRequestId> ongoing_register;

rclcpp::TimerBase::SharedPtr register_timer;


Data();
};

std::shared_ptr<Data> _data;

rclcpp::TimerBase::SharedPtr _register_timer;
};

} // namespace nexus_transporter
Expand Down

0 comments on commit 7c70644

Please sign in to comment.