From 514519d4acaaf3e392f25ae5955c8121d6709c1e Mon Sep 17 00:00:00 2001 From: Tony Astolfi Date: Mon, 26 Feb 2024 16:49:15 -0500 Subject: [PATCH] Implement New Lock-Free Page Cache (#140) * wip - lock-free page cache with LRU clock. * wip - build broken (halfway through converting to new cache). * Builds and tests pass (new lock-free page cache). * Don't ever destroy the LRUClock singleton instance; re-enable Cache. * Fix LRUCounter to use std::mutex instead of batt::Mutex. This was a bug because it mixes threading models; when adding a LRUClock::LocalCounter to the global list, it is very important that if the current thread must wait for the mutex, that when it resumes (with the lock held), it be on the _same_ thread, with the same thread_local variables, as when it blocked. * Simplify new cache and fix bugs. * Finish documenting PageCacheSlot::Pool. * Log more often in page allocator model test. * Add more doc, tests for PageCacheSlot. * Clean up the design proposal. --- conanfile.py | 8 +- doc/proposals/lock_free_cache.md | 146 ++++++++ src/llfs/ioring_log_flush_op.hpp | 1 + src/llfs/lru_clock.cpp | 170 ++++++++++ src/llfs/lru_clock.hpp | 154 +++++++++ src/llfs/lru_clock.test.cpp | 183 ++++++++++ src/llfs/page_allocator.test.cpp | 9 +- src/llfs/page_cache.cpp | 366 +++++++++++--------- src/llfs/page_cache.hpp | 112 +++++-- src/llfs/page_cache_slot.cpp | 320 ++++++++++++++++++ src/llfs/page_cache_slot.hpp | 338 +++++++++++++++++++ src/llfs/page_cache_slot.test.cpp | 427 ++++++++++++++++++++++++ src/llfs/page_cache_slot_atomic_ref.hpp | 146 ++++++++ src/llfs/page_cache_slot_pinned_ref.hpp | 136 ++++++++ src/llfs/page_cache_slot_pool.cpp | 200 +++++++++++ src/llfs/page_cache_slot_pool.hpp | 130 ++++++++ src/llfs/page_device_cache.cpp | 222 ++++++++++++ src/llfs/page_device_cache.hpp | 92 +++++ src/llfs/page_id.hpp | 2 +- src/llfs/page_id_slot.cpp | 41 ++- src/llfs/page_id_slot.hpp | 36 +- src/llfs/page_loader.cpp | 11 + src/llfs/page_loader.hpp | 5 + src/llfs/page_recycler.hpp | 1 + src/llfs/page_view.hpp | 2 +- src/llfs/pinned_page.hpp | 7 +- src/llfs/status_code.cpp | 2 + src/llfs/status_code.hpp | 1 + src/llfs/storage_context.test.cpp | 10 +- src/llfs/volume.cpp | 9 +- src/llfs/volume.test.cpp | 60 ++-- 31 files changed, 3098 insertions(+), 249 deletions(-) create mode 100644 doc/proposals/lock_free_cache.md create mode 100644 src/llfs/lru_clock.cpp create mode 100644 src/llfs/lru_clock.hpp create mode 100644 src/llfs/lru_clock.test.cpp create mode 100644 src/llfs/page_cache_slot.cpp create mode 100644 src/llfs/page_cache_slot.hpp create mode 100644 src/llfs/page_cache_slot.test.cpp create mode 100644 src/llfs/page_cache_slot_atomic_ref.hpp create mode 100644 src/llfs/page_cache_slot_pinned_ref.hpp create mode 100644 src/llfs/page_cache_slot_pool.cpp create mode 100644 src/llfs/page_cache_slot_pool.hpp create mode 100644 src/llfs/page_device_cache.cpp create mode 100644 src/llfs/page_device_cache.hpp diff --git a/conanfile.py b/conanfile.py index 004a43d..2b77892 100644 --- a/conanfile.py +++ b/conanfile.py @@ -14,7 +14,7 @@ #==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - # Import batt helper utilities module. 
# -import script.batt as batt +import script.batt from script.batt import VISIBLE, OVERRIDE # #+++++++++++-+-+--+----- --- -- - - - - @@ -69,7 +69,7 @@ def configure(self): def requirements(self): - self.requires("batteries/0.50.2", **VISIBLE) + self.requires("batteries/0.51.0", **VISIBLE) self.requires("boost/1.83.0", **VISIBLE) self.requires("cli11/2.3.2", **VISIBLE) self.requires("glog/0.6.0", **VISIBLE) @@ -77,11 +77,11 @@ def requirements(self): self.requires("libbacktrace/cci.20210118", **VISIBLE) self.requires("openssl/3.2.0", **VISIBLE) - self.requires("zlib/1.2.13", **OVERRIDE) + self.requires("zlib/1.3", **OVERRIDE) if platform.system() == "Linux": self.requires("liburing/2.4", **VISIBLE) - self.requires("libfuse/3.10.5", **VISIBLE) + self.requires("libfuse/3.16.2", **VISIBLE) self.requires("libunwind/1.7.2", **VISIBLE, **OVERRIDE) #+++++++++++-+-+--+----- --- -- - - - - diff --git a/doc/proposals/lock_free_cache.md b/doc/proposals/lock_free_cache.md new file mode 100644 index 0000000..7399c2d --- /dev/null +++ b/doc/proposals/lock_free_cache.md @@ -0,0 +1,146 @@ +# Lock-Free Page Cache Design Proposal + +## Problem + +The current page cache index is implemented using a set of page-size-specific instances of the `llfs::Cache` class. This class is thread-safe but performs poorly under contention for two reasons: + + 1. The mapping from PageId to buffered page data is maintained via a single `std::unordered_map` protected by a single mutex + 2. The LRU eviction policy is implemented using a per-`llfs::Cache` object linked list of pages in LRU order, which is also protected by the same mutex + +Note that number 2 essentially turns every access (which should update the notion of recency-of-use somehow) into a write, meaning that we can't even use a reader/writer locking strategy to optimize read-heavy workloads. + +## Proposed Solution + +### High Level + +We address the two problems above in different ways, each of which can be implemented separately (though the solution to concurrent hash table lookups is somewhat pointless without also solving the LRU maintenance problem as well). + +Instead of maintaining LRU order via a linked list of cache slots, we will instead update a logical time stamp per slot on each access to that slot. Eviction will be implemented by randomly selecting some constant _k_ number of candidate slots to evict, then attempting to evict the one with the oldest (smallest-valued) logical time stamp. This trades off some accuracy for a (potentially) dramatic increase in scalability and performance. + + +The concurrent hash table access problem will be solved by implementing a new lock-free index system, where an array of 64-bit atomic integers will be maintained for each `PageDevice`. These will index into a pool of cache slot objects; there will be one pool per page size, and multiple PageDevice instances may share a single pool. This design takes advantage of the fact that a given page is kept alive only while there are any references to that page, with the underlying storage resources of a page being reused by multiple page ids over time. This means that while the total space of `PageId` values is quite large (and the active set at any given time is sparse within this space), the number of _active_ (i.e. readable) pages at a given time is much smaller, and is dense due to the fact that page ids map to a physical storage location. + +### Details + +#### Background: CacheSlot + +The `llfs::CacheSlot` class holds a key/value pair in the current implementation. 
`CacheSlot` instances are essentially the values in the hash-table index used in the current `Cache` implementation. There are two ways for some part of the system to obtain a reference to a `CacheSlot`:
+
+1. Perform a lookup in the `Cache` object that owns the slot
+2. Promote an atomic "weak" reference to a strong (pinned) reference
+
+Method 2 allows applications that use LLFS to implement lookup strategies that are optimized for that use case, in order to reduce contention on the `Cache` hash table. For example, an application might implement a tree-like structure where one page contains references to other pages. The `PageView` implementation for this page layout might contain a number of `llfs::AtomicCacheSlotRef` objects (one for each outgoing page reference), each of which is lazily initialized to point at the cache slot containing the referenced page the first time the reference is followed. On subsequent traversals of the link from the referring page to the referent, the application can first attempt to "pin" the `llfs::AtomicCacheSlotRef` for that link. This will succeed iff the slot still contains the target page. If that slot has been evicted due to cache pressure, the application will fall back on the default load path for the page, which will allocate a new cache slot and update the hash table in `Cache`.
+
+**_Note: The potential concurrent scalability offered by this design is currently thwarted by the need to maintain an LRU list whenever a page is accessed; even when weak slot references are successfully pinned, we fail to avoid locking the single shared mutex that protects the entire Cache state._**
+
+`llfs::CacheSlot` has a field, `std::atomic<u64> state_`, which is responsible for storing the current state of the slot and coordinating transitions between the different states. The states of a `CacheSlot` are:
+
+ - Invalid (the initial state; the slot _can_ return to this state repeatedly in its lifespan)
+ - Valid + Filled
+ - Valid + Filled + Pinned
+ - Valid + Cleared
+
+These are mapped onto the state integer as follows:
+
+```
+┌───────┬───────┬────────┬────────────────┬────────┬────────────────┐
+│Valid? │Unused │Overflow│DecreasePinCount│Overflow│IncreasePinCount│
+│(1 bit)│(1 bit)│(1 bit) │   (30 bits)    │(1 bit) │   (30 bits)    │
+└───────┴───────┴────────┴────────────────┴────────┴────────────────┘
+```
+
+The `Valid?` bit determines whether the slot is in the Invalid state (`Valid? == 0`) or one of the Valid states.
+
+The current "pin count" is defined as `IncreasePinCount - DecreasePinCount`; it is the number of active "pins" on the cache slot. A slot may not be evicted or modified while it is pinned. The slot is considered "pinned" when the pin count is not zero (a negative pin count is illegal; the invariant is that `IncreasePinCount >= DecreasePinCount` at all times).
+
+After updating either of the pin counts, if we observe that the corresponding overflow bit is set, we simply do an atomic `fetch_and` operation to set it back to zero. This allows us to use atomic `fetch_add` operations instead of CAS to update the pin count. For example:
+
+```c++
+  // Increment the pin count.
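+  // Note: kIncreasePinDelta adds 1 to the IncreasePinCount bit-field; if that field wraps,
+  // the carry lands in the adjacent overflow bit (which is cleared just below) rather than
+  // corrupting the DecreasePinCount field.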
+  //
+  const u64 observed_state = this->state_.fetch_add(kIncreasePinDelta);
+  const u64 new_state = observed_state + kIncreasePinDelta;
+
+  if (new_state & kIncreasePinOverflow) {
+    this->state_.fetch_and(~kIncreasePinOverflow);
+  }
+```
+
+The key and value data for a slot are stored in the following data members:
+
+```c++
+  Optional key_;
+  std::shared_ptr value_;
+```
+
+If the `key_` field is set to `None` _and_ the slot is in a valid state (`Valid?` bit is set), then the state is Valid + Cleared. If `key_` is non-`None`, then the slot is in a Filled state (and `value_` is presumed to be non-null: the current value bound to the key). Thus Cleared and Filled are mutually exclusive.
+
+The state transitions for CacheSlot are shown below:
+
+```
+                       ┌─────────┐
+                ┌──────│ Invalid │──────────┐
+         fill() │      └─────────┘          │ clear()
+                │           ▲               │
+                │           │               │
+                │           │ evict()       │
+                ▼           │               ▼
+      ┌────────────────┐    │      ┌─────────────────┐
+┌─────│ Valid + Filled │────┴──────│ Valid + Cleared │
+│     └────────────────┘           └─────────────────┘
+│              ▲
+│acquire_pin():│
+│   0 -> 1     │release_pin():
+│              │   1 -> 0
+│              │
+│     ┌─────────────────────────┐
+└────▶│ Valid + Filled + Pinned │
+      └─────────────────────────┘
+```
+
+It is only legal to read the key or value of a slot if it is in a Valid state. If the slot enters the Invalid state because a thread is able to successfully call evict(), then that thread has exclusive access to the key and value fields. This allows either clear() or fill() to be called to change the key/value fields. For this reason, readers must be careful to prevent a slot from transitioning states while they are reading the key and/or value fields. Thus a reader must first "pin" the slot by incrementing the pin count; it can then check whether the pin succeeded (is the `Valid?` bit set?). If successful, the reader can proceed to read the key and value, confident there is no data race. Otherwise, it must restore the pin count to its prior value by incrementing the `DecreasePinCount` value. This design allows the use of a single atomic `fetch_add` instruction to pin a slot (happy path), instead of a more expensive CAS instruction.
+
+This state machine mechanism essentially implements a non-blocking reader/writer locking system for a single cache slot. To reiterate: if the pin count is non-zero, there are read locks held against the slot, so it must not be changed. Conversely, if the slot is in the Invalid state, there is a unique (exclusive) write lock held against the slot; whoever called `evict()` to force the slot into this state (or whoever constructed the slot object) is free to modify the slot key and value without fear of a data race.
+
+
+#### LRU Maintenance
+
+A new field, `std::atomic<i64> latest_use_`, will be added to the class `llfs::CacheSlot`. This will be updated with a new logical time stamp (LTS) whenever that slot is accessed from the cache. The LTS values will be provided by a new singleton class `llfs::LRUClock`. The LRU clock will work by maintaining thread-local `i64` counters, which are added to a global linked list the first time the thread-local object is created (on a new thread); the counter will later be removed from the global linked list in the destructor for the counter class (`llfs::LRUClock::LocalCounter`). The global linked list of counters will be protected by a mutex.
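+To make the relationship between the per-slot timestamp and the sampling-based eviction described earlier concrete, a minimal sketch follows. This is not the proposed `PageCacheSlot`/`Pool` implementation; `Slot`, `touch`, `try_evict`, and `evict_one` are hypothetical names, and the real design folds the valid bit and pin count into a single atomic state word, so the pin check and the eviction below would be one atomic transition rather than the racy two-step shown here:
+
+```c++
+#include <atomic>
+#include <cstdint>
+#include <limits>
+#include <random>
+#include <vector>
+
+// Hypothetical stand-ins (not the real llfs types): a slot with an LRU-clock
+// timestamp and a pin count; eviction succeeds only for valid, unpinned slots.
+struct Slot {
+  std::atomic<int64_t> latest_use{0};
+  std::atomic<uint32_t> pin_count{0};
+  std::atomic<bool> valid{false};
+
+  // Best-effort eviction: claim the slot only if it is valid and unpinned.
+  // (Racy as written; see the note above about the combined state word.)
+  bool try_evict()
+  {
+    bool expected = true;
+    return this->pin_count.load() == 0 && this->valid.compare_exchange_strong(expected, false);
+  }
+};
+
+// On every cache hit: stamp the slot with a thread-local logical timestamp,
+// e.g. the value returned by LRUClock::advance_local(); no shared lock needed.
+inline void touch(Slot& slot, int64_t logical_now)
+{
+  slot.latest_use.store(logical_now, std::memory_order_relaxed);
+}
+
+// To make room: sample k random slots and try to evict the one with the
+// oldest (smallest) timestamp; on failure the caller simply retries.
+inline Slot* evict_one(std::vector<Slot>& slots, std::default_random_engine& rng, int k = 8)
+{
+  std::uniform_int_distribution<size_t> pick{0, slots.size() - 1};
+  Slot* victim = nullptr;
+  int64_t oldest = std::numeric_limits<int64_t>::max();
+  for (int i = 0; i < k; ++i) {
+    Slot* candidate = &slots[pick(rng)];
+    const int64_t t = candidate->latest_use.load(std::memory_order_relaxed);
+    if (victim == nullptr || t < oldest) {
+      oldest = t;
+      victim = candidate;
+    }
+  }
+  return (victim != nullptr && victim->try_evict()) ? victim : nullptr;
+}
+```
+
+In the actual proposal, the candidate slots would come from the shared pool for the relevant page size (`llfs::PageCacheSlot::Pool`), and the timestamp would live in the slot's `latest_use_` field.
+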
When the `LRUClock` singleton is first initialized, it will create a background thread (`std::thread`) that will periodically synchronize the thread-local counters via the following procedure: + + 1. Lock the list mutex + 2. Iterate over all the `LocalCounter` objects, saving the maximum value in a field of `LRUClock` + 3. Do a second pass over all the `LocalCounter`s, this time clamping their value to at least the maximum observed in step 2 + +This will keep the thread-local LTS counters from drifting too far from each other over time. The `LRUClock` background thread will sleep for an average of 500 microseconds, with random jitter, in between each synchronization, so as not to impose much overhead on the rest of the system (this is something we can easily tune later if it turns out not to be a good choice). + +The `LRUClock` class will provide the following interface: + +```c++ + /** \brief Returns the current thread's local counter. + */ + static i64 read_local() noexcept; + + /** \brief Increments the current thread's local counter, returning the old value. + */ + static i64 advance_local() noexcept; + + /** \brief Returns the last observed maximum count (over all thread-local values); this may be + * slightly out of date, as it is only updated by the background sync thread. + */ + static i64 read_global() noexcept; +``` + +Note that the maximum observed LTS value from step 2 above is saved in between rounds of synchronization so that LTS values continue to move forward even if all threads with a local counter happen to go away at some point. + +#### Lock-Free Page Cache Index + +##### Proposed Design + +We propose the following changes to the existing design: + +1. Remove the generality of `Cache` and `CacheSlot`, replacing these with concrete classes that explicitly name `llfs::PageId` as the key type, and `batt::Latch>` as the value type. +2. Replace `CacheSlot` with `llfs::PageCacheSlot`, as described below +3. Replace `Cache` with two types that separate the concerns currently both handled inside `Cache`: + 1. `llfs::PageDeviceCache` implements a per-`PageDevice` physical-page-index to cache slot index (using an array of atomic `u64` values) + 2. `llfs::PageCacheSlot::Pool` implements a shared pool of cache slots; one pool can be shared among many `PageDeviceCache` objects +4. Simplify the design of the `PageCacheSlot` state update mechanism; we don't really need two counters for increase and decrease of pin count, and we can also avoid heap-allocating the `Latch` object in favor of using `Optional>>`. diff --git a/src/llfs/ioring_log_flush_op.hpp b/src/llfs/ioring_log_flush_op.hpp index 2685d5f..6f2cc90 100644 --- a/src/llfs/ioring_log_flush_op.hpp +++ b/src/llfs/ioring_log_flush_op.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include diff --git a/src/llfs/lru_clock.cpp b/src/llfs/lru_clock.cpp new file mode 100644 index 0000000..a5e1d5d --- /dev/null +++ b/src/llfs/lru_clock.cpp @@ -0,0 +1,170 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. 
+// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// + +namespace llfs { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +LRUClock::LocalCounter::LocalCounter() noexcept : value{0} +{ + LRUClock::instance().add_local_counter(*this); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +LRUClock::LocalCounter::~LocalCounter() noexcept +{ + LRUClock::instance().remove_local_counter(*this); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ auto LRUClock::instance() noexcept -> Self& +{ + // Leak instance_ to avoid shutdown destructor ordering issues. + // + static Self* instance_ = new Self; + + return *instance_; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ LRUClock::LocalCounter& LRUClock::thread_local_counter() noexcept +{ + thread_local LocalCounter counter_; + + return counter_; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ i64 LRUClock::read_local() noexcept +{ + return Self::thread_local_counter().value.load(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ i64 LRUClock::advance_local() noexcept +{ + return Self::thread_local_counter().value.fetch_add(1); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*static*/ i64 LRUClock::read_global() noexcept +{ + return Self::instance().read_observed_count(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +LRUClock::LRUClock() noexcept + : sync_thread_{[this] { + this->run(); + }} +{ + this->sync_thread_.detach(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void LRUClock::run() noexcept +{ + static_assert(kMinSyncDelayUsec <= kMaxSyncDelayUsec); + + std::random_device rand_dev; + std::default_random_engine rng(rand_dev()); + std::uniform_int_distribution pick_jitter{ + 0, + Self::kMaxSyncDelayUsec - Self::kMinSyncDelayUsec, + }; + + // Loop forever, waiting and synchronizing thread-local counters. + // + for (;;) { + // Pick a delay with random jitter. + // + const i64 delay_usec = Self::kMinSyncDelayUsec + pick_jitter(rng); + + // Wait... + // + std::this_thread::sleep_for(std::chrono::microseconds(delay_usec)); + + // Synchronize the thread-local counters; this will update this->observed_count_. + // + this->sync_local_counters(); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void LRUClock::sync_local_counters() noexcept +{ + std::unique_lock lock{this->mutex_}; + + i64 max_value = this->observed_count_; + + // On the first pass, figure out the maximum counter value. + // + for (LocalCounter& counter : this->counter_list_) { + max_value = std::max(max_value, counter.value.load()); + } + + // Save the observed max counter value so that we continue to advance, even if all threads + // terminate. + // + this->observed_count_ = max_value; + + // On the second pass, use CAS to make sure that all local counters are at least at the + // `max_value` calculated above. + // + for (LocalCounter& counter : this->counter_list_) { + i64 observed = counter.value.load(); + while (observed < max_value) { + if (counter.value.compare_exchange_weak(observed, max_value)) { + break; + } + } + } + + // Done! 
+} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void LRUClock::add_local_counter(LocalCounter& counter) noexcept +{ + std::unique_lock lock{this->mutex_}; + + this->counter_list_.push_back(counter); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void LRUClock::remove_local_counter(LocalCounter& counter) noexcept +{ + std::unique_lock lock{this->mutex_}; + + this->counter_list_.erase(this->counter_list_.iterator_to(counter)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +i64 LRUClock::read_observed_count() noexcept +{ + std::unique_lock lock{this->mutex_}; + + return this->observed_count_; +} + +} //namespace llfs diff --git a/src/llfs/lru_clock.hpp b/src/llfs/lru_clock.hpp new file mode 100644 index 0000000..748bfea --- /dev/null +++ b/src/llfs/lru_clock.hpp @@ -0,0 +1,154 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_LRU_CLOCK_HPP +#define LLFS_LRU_CLOCK_HPP + +#include +// + +#include +#include + +#include +#include +#include +#include +#include + +namespace llfs { + +/** \brief A fast (if slightly inaccurate) logical timestamp maintainer, suitable for comparing + * approximate last-time-of-usage for different objects. + * + * The LRUClock is comprised of two elements: + * + * 1. A (set of) thread-local monotonic event counters + * 2. A background task that periodically synchronizes the thread-local counters + * + * The thread-local event counters are atomic, but since they are almost always accessed from a + * single thread (and we use the weakest possible memory order/fencing), they cause minimal problems + * in terms of cache stalls and contention. Every time a thread reads its counter, it advances it + * by one. Periodically (every 0.5 to 1.5 ms by default, with pseudo-random jitter), the background + * task wakes up and cycles over all the counters, setting them each to the highest value found in + * any of the others (it must, in the worst case, cycle through all twice). + * + * Every time a thread wants to use the LRU clock for the first time, it must first acquire a global + * mutex lock in order to add its local counter to a linked-list. The background synchronization + * task must also grab this mutex to make sure there are no races on the list while it updates all + * the thread-local counters. When a thread exits, it acquires the mutex and removes its counter. + * + * Note: a regular thread never needs to block in order to simply acquire a logical timestamp; it + * only needs to do so when it starts or stops. + */ +class LRUClock +{ + public: + using Self = LRUClock; + + //----- --- -- - - - - + static constexpr i64 kMinSyncDelayUsec = 500; + static constexpr i64 kMaxSyncDelayUsec = 1500; + //----- --- -- - - - - + + class LocalCounter; + + /** \brief A linked-list node; this is the base type for LocalCounter. + */ + using LocalCounterHook = boost::intrusive::list_base_hook>; + + /** \brief A per-thread atomic counter. + */ + class LocalCounter : public LocalCounterHook + { + public: + explicit LocalCounter() noexcept; + + ~LocalCounter() noexcept; + + std::atomic value{0}; + }; + + /** \brief Alias for the counter linked-list collection type. 
+ */ + using LocalCounterList = + boost::intrusive::list>; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns a reference to the global instance of the LRUClock. + */ + static Self& instance() noexcept; + + /** \brief Returns a reference to the current thread's counter object. + */ + static LocalCounter& thread_local_counter() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns the current thread's local counter. + */ + static i64 read_local() noexcept; + + /** \brief Increments the current thread's local counter, returning the old value. + */ + static i64 advance_local() noexcept; + + /** \brief Returns the last observed maximum count (over all thread-local values); this may be + * slightly out of date, as it is only updated by the background sync thread. + */ + static i64 read_global() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + private: + LRUClock() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief The background counter sync thread entry point. + */ + void run() noexcept; + + /** \brief Locks the counter list mutex, then iterates through all thread-local counters, + * atomically updating each to be at least the maximum observed value. + * + * This takes two passes through the list, so it's not 100% guaranteed that all counters will be + * the same by the end. + */ + void sync_local_counters() noexcept; + + /** \brief Adds the passed LocalCounter to the global list. + */ + void add_local_counter(LocalCounter& counter) noexcept; + + /** \brief Removes the passed LocalCounter from the global list. + */ + void remove_local_counter(LocalCounter& counter) noexcept; + + /** \brief Returns the maximum count value from the last time sync_local_counters() was called. + */ + i64 read_observed_count() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + std::mutex mutex_; + LocalCounterList counter_list_; + std::thread sync_thread_; + + // Keeps track of the synchronized counter value as it advances, so we don't go backwards if all + // the threads go away temporarily at some point. + // + i64 observed_count_ = 0; +}; + +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ + +} //namespace llfs + +#endif // LLFS_LRU_CLOCK_HPP diff --git a/src/llfs/lru_clock.test.cpp b/src/llfs/lru_clock.test.cpp new file mode 100644 index 0000000..8665906 --- /dev/null +++ b/src/llfs/lru_clock.test.cpp @@ -0,0 +1,183 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include + +#include +#include + +#include + +namespace { + +// Test Goals: +// - local counts are independent on different threads +// - local counts are monotonic on a given thread +// - llfs::LRUClock::kMaxSyncDelayUsec is an upper bound on the time two threads' counters can be +// out of sync +// +// Test Plan: +// 1. start N threads, update counts on each +// - maintain list of which count values were seen on each thread +// - verify local monotonicity, global independence +// 2. same as (1), but add at least one "slow" thread; verify that it jumps ahead after sleeping +// for the max sync delay. +// +// + +using namespace llfs::int_types; + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 1. 
+// +TEST(LruClockTest, PerThreadUpdate) +{ + const usize kNumThreads = std::thread::hardware_concurrency(); + const usize kUpdatesPerThread = 1000; + + std::vector> per_thread_values(kNumThreads, std::vector(kUpdatesPerThread)); + + std::vector threads; + + for (usize i = 0; i < kNumThreads; ++i) { + threads.emplace_back([i, &per_thread_values] { + for (usize j = 0; j < kUpdatesPerThread; ++j) { + const i64 value = llfs::LRUClock::advance_local(); + per_thread_values[i][j] = value; + } + }); + } + + for (std::thread& t : threads) { + t.join(); + } + + std::map count_per_value; + + for (const std::vector& values : per_thread_values) { + ASSERT_EQ(values.size(), kUpdatesPerThread); + ++count_per_value[values[0]]; + for (usize i = 1; i < kUpdatesPerThread; ++i) { + EXPECT_LT(values[i - 1], values[i]); + ++count_per_value[values[i]]; + } + } + + usize repeated_values = 0; + for (const auto& [value, count] : count_per_value) { + if (count > 1) { + ++repeated_values; + } + } + + EXPECT_GT(repeated_values, 0u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 2. +// +void run_sync_update_test(const usize kNumFastThreads) +{ + const usize kUpdatesPerThread = 50 * 1000 * 1000; + const usize kSlowThreadReads = 20; + + std::vector slow_thread_values(kSlowThreadReads); + std::thread slow_thread{[&slow_thread_values] { + for (usize i = 0; i < kSlowThreadReads; ++i) { + std::this_thread::sleep_for( + std::chrono::microseconds(llfs::LRUClock::kMaxSyncDelayUsec * 10)); + slow_thread_values[i] = llfs::LRUClock::read_local(); + } + }}; + + std::vector fast_threads; + std::vector> max_fast_thread_value(kNumFastThreads); + + for (usize i = 0; i < kNumFastThreads; ++i) { + fast_threads.emplace_back([i, &max_fast_thread_value] { + i64 last_value = -1; + for (usize j = 0; j < kUpdatesPerThread; ++j) { + const i64 value = llfs::LRUClock::advance_local(); + EXPECT_GT(value, last_value); + *max_fast_thread_value[i] = std::max(*max_fast_thread_value[i], value); + } + }); + } + + for (std::thread& t : fast_threads) { + t.join(); + } + + slow_thread.join(); + + i64 max_count = 0; + for (batt::CpuCacheLineIsolated& count : max_fast_thread_value) { + max_count = std::max(max_count, *count); + } + + const i64 max_synced_count = llfs::LRUClock::read_global(); + + EXPECT_LE(max_synced_count, max_count); + + for (usize i = 1; i < kSlowThreadReads; ++i) { + if (slow_thread_values[i] >= max_synced_count) { + for (; i < kSlowThreadReads; ++i) { + EXPECT_EQ(slow_thread_values[i], max_synced_count); + } + break; + } + if (slow_thread_values[i] == 0 && slow_thread_values[i - 1] == 0) { + continue; + } + EXPECT_GT(slow_thread_values[i] - slow_thread_values[i - 1], 50) + << BATT_INSPECT(i) << BATT_INSPECT(slow_thread_values[i]) + << BATT_INSPECT(slow_thread_values[i - 1]) << BATT_INSPECT(max_synced_count) + << BATT_INSPECT(max_count); + } + + EXPECT_GT(slow_thread_values.back(), kUpdatesPerThread / 2); +} + +TEST(LruClockTest, SyncUpdate1) +{ + run_sync_update_test(1); +} +TEST(LruClockTest, SyncUpdate2) +{ + run_sync_update_test(2); +} +TEST(LruClockTest, SyncUpdate4) +{ + run_sync_update_test(4); +} +TEST(LruClockTest, SyncUpdate8) +{ + run_sync_update_test(8); +} +TEST(LruClockTest, SyncUpdate16) +{ + run_sync_update_test(16); +} +TEST(LruClockTest, SyncUpdate32) +{ + run_sync_update_test(32); +} +TEST(LruClockTest, SyncUpdate64) +{ + run_sync_update_test(64); +} +TEST(LruClockTest, SyncUpdate128) +{ + run_sync_update_test(128); +} + +} // namespace diff --git a/src/llfs/page_allocator.test.cpp 
b/src/llfs/page_allocator.test.cpp index 17ef062..04f7f4f 100644 --- a/src/llfs/page_allocator.test.cpp +++ b/src/llfs/page_allocator.test.cpp @@ -476,8 +476,13 @@ class PageAllocatorModel } static std::atomic step_count{0}; - ++step_count; - LLFS_LOG_INFO_EVERY_N(25000) << BATT_INSPECT(step_count); + thread_local usize local_count{0}; + ++local_count; + if ((local_count & 0xfff) == 0) { + step_count.fetch_add(local_count); + local_count = 0; + LLFS_LOG_INFO_EVERY_N(5) << BATT_INSPECT(step_count); + } LLFS_VLOG(2) << "Entered PageAllocatorModel::step()"; diff --git a/src/llfs/page_cache.cpp b/src/llfs/page_cache.cpp index eb27045..75ed8db 100644 --- a/src/llfs/page_cache.cpp +++ b/src/llfs/page_cache.cpp @@ -20,6 +20,13 @@ namespace llfs { +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize get_page_size(const PageCache::PageDeviceEntry* entry) +{ + return entry ? get_page_size(entry->arena) : 0; +} + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // /*explicit*/ PageCache::PageDeleterImpl::PageDeleterImpl(PageCache& page_cache) noexcept @@ -93,43 +100,71 @@ PageCache::PageCache(std::vector&& storage_pool, const PageCacheOptions& options) noexcept : options_{options} , metrics_{} - , storage_pool_{std::move(storage_pool)} - , arenas_by_size_log2_{} - , arenas_by_device_id_{} - , impl_for_size_log2_{} , page_readers_{std::make_shared>()} { + this->cache_slot_pool_by_page_size_log2_.fill(nullptr); + + // Find the maximum page device id value. + // + page_device_id_int max_page_device_id = 0; + for (const PageArena& arena : storage_pool) { + max_page_device_id = std::max(max_page_device_id, arena.device().get_id()); + } + + // Populate this->page_devices_. + // + this->page_devices_.resize(max_page_device_id + 1); + for (PageArena& arena : storage_pool) { + const page_device_id_int device_id = arena.device().get_id(); + const auto page_size_log2 = batt::log2_ceil(arena.device().page_size()); + + BATT_CHECK_EQ(PageSize{1} << page_size_log2, arena.device().page_size()) + << "Page sizes must be powers of 2!"; + + BATT_CHECK_LT(page_size_log2, kMaxPageSizeLog2); + + // Create a slot pool for this page size if we haven't already done so. + // + if (!this->cache_slot_pool_by_page_size_log2_[page_size_log2]) { + this->cache_slot_pool_by_page_size_log2_[page_size_log2] = PageCacheSlot::Pool::make_new( + /*n_slots=*/this->options_.max_cached_pages_per_size_log2[page_size_log2], + /*name=*/batt::to_string("size_", u64{1} << page_size_log2)); + } + + BATT_CHECK_EQ(this->page_devices_[device_id], nullptr) + << "Duplicate entries found for the same device id!" << BATT_INSPECT(device_id); + + this->page_devices_[device_id] = std::make_unique( // + std::move(arena), // + batt::make_copy(this->cache_slot_pool_by_page_size_log2_[page_size_log2]) // + ); + + // We will sort these later. + // + this->page_devices_by_page_size_.emplace_back(this->page_devices_[device_id].get()); + } + BATT_CHECK_EQ(this->page_devices_by_page_size_.size(), storage_pool.size()); + // Sort the storage pool by page size (MUST be first). // - std::sort(this->storage_pool_.begin(), this->storage_pool_.end(), PageSizeOrder{}); + std::sort(this->page_devices_by_page_size_.begin(), this->page_devices_by_page_size_.end(), + PageSizeOrder{}); // Index the storage pool into groups of arenas by page size. 
// for (usize size_log2 = 6; size_log2 < kMaxPageSizeLog2; ++size_log2) { - auto iter_pair = std::equal_range(this->storage_pool_.begin(), this->storage_pool_.end(), + auto iter_pair = std::equal_range(this->page_devices_by_page_size_.begin(), + this->page_devices_by_page_size_.end(), PageSize{u32{1} << size_log2}, PageSizeOrder{}); - this->arenas_by_size_log2_[size_log2] = as_slice( - this->storage_pool_.data() + std::distance(this->storage_pool_.begin(), iter_pair.first), - as_range(iter_pair).size()); - - if (!this->arenas_by_size_log2_[size_log2].empty()) { - this->impl_for_size_log2_[size_log2] = - CacheImpl::make_new(/*n_slots=*/this->options_.max_cached_pages_per_size_log2[size_log2], - /*name=*/batt::to_string("size_", u64{1} << size_log2)); - } + this->page_devices_by_page_size_log2_[size_log2] = + as_slice(this->page_devices_by_page_size_.data() + + std::distance(this->page_devices_by_page_size_.begin(), iter_pair.first), + as_range(iter_pair).size()); } - // Index the storage pool by page device id. + // Register metrics. // - for (PageArena& arena : this->storage_pool_) { - const bool already_present = - !this->arenas_by_device_id_.emplace(arena.device().get_id(), &arena).second; - BATT_CHECK(!already_present) - << "All device ids within a storage pool must be unique; found duplicate: " - << arena.device().get_id(); - } - const auto metric_name = [this](std::string_view property) { return batt::to_string("PageCache_", property); }; @@ -218,8 +253,10 @@ batt::Status PageCache::register_page_reader(const PageLayoutId& layout_id, cons // void PageCache::close() { - for (PageArena& arena : this->storage_pool_) { - arena.close(); + for (const std::unique_ptr& entry : this->page_devices_) { + if (entry) { + entry->arena.close(); + } } } @@ -227,8 +264,10 @@ void PageCache::close() // void PageCache::join() { - for (PageArena& arena : this->storage_pool_) { - arena.join(); + for (const std::unique_ptr& entry : this->page_devices_) { + if (entry) { + entry->arena.join(); + } } } @@ -263,13 +302,14 @@ StatusOr> PageCache::allocate_page_of_size_log2( LatencyTimer alloc_timer{this->metrics_.allocate_page_alloc_latency}; - Slice arenas = this->arenas_for_page_size_log2(size_log2); + Slice device_entries = this->devices_with_page_size_log2(size_log2); // TODO [tastolfi 2021-09-08] If the caller wants to wait, which device should we wait on? First // available? Random? Round-Robin? 
// for (auto wait_arg : {batt::WaitForResource::kFalse, batt::WaitForResource::kTrue}) { - for (const PageArena& arena : arenas) { + for (PageDeviceEntry* device_entry : device_entries) { + PageArena& arena = device_entry->arena; StatusOr page_id = arena.allocator().allocate_page(wait_arg, cancel_token); if (!page_id.ok()) { if (page_id.status() == batt::StatusCode::kResourceExhausted) { @@ -344,10 +384,11 @@ Status PageCache::attach(const boost::uuids::uuid& user_id, slot_offset_type slo } }); - for (const PageArena& arena : this->storage_pool_) { - auto arena_status = arena.allocator().attach_user(user_id, slot_offset); + for (PageDeviceEntry* entry : this->all_devices()) { + BATT_CHECK_NOT_NULLPTR(entry); + auto arena_status = entry->arena.allocator().attach_user(user_id, slot_offset); BATT_REQUIRE_OK(arena_status); - attached_arenas.emplace_back(&arena); + attached_arenas.emplace_back(&entry->arena); } success = true; @@ -359,8 +400,9 @@ Status PageCache::attach(const boost::uuids::uuid& user_id, slot_offset_type slo // Status PageCache::detach(const boost::uuids::uuid& user_id, slot_offset_type slot_offset) { - for (const PageArena& arena : this->storage_pool_) { - auto arena_status = arena.allocator().detach_user(user_id, slot_offset); + for (PageDeviceEntry* entry : this->all_devices()) { + BATT_CHECK_NOT_NULLPTR(entry); + auto arena_status = entry->arena.allocator().detach_user(user_id, slot_offset); BATT_REQUIRE_OK(arena_status); } // @@ -378,28 +420,29 @@ void PageCache::prefetch_hint(PageId page_id) //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::all_arenas() const +Slice PageCache::all_devices() const { - return as_slice(this->storage_pool_); + return as_slice(this->page_devices_by_page_size_); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::arenas_for_page_size(usize size) const +Slice PageCache::devices_with_page_size(usize size) const { const usize size_log2 = batt::log2_ceil(size); BATT_CHECK_EQ(size, usize{1} << size_log2) << "page size must be a power of 2"; - return this->arenas_for_page_size_log2(size_log2); + return this->devices_with_page_size_log2(size_log2); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -Slice PageCache::arenas_for_page_size_log2(usize size_log2) const +Slice PageCache::devices_with_page_size_log2( + usize size_log2) const { BATT_CHECK_LT(size_log2, kMaxPageSizeLog2); - return this->arenas_by_size_log2_[size_log2]; + return this->page_devices_by_page_size_log2_[size_log2]; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -413,12 +456,12 @@ const PageArena& PageCache::arena_for_page_id(PageId page_id) const // const PageArena& PageCache::arena_for_device_id(page_device_id_int device_id_val) const { - auto iter = this->arenas_by_device_id_.find(device_id_val); - - BATT_CHECK_NE(iter, this->arenas_by_device_id_.end()) + BATT_CHECK_LT(device_id_val, this->page_devices_.size()) << "the specified page_id's device is not in the storage pool for this cache"; - return *iter->second; + BATT_CHECK_NOT_NULLPTR(this->page_devices_[device_id_val]); + + return this->page_devices_[device_id_val]->arena; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -436,19 +479,18 @@ StatusOr PageCache::put_view(std::shared_ptr&& view, return {::llfs::make_status(StatusCode::kPutViewUnknownLayoutId)}; } - const page_id_int id_val = view->page_id().int_value(); - BATT_CHECK_NE(id_val, kInvalidPageId); - const PageView* p_view = view.get(); - 
auto latch = std::make_shared>>(); - latch->set_value(std::move(view)); + const PageId page_id = view->page_id(); + + PageDeviceEntry* const entry = this->get_device_for_page(page_id); + BATT_CHECK_NOT_NULLPTR(entry); // Attempt to insert the new page view into the cache. // - const auto page_id = PageId{id_val}; - auto pinned_cache_slot = this->impl_for_page(page_id).find_or_insert(id_val, [&] { - return std::move(latch); - }); + batt::StatusOr pinned_cache_slot = + entry->cache.find_or_insert(page_id, [&view](const PageCacheSlot::PinnedRef& pinned_ref) { + pinned_ref->set_value(std::move(view)); + }); this->track_new_page_event(NewPageTracker{ .ts = 0, @@ -473,7 +515,7 @@ StatusOr PageCache::put_view(std::shared_ptr&& view, // void PageCache::purge(PageId page_id, u64 callers, u64 job_id) { - if (page_id.int_value() != kInvalidPageId) { + if (page_id.is_valid()) { this->track_new_page_event(NewPageTracker{ .ts = 0, .job_id = job_id, @@ -482,8 +524,23 @@ void PageCache::purge(PageId page_id, u64 callers, u64 job_id) .event_id = (int)NewPageTracker::Event::kPurge, }); - this->impl_for_page(page_id).erase(page_id.int_value()); + PageDeviceEntry* const entry = this->get_device_for_page(page_id); + BATT_CHECK_NOT_NULLPTR(entry); + + entry->cache.erase(page_id); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCache::PageDeviceEntry* PageCache::get_device_for_page(PageId page_id) +{ + const page_device_id_int device_id = PageIdFactory::get_device_id(page_id); + if (BATT_HINT_FALSE(device_id >= this->page_devices_.size())) { + return nullptr; } + + return this->page_devices_[device_id].get(); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - @@ -502,136 +559,123 @@ StatusOr PageCache::get_page_with_layout_in_job( return ::llfs::make_status(StatusCode::kPageIdInvalid); } - BATT_ASSIGN_OK_RESULT(CacheImpl::PinnedSlot cache_slot, // + BATT_ASSIGN_OK_RESULT(PageCacheSlot::PinnedRef pinned_slot, // this->find_page_in_cache(page_id, require_layout, ok_if_not_found)); BATT_ASSIGN_OK_RESULT(StatusOr> loaded, // - cache_slot->await()); + pinned_slot->await()); - BATT_CHECK_EQ(loaded->get() != nullptr, bool{cache_slot}); + BATT_CHECK_EQ(loaded->get() != nullptr, bool{pinned_slot}); - return PinnedPage{loaded->get(), std::move(cache_slot)}; + return PinnedPage{loaded->get(), std::move(pinned_slot)}; } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -auto PageCache::impl_for_page(PageId page_id) -> CacheImpl& +auto PageCache::find_page_in_cache(PageId page_id, const Optional& required_layout, + OkIfNotFound ok_if_not_found) + -> batt::StatusOr { - const u32 page_size = this->arena_for_page_id(page_id).device().page_size(); - const usize page_size_log2 = batt::log2_ceil(page_size); + if (!page_id) { + return PageCacheSlot::PinnedRef{}; + } - BATT_CHECK_LT(page_size_log2, this->impl_for_size_log2_.size()); + PageDeviceEntry* const entry = this->get_device_for_page(page_id); + BATT_CHECK_NOT_NULLPTR(entry); - return *this->impl_for_size_log2_[page_size_log2]; + return entry->cache.find_or_insert(page_id, [&](const PageCacheSlot::PinnedRef& pinned_slot) { + this->async_load_page_into_slot(pinned_slot, required_layout, ok_if_not_found); + }); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -auto PageCache::find_page_in_cache(PageId page_id, const Optional& required_layout, - OkIfNotFound ok_if_not_found) - -> batt::StatusOr +void PageCache::async_load_page_into_slot(const PageCacheSlot::PinnedRef& pinned_slot, + const 
Optional& required_layout, + OkIfNotFound ok_if_not_found) { - if (!page_id) { - return CacheImpl::PinnedSlot{}; - } + const PageId page_id = pinned_slot.key(); - std::shared_ptr>> latch = nullptr; + PageDeviceEntry* const entry = this->get_device_for_page(page_id); + BATT_CHECK_NOT_NULLPTR(entry); - batt::StatusOr pinned_slot = - this->impl_for_page(page_id).find_or_insert(page_id.int_value(), [&latch] { - latch = std::make_shared>>(); - return latch; - }); + entry->arena.device().read( + page_id, + /*read_handler=*/[this, required_layout, ok_if_not_found, - if (latch) { - BATT_CHECK(pinned_slot.ok()); - - BATT_DEBUG_INFO("PageCache::find_page_in_cache - starting async page read: " - << BATT_INSPECT(page_id) << BATT_INSPECT(batt::Task::current_stack_pos())); - - this->arena_for_page_id(page_id).device().read( - page_id, - /*read_handler=*/[ - // Save the metrics and start time so we can record read latency etc. - // - p_metrics = &this->metrics_, - start_time = std::chrono::steady_clock::now(), - - // We need to update the latch, so retain it in the capture. - // - captured_latch = latch, - - // Keep a copy of pinned_slot while loading the page to limit the - // amount of churn under heavy read loads. - // - pinned_slot = batt::make_copy(*pinned_slot), - - // Save a shared_ptr to the typed page view readers so we can parse the - // page data. - // - page_readers = this->page_readers_, // - required_layout, this, page_id, ok_if_not_found - - ](StatusOr>&& result) mutable { - BATT_DEBUG_INFO("PageCache::find_page_in_cache - read handler"); - - auto cleanup = batt::finally([&] { - pinned_slot = {}; - }); - - auto latch = std::move(captured_latch); - if (!result.ok()) { - if (!ok_if_not_found) { - LLFS_LOG_WARNING() << "recent events for" << BATT_INSPECT(page_id) - << BATT_INSPECT(ok_if_not_found) << " (now=" << this->history_end_ - << "):" - << batt::dump_range( - this->find_new_page_events(page_id) | seq::collect_vec(), - batt::Pretty::True); - } - latch->set_value(result.status()); - return; + // Save the metrics and start time so we can record read latency etc. + // + start_time = std::chrono::steady_clock::now(), + + // Keep a copy of pinned_slot while loading the page to limit the + // amount of churn under heavy read loads. + // + pinned_slot = batt::make_copy(pinned_slot) + + ](StatusOr>&& result) mutable { + const PageId page_id = pinned_slot.key(); + auto* p_metrics = &this->metrics_; + auto page_readers = this->page_readers_; + + BATT_DEBUG_INFO("PageCache::find_page_in_cache - read handler"); + + auto cleanup = batt::finally([&] { + pinned_slot = {}; + }); + + batt::Latch>* latch = pinned_slot.value(); + BATT_CHECK_NOT_NULLPTR(latch); + + if (!result.ok()) { + if (!ok_if_not_found) { + LLFS_LOG_WARNING() << "recent events for" << BATT_INSPECT(page_id) + << BATT_INSPECT(ok_if_not_found) << " (now=" << this->history_end_ + << "):" + << batt::dump_range( + this->find_new_page_events(page_id) | seq::collect_vec(), + batt::Pretty::True); } - p_metrics->page_read_latency.update(start_time); - - // Page read succeeded! Find the right typed reader. 
- // - std::shared_ptr& page_data = *result; - p_metrics->total_bytes_read.add(page_data->size()); - - const PageLayoutId layout_id = [&] { - if (required_layout) { - return *required_layout; - } - return get_page_header(*page_data).layout_id; - }(); - - PageReader reader_for_layout; - { - auto locked = page_readers->lock(); - auto iter = locked->find(layout_id); - if (iter == locked->end()) { - LLFS_LOG_ERROR() << "Unknown page layout: " - << batt::c_str_literal( - std::string_view{(const char*)&layout_id, sizeof(layout_id)}) - << BATT_INSPECT(page_id); - latch->set_value(make_status(StatusCode::kNoReaderForPageViewType)); - return; - } - reader_for_layout = iter->second.page_reader; + latch->set_value(result.status()); + return; + } + p_metrics->page_read_latency.update(start_time); + + // Page read succeeded! Find the right typed reader. + // + std::shared_ptr& page_data = *result; + p_metrics->total_bytes_read.add(page_data->size()); + + PageLayoutId layout_id = get_page_header(*page_data).layout_id; + if (required_layout) { + if (*required_layout != layout_id) { + latch->set_value(::llfs::make_status(StatusCode::kPageHeaderBadLayoutId)); + return; } - // ^^ Release the page_readers mutex ASAP + } - StatusOr> page_view = - reader_for_layout(std::move(page_data)); - if (page_view.ok()) { - BATT_CHECK_EQ(page_view->use_count(), 1u); + PageReader reader_for_layout; + { + auto locked = page_readers->lock(); + auto iter = locked->find(layout_id); + if (iter == locked->end()) { + LLFS_LOG_ERROR() << "Unknown page layout: " + << batt::c_str_literal( + std::string_view{(const char*)&layout_id, sizeof(layout_id)}) + << BATT_INSPECT(page_id); + latch->set_value(make_status(StatusCode::kNoReaderForPageViewType)); + return; } - latch->set_value(std::move(page_view)); - }); - } + reader_for_layout = iter->second.page_reader; + } + // ^^ Release the page_readers mutex ASAP - return pinned_slot; + StatusOr> page_view = + reader_for_layout(std::move(page_data)); + if (page_view.ok()) { + BATT_CHECK_EQ(page_view->use_count(), 1u); + } + latch->set_value(std::move(page_view)); + }); } //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - diff --git a/src/llfs/page_cache.hpp b/src/llfs/page_cache.hpp index 2553154..05e6d9f 100644 --- a/src/llfs/page_cache.hpp +++ b/src/llfs/page_cache.hpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -96,14 +97,32 @@ inline std::ostream& operator<<(std::ostream& out, const NewPageTracker& t) class PageCache : public PageLoader { public: - using CacheImpl = Cache>>; - struct PageReaderFromFile { PageReader page_reader; const char* file; int line; }; + /** \brief All the per-PageDevice state for a single device in the storage pool. + */ + struct PageDeviceEntry { + explicit PageDeviceEntry(PageArena&& arena, + boost::intrusive_ptr&& slot_pool) noexcept + : arena{std::move(arena)} + , cache{this->arena.device().page_ids(), std::move(slot_pool)} + { + } + + /** \brief The PageDevice and PageAllocator. + */ + PageArena arena; + + /** \brief A per-device page cache; shares a PageCacheSlot::Pool with all other PageDeviceEntry + * objects that have the same page size. 
+ */ + PageDeviceCache cache; + }; + class PageDeleterImpl : public PageDeleter { public: @@ -168,11 +187,11 @@ class PageCache : public PageLoader Status detach(const boost::uuids::uuid& user_id, slot_offset_type slot_offset); - Slice arenas_for_page_size_log2(usize size_log2) const; + Slice devices_with_page_size_log2(usize size_log2) const; - Slice arenas_for_page_size(usize size) const; + Slice devices_with_page_size(usize size) const; - Slice all_arenas() const; + Slice all_devices() const; const PageArena& arena_for_page_id(PageId id_val) const; @@ -203,12 +222,14 @@ class PageCache : public PageLoader // //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - - // Insert a newly built PageView into the cache. - // + //----- --- -- - - - - + /** \brief Inserts a newly built PageView into the cache. + */ StatusOr put_view(std::shared_ptr&& view, u64 callers, u64 job_id); - // Remove all cached data for the specified page. - // + //----- --- -- - - - - + /** \brief Removes all cached data for the specified page. + */ void purge(PageId id_val, u64 callers, u64 job_id); bool page_might_contain_key(PageId id, const KeyView& key) const; @@ -222,14 +243,16 @@ class PageCache : public PageLoader return this->metrics_; } - const CacheImpl::Metrics& metrics_for_page_size(PageSize page_size) const + const PageCacheSlot::Pool::Metrics& metrics_for_page_size(PageSize page_size) const { const i32 page_size_log2 = batt::log2_ceil(page_size); - BATT_CHECK_LT(static_cast(page_size_log2), this->impl_for_size_log2_.size()); - BATT_CHECK_NOT_NULLPTR(this->impl_for_size_log2_[page_size_log2]); + BATT_CHECK_LT(static_cast(page_size_log2), + this->cache_slot_pool_by_page_size_log2_.size()); + + BATT_CHECK_NOT_NULLPTR(this->cache_slot_pool_by_page_size_log2_[page_size_log2]); - return this->impl_for_size_log2_[page_size_log2]->metrics(); + return this->cache_slot_pool_by_page_size_log2_[page_size_log2]->metrics(); } private: @@ -245,11 +268,47 @@ class PageCache : public PageLoader //+++++++++++-+-+--+----- --- -- - - - - - CacheImpl& impl_for_page(PageId page_id); - - batt::StatusOr find_page_in_cache( + //----- --- -- - - - - + /** \brief Returns the PageDeviceEntry for the device that owns the given page. + * + * If the specified device (in the most-significant bits of `page_id`) isn't known by this + * PageCache, returns nullptr. + */ + PageDeviceEntry* get_device_for_page(PageId page_id); + + //----- --- -- - - - - + /** \brief Attempts to find the specified page (`page_id`) in the cache; if successful, the cache + * slot is pinned (so it can't be evicted) and a pinned reference is returned. Otherwise, we + * attempt to load the page. + * + * If the given page is not in-cache and a cache slot can't be evicted/allocated (because there + * are too many pinned pages), then this function returns llfs::StatusCode::kCacheSlotsFull. + * + * \param page_id The page to load + * + * \param required_layout If specified, then the layout of the page is checked and if it doesn't + * match the given identifier, llfs::StatusCode::kPageHeaderBadLayoutId is returned. + * + * \param ok_if_not_found Controls whether page-not-found log messages (WARNING) are emitted if + * the page isn't found; ok_if_not_found == false -> emit log warnings, ... 
== true -> don't + */ + batt::StatusOr find_page_in_cache( PageId page_id, const Optional& required_layout, OkIfNotFound ok_if_not_found); + //----- --- -- - - - - + /** \brief Populates the passed PageCacheSlot asynchronously by attempting to read the page from + * storage and setting the Latch value of the slot. + * + * \param required_layout If specified, then the layout of the page is checked and if it doesn't + * match the given identifier, the Latch is set to llfs::StatusCode::kPageHeaderBadLayoutId. + * + * \param ok_if_not_found Controls whether page-not-found log messages (WARNING) are emitted if + * the page isn't found; ok_if_not_found == false -> emit log warnings, ... == true -> don't + */ + void async_load_page_into_slot(const PageCacheSlot::PinnedRef& pinned_slot, + const Optional& required_layout, + OkIfNotFound ok_if_not_found); + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // The configuration passed in at creation time. @@ -260,25 +319,24 @@ class PageCache : public PageLoader // PageCacheMetrics metrics_; - // The arenas backing up this cache, sorted in ascending order of page size (each arena has a - // homogenous page size). + // The arenas backing up this cache, indexed by device id int. // - std::vector storage_pool_; + std::vector> page_devices_; + + // The contents of `storage_pool_`, sorted by non-decreasing page size. + // + std::vector page_devices_by_page_size_; // Slices of `this->storage_pool_` that group arenas by page size (log2). For example, // `this->arenas_by_size_log2_[12]` is the slice of `this->storage_pool_` comprised of // PageArenas whose page size is 4096. // - std::array, kMaxPageSizeLog2> arenas_by_size_log2_; - - // Index of `this->storage_pool_` by device id, for fast lookup from PageId to the PageArena that - // contains the page. - // - std::unordered_map arenas_by_device_id_; + std::array, kMaxPageSizeLog2> page_devices_by_page_size_log2_; - // Maintain cache maps from page id to the page data for each page size. + // A pool of cache slots for each page size. // - std::array, kMaxPageSizeLog2> impl_for_size_log2_; + std::array, kMaxPageSizeLog2> + cache_slot_pool_by_page_size_log2_; // A thread-safe shared map from PageLayoutId to PageReader function; layouts must be registered // with the PageCache so that we trace references during page recycling (aka garbage collection). diff --git a/src/llfs/page_cache_slot.cpp b/src/llfs/page_cache_slot.cpp new file mode 100644 index 0000000..5a5dd83 --- /dev/null +++ b/src/llfs/page_cache_slot.cpp @@ -0,0 +1,320 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. 
+// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// + +namespace llfs { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*explicit*/ PageCacheSlot::PageCacheSlot(Pool& pool) noexcept : pool_{pool} +{ + BATT_CHECK(!this->is_pinned()); + BATT_CHECK(!this->is_valid()); + BATT_CHECK_EQ(this->ref_count(), 0); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot::~PageCacheSlot() noexcept +{ + BATT_CHECK(!this->is_pinned()) << BATT_INSPECT(this->pin_count()) + << BATT_INSPECT(this->ref_count()) << BATT_INSPECT((void*)this); + BATT_CHECK_EQ(this->ref_count(), 0); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot::Pool& PageCacheSlot::pool() const noexcept +{ + return this->pool_; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize PageCacheSlot::index() const noexcept +{ + return this->pool_.index_of(this); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageId PageCacheSlot::key() const +{ + return this->key_; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::Latch>* PageCacheSlot::value() noexcept +{ + BATT_CHECK(this->value_); + return std::addressof(*this->value_); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +bool PageCacheSlot::is_valid() const noexcept +{ + return Self::is_valid(this->state_.load(std::memory_order_acquire)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +bool PageCacheSlot::is_pinned() const noexcept +{ + return Self::is_pinned(this->state_.load(std::memory_order_acquire)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +u32 PageCacheSlot::pin_count() const noexcept +{ + return Self::get_pin_count(this->state_.load(std::memory_order_acquire)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +u64 PageCacheSlot::ref_count() const noexcept +{ + return this->ref_count_.load(std::memory_order_acquire); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::add_ref() noexcept +{ + const auto observed_count = this->ref_count_.fetch_add(1, std::memory_order_relaxed); + if (observed_count == 0) { + this->notify_first_ref_acquired(); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::remove_ref() noexcept +{ + const auto observed_count = this->ref_count_.fetch_sub(1, std::memory_order_release); + BATT_CHECK_GT(observed_count, 0); + if (observed_count == 1) { + (void)this->ref_count_.load(std::memory_order_acquire); + this->notify_last_ref_released(); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +auto PageCacheSlot::acquire_pin(PageId key, bool ignore_key) noexcept -> PinnedRef +{ + const auto old_state = this->state_.fetch_add(kPinCountDelta, std::memory_order_acquire); + const auto new_state = old_state + kPinCountDelta; + const bool newly_pinned = !this->is_pinned(old_state); + + BATT_CHECK_EQ(new_state & Self::kOverflowMask, 0); + + BATT_CHECK(this->is_pinned(new_state)); + + BATT_SUPPRESS_IF_GCC("-Wmaybe-uninitialized") + + // We must always do this, even if the pin fails, so that we don't have an unmatched + // `remove_ref` in `release_pin` below. + // + if (newly_pinned) { + this->add_ref(); + } + + // If the pin_count > 1 (because of the fetch_add above) and the slot is valid, it is safe to read + // the key. 
If the key doesn't match, release the ref and return failure. + // + if (!this->is_valid(old_state) || + (!ignore_key && (!this->key_.is_valid() || this->key_ != key))) { + this->release_pin(); + return PinnedRef{}; + } + + BATT_UNSUPPRESS_IF_GCC() + + return PinnedRef{this}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::extend_pin() noexcept +{ + const auto old_state = this->state_.fetch_add(kPinCountDelta, std::memory_order_relaxed); + const auto new_state = old_state + kPinCountDelta; + + BATT_CHECK_EQ(new_state & Self::kOverflowMask, 0); + + BATT_CHECK(Self::is_pinned(old_state)) + << "This method should never be called in cases where the current pin count might be 0; " + "use acquire_pin() instead."; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::update_latest_use() noexcept +{ + this->latest_use_.store(LRUClock::advance_local()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::set_obsolete_hint() noexcept +{ + this->latest_use_.store(LRUClock::read_global() - (i64{1} << 56)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +i64 PageCacheSlot::get_latest_use() const noexcept +{ + return this->latest_use_.load(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::release_pin() noexcept +{ + const auto old_state = this->state_.fetch_sub(kPinCountDelta, std::memory_order_release); + + BATT_CHECK(Self::is_pinned(old_state)) + << "Each call to release_pin should have a previous call to " + "acquire_pin, so we should always observe a prior pinned state. " + << BATT_INSPECT(old_state); + + const auto new_state = old_state - kPinCountDelta; + const bool newly_unpinned = !this->is_pinned(new_state); + + BATT_CHECK_EQ(new_state & Self::kOverflowMask, 0); + + if (newly_unpinned) { + // Load the state with `acquire` order to create a full memory barrier. + // + (void)this->state_.load(std::memory_order_acquire); + + this->remove_ref(); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +bool PageCacheSlot::evict() noexcept +{ + // Use a CAS loop here to guarantee an atomic transition from Valid + Filled (unpinned) state to + // Invalid. + // + auto observed_state = this->state_.load(std::memory_order_acquire); + for (;;) { + if (Self::is_pinned(observed_state) || !this->is_valid(observed_state)) { + return false; + } + + // Clear the valid bit from the state mask. + // + const auto target_state = observed_state & ~kValidMask; + if (this->state_.compare_exchange_weak(observed_state, target_state)) { + BATT_CHECK(!this->is_valid()); + return true; + } + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +bool PageCacheSlot::evict_if_key_equals(PageId key) noexcept +{ + // The slot must be pinned in order to read the key, so increase the pin count. + // + const auto old_state = this->state_.fetch_add(kPinCountDelta, std::memory_order_acquire); + auto observed_state = old_state + kPinCountDelta; + + BATT_CHECK_EQ(observed_state & Self::kOverflowMask, 0); + + // Use a CAS loop here to guarantee an atomic transition from Valid + Filled (unpinned) state to + // Invalid. + // + for (;;) { + // To succeed, we must be holding the only pin, the slot must be valid, and the key must match. 
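// The transition attempted here (and in evict() above) follows the lock-free recipe used
// throughout this file: load the packed state word, decide based on the pin count and the
// Valid bit, then commit the change with a single compare-exchange, retrying on contention.
// A distilled, standalone sketch of that recipe, using a simplified two-field state word
// (all names below are illustrative, not part of this patch):

#include <atomic>
#include <cstdint>

constexpr std::uint64_t kValidBit = 1;                      // bit 0: Valid flag
constexpr std::uint64_t kPinDelta = std::uint64_t{1} << 1;  // bits 1..63: pin count

// Clears the Valid bit iff the slot is currently valid and unpinned.
bool try_evict(std::atomic<std::uint64_t>& state)
{
  std::uint64_t observed = state.load(std::memory_order_acquire);
  for (;;) {
    const bool pinned = observed >= kPinDelta;  // any non-zero pin count
    const bool valid = (observed & kValidBit) != 0;
    if (pinned || !valid) {
      return false;  // in use, or already invalid; nothing to do
    }
    // On failure, compare_exchange_weak refreshes `observed` and we re-evaluate.
    if (state.compare_exchange_weak(observed, observed & ~kValidBit)) {
      return true;  // we now exclusively own the (Invalid) slot
    }
  }
}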
+ // + if (!(Self::get_pin_count(observed_state) == 1 && this->is_valid(observed_state) && + this->key_ == key)) { + this->state_.fetch_sub(kPinCountDelta, std::memory_order_release); + return false; + } + + // Clear the valid bit from the state mask and release the pin count we acquired above. + // + auto target_state = ((observed_state - kPinCountDelta) & ~kValidMask); + + BATT_CHECK(!Self::is_pinned(target_state) && !Self::is_valid(target_state)) + << BATT_INSPECT(target_state); + + if (this->state_.compare_exchange_weak(observed_state, target_state)) { + BATT_CHECK(!Self::is_valid()); + return true; + } + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +auto PageCacheSlot::fill(PageId key) noexcept -> PinnedRef +{ + BATT_CHECK(!this->is_valid()); + BATT_CHECK(key.is_valid()); + + this->key_ = key; + this->value_.emplace(); + this->update_latest_use(); + + auto observed_state = this->state_.fetch_add(kPinCountDelta) + kPinCountDelta; + BATT_CHECK_EQ(observed_state & Self::kOverflowMask, 0); + BATT_CHECK(Self::is_pinned(observed_state)); + + this->add_ref(); + this->set_valid(); + + return PinnedRef{this}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::clear() noexcept +{ + BATT_CHECK(!this->is_valid()); + + this->key_ = PageId{}; + this->value_ = None; + this->set_valid(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::notify_first_ref_acquired() +{ + intrusive_ptr_add_ref(std::addressof(this->pool_)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::notify_last_ref_released() +{ + intrusive_ptr_release(std::addressof(this->pool_)); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageCacheSlot::set_valid() +{ + const auto observed_state = this->state_.fetch_or(kValidMask, std::memory_order_release); + BATT_CHECK(!this->is_valid(observed_state)) << "Must go from an invalid state to valid!"; +} + +} //namespace llfs diff --git a/src/llfs/page_cache_slot.hpp b/src/llfs/page_cache_slot.hpp new file mode 100644 index 0000000..adae5d8 --- /dev/null +++ b/src/llfs/page_cache_slot.hpp @@ -0,0 +1,338 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_PAGE_CACHE_SLOT_HPP +#define LLFS_PAGE_CACHE_SLOT_HPP + +#include +// +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include + +namespace llfs { + +/** \brief A container for a single key/value pair in a PageDeviceCache. 
+ * + * PageCacheSlot objects are always in one of four states: + * - Invalid (initial) + * - Valid + Filled + * - Valid + Filled + Pinned + * - Valid + Cleared + * + */ +class PageCacheSlot +{ + public: + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - + // State Transition Diagram: + // + // ┌─────────┐ + // ┌─────│ Invalid │──────────┐ + // fill() │ └─────────┘ │ clear() + // │ ▲ │ + // │ │ │ + // │ │evict() │ + // ▼ │ ▼ + // ┌────────────────┐ │ ┌─────────────────┐ + // ┌──────│ Valid + Filled │──┴──────│ Valid + Cleared │ + // │ └────────────────┘ └─────────────────┘ + // │ ▲ + // acquire_pin():│ │ + // 0 -> 1 │ │release_pin(): + // │ │ 1 -> 0 + // │ │ + // │ ┌─────────────────────────┐ + // └─▶│ Valid + Filled + Pinned │ + // └─────────────────────────┘ + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - + + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - + // State integer bit layout: + // + // ┌─────────────────┬────────────────────────────────────────────────────┬─┐ + // │Overflow (8 bits)│ Pin Count (47 bits) │ │ + // └─────────────────┴────────────────────────────────────────────────────┴─┘ + // ▲ + // Valid? (1 bit)───────┘ + //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - + + /** \brief The byte offset of the Pin Count within the state integer. + */ + static constexpr usize kPinCountShift = 1; + + /** \brief The number of unused (most-significant) bits; used to detect integer overflow. + */ + static constexpr usize kOverflowBits = 8; + + /** \brief The amount to add/subtract to the state integer to increment or decrement the pin + * count. + */ + static constexpr u64 kPinCountDelta = u64{1} << kPinCountShift; + + /** \brief Used to detect pin count integer overflow; should always be zero. + */ + static constexpr u64 kOverflowMask = ((u64{1} << kOverflowBits) - 1) << (64 - kOverflowBits); + + /** \brief The Valid bit. + */ + static constexpr u64 kValidMask = 1; + + //+++++++++++-+-+--+----- --- -- - - - - + + // Forward-declarations of member types. + // + class Pool; // defined in + class AtomicRef; // defined in + class PinnedRef; // defined in + + using Self = PageCacheSlot; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns the pin count bit field within the passed state integer. + */ + static constexpr u32 get_pin_count(u64 state) + { + return state >> kPinCountShift; + } + + /** \brief Returns true iff the pin count of `state` is non-zero, indicating the slot is in-use + * and must not be evicted or modified. + */ + static constexpr bool is_pinned(u64 state) + { + return Self::get_pin_count(state) != 0; + } + + /** \brief Returns true iff the Valid? bit of `state` is set. + */ + static constexpr bool is_valid(u64 state) + { + return (state & kValidMask) != 0; + } + + //+++++++++++-+-+--+----- --- -- - - - - + + PageCacheSlot(const PageCacheSlot&) = delete; + PageCacheSlot& operator=(const PageCacheSlot&) = delete; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Constructs a new PageCacheSlot owned by the passed Pool. + */ + explicit PageCacheSlot(Pool& pool) noexcept; + + /** \brief Destroys the cache slot; ref count and pin count MUST both be zero when the slot is + * destroyed, or we will panic. + */ + ~PageCacheSlot() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns the PageCacheSlot::Pool containing this slot. + */ + Pool& pool() const noexcept; + + /** \brief Returns the index of `this` within its pool. 
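// The constants and helpers above can be sanity-checked in isolation; the sketch below
// mirrors their values in a scratch translation unit and lets static_assert do the bit
// arithmetic (illustrative only; not part of the patch):

#include <cstdint>

constexpr unsigned kPinCountShift = 1;
constexpr unsigned kOverflowBits = 8;
constexpr std::uint64_t kPinCountDelta = std::uint64_t{1} << kPinCountShift;
constexpr std::uint64_t kOverflowMask =
    ((std::uint64_t{1} << kOverflowBits) - 1) << (64 - kOverflowBits);
constexpr std::uint64_t kValidMask = 1;

constexpr std::uint64_t get_pin_count(std::uint64_t state) { return state >> kPinCountShift; }
constexpr bool is_pinned(std::uint64_t state) { return get_pin_count(state) != 0; }
constexpr bool is_valid(std::uint64_t state) { return (state & kValidMask) != 0; }

// A Valid slot holding two pins:
constexpr std::uint64_t example_state = kValidMask + 2 * kPinCountDelta;
static_assert(get_pin_count(example_state) == 2);
static_assert(is_pinned(example_state) && is_valid(example_state));
static_assert((example_state & kOverflowMask) == 0);  // far from pin-count overflow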
+ */ + usize index() const noexcept; + + /** \brief Returns the current key held in the slot, if valid; if the slot is invalid, the + * returned value is undefined. + */ + PageId key() const; + + /** Returns the current value held in the slot, if valid; if the slot is invalid, behavior is + * undefined. + * + * Most callers of this function must not modify the returned Latch object. There is one + * exception to this rule: the caller who most recently called `fill` to transition the slot state + * from 'Invalid' to 'Valid + Filled' is required to set the Latch value, which broadcasts to all + * other observers of this slot that the page has been loaded. + */ + batt::Latch>* value() noexcept; + + /** \brief Returns true iff the slot is in a valid state. + */ + bool is_valid() const noexcept; + + //----- --- -- - - - - + + /** \brief Returns the current (weak/non-pinning) reference count. + * + * Do not confuse this with the pin count! A non-zero ref count keeps the PageCacheSlot (and by + * extension the pool that owns it) in scope, but it does not prevent the slot from being evicted + * and refilled. Think of this as a weak reference count. + */ + u64 ref_count() const noexcept; + + /** \brief Adds a (weak/non-pinning) reference to the slot. + * + * Used to avoid premature destruction of the cache. + */ + void add_ref() noexcept; + + /** \brief Removes a (weak/non-pinning) reference from the slot. + * + * Used to avoid premature destruction of the cache. + */ + void remove_ref() noexcept; + + //----- --- -- - - - - + + /** \brief Returns true iff the slot is in a pinned state. + */ + bool is_pinned() const noexcept; + + /** \brief Returns the current pin count of the slot; if this is 0, the slot is not pinned. + */ + u32 pin_count() const noexcept; + + /** \brief Conditionally pins the slot so it can't be evicted. + * + * This operation is conditioned on `key` matching the currently stored key in the slot. If the + * slot is in an invalid state or the key doesn't match, the operation will fail and an + * empty/invalid value is returned. + * + * If ignore_key is true, then the pin will succeed if the slot is in a valid state, no matter + * what the current key is. + * + * A slot is removed from the cache's LRU list when its pin count goes from 0 -> 1, and placed + * back at the "most recently used" end of the LRU list when the pin count goes from 1 -> 0. + */ + PinnedRef acquire_pin(PageId key, bool ignore_key = false) noexcept; + + /** \brief Called when creating a copy of PinnedCacheSlot, i.e. only when the pin count is going + * from n -> n+1, where n > 0. + */ + void extend_pin() noexcept; + + /** \brief Decreases the pin count by 1. + * + * If this unpins the slot, then we also remove a single weak ref. + */ + void release_pin() noexcept; + + /** \brief If this slot is not pinned and it is not evicted, atomically increment the generation + * counter and return true; else return false. + * + * If evict() succeeds (returns true), then the slot is in an "invalid" state. + */ + bool evict() noexcept; + + /** \brief Evicts the slot iff it is evict-able and the current key matches the passed value. + */ + bool evict_if_key_equals(PageId key) noexcept; + + /** \brief Resets the key and value for this slot. + * + * The generation counter must be odd (indicating the slot has been evicted) prior to calling this + * function. + * + * May only be called when the slot is in an invalid state. + */ + PinnedRef fill(PageId key) noexcept; + + /** \brief Sets the key and value of the slot to empty/null. 
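// Taken together, fill() and value() define a single-producer publication protocol: exactly
// one caller performs the Invalid -> Valid + Filled transition and is then obligated to
// resolve the slot's Latch, while every other reader pins the same slot and awaits that
// Latch. A sketch of the producer side follows; the loader helper is hypothetical, the
// Latch payload type is assumed to be std::shared_ptr<const PageView> (as in the
// pre-existing cache), and only set_error() is demonstrated by the patch itself;
// set_value() is the assumed success-path counterpart.

#include <llfs/page_cache_slot.hpp>
#include <llfs/page_view.hpp>

#include <memory>
#include <utility>

// Hypothetical synchronous loader, for illustration only.
batt::StatusOr<std::shared_ptr<const llfs::PageView>> hypothetical_read_page(llfs::PageId);

void fill_and_publish(llfs::PageCacheSlot& slot, llfs::PageId page_id)
{
  llfs::PageCacheSlot::PinnedRef pinned = slot.fill(page_id);  // Invalid -> Valid + Filled (pinned)

  auto loaded = hypothetical_read_page(page_id);
  if (!loaded.ok()) {
    pinned.value()->set_error(loaded.status());   // waiters observe the load failure
    return;
  }
  pinned.value()->set_value(std::move(*loaded));  // waiters observe the loaded page
}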
+ * + * This causes the slot to leave the invalid state, but all attempts to pin will fail until it is + * evicted/filled. + */ + void clear() noexcept; + + //----- --- -- - - - - + + /** \brief Updates the latest use logical timestamp for this object, to make eviction less likely. + * + * Only has an effect if the "obsolete hint" (see set_obsolete_hint, get_obsolete_hint) is false. + */ + void update_latest_use() noexcept; + + /** Give a hint to the cache that this slot is likely to be needed again in the future. + * + * This function sets the latest_use LTS to a very old value. + */ + void set_obsolete_hint() noexcept; + + /** \brief Returns the current latest use logical timestamp. + */ + i64 get_latest_use() const noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + private: + /** \brief The implementation of acquire_pin; returns true iff successful. + */ + bool acquire_pin_impl(PageId key) noexcept; + + /** \brief Invoked when the ref count goes from 0 -> 1. + */ + void notify_first_ref_acquired(); + + /** \brief Invoked when the ref count goes from 1 -> 0. + */ + void notify_last_ref_released(); + + /** \brief Sets the valid bit; Panic if the previous state was not Invalid. + */ + void set_valid(); + + //+++++++++++-+-+--+----- --- -- - - - - + + Pool& pool_; + PageId key_; + Optional>> value_; + std::atomic state_{0}; + std::atomic ref_count_{0}; + std::atomic latest_use_{0}; +}; + +//=#=#==#==#===============+=+=+=+=++=++++++++++++++-++-+--+-+----+--------------- + +namespace detail { + +/** \brief Calls slot->add_ref() iff slot is not nullptr. + */ +inline PageCacheSlot* increment_weak_ref(PageCacheSlot* slot) +{ + if (slot) { + slot->add_ref(); + } + return slot; +} + +/** \brief Calls slot->remove_ref() iff slot is not nullptr. + */ +inline void decrement_weak_ref(PageCacheSlot* slot) +{ + if (slot) { + slot->remove_ref(); + } +} + +} //namespace detail + +} //namespace llfs + +#include +// +#include +#include + +#endif // LLFS_PAGE_CACHE_SLOT_HPP diff --git a/src/llfs/page_cache_slot.test.cpp b/src/llfs/page_cache_slot.test.cpp new file mode 100644 index 0000000..0660603 --- /dev/null +++ b/src/llfs/page_cache_slot.test.cpp @@ -0,0 +1,427 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// +#include + +#include +#include + +#include + +namespace { + +// Test Plan: +// +// 1. Create slots with different index values in a Pool; verify index() +// - also verify initial is_valid state +// 2. ref_count test - add_ref/remove_ref should affect the ref_count, and also the use count of +// the pool but only in the case of add: 0 -> 1 and remove: 1 -> 0. +// 3. State transition test +// a. Invalid --(clear)--> Valid + Cleared +// b. Invalid --(fill)--> Valid + Filled +// c. Valid + Cleared --(evict)--> Invalid +// d. Valid + Filled --(evict)--> Invalid +// e. Valid + Filled --(acquire_pin)--> Valid + Filled + Pinned +// f. Valid + Filled + Pinned --(acquire_pin)-- >Valid + Filled + Pinned +// g. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled + Pinned +// h. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled +// 4. extend_pin increases pin count +// a. success if already > 0 +// b. panic otherwise +// 5. evict fails if pin count != 0 +// 6. evict_if_key_equals +// a. success +// b. 
fail because pin count != 0 +// c. fail because key is wrong +// 7. fill fails when state is not Invalid: +// a. Valid + Filled +// b. Valid + Cleared +// c. Valid + Filled + Pinned +// 8. update_latest_use +// 9. set_obsolete_hint +// + +using namespace llfs::int_types; + +constexpr usize kNumTestSlots = 4; +const std::string kTestPoolName = "Test PageCacheSlot Pool"; + +class PageCacheSlotTest : public ::testing::Test +{ + public: + boost::intrusive_ptr pool_ = llfs::PageCacheSlot::Pool::make_new( + /*n_slots=*/kNumTestSlots, batt::make_copy(kTestPoolName)); +}; + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 1. Create slots with different index values in a Pool; verify index() +// - also verify initial is_valid state +// +TEST_F(PageCacheSlotTest, CreateSlots) +{ + for (usize i = 0; i < kNumTestSlots; ++i) { + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + ASSERT_NE(slot, nullptr); + EXPECT_EQ(slot, this->pool_->get_slot(i)); + EXPECT_EQ(slot->index(), i); + EXPECT_EQ(this->pool_->index_of(slot), i); + EXPECT_FALSE(slot->is_valid()); + + if (i == 0) { + EXPECT_DEATH(slot->value(), ".*Assert.*failed:.*this->value_.*==.*true.*"); + } + + EXPECT_FALSE(slot->key().is_valid()); + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_EQ(slot->pin_count(), 0u); + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 2. ref_count test - add_ref/remove_ref should affect the ref_count, and also the use count of +// the pool but only in the case of add: 0 -> 1 and remove: 1 -> 0. +// +TEST_F(PageCacheSlotTest, AddRemoveRef) +{ + EXPECT_EQ(this->pool_->use_count(), 1u); + + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_DEATH(slot->remove_ref(), "Assert.*failed:.*observed_count.*>.*0"); + + slot->add_ref(); + + EXPECT_EQ(slot->ref_count(), 1u); + EXPECT_EQ(this->pool_->use_count(), 2u); + + slot->add_ref(); + slot->add_ref(); + + EXPECT_EQ(slot->ref_count(), 3u); + EXPECT_EQ(this->pool_->use_count(), 2u); + + slot->remove_ref(); + + EXPECT_EQ(slot->ref_count(), 2u); + EXPECT_EQ(this->pool_->use_count(), 2u); + + slot->remove_ref(); + slot->remove_ref(); + + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_EQ(this->pool_->use_count(), 1u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 3. State transition test +// a. Invalid --(clear)--> Valid + Cleared +// b. Invalid --(fill)--> Valid + Filled +// c. Valid + Cleared --(evict)--> Invalid +// d. Valid + Filled --(evict)--> Invalid +// e. Valid + Filled --(acquire_pin)--> Valid + Filled + Pinned +// f. Valid + Filled + Pinned --(acquire_pin)-- >Valid + Filled + Pinned +// g. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled + Pinned +// h. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled +// +TEST_F(PageCacheSlotTest, StateTransitions) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + EXPECT_FALSE(slot->is_valid()); + + // a. Invalid --(clear)--> Valid + Cleared + // + slot->clear(); + EXPECT_TRUE(slot->is_valid()); + + // c. Valid + Cleared --(evict)--> Invalid + // + EXPECT_TRUE(slot->evict()); + EXPECT_FALSE(slot->is_valid()); + + // b. 
Invalid --(fill)--> Valid + Filled + // + { + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + + EXPECT_TRUE(slot->is_valid()); + EXPECT_EQ(slot->key(), llfs::PageId{1}); + EXPECT_NE(slot->value(), nullptr); + EXPECT_TRUE(pinned_ref); + EXPECT_EQ(pinned_ref.slot(), slot); + EXPECT_EQ(pinned_ref.key(), slot->key()); + EXPECT_EQ(pinned_ref.value(), slot->value()); + EXPECT_EQ(pinned_ref.get(), slot->value()); + EXPECT_EQ(pinned_ref.pin_count(), 1u); + EXPECT_EQ(slot->pin_count(), 1u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + + // f. Valid + Filled + Pinned --(acquire_pin)-- >Valid + Filled + Pinned + // + llfs::PageCacheSlot::PinnedRef ref2 = pinned_ref; + + EXPECT_TRUE(ref2); + EXPECT_EQ(ref2.slot(), slot); + EXPECT_EQ(pinned_ref.pin_count(), 2u); + EXPECT_EQ(slot->pin_count(), 2u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + + llfs::PageCacheSlot::PinnedRef ref3; + + EXPECT_FALSE(ref3); + + ref3 = ref2; + + EXPECT_TRUE(ref3); + EXPECT_EQ(ref3.slot(), slot); + EXPECT_EQ(pinned_ref.pin_count(), 3u); + EXPECT_EQ(slot->pin_count(), 3u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + + { + llfs::PageCacheSlot::PinnedRef ref4 = std::move(ref2); + + EXPECT_FALSE(ref2); + EXPECT_EQ(ref4.slot(), slot); + EXPECT_TRUE(ref4); + EXPECT_EQ(pinned_ref.pin_count(), 3u); + EXPECT_EQ(slot->pin_count(), 3u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + + { + llfs::PageCacheSlot::PinnedRef ref5; + + EXPECT_FALSE(ref5); + + ref5 = std::move(ref3); + + EXPECT_EQ(ref5.slot(), slot); + EXPECT_FALSE(ref3); + EXPECT_TRUE(ref5); + EXPECT_EQ(pinned_ref.pin_count(), 3u); + EXPECT_EQ(slot->pin_count(), 3u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + } + // + // g. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled + Pinned + + EXPECT_EQ(pinned_ref.pin_count(), 2u); + EXPECT_EQ(slot->pin_count(), 2u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + } + + EXPECT_EQ(pinned_ref.pin_count(), 1u); + EXPECT_EQ(slot->pin_count(), 1u); + EXPECT_EQ(pinned_ref.ref_count(), 1u); + EXPECT_EQ(slot->ref_count(), 1u); + } + // + // h. Valid + Filled + Pinned --(release_pin)-- >Valid + Filled + + //----- --- -- - - - - + + // e. Valid + Filled --(acquire_pin)--> Valid + Filled + Pinned + // + EXPECT_EQ(slot->pin_count(), 0u); + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_TRUE(slot->is_valid()); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = + slot->acquire_pin(llfs::PageId{}, /*ignore_key=*/true); + + EXPECT_TRUE(pinned_ref); + EXPECT_TRUE(slot->is_valid()); + EXPECT_EQ(slot->pin_count(), 1u); + } + EXPECT_EQ(slot->pin_count(), 0u); + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_TRUE(slot->is_valid()); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = + slot->acquire_pin(llfs::PageId{1}, /*ignore_key=*/false); + + EXPECT_TRUE(pinned_ref); + EXPECT_TRUE(slot->is_valid()); + EXPECT_EQ(slot->pin_count(), 1u); + } + EXPECT_EQ(slot->pin_count(), 0u); + EXPECT_EQ(slot->ref_count(), 0u); + EXPECT_TRUE(slot->is_valid()); + { + // Try to acquire pin using the wrong PageId; expect to fail. + // + llfs::PageCacheSlot::PinnedRef pinned_ref = + slot->acquire_pin(llfs::PageId{2}, /*ignore_key=*/false); + + EXPECT_FALSE(pinned_ref); + EXPECT_TRUE(slot->is_valid()); + EXPECT_EQ(slot->pin_count(), 0u); + } + + // b. 
Invalid --(fill)--> Valid + Filled + // + EXPECT_TRUE(slot->evict()); + EXPECT_FALSE(slot->is_valid()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 4. extend_pin increases pin count +// a. success if already > 0 +// +TEST_F(PageCacheSlotTest, ExtendPinSuccess) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + + EXPECT_EQ(slot->pin_count(), 1u); + + slot->extend_pin(); + + EXPECT_EQ(slot->pin_count(), 2u); + + slot->release_pin(); + + EXPECT_EQ(slot->pin_count(), 1u); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 4. extend_pin increases pin count +// b. panic otherwise +// +TEST_F(PageCacheSlotTest, ExtendPinDeath) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + EXPECT_DEATH(slot->extend_pin(), "Assert.*failed:.*is.*pinned"); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 5. evict fails if pin count != 0 +// +TEST_F(PageCacheSlotTest, EvictFailure) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + + EXPECT_FALSE(slot->evict()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 6. evict_if_key_equals +// a. success +// +TEST_F(PageCacheSlotTest, EvictIfKeyEqualsSuccess) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + } + + EXPECT_TRUE(slot->evict_if_key_equals(llfs::PageId{1})); + EXPECT_FALSE(slot->is_valid()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 6. evict_if_key_equals +// b. fail because pin count != 0 +// +TEST_F(PageCacheSlotTest, EvictIfKeyEqualsFailurePinned) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + + EXPECT_FALSE(slot->evict_if_key_equals(llfs::PageId{1})); + } + EXPECT_TRUE(slot->is_valid()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 6. evict_if_key_equals +// c. fail because key is wrong +// +TEST_F(PageCacheSlotTest, EvictIfKeyEqualsFailureWrongKey) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + } + + EXPECT_FALSE(slot->evict_if_key_equals(llfs::PageId{2})); + EXPECT_TRUE(slot->is_valid()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 7. fill fails when state is not Invalid: +// a. Valid + Filled +// c. Valid + Filled + Pinned +// +TEST_F(PageCacheSlotTest, FillFailureAlreadyFilled) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + { + llfs::PageCacheSlot::PinnedRef pinned_ref = slot->fill(llfs::PageId{1}); + + EXPECT_EQ(slot->pin_count(), 1u); + EXPECT_TRUE(pinned_ref); + EXPECT_DEATH(slot->fill(llfs::PageId{2}), "Assert.*fail.*is.*valid"); + } + EXPECT_EQ(slot->pin_count(), 0u); + EXPECT_DEATH(slot->fill(llfs::PageId{2}), "Assert.*fail.*is.*valid"); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 7. fill fails when state is not Invalid: +// b. 
Valid + Cleared +// +TEST_F(PageCacheSlotTest, FillFailureCleared) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + EXPECT_FALSE(slot->is_valid()); + + slot->clear(); + + EXPECT_TRUE(slot->is_valid()); + EXPECT_DEATH(slot->fill(llfs::PageId{2}), "Assert.*fail.*is.*valid"); + EXPECT_DEATH(slot->clear(), "Assert.*fail.*is.*valid"); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// 8. update_latest_use +// 9. set_obsolete_hint +// +TEST_F(PageCacheSlotTest, LatestUse) +{ + llfs::PageCacheSlot* slot = this->pool_->allocate(); + + i64 t0 = slot->get_latest_use(); + slot->update_latest_use(); + i64 t1 = slot->get_latest_use(); + + EXPECT_GT(t1 - t0, 0); + + slot->set_obsolete_hint(); + i64 t2 = slot->get_latest_use(); + + EXPECT_LT(t2 - t1, 0); +} + +} // namespace diff --git a/src/llfs/page_cache_slot_atomic_ref.hpp b/src/llfs/page_cache_slot_atomic_ref.hpp new file mode 100644 index 0000000..40b724a --- /dev/null +++ b/src/llfs/page_cache_slot_atomic_ref.hpp @@ -0,0 +1,146 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#ifndef LLFS_PAGE_CACHE_SLOT_HPP +#error This file must be included from/after page_cache_slot.hpp! +#endif + +#include + +#include + +namespace llfs { + +/** \brief A lock-free, thread-safe weak reference to a PageCacheSlot object. + */ +class PageCacheSlot::AtomicRef : public boost::equality_comparable +{ + public: + /** \brief Constructs an empty (invalid) AtomicRef. + */ + AtomicRef() = default; + + /** \brief Initializes a new AtomicRef from an existing pinned ref. + * + * This does not affect the pin count of the slot. + */ + /*implicit*/ AtomicRef(const PageCacheSlot::PinnedRef& pinned) noexcept + : slot_{detail::increment_weak_ref(pinned.slot())} + { + } + + /** \brief Initializes a new AtomicRef from an existing pinned ref. + * + * This will release the passed pinned ref, decrementing the pin count by one. + */ + /*implicit*/ AtomicRef(PageCacheSlot::PinnedRef&& pinned) noexcept + : slot_{detail::increment_weak_ref(pinned.slot())} + { + pinned.reset(); + } + + /** \brief Initializes a new AtomicRef by copying an existing one. + * + * This will increment the slot's weak ref count, but leave the pin count unchanged. + */ + AtomicRef(const AtomicRef& that) noexcept : slot_{detail::increment_weak_ref(that.slot_.load())} + { + } + + /** \brief Initializes a new AtomicRef by moving an existing one to a new object. + * + * This leaves both the slot's weak ref count and pin count unchanged. The passed AtomicRef + * (that) is cleared. + */ + AtomicRef(AtomicRef&& that) noexcept : slot_{that.slot_.exchange(nullptr)} + { + } + + /** \brief If this ref is valid, decrements the weak ref count of the slot. + */ + ~AtomicRef() noexcept + { + detail::decrement_weak_ref(this->slot_.load()); + } + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Swaps the values of `this` and `that`. + * + * Because the swap operation cannot be implemented with a single atomic instruction, this + * function MUST only be used when the caller is sure that there is no concurrent access to + * `this`. Observers of `that` will see a single atomic transition to the new value (the old + * value of this). 
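// In practice an AtomicRef is stored next to the PageId it was obtained for, so that the
// next access can try to re-pin the same slot cheaply before falling back to a full page
// load; PageIdSlot, later in this patch, is the real version of this pattern. A minimal
// sketch (the struct below is illustrative, not part of the patch):

#include <llfs/page_cache_slot.hpp>

struct CachedPageRef {
  llfs::PageId page_id;
  llfs::PageCacheSlot::AtomicRef slot_ref;  // weak: does not pin, does not block eviction

  // Returns a pinned ref iff the remembered slot still holds `page_id`; otherwise returns
  // an empty ref and the caller must reload the page through the normal path.
  llfs::PageCacheSlot::PinnedRef try_pin() const
  {
    return this->slot_ref.pin(this->page_id);
  }
};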
+ */ + void unsynchronized_swap(AtomicRef& that) noexcept + { + this->slot_.store(that.slot_.exchange(this->slot_.load())); + } + + /** \brief Assigns this to that, returning a reference to this. + * + * This is safe to call concurrently for the same AtomicRef object; however, observers of `this` + * and `that` will see the change to the value of each as two distinct atomic events. + */ + AtomicRef& operator=(const AtomicRef& that) noexcept + { + AtomicRef copy{that}; + copy.unsynchronized_swap(*this); + return *this; + } + + /** \brief Assigns this to that, returning a reference to this. + * + * Clears the value of `that`. + * + * This is safe to call concurrently for the same AtomicRef object; however, observers of `this` + * and `that` will see the change to the value of each as two distinct atomic events. + */ + AtomicRef& operator=(AtomicRef&& that) noexcept + { + AtomicRef copy{std::move(that)}; + copy.unsynchronized_swap(*this); + return *this; + } + + /** \brief Returns true iff this is a valid ref (non-null). + */ + explicit operator bool() const noexcept + { + return this->slot_.load() != nullptr; + } + + /** \brief Attempts to pin the referenced slot, returning a valid PinnedRef iff successful. + * + * If this AtomicRef does not refer to a slot (i.e., it is invalid), then this will return an + * invalid PinnedRef to indicate failure. If the pin fails for any reason, e.g. the slot has been + * evicted and/or its current key is different from `key`, returns an invalid PinnedRef. + */ + PageCacheSlot::PinnedRef pin(PageId key) const noexcept + { + auto* slot = this->slot_.load(); + if (!slot) { + return {}; + } + return slot->acquire_pin(key); + } + + /** \brief Returns a pointer to the referenced slot (nullptr if this reference is currently + * invalid). + */ + PageCacheSlot* slot() const noexcept + { + return this->slot_.load(); + } + + //+++++++++++-+-+--+----- --- -- - - - - + private: + std::atomic slot_{nullptr}; +}; + +} //namespace llfs diff --git a/src/llfs/page_cache_slot_pinned_ref.hpp b/src/llfs/page_cache_slot_pinned_ref.hpp new file mode 100644 index 0000000..c73dff4 --- /dev/null +++ b/src/llfs/page_cache_slot_pinned_ref.hpp @@ -0,0 +1,136 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#ifndef LLFS_PAGE_CACHE_SLOT_HPP +#error This file must be included from/after page_cache_slot.hpp! +#endif + +namespace llfs { + +class PageCacheSlot::PinnedRef : public boost::equality_comparable +{ + public: + friend class PageCacheSlot; + + using value_type = batt::Latch>; + + PinnedRef() = default; + + private: + explicit PinnedRef(PageCacheSlot* slot) noexcept + : slot_{slot} + , value_{slot ? 
slot->value() : nullptr} + { + } + + public: + PinnedRef(const PinnedRef& that) noexcept : slot_{that.slot_}, value_{that.value_} + { + if (this->slot_) { + this->slot_->extend_pin(); + } + } + + PinnedRef& operator=(const PinnedRef& that) noexcept + { + PinnedRef copy{that}; + this->swap(copy); + return *this; + } + + PinnedRef(PinnedRef&& that) noexcept : slot_{that.slot_}, value_{that.value_} + { + that.slot_ = nullptr; + that.value_ = nullptr; + } + + PinnedRef& operator=(PinnedRef&& that) noexcept + { + PinnedRef copy{std::move(that)}; + this->swap(copy); + return *this; + } + + ~PinnedRef() noexcept + { + this->reset(); + } + + void reset() + { + if (this->slot_) { + this->slot_->release_pin(); + this->slot_ = nullptr; + this->value_ = nullptr; + } + } + + void swap(PinnedRef& that) + { + std::swap(this->slot_, that.slot_); + std::swap(this->value_, that.value_); + } + + explicit operator bool() const + { + return this->slot_ != nullptr; + } + + PageCacheSlot* slot() const noexcept + { + return this->slot_; + } + + PageId key() const noexcept + { + return this->slot_->key(); + } + + value_type* value() const noexcept + { + return this->value_; + } + + value_type* get() const noexcept + { + return this->value(); + } + + value_type* operator->() const noexcept + { + return this->get(); + } + + value_type& operator*() const noexcept + { + return *this->get(); + } + + u32 pin_count() const noexcept + { + return this->slot_ ? this->slot_->pin_count() : 0; + } + + u64 ref_count() const noexcept + { + return this->slot_ ? this->slot_->ref_count() : 0; + } + + private: + PageCacheSlot* slot_ = nullptr; + value_type* value_ = nullptr; +}; + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - + +inline bool operator==(const PageCacheSlot::PinnedRef& l, const PageCacheSlot::PinnedRef& r) +{ + return l.slot() == r.slot() && l.get() == r.get(); +} + +} //namespace llfs diff --git a/src/llfs/page_cache_slot_pool.cpp b/src/llfs/page_cache_slot_pool.cpp new file mode 100644 index 0000000..2cf561b --- /dev/null +++ b/src/llfs/page_cache_slot_pool.cpp @@ -0,0 +1,200 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. 
+// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// + +#include + +namespace llfs { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*explicit*/ PageCacheSlot::Pool::Pool(usize n_slots, std::string&& name, + usize eviction_candidates) noexcept + : n_slots_{n_slots} + , eviction_candidates_{std::min(n_slots, std::max(2, eviction_candidates))} + , name_{std::move(name)} + , slot_storage_{new SlotStorage[n_slots]} +{ + this->metrics_.max_slots.set(n_slots); + + const auto metric_name = [this](std::string_view property) { + return batt::to_string("Cache_", this->name_, "_", property); + }; + +#define ADD_METRIC_(n) global_metric_registry().add(metric_name(#n), this->metrics_.n) + + ADD_METRIC_(max_slots); + ADD_METRIC_(indexed_slots); + ADD_METRIC_(query_count); + ADD_METRIC_(hit_count); + ADD_METRIC_(stale_count); + ADD_METRIC_(alloc_count); + ADD_METRIC_(evict_count); + ADD_METRIC_(insert_count); + ADD_METRIC_(erase_count); + ADD_METRIC_(full_count); + +#undef ADD_METRIC_ +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot::Pool::~Pool() noexcept +{ + if (this->slot_storage_) { + const usize n_to_delete = this->n_constructed_.get_value(); + BATT_CHECK_EQ(n_to_delete, this->n_allocated_.load()); + + for (usize i = 0; i < n_to_delete; ++i) { + BATT_DEBUG_INFO("Destructing slot " << i << BATT_INSPECT(n_to_delete) + << BATT_INSPECT(this->n_allocated_.load()) + << BATT_INSPECT(this->n_slots_)); + this->get_slot(i)->~PageCacheSlot(); + } + } + + global_metric_registry() + .remove(this->metrics_.max_slots) + .remove(this->metrics_.indexed_slots) + .remove(this->metrics_.query_count) + .remove(this->metrics_.hit_count) + .remove(this->metrics_.stale_count) + .remove(this->metrics_.alloc_count) + .remove(this->metrics_.evict_count) + .remove(this->metrics_.insert_count) + .remove(this->metrics_.erase_count) + .remove(this->metrics_.full_count); + + LLFS_VLOG(1) << "PageCacheSlot::Pool::~Pool()"; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot* PageCacheSlot::Pool::get_slot(usize i) noexcept +{ + BATT_CHECK_LT(i, this->n_slots_); + + return std::addressof(*this->slots()[i]); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot* PageCacheSlot::Pool::allocate() noexcept +{ + if (this->n_allocated_.load() < this->n_slots_) { + const usize allocated_i = this->n_allocated_.fetch_add(1); + if (allocated_i < this->n_slots_) { + void* storage_addr = this->slots() + allocated_i; + PageCacheSlot* const new_slot = new (storage_addr) PageCacheSlot{*this}; + this->n_constructed_.fetch_add(1); + return new_slot; + } + const usize reverted = this->n_allocated_.fetch_sub(1); + BATT_CHECK_GE(reverted, this->n_slots_); + // + // continue... 
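// Note on the revert just above: fetch_sub() returns the counter's value prior to the
// decrement. The first over-allocation can only happen after all n_slots_ successful
// allocations have been counted (an increment that returns a value below n_slots_ is a
// success and is never reverted), so the value observed here is at least n_slots_ + 1,
// counting this thread's own increment; the CHECK asserts the weaker n_slots_ bound.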
+ } + + BATT_CHECK_OK(this->n_constructed_.await_equal(this->n_slots_)); + + return this->evict_lru(); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +usize PageCacheSlot::Pool::index_of(const PageCacheSlot* slot) noexcept +{ + BATT_CHECK_NOT_NULLPTR(slot); + BATT_CHECK_EQ(std::addressof(slot->pool()), this); + + const usize index = batt::CpuCacheLineIsolated::pointer_from(slot) - this->slots(); + + BATT_CHECK_LT(index, this->n_slots_); + + return index; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::CpuCacheLineIsolated* PageCacheSlot::Pool::slots() noexcept +{ + return reinterpret_cast*>(this->slot_storage_.get()); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageCacheSlot* PageCacheSlot::Pool::evict_lru() +{ + thread_local std::default_random_engine rng{/*seed=*/std::random_device{}()}; + + const usize n_slots = this->n_constructed_.get_value(); + + if (n_slots == 0) { + return nullptr; + } + + if (n_slots == 1) { + PageCacheSlot* only_slot = this->get_slot(0); + if (only_slot->evict()) { + return only_slot; + } + return nullptr; + } + + // Pick k slots at random and try to evict whichever one has the least (earliest) latest use + // logical time stamp. + // + std::uniform_int_distribution pick_first_slot{0, n_slots - 1}; + std::uniform_int_distribution pick_second_slot{0, n_slots - 2}; + + for (usize attempts = 0; attempts < n_slots; ++attempts) { + usize first_slot_i = pick_first_slot(rng); + usize second_slot_i = pick_second_slot(rng); + + if (second_slot_i >= first_slot_i) { + ++second_slot_i; + } + BATT_CHECK_NE(first_slot_i, second_slot_i); + + PageCacheSlot* first_slot = this->get_slot(first_slot_i); + PageCacheSlot* second_slot = this->get_slot(second_slot_i); + PageCacheSlot* lru_slot = [&] { + if (first_slot->get_latest_use() - second_slot->get_latest_use() < 0) { + return first_slot; + } + return second_slot; + }(); + + // Pick more random slots (with replacement, since we already have >= 2) to try to get a better + // (older) last-usage LTS. + // + for (usize k = 2; k < this->eviction_candidates_; ++k) { + usize nth_slot_i = pick_first_slot(rng); + PageCacheSlot* nth_slot = this->get_slot(nth_slot_i); + lru_slot = [&] { + if (nth_slot->get_latest_use() - lru_slot->get_latest_use() < 0) { + return nth_slot; + } + return lru_slot; + }(); + } + + // Fingers crossed! + // + if (lru_slot->evict()) { + this->metrics_.evict_count.fetch_add(1); + return lru_slot; + } + } + + return nullptr; +} + +} //namespace llfs diff --git a/src/llfs/page_cache_slot_pool.hpp b/src/llfs/page_cache_slot_pool.hpp new file mode 100644 index 0000000..7d3dc93 --- /dev/null +++ b/src/llfs/page_cache_slot_pool.hpp @@ -0,0 +1,130 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#ifndef LLFS_PAGE_CACHE_SLOT_HPP +#error This file must be included from/after page_cache_slot.hpp! +#endif + +#include + +namespace llfs { + +/** \brief A pool of PageCacheSlot objects. + * + * Used to construct a PageDeviceCache. + */ +class PageCacheSlot::Pool : public boost::intrusive_ref_counter +{ + public: + using Self = Pool; + + /** \brief The default number of randomly-selected slots to consider when trying to evict a slot + * that hasn't been accessed recently. 
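// The selection policy in evict_lru() above is a "best of k random choices" scheme: sample
// a handful of slots uniformly at random and try to evict the one with the oldest logical
// timestamp, rather than maintaining a global LRU list. Distilled to its core (and ignoring
// the with/without-replacement detail of the first two picks), the selection step looks
// like the illustrative, std-only sketch below; note the signed difference `a - b < 0`,
// which keeps the comparison meaningful even if the i64 clock ever wraps.

#include <cstdint>
#include <random>
#include <vector>

std::size_t pick_eviction_candidate(const std::vector<std::int64_t>& latest_use,
                                    std::size_t k, std::default_random_engine& rng)
{
  std::uniform_int_distribution<std::size_t> pick{0, latest_use.size() - 1};

  std::size_t best = pick(rng);
  for (std::size_t i = 1; i < k; ++i) {
    const std::size_t candidate = pick(rng);
    if (latest_use[candidate] - latest_use[best] < 0) {
      best = candidate;
    }
  }
  // The caller then attempts to evict slot `best`, re-sampling if that attempt fails.
  return best;
}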
+ */ + static constexpr usize kDefaultEvictionCandidates = 8; + + /** \brief Aligned storage type for a single PageCacheSlot. We allocate an array of this type + * when constructing a Pool object, then construct the individual slots via placement-new as they + * are needed. + */ + using SlotStorage = std::aligned_storage_t), + alignof(batt::CpuCacheLineIsolated)>; + + /** \brief Observability metrics for a cache slot pool. + */ + struct Metrics { + CountMetric max_slots{0}; + CountMetric indexed_slots{0}; + CountMetric query_count{0}; + CountMetric hit_count{0}; + CountMetric stale_count{0}; + CountMetric alloc_count{0}; + CountMetric evict_count{0}; + CountMetric insert_count{0}; + CountMetric erase_count{0}; + CountMetric full_count{0}; + }; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Creates a new PageCacheSlot::Pool. + * + * Objects of this type MUST be managed via boost::intrusive_ptr. + */ + template + static boost::intrusive_ptr make_new(Args&&... args) + { + return boost::intrusive_ptr{new Pool(BATT_FORWARD(args)...)}; + } + + /** \brief Destroys a PageCacheSlot pool. + * + * Will panic if there are any pinned slots. + */ + ~Pool() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + /** \brief Returns the slot at the specified index (`i`). + * + * The passed index must refer to a slot that was previously returned by this->allocate(), or + * behavior is undefined! + */ + PageCacheSlot* get_slot(usize i) noexcept; + + /** \brief Returns a cache slot in the `Invalid` state, ready to be filled by the caller. + * + * This function is guaranteed to return an available slot the first `n_slots` times it is called. + * Thereafter, it will attempt to evict an unpinned slot that hasn't been used recently. If no + * such slot can be found, `nullptr` will be returned. + */ + PageCacheSlot* allocate() noexcept; + + /** \brief Returns the index of the specified slot object. + * + * If `slot` does not belong to this pool, behavior is undefined! + */ + usize index_of(const PageCacheSlot* slot) noexcept; + + /** \brief Returns the metrics for this pool. + */ + const Metrics& metrics() const + { + return this->metrics_; + } + + //+++++++++++-+-+--+----- --- -- - - - - + private: + /** \brief Constructs a new Pool with capacity for `n_slots` cached pages. + */ + explicit Pool(usize n_slots, std::string&& name, + usize eviction_candidates = Self::kDefaultEvictionCandidates) noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + batt::CpuCacheLineIsolated* slots() noexcept; + + /** \brief Tries to find a slot that hasn't been used in a while to evict. + * + * Will keep on looping until it has made one attempt for each slot in the cache. At that point, + * we just give up and return nullptr. + */ + PageCacheSlot* evict_lru(); + + //+++++++++++-+-+--+----- --- -- - - - - + + const usize n_slots_; + const usize eviction_candidates_; + const std::string name_; + std::unique_ptr slot_storage_; + std::atomic n_allocated_{0}; + batt::Watch n_constructed_{0}; + Metrics metrics_; +}; + +} //namespace llfs diff --git a/src/llfs/page_device_cache.cpp b/src/llfs/page_device_cache.cpp new file mode 100644 index 0000000..291be5b --- /dev/null +++ b/src/llfs/page_device_cache.cpp @@ -0,0 +1,222 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. 
+// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#include +// + +#include + +namespace llfs { + +namespace { + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +struct NewSlot { + PageCacheSlot* p_slot = nullptr; + PageCacheSlot::PinnedRef pinned_ref; + usize slot_index = PageDeviceCache::kInvalidIndex; +}; + +} //namespace + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +/*explicit*/ PageDeviceCache::PageDeviceCache( + const PageIdFactory& page_ids, boost::intrusive_ptr&& slot_pool) noexcept + : page_ids_{page_ids} + , slot_pool_{std::move(slot_pool)} + , cache_(this->page_ids_.get_physical_page_count(), kInvalidIndex) +{ +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +PageDeviceCache::~PageDeviceCache() noexcept +{ +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +const PageIdFactory& PageDeviceCache::page_ids() const noexcept +{ + return this->page_ids_; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::StatusOr PageDeviceCache::find_or_insert( + PageId key, const std::function& initialize) +{ + BATT_CHECK_EQ(PageIdFactory::get_device_id(key), this->page_ids_.get_device_id()); + + // Lookup the cache table entry for the given page id. + // + const i64 physical_page = this->page_ids_.get_physical_page(key); + std::atomic& slot_index_ref = this->get_slot_index_ref(physical_page); + + // Initialized lazily (at most once) below, only when we discover we might + // need them. + // + Optional new_slot; + + // Let's take a look at what's there now... + // + usize observed_slot_index = slot_index_ref.load(); + + for (;;) { + // If the current index is invalid, then there's no point trying to pin it, so check that + // first. + // + if (observed_slot_index != kInvalidIndex) { + // If the CAS at the end of this loop failed spuriously, we might end up here... + // + if (new_slot && observed_slot_index == new_slot->slot_index) { + break; + } + + // Looks like there is already a slot for this physical page... let's try to pin it to see + // if it still contains the desired page. + // + PageCacheSlot* slot = this->slot_pool_->get_slot(observed_slot_index); + PageCacheSlot::PinnedRef pinned = slot->acquire_pin(key); + if (pinned) { + if (new_slot) { + BATT_CHECK(new_slot->pinned_ref); + BATT_CHECK_NE(slot->value(), new_slot->pinned_ref.value()); + + // [tastolfi 2024-02-09] I can't think of a reason why the new_value would ever be visible + // to anyone if we go down this code path, but just in case, resolve the Latch with a + // unique status code so that we don't get hangs waiting for pages to load. + // + new_slot->pinned_ref.value()->set_error( + ::llfs::make_status(StatusCode::kPageCacheSlotNotInitialized)); + } + + // Refresh the LTS. + // + slot->update_latest_use(); + + // Done! (Found existing value) + // + return {std::move(pinned)}; + } + } + + // No existing value found, or pin failed; allocate a new slot, fill it, and attempt to CAS it + // into the cache array. 
+ // + if (!new_slot) { + new_slot.emplace(); + new_slot->p_slot = this->slot_pool_->allocate(); + if (!new_slot->p_slot) { + return ::llfs::make_status(StatusCode::kCacheSlotsFull); + } + BATT_CHECK(!new_slot->p_slot->is_valid()); + + new_slot->pinned_ref = new_slot->p_slot->fill(key); + new_slot->slot_index = new_slot->p_slot->index(); + + BATT_CHECK_EQ(new_slot->p_slot, this->slot_pool_->get_slot(new_slot->slot_index)); + } + BATT_CHECK_NE(new_slot->slot_index, kInvalidIndex); + + // If we can atomically overwrite the slot index value we saw above (CAS), then we are done! + // + if (slot_index_ref.compare_exchange_weak(observed_slot_index, new_slot->slot_index)) { + break; + } + } + + BATT_CHECK(new_slot); + + // We purposely delayed this step until we knew that this thread must initialize the cache + // slot's Latch. This function will probably start I/O (or do something else in a test case...) + // + initialize(new_slot->pinned_ref); + + // Done! (Inserted new value) + // + return {std::move(new_slot->pinned_ref)}; +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +void PageDeviceCache::erase(PageId key) +{ + BATT_CHECK_EQ(PageIdFactory::get_device_id(key), this->page_ids_.get_device_id()); + + // Lookup the cache table entry for the given page id. + // + const i64 physical_page = this->page_ids_.get_physical_page(key); + std::atomic& slot_index_ref = this->get_slot_index_ref(physical_page); + + usize slot_index = slot_index_ref.load(); + if (slot_index == kInvalidIndex) { + return; + } + + // Helper function. + // + const auto invalidate_ref = [&] { + const usize slot_index_to_erase = slot_index; + do { + if (slot_index_ref.compare_exchange_weak(slot_index, kInvalidIndex)) { + break; + } + } while (slot_index == slot_index_to_erase); + }; + + // If the slot is still holding the passed id, then clear it out. + // + PageCacheSlot* slot = this->slot_pool_->get_slot(slot_index); + if (slot->evict_if_key_equals(key)) { + invalidate_ref(); + + // Important! Only clear the slot once we have invalidated our table entry. + // + slot->clear(); + + } else { + // If we weren't able to evict `key`, we can still try to read the slot to see if it contains + // `key` but is non-evictable (because there are outstanding pins); if this is the case, then + // clear it from our table. + // + PageCacheSlot::PinnedRef pinned = slot->acquire_pin(PageId{}, /*ignore_key=*/true); + if (pinned) { + const PageId observed_key = pinned.key(); + if (observed_key == key || + PageIdFactory::get_device_id(observed_key) != this->page_ids_.get_device_id()) { + invalidate_ref(); + if (observed_key == key) { + slot->set_obsolete_hint(); + } + } else { + // The table contains an older or newer generation of the same physical page; leave it + // alone! 
+ // + BATT_CHECK_EQ(this->page_ids_.get_physical_page(observed_key), physical_page) + << BATT_INSPECT(key) << BATT_INSPECT(observed_key); + } + } + } +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +std::atomic& PageDeviceCache::get_slot_index_ref(i64 physical_page) +{ + static_assert(sizeof(std::atomic) == sizeof(usize)); + static_assert(alignof(std::atomic) == alignof(usize)); + + BATT_CHECK_LT((usize)physical_page, this->cache_.size()); + + return reinterpret_cast&>(this->cache_[physical_page]); +} + +} //namespace llfs diff --git a/src/llfs/page_device_cache.hpp b/src/llfs/page_device_cache.hpp new file mode 100644 index 0000000..a88095a --- /dev/null +++ b/src/llfs/page_device_cache.hpp @@ -0,0 +1,92 @@ +//#=##=##=#==#=#==#===#+==#+==========+==+=+=+=+=+=++=+++=+++++=-++++=-+++++++++++ +// +// Part of the LLFS Project, under Apache License v2.0. +// See https://www.apache.org/licenses/LICENSE-2.0 for license information. +// SPDX short identifier: Apache-2.0 +// +//+++++++++++-+-+--+----- --- -- - - - - + +#pragma once +#ifndef LLFS_PAGE_DEVICE_CACHE_HPP +#define LLFS_PAGE_DEVICE_CACHE_HPP + +#include +// +#include +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include + +namespace llfs { + +/** \brief A lock-free cache for a single PageDevice. + * + * This cache is populated with slots from a pool passed in at construction time. This pool may be + * shared among many different per-device caches. If there is memory pressure, cached data may be + * evicted (i.e. stolen) from a cache that hasn't accessed it in a while and given to another cache + * that is using the same pool. If the data is pinned, however, this will never happen. + */ +class PageDeviceCache +{ + public: + static constexpr usize kInvalidIndex = ~usize{0}; + + //+++++++++++-+-+--+----- --- -- - - - - + + explicit PageDeviceCache(const PageIdFactory& page_ids, + boost::intrusive_ptr&& slot_pool) noexcept; + + PageDeviceCache(const PageDeviceCache&) = delete; + PageDeviceCache& operator=(const PageDeviceCache&) = delete; + + ~PageDeviceCache() noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + + const PageCacheSlot::Pool::Metrics& metrics() const noexcept + { + return this->slot_pool_->metrics(); + } + + /** \brief Returns the PageDevice id factory passed in at construction time. + */ + const PageIdFactory& page_ids() const noexcept; + + /** \brief Returns a PinnedRef to the cache slot for the given page. + * + * If the specified page is was not present in the cache, then the initialize function will be + * called to start the process of loading the page data into the slot. + */ + batt::StatusOr find_or_insert( + PageId key, const std::function& initialize); + + /** \brief Removes the specified key from this cache, if it is currently present. + */ + void erase(PageId key); + + //+++++++++++-+-+--+----- --- -- - - - - + private: + /** \brief Returns a reference to the atomic cache slot index integer for the given physical page + * on the device for this cache. 
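// The intended call shape of find_or_insert(), declared above: the initialize callback is
// invoked at most once, and only by the caller whose insertion actually won, so it is the
// natural place to kick off the page read that will eventually resolve the slot's Latch.
// In the real PageCache that role is played by async_load_page_into_slot(); the helper
// named below is hypothetical, and the exact callback parameter spelling is an assumption.

#include <llfs/page_device_cache.hpp>

void start_async_read(llfs::PageId, const llfs::PageCacheSlot::PinnedRef&);  // hypothetical

batt::StatusOr<llfs::PageCacheSlot::PinnedRef> lookup(llfs::PageDeviceCache& cache,
                                                      llfs::PageId page_id)
{
  return cache.find_or_insert(page_id, [&](const llfs::PageCacheSlot::PinnedRef& pinned_slot) {
    // Only reached if this call inserted the slot; the callback must eventually set the
    // Latch to a value or an error, or waiters would hang.
    start_async_read(page_id, pinned_slot);
  });
}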
+ */ + std::atomic& get_slot_index_ref(i64 physical_page); + + //+++++++++++-+-+--+----- --- -- - - - - + + const PageIdFactory page_ids_; + boost::intrusive_ptr slot_pool_; + std::vector cache_; +}; + +} //namespace llfs + +#endif // LLFS_PAGE_DEVICE_CACHE_HPP diff --git a/src/llfs/page_id.hpp b/src/llfs/page_id.hpp index 4d8412c..47e3212 100644 --- a/src/llfs/page_id.hpp +++ b/src/llfs/page_id.hpp @@ -10,8 +10,8 @@ #ifndef LLFS_PAGE_ID_HPP #define LLFS_PAGE_ID_HPP -#include #include +#include #include #include diff --git a/src/llfs/page_id_slot.cpp b/src/llfs/page_id_slot.cpp index c210846..4f8be0c 100644 --- a/src/llfs/page_id_slot.cpp +++ b/src/llfs/page_id_slot.cpp @@ -27,21 +27,21 @@ namespace llfs { //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -batt::StatusOr PageIdSlot::load_through(PageLoader& loader, - const Optional& required_layout, - PinPageToJob pin_page_to_job, - OkIfNotFound ok_if_not_found) const noexcept +/*static*/ batt::StatusOr PageIdSlot::load_through_impl( + PageCacheSlot::AtomicRef& cache_slot_ref, PageLoader& loader, + const Optional& required_layout, PinPageToJob pin_page_to_job, + OkIfNotFound ok_if_not_found, PageId page_id) noexcept { { - batt::StatusOr pinned = this->try_pin(); + batt::StatusOr pinned = Self::try_pin_impl(cache_slot_ref, page_id); if (pinned.ok()) { return pinned; } } batt::StatusOr pinned = loader.get_page_with_layout_in_job( - this->page_id, required_layout, pin_page_to_job, ok_if_not_found); + page_id, required_layout, pin_page_to_job, ok_if_not_found); if (pinned.ok()) { - this->cache_slot_ref = pinned->get_cache_slot(); + cache_slot_ref = pinned->get_cache_slot(); } return pinned; @@ -49,15 +49,12 @@ batt::StatusOr PageIdSlot::load_through(PageLoader& loader, //==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - // -batt::StatusOr PageIdSlot::try_pin() const noexcept +/*static*/ batt::StatusOr PageIdSlot::try_pin_impl( + PageCacheSlot::AtomicRef& cache_slot_ref, PageId page_id) noexcept { PageIdSlot::metrics().load_total_count.fetch_add(1); - const page_id_int id_val = this->page_id.int_value(); - - PinnedCacheSlot>> cache_slot = - this->cache_slot_ref.pin(id_val); - + PageCacheSlot::PinnedRef cache_slot = cache_slot_ref.pin(page_id); if (!cache_slot) { PageIdSlot::metrics().load_slot_miss_count.fetch_add(1); return make_status(StatusCode::kPinFailedPageEvicted); @@ -71,4 +68,22 @@ batt::StatusOr PageIdSlot::try_pin() const noexcept return PinnedPage{page_view->get(), std::move(cache_slot)}; } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::StatusOr PageIdSlot::load_through(PageLoader& loader, + const Optional& required_layout, + PinPageToJob pin_page_to_job, + OkIfNotFound ok_if_not_found) const noexcept +{ + return Self::load_through_impl(this->cache_slot_ref, loader, required_layout, pin_page_to_job, + ok_if_not_found, this->page_id); +} + +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +batt::StatusOr PageIdSlot::try_pin() const noexcept +{ + return Self::try_pin_impl(this->cache_slot_ref, this->page_id); +} + } // namespace llfs diff --git a/src/llfs/page_id_slot.hpp b/src/llfs/page_id_slot.hpp index 32fcee1..a6e7cd8 100644 --- a/src/llfs/page_id_slot.hpp +++ b/src/llfs/page_id_slot.hpp @@ -11,7 +11,7 @@ #define LLFS_PAGE_ID_SLOT_HPP #include -#include +#include #include #include @@ -44,15 +44,14 @@ bool bool_from(PinPageToJob pin_page, bool default_value); struct PageIdSlot { using Self = PageIdSlot; - using AtomicCacheSlotRefT = - AtomicCacheSlotRef>>; 
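// Usage sketch for the struct being declared here: a PageIdSlot remembers both the page id
// and a weak AtomicRef to wherever that page was last cached, so repeated traversals can
// usually re-pin without consulting the PageLoader. The argument spellings for
// PinPageToJob, OkIfNotFound, and None are assumptions based on the declarations in this
// patch; error handling is elided.

#include <llfs/page_id_slot.hpp>
#include <llfs/page_loader.hpp>

batt::StatusOr<llfs::PinnedPage> get_pinned(const llfs::PageIdSlot& slot,
                                            llfs::PageLoader& loader)
{
  // Fast path: the referenced cache slot still holds slot.page_id.
  batt::StatusOr<llfs::PinnedPage> pinned = slot.try_pin();
  if (pinned.ok()) {
    return pinned;
  }

  // Slow path: load through the PageLoader; on success the slot's AtomicRef is refreshed.
  return slot.load_through(loader, /*required_layout=*/llfs::None,
                           llfs::PinPageToJob::kDefault, llfs::OkIfNotFound{false});
}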
- struct Metrics { CountMetric load_total_count; CountMetric load_slot_hit_count; CountMetric load_slot_miss_count; }; + //+++++++++++-+-+--+----- --- -- - - - - + static Metrics& metrics() { static Metrics m_; @@ -69,8 +68,33 @@ struct PageIdSlot { static Self from_pinned_page(const PinnedPage& pinned); + /** \brief Attempts to pin the passed cache slot using the specified `page_id`; if this fails, + * then falls back on loading the page from the `loader`, updating `cache_slot_ref` if successful. + */ + static batt::StatusOr load_through_impl(PageCacheSlot::AtomicRef& cache_slot_ref, + PageLoader& loader, + const Optional& required_layout, + PinPageToJob pin_page_to_job, + OkIfNotFound ok_if_not_found, + PageId page_id) noexcept; + + /** \brief Attempts to pin the slot using the specified page_id. + * + * If pin succeeded, but the page failed to load into the slot when it was originally added to the + * cache, then the page load error status code is returned. + * + * \return The PinnedPage if successful, llfs::StatusCode::kPinFailedPageEvicted otherwise (unless + * load error; see above) + */ + static batt::StatusOr try_pin_impl(PageCacheSlot::AtomicRef& cache_slot_ref, + PageId page_id) noexcept; + + //+++++++++++-+-+--+----- --- -- - - - - + PageId page_id; - mutable AtomicCacheSlotRefT cache_slot_ref; + mutable PageCacheSlot::AtomicRef cache_slot_ref; + + //+++++++++++-+-+--+----- --- -- - - - - operator PageId() const { @@ -91,7 +115,7 @@ struct PageIdSlot { { if (BATT_HINT_TRUE(id != this->page_id)) { this->page_id = id; - this->cache_slot_ref = AtomicCacheSlotRefT{}; + this->cache_slot_ref = PageCacheSlot::AtomicRef{}; } return *this; } diff --git a/src/llfs/page_loader.cpp b/src/llfs/page_loader.cpp index 3b5a70c..583853f 100644 --- a/src/llfs/page_loader.cpp +++ b/src/llfs/page_loader.cpp @@ -100,4 +100,15 @@ StatusOr PageLoader::get_page(PageId page_id, OkIfNotFound ok_if_not return this->get_page_with_layout(page_id, /*required_layout=*/None, ok_if_not_found); } +//==#==========+==+=+=++=+++++++++++-+-+--+----- --- -- - - - - +// +StatusOr PageLoader::get_page_slot_ref_with_layout_in_job( + PageId page_id, PageCacheSlot::AtomicRef& slot_ref, + const Optional& required_layout, PinPageToJob pin_page_to_job, + OkIfNotFound ok_if_not_found) +{ + return PageIdSlot::load_through_impl(slot_ref, *this, required_layout, pin_page_to_job, + ok_if_not_found, page_id); +} + } // namespace llfs diff --git a/src/llfs/page_loader.hpp b/src/llfs/page_loader.hpp index e48f5b5..da1272a 100644 --- a/src/llfs/page_loader.hpp +++ b/src/llfs/page_loader.hpp @@ -64,6 +64,11 @@ class PageLoader virtual StatusOr get_page(PageId page_id, OkIfNotFound ok_if_not_found); + virtual StatusOr get_page_slot_ref_with_layout_in_job( + PageId page_id, PageCacheSlot::AtomicRef& slot_ref, + const Optional& required_layout, PinPageToJob pin_page_to_job, + OkIfNotFound ok_if_not_found); + protected: PageLoader() = default; }; diff --git a/src/llfs/page_recycler.hpp b/src/llfs/page_recycler.hpp index c9676bf..c0ba830 100644 --- a/src/llfs/page_recycler.hpp +++ b/src/llfs/page_recycler.hpp @@ -12,6 +12,7 @@ #include // +#include #include #include #include diff --git a/src/llfs/page_view.hpp b/src/llfs/page_view.hpp index 38cccbb..c25be68 100644 --- a/src/llfs/page_view.hpp +++ b/src/llfs/page_view.hpp @@ -16,7 +16,7 @@ #include #include #include -#include +//#include #include #include diff --git a/src/llfs/pinned_page.hpp b/src/llfs/pinned_page.hpp index 669526f..697414c 100644 --- a/src/llfs/pinned_page.hpp +++ 
b/src/llfs/pinned_page.hpp @@ -10,7 +10,7 @@ #ifndef LLFS_PINNED_PAGE_HPP #define LLFS_PINNED_PAGE_HPP -#include +#include #include #include @@ -28,8 +28,7 @@ class PageView; // class PinnedPage { - using PinnedCacheSlotT = - PinnedCacheSlot>>; + using PinnedCacheSlotT = PageCacheSlot::PinnedRef; public: PinnedPage() = default; @@ -65,7 +64,7 @@ class PinnedPage void hint_obsolete() const { if (this->pinned_cache_slot_) { - this->pinned_cache_slot_.slot()->set_obsolete_hint(true); + this->pinned_cache_slot_.slot()->set_obsolete_hint(); } } diff --git a/src/llfs/status_code.cpp b/src/llfs/status_code.cpp index 395762b..ab224c8 100644 --- a/src/llfs/status_code.cpp +++ b/src/llfs/status_code.cpp @@ -131,6 +131,8 @@ bool initialize_status_codes() "PackedPageHeader::layout_id does not match PageView::get_page_layout_id()"), // 61, CODE_WITH_MSG_(StatusCode::kPutViewUnknownLayoutId, "PageCache::put_view failed; page layout id is not registered"), // 62, + CODE_WITH_MSG_(StatusCode::kPageCacheSlotNotInitialized, + "The page cache slot for this PageId is not initialized"), // 63, }); return initialized; } diff --git a/src/llfs/status_code.hpp b/src/llfs/status_code.hpp index aa7a1f9..127b5a8 100644 --- a/src/llfs/status_code.hpp +++ b/src/llfs/status_code.hpp @@ -78,6 +78,7 @@ enum struct StatusCode { kPageReaderConflict = 60, kPageHeaderBadLayoutId = 61, kPutViewUnknownLayoutId = 62, + kPageCacheSlotNotInitialized = 63, }; bool initialize_status_codes(); diff --git a/src/llfs/storage_context.test.cpp b/src/llfs/storage_context.test.cpp index 19ed930..f3836af 100644 --- a/src/llfs/storage_context.test.cpp +++ b/src/llfs/storage_context.test.cpp @@ -129,11 +129,13 @@ TEST(StorageContextTest, GetPageCache) ASSERT_TRUE(cache.ok()) << BATT_INSPECT(cache.status()); ASSERT_NE(*cache, nullptr); - llfs::Slice arenas_4kb = (*cache)->arenas_for_page_size(4 * kKiB); - EXPECT_EQ(arenas_4kb.size(), 1u); + llfs::Slice devices_4kb = + (*cache)->devices_with_page_size(4 * kKiB); + EXPECT_EQ(devices_4kb.size(), 1u); - llfs::Slice arenas_2mb = (*cache)->arenas_for_page_size(2 * kMiB); - EXPECT_EQ(arenas_2mb.size(), 1u); + llfs::Slice devices_2mb = + (*cache)->devices_with_page_size(2 * kMiB); + EXPECT_EQ(devices_2mb.size(), 1u); } } // namespace diff --git a/src/llfs/volume.cpp b/src/llfs/volume.cpp index d46b846..6ce61c7 100644 --- a/src/llfs/volume.cpp +++ b/src/llfs/volume.cpp @@ -161,7 +161,9 @@ u64 Volume::calculate_grant_size(const AppendableJob& appendable) const metadata.ids->recycler_uuid, metadata.ids->trimmer_uuid, }) { - for (const PageArena& arena : cache->all_arenas()) { + for (PageCache::PageDeviceEntry* entry : cache->all_devices()) { + BATT_CHECK_NOT_NULLPTR(entry); + const PageArena& arena = entry->arena; Optional attachment = arena.allocator().get_client_attachment_status(uuid); @@ -272,8 +274,9 @@ u64 Volume::calculate_grant_size(const AppendableJob& appendable) const metadata.ids->recycler_uuid, metadata.ids->trimmer_uuid, }) { - for (const PageArena& arena : cache->all_arenas()) { - BATT_REQUIRE_OK(arena.allocator().notify_user_recovered(uuid)); + for (PageCache::PageDeviceEntry* entry : cache->all_devices()) { + BATT_CHECK_NOT_NULLPTR(entry); + BATT_REQUIRE_OK(entry->arena.allocator().notify_user_recovered(uuid)); } } } diff --git a/src/llfs/volume.test.cpp b/src/llfs/volume.test.cpp index 834c804..1ce17d9 100644 --- a/src/llfs/volume.test.cpp +++ b/src/llfs/volume.test.cpp @@ -1428,10 +1428,10 @@ class VolumeSimTest : public ::testing::Test TEST_F(VolumeSimTest, RecoverySimulation) { static 
const u32 kInitialSeed = // - batt::getenv_as("LLFS_VOLUME_SIM_SEED").value_or(253689123); + batt::getenv_as("LLFS_VOLUME_SIM_SEED").value_or(987654321); static const u32 kNumSeeds = // - batt::getenv_as("LLFS_VOLUME_SIM_COUNT").value_or(256); + batt::getenv_as("LLFS_VOLUME_SIM_COUNT").value_or(2500); static const u32 kCpuPin = // batt::getenv_as("LLFS_VOLUME_SIM_CPU").value_or(0); @@ -1585,9 +1585,11 @@ TEST_F(VolumeSimTest, ConcurrentAppendJobs) // constexpr i32 kExpectedRefCount = 2; - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(1 * kKiB)) { + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(1 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); for (llfs::PageId page_id : page_ids) { - EXPECT_EQ(arena.allocator().get_ref_count(page_id).first, kExpectedRefCount); + EXPECT_EQ(entry->arena.allocator().get_ref_count(page_id).first, kExpectedRefCount); } break; } @@ -1732,7 +1734,7 @@ void VolumeSimTest::commit_first_job(RecoverySimState& state, llfs::StorageSimul batt::StatusOr slot_range = this->commit_job_to_root_log(std::move(job), state.first_page_id, volume, sim); - ASSERT_TRUE(slot_range.ok()) << BATT_INSPECT(slot_range.status()); + ASSERT_TRUE(slot_range.ok()) << BATT_INSPECT(slot_range.status()) << BATT_INSPECT(state.seed); sim.log_event("first job successfully appended! slot_range=", *slot_range); } @@ -1855,45 +1857,57 @@ void VolumeSimTest::verify_post_recovery_expectations(RecoverySimState& state, if (state.recovered_second_page) { EXPECT_FALSE(state.second_job_will_not_commit); - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(1 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device - 1); - EXPECT_EQ(arena.allocator().get_ref_count(state.first_page_id).first, 3); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(1 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.first_page_id).first, 3); ASSERT_TRUE(sim.has_data_for_page_id(state.first_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.first_page_id)); } - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(2 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device - 1); - EXPECT_EQ(arena.allocator().get_ref_count(state.second_root_page_id).first, 2); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(2 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.second_root_page_id).first, 2); ASSERT_TRUE(sim.has_data_for_page_id(state.second_root_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.second_root_page_id)); } - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(4 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device - 1); - EXPECT_EQ(arena.allocator().get_ref_count(state.third_page_id).first, 2); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(4 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.third_page_id).first, 2); ASSERT_TRUE(sim.has_data_for_page_id(state.third_page_id).ok()); 
EXPECT_TRUE(*sim.has_data_for_page_id(state.third_page_id)); } } else { - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(1 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device - 1); - EXPECT_EQ(arena.allocator().get_ref_count(state.first_page_id).first, 2); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(1 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device - 1); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.first_page_id).first, 2); ASSERT_TRUE(sim.has_data_for_page_id(state.first_page_id).ok()); EXPECT_TRUE(*sim.has_data_for_page_id(state.first_page_id)); } - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(2 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(2 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device); if (state.second_root_page_id.is_valid()) { - EXPECT_EQ(arena.allocator().get_ref_count(state.second_root_page_id).first, 0); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.second_root_page_id).first, 0); if (!llfs::Volume::write_new_pages_asap()) { ASSERT_TRUE(sim.has_data_for_page_id(state.second_root_page_id).ok()); EXPECT_FALSE(*sim.has_data_for_page_id(state.second_root_page_id)); } } } - for (const llfs::PageArena& arena : sim.cache()->arenas_for_page_size(4 * kKiB)) { - EXPECT_EQ(arena.allocator().free_pool_size(), this->pages_per_device); + for (llfs::PageCache::PageDeviceEntry* entry : + sim.cache()->devices_with_page_size(4 * kKiB)) { + BATT_CHECK_NOT_NULLPTR(entry); + EXPECT_EQ(entry->arena.allocator().free_pool_size(), this->pages_per_device); if (state.third_page_id.is_valid()) { - EXPECT_EQ(arena.allocator().get_ref_count(state.third_page_id).first, 0); + EXPECT_EQ(entry->arena.allocator().get_ref_count(state.third_page_id).first, 0); if (!llfs::Volume::write_new_pages_asap()) { ASSERT_TRUE(sim.has_data_for_page_id(state.third_page_id).ok()); EXPECT_FALSE(*sim.has_data_for_page_id(state.third_page_id));
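Note (illustrative only, not part of the patch): the new PageIdSlot API introduced above splits page access into a lock-free fast path (try_pin / try_pin_impl) and a loader-backed slow path (load_through / load_through_impl). The sketch below shows a typical caller under stated assumptions: the template arguments elided by extraction are assumed to be batt::StatusOr<llfs::PinnedPage> and llfs::Optional<llfs::PageLayoutId>, and the read_page() helper name is invented here purely for illustration.

    // Illustrative sketch -- assumes <llfs/page_id_slot.hpp> and its dependencies are included.
    // Fast path first: re-pin the cached PageCacheSlot; if the slot was evicted (or never
    // filled), fall back to the PageLoader, which refreshes the slot's AtomicRef on success.
    batt::StatusOr<llfs::PinnedPage> read_page(llfs::PageLoader& loader, llfs::PageIdSlot& slot,
                                               const llfs::Optional<llfs::PageLayoutId>& required_layout,
                                               llfs::PinPageToJob pin_page_to_job,
                                               llfs::OkIfNotFound ok_if_not_found)
    {
      batt::StatusOr<llfs::PinnedPage> pinned = slot.try_pin();
      if (pinned.ok()) {
        return pinned;  // Cache hit: no mutex, no hash table lookup.
      }

      // Cache miss (kPinFailedPageEvicted) or load error: go through the loader; on success,
      // load_through() updates slot.cache_slot_ref so the next call takes the fast path again.
      return slot.load_through(loader, required_layout, pin_page_to_job, ok_if_not_found);
    }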