diff --git a/theta/include/theta_helpers.hpp b/theta/include/theta_helpers.hpp index cbdebb40..8d83d341 100644 --- a/theta/include/theta_helpers.hpp +++ b/theta/include/theta_helpers.hpp @@ -57,7 +57,7 @@ class theta_build_helper{ // consistent way of initializing theta from p // avoids multiplication if p == 1 since it might not yield MAX_THETA exactly static uint64_t starting_theta_from_p(float p) { - if (p < 1) return static_cast(theta_constants::MAX_THETA * p); + if (p < 1) return static_cast(static_cast(theta_constants::MAX_THETA) * p); return theta_constants::MAX_THETA; } diff --git a/theta/include/theta_sketch.hpp b/theta/include/theta_sketch.hpp index 5fc15f63..ad6421e7 100644 --- a/theta/include/theta_sketch.hpp +++ b/theta/include/theta_sketch.hpp @@ -417,6 +417,20 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc { virtual uint32_t get_num_retained() const; virtual uint16_t get_seed_hash() const; + /** + * Computes maximum serialized size in bytes + * @param lg_k base-2 logarithm of the nominal number of entries in the sketch + */ + static size_t get_max_serialized_size_bytes(uint8_t lg_k); + + /** + * Computes size in bytes required to serialize the current state of the sketch. + * Computing compressed size is expensive. It takes iterating over all retained hashes, + * and the actual serialization will have to look at them again. 
+ * @param compressed if true compressed size is returned (if applicable) + */ + size_t get_serialized_size_bytes(bool compressed = false) const; + /** * This method serializes the sketch into a given stream in a binary form * @param os output stream @@ -486,8 +500,11 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc { uint64_t theta_; std::vector entries_; + uint8_t get_preamble_longs(bool compressed) const; bool is_suitable_for_compression() const; - uint8_t compute_min_leading_zeros() const; + uint8_t compute_entry_bits() const; + uint8_t get_num_entries_bytes() const; + size_t get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const; void serialize_version_4(std::ostream& os) const; vector_bytes serialize_version_4(unsigned header_size_bytes = 0) const; diff --git a/theta/include/theta_sketch_impl.hpp b/theta/include/theta_sketch_impl.hpp index e5e5050f..c31d0bad 100644 --- a/theta/include/theta_sketch_impl.hpp +++ b/theta/include/theta_sketch_impl.hpp @@ -24,11 +24,11 @@ #include #include -#include "serde.hpp" #include "binomial_bounds.hpp" #include "theta_helpers.hpp" #include "count_zeros.hpp" #include "bit_packing.hpp" +#include "memory_operations.hpp" namespace datasketches { @@ -341,6 +341,39 @@ auto compact_theta_sketch_alloc::end() const -> const_iterator { template void compact_theta_sketch_alloc::print_specifics(std::ostringstream&) const {} +template +uint8_t compact_theta_sketch_alloc::get_preamble_longs(bool compressed) const { + if (compressed) { + return this->is_estimation_mode() ? 2 : 1; + } + return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 
1 : 2; +} + +template +size_t compact_theta_sketch_alloc::get_max_serialized_size_bytes(uint8_t lg_k) { + return sizeof(uint64_t) * (3 + update_theta_sketch_alloc::theta_table::get_capacity(lg_k + 1, lg_k)); +} + +template +size_t compact_theta_sketch_alloc::get_serialized_size_bytes(bool compressed) const { + if (compressed && is_suitable_for_compression()) { + return get_compressed_serialized_size_bytes(compute_entry_bits(), get_num_entries_bytes()); + } + return sizeof(uint64_t) * get_preamble_longs(false) + sizeof(uint64_t) * entries_.size(); +} + +// store num_entries as whole bytes since whole-byte blocks will follow (most probably) +template +uint8_t compact_theta_sketch_alloc::get_num_entries_bytes() const { + return whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); +} + +template +size_t compact_theta_sketch_alloc::get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const { + const size_t compressed_bits = entry_bits * entries_.size(); + return sizeof(uint64_t) * get_preamble_longs(true) + num_entries_bytes + whole_bytes_to_hold_bits(compressed_bits); +} + template void compact_theta_sketch_alloc::serialize(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; @@ -366,12 +399,10 @@ void compact_theta_sketch_alloc::serialize(std::ostream& os) const { template auto compact_theta_sketch_alloc::serialize(unsigned header_size_bytes) const -> vector_bytes { - const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 
1 : 2; - const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs - + sizeof(uint64_t) * entries_.size(); + const size_t size = get_serialized_size_bytes() + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; - + const uint8_t preamble_longs = get_preamble_longs(false); *ptr++ = preamble_longs; *ptr++ = UNCOMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; @@ -413,7 +444,7 @@ auto compact_theta_sketch_alloc::serialize_compressed(unsigned header_size_by } template -uint8_t compact_theta_sketch_alloc::compute_min_leading_zeros() const { +uint8_t compact_theta_sketch_alloc::compute_entry_bits() const { // compression is based on leading zeros in deltas between ordered hash values // assumes ordered sketch uint64_t previous = 0; @@ -423,16 +454,14 @@ uint8_t compact_theta_sketch_alloc::compute_min_leading_zeros() const { ored |= delta; previous = entry; } - return count_leading_zeros_in_u64(ored); + return 64 - count_leading_zeros_in_u64(ored); } template void compact_theta_sketch_alloc::serialize_version_4(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1; - const uint8_t entry_bits = 64 - compute_min_leading_zeros(); - - // store num_entries as whole bytes since whole-byte blocks will follow (most probably) - const uint8_t num_entries_bytes = whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); + const uint8_t entry_bits = compute_entry_bits(); + const uint8_t num_entries_bytes = get_num_entries_bytes(); write(os, preamble_longs); write(os, COMPRESSED_SERIAL_VERSION); @@ -483,19 +512,13 @@ void compact_theta_sketch_alloc::serialize_version_4(std::ostream& os) const template auto compact_theta_sketch_alloc::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes { - const uint8_t preamble_longs = this->is_estimation_mode() ? 
2 : 1; - const uint8_t entry_bits = 64 - compute_min_leading_zeros(); - const size_t compressed_bits = entry_bits * entries_.size(); - - // store num_entries as whole bytes since whole-byte blocks will follow (most probably) - const uint8_t num_entries_bytes = whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); - - const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + num_entries_bytes - + whole_bytes_to_hold_bits(compressed_bits); + const uint8_t entry_bits = compute_entry_bits(); + const uint8_t num_entries_bytes = get_num_entries_bytes(); + const size_t size = get_compressed_serialized_size_bytes(entry_bits, num_entries_bytes) + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; - *ptr++ = preamble_longs; + *ptr++ = get_preamble_longs(true); *ptr++ = COMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; *ptr++ = entry_bits; diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp index 488f67cf..0721898c 100644 --- a/theta/test/theta_sketch_test.cpp +++ b/theta/test/theta_sketch_test.cpp @@ -273,9 +273,11 @@ TEST_CASE("theta sketch: serialize deserialize stream and bytes equivalence", "[ for (int i = 0; i < n; i++) update_sketch.update(i); std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); - update_sketch.compact().serialize(s); - auto bytes = update_sketch.compact().serialize(); + auto compact_sketch = update_sketch.compact(); + compact_sketch.serialize(s); + auto bytes = compact_sketch.serialize(); REQUIRE(bytes.size() == static_cast(s.tellp())); + REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes()); for (size_t i = 0; i < bytes.size(); ++i) { REQUIRE(((char*)bytes.data())[i] == (char)s.get()); } @@ -521,6 +523,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { auto compact_sketch = update_sketch.compact(); auto bytes = 
compact_sketch.serialize_compressed(); + REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes(true)); { // deserialize bytes auto deserialized_sketch = compact_theta_sketch::deserialize(bytes.data(), bytes.size()); REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained()); @@ -544,6 +547,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); compact_sketch.serialize_compressed(s); + REQUIRE(static_cast(s.tellp()) == compact_sketch.get_serialized_size_bytes(true)); auto deserialized_sketch = compact_theta_sketch::deserialize(s); REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained()); REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta()); @@ -554,4 +558,30 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { } } +// The sketch reaches capacity for the first time at 2 * K * 15/16, +// but at that point it is still in exact mode, so the serialized size is not the maximum +// (theta is not serialized in the exact mode). +// So we need to catch the second time, but some updates will be ignored in the estimation mode, +// so we update more than enough times keeping track of the maximum. +// Potentially the exact number of updates to reach the peak can be figured out given this particular sequence, +// but not assuming that might be even better (say, in case we change the load factor or hash function +// or just out of principle not to rely on implementation details too much). 
+TEST_CASE("max serialized size", "[theta_sketch]") { + const uint8_t lg_k = 10; + auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build(); + int value = 0; + + // this will go over the first peak, which is not the highest + for (int i = 0; i < (1 << lg_k) * 2; ++i) sketch.update(value++); + + // this will go over the second peak keeping track of the max size + size_t max_size_bytes = 0; + for (int i = 0; i < (1 << lg_k) * 2; ++i) { + sketch.update(value++); + auto bytes = sketch.compact().serialize(); + max_size_bytes = std::max(max_size_bytes, bytes.size()); + } + REQUIRE(max_size_bytes == compact_theta_sketch::get_max_serialized_size_bytes(lg_k)); +} + } /* namespace datasketches */