Skip to content

Commit

Permalink
switch to using flock for synchronization
Browse files Browse the repository at this point in the history
Still to do:

- use WAL
- buffer saveTile calls
- after all writers are done, disable journal

Current issue: enabling WAL seems to cause a logic error?
  • Loading branch information
cldellow committed Oct 5, 2024
1 parent ad101f9 commit 9dd569b
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 105 deletions.
12 changes: 11 additions & 1 deletion include/mbtiles.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ struct PendingStatement {
bool isMerge;
};

class Flock {
public:
Flock(int fd);
~Flock();

private:
int fd_;
};

/** \brief Write to MBTiles (sqlite) database
*
* (note that sqlite_modern_cpp.h is very slightly changed from the original, for blob support and an .init method)
Expand All @@ -24,6 +33,7 @@ class MBTiles {
sqlite::database db;
std::vector<sqlite::database_binder> preparedStatements;
std::mutex m;
int lockfd;
bool inTransaction;
std::string filename;

Expand All @@ -41,7 +51,7 @@ class MBTiles {
void saveTile(int zoom, int x, int y, std::string *data, bool isMerge);
void closeForWriting();

void populateTiles(std::vector<PreciseTileCoordinatesSet>& zooms, std::vector<Bbox>& extents);
void populateTiles(bool verbose, std::vector<PreciseTileCoordinatesSet>& zooms, std::vector<Bbox>& extents);
void openForReading(std::string &filename);
void readBoundingBox(double &minLon, double &maxLon, double &minLat, double &maxLat);
std::vector<char> readTile(int zoom, int col, int row);
Expand Down
58 changes: 51 additions & 7 deletions src/mbtiles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,62 @@
#include <boost/iostreams/filter/gzip.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/device/array.hpp>
#include <fcntl.h>
#include <sys/file.h>

using namespace sqlite;
using namespace std;
namespace bio = boost::iostreams;

Flock::Flock(int fd) {
fd_ = 0;

int rv = flock(fd, LOCK_EX);
if (rv == 0)
fd_ = fd;
else
throw std::runtime_error("failed to flock");
}

Flock::~Flock() {
if (fd_)
flock(fd_, LOCK_UN);
}

MBTiles::MBTiles():
inTransaction(false),
pendingStatements1(std::make_shared<std::vector<PendingStatement>>()),
pendingStatements2(std::make_shared<std::vector<PendingStatement>>())
{}
{
lockfd = 0;
lockfd = open("./lockfile", O_CREAT, 0644);
if (lockfd == -1)
throw std::runtime_error("failed to open lockfile");

//std::cout << "lockfd=" << std::to_string(lockfd) << std::endl;
}

MBTiles::~MBTiles() {
if (db && inTransaction) db << "COMMIT;"; // commit all the changes if open
{
Flock lock(lockfd);
if (db && inTransaction) {
db << "COMMIT;"; // commit all the changes if open
}

// Reset the DB member so that the sqlite3_close_v2() command is called
// inside of the flock
(void)[v=std::move(db)]{};
}

if (lockfd) {
close(lockfd);
}
}

// ---- Write .mbtiles

void MBTiles::openForWriting(string &filename) {
Flock lock(lockfd);
db.init(filename, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE);
this->filename = filename;

Expand All @@ -42,7 +81,7 @@ void MBTiles::openForWriting(string &filename) {
try {
db << "PRAGMA journal_mode=OFF;";
} catch(runtime_error &e) {
cout << "Couldn't turn journaling off (not fatal): " << e.what() << endl;
cout << "Couldn't turn journaling on (not fatal): " << e.what() << endl;
}
db << "PRAGMA page_size = 65536;";
db << "VACUUM;"; // make sure page_size takes effect
Expand All @@ -52,9 +91,9 @@ void MBTiles::openForWriting(string &filename) {
preparedStatements.emplace_back(db << "INSERT INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?);");
preparedStatements.emplace_back(db << "REPLACE INTO tiles (zoom_level, tile_column, tile_row, tile_data) VALUES (?,?,?,?);");

db << "BEGIN;"; // begin a transaction
cout << "Creating mbtiles at " << filename << endl;
inTransaction = true;
// db << "BEGIN;"; // begin a transaction
// inTransaction = true;
}

void MBTiles::writeMetadata(string key, string value) {
Expand Down Expand Up @@ -88,6 +127,9 @@ void MBTiles::flushPendingStatements() {
}

void MBTiles::saveTile(int zoom, int x, int y, string *data, bool isMerge) {
Flock lock(lockfd);
// TODO: consider buffering these and flushing only every, say, 1000 rows

// If the lock is available, write directly to SQLite.
if (m.try_lock()) {
insertOrReplace(zoom, x, y, *data, isMerge);
Expand All @@ -100,7 +142,7 @@ void MBTiles::saveTile(int zoom, int x, int y, string *data, bool isMerge) {
}
}

void MBTiles::populateTiles(std::vector<PreciseTileCoordinatesSet>& zooms, std::vector<Bbox>& extents) {
void MBTiles::populateTiles(bool verbose, std::vector<PreciseTileCoordinatesSet>& zooms, std::vector<Bbox>& extents) {
size_t tiles = 0;
db << "SELECT zoom_level,tile_column,tile_row FROM tiles" >> [&](int z,int col, int row) {
tiles++;
Expand All @@ -111,10 +153,12 @@ void MBTiles::populateTiles(std::vector<PreciseTileCoordinatesSet>& zooms, std::
if (row > extents[z].maxY) extents[z].maxY = row;
if (row < extents[z].minY) extents[z].minY = row;
};
std::cerr << filename << " had " << std::to_string(tiles) << " tiles" << std::endl;
if (verbose)
std::cout << filename << " had " << std::to_string(tiles) << " tiles" << std::endl;
}

void MBTiles::closeForWriting() {
Flock lock(lockfd);
flushPendingStatements();
preparedStatements[0].used(true);
preparedStatements[1].used(true);
Expand Down
167 changes: 70 additions & 97 deletions src/tilemaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <boost/program_options.hpp>
#include <boost/variant.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/sort/sort.hpp>

#include "rapidjson/document.h"
Expand All @@ -44,7 +43,6 @@

#include "mbtiles.h"

#include <boost/asio/post.hpp>
#include <boost/interprocess/streams/bufferstream.hpp>

#ifndef TM_VERSION
Expand Down Expand Up @@ -78,19 +76,38 @@ struct Input {
* Worker threads write the output tiles, and start in the outputProc function.
*/
int main(const int argc, const char* argv[]) {
uint16_t threadNum = max(thread::hardware_concurrency(), 1u);
uint64_t shards = 1;
uint64_t shard = 0;

if (getenv("SHARDS") != NULL) {
shards = atoi(getenv("SHARDS"));
}

if (getenv("SHARD") != NULL) {
shard = atoi(getenv("SHARD"));
}

std::cout << "shards=" << std::to_string(shards) << " shard=" << std::to_string(shard) << std::endl;

if (shard >= shards) {
std::cerr << "fatal: shard must be less than shards" << std::endl;
return 1;
}


std::vector<std::string> filenames;
for (int i = 1; i < argc; i++) {
filenames.push_back(std::string(argv[i]));
std::cout << "arg " << std::to_string(i) << ": " << filenames.back() << std::endl;
if (false && shard == 0)
std::cout << "arg " << std::to_string(i) << ": " << filenames.back() << std::endl;
}

if (filenames.empty()) {
std::cerr << "usage: ./tile-smush file1.mbtiles file2.mbtiles [...]" << std::endl;
return 1;
}

{
if (false) {
// See https://github.com/xerial/sqlite-jdbc/issues/59#issuecomment-162115704
int rv;
rv = sqlite3_config(SQLITE_CONFIG_MEMSTATUS, 0);
Expand Down Expand Up @@ -129,117 +146,73 @@ int main(const int argc, const char* argv[]) {
std::numeric_limits<size_t>::min()
});
}
}

