From 9dd569b4bc95a88422509bb94da7472a8ec4d16f Mon Sep 17 00:00:00 2001 From: Colin Dellow Date: Sat, 5 Oct 2024 12:10:47 -0400 Subject: [PATCH] switch to using flock for synchronization Still to do: - use WAL - buffer saveTile calls - after all writers are done, disable journal Current issue: enabling WAL seems to cause a logic error? --- include/mbtiles.h | 12 +++- src/mbtiles.cpp | 58 ++++++++++++++-- src/tilemaker.cpp | 167 +++++++++++++++++++--------------------------- 3 files changed, 132 insertions(+), 105 deletions(-) diff --git a/include/mbtiles.h b/include/mbtiles.h index f02c367b..926477d0 100644 --- a/include/mbtiles.h +++ b/include/mbtiles.h @@ -16,6 +16,15 @@ struct PendingStatement { bool isMerge; }; +class Flock { +public: + Flock(int fd); + ~Flock(); + +private: + int fd_; +}; + /** \brief Write to MBTiles (sqlite) database * * (note that sqlite_modern_cpp.h is very slightly changed from the original, for blob support and an .init method) @@ -24,6 +33,7 @@ class MBTiles { sqlite::database db; std::vector preparedStatements; std::mutex m; + int lockfd; bool inTransaction; std::string filename; @@ -41,7 +51,7 @@ class MBTiles { void saveTile(int zoom, int x, int y, std::string *data, bool isMerge); void closeForWriting(); - void populateTiles(std::vector& zooms, std::vector& extents); + void populateTiles(bool verbose, std::vector& zooms, std::vector& extents); void openForReading(std::string &filename); void readBoundingBox(double &minLon, double &maxLon, double &minLat, double &maxLat); std::vector readTile(int zoom, int col, int row); diff --git a/src/mbtiles.cpp b/src/mbtiles.cpp index b55f5698..4306cb50 100644 --- a/src/mbtiles.cpp +++ b/src/mbtiles.cpp @@ -8,23 +8,62 @@ #include #include #include +#include +#include using namespace sqlite; using namespace std; namespace bio = boost::iostreams; +Flock::Flock(int fd) { + fd_ = 0; + + int rv = flock(fd, LOCK_EX); + if (rv == 0) + fd_ = fd; + else + throw std::runtime_error("failed to flock"); +} + +Flock::~Flock() { + if (fd_) + flock(fd_, LOCK_UN); +} + MBTiles::MBTiles(): + inTransaction(false), pendingStatements1(std::make_shared>()), pendingStatements2(std::make_shared>()) -{} +{ + lockfd = 0; + lockfd = open("./lockfile", O_CREAT, 0644); + if (lockfd == -1) + throw std::runtime_error("failed to open lockfile"); + + //std::cout << "lockfd=" << std::to_string(lockfd) << std::endl; +} MBTiles::~MBTiles() { - if (db && inTransaction) db << "COMMIT;"; // commit all the changes if open + { + Flock lock(lockfd); + if (db && inTransaction) { + db << "COMMIT;"; // commit all the changes if open + } + + // Reset the DB member so that the sqlite3_close_v2() command is called + // inside of the flock + (void)[v=std::move(db)]{}; + } + + if (lockfd) { + close(lockfd); + } } // ---- Write .mbtiles void MBTiles::openForWriting(string &filename) { + Flock lock(lockfd); db.init(filename, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE); this->filename = filename; @@ -42,7 +81,7 @@ void MBTiles::openForWriting(string &filename) { try { db << "PRAGMA journal_mode=OFF;"; } catch(runtime_error &e) { - cout << "Couldn't turn journaling off (not fatal): " << e.what() << endl; + cout << "Couldn't turn journaling on (not fatal): " << e.what() << endl; } db << "PRAGMA page_size = 65536;"; db << "VACUUM;"; // make sure page_size takes effect @@ -52,9 +91,9 @@ void MBTiles::openForWriting(string &filename) { preparedStatements.emplace_back(db << "INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?);"); preparedStatements.emplace_back(db << "REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?);"); - db << "BEGIN;"; // begin a transaction cout << "Creating mbtiles at " << filename << endl; - inTransaction = true; +// db << "BEGIN;"; // begin a transaction +// inTransaction = true; } void MBTiles::writeMetadata(string key, string value) { @@ -88,6 +127,9 @@ void MBTiles::flushPendingStatements() { } void MBTiles::saveTile(int zoom, int x, int y, string *data, bool isMerge) { + Flock lock(lockfd); + // TODO: consider buffering these and flushing only every, say, 1000 rows + // If the lock is available, write directly to SQLite. if (m.try_lock()) { insertOrReplace(zoom, x, y, *data, isMerge); @@ -100,7 +142,7 @@ void MBTiles::saveTile(int zoom, int x, int y, string *data, bool isMerge) { } } -void MBTiles::populateTiles(std::vector& zooms, std::vector& extents) { +void MBTiles::populateTiles(bool verbose, std::vector& zooms, std::vector& extents) { size_t tiles = 0; db << "SELECT zoom_level,tile_column,tile_row FROM tiles" >> [&](int z,int col, int row) { tiles++; @@ -111,10 +153,12 @@ void MBTiles::populateTiles(std::vector& zooms, std:: if (row > extents[z].maxY) extents[z].maxY = row; if (row < extents[z].minY) extents[z].minY = row; }; - std::cerr << filename << " had " << std::to_string(tiles) << " tiles" << std::endl; + if (verbose) + std::cout << filename << " had " << std::to_string(tiles) << " tiles" << std::endl; } void MBTiles::closeForWriting() { + Flock lock(lockfd); flushPendingStatements(); preparedStatements[0].used(true); preparedStatements[1].used(true); diff --git a/src/tilemaker.cpp b/src/tilemaker.cpp index d7fe7bfd..850ec15d 100644 --- a/src/tilemaker.cpp +++ b/src/tilemaker.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include #include "rapidjson/document.h" @@ -44,7 +43,6 @@ #include "mbtiles.h" -#include #include #ifndef TM_VERSION @@ -78,11 +76,30 @@ struct Input { * Worker threads write the output tiles, and start in the outputProc function. */ int main(const int argc, const char* argv[]) { - uint16_t threadNum = max(thread::hardware_concurrency(), 1u); + uint64_t shards = 1; + uint64_t shard = 0; + + if (getenv("SHARDS") != NULL) { + shards = atoi(getenv("SHARDS")); + } + + if (getenv("SHARD") != NULL) { + shard = atoi(getenv("SHARD")); + } + + std::cout << "shards=" << std::to_string(shards) << " shard=" << std::to_string(shard) << std::endl; + + if (shard >= shards) { + std::cerr << "fatal: shard must be less than shards" << std::endl; + return 1; + } + + std::vector filenames; for (int i = 1; i < argc; i++) { filenames.push_back(std::string(argv[i])); - std::cout << "arg " << std::to_string(i) << ": " << filenames.back() << std::endl; + if (false && shard == 0) + std::cout << "arg " << std::to_string(i) << ": " << filenames.back() << std::endl; } if (filenames.empty()) { @@ -90,7 +107,7 @@ int main(const int argc, const char* argv[]) { return 1; } - { + if (false) { // See https://github.com/xerial/sqlite-jdbc/issues/59#issuecomment-162115704 int rv; rv = sqlite3_config(SQLITE_CONFIG_MEMSTATUS, 0); @@ -129,117 +146,73 @@ int main(const int argc, const char* argv[]) { std::numeric_limits::min() }); } - } - - { - boost::asio::thread_pool pool(threadNum); - - for (auto& input : inputs) { - // TODO: this segfaults when run in the thread pool, which _is_ - // what I expect SQLITE_CONFIG_SINGLETHREAD to do... - // - // ...but why does this die, and our very similar use immediately - // below does not? - boost::asio::post(pool, [&]() { - // Determine which tiles exist in this mbtiles file. - // - // This lets us optimize the case where only a single mbtiles has - // a tile for a given zxy, as we can copy the bytes directly. - // - //MBTiles mbtiles; - //mbtiles.openForReading(input->filename); - //mbtiles.populateTiles(input->zooms, input->bbox); - if (tlsTiles.empty()) { - for (const auto& input : inputs) { - tlsTiles.push_back(std::make_shared()); - tlsTiles.back()->openForReading(input->filename); - } - } - - //input->mbtiles.populateTiles(input->zooms, input->bbox); - tlsTiles[input->index]->populateTiles(input->zooms, input->bbox); - }); - } - pool.join(); + input->mbtiles.populateTiles(shard == 0, input->zooms, input->bbox); } std::string MergedFilename("merged.mbtiles"); - remove(MergedFilename.c_str()); + + if (shards == 1) { + // When we're running on the entire dataset, remove the merged.mbtiles file. + // Otherwise, we rely on tile-smush-parallel to do this. + remove(MergedFilename.c_str()); + } MBTiles merged; + std::cerr << "openForWriting() " << std::endl; merged.openForWriting(MergedFilename); - { - boost::asio::thread_pool pool(threadNum); + std::vector matching; + for (int zoom = 0; zoom < 15; zoom++) { + Bbox bbox = inputs[0]->bbox[zoom]; + for (const auto& input : inputs) { + if (input->bbox[zoom].minX < bbox.minX) bbox.minX = input->bbox[zoom].minX; + if (input->bbox[zoom].minY < bbox.minY) bbox.minY = input->bbox[zoom].minY; + if (input->bbox[zoom].maxX > bbox.maxX) bbox.maxX = input->bbox[zoom].maxX; + if (input->bbox[zoom].maxY > bbox.maxY) bbox.maxY = input->bbox[zoom].maxY; + } - const uint64_t shards = threadNum * 2; - for (uint64_t shard = 0; shard < shards; shard++) { - boost::asio::post(pool, [&, shard]() { - // Ensure we have tls copies of each of the mbtiles + for (int x = bbox.minX; x < bbox.maxX; x++) { + for (int y = bbox.minY; y < bbox.maxY; y++) { + if ((x * (1 << zoom) + y) % shards != shard) + continue; - if (tlsTiles.empty()) { - for (const auto& input : inputs) { - tlsTiles.push_back(std::make_shared()); - tlsTiles.back()->openForReading(input->filename); - } + matching.clear(); + for (const auto& input : inputs) { + if (input->zooms[zoom].test(x, y)) + matching.push_back(input.get()); } - std::vector matching; - for (int zoom = 0; zoom < 15; zoom++) { - Bbox bbox = inputs[0]->bbox[zoom]; - for (const auto& input : inputs) { - if (input->bbox[zoom].minX < bbox.minX) bbox.minX = input->bbox[zoom].minX; - if (input->bbox[zoom].minY < bbox.minY) bbox.minY = input->bbox[zoom].minY; - if (input->bbox[zoom].maxX > bbox.maxX) bbox.maxX = input->bbox[zoom].maxX; - if (input->bbox[zoom].maxY > bbox.maxY) bbox.maxY = input->bbox[zoom].maxY; - } - - for (int x = bbox.minX; x < bbox.maxX; x++) { - for (int y = bbox.minY; y < bbox.maxY; y++) { - if ((x * (1 << zoom) + y) % shards != shard) - continue; - - matching.clear(); - for (const auto& input : inputs) { - if (input->zooms[zoom].test(x, y)) - matching.push_back(input.get()); - } - - if (matching.empty()) - continue; - - if (matching.size() == 1) { - // When exactly 1 mbtiles matches, it's a special case and we can - // copy directly between them. - std::vector old = tlsTiles[matching[0]->index]->readTile(zoom, x, y); - //std::vector old = matching[0]->mbtiles.readTile(zoom, x, y); - - std::string buffer(old.data(), old.size()); - // TODO: is this valid? We have a lock, but we'll access - // from different threads. This might be problematic because - // we cache the prepared statement. - merged.saveTile(zoom, x, y, &buffer, false); - continue; - } - - // Multiple mbtiles want to contribute a tile at this zxy. - // They'll all have disjoint layers, so decompress each tile - // and concatenate their contents to form the new tile. - - // TODO: do this - } - } + if (matching.empty()) + continue; + + if (matching.size() == 1) { + // When exactly 1 mbtiles matches, it's a special case and we can + // copy directly between them. + //std::vector old = tlsTiles[matching[0]->index]->readTile(zoom, x, y); + std::vector old = matching[0]->mbtiles.readTile(zoom, x, y); + + std::string buffer(old.data(), old.size()); + // TODO: is this valid? We have a lock, but we'll access + // from different threads. This might be problematic because + // we cache the prepared statement. + merged.saveTile(zoom, x, y, &buffer, false); + continue; } - }); + + // Multiple mbtiles want to contribute a tile at this zxy. + // They'll all have disjoint layers, so decompress each tile + // and concatenate their contents to form the new tile. + + // TODO: do this + } } - - pool.join(); } // TODO: Populate the `metadata` table // See https://github.com/mapbox/mbtiles-spec/blob/master/1.3/spec.md#content + std::cerr << "closeForWriting() " << std::endl; merged.closeForWriting(); }