From 8cffa7838abccd7d8fa988ffaee0a3a62642af75 Mon Sep 17 00:00:00 2001 From: koparasy Date: Wed, 22 Nov 2023 10:40:27 -0800 Subject: [PATCH 1/5] Remove memory copies and align header size to 8 bytes --- src/AMSlib/wf/basedb.hpp | 1056 +++++++++++++++++++------------------- 1 file changed, 529 insertions(+), 527 deletions(-) diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 3c8a21bc..00db8671 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -19,6 +19,8 @@ #include "AMS.h" #include "wf/debug.h" +#include "wf/device.hpp" +#include "wf/resource_manager.hpp" #include "wf/utils.hpp" namespace fs = std::experimental::filesystem; @@ -742,8 +744,11 @@ struct AMSMsgHeader { */ static size_t size() { - return sizeof(hsize) + sizeof(dtype) + sizeof(mpi_rank) + sizeof(num_elem) + - sizeof(in_dim) + sizeof(out_dim); + return ((sizeof(hsize) + sizeof(dtype) + sizeof(mpi_rank) + + sizeof(num_elem) + sizeof(in_dim) + sizeof(out_dim) + + sizeof(double) - 1) / + sizeof(double)) * + sizeof(double); } /** @@ -775,7 +780,13 @@ struct AMSMsgHeader { std::memcpy(data_blob + current_offset, &(out_dim), sizeof(out_dim)); current_offset += sizeof(out_dim); - return current_offset; + CFATAL(RMQHeader, + current_offset > AMSMsgHeader::size(), + "Offset is %d but header size is %d", + current_offset, + AMSMsgHeader::size()); + + return AMSMsgHeader::size(); } }; @@ -832,7 +843,11 @@ class AMSMessage _input_dim, _output_dim); size_t current_offset = header.encode(_data); - current_offset = encode_data(_data, current_offset, inputs, outputs); + DBG(RMQPublisherHandler, "Current Offset is %d", current_offset); + current_offset += + encode_data(reinterpret_cast(_data + current_offset), + inputs, + outputs); } AMSMessage(const AMSMessage&) = delete; @@ -862,32 +877,27 @@ class AMSMessage * @param[in] outputs Outputs * @return The number of bytes in the message or 0 if error */ - size_t encode_data(uint8_t* data_blob, - size_t offset, + size_t encode_data(TypeValue* data_blob, const std::vector& inputs, const std::vector& outputs) { + size_t x_dim = _input_dim + _output_dim; if (!data_blob) return 0; // Creating the body part of the messages // TODO: slow method (one copy per element!), improve by reducing number of copies for (size_t i = 0; i < _num_elements; i++) { for (size_t j = 0; j < _input_dim; j++) { - ams::ResourceManager::copy(&(inputs[j][i]), - reinterpret_cast( - _data + offset), - sizeof(TypeValue)); - offset += sizeof(TypeValue); + data_blob[i * x_dim + j] = inputs[j][i]; } + } + + for (size_t i = 0; i < _num_elements; i++) { for (size_t j = 0; j < _output_dim; j++) { - ams::ResourceManager::copy(&(outputs[j][i]), - reinterpret_cast( - _data + offset), - sizeof(TypeValue)); - offset += sizeof(TypeValue); + data_blob[i * x_dim + _input_dim + j] = outputs[j][i]; } } - return offset; + return (x_dim * _num_elements) * sizeof(TypeValue); } /** @@ -1274,139 +1284,136 @@ class RMQConsumer return std::make_tuple("", "", "", -1, false); } - /** + /** * @brief Return the message corresponding to the delivery tag. Do not delete the * message. * @param[in] delivery_tag Delivery tag that will be returned (if found) * @return A structure inbound_msg which is a std::tuple (see typedef) */ - inbound_msg get_messages(uint64_t delivery_tag) - { - if (!_messages.empty()) { - auto it = std::find_if(_messages.begin(), - _messages.end(), - [&delivery_tag](const inbound_msg& e) { - return std::get<3>(e) == delivery_tag; - }); - if (it != _messages.end()) return *it; - } - return std::make_tuple("", "", "", -1, false); + inbound_msg get_messages(uint64_t delivery_tag) + { + if (!_messages.empty()) { + auto it = std::find_if(_messages.begin(), + _messages.end(), + [&delivery_tag](const inbound_msg& e) { + return std::get<3>(e) == delivery_tag; + }); + if (it != _messages.end()) return *it; } + return std::make_tuple("", "", "", -1, false); + } - ~RMQConsumer() - { - _connection->close(false); - delete _connection; - } - }; // class RMQConsumer + ~RMQConsumer() + { + _connection->close(false); + delete _connection; + } +}; // class RMQConsumer - /** +/** * @brief Specific handler for RabbitMQ connections based on libevent. */ - template - class RMQPublisherHandler : public AMQP::LibEventHandler - { - private: - /** @brief Path to TLS certificate */ - std::string _cacert; - /** @brief The MPI rank (0 if MPI is not used) */ - int _rank; - /** @brief LibEvent I/O loop */ - std::shared_ptr _loop; - /** @brief main channel used to send data to the broker */ - std::shared_ptr _channel; - /** @brief AMQP reliable channel (wrapper of classic channel with added functionalities) */ - std::shared_ptr> _rchannel; - /** @brief RabbitMQ queue */ - std::string _queue; - /** @brief Total number of messages sent */ - int _nb_msg; - /** @brief Number of messages successfully acknowledged */ - int _nb_msg_ack; - - public: - /** +template +class RMQPublisherHandler : public AMQP::LibEventHandler +{ +private: + /** @brief Path to TLS certificate */ + std::string _cacert; + /** @brief The MPI rank (0 if MPI is not used) */ + int _rank; + /** @brief LibEvent I/O loop */ + std::shared_ptr _loop; + /** @brief main channel used to send data to the broker */ + std::shared_ptr _channel; + /** @brief AMQP reliable channel (wrapper of classic channel with added functionalities) */ + std::shared_ptr> _rchannel; + /** @brief RabbitMQ queue */ + std::string _queue; + /** @brief Total number of messages sent */ + int _nb_msg; + /** @brief Number of messages successfully acknowledged */ + int _nb_msg_ack; + +public: + /** * @brief Constructor * @param[in] loop Event Loop * @param[in] cacert SSL Cacert * @param[in] rank MPI rank */ - RMQPublisherHandler(std::shared_ptr loop, - std::string cacert, - std::string queue) - : AMQP::LibEventHandler(loop.get()), - _loop(loop), - _rank(0), - _cacert(std::move(cacert)), - _queue(queue), - _nb_msg_ack(0), - _nb_msg(0), - _channel(nullptr), - _rchannel(nullptr) - { + RMQPublisherHandler(std::shared_ptr loop, + std::string cacert, + std::string queue) + : AMQP::LibEventHandler(loop.get()), + _loop(loop), + _rank(0), + _cacert(std::move(cacert)), + _queue(queue), + _nb_msg_ack(0), + _nb_msg(0), + _channel(nullptr), + _rchannel(nullptr) + { #ifdef __ENABLE_MPI__ - MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); + MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); #endif - } + } - /** + /** * @brief Publish data on RMQ queue. * @param[in] data The data pointer * @param[in] data_size The number of bytes in the data pointer */ - void publish(const AMSMessage& msg) - { - if (_rchannel) { - // publish a message via the reliable-channel - _rchannel - ->publish("", - _queue, - reinterpret_cast(msg.data()), - msg.size()) - .onAck([&]() { - DBG(RMQPublisherHandler, - "[rank=%d] message #%d got acknowledged successfully by RMQ " - "server", - _rank, - _nb_msg) - _nb_msg_ack++; - }) - .onNack([&]() { - WARNING(RMQPublisherHandler, - "[rank=%d] message #%d received negative acknowledged by " - "RMQ " - "server", - _rank, - _nb_msg) - }) - .onLost([&]() { - CFATAL(RMQPublisherHandler, - false, - "[rank=%d] message #%d likely got lost by RMQ server", - _rank, - _nb_msg) - }) - .onError([&](const char* err_message) { - CFATAL(RMQPublisherHandler, - false, - "[rank=%d] message #%d did not get send: %s", - _rank, - _nb_msg, - err_message) - }); - } else { - WARNING(RMQPublisherHandler, - "[rank=%d] The reliable channel was not ready for message #%d.", + void publish(const AMSMessage& msg) + { + if (_rchannel) { + // publish a message via the reliable-channel + _rchannel + ->publish("", _queue, reinterpret_cast(msg.data()), msg.size()) + .onAck([&]() { + DBG(RMQPublisherHandler, + "[rank=%d] message #%d got acknowledged successfully by RMQ " + "server", _rank, _nb_msg) - } - _nb_msg++; + _nb_msg_ack++; + }) + .onNack([&]() { + WARNING(RMQPublisherHandler, + "[rank=%d] message #%d received negative acknowledged by " + "RMQ " + "server", + _rank, + _nb_msg) + }) + .onLost([&]() { + CFATAL(RMQPublisherHandler, + false, + "[rank=%d] message #%d likely got lost by RMQ server", + _rank, + _nb_msg) + }) + .onError([&](const char* err_message) { + CFATAL(RMQPublisherHandler, + false, + "[rank=%d] message #%d did not get send: %s", + _rank, + _nb_msg, + err_message) + }); + } else { + WARNING(RMQPublisherHandler, + "[rank=%d] The reliable channel was not ready for message #%d.", + _rank, + _nb_msg) } + _nb_msg++; + } - ~RMQPublisherHandler() = default; + ~RMQPublisherHandler() = default; - private: - /** +private: + /** * @brief Method that is called after a TCP connection has been set up, and * right before the SSL handshake is going to be performed to secure the * connection (only for amqps:// connections). This method can be overridden @@ -1417,34 +1424,33 @@ class RMQConsumer * @return bool True to proceed / accept the connection, false * to break up */ - virtual bool onSecuring(AMQP::TcpConnection* connection, SSL* ssl) - { - ERR_clear_error(); - unsigned long err; + virtual bool onSecuring(AMQP::TcpConnection* connection, SSL* ssl) + { + ERR_clear_error(); + unsigned long err; #if OPENSSL_VERSION_NUMBER < 0x10100000L - int ret = - SSL_use_certificate_file(ssl, _cacert.c_str(), SSL_FILETYPE_PEM); + int ret = SSL_use_certificate_file(ssl, _cacert.c_str(), SSL_FILETYPE_PEM); #else - int ret = SSL_use_certificate_chain_file(ssl, _cacert.c_str()); + int ret = SSL_use_certificate_chain_file(ssl, _cacert.c_str()); #endif - if (ret != 1) { - std::string error("openssl: error loading ca-chain (" + _cacert + - ") + from ["); - SSL_get_error(ssl, ret); - if ((err = ERR_get_error())) { - error += std::string(ERR_reason_error_string(err)); - } - error += "]"; - throw std::runtime_error(error); - } else { - DBG(RMQPublisherHandler, - "Success logged with ca-chain %s", - _cacert.c_str()) - return true; + if (ret != 1) { + std::string error("openssl: error loading ca-chain (" + _cacert + + ") + from ["); + SSL_get_error(ssl, ret); + if ((err = ERR_get_error())) { + error += std::string(ERR_reason_error_string(err)); } + error += "]"; + throw std::runtime_error(error); + } else { + DBG(RMQPublisherHandler, + "Success logged with ca-chain %s", + _cacert.c_str()) + return true; } + } - /** + /** * @brief Method that is called when the secure TLS connection has been * established. This is only called for amqps:// connections. It allows you to * inspect whether the connection is secure enough for your liking (you can @@ -1454,81 +1460,81 @@ class RMQConsumer * @param[in] ssl SSL structure from openssl library * @return bool True if connection can be used */ - virtual bool onSecured(AMQP::TcpConnection* connection, - const SSL* ssl) override - { - DBG(RMQPublisherHandler, - "[rank=%d] Secured TLS connection has been established.", - _rank) - return true; - } + virtual bool onSecured(AMQP::TcpConnection* connection, + const SSL* ssl) override + { + DBG(RMQPublisherHandler, + "[rank=%d] Secured TLS connection has been established.", + _rank) + return true; + } - /** + /** * @brief Method that is called by the AMQP library when the login attempt * succeeded. After this the connection is ready to use. * @param[in] connection The connection that can now be used */ - virtual void onReady(AMQP::TcpConnection* connection) override - { - DBG(RMQPublisherHandler, - "[rank=%d] Sucessfuly logged in. Connection ready to use.\n", - _rank) + virtual void onReady(AMQP::TcpConnection* connection) override + { + DBG(RMQPublisherHandler, + "[rank=%d] Sucessfuly logged in. Connection ready to use.\n", + _rank) - _channel = std::make_shared(connection); - _channel->onError([&](const char* message) { - CFATAL(RMQPublisherHandler, - false, - "[rank=%d] Error on channel: %s", - _rank, - message) - }); - - _channel->declareQueue(_queue) - .onSuccess([&](const std::string& name, - uint32_t messagecount, - uint32_t consumercount) { - if (messagecount > 0 || consumercount > 1) { - CWARNING(RMQPublisherHandler, - _rank == 0, - "[rank=%d] declared queue: %s (messagecount=%d, " - "consumercount=%d)", - _rank, - _queue.c_str(), - messagecount, - consumercount) - } - // We can now instantiate the shared buffer between AMS and RMQ - DBG(RMQPublisherHandler, - "[rank=%d] declared queue: %s", - _rank, - _queue.c_str()) - _rchannel = - std::make_shared>(*_channel.get()); - }) - .onError([&](const char* message) { - CFATAL(RMQPublisherHandler, - false, - "[ERROR][rank=%d] Error while creating broker queue (%s): " - "%s", - _rank, - _queue.c_str(), - message) - }); - } + _channel = std::make_shared(connection); + _channel->onError([&](const char* message) { + CFATAL(RMQPublisherHandler, + false, + "[rank=%d] Error on channel: %s", + _rank, + message) + }); + + _channel->declareQueue(_queue) + .onSuccess([&](const std::string& name, + uint32_t messagecount, + uint32_t consumercount) { + if (messagecount > 0 || consumercount > 1) { + CWARNING(RMQPublisherHandler, + _rank == 0, + "[rank=%d] declared queue: %s (messagecount=%d, " + "consumercount=%d)", + _rank, + _queue.c_str(), + messagecount, + consumercount) + } + // We can now instantiate the shared buffer between AMS and RMQ + DBG(RMQPublisherHandler, + "[rank=%d] declared queue: %s", + _rank, + _queue.c_str()) + _rchannel = + std::make_shared>(*_channel.get()); + }) + .onError([&](const char* message) { + CFATAL(RMQPublisherHandler, + false, + "[ERROR][rank=%d] Error while creating broker queue (%s): " + "%s", + _rank, + _queue.c_str(), + message) + }); + } - /** + /** * Method that is called when the AMQP protocol is ended. This is the * counter-part of a call to connection.close() to graceful shutdown * the connection. Note that the TCP connection is at this time still * active, and you will also receive calls to onLost() and onDetached() * @param connection The connection over which the AMQP protocol ended */ - virtual void onClosed(AMQP::TcpConnection* connection) override - { - DBG(RMQPublisherHandler, "[rank=%d] Connection is closed.\n", _rank) - } + virtual void onClosed(AMQP::TcpConnection* connection) override + { + DBG(RMQPublisherHandler, "[rank=%d] Connection is closed.\n", _rank) + } - /** + /** * @brief Method that is called by the AMQP library when a fatal error occurs * on the connection, for example because data received from RabbitMQ * could not be recognized, or the underlying connection is lost. This @@ -1537,153 +1543,150 @@ class RMQConsumer * @param[in] connection The connection on which the error occurred * @param[in] message A human readable error message */ - virtual void onError(AMQP::TcpConnection* connection, - const char* message) override - { - DBG(RMQPublisherHandler, - "[rank=%d] fatal error when establishing TCP connection: %s\n", - _rank, - message) - } + virtual void onError(AMQP::TcpConnection* connection, + const char* message) override + { + DBG(RMQPublisherHandler, + "[rank=%d] fatal error when establishing TCP connection: %s\n", + _rank, + message) + } - /** + /** * Final method that is called. This signals that no further calls to your * handler will be made about the connection. * @param connection The connection that can be destructed */ - virtual void onDetached(AMQP::TcpConnection* connection) override - { - // add your own implementation, like cleanup resources or exit the application - DBG(RMQPublisherHandler, "[rank=%d] Connection is detached.\n", _rank) - } - }; // class RMQPublisherHandler + virtual void onDetached(AMQP::TcpConnection* connection) override + { + // add your own implementation, like cleanup resources or exit the application + DBG(RMQPublisherHandler, "[rank=%d] Connection is detached.\n", _rank) + } +}; // class RMQPublisherHandler - /** +/** * @brief Class that manages a RabbitMQ broker and handles connection, event * loop and set up various handlers. */ - template - class RMQPublisher +template +class RMQPublisher +{ +private: + /** @brief Connection to the broker */ + AMQP::TcpConnection* _connection; + /** @brief name of the queue to send data */ + std::string _queue; + /** @brief TLS certificate file */ + std::string _cacert; + /** @brief MPI rank (if MPI is used, otherwise 0) */ + int _rank; + /** @brief The event loop for sender (usually the default one in libevent) */ + std::shared_ptr _loop; + /** @brief The handler which contains various callbacks for the sender */ + std::shared_ptr> _handler; + +public: + RMQPublisher(const RMQPublisher&) = delete; + RMQPublisher& operator=(const RMQPublisher&) = delete; + + RMQPublisher(const AMQP::Address& address, + std::string cacert, + std::string queue) + : _rank(0), _queue(queue), _cacert(cacert), _handler(nullptr) { - private: - /** @brief Connection to the broker */ - AMQP::TcpConnection* _connection; - /** @brief name of the queue to send data */ - std::string _queue; - /** @brief TLS certificate file */ - std::string _cacert; - /** @brief MPI rank (if MPI is used, otherwise 0) */ - int _rank; - /** @brief The event loop for sender (usually the default one in libevent) */ - std::shared_ptr _loop; - /** @brief The handler which contains various callbacks for the sender */ - std::shared_ptr> _handler; - - public: - RMQPublisher(const RMQPublisher&) = delete; - RMQPublisher& operator=(const RMQPublisher&) = delete; - - RMQPublisher(const AMQP::Address& address, - std::string cacert, - std::string queue) - : _rank(0), _queue(queue), _cacert(cacert), _handler(nullptr) - { #ifdef __ENABLE_MPI__ - MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); + MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); #endif #ifdef EVTHREAD_USE_PTHREADS_IMPLEMENTED - evthread_use_pthreads(); + evthread_use_pthreads(); #endif - CDEBUG(RMQPublisher, - _rank == 0, - "Libevent %s (LIBEVENT_VERSION_NUMBER = %#010x)", - event_get_version(), - event_get_version_number()); - CDEBUG(RMQPublisher, - _rank == 0, - "%s (OPENSSL_VERSION_NUMBER = %#010x)", - OPENSSL_VERSION_TEXT, - OPENSSL_VERSION_NUMBER); + CDEBUG(RMQPublisher, + _rank == 0, + "Libevent %s (LIBEVENT_VERSION_NUMBER = %#010x)", + event_get_version(), + event_get_version_number()); + CDEBUG(RMQPublisher, + _rank == 0, + "%s (OPENSSL_VERSION_NUMBER = %#010x)", + OPENSSL_VERSION_TEXT, + OPENSSL_VERSION_NUMBER); #if OPENSSL_VERSION_NUMBER < 0x10100000L - SSL_library_init(); + SSL_library_init(); #else - OPENSSL_init_ssl(0, NULL); + OPENSSL_init_ssl(0, NULL); #endif - CINFO(RMQPublisher, - _rank == 0, - "RabbitMQ address: %s:%d/%s (queue = %s)", - address.hostname().c_str(), - address.port(), - address.vhost().c_str(), - _queue.c_str()) - - _loop = std::shared_ptr(event_base_new(), - [](struct event_base* event) { - event_base_free(event); - }); - - _handler = std::make_shared>(_loop, - _cacert, - _queue); - _connection = new AMQP::TcpConnection(_handler.get(), address); - } + CINFO(RMQPublisher, + _rank == 0, + "RabbitMQ address: %s:%d/%s (queue = %s)", + address.hostname().c_str(), + address.port(), + address.vhost().c_str(), + _queue.c_str()) - /** + _loop = std::shared_ptr(event_base_new(), + [](struct event_base* event) { + event_base_free(event); + }); + + _handler = std::make_shared>(_loop, + _cacert, + _queue); + _connection = new AMQP::TcpConnection(_handler.get(), address); + } + + /** * @brief Check if the underlying RabbitMQ connection is ready and usable * @return True if the publisher is ready to publish */ - bool ready_publish() - { - return _connection->ready() && _connection->usable(); - } + bool ready_publish() { return _connection->ready() && _connection->usable(); } - /** + /** * @brief Wait that the connection is ready (blocking call) * @return True if the publisher is ready to publish */ - void wait_ready(int ms = 500, int timeout_sec = 30) - { - // We wait for the connection to be ready - int total_time = 0; - while (!ready_publish()) { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - DBG(RMQPublisher, - "[rank=%d] Waiting for connection to be ready...", - _rank) - total_time += ms; - if (total_time > timeout_sec * 1000) { - DBG(RMQPublisher, "[rank=%d] Connection timeout", _rank) - break; - // TODO: if connection is not working -> revert to classic file DB. - } + void wait_ready(int ms = 500, int timeout_sec = 30) + { + // We wait for the connection to be ready + int total_time = 0; + while (!ready_publish()) { + std::this_thread::sleep_for(std::chrono::milliseconds(ms)); + DBG(RMQPublisher, + "[rank=%d] Waiting for connection to be ready...", + _rank) + total_time += ms; + if (total_time > timeout_sec * 1000) { + DBG(RMQPublisher, "[rank=%d] Connection timeout", _rank) + break; + // TODO: if connection is not working -> revert to classic file DB. } } + } - /** + /** * @brief Start the underlying I/O loop (blocking call) */ - void start() - { - event_base_dispatch(_loop.get()); - // We wait for the connection to be ready - wait_ready(); - } + void start() + { + event_base_dispatch(_loop.get()); + // We wait for the connection to be ready + wait_ready(); + } - /** + /** * @brief Stop the underlying I/O loop */ - void stop() { event_base_loopexit(_loop.get(), NULL); } + void stop() { event_base_loopexit(_loop.get(), NULL); } - void publish(const AMSMessage& message) - { - _handler->publish(message); - } + void publish(const AMSMessage& message) + { + _handler->publish(message); + } - ~RMQPublisher() { delete _connection; } - }; // class RMQPublisher + ~RMQPublisher() { delete _connection; } +}; // class RMQPublisher - /** +/** * @brief Class that manages a RabbitMQ broker and handles connection, event * loop and set up various handlers. * @details This class manages a specific type of database backend in AMSLib. @@ -1739,141 +1742,141 @@ class RMQConsumer * For example, we create a channel only if the underlying connection has been succesfuly initiated * (see RMQPublisherHandler::onReady()). */ - template - class RabbitMQDB final : public BaseDB - { - private: - /** @brief Path of the config file (JSON) */ - std::string _config; - /** @brief name of the queue to send data */ - std::string _queue_sender; - /** @brief name of the queue to receive data */ - std::string _queue_receiver; - /** @brief MPI rank (if MPI is used, otherwise 0) */ - int _rank; - /** @brief Represent the ID of the last message sent */ - int _msg_tag; - /** @brief Publisher sending messages to RMQ server */ - std::shared_ptr> _publisher; - /** @brief Thread in charge of the publisher */ - std::thread _publisher_thread; - /** @brief Consumer listening to RMQ and consuming messages */ - std::shared_ptr> _consumer; - /** @brief Thread in charge of the consumer */ - std::thread _consumer_thread; - - /** +template +class RabbitMQDB final : public BaseDB +{ +private: + /** @brief Path of the config file (JSON) */ + std::string _config; + /** @brief name of the queue to send data */ + std::string _queue_sender; + /** @brief name of the queue to receive data */ + std::string _queue_receiver; + /** @brief MPI rank (if MPI is used, otherwise 0) */ + int _rank; + /** @brief Represent the ID of the last message sent */ + int _msg_tag; + /** @brief Publisher sending messages to RMQ server */ + std::shared_ptr> _publisher; + /** @brief Thread in charge of the publisher */ + std::thread _publisher_thread; + /** @brief Consumer listening to RMQ and consuming messages */ + std::shared_ptr> _consumer; + /** @brief Thread in charge of the consumer */ + std::thread _consumer_thread; + + /** * @brief Read a JSON and create a hashmap * @param[in] fn Path of the RabbitMQ JSON config file * @return a hashmap (std::unordered_map) of the JSON file */ - std::unordered_map _read_config(std::string fn) - { - std::ifstream config; - std::unordered_map connection_info = { - {"rabbitmq-erlang-cookie", ""}, - {"rabbitmq-name", ""}, - {"rabbitmq-password", ""}, - {"rabbitmq-user", ""}, - {"rabbitmq-vhost", ""}, - {"service-port", ""}, - {"service-host", ""}, - {"rabbitmq-cert", ""}, - {"rabbitmq-inbound-queue", ""}, - {"rabbitmq-outbound-queue", ""}, - }; - - config.open(fn, std::ifstream::in); - - if (config.is_open()) { - std::string line; - while (std::getline(config, line)) { - if (line.find("{") != std::string::npos || - line.find("}") != std::string::npos) { - continue; - } - line.erase(std::remove(line.begin(), line.end(), ' '), line.end()); - line.erase(std::remove(line.begin(), line.end(), ','), line.end()); - line.erase(std::remove(line.begin(), line.end(), '"'), line.end()); + std::unordered_map _read_config(std::string fn) + { + std::ifstream config; + std::unordered_map connection_info = { + {"rabbitmq-erlang-cookie", ""}, + {"rabbitmq-name", ""}, + {"rabbitmq-password", ""}, + {"rabbitmq-user", ""}, + {"rabbitmq-vhost", ""}, + {"service-port", ""}, + {"service-host", ""}, + {"rabbitmq-cert", ""}, + {"rabbitmq-inbound-queue", ""}, + {"rabbitmq-outbound-queue", ""}, + }; - std::string key = line.substr(0, line.find(':')); - line.erase(0, line.find(":") + 1); - connection_info[key] = line; + config.open(fn, std::ifstream::in); + + if (config.is_open()) { + std::string line; + while (std::getline(config, line)) { + if (line.find("{") != std::string::npos || + line.find("}") != std::string::npos) { + continue; } - config.close(); - } else { - std::string err = "Could not open JSON file: " + fn; - CFATAL(RabbitMQDB, false, err.c_str()); + line.erase(std::remove(line.begin(), line.end(), ' '), line.end()); + line.erase(std::remove(line.begin(), line.end(), ','), line.end()); + line.erase(std::remove(line.begin(), line.end(), '"'), line.end()); + + std::string key = line.substr(0, line.find(':')); + line.erase(0, line.find(":") + 1); + connection_info[key] = line; } - return connection_info; + config.close(); + } else { + std::string err = "Could not open JSON file: " + fn; + CFATAL(RabbitMQDB, false, err.c_str()); } + return connection_info; + } - public: - RabbitMQDB(const RabbitMQDB&) = delete; - RabbitMQDB& operator=(const RabbitMQDB&) = delete; - - RabbitMQDB(char* config, uint64_t id) - : BaseDB(id), - _rank(0), - _msg_tag(0), - _config(std::string(config)), - _publisher(nullptr), - _consumer(nullptr) - { - std::unordered_map rmq_config = - _read_config(_config); - _queue_sender = - rmq_config["rabbitmq-outbound-queue"]; // Queue to send data to - _queue_receiver = rmq_config - ["rabbitmq-inbound-queue"]; // Queue to receive data from PDS - bool is_secure = true; - - if (rmq_config["service-port"].empty()) { - CFATAL(RabbitMQDB, - false, - "service-port is empty, make sure the port number is present in " - "the JSON configuration") - return; - } - if (rmq_config["service-host"].empty()) { - CFATAL(RabbitMQDB, - false, - "service-host is empty, make sure the host is present in the " - "JSON " - "configuration") - return; - } +public: + RabbitMQDB(const RabbitMQDB&) = delete; + RabbitMQDB& operator=(const RabbitMQDB&) = delete; - uint16_t port = std::stoi(rmq_config["service-port"]); - if (_queue_sender.empty() || _queue_receiver.empty()) { - CFATAL(RabbitMQDB, - false, - "Queues are empty, please check your credentials file and make " - "sure rabbitmq-inbound-queue and rabbitmq-outbound-queue exist") - return; - } + RabbitMQDB(char* config, uint64_t id) + : BaseDB(id), + _rank(0), + _msg_tag(0), + _config(std::string(config)), + _publisher(nullptr), + _consumer(nullptr) + { + std::unordered_map rmq_config = + _read_config(_config); + _queue_sender = + rmq_config["rabbitmq-outbound-queue"]; // Queue to send data to + _queue_receiver = + rmq_config["rabbitmq-inbound-queue"]; // Queue to receive data from PDS + bool is_secure = true; + + if (rmq_config["service-port"].empty()) { + CFATAL(RabbitMQDB, + false, + "service-port is empty, make sure the port number is present in " + "the JSON configuration") + return; + } + if (rmq_config["service-host"].empty()) { + CFATAL(RabbitMQDB, + false, + "service-host is empty, make sure the host is present in the " + "JSON " + "configuration") + return; + } + + uint16_t port = std::stoi(rmq_config["service-port"]); + if (_queue_sender.empty() || _queue_receiver.empty()) { + CFATAL(RabbitMQDB, + false, + "Queues are empty, please check your credentials file and make " + "sure rabbitmq-inbound-queue and rabbitmq-outbound-queue exist") + return; + } + + AMQP::Login login(rmq_config["rabbitmq-user"], + rmq_config["rabbitmq-password"]); + AMQP::Address address(rmq_config["service-host"], + port, + login, + rmq_config["rabbitmq-vhost"], + is_secure); - AMQP::Login login(rmq_config["rabbitmq-user"], - rmq_config["rabbitmq-password"]); - AMQP::Address address(rmq_config["service-host"], - port, - login, - rmq_config["rabbitmq-vhost"], - is_secure); - - std::string cacert = rmq_config["rabbitmq-cert"]; - _publisher = std::make_shared>(address, - cacert, - _queue_sender); - _consumer = std::make_shared>(address, + std::string cacert = rmq_config["rabbitmq-cert"]; + _publisher = std::make_shared>(address, cacert, - _queue_receiver); + _queue_sender); + _consumer = std::make_shared>(address, + cacert, + _queue_receiver); - _publisher_thread = std::thread([&]() { _publisher->start(); }); - _consumer_thread = std::thread([&]() { _consumer->start(); }); - } + _publisher_thread = std::thread([&]() { _publisher->start(); }); + _consumer_thread = std::thread([&]() { _consumer->start(); }); + } - /** + /** * @brief Takes an input and an output vector each holding 1-D vectors data, and push * it onto the libevent buffer. * @param[in] num_elements Number of elements of each 1-D vector @@ -1881,49 +1884,49 @@ class RMQConsumer * @param[in] outputs Vector of 1-D vectors, each 1-D vectors contains * 'num_elements' values to be sent */ - PERFFASPECT() - void store(size_t num_elements, - std::vector& inputs, - std::vector& outputs) override - { - DBG(RabbitMQDB, - "[tag=%d] %s stores %ld elements of input/output " - "dimensions (%d, %d)", - _msg_tag, - type().c_str(), - num_elements, - inputs.size(), - outputs.size()) - - auto msg = AMSMessage(_msg_tag, num_elements, inputs, outputs); - _publisher->publish(msg); - _msg_tag++; - } + PERFFASPECT() + void store(size_t num_elements, + std::vector& inputs, + std::vector& outputs) override + { + DBG(RabbitMQDB, + "[tag=%d] %s stores %ld elements of input/output " + "dimensions (%d, %d)", + _msg_tag, + type().c_str(), + num_elements, + inputs.size(), + outputs.size()) - /** + auto msg = AMSMessage(_msg_tag, num_elements, inputs, outputs); + _publisher->publish(msg); + _msg_tag++; + } + + /** * @brief Return the type of this broker * @return The type of the broker */ - std::string type() override { return "rabbitmq"; } + std::string type() override { return "rabbitmq"; } /** * @brief Return the DB enumerationt type (File, Redis etc) */ AMSDBType dbType() { return AMSDBType::RMQ; }; - ~RabbitMQDB() - { - _publisher->stop(); - _consumer->stop(); - _publisher_thread.join(); - _consumer_thread.join(); - } - }; // class RabbitMQDB + ~RabbitMQDB() + { + _publisher->stop(); + _consumer->stop(); + _publisher_thread.join(); + _consumer_thread.join(); + } +}; // class RabbitMQDB #endif // __ENABLE_RMQ__ - /** +/** * @brief Create an object of the respective database. * This should never be used for large scale simulations as txt/csv format will * be extremely slow. @@ -1932,43 +1935,43 @@ class RMQConsumer * @param[in] rId a unique Id for each process taking part in a distributed * execution (rank-id) */ - template - BaseDB* createDB(char* dbPath, AMSDBType dbType, uint64_t rId = 0) - { - DBG(DB, "Instantiating data base"); +template +BaseDB* createDB(char* dbPath, AMSDBType dbType, uint64_t rId = 0) +{ + DBG(DB, "Instantiating data base"); #ifdef __ENABLE_DB__ - if (dbPath == nullptr) { - std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " - "to enable db\n"; - std::cerr << " [WARNING] Continueing\n"; - return nullptr; - } + if (dbPath == nullptr) { + std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " + "to enable db\n"; + std::cerr << " [WARNING] Continueing\n"; + return nullptr; + } - switch (dbType) { - case AMSDBType::CSV: - return new csvDB(dbPath, rId); + switch (dbType) { + case AMSDBType::CSV: + return new csvDB(dbPath, rId); #ifdef __ENABLE_REDIS__ - case AMSDBType::REDIS: - return new RedisDB(dbPath, rId); + case AMSDBType::REDIS: + return new RedisDB(dbPath, rId); #endif #ifdef __ENABLE_HDF5__ - case AMSDBType::HDF5: - return new hdf5DB(dbPath, rId); + case AMSDBType::HDF5: + return new hdf5DB(dbPath, rId); #endif #ifdef __ENABLE_RMQ__ - case AMSDBType::RMQ: - return new RabbitMQDB(dbPath, rId); + case AMSDBType::RMQ: + return new RabbitMQDB(dbPath, rId); #endif - default: - return nullptr; - } + default: + return nullptr; + } #else return nullptr; #endif - } +} - /** +/** * @brief get a data base object referred by this string. * This should never be used for large scale simulations as txt/csv format will * be extremely slow. @@ -1977,47 +1980,46 @@ class RMQConsumer * @param[in] rId a unique Id for each process taking part in a distributed * execution (rank-id) */ - template - std::shared_ptr> getDB(char* dbPath, - AMSDBType dbType, - uint64_t rId = 0) - { - static std::unordered_map>> - instances; - if (dbPath == nullptr) { - std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " - "to enable db\n"; - std::cerr << " [WARNING] Continueing\n"; - return nullptr; - } - - auto db_iter = instances.find(std::string(dbPath)); - if (db_iter == instances.end()) { - DBG(DB, "Creating new Database writting to file: %s", dbPath); - std::shared_ptr> db = - std::shared_ptr>( - createDB(dbPath, dbType, rId)); - instances.insert(std::make_pair(std::string(dbPath), db)); - return db; - } +template +std::shared_ptr> getDB(char* dbPath, + AMSDBType dbType, + uint64_t rId = 0) +{ + static std::unordered_map>> + instances; + if (dbPath == nullptr) { + std::cerr << " [WARNING] Path of DB is NULL, Please provide a valid path " + "to enable db\n"; + std::cerr << " [WARNING] Continueing\n"; + return nullptr; + } - auto db = db_iter->second; - // Corner case where creation of the db failed and someone is requesting - // the same entry point - if (db == nullptr) { - return db; - } + auto db_iter = instances.find(std::string(dbPath)); + if (db_iter == instances.end()) { + DBG(DB, "Creating new Database writting to file: %s", dbPath); + std::shared_ptr> db = std::shared_ptr>( + createDB(dbPath, dbType, rId)); + instances.insert(std::make_pair(std::string(dbPath), db)); + return db; + } - if (db->dbType() != dbType) { - throw std::runtime_error("Requesting databases of different types"); - } + auto db = db_iter->second; + // Corner case where creation of the db failed and someone is requesting + // the same entry point + if (db == nullptr) { + return db; + } - if (db->getId() != rId) { - throw std::runtime_error("Requesting databases from different ranks"); - } - DBG(DB, "Using existing Database writting to file: %s", dbPath); + if (db->dbType() != dbType) { + throw std::runtime_error("Requesting databases of different types"); + } - return db; + if (db->getId() != rId) { + throw std::runtime_error("Requesting databases from different ranks"); } + DBG(DB, "Using existing Database writting to file: %s", dbPath); + + return db; +} #endif // __AMS_BASE_DB__ From a6f1fdb6c7171fd5323a6c8e02e1dea7bbc4d4c8 Mon Sep 17 00:00:00 2001 From: koparasy Date: Wed, 22 Nov 2023 10:43:03 -0800 Subject: [PATCH 2/5] Check device status for errors --- src/AMSlib/ml/hdcache.hpp | 8 ++++- src/AMSlib/ml/surrogate.hpp | 14 +++++++++ src/AMSlib/wf/cuda/utilities.cuh | 7 +++++ src/AMSlib/wf/debug.h | 52 ++++++++++++++++---------------- src/AMSlib/wf/device.hpp | 8 +++++ 5 files changed, 62 insertions(+), 27 deletions(-) diff --git a/src/AMSlib/ml/hdcache.hpp b/src/AMSlib/ml/hdcache.hpp index 05355ccd..2b89fdf2 100644 --- a/src/AMSlib/ml/hdcache.hpp +++ b/src/AMSlib/ml/hdcache.hpp @@ -415,6 +415,11 @@ class HDCache } else { _evaluate(ndata, data, is_acceptable); } + + if (cache_location == AMSResourceType::DEVICE) { + deviceCheckErrors(__FILE__, __LINE__); + } + DBG(UQModule, "Done with evalution of uq") } @@ -621,7 +626,8 @@ class HDCache #ifdef __ENABLE_CUDA__ random_uq_device<<<1, 1>>>(is_acceptable, ndata, acceptable_error); #else - THROW(std::runtime_error, "Random-uq is not configured to use device allocations"); + THROW(std::runtime_error, + "Random-uq is not configured to use device allocations"); #endif } else { random_uq_host(is_acceptable, ndata, acceptable_error); diff --git a/src/AMSlib/ml/surrogate.hpp b/src/AMSlib/ml/surrogate.hpp index 72b262b6..1514dd4f 100644 --- a/src/AMSlib/ml/surrogate.hpp +++ b/src/AMSlib/ml/surrogate.hpp @@ -203,6 +203,10 @@ class SurrogateModel tensorToArray(output, num_elements, num_out, outputs); } + if (is_device()) { + deviceCheckErrors(__FILE__, __LINE__); + } + DBG(Surrogate, "Evaluate surrogate model (%ld, %ld) -> (%ld, %ld)", num_elements, @@ -357,7 +361,17 @@ class SurrogateModel if (typeid(TypeInValue) == typeid(double)) return true; return false; } + +#endif + + inline bool is_device() const + { +#ifdef __ENABLE_TORCH__ + return model_resource == AMSResourceType::DEVICE; +#else + return false; #endif + } bool is_DeltaUQ() { return _is_DeltaUQ; } }; diff --git a/src/AMSlib/wf/cuda/utilities.cuh b/src/AMSlib/wf/cuda/utilities.cuh index d8e64577..6c944f32 100644 --- a/src/AMSlib/wf/cuda/utilities.cuh +++ b/src/AMSlib/wf/cuda/utilities.cuh @@ -335,6 +335,8 @@ int compact(bool cond, dims, d_BlocksOffset, isReverse); + cudaDeviceSynchronize(); + CUDACHECKERROR(); ams::ResourceManager::copy(d_BlocksCount, h_BlocksCount); ams::ResourceManager::copy(d_BlocksOffset, h_BlocksOffset); @@ -353,6 +355,7 @@ int compact(bool cond, ams::ResourceManager::deregisterExternal(dense); ams::ResourceManager::deregisterExternal(sparse); cudaDeviceSynchronize(); + CUDACHECKERROR(); return compact_length; } @@ -395,6 +398,7 @@ int compact(bool cond, assignK<<>>( sparse, dense, indices, sparseElements, dims, isReverse); cudaDeviceSynchronize(); + CUDACHECKERROR(); return sparseElements; } @@ -418,6 +422,7 @@ void device_linearize(TypeOutValue* output, linearizeK<<>>(output, inputs, dims, elements); cudaDeviceSynchronize(); + CUDACHECKERROR(); } template @@ -440,6 +445,7 @@ void cuda_rand_init(bool* predicate, const size_t length, T threshold) length); fillRandom<<>>(predicate, TS, dev_random, length, threshold); cudaDeviceSynchronize(); + CUDACHECKERROR(); } void device_compute_predicate(float* data, @@ -456,6 +462,7 @@ void device_compute_predicate(float* data, threshold); compute_predicate<<>>(data, predicate, nData, kneigh, threshold); cudaDeviceSynchronize(); + CUDACHECKERROR(); } #endif diff --git a/src/AMSlib/wf/debug.h b/src/AMSlib/wf/debug.h index 000db8b4..52d04739 100644 --- a/src/AMSlib/wf/debug.h +++ b/src/AMSlib/wf/debug.h @@ -72,10 +72,10 @@ inline uint32_t getVerbosityLevel() while (0) \ ; -#define CFATAL(id, condition, ...) \ - AMSPRINT(id, condition, AMSVerbosity::AMSFATAL, RED, __VA_ARGS__) \ - if (condition) { \ - abort(); \ +#define CFATAL(id, condition, ...) \ + if (condition) { \ + AMSPRINT(id, condition, AMSVerbosity::AMSFATAL, RED, __VA_ARGS__) \ + abort(); \ } #define FATAL(id, ...) CFATAL(id, true, __VA_ARGS__) @@ -97,28 +97,28 @@ inline uint32_t getVerbosityLevel() #define DBG(id, ...) CDEBUG(id, true, __VA_ARGS__) -#define REPORT_MEM_USAGE(id, phase) \ - do { \ - double vm, rs; \ - size_t watermark, current_size, actual_size; \ - memUsage(vm, rs); \ - DBG(id, "Memory usage at %s is VM:%g RS:%g\n", phase, \ - vm, \ - rs); \ - \ - for (int i = 0; i < AMSResourceType::RSEND; i++){ \ - if ( ams::ResourceManager::isActive((AMSResourceType) i) ){ \ - ams::ResourceManager::getAllocatorStats((AMSResourceType) i, \ - watermark, \ - current_size, \ - actual_size); \ - DBG(id, "Allocator: %s HWM:%lu CS:%lu AS:%lu) ", \ - ams::ResourceManager::getAllocatorName((AMSResourceType) i).c_str(),\ - watermark, \ - current_size, \ - actual_size); \ - } \ - } \ +#define REPORT_MEM_USAGE(id, phase) \ + do { \ + double vm, rs; \ + size_t watermark, current_size, actual_size; \ + memUsage(vm, rs); \ + DBG(id, "Memory usage at %s is VM:%g RS:%g\n", phase, vm, rs); \ + \ + for (int i = 0; i < AMSResourceType::RSEND; i++) { \ + if (ams::ResourceManager::isActive((AMSResourceType)i)) { \ + ams::ResourceManager::getAllocatorStats((AMSResourceType)i, \ + watermark, \ + current_size, \ + actual_size); \ + DBG(id, \ + "Allocator: %s HWM:%lu CS:%lu AS:%lu) ", \ + ams::ResourceManager::getAllocatorName((AMSResourceType)i) \ + .c_str(), \ + watermark, \ + current_size, \ + actual_size); \ + } \ + } \ } while (0); #define THROW(exception, msg) \ diff --git a/src/AMSlib/wf/device.hpp b/src/AMSlib/wf/device.hpp index c2262fea..17ee08a0 100644 --- a/src/AMSlib/wf/device.hpp +++ b/src/AMSlib/wf/device.hpp @@ -144,6 +144,14 @@ void rand_init(bool *predicate, const size_t n, TypeValue threshold) } // namespace Device } // namespace ams +void deviceCheckErrors(const char *file, const int line) +{ +#ifdef __ENABLE_CUDA__ + __cudaCheckError(file, line); +#endif + return; +} + #ifdef __ENABLE_CUDA__ From 68c36448a6b4139cf806e5e9da06a5a8153f38e3 Mon Sep 17 00:00:00 2001 From: koparasy Date: Thu, 23 Nov 2023 11:49:35 -0800 Subject: [PATCH 3/5] Fix header sizes and unpacking on stager --- src/AMSWorkflow/ams/stage.py | 28 +++++++++++++++++++++------- src/AMSWorkflow/ams_wf/AMSDBStage.py | 6 ++++-- 2 files changed, 25 insertions(+), 9 deletions(-) diff --git a/src/AMSWorkflow/ams/stage.py b/src/AMSWorkflow/ams/stage.py index 56ca8ed7..bd12a628 100644 --- a/src/AMSWorkflow/ams/stage.py +++ b/src/AMSWorkflow/ams/stage.py @@ -220,6 +220,7 @@ def header_format(self) -> str: - 4 bytes are the number of elements in the message. Limit max: 2^32 - 1 - 2 bytes are the input dimension. Limit max: 65535 - 2 bytes are the output dimension. Limit max: 65535 + - 4 bytes are for aligning memory to 8 |__Header_size__|__Datatype__|__Rank__|__#elem__|__InDim__|__OutDim__|...real data...| @@ -229,7 +230,7 @@ def header_format(self) -> str: |__Header_(12B)__|__Input 1__|__Output 1__|...|__Input_K__|__Output_K__| """ - return "BBHIHH" + return "BBHIHHI" def endianness(self) -> str: """ @@ -272,6 +273,7 @@ def _parse_header(self, body: str) -> dict: res["num_element"], res["input_dim"], res["output_dim"], + res["padding"], ) = struct.unpack(fmt, body[:hsize]) assert hsize == res["hsize"] assert res["datatype"] in [4, 8] @@ -314,9 +316,9 @@ def _decode(self, body: str) -> Tuple[np.array]: while body: header_info = self._parse_header(body) temp_input, temp_output = self._parse_data(body, header_info) + print(f"input shape {temp_input.shape} outpute shape {temp_output.shape}") # total size of byte we read for that message chunk_size = header_info["hsize"] + header_info["dsize"] - print(f"Processing message #{i}") input.append(temp_input) output.append(temp_output) # We remove the current message and keep going @@ -442,6 +444,7 @@ def __call__(self): fn = get_unique_fn() fn = f"{self.out_dir}/{fn}.{self.suffix}" is_terminate = False + total_bytes_written = 0 with self.data_writer_cls(fn) as fd: bytes_written = 0 while True: @@ -454,6 +457,8 @@ def __call__(self): bytes_written += data.inputs.size * data.inputs.itemsize bytes_written += data.outputs.size * data.outputs.itemsize fd.store(data.inputs, data.outputs) + total_bytes_written += data.inputs.size * data.inputs.itemsize + total_bytes_written += data.outputs.size * data.outputs.itemsize # FIXME: We currently decide to chunk files to 2GB # of contents. Is this a good size? if is_terminate or bytes_written >= 2 * 1024 * 1024 * 1024: @@ -465,7 +470,7 @@ def __call__(self): break end = time.time() - print(f"Spend {end - start} at {self.__class__.__name__}") + print(f"Spend {end - start} {total_bytes_written} at {self.__class__.__name__}") class PushToStore(Task): @@ -545,7 +550,6 @@ def __init__(self, db_dir, store, dest_dir=None, stage_dir=None, db_type="hdf5") initializes the Pipeline class to write the final data in the 'dest_dir' using a file writer of type 'db_type' and optionally caching the data in the 'stage_dir' before making them available in the cache store. """ - print("DATABASE DIR IS: ", db_dir) self.ams_config = AMSInstance.from_path(db_dir) if dest_dir is not None: @@ -782,12 +786,12 @@ class RMQPipeline(Pipeline): rmq_queue: The RMQ queue to listen to. """ - def __init__(self, store, dest_dir, stage_dir, db_type, credentials, cacert, rmq_queue): + def __init__(self, db_dir, store, dest_dir, stage_dir, db_type, credentials, cacert, rmq_queue): """ Initialize a RMQPipeline that will write data to the 'dest_dir' and optionally publish these files to the kosh-store 'store' by using the stage_dir as an intermediate directory. """ - super().__init__(store, dest_dir, stage_dir, db_type) + super().__init__(db_dir, store, dest_dir, stage_dir, db_type) self._credentials = Path(credentials) self._cacert = Path(cacert) self._rmq_queue = rmq_queue @@ -819,7 +823,17 @@ def from_cli(cls, args): """ Create RMQPipeline from the user provided CLI. """ - return cls(args.store, args.dest_dir, args.stage_dir, args.db_type, args.creds, args.cert, args.queue) + print("Creating database from here", args.persistent_db_path) + return cls( + args.persistent_db_path, + args.store, + args.dest_dir, + args.stage_dir, + args.db_type, + args.creds, + args.cert, + args.queue, + ) def get_pipeline(src_mechanism="fs"): diff --git a/src/AMSWorkflow/ams_wf/AMSDBStage.py b/src/AMSWorkflow/ams_wf/AMSDBStage.py index 6b53de75..deef62e3 100644 --- a/src/AMSWorkflow/ams_wf/AMSDBStage.py +++ b/src/AMSWorkflow/ams_wf/AMSDBStage.py @@ -22,7 +22,7 @@ def main(): "--load", "-l", dest="user_module", help="Path implementing a custom pipeline stage module", default=None ) parser.add_argument( - "--class", "-cls", dest="user_class", help="Class implementing the 'Action' performed on data", default=None + "--class", dest="user_class", help="Class implementing the 'Action' performed on data", default=None ) parser.add_argument( "--policy", @@ -42,6 +42,7 @@ def main(): user_class = None user_args = None user_prog = "" + print(f"User class is {args.user_module} {args.user_class}") if args.user_module is not None: user_class = load_class(args.user_module, args.user_class) @@ -55,6 +56,7 @@ def main(): user_args, extras = user_parser.parse_known_args(extras) pipeline_cls = get_pipeline(args.mechanism) + print(pipeline_cls) pipeline_parser = argparse.ArgumentParser( prog=pipeline_cls.__name__, description="Pipeline mechanism to load data from specified end-point", @@ -81,5 +83,5 @@ def main(): if __name__ == "__main__": - print("{0}".format(" ".join(sys.argv))) + main() main() From 956a97435ee1ec7cc3c863fac5c390aa8efa7cb3 Mon Sep 17 00:00:00 2001 From: koparasy Date: Mon, 27 Nov 2023 07:57:11 -0800 Subject: [PATCH 4/5] Synchronize RMQProducer thread with main thread using future/promises. Clean up buffers --- src/AMSlib/wf/basedb.hpp | 337 +++++++++++++++++++++++++----------- tests/AMSlib/CMakeLists.txt | 2 +- 2 files changed, 241 insertions(+), 98 deletions(-) diff --git a/src/AMSlib/wf/basedb.hpp b/src/AMSlib/wf/basedb.hpp index 00db8671..7f72107a 100644 --- a/src/AMSlib/wf/basedb.hpp +++ b/src/AMSlib/wf/basedb.hpp @@ -8,16 +8,21 @@ #ifndef __AMS_BASE_DB__ #define __AMS_BASE_DB__ + +#include #include #include #include #include +#include #include #include #include #include #include "AMS.h" +#include "debug.h" +#include "resource_manager.hpp" #include "wf/debug.h" #include "wf/device.hpp" #include "wf/resource_manager.hpp" @@ -51,6 +56,7 @@ using namespace sw::redis; #include #include #include +#include #include #include #include @@ -61,6 +67,7 @@ using namespace sw::redis; #include #include #include +#include #include #include #include @@ -706,7 +713,6 @@ class RedisDB : public BaseDB * * |__Header_(12B)__|__Input 1__|__Output 1__|...|__Input_K__|__Output_K__| */ -template struct AMSMsgHeader { /** @brief Heaader size (bytes) */ uint8_t hsize; @@ -728,9 +734,13 @@ struct AMSMsgHeader { * @param[in] in_dim Inputs dimension * @param[in] out_dim Outputs dimension */ - AMSMsgHeader(size_t mpi_rank, size_t num_elem, size_t in_dim, size_t out_dim) + AMSMsgHeader(size_t mpi_rank, + size_t num_elem, + size_t in_dim, + size_t out_dim, + size_t type_size) : hsize(static_cast(AMSMsgHeader::size())), - dtype(static_cast(sizeof(TypeValue))), + dtype(static_cast(type_size)), mpi_rank(static_cast(mpi_rank)), num_elem(static_cast(num_elem)), in_dim(static_cast(in_dim)), @@ -742,7 +752,7 @@ struct AMSMsgHeader { * @brief Return the size of a header in the AMS protocol. * @return The size of a message header in AMS (in byte) */ - static size_t size() + static size_t constexpr size() { return ((sizeof(hsize) + sizeof(dtype) + sizeof(mpi_rank) + sizeof(num_elem) + sizeof(in_dim) + sizeof(out_dim) + @@ -780,12 +790,6 @@ struct AMSMsgHeader { std::memcpy(data_blob + current_offset, &(out_dim), sizeof(out_dim)); current_offset += sizeof(out_dim); - CFATAL(RMQHeader, - current_offset > AMSMsgHeader::size(), - "Offset is %d but header size is %d", - current_offset, - AMSMsgHeader::size()); - return AMSMsgHeader::size(); } }; @@ -794,7 +798,6 @@ struct AMSMsgHeader { /** * @brief Class representing a message for the AMSLib */ -template class AMSMessage { private: @@ -820,6 +823,7 @@ class AMSMessage * @param[in] inputs Inputs * @param[in] outputs Outputs */ + template AMSMessage(int id, size_t num_elements, const std::vector& inputs, @@ -834,20 +838,19 @@ class AMSMessage #ifdef __ENABLE_MPI__ MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); #endif - _total_size = AMSMsgHeader::size() + getDataSize(); + AMSMsgHeader header( + _rank, _num_elements, _input_dim, _output_dim, sizeof(TypeValue)); + + _total_size = AMSMsgHeader::size() + getTotalElements() * sizeof(TypeValue); _data = ams::ResourceManager::allocate(_total_size, AMSResourceType::HOST); - AMSMsgHeader header(_rank, - _num_elements, - _input_dim, - _output_dim); size_t current_offset = header.encode(_data); - DBG(RMQPublisherHandler, "Current Offset is %d", current_offset); current_offset += encode_data(reinterpret_cast(_data + current_offset), inputs, outputs); + DBG(AMSMessage, "Allocated message: %p", _data); } AMSMessage(const AMSMessage&) = delete; @@ -857,6 +860,7 @@ class AMSMessage AMSMessage& operator=(AMSMessage&& other) noexcept { + DBG(AMSMessage, "Move AMSMessage : %p -- %d", other._data, other._id); if (this != &other) { _id = other._id; _num_elements = other._num_elements; @@ -877,14 +881,15 @@ class AMSMessage * @param[in] outputs Outputs * @return The number of bytes in the message or 0 if error */ + template size_t encode_data(TypeValue* data_blob, const std::vector& inputs, const std::vector& outputs) { + size_t offset = 0; size_t x_dim = _input_dim + _output_dim; if (!data_blob) return 0; // Creating the body part of the messages - // TODO: slow method (one copy per element!), improve by reducing number of copies for (size_t i = 0; i < _num_elements; i++) { for (size_t j = 0; j < _input_dim; j++) { data_blob[i * x_dim + j] = inputs[j][i]; @@ -901,12 +906,12 @@ class AMSMessage } /** - * @brief Return the size of the data portion for that message + * @brief Return the total number of elements in this message * @return Size in bytes of the data portion */ - size_t getDataSize() + size_t getTotalElements() const { - return (_num_elements * (_input_dim + _output_dim)) * sizeof(TypeValue); + return (_num_elements * (_input_dim + _output_dim)); } /** @@ -921,6 +926,8 @@ class AMSMessage */ int id() const { return _id; } + int rank() const { return _rank; } + /** * @brief Return the size in bytes of the underlying binary blob * @return Byte size of data pointer @@ -929,8 +936,7 @@ class AMSMessage ~AMSMessage() { - if (_data) - ams::ResourceManager::deallocate(_data, AMSResourceType::HOST); + DBG(AMSMessage, "Destroying message with address %p %d", _data, _id) } }; // class AMSMessage @@ -949,7 +955,6 @@ typedef std::tuple /** * @brief Specific handler for RabbitMQ connections based on libevent. */ -template class RMQConsumerHandler : public AMQP::LibEventHandler { private: @@ -1108,7 +1113,8 @@ class RMQConsumerHandler : public AMQP::LibEventHandler _channel->ack(deliveryTag); std::string msg(message.body(), message.bodySize()); DBG(RMQConsumerHandler, - "message received [tag=%d] : '%s' of size %d B from '%s'/'%s'", + "message received [tag=%d] : '%s' of size %d B from " + "'%s'/'%s'", deliveryTag, msg.c_str(), message.bodySize(), @@ -1141,7 +1147,8 @@ class RMQConsumerHandler : public AMQP::LibEventHandler .onError([&](const char* message) { CFATAL(RMQConsumerHandler, false, - "[ERROR][rank=%d] Error while creating broker queue (%s): %s", + "[ERROR][rank=%d] Error while creating broker queue (%s): " + "%s", _rank, _queue.c_str(), message) @@ -1194,7 +1201,6 @@ class RMQConsumerHandler : public AMQP::LibEventHandler * @brief Class that manages a RabbitMQ broker and handles connection, event * loop and set up various handlers. */ -template class RMQConsumer { private: @@ -1209,7 +1215,7 @@ class RMQConsumer /** @brief The event loop for sender (usually the default one in libevent) */ std::shared_ptr _loop; /** @brief The handler which contains various callbacks for the sender */ - std::shared_ptr> _handler; + std::shared_ptr _handler; /** @brief Queue that contains all the messages received on receiver queue (messages can be popped in) */ std::vector _messages; @@ -1255,8 +1261,7 @@ class RMQConsumer [](struct event_base* event) { event_base_free(event); }); - _handler = - std::make_shared>(_loop, _cacert, _queue); + _handler = std::make_shared(_loop, _cacert, _queue); _connection = new AMQP::TcpConnection(_handler.get(), address); } @@ -1313,10 +1318,10 @@ class RMQConsumer /** * @brief Specific handler for RabbitMQ connections based on libevent. */ -template class RMQPublisherHandler : public AMQP::LibEventHandler { private: + enum ConnectionStatus { FAILED, CONNECTED, CLOSED }; /** @brief Path to TLS certificate */ std::string _cacert; /** @brief The MPI rank (0 if MPI is not used) */ @@ -1334,7 +1339,16 @@ class RMQPublisherHandler : public AMQP::LibEventHandler /** @brief Number of messages successfully acknowledged */ int _nb_msg_ack; + std::promise establish_connection; + std::future established; + + std::promise close_connection; + std::future closed; + public: + std::mutex ptr_mutex; + std::vector data_ptrs; + /** * @brief Constructor * @param[in] loop Event Loop @@ -1357,6 +1371,8 @@ class RMQPublisherHandler : public AMQP::LibEventHandler #ifdef __ENABLE_MPI__ MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &_rank)); #endif + established = establish_connection.get_future(); + closed = close_connection.get_future(); } /** @@ -1364,43 +1380,75 @@ class RMQPublisherHandler : public AMQP::LibEventHandler * @param[in] data The data pointer * @param[in] data_size The number of bytes in the data pointer */ - void publish(const AMSMessage& msg) + void publish(AMSMessage&& msg) { if (_rchannel) { // publish a message via the reliable-channel _rchannel ->publish("", _queue, reinterpret_cast(msg.data()), msg.size()) - .onAck([&]() { + .onAck([_msg_ptr = msg.data(), + &_nb_msg_ack = _nb_msg_ack, + rank = msg.rank(), + id = msg.id(), + &ptr_mutex = ptr_mutex, + &data_ptrs = this->data_ptrs]() mutable { + const std::lock_guard lock(ptr_mutex); DBG(RMQPublisherHandler, - "[rank=%d] message #%d got acknowledged successfully by RMQ " + "[rank=%d] message #%d (Addr:%p) got acknowledged successfully " + "by " + "RMQ " "server", - _rank, - _nb_msg) + rank, + id, + _msg_ptr) _nb_msg_ack++; + data_ptrs.push_back(_msg_ptr); }) - .onNack([&]() { + .onNack([_msg_ptr = msg.data(), + &_nb_msg_ack = _nb_msg_ack, + rank = msg.rank(), + id = msg.id(), + &ptr_mutex = ptr_mutex, + &data_ptrs = this->data_ptrs]() mutable { + const std::lock_guard lock(ptr_mutex); WARNING(RMQPublisherHandler, "[rank=%d] message #%d received negative acknowledged by " "RMQ " "server", - _rank, - _nb_msg) + rank, + id) + data_ptrs.push_back(_msg_ptr); }) - .onLost([&]() { + .onLost([_msg_ptr = msg.data(), + &_nb_msg_ack = _nb_msg_ack, + rank = msg.rank(), + id = msg.id(), + &ptr_mutex = ptr_mutex, + &data_ptrs = this->data_ptrs]() mutable { + const std::lock_guard lock(ptr_mutex); CFATAL(RMQPublisherHandler, false, "[rank=%d] message #%d likely got lost by RMQ server", - _rank, - _nb_msg) + rank, + id) + data_ptrs.push_back(_msg_ptr); }) - .onError([&](const char* err_message) { - CFATAL(RMQPublisherHandler, - false, - "[rank=%d] message #%d did not get send: %s", - _rank, - _nb_msg, - err_message) - }); + .onError( + [_msg_ptr = msg.data(), + &_nb_msg_ack = _nb_msg_ack, + rank = msg.rank(), + id = msg.id(), + &ptr_mutex = ptr_mutex, + &data_ptrs = this->data_ptrs](const char* err_message) mutable { + const std::lock_guard lock(ptr_mutex); + CFATAL(RMQPublisherHandler, + false, + "[rank=%d] message #%d did not get send: %s", + rank, + id, + err_message) + data_ptrs.push_back(_msg_ptr); + }); } else { WARNING(RMQPublisherHandler, "[rank=%d] The reliable channel was not ready for message #%d.", @@ -1410,8 +1458,84 @@ class RMQPublisherHandler : public AMQP::LibEventHandler _nb_msg++; } + bool waitToEstablish(unsigned ms, int repeat = 1) + { + if (waitFuture(established, ms, repeat)) { + auto status = established.get(); + DBG(RMQPublisherHandler, "Connection Status: %d", status); + return status == CONNECTED; + } + return false; + } + + bool waitToClose(unsigned ms, int repeat = 1) + { + if (waitFuture(closed, ms, repeat)) { + return closed.get() == CLOSED; + } + return false; + } + ~RMQPublisherHandler() = default; + void release_message_buffers() + { + const std::lock_guard lock(ptr_mutex); + for (auto& dp : data_ptrs) { + DBG(RMQPublisherHandler, "deallocate address %p", dp) + ams::ResourceManager::deallocate(dp, AMSResourceType::HOST); + } + data_ptrs.erase(data_ptrs.begin(), data_ptrs.end()); + } + + unsigned unacknowledged() const { return _rchannel->unacknowledged(); } + + void flush() + { + uint32_t tries = 0; + while (auto unAck = _rchannel->unacknowledged()) { + DBG(RMQPublisherHandler, + "Waiting for %lu messages to be acknowledged", + unAck); + + if (++tries > 10) break; + std::this_thread::sleep_for(std::chrono::milliseconds(50 * tries)); + } + } + + // void purge() + // { + // std::promise purge_queue; + // std::future purged; + // purged = purge_queue.get_future(); + // + // _channel->purgeQueue(_queue) + // .onSuccess([&](uint32_t messageCount) { + // DBG(RMQPublisherHandler, + // "Sucessfuly purged queue with (%u) remaining messages", + // messageCount); + // purge_queue.set_value(true); + // }) + // .onError([&](const char* message) { + // DBG(RMQPublisherHandler, + // "Error '%s' when purging queue %s", + // message, + // _queue.c_str()); + // purge_queue.set_value(false); + // }) + // .onFinalize([&]() { + // DBG(RMQPublisherHandler, "Finalizing queue %s", _queue.c_str()) + // }); + // + // if (purged.get()) { + // DBG(RMQPublisherHandler, "Successfull destruction of RMQ queue"); + // return; + // } + // + // DBG(RMQPublisherHandler, "Non-successfull destruction of RMQ queue"); + // } + + private: /** * @brief Method that is called after a TCP connection has been set up, and @@ -1441,7 +1565,8 @@ class RMQPublisherHandler : public AMQP::LibEventHandler error += std::string(ERR_reason_error_string(err)); } error += "]"; - throw std::runtime_error(error); + establish_connection.set_value(FAILED); + return false; } else { DBG(RMQPublisherHandler, "Success logged with ca-chain %s", @@ -1510,6 +1635,7 @@ class RMQPublisherHandler : public AMQP::LibEventHandler _queue.c_str()) _rchannel = std::make_shared>(*_channel.get()); + establish_connection.set_value(CONNECTED); }) .onError([&](const char* message) { CFATAL(RMQPublisherHandler, @@ -1519,6 +1645,7 @@ class RMQPublisherHandler : public AMQP::LibEventHandler _rank, _queue.c_str(), message) + establish_connection.set_value(FAILED); }); } @@ -1546,10 +1673,10 @@ class RMQPublisherHandler : public AMQP::LibEventHandler virtual void onError(AMQP::TcpConnection* connection, const char* message) override { - DBG(RMQPublisherHandler, - "[rank=%d] fatal error when establishing TCP connection: %s\n", - _rank, - message) + FATAL(RMQPublisherHandler, + "[rank=%d] fatal error on TCP connection: %s\n", + _rank, + message) } /** @@ -1561,6 +1688,20 @@ class RMQPublisherHandler : public AMQP::LibEventHandler { // add your own implementation, like cleanup resources or exit the application DBG(RMQPublisherHandler, "[rank=%d] Connection is detached.\n", _rank) + close_connection.set_value(CLOSED); + } + + bool waitFuture(std::future& future, + unsigned ms, + int repeat) + { + std::chrono::milliseconds span(ms); + int iters = 0; + std::future_status status; + while ((status = future.wait_for(span)) == std::future_status::timeout && + (iters++ < repeat)) + std::future established; + return status == std::future_status::ready; } }; // class RMQPublisherHandler @@ -1569,7 +1710,6 @@ class RMQPublisherHandler : public AMQP::LibEventHandler * @brief Class that manages a RabbitMQ broker and handles connection, event * loop and set up various handlers. */ -template class RMQPublisher { private: @@ -1584,7 +1724,7 @@ class RMQPublisher /** @brief The event loop for sender (usually the default one in libevent) */ std::shared_ptr _loop; /** @brief The handler which contains various callbacks for the sender */ - std::shared_ptr> _handler; + std::shared_ptr _handler; public: RMQPublisher(const RMQPublisher&) = delete; @@ -1629,9 +1769,7 @@ class RMQPublisher event_base_free(event); }); - _handler = std::make_shared>(_loop, - _cacert, - _queue); + _handler = std::make_shared(_loop, _cacert, _queue); _connection = new AMQP::TcpConnection(_handler.get(), address); } @@ -1645,45 +1783,37 @@ class RMQPublisher * @brief Wait that the connection is ready (blocking call) * @return True if the publisher is ready to publish */ - void wait_ready(int ms = 500, int timeout_sec = 30) + bool waitToEstablish(unsigned ms, int repeat = 1) { - // We wait for the connection to be ready - int total_time = 0; - while (!ready_publish()) { - std::this_thread::sleep_for(std::chrono::milliseconds(ms)); - DBG(RMQPublisher, - "[rank=%d] Waiting for connection to be ready...", - _rank) - total_time += ms; - if (total_time > timeout_sec * 1000) { - DBG(RMQPublisher, "[rank=%d] Connection timeout", _rank) - break; - // TODO: if connection is not working -> revert to classic file DB. - } - } + return _handler->waitToEstablish(ms, repeat); } + unsigned unacknowledged() const { return _handler->unacknowledged(); } + + /** * @brief Start the underlying I/O loop (blocking call) */ - void start() - { - event_base_dispatch(_loop.get()); - // We wait for the connection to be ready - wait_ready(); - } + void start() { event_base_dispatch(_loop.get()); } /** * @brief Stop the underlying I/O loop */ void stop() { event_base_loopexit(_loop.get(), NULL); } - void publish(const AMSMessage& message) + void release_messages() { _handler->release_message_buffers(); } + + void publish(AMSMessage&& message) { _handler->publish(std::move(message)); } + + bool close(unsigned ms, int repeat = 1) { - _handler->publish(message); + _handler->flush(); + _connection->close(false); + return _handler->waitToClose(ms, repeat); } - ~RMQPublisher() { delete _connection; } + ~RMQPublisher() {} + }; // class RMQPublisher /** @@ -1757,11 +1887,11 @@ class RabbitMQDB final : public BaseDB /** @brief Represent the ID of the last message sent */ int _msg_tag; /** @brief Publisher sending messages to RMQ server */ - std::shared_ptr> _publisher; + std::shared_ptr _publisher; /** @brief Thread in charge of the publisher */ std::thread _publisher_thread; /** @brief Consumer listening to RMQ and consuming messages */ - std::shared_ptr> _consumer; + std::shared_ptr _consumer; /** @brief Thread in charge of the consumer */ std::thread _consumer_thread; @@ -1865,15 +1995,21 @@ class RabbitMQDB final : public BaseDB is_secure); std::string cacert = rmq_config["rabbitmq-cert"]; - _publisher = std::make_shared>(address, - cacert, - _queue_sender); - _consumer = std::make_shared>(address, - cacert, - _queue_receiver); + _publisher = std::make_shared(address, cacert, _queue_sender); _publisher_thread = std::thread([&]() { _publisher->start(); }); - _consumer_thread = std::thread([&]() { _consumer->start(); }); + + bool status = _publisher->waitToEstablish(100, 10); + if (!status) { + _publisher->stop(); + _publisher_thread.join(); + FATAL(RabbitMQDB, "Could not establish connection"); + } + + //_consumer_thread = std::thread([&]() { _consumer->start(); }); + //_consumer = std::make_shared>(address, + // cacert, + // _queue_receiver); } /** @@ -1898,8 +2034,8 @@ class RabbitMQDB final : public BaseDB inputs.size(), outputs.size()) - auto msg = AMSMessage(_msg_tag, num_elements, inputs, outputs); - _publisher->publish(msg); + _publisher->release_messages(); + _publisher->publish(AMSMessage(_msg_tag, num_elements, inputs, outputs)); _msg_tag++; } @@ -1916,10 +2052,17 @@ class RabbitMQDB final : public BaseDB ~RabbitMQDB() { + + bool status = _publisher->close(100, 10); + CWARNING(RabbitMQDB, !status, "Could not gracefully close TCP connection") + DBG(RabbitMQDB, + "Number of unacknowledged messages are %d", + _publisher->unacknowledged()) _publisher->stop(); - _consumer->stop(); + //_publisher->release_messages(); + //_consumer->stop(); _publisher_thread.join(); - _consumer_thread.join(); + //_consumer_thread.join(); } }; // class RabbitMQDB diff --git a/tests/AMSlib/CMakeLists.txt b/tests/AMSlib/CMakeLists.txt index bae8ef6d..83318b1f 100644 --- a/tests/AMSlib/CMakeLists.txt +++ b/tests/AMSlib/CMakeLists.txt @@ -7,7 +7,7 @@ function (BUILD_TEST exe source) add_executable(${exe} ${source}) target_include_directories(${exe} PRIVATE "${PROJECT_SOURCE_DIR}/src/AMSlib/" umpire ${caliper_INCLUDE_DIR} ${MPI_INCLUDE_PATH}) target_link_directories(${exe} PRIVATE ${AMS_APP_LIB_DIRS}) - target_link_libraries(${exe} PRIVATE AMS umpire MPI::MPI_CXX) + target_link_libraries(${exe} PRIVATE AMS ${AMS_APP_LIBRARIES}) target_compile_definitions(${exe} PRIVATE ${AMS_APP_DEFINES}) if (WITH_CUDA) From ce38f931788e759e311f8dc772f10633e2934e9f Mon Sep 17 00:00:00 2001 From: Konstantinos Parasyris Date: Thu, 7 Dec 2023 09:58:26 -0800 Subject: [PATCH 5/5] Delete two main invocations --- src/AMSWorkflow/ams_wf/AMSDBStage.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/AMSWorkflow/ams_wf/AMSDBStage.py b/src/AMSWorkflow/ams_wf/AMSDBStage.py index deef62e3..9657d37c 100644 --- a/src/AMSWorkflow/ams_wf/AMSDBStage.py +++ b/src/AMSWorkflow/ams_wf/AMSDBStage.py @@ -84,4 +84,3 @@ def main(): if __name__ == "__main__": main() - main()