Skip to content

Commit

Permalink
part-2 of review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Bhaskar Bora committed Feb 5, 2025
1 parent 5164285 commit f9b088b
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 97 deletions.
84 changes: 41 additions & 43 deletions src/llfs/page_recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ StatusOr<SlotRange> refresh_recycler_info_slot(TypedSlotWriter<PageRecycleEvent>
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
/** \brief This function determins the bytes needed for bufferd pages in the log.
*
*/
//
/*static*/ u64 PageRecycler::calculate_log_size_no_padding(
const PageRecyclerOptions& options, Optional<PageCount> max_buffered_page_count)
{
Expand Down Expand Up @@ -178,8 +176,7 @@ StatusOr<SlotRange> refresh_recycler_info_slot(TypedSlotWriter<PageRecycleEvent>

return std::unique_ptr<PageRecycler>{
new PageRecycler(scheduler, std::string{name}, page_deleter, std::move(*recovered_log),
std::move(latest_batch), std::move(state), visitor.volume_trim_offset(),
visitor.page_index() + 1)};
std::move(latest_batch), std::move(state), visitor.volume_trim_slot_info())};
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -188,8 +185,7 @@ PageRecycler::PageRecycler(batt::TaskScheduler& scheduler, const std::string& na
PageDeleter& page_deleter, std::unique_ptr<LogDevice>&& wal_device,
Optional<Batch>&& recovered_batch,
std::unique_ptr<PageRecycler::State>&& state,
llfs::slot_offset_type volume_trim_slot_init,
u32 page_index_init) noexcept
VolumeTrimSlotInfo volume_trim_slot_info) noexcept
: scheduler_{scheduler}
, name_{name}
, page_deleter_{page_deleter}
Expand All @@ -203,8 +199,8 @@ PageRecycler::PageRecycler(batt::TaskScheduler& scheduler, const std::string& na
, recycle_task_{}
, metrics_{}
, prepared_batch_{std::move(recovered_batch)}
, volume_trim_slot_{volume_trim_slot_init}
, largest_page_index_{page_index_init}
, volume_trim_slot_info_{volume_trim_slot_info}
, last_page_recycle_offset_{}
{
const PageRecyclerOptions& options = this->state_.no_lock().options;

Expand Down Expand Up @@ -325,29 +321,32 @@ void PageRecycler::join()
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
/** \brief This check is to make sure this request was never seen before. Note that we do unique
* identifier check only for external calls.
*/
//
bool PageRecycler::is_page_recycling_allowed(const Slice<const PageId>& page_ids,
llfs::slot_offset_type volume_trim_slot) noexcept
llfs::slot_offset_type volume_trim_slot)

{
// If we got higher offset allow recycle.
VolumeTrimSlotInfo incoming_volume_trim_slot_info{volume_trim_slot,
static_cast<u32>(page_ids.size())};

// If we got higher trim_slot allow recycle.
//
if (this->volume_trim_slot_ < volume_trim_slot) {
// Update the largest unique identifier and index.
if (this->volume_trim_slot_info_ < incoming_volume_trim_slot_info) {
// If trim_slot matches then it's a partially executed request. Keep processing from the last
// pending page.
//
this->volume_trim_slot_ = volume_trim_slot;
this->largest_page_index_ = 0;
return true;
}
// If we got same offset but not all pages were processed last time then allow recycle.
//
else if (this->volume_trim_slot_ == volume_trim_slot &&
this->largest_page_index_ < page_ids.size()) {
return true;
if (this->volume_trim_slot_info_.volume_trim_slot ==
incoming_volume_trim_slot_info.volume_trim_slot) {
return true;
}
// We got a new trim_slot altogether. Update internal trim_slot and reset page_index.
//
else {
this->volume_trim_slot_info_ = VolumeTrimSlotInfo{volume_trim_slot, 0};
return true;
}
}
// Look like this is a repost of some old request thus update metric and do not allow recycle.
// We got a report of the same recycle_pages request. Don't allow.
//
else {
this->global_metrics().page_id_deletion_reissue_count.fetch_add(1);
Expand All @@ -374,11 +373,11 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_0(
// We need to include the pages which are not processed yet.
//
std::sort(page_ids_list.begin(), page_ids_list.end());
page_ids_list.erase(page_ids_list.begin(), page_ids_list.begin() + this->largest_page_index_);
page_ids_list.erase(page_ids_list.begin(),
page_ids_list.begin() + this->volume_trim_slot_info_.page_index);

LLFS_VLOG(1) << "Sorted slice: " << BATT_INSPECT(page_ids_list.size())
<< BATT_INSPECT(this->largest_page_index_);

<< BATT_INSPECT(this->volume_trim_slot_info_.page_index);
const Slice<const PageId> page_ids = batt::as_slice(page_ids_list);
Optional<slot_offset_type> sync_point = None;
const PageRecyclerOptions& options = this->state_.no_lock().options;
Expand Down Expand Up @@ -407,12 +406,11 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_0(
{
auto locked_state = this->state_.lock();
// Writing to recycler log.
StatusOr<slot_offset_type> append_slot =
this->insert_to_log(*local_grant, page_id, depth, this->volume_trim_slot_,
this->largest_page_index_, locked_state);
StatusOr<slot_offset_type> append_slot = this->insert_to_log(
*local_grant, page_id, depth, this->volume_trim_slot_info_, locked_state);
BATT_REQUIRE_OK(append_slot);

++this->largest_page_index_;
++this->volume_trim_slot_info_.page_index;
this->last_page_recycle_offset_ = *append_slot;

clamp_min_slot(&sync_point, *append_slot);
Expand All @@ -426,7 +424,7 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_0(
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_n(const Slice<const PageId>& page_ids,
batt::Grant* grant,
batt::Grant& grant,
const i32 depth) noexcept
{
Optional<slot_offset_type> sync_point = None;
Expand All @@ -435,8 +433,8 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_n(const Slice<const
auto locked_state = this->state_.lock();
for (PageId page_id : page_ids) {
// Writing to recycler log.
StatusOr<slot_offset_type> append_slot = this->insert_to_log(
*grant, page_id, depth, this->volume_trim_slot_, this->largest_page_index_, locked_state);
StatusOr<slot_offset_type> append_slot =
this->insert_to_log(grant, page_id, depth, this->volume_trim_slot_info_, locked_state);
BATT_REQUIRE_OK(append_slot);

this->last_page_recycle_offset_ = *append_slot;
Expand All @@ -459,8 +457,8 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>
LLFS_VLOG(1) << "PageRecycler::recycle_pages(page_ids=" << batt::dump_range(page_ids) << "["
<< page_ids.size() << "]"
<< ", grant=[" << (grant ? grant->size() : usize{0}) << "], depth=" << depth << ") "
<< this->name_ << BATT_INSPECT(volume_trim_slot)
<< BATT_INSPECT(this->volume_trim_slot_)
<< this->name_ << BATT_INSPECT(this->volume_trim_slot_info_.volume_trim_slot)
<< BATT_INSPECT(this->volume_trim_slot_info_.page_index)
<< BATT_INSPECT(this->last_page_recycle_offset_);

if (page_ids.empty()) {
Expand All @@ -476,7 +474,7 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(const Slice<const PageId>

return recycle_pages_depth_0(page_ids, volume_trim_slot);
} else {
return recycle_pages_depth_n(page_ids, grant, depth);
return recycle_pages_depth_n(page_ids, *grant, depth);
}
}

Expand All @@ -493,8 +491,8 @@ StatusOr<slot_offset_type> PageRecycler::recycle_page(PageId page_id,
//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
StatusOr<slot_offset_type> PageRecycler::insert_to_log(
batt::Grant& grant, PageId page_id, i32 depth, slot_offset_type volume_trim_slot,
u32 page_index, batt::Mutex<std::unique_ptr<State>>::Lock& locked_state)
batt::Grant& grant, PageId page_id, i32 depth, const VolumeTrimSlotInfo& volume_trim_slot_info,
batt::Mutex<std::unique_ptr<State>>::Lock& locked_state) noexcept
{
BATT_CHECK(locked_state.is_held());

Expand All @@ -509,8 +507,8 @@ StatusOr<slot_offset_type> PageRecycler::insert_to_log(
.refresh_slot = None,
.batch_slot = None,
.depth = depth,
.volume_trim_slot = volume_trim_slot,
.page_index = page_index,
.volume_trim_slot_info{volume_trim_slot_info.volume_trim_slot,
volume_trim_slot_info.page_index + 1},
},
[&](const batt::SmallVecBase<PageToRecycle*>& to_append) -> StatusOr<slot_offset_type> {
if (to_append.empty()) {
Expand Down
25 changes: 15 additions & 10 deletions src/llfs/page_recycler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class PageRecycler
static u64 calculate_log_size(const PageRecyclerOptions& options,
Optional<PageCount> max_buffered_page_count = None);

/** \brief This function determins the bytes needed for bufferd pages in the log. Part of the
* calculation is based on number of buffered-pages and total-page-grant size.
*/
static u64 calculate_log_size_no_padding(const PageRecyclerOptions& options,
Optional<PageCount> max_buffered_page_count = None);

Expand Down Expand Up @@ -130,7 +133,7 @@ class PageRecycler
/** \brief Schedule a list of pages to be recycled when depth is non-ZERO.
*/
StatusOr<slot_offset_type> recycle_pages_depth_n(const Slice<const PageId>& page_ids_in,
batt::Grant* grant, const i32 depth) noexcept;
batt::Grant& grant, const i32 depth) noexcept;

// Waits for the given slot to be flushed to durable storage.
//
Expand Down Expand Up @@ -218,23 +221,27 @@ class PageRecycler
explicit PageRecycler(batt::TaskScheduler& scheduler, const std::string& name,
PageDeleter& page_deleter, std::unique_ptr<LogDevice>&& wal_device,
Optional<Batch>&& recovered_batch,
std::unique_ptr<PageRecycler::State>&& state, u64 volume_trim_slot_init,
u32 page_index_init) noexcept;
std::unique_ptr<PageRecycler::State>&& state,
VolumeTrimSlotInfo volume_trim_slot_info) noexcept;

void start_recycle_task();

void recycle_task_main();

/** \brief This check is to make sure this request was never seen before. Note that we do this
* check only for external calls.
*/
bool is_page_recycling_allowed(const Slice<const PageId>& page_ids,
llfs::slot_offset_type volume_trim_slot) noexcept;
llfs::slot_offset_type volume_trim_slot);

// MUST be called only on the recycle task or the ctor.
//
void refresh_grants();

StatusOr<slot_offset_type> insert_to_log(batt::Grant& grant, PageId page_id, i32 depth,
slot_offset_type volume_trim_slot, u32 page_index,
batt::Mutex<std::unique_ptr<State>>::Lock& locked_state);
StatusOr<slot_offset_type> insert_to_log(
batt::Grant& grant, PageId page_id, i32 depth,
const VolumeTrimSlotInfo& volume_trim_slot_info,
batt::Mutex<std::unique_ptr<State>>::Lock& locked_state) noexcept;

StatusOr<Batch> prepare_batch(std::vector<PageToRecycle>&& to_recycle);

Expand Down Expand Up @@ -276,9 +283,7 @@ class PageRecycler

Optional<slot_offset_type> latest_batch_upper_bound_;

slot_offset_type volume_trim_slot_;

u32 largest_page_index_;
VolumeTrimSlotInfo volume_trim_slot_info_;

slot_offset_type last_page_recycle_offset_;
};
Expand Down
4 changes: 2 additions & 2 deletions src/llfs/page_recycler_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ std::ostream& operator<<(std::ostream& out, const PageToRecycle& t)
{
return out << "PageToRecycle{.page_id=" << t.page_id << ", .refresh_slot=" << t.refresh_slot
<< ", .batch_slot=" << t.batch_slot << ", .depth=" << t.depth
<< ", volume_trim_slot=" << t.volume_trim_slot << ", page_index=" << t.page_index
<< ",}";
<< ", volume_trim_slot=" << t.volume_trim_slot_info.volume_trim_slot
<< ", page_index=" << t.volume_trim_slot_info.page_index << ",}";
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand Down
42 changes: 28 additions & 14 deletions src/llfs/page_recycler_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,29 @@

namespace llfs {

/** \brief This is to track volume_trim_slot and page_index values for PageRecycler so that it could
* detect a re-issue of recycle_pages request by an external caller (like volume-trimmer side).
* If a duplicate request was deetcted, recycler skips adding it to its internal work queue.
* 'volume_trim_slot' is an ever increasing value. 'page_index' is used to allow resume of a
* partially executed request.
*/
struct VolumeTrimSlotInfo {
// The offset given by volume trimmer.
//
slot_offset_type volume_trim_slot;

// This tracks the page index within recycle_pages request.
//
u32 page_index;

bool operator<(const VolumeTrimSlotInfo& other) const
{
return slot_less_than(this->volume_trim_slot, other.volume_trim_slot) ||
(this->volume_trim_slot == other.volume_trim_slot &&
this->page_index < other.page_index);
}
};

struct PageToRecycle {
// Which page to recycle.
//
Expand All @@ -39,13 +62,7 @@ struct PageToRecycle {
//
i32 depth;

// The offset given by volume trimmer.
//
slot_offset_type volume_trim_slot;

// This tracks the page index within recycle_pages request.
//
u32 page_index;
VolumeTrimSlotInfo volume_trim_slot_info;

//+++++++++++-+-+--+----- --- -- - - - -

Expand All @@ -56,8 +73,7 @@ struct PageToRecycle {
.refresh_slot = None,
.batch_slot = None,
.depth = 0,
.volume_trim_slot = 0,
.page_index = 0,
.volume_trim_slot_info{0, 0},
};
}

Expand Down Expand Up @@ -168,15 +184,14 @@ inline bool pack_object_to(const PageToRecycle& from, PackedPageToRecycle* to, D
to->page_id = from.page_id.int_value();
to->depth = from.depth;
to->flags = 0;
//std::memset(&to->reserved_, 0, sizeof(PackedPageToRecycle::reserved_));
if (from.batch_slot) {
to->flags |= PackedPageToRecycle::kHasBatchSlot;
to->batch_slot = *from.batch_slot;
} else {
to->batch_slot = 0;
}
to->volume_trim_slot = from.volume_trim_slot;
to->page_index = from.page_index;
to->volume_trim_slot = from.volume_trim_slot_info.volume_trim_slot;
to->page_index = from.volume_trim_slot_info.page_index;
return true;
}

Expand All @@ -192,8 +207,7 @@ inline StatusOr<PageToRecycle> unpack_object(const PackedPageToRecycle& packed,
return None;
}(),
.depth = packed.depth,
.volume_trim_slot = packed.volume_trim_slot,
.page_index = packed.page_index,
.volume_trim_slot_info = VolumeTrimSlotInfo{packed.volume_trim_slot, packed.page_index},
};
}

Expand Down
27 changes: 8 additions & 19 deletions src/llfs/page_recycler_recovery_visitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ namespace llfs {
, latest_batch_{}
, recycler_uuid_{boost::uuids::random_generator{}()}
, latest_info_refresh_slot_{}
, volume_trim_slot_{0}
, largest_page_index_{0}
, volume_trim_slot_info_{}
{
initialize_status_codes();
LLFS_VLOG(1) << "PageRecyclerRecoveryVisitor()";
Expand Down Expand Up @@ -115,16 +114,9 @@ Optional<SlotRange> PageRecyclerRecoveryVisitor::latest_info_refresh_slot() cons

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
slot_offset_type PageRecyclerRecoveryVisitor::volume_trim_offset() const
VolumeTrimSlotInfo PageRecyclerRecoveryVisitor::volume_trim_slot_info() const
{
return this->volume_trim_slot_;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
//
u32 PageRecyclerRecoveryVisitor::page_index() const
{
return this->largest_page_index_;
return this->volume_trim_slot_info_;
}

//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
Expand All @@ -133,8 +125,9 @@ Status PageRecyclerRecoveryVisitor::operator()(const SlotParse& slot,
const PageToRecycle& recovered)
{
LLFS_VLOG(1) << "Recovered slot: " << slot.offset << " " << recovered
<< BATT_INSPECT((bool)this->latest_batch_) << BATT_INSPECT(this->volume_trim_slot_)
<< BATT_INSPECT(this->largest_page_index_);
<< BATT_INSPECT((bool)this->latest_batch_)
<< BATT_INSPECT(this->volume_trim_slot_info_.volume_trim_slot)
<< BATT_INSPECT(this->volume_trim_slot_info_.page_index);

// Add the new record, or verify that the existing one and the new one are equivalent.
//
Expand All @@ -161,12 +154,8 @@ Status PageRecyclerRecoveryVisitor::operator()(const SlotParse& slot,
// Save the largest unique offset identifier. Note that if offsets are same then we need to only
// update the largest page_index.
//
if (this->volume_trim_slot_ < recovered.volume_trim_slot) {
this->volume_trim_slot_ = recovered.volume_trim_slot;
this->largest_page_index_ = recovered.page_index;
} else if (this->volume_trim_slot_ == recovered.volume_trim_slot &&
this->largest_page_index_ < recovered.page_index) {
this->largest_page_index_ = recovered.page_index;
if (this->volume_trim_slot_info_ < recovered.volume_trim_slot_info) {
this->volume_trim_slot_info_ = recovered.volume_trim_slot_info;
}

PageToRecycle& to_recycle = iter->second;
Expand Down
Loading

0 comments on commit f9b088b

Please sign in to comment.