Commit

made recycle_pages more modular
Bhaskar Bora committed Feb 4, 2025
1 parent f8815b1 commit dae089d
Showing 2 changed files with 117 additions and 85 deletions.
190 changes: 105 additions & 85 deletions src/llfs/page_recycler.cpp
@@ -179,7 +179,7 @@ StatusOr<SlotRange> refresh_recycler_info_slot(TypedSlotWriter<PageRecycleEvent>
   return std::unique_ptr<PageRecycler>{
       new PageRecycler(scheduler, std::string{name}, page_deleter, std::move(*recovered_log),
                        std::move(latest_batch), std::move(state), visitor.largest_unique_offset(),
-                       visitor.page_index())};
+                       visitor.page_index() + 1)};
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
@@ -356,105 +356,67 @@ bool PageRecycler::is_page_recycling_allowed(const Slice<const PageId>& page_ids
 }
 
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
-//
-StatusOr<slot_offset_type> PageRecycler::recycle_pages(
-    const Slice<const PageId>& page_ids_in, llfs::slot_offset_type offset_as_unique_identifier,
-    batt::Grant* grant, i32 depth)
-{
-  BATT_CHECK_GE(depth, 0);
-
-  LLFS_VLOG(1) << "PageRecycler::recycle_pages(page_ids=" << batt::dump_range(page_ids_in) << "["
-               << page_ids_in.size() << "]"
-               << ", grant=[" << (grant ? grant->size() : usize{0}) << "], depth=" << depth << ") "
-               << this->name_ << BATT_INSPECT(offset_as_unique_identifier)
-               << BATT_INSPECT(this->largest_offset_as_unique_identifier_)
-               << BATT_INSPECT(this->last_page_recycle_offset_);
-
-  if (page_ids_in.empty()) {
-    return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
-  }
-
-  // Sort the pages if called by external callers.
-  //
-  std::vector<PageId> page_ids_list(page_ids_in.begin(), page_ids_in.end());
-  if (depth == 0) {
-    // Check to make sure request was never seen before.
-    //
-    {
-      auto locked_state = this->state_.lock();
-      if (!is_page_recycling_allowed(page_ids_in, offset_as_unique_identifier)) {
-        return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
-      }
-    }
-
-    // We need to include the pages which are not processed yet.
-    //
-    std::sort(page_ids_list.begin(), page_ids_list.end());
-    page_ids_list.erase(page_ids_list.begin(), page_ids_list.begin() + this->largest_page_index_);
-
-    LLFS_VLOG(1) << "Sorted slice: " << BATT_INSPECT(page_ids_list.size())
-                 << BATT_INSPECT(this->largest_page_index_);
-
-    BATT_CHECK_EQ(grant, nullptr) << "External callers to `PageRecycler::recycle_pages` should "
-                                     "specify depth == 0 and grant == nullptr; other values are "
-                                     "for PageRecycler internal use only.";
-  }
-  const Slice<const PageId> page_ids = batt::as_slice(page_ids_list);
-  Optional<slot_offset_type> sync_point = None;
-
-  if (grant == nullptr) {
-    BATT_CHECK_EQ(depth, 0u);
-
-    const PageRecyclerOptions& options = this->state_.no_lock().options;
-
-    for (PageId page_id : page_ids) {
-      StatusOr<batt::Grant> local_grant = [&] {
-        const usize needed_size = options.insert_grant_size();
-
-        BATT_DEBUG_INFO(
-            "[PageRecycler::recycle_page] waiting for log space; "
-            << BATT_INSPECT(needed_size) << BATT_INSPECT(this->insert_grant_pool_.size())
-            << BATT_INSPECT(depth) << BATT_INSPECT(options.total_grant_size_for_depth(depth))
-            << BATT_INSPECT(this->slot_writer_.in_use_size())
-            << BATT_INSPECT(this->slot_writer_.log_capacity())
-            << BATT_INSPECT(options.recycle_task_target()) << BATT_INSPECT(page_ids.size())
-            << BATT_INSPECT(this->last_page_recycle_offset_));
-
-        return this->insert_grant_pool_.spend(needed_size, batt::WaitForResource::kTrue);
-      }();
-      if (!local_grant.ok()) {
-        if (!suppress_log_output_for_test() && !this->stop_requested_.load()) {
-          LLFS_LOG_WARNING() << "PageRecycler::recycle_pages failed; not enough log buffer space";
-        }
-      }
-      BATT_REQUIRE_OK(local_grant);
-      {
-        auto locked_state = this->state_.lock();
-        // Writing to recycler log.
-        StatusOr<slot_offset_type> append_slot = this->insert_to_log(
-            *local_grant, page_id, depth, this->largest_offset_as_unique_identifier_,
-            this->largest_page_index_ + 1, locked_state);
-        BATT_REQUIRE_OK(append_slot);
-
-        ++this->largest_page_index_;
-
-        this->last_page_recycle_offset_ = *append_slot;
-
-        clamp_min_slot(&sync_point, *append_slot);
-      }
-    }
-  } else {
-    BATT_CHECK_LT(depth, (i32)kMaxPageRefDepth) << BATT_INSPECT_RANGE(page_ids);
-
-    auto locked_state = this->state_.lock();
-
-    for (PageId page_id : page_ids) {
-      // Writing to recycler log.
-      StatusOr<slot_offset_type> append_slot =
-          this->insert_to_log(*grant, page_id, depth, this->largest_offset_as_unique_identifier_,
-                              this->largest_page_index_ + 1, locked_state);
-      BATT_REQUIRE_OK(append_slot);
-
-      this->last_page_recycle_offset_ = *append_slot;
-
-      clamp_min_slot(&sync_point, *append_slot);
+/** \brief This function handles page recycling for external callers. It first checks whether the
+ * request can be accepted at all (it could be a retry from the volume-trimmer side). If accepted,
+ * the pages are processed starting from the last pending page (which may well be the first page
+ * if this is the first attempt).
+ */
+StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_0(
+    const Slice<const PageId>& page_ids_in, llfs::slot_offset_type offset_as_unique_identifier)
+{
+  constexpr i32 depth = 0;
+  // Check to make sure request was never seen before.
+  //
+  std::vector<PageId> page_ids_list(page_ids_in.begin(), page_ids_in.end());
+  {
+    auto locked_state = this->state_.lock();
+    if (!is_page_recycling_allowed(page_ids_in, offset_as_unique_identifier)) {
+      return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
+    }
+  }
+
+  // We need to include the pages which are not processed yet.
+  //
+  std::sort(page_ids_list.begin(), page_ids_list.end());
+  page_ids_list.erase(page_ids_list.begin(), page_ids_list.begin() + this->largest_page_index_);
+
+  LLFS_VLOG(1) << "Sorted slice: " << BATT_INSPECT(page_ids_list.size())
+               << BATT_INSPECT(this->largest_page_index_);
+
+  const Slice<const PageId> page_ids = batt::as_slice(page_ids_list);
+  Optional<slot_offset_type> sync_point = None;
+  const PageRecyclerOptions& options = this->state_.no_lock().options;
+
+  for (PageId page_id : page_ids) {
+    StatusOr<batt::Grant> local_grant = [&] {
+      const usize needed_size = options.insert_grant_size();
+
+      BATT_DEBUG_INFO("[PageRecycler::recycle_page_depth_0] waiting for log space; "
+                      << BATT_INSPECT(needed_size) << BATT_INSPECT(this->insert_grant_pool_.size())
+                      << BATT_INSPECT(options.total_grant_size_for_depth(depth))
+                      << BATT_INSPECT(this->slot_writer_.in_use_size())
+                      << BATT_INSPECT(this->slot_writer_.log_capacity())
+                      << BATT_INSPECT(options.recycle_task_target())
+                      << BATT_INSPECT(page_ids.size())
+                      << BATT_INSPECT(this->last_page_recycle_offset_));
+
+      return this->insert_grant_pool_.spend(needed_size, batt::WaitForResource::kTrue);
+    }();
+    if (!local_grant.ok()) {
+      if (!suppress_log_output_for_test() && !this->stop_requested_.load()) {
+        LLFS_LOG_WARNING() << "PageRecycler::recycle_pages failed; not enough log buffer space";
+      }
+    }
+    BATT_REQUIRE_OK(local_grant);
+    {
+      auto locked_state = this->state_.lock();
+      // Writing to recycler log.
+      StatusOr<slot_offset_type> append_slot = this->insert_to_log(
+          *local_grant, page_id, depth, this->largest_offset_as_unique_identifier_,
+          this->largest_page_index_, locked_state);
+      BATT_REQUIRE_OK(append_slot);
+
+      ++this->largest_page_index_;
+      this->last_page_recycle_offset_ = *append_slot;
+
+      clamp_min_slot(&sync_point, *append_slot);
@@ -465,6 +427,64 @@ StatusOr<slot_offset_type> PageRecycler::recycle_pages(
   return *sync_point;
 }
 
+//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
+/** \brief This function handles page recycling when depth is non-zero (internal calls).
+ */
+StatusOr<slot_offset_type> PageRecycler::recycle_pages_depth_n(const Slice<const PageId>& page_ids,
+                                                               batt::Grant* grant, const i32 depth)
+{
+  Optional<slot_offset_type> sync_point = None;
+  BATT_CHECK_LT(depth, (i32)kMaxPageRefDepth) << BATT_INSPECT_RANGE(page_ids);
+
+  auto locked_state = this->state_.lock();
+  for (PageId page_id : page_ids) {
+    // Writing to recycler log.
+    StatusOr<slot_offset_type> append_slot =
+        this->insert_to_log(*grant, page_id, depth, this->largest_offset_as_unique_identifier_,
+                            this->largest_page_index_, locked_state);
+    BATT_REQUIRE_OK(append_slot);
+
+    this->last_page_recycle_offset_ = *append_slot;
+
+    clamp_min_slot(&sync_point, *append_slot);
+  }
+
+  BATT_CHECK(sync_point);
+  return *sync_point;
+}
+
+//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
+//
+StatusOr<slot_offset_type> PageRecycler::recycle_pages(
+    const Slice<const PageId>& page_ids, llfs::slot_offset_type offset_as_unique_identifier,
+    batt::Grant* grant, i32 depth)
+{
+  BATT_CHECK_GE(depth, 0);
+
+  LLFS_VLOG(1) << "PageRecycler::recycle_pages(page_ids=" << batt::dump_range(page_ids) << "["
+               << page_ids.size() << "]"
+               << ", grant=[" << (grant ? grant->size() : usize{0}) << "], depth=" << depth << ") "
+               << this->name_ << BATT_INSPECT(offset_as_unique_identifier)
+               << BATT_INSPECT(this->largest_offset_as_unique_identifier_)
+               << BATT_INSPECT(this->last_page_recycle_offset_);
+
+  if (page_ids.empty()) {
+    return this->wal_device_->slot_range(LogReadMode::kDurable).upper_bound;
+  }
+
+  // Handle calls from external callers first (depth == 0).
+  //
+  if (depth == 0) {
+    BATT_CHECK_EQ(grant, nullptr) << "External callers to `PageRecycler::recycle_pages` should "
+                                     "specify depth == 0 and grant == nullptr; other values are "
+                                     "for PageRecycler internal use only.";
+
+    return recycle_pages_depth_0(page_ids, offset_as_unique_identifier);
+  } else {
+    return recycle_pages_depth_n(page_ids, grant, depth);
+  }
+}
+
 //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - -
 //
 StatusOr<slot_offset_type> PageRecycler::recycle_page(PageId page_id,
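The subtle part of recycle_pages_depth_0 above is the retry handling: the page ids are sorted, then the first largest_page_index_ entries are dropped, so a retried request resumes from the last pending page. Below is a standalone sketch of just that logic; it is not code from this commit, and plain standard-library types stand in for PageId and the recycler state.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <vector>

int main()
{
  // Page ids from a request that is being retried (hypothetical values).
  std::vector<std::uint64_t> page_ids_list = {7, 3, 9, 5};

  // Stand-in for this->largest_page_index_: two pages were already
  // recycled before the previous attempt failed.
  const std::size_t largest_page_index = 2;

  // Same idea as recycle_pages_depth_0: sort, then erase the prefix that
  // was already processed, so only still-pending pages are appended to
  // the recycler log.
  std::sort(page_ids_list.begin(), page_ids_list.end());
  page_ids_list.erase(page_ids_list.begin(),
                      page_ids_list.begin() +
                          static_cast<std::ptrdiff_t>(largest_page_index));

  for (std::uint64_t id : page_ids_list) {
    std::cout << id << "\n";  // prints 7 then 9: the pages not yet recycled
  }
  return 0;
}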
12 changes: 12 additions & 0 deletions src/llfs/page_recycler.hpp
@@ -121,6 +121,18 @@ class PageRecycler
   StatusOr<slot_offset_type> recycle_page(PageId page_id, slot_offset_type unique_offset,
                                           batt::Grant* grant = nullptr, i32 depth = 0);
 
+  // Schedule a list of pages to be recycled when depth is ZERO. This is called by
+  // recycle_pages.
+  //
+  StatusOr<slot_offset_type> recycle_pages_depth_0(
+      const Slice<const PageId>& page_ids_in, llfs::slot_offset_type offset_as_unique_identifier);
+
+  // Schedule a list of pages to be recycled when depth is non-ZERO. This is called by
+  // recycle_pages.
+  //
+  StatusOr<slot_offset_type> recycle_pages_depth_n(const Slice<const PageId>& page_ids_in,
+                                                   batt::Grant* grant, const i32 depth);
+
   // Waits for the given slot to be flushed to durable storage.
   //
   Status await_flush(Optional<slot_offset_type> min_upper_bound);
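The new declarations encode a split contract: external callers reach recycle_pages_depth_0 through recycle_pages with depth == 0 and no grant, while internal callers supply a grant and a nonzero depth. A minimal self-contained model of that dispatch follows; the stub types and return values are assumptions for illustration, not the real llfs API.

#include <cassert>
#include <cstdint>
#include <iostream>

struct Grant {};  // stand-in for batt::Grant

// Stubs standing in for the two new member functions.
std::int32_t recycle_pages_depth_0() { return 0; }
std::int32_t recycle_pages_depth_n(Grant* grant, std::int32_t depth)
{
  assert(grant != nullptr);  // internal callers must supply a grant
  return depth;
}

// Mirrors the dispatch in PageRecycler::recycle_pages after this commit.
std::int32_t recycle_pages(Grant* grant, std::int32_t depth)
{
  assert(depth >= 0);
  if (depth == 0) {
    assert(grant == nullptr);  // external callers must not pass a grant
    return recycle_pages_depth_0();
  }
  return recycle_pages_depth_n(grant, depth);
}

int main()
{
  std::cout << recycle_pages(nullptr, 0) << "\n";  // external path -> 0
  Grant grant;
  std::cout << recycle_pages(&grant, 2) << "\n";   // internal path -> 2
  return 0;
}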
