Skip to content

Commit

Permalink
Make the resource limits consistent after changing states (#140)
Browse files Browse the repository at this point in the history
* Refactor file rotation and size tracking

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Fix -Wall warnings

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply the resource-limits independently of state transitions

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Generalize the FileTracker and the OutputSettings

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Format bytes before printing them

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply self-suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Avoid openning files when there is not enough space to close them empty

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Refactor the exceptions thrown and caught

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Fix double-t misspelling of writing

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Improve readability of add_schema

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Minor irrelevant fixes

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Retry to write data after a FullFileException

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Throw InconsistencyException instead of calling tsnh

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Minor fixes to pass the tests

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Fix the McapWriter's DllAPI tags for Windows

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Close MCAP's writer before renaming the file

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Save the recording version in the VERSION_METADATA_RELEASE tag

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Rename Message to McapMessage

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Uncrustify

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply more suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

* Apply suggestions

Signed-off-by: tempate <danieldiaz@eprosima.com>

---------

Signed-off-by: tempate <danieldiaz@eprosima.com>
  • Loading branch information
Tempate authored Jun 17, 2024
1 parent 59df593 commit f45dfb1
Show file tree
Hide file tree
Showing 36 changed files with 2,072 additions and 1,314 deletions.
20 changes: 18 additions & 2 deletions ddsrecorder/src/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cpp_utils/utils.hpp>

#include <ddsrecorder_participants/recorder/logging/DdsRecorderLogConsumer.hpp>
#include <ddsrecorder_participants/recorder/output/FileTracker.hpp>
#include <ddsrecorder_yaml/recorder/CommandlineArgsRecorder.hpp>
#include <ddsrecorder_yaml/recorder/YamlReaderConfiguration.hpp>

Expand All @@ -51,6 +52,8 @@ using DdsRecorderState = eprosima::ddsrecorder::recorder::DdsRecorderStateCode;
using json = nlohmann::json;

const std::string NEXT_STATE_TAG = "next_state";
const std::string AVOID_OVERWRITING_OUTPUT_TAG = "avoid_overwriting_output";

constexpr auto string_to_command = eprosima::ddsrecorder::recorder::receiver::string_to_enumeration;
// constexpr auto string_to_state = eprosima::ddsrecorder::recorder::string_to_enumeration; // TODO: fix compilation error

Expand Down Expand Up @@ -293,6 +296,9 @@ int main(

logUser(DDSRECORDER_EXECUTION, "DDS Recorder running.");

// The file tracker must be stored outside of the loop since it is shared between instances
std::shared_ptr<eprosima::ddsrecorder::participants::FileTracker> file_tracker;

if (configuration.enable_remote_controller)
{
logUser(DDSRECORDER_EXECUTION, "Waiting for instructions...");
Expand Down Expand Up @@ -334,8 +340,16 @@ int main(
receiver.publish_status(CommandCode::stop, prev_command);
}

if (args != nullptr && args[AVOID_OVERWRITING_OUTPUT_TAG])
{
// Save the set of output files from being overwritten.
// WARNING: If set, the resource-limits won't be consistent after stopping the DDS Recorder.
file_tracker.reset();
}

prev_command = CommandCode::stop;
parse_command(receiver.wait_for_command(), command, args);

switch (command)
{
case CommandCode::start:
Expand Down Expand Up @@ -391,7 +405,8 @@ int main(
configuration = eprosima::ddsrecorder::yaml::RecorderConfiguration(commandline_args.file_path);

// Create DDS Recorder
auto recorder = std::make_unique<DdsRecorder>(configuration, initial_state, close_handler);
auto recorder = std::make_unique<DdsRecorder>(
configuration, initial_state, close_handler, file_tracker);

// Create File Watcher Handler
std::unique_ptr<eprosima::utils::event::FileWatcherHandler> file_watcher_handler;
Expand Down Expand Up @@ -524,7 +539,8 @@ int main(
else
{
// Start recording right away
auto recorder = std::make_unique<DdsRecorder>(configuration, DdsRecorderState::RUNNING, close_handler);
auto recorder = std::make_unique<DdsRecorder>(
configuration, DdsRecorderState::RUNNING, close_handler, file_tracker);

// Create File Watcher Handler
std::unique_ptr<eprosima::utils::event::FileWatcherHandler> file_watcher_handler;
Expand Down
60 changes: 33 additions & 27 deletions ddsrecorder/src/cpp/tool/DdsRecorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,18 @@ using namespace eprosima::utils;
DdsRecorder::DdsRecorder(
const yaml::RecorderConfiguration& configuration,
const DdsRecorderStateCode& init_state,
const std::string& file_name)
: DdsRecorder(configuration, init_state, nullptr, file_name)
std::shared_ptr<participants::FileTracker>& file_tracker,
const std::string& file_name /* = "" */)
: DdsRecorder(configuration, init_state, nullptr, file_tracker, file_name)
{
}

DdsRecorder::DdsRecorder(
const yaml::RecorderConfiguration& configuration,
const DdsRecorderStateCode& init_state,
std::shared_ptr<eprosima::utils::event::MultipleEventHandler> event_handler,
const std::string& file_name)
std::shared_ptr<participants::FileTracker>& file_tracker,
const std::string& file_name /* = "" */)
: configuration_(configuration)
, event_handler_(event_handler)
{
Expand All @@ -63,45 +65,43 @@ DdsRecorder::DdsRecorder(
thread_pool_ = std::make_shared<SlotThreadPool>(configuration_.n_threads);

// Fill MCAP output file settings
participants::McapOutputSettings mcap_output_settings;
participants::OutputSettings output_settings;

if (file_name == "")
{
mcap_output_settings.output_filename = configuration_.output_filename;
mcap_output_settings.output_filepath = configuration_.output_filepath;
mcap_output_settings.prepend_timestamp = true;
mcap_output_settings.output_timestamp_format = configuration_.output_timestamp_format;
mcap_output_settings.output_local_timestamp = configuration_.output_local_timestamp;
output_settings.filename = configuration_.output_filename;
output_settings.filepath = configuration_.output_filepath;
output_settings.prepend_timestamp = true;
output_settings.timestamp_format = configuration_.output_timestamp_format;
output_settings.local_timestamp = configuration_.output_local_timestamp;
}
else
{
mcap_output_settings.output_filename = file_name;
mcap_output_settings.output_filepath = ".";
mcap_output_settings.prepend_timestamp = false;
output_settings.filename = file_name;
output_settings.filepath = ".";
output_settings.prepend_timestamp = false;
}

mcap_output_settings.safety_margin = configuration_.safety_margin;
mcap_output_settings.file_rotation = configuration_.output_resource_limits_file_rotation;
mcap_output_settings.max_file_size = configuration_.output_resource_limits_max_file_size;
output_settings.extension = ".mcap";
output_settings.safety_margin = configuration_.safety_margin;
output_settings.file_rotation = configuration_.output_resource_limits_file_rotation;
output_settings.max_file_size = configuration_.output_resource_limits_max_file_size;

if (mcap_output_settings.max_file_size == 0)
if (output_settings.max_file_size == 0)
{
mcap_output_settings.max_file_size = std::filesystem::space(mcap_output_settings.output_filepath).available;
output_settings.max_file_size = std::filesystem::space(output_settings.filepath).available;
}

mcap_output_settings.max_size = configuration_.output_resource_limits_max_size;
output_settings.max_size = configuration_.output_resource_limits_max_size;

if (mcap_output_settings.max_size == 0)
if (output_settings.max_size == 0)
{
mcap_output_settings.max_size = mcap_output_settings.max_file_size;
output_settings.max_size = output_settings.max_file_size;
}

mcap_output_settings.max_files = ceil(
static_cast<double>(mcap_output_settings.max_size) / mcap_output_settings.max_file_size);

// Create MCAP Handler configuration
participants::McapHandlerConfiguration handler_config(
mcap_output_settings,
output_settings,
configuration_.max_pending_samples,
configuration_.buffer_size,
configuration_.event_window,
Expand All @@ -112,13 +112,19 @@ DdsRecorder::DdsRecorder(
configuration_.record_types,
configuration_.ros2_types);

if (file_tracker == nullptr)
{
// Create the File Tracker
file_tracker.reset(new participants::FileTracker(output_settings));
}

// Create MCAP Handler
mcap_handler_ = std::make_shared<participants::McapHandler>(
handler_config,
payload_pool_,
recorder_to_handler_state_(init_state));

mcap_handler_->set_on_disk_full_callback(std::bind(&DdsRecorder::on_disk_full, this));
file_tracker,
recorder_to_handler_state_(init_state),
std::bind(&DdsRecorder::on_disk_full, this));

// Create DynTypes Participant
dyn_participant_ = std::make_shared<DynTypesParticipant>(
Expand Down
5 changes: 5 additions & 0 deletions ddsrecorder/src/cpp/tool/DdsRecorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <ddsrecorder_participants/recorder/mcap/McapHandler.hpp>
#include <ddsrecorder_participants/recorder/mcap/McapHandlerConfiguration.hpp>
#include <ddsrecorder_participants/recorder/monitoring/DdsRecorderMonitor.hpp>
#include <ddsrecorder_participants/recorder/output/FileTracker.hpp>

#include <ddsrecorder_yaml/recorder/YamlReaderConfiguration.hpp>

Expand Down Expand Up @@ -64,11 +65,13 @@ class DdsRecorder
*
* @param configuration: Structure encapsulating all recorder configuration options.
* @param init_state: Initial instance state (RUNNING/PAUSED/SUSPENDED/STOPPED).
* @param file_tracker: Reference to file tracker used to manage mcap files.
* @param file_name: Name of the mcap file where data is recorded. If not provided, the one from configuration is used instead.
*/
DdsRecorder(
const yaml::RecorderConfiguration& configuration,
const DdsRecorderStateCode& init_state,
std::shared_ptr<participants::FileTracker>& file_tracker,
const std::string& file_name = "");

/**
Expand All @@ -79,12 +82,14 @@ class DdsRecorder
* @param configuration: Structure encapsulating all recorder configuration options.
* @param init_state: Initial instance state (RUNNING/PAUSED/SUSPENDED/STOPPED).
* @param event_handler: Reference to event handler used for thread synchronization in main application.
* @param file_tracker: Reference to file tracker used to manage mcap files.
* @param file_name: Name of the mcap file where data is recorded. If not provided, the one from configuration is used instead.
*/
DdsRecorder(
const yaml::RecorderConfiguration& configuration,
const DdsRecorderStateCode& init_state,
std::shared_ptr<eprosima::utils::event::MultipleEventHandler> event_handler,
std::shared_ptr<participants::FileTracker>& file_tracker,
const std::string& file_name = "");

/**
Expand Down
7 changes: 5 additions & 2 deletions ddsrecorder/test/blackbox/mcap/McapFileCreationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cpp_utils/ros2_mangling.hpp>

#include <ddsrecorder_participants/recorder/output/FileTracker.hpp>
#include <ddsrecorder_yaml/recorder/YamlReaderConfiguration.hpp>
#include <ddsrecorder_yaml/recorder/yaml_configuration_tags.hpp>

Expand Down Expand Up @@ -109,11 +110,13 @@ std::unique_ptr<DdsRecorder> create_recorder(
configuration.simple_configuration->domain = domainId;
configuration.ros2_types = ros2_types;

std::shared_ptr<eprosima::ddsrecorder::participants::FileTracker> file_tracker;

return std::make_unique<DdsRecorder>(
configuration,
recorder_state,
file_name
);
file_tracker,
file_name);
}

void create_publisher(
Expand Down
22 changes: 14 additions & 8 deletions ddsrecorder/test/blackbox/mcap/McapLogErrorTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cpp_utils/testing/LogChecker.hpp>

#include <ddsrecorder_participants/recorder/mcap/McapHandler.hpp>
#include <ddsrecorder_participants/recorder/output/FileTracker.hpp>

#include <mcap/errors.hpp>
#include <mcap/mcap.hpp>
Expand Down Expand Up @@ -42,17 +43,19 @@ TEST(McapLogErrorTests, fail_to_open_file) {
// Check no logs have been captured yet
ASSERT_FALSE(log_checker.check_valid());

eprosima::ddsrecorder::participants::McapOutputSettings mcap_output_settings;
mcap_output_settings.output_filepath = "./fake_folder"; // This folder does not exist -> error opening file
mcap_output_settings.output_filename = "output_dummy.mcap";
mcap_output_settings.prepend_timestamp = false;
mcap_output_settings.output_timestamp_format = "%Y-%m-%d_%H-%M-%S";
mcap_output_settings.output_local_timestamp = true;
eprosima::ddsrecorder::participants::OutputSettings output_settings;
output_settings.filepath = "./fake_folder"; // This folder does not exist -> error opening file
output_settings.filename = "output_dummy";
output_settings.prepend_timestamp = false;
output_settings.timestamp_format = "%Y-%m-%d_%H-%M-%S";
output_settings.local_timestamp = true;
output_settings.max_file_size = 100 * 1000; // 100KB
output_settings.max_size = output_settings.max_file_size; // 100KB

mcap::McapWriterOptions mcap_writer_options{"ros2"};

eprosima::ddsrecorder::participants::McapHandlerConfiguration config(
mcap_output_settings,
output_settings,
BUFFER_SIZE,
MAX_FILE_SIZE,
MAX_FILE_AGE,
Expand All @@ -68,9 +71,12 @@ TEST(McapLogErrorTests, fail_to_open_file) {
eprosima::ddsrecorder::participants::McapHandlerStateCode init_state =
eprosima::ddsrecorder::participants::McapHandlerStateCode::RUNNING;

// Create the McapWriter
auto file_tracker = std::make_shared<eprosima::ddsrecorder::participants::FileTracker>(config.output_settings);

// Check if an InitializationException is thrown
ASSERT_THROW(
eprosima::ddsrecorder::participants::McapHandler mcap_handler(config, payload_pool, init_state),
eprosima::ddsrecorder::participants::McapHandler mcap_handler(config, payload_pool, file_tracker, init_state),
eprosima::utils::InitializationException);

// Assert that logErrors were captured
Expand Down
Loading

0 comments on commit f45dfb1

Please sign in to comment.