From 90a7bba5525892800f6c61e617b7c7d22dde9e06 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 8 May 2024 16:13:47 -0700 Subject: [PATCH 1/5] implemented get_serialized_size_bytes() --- theta/include/theta_sketch.hpp | 13 ++++++- theta/include/theta_sketch_impl.hpp | 60 +++++++++++++++++++---------- 2 files changed, 51 insertions(+), 22 deletions(-) diff --git a/theta/include/theta_sketch.hpp b/theta/include/theta_sketch.hpp index 5fc15f63..081d9711 100644 --- a/theta/include/theta_sketch.hpp +++ b/theta/include/theta_sketch.hpp @@ -417,6 +417,14 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc { virtual uint32_t get_num_retained() const; virtual uint16_t get_seed_hash() const; + /** + * Computes size in bytes required to serialize the current state of the sketch. + * Computing compressed size is expensive. It takes iterating over all retained hashes, + * and the actual serialization will have to look at them again. + * @param compressed if true compressed size is returned (if applicable) + */ + size_t get_serialized_size_bytes(bool compressed = false) const; + /** * This method serializes the sketch into a given stream in a binary form * @param os output stream @@ -486,8 +494,11 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc { uint64_t theta_; std::vector entries_; + uint8_t get_preamble_longs(bool compressed) const; bool is_suitable_for_compression() const; - uint8_t compute_min_leading_zeros() const; + uint8_t compute_entry_bits() const; + uint8_t get_num_entries_bytes() const; + size_t get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const; void serialize_version_4(std::ostream& os) const; vector_bytes serialize_version_4(unsigned header_size_bytes = 0) const; diff --git a/theta/include/theta_sketch_impl.hpp b/theta/include/theta_sketch_impl.hpp index e5e5050f..2b4318c1 100644 --- a/theta/include/theta_sketch_impl.hpp +++ b/theta/include/theta_sketch_impl.hpp @@ -24,11 +24,11 @@ 
#include #include -#include "serde.hpp" #include "binomial_bounds.hpp" #include "theta_helpers.hpp" #include "count_zeros.hpp" #include "bit_packing.hpp" +#include "memory_operations.hpp" namespace datasketches { @@ -341,6 +341,34 @@ auto compact_theta_sketch_alloc::end() const -> const_iterator { template void compact_theta_sketch_alloc::print_specifics(std::ostringstream&) const {} +template +uint8_t compact_theta_sketch_alloc::get_preamble_longs(bool compressed) const { + if (compressed) { + return this->is_estimation_mode() ? 2 : 1; + } + return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; +} + +template +size_t compact_theta_sketch_alloc::get_serialized_size_bytes(bool compressed) const { + if (compressed && is_suitable_for_compression()) { + return get_compressed_serialized_size_bytes(compute_entry_bits(), get_num_entries_bytes()); + } + return sizeof(uint64_t) * get_preamble_longs(false) + sizeof(uint64_t) * entries_.size(); +} + +// store num_entries as whole bytes since whole-byte blocks will follow (most probably) +template +uint8_t compact_theta_sketch_alloc::get_num_entries_bytes() const { + return whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); +} + +template +size_t compact_theta_sketch_alloc::get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const { + const size_t compressed_bits = entry_bits * entries_.size(); + return sizeof(uint64_t) * get_preamble_longs(true) + num_entries_bytes + whole_bytes_to_hold_bits(compressed_bits); +} + template void compact_theta_sketch_alloc::serialize(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 
1 : 2; @@ -366,12 +394,10 @@ void compact_theta_sketch_alloc::serialize(std::ostream& os) const { template auto compact_theta_sketch_alloc::serialize(unsigned header_size_bytes) const -> vector_bytes { - const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; - const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs - + sizeof(uint64_t) * entries_.size(); + const size_t size = get_serialized_size_bytes() + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; - + const uint8_t preamble_longs = get_preamble_longs(false); *ptr++ = preamble_longs; *ptr++ = UNCOMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; @@ -413,7 +439,7 @@ auto compact_theta_sketch_alloc::serialize_compressed(unsigned header_size_by } template -uint8_t compact_theta_sketch_alloc::compute_min_leading_zeros() const { +uint8_t compact_theta_sketch_alloc::compute_entry_bits() const { // compression is based on leading zeros in deltas between ordered hash values // assumes ordered sketch uint64_t previous = 0; @@ -423,16 +449,14 @@ uint8_t compact_theta_sketch_alloc::compute_min_leading_zeros() const { ored |= delta; previous = entry; } - return count_leading_zeros_in_u64(ored); + return 64 - count_leading_zeros_in_u64(ored); } template void compact_theta_sketch_alloc::serialize_version_4(std::ostream& os) const { const uint8_t preamble_longs = this->is_estimation_mode() ? 
2 : 1; - const uint8_t entry_bits = 64 - compute_min_leading_zeros(); - - // store num_entries as whole bytes since whole-byte blocks will follow (most probably) - const uint8_t num_entries_bytes = whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); + const uint8_t entry_bits = compute_entry_bits(); + const uint8_t num_entries_bytes = get_num_entries_bytes(); write(os, preamble_longs); write(os, COMPRESSED_SERIAL_VERSION); @@ -483,19 +507,13 @@ void compact_theta_sketch_alloc::serialize_version_4(std::ostream& os) const template auto compact_theta_sketch_alloc::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes { - const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1; - const uint8_t entry_bits = 64 - compute_min_leading_zeros(); - const size_t compressed_bits = entry_bits * entries_.size(); - - // store num_entries as whole bytes since whole-byte blocks will follow (most probably) - const uint8_t num_entries_bytes = whole_bytes_to_hold_bits(32 - count_leading_zeros_in_u32(static_cast(entries_.size()))); - - const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + num_entries_bytes - + whole_bytes_to_hold_bits(compressed_bits); + const uint8_t entry_bits = compute_entry_bits(); + const uint8_t num_entries_bytes = get_num_entries_bytes(); + const size_t size = get_compressed_serialized_size_bytes(entry_bits, num_entries_bytes) + header_size_bytes; vector_bytes bytes(size, 0, entries_.get_allocator()); uint8_t* ptr = bytes.data() + header_size_bytes; - *ptr++ = preamble_longs; + *ptr++ = get_preamble_longs(true); *ptr++ = COMPRESSED_SERIAL_VERSION; *ptr++ = SKETCH_TYPE; *ptr++ = entry_bits; From b4ee4b49593712dfb42213da067b03a2f9092872 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 8 May 2024 16:14:14 -0700 Subject: [PATCH 2/5] fixed warning --- theta/include/theta_helpers.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/theta/include/theta_helpers.hpp b/theta/include/theta_helpers.hpp index cbdebb40..8d83d341 100644 --- a/theta/include/theta_helpers.hpp +++ b/theta/include/theta_helpers.hpp @@ -57,7 +57,7 @@ class theta_build_helper{ // consistent way of initializing theta from p // avoids multiplication if p == 1 since it might not yield MAX_THETA exactly static uint64_t starting_theta_from_p(float p) { - if (p < 1) return static_cast(theta_constants::MAX_THETA * p); + if (p < 1) return static_cast(static_cast(theta_constants::MAX_THETA) * p); return theta_constants::MAX_THETA; } From a986e8ecc6bf54ce8d5a1eb3c2ad9b461ec45b04 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 8 May 2024 18:03:14 -0700 Subject: [PATCH 3/5] implemented get_max_serialized_size_bytes() + tests --- theta/include/theta_sketch.hpp | 6 ++++++ theta/include/theta_sketch_impl.hpp | 5 +++++ theta/test/theta_sketch_test.cpp | 22 ++++++++++++++++++++-- 3 files changed, 31 insertions(+), 2 deletions(-) diff --git a/theta/include/theta_sketch.hpp b/theta/include/theta_sketch.hpp index 081d9711..ad6421e7 100644 --- a/theta/include/theta_sketch.hpp +++ b/theta/include/theta_sketch.hpp @@ -417,6 +417,12 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc { virtual uint32_t get_num_retained() const; virtual uint16_t get_seed_hash() const; + /** + * Computes maximum serialized size in bytes + * @param lg_k base-2 logarithm of the nominal number of entries in the sketch + */ + static size_t get_max_serialized_size_bytes(uint8_t lg_k); + /** * Computes size in bytes required to serialize the current state of the sketch. * Computing compressed size is expensive. 
It takes iterating over all retained hashes, diff --git a/theta/include/theta_sketch_impl.hpp b/theta/include/theta_sketch_impl.hpp index 2b4318c1..c31d0bad 100644 --- a/theta/include/theta_sketch_impl.hpp +++ b/theta/include/theta_sketch_impl.hpp @@ -349,6 +349,11 @@ uint8_t compact_theta_sketch_alloc::get_preamble_longs(bool compressed) const return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2; } +template +size_t compact_theta_sketch_alloc::get_max_serialized_size_bytes(uint8_t lg_k) { + return sizeof(uint64_t) * (3 + update_theta_sketch_alloc::theta_table::get_capacity(lg_k + 1, lg_k)); +} + template size_t compact_theta_sketch_alloc::get_serialized_size_bytes(bool compressed) const { if (compressed && is_suitable_for_compression()) { diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp index 488f67cf..93950137 100644 --- a/theta/test/theta_sketch_test.cpp +++ b/theta/test/theta_sketch_test.cpp @@ -273,9 +273,11 @@ TEST_CASE("theta sketch: serialize deserialize stream and bytes equivalence", "[ for (int i = 0; i < n; i++) update_sketch.update(i); std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); - update_sketch.compact().serialize(s); - auto bytes = update_sketch.compact().serialize(); + auto compact_sketch = update_sketch.compact(); + compact_sketch.serialize(s); + auto bytes = compact_sketch.serialize(); REQUIRE(bytes.size() == static_cast(s.tellp())); + REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes()); for (size_t i = 0; i < bytes.size(); ++i) { REQUIRE(((char*)bytes.data())[i] == (char)s.get()); } @@ -521,6 +523,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { auto compact_sketch = update_sketch.compact(); auto bytes = compact_sketch.serialize_compressed(); + REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes(true)); { // deserialize bytes auto deserialized_sketch = 
compact_theta_sketch::deserialize(bytes.data(), bytes.size()); REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained()); @@ -544,6 +547,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { std::stringstream s(std::ios::in | std::ios::out | std::ios::binary); compact_sketch.serialize_compressed(s); + REQUIRE(static_cast(s.tellp()) == compact_sketch.get_serialized_size_bytes(true)); auto deserialized_sketch = compact_theta_sketch::deserialize(s); REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained()); REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta()); @@ -554,4 +558,18 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { } } +TEST_CASE("max serialized size", "[theta_sketch]") { + const uint8_t lg_k = 10; + auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build(); + int value = 0; + for (int i = 0; i < (1 << lg_k) * 2; ++i) sketch.update(value++); + size_t max_size_bytes; + for (int i = 0; i < (1 << lg_k) * 2; ++i) { + sketch.update(value++); + auto bytes = sketch.compact().serialize(); + max_size_bytes = std::max(max_size_bytes, bytes.size()); + } + REQUIRE(max_size_bytes == compact_theta_sketch::get_max_serialized_size_bytes(lg_k)); +} + } /* namespace datasketches */ From c6b4edcf94c767b081a30e65b3ce217c1041d822 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Wed, 8 May 2024 18:41:38 -0700 Subject: [PATCH 4/5] init value --- theta/test/theta_sketch_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp index 93950137..fc8db610 100644 --- a/theta/test/theta_sketch_test.cpp +++ b/theta/test/theta_sketch_test.cpp @@ -563,7 +563,7 @@ TEST_CASE("max serialized size", "[theta_sketch]") { auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build(); int value = 0; for (int i = 0; i < (1 << lg_k) * 2; ++i) 
sketch.update(value++); - size_t max_size_bytes; + size_t max_size_bytes = 0; for (int i = 0; i < (1 << lg_k) * 2; ++i) { sketch.update(value++); auto bytes = sketch.compact().serialize(); From 994e8829f111f94ac7a4b9dcb9749d82092d4396 Mon Sep 17 00:00:00 2001 From: AlexanderSaydakov Date: Fri, 10 May 2024 12:29:47 -0700 Subject: [PATCH 5/5] added explanation for a bit tricky test --- theta/test/theta_sketch_test.cpp | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/theta/test/theta_sketch_test.cpp b/theta/test/theta_sketch_test.cpp index fc8db610..0721898c 100644 --- a/theta/test/theta_sketch_test.cpp +++ b/theta/test/theta_sketch_test.cpp @@ -558,11 +558,23 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") { } } +// The sketch reaches capacity for the first time at 2 * K * 15/16, +// but at that point it is still in exact mode, so the serialized size is not the maximum +// (theta is not serialized in the exact mode). +// So we need to catch the second time, but some updates will be ignored in the estimation mode, +// so we update more than enough times keeping track of the maximum. +// Potentially the exact number of updates to reach the peak can be figured out given this particular sequence, +// but not assuming that might be even better (say, in case we change the load factor or hash function +// or just out of principle not to rely on implementation details too much). TEST_CASE("max serialized size", "[theta_sketch]") { const uint8_t lg_k = 10; auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build(); int value = 0; + + // this will go over the first peak, which is not the highest for (int i = 0; i < (1 << lg_k) * 2; ++i) sketch.update(value++); + + // this will go over the second peak keeping track of the max size size_t max_size_bytes = 0; for (int i = 0; i < (1 << lg_k) * 2; ++i) { sketch.update(value++);