Skip to content

Commit

Permalink
Add metrics to IoRingLogDevice2; add burst mode optimization.
Browse files Browse the repository at this point in the history
  • Loading branch information
tonyastolfi committed Jul 25, 2024
1 parent 8d4ca66 commit 364eaf5
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 51 deletions.
7 changes: 6 additions & 1 deletion src/llfs/basic_log_storage_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ class BasicLogStorageDriver
//
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -

Impl& impl()
Impl& impl() noexcept
{
return this->impl_;
}

const Impl& impl() const noexcept
{
return this->impl_;
}
Expand Down
7 changes: 6 additions & 1 deletion src/llfs/basic_ring_buffer_log_device.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,12 @@ class BasicRingBufferLogDevice

Status sync(LogReadMode mode, SlotUpperBoundAt event) override;

driver_type& driver()
driver_type& driver() noexcept
{
return this->driver_;
}

const driver_type& driver() const noexcept
{
return this->driver_;
}
Expand Down
10 changes: 5 additions & 5 deletions src/llfs/ioring_log_device.test.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ inline void run_log_device_benchmark(
const auto flushed_range = log_device.slot_range(LogReadMode::kDurable);
const auto committed_range = log_device.slot_range(LogReadMode::kSpeculative);

LLFS_LOG_INFO() << BATT_INSPECT(flushed_range) << BATT_INSPECT(flushed_range.size())
<< BATT_INSPECT(committed_range) << BATT_INSPECT(committed_range.size())
<< BATT_INSPECT(last_iteration);
LLFS_VLOG(1) << BATT_INSPECT(flushed_range) << BATT_INSPECT(flushed_range.size())
<< BATT_INSPECT(committed_range) << BATT_INSPECT(committed_range.size())
<< BATT_INSPECT(last_iteration);

log_device.halt();
}};
Expand All @@ -121,14 +121,14 @@ inline void run_log_device_benchmark(
llfs::SlotUpperBoundAt{durable.lower_bound + trim_trigger});

if (!sync_status.ok()) {
LLFS_LOG_INFO() << BATT_INSPECT(durable.lower_bound);
LLFS_VLOG(1) << BATT_INSPECT(durable.lower_bound);
break;
}

llfs::Status trim_status = log_device.trim(durable.lower_bound + aligned_trim_size);

if (!trim_status.ok()) {
LLFS_LOG_INFO() << BATT_INSPECT(durable.lower_bound);
LLFS_VLOG(1) << BATT_INSPECT(durable.lower_bound);
break;
}

Expand Down
26 changes: 21 additions & 5 deletions src/llfs/ioring_log_device2.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//
#include <llfs/basic_ring_buffer_log_device.hpp>
#include <llfs/ioring_log_config2.hpp>
#include <llfs/ioring_log_device2_metrics.hpp>
#include <llfs/ioring_log_device_storage.hpp>
#include <llfs/log_device.hpp>
#include <llfs/log_device_runtime_options.hpp>
Expand Down Expand Up @@ -47,6 +48,9 @@ class IoRingLogDriver2
using Self = IoRingLogDriver2;
using AlignedUnit = std::aligned_storage_t<kLogAtomicWriteSize, kLogAtomicWriteSize>;
using EventLoopTask = typename StorageT::EventLoopTask;
using Metrics = IoRingLogDevice2Metrics;

//+++++++++++-+-+--+----- --- -- - - - -

static constexpr batt::StaticType<TargetTrimPos> kTargetTrimPos{};
static constexpr batt::StaticType<CommitPos> kCommitPos{};
Expand All @@ -70,6 +74,8 @@ class IoRingLogDriver2
StorageT&& storage //
) noexcept;

~IoRingLogDriver2() noexcept;

//----

Status set_trim_pos(slot_offset_type trim_pos)
Expand Down Expand Up @@ -130,6 +136,11 @@ class IoRingLogDriver2

