Skip to content

Commit

Permalink
Retry on failed read (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
karasikov authored Aug 30, 2022
1 parent 262d82c commit 82a6821
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 12 deletions.
49 changes: 41 additions & 8 deletions metagraph/src/common/elias_fano/elias_fano.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,38 @@ size_t EliasFanoDecoder<T>::decompress_next_block() {
const size_t leftover = sizeof(T) - 1;
memcpy(lower_, lower_ + sizeof(lower_) - leftover, leftover);
const uint32_t to_read = std::min(sizeof(lower_) - leftover, num_lower_bytes_);
if (!source_.read(lower_ + leftover, to_read)) {
logger->error("Error while reading next {} lower bits from {}",
to_read * 8, source_name_);

// If reading fails, retry MAX_NUM_RETRIES times
size_t num_retries = 0;
const size_t MAX_NUM_RETRIES = 100;
const auto source_pos = source_.tellg();

while (num_retries <= MAX_NUM_RETRIES) {
if (source_.read(lower_ + leftover, to_read))
break;

// reading failed -> retry
while (++num_retries <= MAX_NUM_RETRIES) {
logger->warn("Failed reading lower bits from {}. Retry #{}...", source_name_, num_retries);
using namespace std::chrono_literals;
std::this_thread::sleep_for(1s);

source_ = std::ifstream(source_name_, std::ios::binary);
if (!source_) {
logger->error("Unable to open {}", source_name_);
continue;
}
source_.seekg(source_pos);
if (!source_) {
logger->error("Unable to seek in {}", source_name_);
continue;
}
break;
}
}
if (num_retries > MAX_NUM_RETRIES) {
logger->error("Failed reading lower bits from {} after {} retries",
source_name_, MAX_NUM_RETRIES);
std::exit(EXIT_FAILURE);
}
num_lower_bytes_ -= to_read;
Expand Down Expand Up @@ -516,9 +545,13 @@ bool EliasFanoDecoder<T>::init() {
logger->error("Error while reading from {}", source_name_);
std::exit(EXIT_FAILURE);
}

// If reading fails, retry MAX_NUM_RETRIES times until crashing
size_t num_retries = 0;
const size_t MAX_NUM_RETRIES = 100;
const auto source_pos = source_.tellg();

while (num_retries_ <= max_num_retries_) {
while (num_retries <= MAX_NUM_RETRIES) {
source_.read(reinterpret_cast<char *>(&offset_), sizeof(T));
source_.read(reinterpret_cast<char *>(&num_lower_bits_), 1);
assert(num_lower_bits_ < 8 * sizeof(T));
Expand All @@ -538,10 +571,10 @@ bool EliasFanoDecoder<T>::init() {
return true;

// reading failed -> retry
while (num_retries_++ < max_num_retries_) {
logger->warn("Failed reading from {}. Retry #{}...", source_name_, num_retries_);
while (++num_retries <= MAX_NUM_RETRIES) {
logger->warn("Failed reading from {}. Retry #{}...", source_name_, num_retries);
using namespace std::chrono_literals;
std::this_thread::sleep_for(500ms);
std::this_thread::sleep_for(1s);

source_ = std::ifstream(source_name_, std::ios::binary);
if (!source_) {
Expand All @@ -557,7 +590,7 @@ bool EliasFanoDecoder<T>::init() {
}
}

logger->error("Failed reading from {} after {} retries", source_name_, max_num_retries_);
logger->error("Failed reading from {} after {} retries", source_name_, MAX_NUM_RETRIES);
std::exit(EXIT_FAILURE);
}

Expand Down
4 changes: 0 additions & 4 deletions metagraph/src/common/elias_fano/elias_fano.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ class EliasFanoDecoder {
/** Decompressed the next block into buffer and returns the number of elements in it */
size_t decompress_next_block();

/** If reading fails, retry |max_num_retries_| times until crashing */
size_t num_retries_ = 0;
static constexpr size_t max_num_retries_ = 100;

/** Index of current element */
size_t position_ = 0;

Expand Down

0 comments on commit 82a6821

Please sign in to comment.