Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get serialized size #429

Merged
merged 5 commits into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion theta/include/theta_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class theta_build_helper{
// consistent way of initializing theta from p
// avoids multiplication if p == 1 since it might not yield MAX_THETA exactly
static uint64_t starting_theta_from_p(float p) {
if (p < 1) return static_cast<uint64_t>(theta_constants::MAX_THETA * p);
if (p < 1) return static_cast<uint64_t>(static_cast<double>(theta_constants::MAX_THETA) * p);
return theta_constants::MAX_THETA;
}

Expand Down
19 changes: 18 additions & 1 deletion theta/include/theta_sketch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,20 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc<Allocator> {
virtual uint32_t get_num_retained() const;
virtual uint16_t get_seed_hash() const;

/**
* Computes maximum serialized size in bytes
* @param lg_k nominal number of entries in the sketch
*/
static size_t get_max_serialized_size_bytes(uint8_t lg_k);

/**
* Computes size in bytes required to serialize the current state of the sketch.
* Computing compressed size is expensive. It takes iterating over all retained hashes,
* and the actual serialization will have to look at them again.
* @param compressed if true compressed size is returned (if applicable)
*/
size_t get_serialized_size_bytes(bool compressed = false) const;

/**
* This method serializes the sketch into a given stream in a binary form
* @param os output stream
Expand Down Expand Up @@ -486,8 +500,11 @@ class compact_theta_sketch_alloc: public theta_sketch_alloc<Allocator> {
uint64_t theta_;
std::vector<uint64_t, Allocator> entries_;

uint8_t get_preamble_longs(bool compressed) const;
bool is_suitable_for_compression() const;
uint8_t compute_min_leading_zeros() const;
uint8_t compute_entry_bits() const;
uint8_t get_num_entries_bytes() const;
size_t get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const;
void serialize_version_4(std::ostream& os) const;
vector_bytes serialize_version_4(unsigned header_size_bytes = 0) const;

Expand Down
65 changes: 44 additions & 21 deletions theta/include/theta_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
#include <vector>
#include <stdexcept>

#include "serde.hpp"
#include "binomial_bounds.hpp"
#include "theta_helpers.hpp"
#include "count_zeros.hpp"
#include "bit_packing.hpp"
#include "memory_operations.hpp"

namespace datasketches {

Expand Down Expand Up @@ -341,6 +341,39 @@ auto compact_theta_sketch_alloc<A>::end() const -> const_iterator {
template<typename A>
void compact_theta_sketch_alloc<A>::print_specifics(std::ostringstream&) const {}

template<typename A>
uint8_t compact_theta_sketch_alloc<A>::get_preamble_longs(bool compressed) const {
if (compressed) {
return this->is_estimation_mode() ? 2 : 1;
}
return this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_max_serialized_size_bytes(uint8_t lg_k) {
return sizeof(uint64_t) * (3 + update_theta_sketch_alloc<A>::theta_table::get_capacity(lg_k + 1, lg_k));
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_serialized_size_bytes(bool compressed) const {
if (compressed && is_suitable_for_compression()) {
return get_compressed_serialized_size_bytes(compute_entry_bits(), get_num_entries_bytes());
}
return sizeof(uint64_t) * get_preamble_longs(false) + sizeof(uint64_t) * entries_.size();
}

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
template<typename A>
uint8_t compact_theta_sketch_alloc<A>::get_num_entries_bytes() const {
return whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));
}

template<typename A>
size_t compact_theta_sketch_alloc<A>::get_compressed_serialized_size_bytes(uint8_t entry_bits, uint8_t num_entries_bytes) const {
const size_t compressed_bits = entry_bits * entries_.size();
return sizeof(uint64_t) * get_preamble_longs(true) + num_entries_bytes + whole_bytes_to_hold_bits(compressed_bits);
}

template<typename A>
void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {
const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
Expand All @@ -366,12 +399,10 @@ void compact_theta_sketch_alloc<A>::serialize(std::ostream& os) const {

template<typename A>
auto compact_theta_sketch_alloc<A>::serialize(unsigned header_size_bytes) const -> vector_bytes {
const uint8_t preamble_longs = this->is_estimation_mode() ? 3 : this->is_empty() || entries_.size() == 1 ? 1 : 2;
const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs
+ sizeof(uint64_t) * entries_.size();
const size_t size = get_serialized_size_bytes() + header_size_bytes;
vector_bytes bytes(size, 0, entries_.get_allocator());
uint8_t* ptr = bytes.data() + header_size_bytes;

const uint8_t preamble_longs = get_preamble_longs(false);
*ptr++ = preamble_longs;
*ptr++ = UNCOMPRESSED_SERIAL_VERSION;
*ptr++ = SKETCH_TYPE;
Expand Down Expand Up @@ -413,7 +444,7 @@ auto compact_theta_sketch_alloc<A>::serialize_compressed(unsigned header_size_by
}

template<typename A>
uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
uint8_t compact_theta_sketch_alloc<A>::compute_entry_bits() const {
// compression is based on leading zeros in deltas between ordered hash values
// assumes ordered sketch
uint64_t previous = 0;
Expand All @@ -423,16 +454,14 @@ uint8_t compact_theta_sketch_alloc<A>::compute_min_leading_zeros() const {
ored |= delta;
previous = entry;
}
return count_leading_zeros_in_u64(ored);
return 64 - count_leading_zeros_in_u64(ored);
}

template<typename A>
void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const {
const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
const uint8_t entry_bits = 64 - compute_min_leading_zeros();

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));
const uint8_t entry_bits = compute_entry_bits();
const uint8_t num_entries_bytes = get_num_entries_bytes();

write(os, preamble_longs);
write(os, COMPRESSED_SERIAL_VERSION);
Expand Down Expand Up @@ -483,19 +512,13 @@ void compact_theta_sketch_alloc<A>::serialize_version_4(std::ostream& os) const

template<typename A>
auto compact_theta_sketch_alloc<A>::serialize_version_4(unsigned header_size_bytes) const -> vector_bytes {
const uint8_t preamble_longs = this->is_estimation_mode() ? 2 : 1;
const uint8_t entry_bits = 64 - compute_min_leading_zeros();
const size_t compressed_bits = entry_bits * entries_.size();

// store num_entries as whole bytes since whole-byte blocks will follow (most probably)
const uint8_t num_entries_bytes = whole_bytes_to_hold_bits<uint8_t>(32 - count_leading_zeros_in_u32(static_cast<uint32_t>(entries_.size())));

const size_t size = header_size_bytes + sizeof(uint64_t) * preamble_longs + num_entries_bytes
+ whole_bytes_to_hold_bits(compressed_bits);
const uint8_t entry_bits = compute_entry_bits();
const uint8_t num_entries_bytes = get_num_entries_bytes();
const size_t size = get_compressed_serialized_size_bytes(entry_bits, num_entries_bytes) + header_size_bytes;
vector_bytes bytes(size, 0, entries_.get_allocator());
uint8_t* ptr = bytes.data() + header_size_bytes;

*ptr++ = preamble_longs;
*ptr++ = get_preamble_longs(true);
*ptr++ = COMPRESSED_SERIAL_VERSION;
*ptr++ = SKETCH_TYPE;
*ptr++ = entry_bits;
Expand Down
34 changes: 32 additions & 2 deletions theta/test/theta_sketch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,11 @@ TEST_CASE("theta sketch: serialize deserialize stream and bytes equivalence", "[
for (int i = 0; i < n; i++) update_sketch.update(i);

std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
update_sketch.compact().serialize(s);
auto bytes = update_sketch.compact().serialize();
auto compact_sketch = update_sketch.compact();
compact_sketch.serialize(s);
auto bytes = compact_sketch.serialize();
REQUIRE(bytes.size() == static_cast<size_t>(s.tellp()));
REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes());
for (size_t i = 0; i < bytes.size(); ++i) {
REQUIRE(((char*)bytes.data())[i] == (char)s.get());
}
Expand Down Expand Up @@ -521,6 +523,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
auto compact_sketch = update_sketch.compact();

auto bytes = compact_sketch.serialize_compressed();
REQUIRE(bytes.size() == compact_sketch.get_serialized_size_bytes(true));
{ // deserialize bytes
auto deserialized_sketch = compact_theta_sketch::deserialize(bytes.data(), bytes.size());
REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
Expand All @@ -544,6 +547,7 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {

std::stringstream s(std::ios::in | std::ios::out | std::ios::binary);
compact_sketch.serialize_compressed(s);
REQUIRE(static_cast<size_t>(s.tellp()) == compact_sketch.get_serialized_size_bytes(true));
auto deserialized_sketch = compact_theta_sketch::deserialize(s);
REQUIRE(deserialized_sketch.get_num_retained() == compact_sketch.get_num_retained());
REQUIRE(deserialized_sketch.get_theta() == compact_sketch.get_theta());
Expand All @@ -554,4 +558,30 @@ TEST_CASE("theta sketch: serialize deserialize compressed", "[theta_sketch]") {
}
}

// The sketch reaches capacity for the first time at 2 * K * 15/16,
// but at that point it is still in exact mode, so the serialized size is not the maximum
// (theta in not serialized in the exact mode).
// So we need to catch the second time, but some updates will be ignored in the estimation mode,
// so we update more than enough times keeping track of the maximum.
// Potentially the exact number of updates to reach the peak can be figured out given this particular sequence,
// but not assuming that might be even better (say, in case we change the load factor or hash function
// or just out of principle not to rely on implementation details too much).
TEST_CASE("max serialized size", "[theta_sketch]") {
const uint8_t lg_k = 10;
auto sketch = update_theta_sketch::builder().set_lg_k(lg_k).build();
int value = 0;

// this will go over the first peak, which is not the highest
for (int i = 0; i < (1 << lg_k) * 2; ++i) sketch.update(value++);

// this will to over the second peak keeping track of the max size
size_t max_size_bytes = 0;
for (int i = 0; i < (1 << lg_k) * 2; ++i) {
sketch.update(value++);
auto bytes = sketch.compact().serialize();
max_size_bytes = std::max(max_size_bytes, bytes.size());
}
REQUIRE(max_size_bytes == compact_theta_sketch::get_max_serialized_size_bytes(lg_k));
}

} /* namespace datasketches */
Loading