//+++++++++++-+-+--+----- --- -- - - - -

const Metrics& metrics() const noexcept
{
return this->metrics_;
}

private:
//+++++++++++-+-+--+----- --- -- - - - -

Expand Down Expand Up @@ -199,7 +210,7 @@ class IoRingLogDriver2
* two writes may be initiated by this function, provided the conditions above are still met after
* starting the first write.
*/
void start_flush(CommitPos observed_commit_pos);
void start_flush(const CommitPos observed_commit_pos);

/** \brief Returns the passed slot range with the lower and upper bounds aligned to the nearest
* data page boundary.
Expand Down Expand Up @@ -282,6 +293,10 @@ class IoRingLogDriver2

//+++++++++++-+-+--+----- --- -- - - - -

/** \brief Diagnostic metrics for this object.
*/
Metrics metrics_;

/** \brief The context passed in at construction time; provides access to the ring buffer.
*/
LogStorageDriverContext& context_;
Expand Down Expand Up @@ -374,10 +389,6 @@ class IoRingLogDriver2
*/
usize writes_pending_ = 0;

/** \brief The maximum observed value of this->writes_pending_ (high water mark).
*/
usize writes_max_ = 0;

/** \brief Buffer containing the control block structure.
*/
std::unique_ptr<AlignedUnit[]> control_block_memory_;
Expand Down Expand Up @@ -448,6 +459,11 @@ class BasicIoRingLogDevice2
config, options, std::move(storage)}
{
}

const IoRingLogDevice2Metrics& metrics() const noexcept
{
return this->driver().impl().metrics();
}
};