{
boost::asio::thread_pool pool(threadNum);

for (auto& input : inputs) {
// TODO: this segfaults when run in the thread pool, which _is_
// what I expect SQLITE_CONFIG_SINGLETHREAD to do...
//
// ...but why does this die, and our very similar use immediately
// below does not?
boost::asio::post(pool, [&]() {
// Determine which tiles exist in this mbtiles file.
//
// This lets us optimize the case where only a single mbtiles has
// a tile for a given zxy, as we can copy the bytes directly.
//
//MBTiles mbtiles;
//mbtiles.openForReading(input->filename);
//mbtiles.populateTiles(input->zooms, input->bbox);
if (tlsTiles.empty()) {
for (const auto& input : inputs) {
tlsTiles.push_back(std::make_shared<MBTiles>());
tlsTiles.back()->openForReading(input->filename);
}
}

//input->mbtiles.populateTiles(input->zooms, input->bbox);
tlsTiles[input->index]->populateTiles(input->zooms, input->bbox);
});
}

pool.join();
input->mbtiles.populateTiles(shard == 0, input->zooms, input->bbox);
}

std::string MergedFilename("merged.mbtiles");
remove(MergedFilename.c_str());

if (shards == 1) {
// When we're running on the entire dataset, remove the merged.mbtiles file.
// Otherwise, we rely on tile-smush-parallel to do this.
remove(MergedFilename.c_str());
}

