Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FDB-427 Limit memory usage on remotefdb.read #96

Merged
merged 6 commits into from
Mar 15, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Make memory limit configurable by user config; Fix bug counting sizes
Clang format
  • Loading branch information
ChrisspyB committed Mar 14, 2025
commit 34d522ffe1c6ba1d110b88a5cfe45bc9fc9d2ab2
9 changes: 9 additions & 0 deletions src/fdb5/api/RemoteFDB.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <cstdlib>
#include <ctime>

#include "eckit/config/Resource.h"
#include "eckit/io/Buffer.h"
#include "eckit/log/Log.h"
#include "eckit/serialisation/MemoryStream.h"
@@ -14,6 +15,7 @@

#include "fdb5/remote/RemoteFieldLocation.h"
#include "fdb5/remote/client/ClientConnectionRouter.h"
#include "fdb5/remote/client/ReadLimiter.h"

using namespace fdb5::remote;
using namespace eckit;
@@ -189,6 +191,13 @@ RemoteFDB::RemoteFDB(const eckit::Configuration& config, const std::string& name
config_.set("stores", stores);
config_.set("fieldLocationEndpoints", fieldLocationEndpoints);
config_.overrideSchema(static_cast<std::string>(controlEndpoint()) + "/schema", schema);

/// @note: We must instantiate the ReadLimiter before any RemoteStores due to their static initialisation.
/// @todo: this may change in future.
static size_t memoryLimit = eckit::Resource<size_t>(
"$FDB_READ_LIMIT;fdbReadLimit",
config_.userConfig().getUnsigned("limits.read", size_t(1) * 1024 * 1024 * 1024)); // 1GiB
ReadLimiter::init(memoryLimit);
}

// -----------------------------------------------------------------------------------------------------
28 changes: 15 additions & 13 deletions src/fdb5/remote/client/ReadLimiter.cc
Original file line number Diff line number Diff line change
@@ -17,19 +17,21 @@ namespace fdb5::remote {

//----------------------------------------------------------------------------------------------------------------------
namespace {
// Initialise immediately to outlive the RemoteStores which are also static...
ReadLimiter& instance = ReadLimiter::instance();
ReadLimiter* instance_ = nullptr;
} // namespace

ReadLimiter& ReadLimiter::instance() {
static ReadLimiter limiter;
return limiter;
ASSERT(instance_);
return *instance_;
}

ReadLimiter::ReadLimiter() :
memoryUsed_{0},
memoryLimit_{eckit::Resource<size_t>("$FDB_REMOTE_READ_LIMIT", size_t(1) * 1024 * 1024 * 1024)} // 1 GiB
{}
/// Creates the shared ReadLimiter with the given memory limit (in bytes).
/// Only the first call has any effect: if a limiter already exists, it is left
/// untouched and the memoryLimit passed here is ignored.
/// @note The instance is heap-allocated and is not freed by this function.
void ReadLimiter::init(size_t memoryLimit) {
    if (instance_) {
        return;  // already initialised; keep the existing limiter and its limit
    }
    instance_ = new ReadLimiter(memoryLimit);
}

/// Builds a limiter that starts with no memory accounted for and uses
/// memoryLimit as its upper bound.
ReadLimiter::ReadLimiter(size_t memoryLimit) :
    memoryUsed_{0},
    memoryLimit_{memoryLimit} {}

void ReadLimiter::add(RemoteStore* client, uint32_t id, const FieldLocation& fieldLocation, const Key& remapKey) {
eckit::Buffer requestBuffer(4096);
@@ -67,7 +69,7 @@ bool ReadLimiter::tryNextRequest() {
}

activeRequests_[request.client->id()].insert(request.id);
resultSizes_[request.id] = request.resultSize;
resultSizes_[{request.client->id(), request.id}] = request.resultSize;
memoryUsed_ += request.resultSize;

sendRequest(request);
@@ -88,9 +90,9 @@ void ReadLimiter::finishRequest(uint32_t clientID, uint32_t requestID) {
auto it2 = it->second.find(requestID);
ASSERT(it2 != it->second.end());

memoryUsed_ -= resultSizes_[requestID];
memoryUsed_ -= resultSizes_[{clientID, requestID}];
it->second.erase(it2);
resultSizes_.erase(requestID);
resultSizes_.erase({clientID, requestID});
}

tryNextRequest();
@@ -105,8 +107,8 @@ void ReadLimiter::evictClient(size_t clientID) {

if (it != activeRequests_.end()) {
for (auto requestID : it->second) {
memoryUsed_ -= resultSizes_[requestID];
resultSizes_.erase(requestID);
memoryUsed_ -= resultSizes_[{clientID, requestID}];
resultSizes_.erase({clientID, requestID});
}
activeRequests_.erase(it);
}
10 changes: 6 additions & 4 deletions src/fdb5/remote/client/ReadLimiter.h
Original file line number Diff line number Diff line change
@@ -34,15 +34,17 @@ struct RequestInfo {
size_t resultSize; // Size of the field obtained from the store
};

// Class to limit the number of requests we send to the servers at any one time to avoid running out of memory.
// Holds a queue of all of the currently unfulfilled read requests.
// Prevents asking the servers for more data until we have consumed the data we have already received.
/// @note: Does not own any result buffers, just keeps track of their expected sizes.
/// @todo: In future, we will have more fine-grained memory limits on individual queues.
class ReadLimiter : eckit::NonCopyable {
public:

static ReadLimiter& instance();

static void init(size_t memoryLimit);

// Add a new request to the queue of requests to be sent. Will not be sent until we know we have buffer space.
void add(RemoteStore* client, uint32_t id, const FieldLocation& fieldLocation,
const Key& remapKey); // use const *?
@@ -65,7 +67,7 @@ class ReadLimiter : eckit::NonCopyable {

private:

ReadLimiter();
ReadLimiter(size_t memoryLimit);

// Send the request to the server
void sendRequest(const RequestInfo& request) const;
@@ -83,8 +85,8 @@ class ReadLimiter : eckit::NonCopyable {
// client id -> set of active request ids
std::map<uint32_t, std::set<uint32_t>> activeRequests_;

// request id -> result size
std::map<uint32_t, size_t> resultSizes_;
// {clientID, requestID} -> result size in bytes
std::map<std::pair<uint32_t, uint32_t>, size_t> resultSizes_;
};

} // namespace fdb5::remote
Loading
Oops, something went wrong.