/** \brief A fast, durable LogDevice implementation.
Expand Down
88 changes: 76 additions & 12 deletions src/llfs/ioring_log_device2.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
#include <llfs/data_reader.hpp>
#include <llfs/slot_writer.hpp>

#include <batteries/metrics/metric_registry.hpp>

namespace llfs {

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -34,6 +36,37 @@ inline /*explicit*/ IoRingLogDriver2<StorageT>::IoRingLogDriver2(
{
BATT_CHECK_GE(this->config_.data_alignment_log2, this->config_.device_page_size_log2);
BATT_CHECK_GE(this->data_page_size_, sizeof(PackedLogControlBlock2));

using batt::Token;

// Register all metrics.
//
MetricLabelSet labels{
MetricLabel{Token{"object_type"}, Token{"llfs_IoRingLogDriver2"}},
MetricLabel{Token{"log_name"}, Token{this->options_.name}},
};

this->metrics_.export_to(global_metric_registry(), labels);

global_metric_registry() //
.add("trim_target_pos", this->observed_watch_[Self::kTargetTrimPos], batt::make_copy(labels))
.add("commit_pos", this->observed_watch_[Self::kCommitPos], batt::make_copy(labels))
.add("trim_pos", this->trim_pos_, batt::make_copy(labels))
.add("flush_pos", this->flush_pos_, batt::make_copy(labels));
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <typename StorageT>
inline IoRingLogDriver2<StorageT>::~IoRingLogDriver2() noexcept
{
this->metrics_.unexport_from(global_metric_registry());

global_metric_registry() //
.remove(this->observed_watch_[Self::kTargetTrimPos])
.remove(this->observed_watch_[Self::kCommitPos])
.remove(this->trim_pos_)
.remove(this->flush_pos_);
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down Expand Up @@ -292,7 +325,7 @@ inline void IoRingLogDriver2<StorageT>::wait_for_slot_offset_change(T observed_v
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
template <typename StorageT>
inline void IoRingLogDriver2<StorageT>::start_flush(CommitPos observed_commit_pos)
inline void IoRingLogDriver2<StorageT>::start_flush(const CommitPos observed_commit_pos)
{
// Unflushed data comes after flushed.
//
Expand Down Expand Up @@ -350,20 +383,37 @@ inline void IoRingLogDriver2<StorageT>::start_flush(CommitPos observed_commit_po
}
}

// If burst mode optimization is enabled, then round slot_range.upper_bound *down* to get the
// aligned upper bound (instead of up, the default); this way, we are less likely to need to
// issue another I/O to fill in partial data in flush_tail_.
//
if (this->flush_tail_ && this->options_.optimize_burst_mode) {
this->metrics_.burst_mode_checked.add(1);

const usize new_upper_bound = slot_max(
slot_range.lower_bound,
batt::round_down_bits(this->config_.data_alignment_log2, slot_range.upper_bound));

if (slot_range.upper_bound != new_upper_bound) {
this->metrics_.burst_mode_applied.add(1);
}

slot_range.upper_bound = new_upper_bound;
}

// Align to 512-byte boundaries for direct I/O
//
SlotRange aligned_range = this->get_aligned_range(slot_range);

// If this flush would overlap with an ongoing one (at the last device page) then trim the
// aligned_range so it doesn't.
// aligned_range (on the lower end) so it doesn't.
//
if (this->flush_tail_) {
if (slot_less_than(aligned_range.lower_bound, this->flush_tail_->upper_bound)) {
aligned_range.lower_bound = this->flush_tail_->upper_bound;
if (aligned_range.empty()) {
flush_upper_bound = this->flush_tail_->upper_bound;
continue;
}
if (this->flush_tail_ &&
slot_less_than(aligned_range.lower_bound, this->flush_tail_->upper_bound)) {
aligned_range.lower_bound = this->flush_tail_->upper_bound;
if (aligned_range.empty()) {
flush_upper_bound = this->flush_tail_->upper_bound;
continue;
}
}

Expand Down Expand Up @@ -439,17 +489,21 @@ inline void IoRingLogDriver2<StorageT>::start_flush_write(const SlotRange& slot_
resize_buffer(this->context_.buffer_.get(aligned_range.lower_bound), aligned_range.size());

BATT_CHECK_LE(write_offset + (i64)buffer.size(), this->data_end_)
<< "Data to flush extends beyond the end of the storage extent; forgot to handle wrap-around "
<< "Data to flush extends beyond the end of the storage extent; forgot to handle "
"wrap-around "
"case?";

LLFS_VLOG(1) << " -- async_write_some(offset=" << write_offset << ".."
<< write_offset + buffer.size() << ", size=" << buffer.size() << ")";

++this->writes_pending_;
this->writes_max_ = std::max(this->writes_max_, this->writes_pending_);
this->metrics_.max_concurrent_writes.clamp_min(this->writes_pending_);

BATT_CHECK_LE(this->writes_pending_, this->options_.max_concurrent_writes);

this->metrics_.total_write_count.add(1);
this->metrics_.flush_write_count.add(1);

this->storage_.async_write_some(write_offset, buffer,
this->make_write_handler([this, slot_range, aligned_range](
Self* this_, StatusOr<i32> result) {
Expand Down Expand Up @@ -502,6 +556,9 @@ inline void IoRingLogDriver2<StorageT>::handle_flush_write(const SlotRange& slot
};
BATT_CHECK(!flushed_range.empty());

this->metrics_.bytes_written.add(bytes_written);
this->metrics_.bytes_flushed.add(flushed_range.size());

LLFS_DVLOG(1) << BATT_INSPECT(flushed_range);

// Update this->known_flush_pos_ and this->known_flushed_commit_pos_ to reflect the write.
Expand Down Expand Up @@ -625,6 +682,9 @@ void IoRingLogDriver2<StorageT>::start_control_block_update(

this->writing_control_block_ = true;

this->metrics_.total_write_count.add(1);
this->metrics_.control_block_write_count.add(1);

this->storage_.async_write_some_fixed(
this->config_.control_block_offset, this->control_block_buffer_, /*buf_index=*/0,
this->make_write_handler([](Self* this_, StatusOr<i32> result) {
Expand All @@ -648,7 +708,11 @@ void IoRingLogDriver2<StorageT>::handle_control_block_update(StatusOr<i32> resul
return;
}

if (BATT_CHECKED_CAST(usize, *result) != this->control_block_buffer_.size()) {
const usize bytes_written = BATT_CHECKED_CAST(usize, *result);

this->metrics_.bytes_written.add(bytes_written);

if (bytes_written != this->control_block_buffer_.size()) {
LLFS_LOG_ERROR() << "Failed to write entire log control block!";
this->context_.update_error_status(batt::StatusCode::kInternal);
return;
Expand Down
58 changes: 32 additions & 26 deletions src/llfs/ioring_log_device2.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -488,35 +488,29 @@ TEST_F(IoringLogDevice2SimTest, Simulation)
//
TEST(IoringLogDevice2Test, Benchmark)
{
llfs::run_log_device_benchmark([&](usize log_size, bool create, auto&& workload_fn) {
//+++++++++++-+-+--+----- --- -- - - - -
// Set configuration and options.
//
llfs::IoRingLogConfig2 config{
.control_block_offset = 0,
.log_capacity = log_size,
.device_page_size_log2 = 9,
.data_alignment_log2 = 12,
};

llfs::LogDeviceRuntimeOptions options{
.name = "test log",
.flush_delay_threshold = 2 * kMiB,
.max_concurrent_writes = 64,
};

const char* file_name = //
std::getenv("LLFS_LOG_DEVICE_FILE");

if (!file_name) {
LLFS_LOG_INFO() << "LLFS_LOG_DEVICE_FILE not specified; skipping benchmark test";
return;
}
//+++++++++++-+-+--+----- --- -- - - - -
// Set configuration and options.
//
llfs::LogDeviceRuntimeOptions options{
.name = "test log",
.flush_delay_threshold = 128 * kKiB,
.max_concurrent_writes = 64,
.optimize_burst_mode = llfs::read_test_var<int>("LLFS_LOG_DEVICE_BURST", /*default=*/0) != 0,
};

const char* file_name = //
std::getenv("LLFS_LOG_DEVICE_FILE");

if (!file_name) {
LLFS_LOG_INFO() << "LLFS_LOG_DEVICE_FILE not specified; skipping benchmark test";
return;
}

std::cout << "LLFS_LOG_DEVICE_FILE=" << batt::c_str_literal(file_name) << std::endl;
std::cout << "LLFS_LOG_DEVICE_FILE=" << batt::c_str_literal(file_name) << std::endl;

LLFS_LOG_INFO() << BATT_INSPECT(file_name);
LLFS_LOG_INFO() << BATT_INSPECT(file_name);

llfs::run_log_device_benchmark([&](usize log_size, bool create, auto&& workload_fn) {
//+++++++++++-+-+--+----- --- -- - - - -
// Erase any existing file.
//
Expand All @@ -531,6 +525,13 @@ TEST(IoringLogDevice2Test, Benchmark)
//+++++++++++-+-+--+----- --- -- - - - -
// Create a new log file and size it to the configured capacity.
//
llfs::IoRingLogConfig2 config{
.control_block_offset = 0,
.log_capacity = log_size,
.device_page_size_log2 = 9,
.data_alignment_log2 = 12,
};

llfs::StatusOr<int> status_or_fd = [&] {
if (create) {
return llfs::create_file_read_write(file_name, llfs::OpenForAppend{false});
Expand Down Expand Up @@ -588,6 +589,11 @@ TEST(IoringLogDevice2Test, Benchmark)
// Run the passed workload.
//
workload_fn(log_device);

const double write_amplification =
double(log_device.metrics().bytes_written) / double(log_device.metrics().bytes_flushed);

LLFS_LOG_INFO() << log_device.metrics() << BATT_INSPECT(write_amplification);
});
}

Expand Down
Loading

0 comments on commit 364eaf5

Please sign in to comment.