MBTiles merged;
std::cerr << "openForWriting() " << std::endl;
merged.openForWriting(MergedFilename);

{
boost::asio::thread_pool pool(threadNum);
std::vector<Input*> matching;
for (int zoom = 0; zoom < 15; zoom++) {
Bbox bbox = inputs[0]->bbox[zoom];
for (const auto& input : inputs) {
if (input->bbox[zoom].minX < bbox.minX) bbox.minX = input->bbox[zoom].minX;
if (input->bbox[zoom].minY < bbox.minY) bbox.minY = input->bbox[zoom].minY;
if (input->bbox[zoom].maxX > bbox.maxX) bbox.maxX = input->bbox[zoom].maxX;
if (input->bbox[zoom].maxY > bbox.maxY) bbox.maxY = input->bbox[zoom].maxY;
}

const uint64_t shards = threadNum * 2;
for (uint64_t shard = 0; shard < shards; shard++) {
boost::asio::post(pool, [&, shard]() {
// Ensure we have tls copies of each of the mbtiles
for (int x = bbox.minX; x < bbox.maxX; x++) {
for (int y = bbox.minY; y < bbox.maxY; y++) {
if ((x * (1 << zoom) + y) % shards != shard)
continue;

if (tlsTiles.empty()) {
for (const auto& input : inputs) {
tlsTiles.push_back(std::make_shared<MBTiles>());
tlsTiles.back()->openForReading(input->filename);
}
matching.clear();
for (const auto& input : inputs) {
if (input->zooms[zoom].test(x, y))
matching.push_back(input.get());
}

std::vector<Input*> matching;
for (int zoom = 0; zoom < 15; zoom++) {
Bbox bbox = inputs[0]->bbox[zoom];
for (const auto& input : inputs) {
if (input->bbox[zoom].minX < bbox.minX) bbox.minX = input->bbox[zoom].minX;
if (input->bbox[zoom].minY < bbox.minY) bbox.minY = input->bbox[zoom].minY;
if (input->bbox[zoom].maxX > bbox.maxX) bbox.maxX = input->bbox[zoom].maxX;
if (input->bbox[zoom].maxY > bbox.maxY) bbox.maxY = input->bbox[zoom].maxY;
}

for (int x = bbox.minX; x < bbox.maxX; x++) {
for (int y = bbox.minY; y < bbox.maxY; y++) {
if ((x * (1 << zoom) + y) % shards != shard)
continue;

matching.clear();
for (const auto& input : inputs) {
if (input->zooms[zoom].test(x, y))
matching.push_back(input.get());
}

if (matching.empty())
continue;

if (matching.size() == 1) {
// When exactly 1 mbtiles matches, it's a special case and we can
// copy directly between them.
std::vector<char> old = tlsTiles[matching[0]->index]->readTile(zoom, x, y);
//std::vector<char> old = matching[0]->mbtiles.readTile(zoom, x, y);

std::string buffer(old.data(), old.size());
// TODO: is this valid? We have a lock, but we'll access
// from different threads. This might be problematic because
// we cache the prepared statement.
merged.saveTile(zoom, x, y, &buffer, false);
continue;
}

// Multiple mbtiles want to contribute a tile at this zxy.
// They'll all have disjoint layers, so decompress each tile
// and concatenate their contents to form the new tile.

// TODO: do this
}
}
if (matching.empty())
continue;

if (matching.size() == 1) {
// When exactly 1 mbtiles matches, it's a special case and we can
// copy directly between them.
//std::vector<char> old = tlsTiles[matching[0]->index]->readTile(zoom, x, y);
std::vector<char> old = matching[0]->mbtiles.readTile(zoom, x, y);

std::string buffer(old.data(), old.size());
// TODO: is this valid? We have a lock, but we'll access
// from different threads. This might be problematic because
// we cache the prepared statement.
merged.saveTile(zoom, x, y, &buffer, false);
continue;
}
});

// Multiple mbtiles want to contribute a tile at this zxy.
// They'll all have disjoint layers, so decompress each tile
// and concatenate their contents to form the new tile.

// TODO: do this
}
}

pool.join();
}

// TODO: Populate the `metadata` table
// See https://github.com/mapbox/mbtiles-spec/blob/master/1.3/spec.md#content

std::cerr << "closeForWriting() " << std::endl;
merged.closeForWriting();

}
Expand Down

0 comments on commit 9dd569b

Please sign in to comment.