diff --git a/src/main/java/org/apache/datasketches/common/Family.java b/src/main/java/org/apache/datasketches/common/Family.java index cac7722e8..cac049e9e 100644 --- a/src/main/java/org/apache/datasketches/common/Family.java +++ b/src/main/java/org/apache/datasketches/common/Family.java @@ -146,7 +146,12 @@ public enum Family { /** * CountMin Sketch */ - COUNTMIN(18, "COUNTMIN", 2, 2); + COUNTMIN(18, "COUNTMIN", 2, 2), + + /** + * Exact and Bounded, Probability Proportional to Size (EBPPS) + */ + EBPPS(19, "EBPPS", 1, 5); private static final Map lookupID = new HashMap<>(); private static final Map lookupFamName = new HashMap<>(); diff --git a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java new file mode 100644 index 000000000..d85c99eac --- /dev/null +++ b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSample.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.datasketches.sampling;
+
+import static org.apache.datasketches.common.Util.LS;
+
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
+
+import org.apache.datasketches.common.SketchesArgumentException;
+
+// Supporting class used to hold the raw data sample of an EBPPS sketch.
+// The sample consists of zero or more "full" items plus at most one
+// "partial" item, which is reported by getSample() with probability equal
+// to the fractional part of c_.
+class EbppsItemsSample<T> {
+
+  private double c_;            // Current sample size, including fractional part
+  private T partialItem_;       // a sample item corresponding to a partial weight
+  private ArrayList<T> data_;   // full sample items; may be null after replaceContent()
+
+  private Random rand_;         // ThreadLocalRandom.current() in general
+
+  // basic constructor: empty sample with capacity reserved for reservedSize items
+  EbppsItemsSample(final int reservedSize) {
+    c_ = 0.0;
+    data_ = new ArrayList<>(reservedSize);
+    rand_ = ThreadLocalRandom.current();
+  }
+
+  // copy constructor used during merge; copies the item list but shares
+  // the other sample's random source
+  EbppsItemsSample(final EbppsItemsSample<T> other) {
+    c_ = other.c_;
+    partialItem_ = other.partialItem_;
+    data_ = new ArrayList<>(other.data_);
+    rand_ = other.rand_;
+  }
+
+  // constructor used for deserialization and testing
+  // does NOT copy the incoming ArrayList since this is an internal
+  // class's package-private constructor, not something directly
+  // taking user data
+  EbppsItemsSample(final ArrayList<T> data, final T partialItem, final double c) {
+    if (c < 0.0 || Double.isNaN(c) || Double.isInfinite(c)) {
+      throw new SketchesArgumentException("C must be nonnegative and finite. Found: " + c);
+    }
+
+    c_ = c;
+    partialItem_ = partialItem;
+    data_ = data;
+    rand_ = ThreadLocalRandom.current();
+  }
+
+  // Used in lieu of a constructor to populate a temporary sample
+  // with data before immediately merging it. This approach
+  // avoids excessive object allocation calls.
+  // rand_ is not set since it is not expected to be used from
+  // this object.
+  // With theta == 1.0 the item becomes the single full item; otherwise it
+  // becomes the partial item and data_ is set to null.
+  void replaceContent(final T item, final double theta) {
+    if (theta < 0.0 || theta > 1.0 || Double.isNaN(theta)) {
+      throw new SketchesArgumentException("Theta must be in the range [0.0, 1.0]. Found: " + theta);
+    }
+
+    c_ = theta;
+    if (theta == 1.0) {
+      if (data_ != null && data_.size() == 1) {
+        // reuse the existing single-element list
+        data_.set(0, item);
+      } else {
+        data_ = new ArrayList<>(1);
+        data_.add(item);
+      }
+      partialItem_ = null;
+    } else {
+      data_ = null;
+      partialItem_ = item;
+    }
+  }
+
+  // Returns the sample to an empty state. data_ may be null if
+  // replaceContent() was last called with theta < 1.0, so it is
+  // re-created rather than cleared in that case.
+  void reset() {
+    c_ = 0.0;
+    partialItem_ = null;
+    if (data_ == null) {
+      data_ = new ArrayList<>();
+    } else {
+      data_.clear();
+    }
+  }
+
+  // Returns a copy of the full items, plus the partial item with
+  // probability equal to the fractional part of c_. Returns null
+  // (not an empty list) when the result would contain no items.
+  ArrayList<T> getSample() {
+    final double cFrac = c_ % 1;
+    final boolean includePartial = partialItem_ != null && rand_.nextDouble() < cFrac;
+    final int resultSize = (data_ != null ? data_.size() : 0) + (includePartial ? 1 : 0);
+
+    if (resultSize == 0) {
+      return null;
+    }
+
+    final ArrayList<T> result = new ArrayList<>(resultSize);
+    if (data_ != null) {
+      result.addAll(data_);
+    }
+    if (includePartial) {
+      result.add(partialItem_);
+    }
+
+    return result;
+  }
+
+  // Returns every retained item (full items followed by the partial item,
+  // if any) in a newly allocated array whose component type is clazz.
+  @SuppressWarnings("unchecked")
+  T[] getAllSamples(final Class<?> clazz) {
+    // Is it faster to use sublist and append 1?
+    final T[] itemsArray = (T[]) Array.newInstance(clazz, getNumRetainedItems());
+    int i = 0;
+    if (data_ != null) {
+      for (final T item : data_) {
+        if (item != null) {
+          itemsArray[i++] = item;
+        }
+      }
+    }
+    if (partialItem_ != null) {
+      itemsArray[i] = partialItem_; // no need to increment i again
+    }
+
+    return itemsArray;
+  }
+
+  // package-private for use in merge and serialization; returns the
+  // internal list (not a copy), which may be null
+  ArrayList<T> getFullItems() {
+    return data_;
+  }
+
+  // package-private for use in merge and serialization
+  T getPartialItem() {
+    return partialItem_;
+  }
+
+  double getC() { return c_; }
+
+  boolean hasPartialItem() { return partialItem_ != null; }
+
+  // for testing to allow setting the seed
+  void replaceRandom(final Random r) {
+    rand_ = r;
+  }
+
+  // Scales the sample size c_ by theta, randomly discarding and/or
+  // demoting items to the partial slot. A theta >= 1.0 is a no-op.
+  void downsample(final double theta) {
+    if (theta >= 1.0) {
+      return;
+    }
+
+    final double newC = theta * c_;
+    final double newCInt = Math.floor(newC);
+    final double newCFrac = newC % 1;
+    final double cInt = Math.floor(c_);
+    final double cFrac = c_ % 1;
+
+    if (newCInt == 0.0) {
+      // no full items retained
+      if (rand_.nextDouble() > (cFrac / c_)) {
+        swapWithPartialItem();
+      }
+      data_.clear();
+    } else if (newCInt == cInt) {
+      // no items deleted
+      if (rand_.nextDouble() > (1 - theta * cFrac) / (1 - newCFrac)) {
+        swapWithPartialItem();
+      }
+    } else {
+      if (rand_.nextDouble() < theta * cFrac) {
+        // subsample data in random order; last item is partial
+        // create sample size newC then swapWithPartialItem()
+        subsample((int) newCInt);
+        swapWithPartialItem();
+      } else {
+        // create sample size newCInt + 1 then moveOneToPartialItem()
+        subsample((int) newCInt + 1);
+        moveOneToPartialItem();
+      }
+    }
+
+    // an integral sample size leaves no room for a partial item
+    if (newC == newCInt) {
+      partialItem_ = null;
+    }
+
+    c_ = newC;
+  }
+
+  // Merges other into this sample: sizes are added, other's full items are
+  // appended, and the two partial items are resolved into at most one
+  // additional full item and at most one remaining partial item.
+  void merge(final EbppsItemsSample<T> other) {
+    final double cFrac = c_ % 1;
+    final double otherCFrac = other.c_ % 1;
+
+    // update c_ here but do NOT recompute fractional part yet
+    c_ += other.c_;
+
+    if (other.data_ != null) {
+      data_.addAll(other.data_);
+    }
+
+    // This modifies the original algorithm slightly due to numeric
+    // precision issues. Specifically, the test if cFrac + otherCFrac == 1.0
+    // happens before tests for < 1.0 or > 1.0 and can also be triggered
+    // if c_ == floor(c_) (the updated value of c_, not the input).
+    //
+    // We can still run into issues where cFrac + otherCFrac == epsilon
+    // and the first case would have ideally triggered. As a result, we must
+    // check if the partial item exists before adding to the data_ vector.
+
+    if (cFrac == 0.0 && otherCFrac == 0.0) {
+      partialItem_ = null;
+    } else if (cFrac + otherCFrac == 1.0 || c_ == Math.floor(c_)) {
+      // the two fractions combine into exactly one full item
+      if (rand_.nextDouble() <= cFrac) {
+        if (partialItem_ != null) {
+          data_.add(partialItem_);
+        }
+      } else {
+        if (other.partialItem_ != null) {
+          data_.add(other.partialItem_);
+        }
+      }
+      partialItem_ = null;
+    } else if (cFrac + otherCFrac < 1.0) {
+      // still only a partial item; choose which one survives
+      if (rand_.nextDouble() > cFrac / (cFrac + otherCFrac)) {
+        partialItem_ = other.partialItem_;
+      }
+    } else { // cFrac + otherCFrac > 1
+      // one partial item is promoted to a full item, the other stays partial
+      if (rand_.nextDouble() <= (1 - cFrac) / ((1 - cFrac) + (1 - otherCFrac))) {
+        data_.add(other.partialItem_);
+      } else {
+        data_.add(partialItem_);
+        partialItem_ = other.partialItem_;
+      }
+    }
+  }
+
+  // Lists the full items (one per line, with index) and the partial item.
+  // Safe to call when data_ is null.
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+
+    sb.append(" sample:").append(LS);
+    if (data_ != null) {
+      int idx = 0;
+      for (final T item : data_) {
+        sb.append("\t").append(idx++).append(":\t").append(item).append(LS);
+      }
+    }
+    sb.append(" partial: ");
+    if (partialItem_ != null) {
+      sb.append(partialItem_).append(LS);
+    } else {
+      sb.append("NULL").append(LS);
+    }
+
+    return sb.toString();
+  }
+
+  // Reduces data_ to numSamples randomly chosen items.
+  void subsample(final int numSamples) {
+    // we can perform a Fisher-Yates style shuffle, stopping after
+    // numSamples points since subsequent swaps would only be
+    // between items after numSamples. This is valid since a
+    // point from anywhere in the initial array would be eligible
+    // to end up in the final subsample.
+
+    final int dataLen = data_.size();
+    // nothing to remove; also avoids nextInt(0) and an invalid subList
+    // range if more items are requested than are present
+    if (numSamples >= dataLen) {
+      return;
+    }
+
+    for (int i = 0; i < numSamples; ++i) {
+      final int j = i + rand_.nextInt(dataLen - i);
+      // swap i and j
+      final T tmp = data_.get(i);
+      data_.set(i, data_.get(j));
+      data_.set(j, tmp);
+    }
+
+    // clear anything beyond numSamples
+    data_.subList(numSamples, dataLen).clear();
+  }
+
+  // Exchanges the partial item with a randomly selected full item. If there
+  // is no partial item, a random full item is moved into the partial slot.
+  void swapWithPartialItem() {
+    if (partialItem_ == null) {
+      moveOneToPartialItem();
+    } else {
+      final int idx = rand_.nextInt(data_.size());
+      final T tmp = partialItem_;
+      partialItem_ = data_.get(idx);
+      data_.set(idx, tmp);
+    }
+  }
+
+  // Removes a randomly selected full item from data_ and makes it the
+  // partial item, overwriting any existing partial item.
+  void moveOneToPartialItem() {
+    final int idx = rand_.nextInt(data_.size());
+    final int lastIdx = data_.size() - 1;
+    partialItem_ = data_.get(idx);
+    // move the last item into the vacated slot (a no-op when idx == lastIdx)
+    // so the removal below never has to shift elements
+    data_.set(idx, data_.get(lastIdx));
+    data_.remove(lastIdx);
+  }
+
+  // Number of stored items: full items plus one if a partial item exists.
+  int getNumRetainedItems() {
+    return (data_ != null ? data_.size() : 0)
+         + (partialItem_ != null ? 1 : 0);
+  }
+}
diff --git a/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java
new file mode 100644
index 000000000..8b44aa086
--- /dev/null
+++ b/src/main/java/org/apache/datasketches/sampling/EbppsItemsSketch.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.datasketches.sampling;
+
+import static org.apache.datasketches.sampling.PreambleUtil.EBPPS_SER_VER;
+import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.datasketches.common.ArrayOfItemsSerDe;
+import org.apache.datasketches.common.Family;
+import org.apache.datasketches.common.SketchesArgumentException;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+
+/**
+ * An implementation of an Exact and Bounded Sampling Proportional to Size sketch.
+ *
+ * <p>From: "Exact PPS Sampling with Bounded Sample Size",
+ * B. Hentschel, P. J. Haas, Y. Tian. Information Processing Letters, 2023.
+ *
+ * <p>This sketch samples data from a stream of items proportional to the weight of each item.
+ * The sample guarantees the presence of an item in the result is proportional to that item's
+ * portion of the total weight seen by the sketch, and returns a sample no larger than size k.
+ *
+ * <p>The sample may be smaller than k and the resulting size of the sample potentially includes
+ * a probabilistic component, meaning the resulting sample size is not always constant.
+ *
+ * @param <T> the type of item this sketch contains
+ *
+ * @author Jon Malkin
+ */
+public class EbppsItemsSketch<T> {
+  private static final int MAX_K = Integer.MAX_VALUE - 2;
+  private static final int EBPPS_C_DOUBLE = 40; // part of sample state, not preamble
+  private static final int EBPPS_ITEMS_START = 48;
+
+  private int k_;                      // max size of sketch, in items
+  private long n_;                     // total number of items processed by the sketch
+
+  private double cumulativeWt_;        // total weight of items processed by the sketch
+  private double wtMax_;               // maximum weight seen so far
+  private double rho_;                 // latest scaling parameter for downsampling
+
+  private EbppsItemsSample<T> sample_; // Object holding the current state of the sample
+
+  private EbppsItemsSample<T> tmp_;    // temporary storage
+
+  /**
+   * Constructor
+   * @param k The maximum number of samples to retain
+   * @throws SketchesArgumentException if k is not strictly positive or exceeds the maximum
+   */
+  public EbppsItemsSketch(final int k) {
+    checkK(k);
+    k_ = k;
+    rho_ = 1.0;
+    sample_ = new EbppsItemsSample<>(k);
+    tmp_ = new EbppsItemsSample<>(1);
+  }
+
+  // private copy constructor; the sample is copied, tmp_ is created fresh
+  private EbppsItemsSketch(final EbppsItemsSketch<T> other) {
+    k_ = other.k_;
+    n_ = other.n_;
+    cumulativeWt_ = other.cumulativeWt_;
+    wtMax_ = other.wtMax_;
+    rho_ = other.rho_;
+    sample_ = new EbppsItemsSample<>(other.sample_);
+    tmp_ = new EbppsItemsSample<>(1);
+  }
+
+  // private constructor for heapify; takes ownership of the provided sample
+  private EbppsItemsSketch(final EbppsItemsSample<T> sample,
+                           final int k,
+                           final long n,
+                           final double cumWt,
+                           final double maxWt,
+                           final double rho) {
+    k_ = k;
+    n_ = n;
+    cumulativeWt_ = cumWt;
+    wtMax_ = maxWt;
+    rho_ = rho;
+    sample_ = sample;
+    tmp_ = new EbppsItemsSample<>(1);
+  }
+
+  /**
+   * Returns a sketch instance of this class from the given srcMem,
+   * which must be a Memory representation of this sketch class.
+   *
+   * @param <T> The type of item this sketch contains
+   * @param srcMem a {@link Memory} representation of a sketch of this class.
+   * @param serDe An instance of ArrayOfItemsSerDe
+   * @return a sketch instance of this class
+   * @throws SketchesArgumentException if the preamble or sample state fails validation
+   */
+  public static <T> EbppsItemsSketch<T> heapify(final Memory srcMem,
+                                                final ArrayOfItemsSerDe<T> serDe)
+  {
+    final int numPreLongs = PreambleUtil.getAndCheckPreLongs(srcMem);
+    final int serVer = PreambleUtil.extractSerVer(srcMem);
+    final int familyId = PreambleUtil.extractFamilyID(srcMem);
+    final int flags = PreambleUtil.extractFlags(srcMem);
+    final boolean isEmpty = (flags & EMPTY_FLAG_MASK) != 0;
+
+    // Check values
+    if (isEmpty) {
+      if (numPreLongs != Family.EBPPS.getMinPreLongs()) {
+        throw new SketchesArgumentException("Possible corruption: Must be " + Family.EBPPS.getMinPreLongs()
+            + " for an empty sketch. Found: " + numPreLongs);
+      }
+    } else {
+      if (numPreLongs != Family.EBPPS.getMaxPreLongs()) {
+        throw new SketchesArgumentException("Possible corruption: Must be "
+            + Family.EBPPS.getMaxPreLongs() + " for a non-empty sketch. Found: " + numPreLongs);
+      }
+    }
+    if (serVer != EBPPS_SER_VER) {
+      throw new SketchesArgumentException(
+          "Possible Corruption: Ser Ver must be " + EBPPS_SER_VER + ": " + serVer);
+    }
+    final int reqFamilyId = Family.EBPPS.getID();
+    if (familyId != reqFamilyId) {
+      throw new SketchesArgumentException(
+          "Possible Corruption: FamilyID must be " + reqFamilyId + ": " + familyId);
+    }
+
+    final int k = PreambleUtil.extractK(srcMem);
+    if (k < 1 || k > MAX_K) {
+      throw new SketchesArgumentException("Possible Corruption: k must be at least 1 "
+          + "and less than " + MAX_K + ". Found: " + k);
+    }
+
+    if (isEmpty) {
+      return new EbppsItemsSketch<>(k);
+    }
+
+    final long n = PreambleUtil.extractN(srcMem);
+    if (n < 0) {
+      throw new SketchesArgumentException("Possible Corruption: n cannot be negative: " + n);
+    }
+
+    final double cumWt = PreambleUtil.extractEbppsCumulativeWeight(srcMem);
+    if (cumWt < 0.0 || Double.isNaN(cumWt) || Double.isInfinite(cumWt)) {
+      throw new SketchesArgumentException("Possible Corruption: cumWt must be nonnegative and finite: " + cumWt);
+    }
+
+    final double maxWt = PreambleUtil.extractEbppsMaxWeight(srcMem);
+    if (maxWt < 0.0 || Double.isNaN(maxWt) || Double.isInfinite(maxWt)) {
+      throw new SketchesArgumentException("Possible Corruption: maxWt must be nonnegative and finite: " + maxWt);
+    }
+
+    final double rho = PreambleUtil.extractEbppsRho(srcMem);
+    if (rho < 0.0 || rho > 1.0 || Double.isNaN(rho) || Double.isInfinite(rho)) {
+      throw new SketchesArgumentException("Possible Corruption: rho must be in [0.0, 1.0]: " + rho);
+    }
+
+    // extract C (part of sample_, not the preamble)
+    // due to numeric precision issues, c may occasionally be very slightly larger than k
+    final double c = srcMem.getDouble(EBPPS_C_DOUBLE);
+    if (c < 0 || c >= (k + 1) || Double.isNaN(c) || Double.isInfinite(c)) {
+      throw new SketchesArgumentException("Possible Corruption: c must be between 0 and k: " + c);
+    }
+
+    // extract items: ceil(c) items are stored, the last of which is the
+    // partial item whenever c has a fractional part
+    final int numTotalItems = (int) Math.ceil(c);
+    final int numFullItems = (int) Math.floor(c); // floor() not strictly necessary
+    final int offsetBytes = EBPPS_ITEMS_START;
+    final T[] rawItems = serDe.deserializeFromMemory(
+        srcMem.region(offsetBytes, srcMem.getCapacity() - offsetBytes), 0, numTotalItems);
+    final List<T> itemsList = Arrays.asList(rawItems);
+    final ArrayList<T> data;
+    final T partialItem;
+    if (numFullItems < numTotalItems) {
+      data = new ArrayList<>(itemsList.subList(0, numFullItems));
+      partialItem = itemsList.get(numFullItems); // 0-based, so last item
+    } else {
+      data = new ArrayList<>(itemsList);
+      partialItem = null; // just to be explicit
+    }
+
+    final EbppsItemsSample<T> sample = new EbppsItemsSample<>(data, partialItem, c);
+
+    return new EbppsItemsSketch<>(sample, k, n, cumWt, maxWt, rho);
+  }
+
+  /**
+   * Updates this sketch with the given data item with weight 1.0.
+   * @param item an item from a stream of items
+   */
+  public void update(final T item) {
+    update(item, 1.0);
+  }
+
+  /**
+   * Updates this sketch with the given data item with the given weight.
+   * Items with a weight of exactly 0.0 are ignored.
+   * @param item an item from a stream of items
+   * @param weight the weight of the item
+   * @throws SketchesArgumentException if weight is negative, NaN, or infinite
+   */
+  public void update(final T item, final double weight) {
+    if (weight < 0.0 || Double.isNaN(weight) || Double.isInfinite(weight)) {
+      throw new SketchesArgumentException("Item weights must be nonnegative and finite. "
+          + "Found: " + weight);
+    }
+    if (weight == 0.0) {
+      return;
+    }
+
+    final double newCumWt = cumulativeWt_ + weight;
+    final double newWtMax = Math.max(wtMax_, weight);
+    final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt);
+
+    if (cumulativeWt_ > 0.0) {
+      sample_.downsample((newRho / rho_));
+    }
+
+    tmp_.replaceContent(item, newRho * weight);
+    sample_.merge(tmp_);
+
+    cumulativeWt_ = newCumWt;
+    wtMax_ = newWtMax;
+    rho_ = newRho;
+    ++n_;
+  }
+
+  /* Merging
+   * There is a trivial merge algorithm that involves downsampling each sketch A and B
+   * as A.cum_wt / (A.cum_wt + B.cum_wt) and B.cum_wt / (A.cum_wt + B.cum_wt),
+   * respectively. That merge does preserve first-order probabilities, specifically
+   * the probability proportional to size property, and like all other known merge
+   * algorithms distorts second-order probabilities (co-occurrences). There are
+   * pathological cases, most obvious with k=2 and A.cum_wt == B.cum_wt where that
+   * approach will always take exactly 1 item from A and 1 from B, meaning the
+   * co-occurrence rate for two items from either sketch is guaranteed to be 0.0.
+   *
+   * With EBPPS, once an item is accepted into the sketch we no longer need to
+   * track the item's weight: All accepted items are treated equally. As a result, we
+   * can take inspiration from the reservoir sampling merge in the datasketches-java
+   * library. We need to merge the smaller sketch into the larger one, swapping as
+   * needed to ensure that, at which point we simply call update() with the items
+   * in the smaller sketch as long as we adjust the weight appropriately.
+   * Merging smaller into larger is essential to ensure that no item has a
+   * contribution to C > 1.0.
+   */
+
+  /**
+   * Merges the provided sketch into the current one.
+   * @param other the sketch to merge into the current object
+   */
+  public void merge(final EbppsItemsSketch<T> other) {
+    if (other.getCumulativeWeight() == 0.0) {
+      return;
+    }
+
+    if (other.getCumulativeWeight() > cumulativeWt_) {
+      // need to swap this with other
+      // make a copy of other, merge into it, and take the result
+      final EbppsItemsSketch<T> copy = new EbppsItemsSketch<>(other);
+      copy.internalMerge(this);
+      k_ = copy.k_;
+      n_ = copy.n_;
+      cumulativeWt_ = copy.cumulativeWt_;
+      wtMax_ = copy.wtMax_;
+      rho_ = copy.rho_;
+      sample_ = copy.sample_;
+    } else {
+      internalMerge(other);
+    }
+  }
+
+  // merge implementation called exclusively from public merge()
+  private void internalMerge(final EbppsItemsSketch<T> other) {
+    // assumes that other.cumulativeWt_ <= cumulativeWt_,
+    // which must be checked before calling this
+
+    final double finalCumWt = cumulativeWt_ + other.cumulativeWt_;
+    final double newWtMax = Math.max(wtMax_, other.wtMax_);
+    k_ = Math.min(k_, other.k_);
+    final long newN = n_ + other.n_;
+
+    // Insert other's items with the cumulative weight
+    // split between the input items. We repeat the same process
+    // for full items and the partial item, scaling the input
+    // weight appropriately.
+    // We handle all C input items, meaning we always process
+    // the partial item using a scaled down weight.
+    // Handling the partial item by probabilistically including
+    // it as a full item would be correct on average but would
+    // introduce bias for any specific merge operation.
+    final double avgWt = other.cumulativeWt_ / other.getC();
+    final ArrayList<T> items = other.sample_.getFullItems();
+    if (items != null) {
+      for (int i = 0; i < items.size(); ++i) {
+        // newWtMax is pre-computed
+        final double newCumWt = cumulativeWt_ + avgWt;
+        final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt);
+
+        if (cumulativeWt_ > 0.0) {
+          sample_.downsample(newRho / rho_);
+        }
+
+        tmp_.replaceContent(items.get(i), newRho * avgWt);
+        sample_.merge(tmp_);
+
+        cumulativeWt_ = newCumWt;
+        rho_ = newRho;
+      }
+    }
+
+    // insert partial item with weight scaled by the fractional part of C
+    if (other.sample_.hasPartialItem()) {
+      final double otherCFrac = other.getC() % 1;
+      final double newCumWt = cumulativeWt_ + (otherCFrac * avgWt);
+      final double newRho = Math.min(1.0 / newWtMax, k_ / newCumWt);
+
+      if (cumulativeWt_ > 0.0) {
+        sample_.downsample(newRho / rho_);
+      }
+
+      tmp_.replaceContent(other.sample_.getPartialItem(), newRho * otherCFrac * avgWt);
+      sample_.merge(tmp_);
+
+      cumulativeWt_ = newCumWt;
+      rho_ = newRho;
+    }
+
+    // avoid numeric issues by setting cumulative weight to the
+    // pre-computed value
+    cumulativeWt_ = finalCumWt;
+    // rho_ above was derived from newWtMax, so record it; otherwise a later
+    // update() or serialization would use a maximum that ignores other's items
+    wtMax_ = newWtMax;
+    n_ = newN;
+  }
+
+  /**
+   * Returns a copy of the current sample. The exact size may be
+   * probabilistic, differing by at most 1 item.
+   * @return the current sketch sample, or null if the sample contains no items
+   */
+  public ArrayList<T> getResult() { return sample_.getSample(); }
+
+  /**
+   * Provides a human-readable summary of the sketch
+   * @return a summary of information in the sketch
+   */
+  @Override
+  public String toString() {
+    final String ls = System.lineSeparator();
+    final StringBuilder sb = new StringBuilder();
+
+    sb.append(ls);
+    sb.append("### ").append(getClass().getSimpleName()).append(" SUMMARY: ").append(ls);
+    sb.append("   k            : ").append(k_).append(ls);
+    sb.append("   n            : ").append(n_).append(ls);
+    sb.append("   Cum. weight  : ").append(cumulativeWt_).append(ls);
+    sb.append("   wtMax        : ").append(wtMax_).append(ls);
+    sb.append("   rho          : ").append(rho_).append(ls);
+    sb.append("   C            : ").append(sample_.getC()).append(ls);
+    sb.append("### END SKETCH SUMMARY").append(ls);
+
+    return sb.toString();
+  }
+
+  /**
+   * Returns the configured maximum sample size.
+   * @return configured maximum sample size
+   */
+  public int getK() { return k_; }
+
+  /**
+   * Returns the number of items processed by the sketch, regardless
+   * of item weight.
+   * @return count of items processed by the sketch
+   */
+  public long getN() { return n_; }
+
+  /**
+   * Returns the cumulative weight of items processed by the sketch.
+   * @return cumulative weight of items seen
+   */
+  public double getCumulativeWeight() { return cumulativeWt_; }
+
+  /**
+   * Returns the expected number of samples returned upon a call to
+   * getResult(). The number is a floating point value, where the
+   * fractional portion represents the probability of including a
+   * "partial item" from the sample.
+   *
+   * <p>The value C should be no larger than the sketch's configured
+   * value of k, although numerical precision limitations mean it
+   * may exceed k by double precision floating point error margins
+   * in certain cases.
+   * @return The expected number of samples returned when querying the sketch
+   */
+  public double getC() { return sample_.getC(); }
+
+  /**
+   * Returns true if the sketch is empty.
+   * @return empty flag
+   */
+  public boolean isEmpty() { return n_ == 0; }
+
+  /**
+   * Resets the sketch to its default, empty state.
+   */
+  public void reset() {
+    n_ = 0;
+    cumulativeWt_ = 0.0;
+    wtMax_ = 0.0;
+    rho_ = 1.0;
+    sample_ = new EbppsItemsSample<>(k_);
+  }
+
+  /**
+   * Returns the size of a byte array representation of this sketch. May fail for polymorphic item types.
+   *
+   * @param serDe An instance of ArrayOfItemsSerDe
+   * @return the length of a byte array representation of this sketch
+   */
+  public int getSerializedSizeBytes(final ArrayOfItemsSerDe<? super T> serDe) {
+    if (isEmpty()) {
+      return Family.EBPPS.getMinPreLongs() << 3;
+    }
+    return getSerializedSizeBytes(serDe, retainedItemClass());
+  }
+
+  /**
+   * Returns the length of a byte array representation of this sketch. Copies contents into an array of the
+   * specified class for serialization to allow for polymorphic types.
+   *
+   * @param serDe An instance of ArrayOfItemsSerDe
+   * @param clazz The class represented by &lt;T&gt;
+   * @return the length of a byte array representation of this sketch
+   */
+  public int getSerializedSizeBytes(final ArrayOfItemsSerDe<? super T> serDe, final Class<?> clazz) {
+    if (n_ == 0) {
+      return Family.EBPPS.getMinPreLongs() << 3;
+    }
+
+    final int preLongs = Family.EBPPS.getMaxPreLongs();
+    final byte[] itemBytes = serDe.serializeToByteArray(sample_.getAllSamples(clazz));
+    // in C++, c_ is serialized as part of the sample_ and not included in the header size
+    return (preLongs << 3) + Double.BYTES + itemBytes.length;
+  }
+
+  /**
+   * Returns a byte array representation of this sketch. May fail for polymorphic item types.
+   *
+   * @param serDe An instance of ArrayOfItemsSerDe
+   * @return a byte array representation of this sketch
+   */
+  public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe) {
+    if (n_ == 0) {
+      // null class is ok since empty -- no need to call serDe
+      return toByteArray(serDe, null);
+    }
+    return toByteArray(serDe, retainedItemClass());
+  }
+
+  /**
+   * Returns a byte array representation of this sketch. Copies contents into an array of the
+   * specified class for serialization to allow for polymorphic types.
+   *
+   * @param serDe An instance of ArrayOfItemsSerDe
+   * @param clazz The class represented by &lt;T&gt;
+   * @return a byte array representation of this sketch
+   */
+  public byte[] toByteArray(final ArrayOfItemsSerDe<? super T> serDe, final Class<?> clazz) {
+    final int preLongs;
+    final int outBytes;
+    final boolean empty = n_ == 0;
+    byte[] itemBytes = null; // for serialized items from sample_
+
+    if (empty) {
+      // an empty sketch is serialized as the minimal preamble only
+      preLongs = Family.EBPPS.getMinPreLongs();
+      outBytes = preLongs << 3;
+    } else {
+      preLongs = Family.EBPPS.getMaxPreLongs();
+      itemBytes = serDe.serializeToByteArray(sample_.getAllSamples(clazz));
+      // in C++, c_ is serialized as part of the sample_ and not included in the header size
+      outBytes = (preLongs << 3) + Double.BYTES + itemBytes.length;
+    }
+    final byte[] outArr = new byte[outBytes];
+    final WritableMemory mem = WritableMemory.writableWrap(outArr);
+
+    // Common header elements
+    PreambleUtil.insertPreLongs(mem, preLongs);               // Byte 0
+    PreambleUtil.insertSerVer(mem, EBPPS_SER_VER);            // Byte 1
+    PreambleUtil.insertFamilyID(mem, Family.EBPPS.getID());   // Byte 2
+    if (empty) {
+      PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK);         // Byte 3
+    } else {
+      PreambleUtil.insertFlags(mem, 0);
+    }
+    PreambleUtil.insertK(mem, k_);                            // Bytes 4-7
+
+    // conditional elements
+    if (!empty) {
+      PreambleUtil.insertN(mem, n_);
+      PreambleUtil.insertEbppsCumulativeWeight(mem, cumulativeWt_);
+      PreambleUtil.insertEbppsMaxWeight(mem, wtMax_);
+      PreambleUtil.insertEbppsRho(mem, rho_);
+
+      // data from sample_ -- itemBytes includes the partial item
+      mem.putDouble(EBPPS_C_DOUBLE, sample_.getC());
+      mem.putByteArray(EBPPS_ITEMS_START, itemBytes, 0, itemBytes.length);
+    }
+
+    return outArr;
+  }
+
+  // Returns the runtime class of one retained item, used to allocate the
+  // array handed to the serDe. Reads the stored items directly rather than
+  // calling getSample(), which would copy the list and draw a random number.
+  // Must only be called on a non-empty sketch.
+  private Class<?> retainedItemClass() {
+    final ArrayList<T> fullItems = sample_.getFullItems();
+    if (fullItems != null && !fullItems.isEmpty()) {
+      return fullItems.get(0).getClass();
+    }
+    return sample_.getPartialItem().getClass();
+  }
+
+  // validates a user-supplied k
+  private void checkK(final int k) {
+    if (k <= 0 || k > MAX_K) {
+      throw new SketchesArgumentException("k must be strictly positive and less than " + MAX_K);
+    }
+  }
+}
diff --git a/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java b/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java
index 453e3901e..454514fed 100644
---
a/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java +++ b/src/main/java/org/apache/datasketches/sampling/PreambleUtil.java @@ -35,12 +35,10 @@ /** * This class defines the preamble items structure and provides basic utilities for some of the key - * fields. + * fields. Fields are presented in Little Endian format, but multi-byte values (int, long, double) + * are stored in native byte order. All byte values are treated as unsigned. * - *

- * MAP: Low significance bytes of this long items structure are on the right. However, the - * multi-byte integers (int and long) are stored in native byte order. The - * byte values are treated as unsigned.

+ * Reservoir Sampling * *

Sketch: The count of items seen is limited to 48 bits (~256 trillion) even * though there are adjacent unused preamble bits. The acceptance probability for an item is a @@ -54,11 +52,11 @@ *

  * Long || Start Byte Adr:
  * Adr:
- *      ||    7   |    6   |    5   |    4   |    3   |    2   |    1   |     0              |
- *  0   ||--------Reservoir Size (K)---------|  Flags | FamID  | SerVer |   Preamble_Longs   |
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
  *
- *      ||   15   |   14   |   13   |   12   |   11   |   10   |    9   |     8              |
- *  1   ||------------------------------Items Seen Count (N)---------------------------------|
+ *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
+ *  1   ||----------------------------Items Seen Count (N)-------------------------------|
  *  
* *

Union: The reservoir union has fewer internal parameters to track and uses @@ -74,10 +72,13 @@ *

  * Long || Start Byte Adr:
  * Adr:
- *      ||    7   |    6   |    5   |    4   |    3   |    2   |    1   |     0              |
- *  0   ||---------Max Res. Size (K)---------|  Flags | FamID  | SerVer |   Preamble_Longs   |
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
  * 
* + * + * VarOpt Sampling + * *

VarOpt: A VarOpt sketch has a more complex internal items structure and * requires a larger preamble. Values serving a similar purpose in both reservoir and varopt sampling * share the same byte ranges, allowing method re-use where practical.

@@ -88,21 +89,21 @@ *
  * Long || Start Byte Adr:
  * Adr:
- *      ||    7   |    6   |    5   |    4   |    3   |    2   |    1   |     0              |
- *  0   ||--------Reservoir Size (K)---------|  Flags | FamID  | SerVer |   Preamble_Longs   |
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
  *
- *      ||   15   |   14   |   13   |   12   |   11   |   10   |    9   |     8              |
- *  1   ||------------------------------Items Seen Count (N)---------------------------------|
+ *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
+ *  1   ||----------------------------Items Seen Count (N)-------------------------------|
  *
- *      ||   23   |   22   |   21   |   20   |   19   |   18   |   17   |    16              |
- *  2   ||---------Item Count in R-----------|-----------Item Count in H---------------------|
+ *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
+ *  2   ||--------Item Count in R-----------|-----------Item Count in H------------------|
  *
- *      ||   31   |   30   |   29   |   28   |   27   |   26   |   25   |    24              |
- *  3   ||--------------------------------Total Weight in R----------------------------------|
+ *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
+ *  3   ||------------------------------Total Weight in R--------------------------------|
  *  
* *

VarOpt Union: VarOpt unions also store more information than a reservoir - * sketch. As before, we keep values with similar o hte same meaning in corresponding locations + * sketch. As before, we keep values with similar or the same meaning in corresponding locations * actoss sketch and union formats. The items in the union are stored in a varopt sketch-compatible * format after the union preamble.

* @@ -111,19 +112,63 @@ *
  * Long || Start Byte Adr:
  * Adr:
- *      ||    7   |    6   |    5   |    4   |    3   |    2   |    1   |     0              |
- *  0   ||---------Max Res. Size (K)---------|  Flags | FamID  | SerVer |   Preamble_Longs   |
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
  *
- *      ||   15   |   14   |   13   |   12   |   11   |   10   |    9   |     8              |
- *  1   ||------------------------------Items Seen Count (N)---------------------------------|
+ *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
+ *  1   ||----------------------------Items Seen Count (N)-------------------------------|
  *
- *      ||   23   |   22   |   21   |   20   |   19   |   18   |   17   |    16              |
- *  2   ||---------------------------Outer Tau Numerator (double)----------------------------|
+ *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
+ *  2   ||-------------------------Outer Tau Numerator (double)--------------------------|
  *
- *      ||   31   |   30   |   29   |   28   |   27   |   26   |   25   |    24              |
- *  3   ||---------------------------Outer Tau Denominator (long)----------------------------|
+ *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
+ *  3   ||-------------------------Outer Tau Denominator (long)--------------------------|
  *  
* + * + * EPPS Sampling + * + * An empty sketch requires 8 bytes. + * + *
+ * Long || Start Byte Adr:
+ * Adr:
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
+ * 
+ * + * A non-empty sketch requires 40 bytes of preamble. C looks like part of + * the preamble but is treated as part of the sample state. + * + * The count of items seen is not used but preserved as the value seems like a useful + * count to track. + * + *
+ * Long || Start Byte Adr:
+ * Adr:
+ *      ||       0        |    1   |    2   |    3   |    4   |    5   |    6   |    7   |
+ *  0   || Preamble_Longs | SerVer | FamID  |  Flags |---------Max Res. Size (K)---------|
+ *
+ *      ||       8        |    9   |   10   |   11   |   12   |   13   |   14   |   15   |
+ *  1   ||---------------------------Items Seen Count (N)--------------------------------|
+ *
+ *      ||      16        |   17   |   18   |   19   |   20   |   21   |   22   |   23   |
+ *  2   ||----------------------------Cumulative Weight----------------------------------|
+ *
+ *      ||      24        |   25   |   26   |   27   |   28   |   29   |   30   |   31   |
+ *  3   ||-----------------------------Max Item Weight-----------------------------------|
+ *
+ *      ||      32        |   33   |   34   |   35   |   36   |   37   |   38   |   39   |
+ *  4   ||----------------------------------Rho------------------------------------------|
+ *
+ *      ||      40        |   41   |   42   |   43   |   44   |   45   |   46   |   47   |
+ *  5   ||-----------------------------------C-------------------------------------------|
+ *
+ *      ||      48+                      |
+ *  6+  ||  {Items Array}                |
+ *      ||  {Optional Item (if needed)}  |
+ * 
+ * * @author Jon Malkin * @author Lee Rhodes */ @@ -155,6 +200,11 @@ private PreambleUtil() {} static final int VO_PRELONGS_WARMUP = 3; // Doesn't match min or max prelongs in Family static final int VO_PRELONGS_FULL = Family.VAROPT.getMaxPreLongs(); + // constants and addresses used in EBPPS + static final int EBPPS_CUM_WT_DOUBLE = 16; + static final int EBPPS_MAX_WT_DOUBLE = 24; + static final int EBPPS_RHO_DOUBLE = 32; + // flag bit masks //static final int BIG_ENDIAN_FLAG_MASK = 1; //static final int READ_ONLY_FLAG_MASK = 2; @@ -162,7 +212,9 @@ private PreambleUtil() {} static final int GADGET_FLAG_MASK = 128; //Other constants - static final int SER_VER = 2; + static final int RESERVOIR_SER_VER = 2; + static final int VAROPT_SER_VER = 2; + static final int EBPPS_SER_VER = 1; static final boolean NATIVE_ORDER_IS_BIG_ENDIAN = (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN); @@ -378,6 +430,18 @@ static long extractOuterTauDenominator(final Memory mem) { return mem.getLong(OUTER_TAU_DENOM_LONG); } + static double extractEbppsCumulativeWeight(final Memory mem) { + return mem.getDouble(EBPPS_CUM_WT_DOUBLE); + } + + static double extractEbppsMaxWeight(final Memory mem) { + return mem.getDouble(EBPPS_MAX_WT_DOUBLE); + } + + static double extractEbppsRho(final Memory mem) { + return mem.getDouble(EBPPS_RHO_DOUBLE); + } + // Insertion methods static void insertPreLongs(final WritableMemory wmem, final int preLongs) { @@ -439,6 +503,18 @@ static void insertOuterTauDenominator(final WritableMemory wmem, final long deno wmem.putLong(OUTER_TAU_DENOM_LONG, denom); } + static void insertEbppsCumulativeWeight(final WritableMemory wmem, final double cumWt) { + wmem.putDouble(EBPPS_CUM_WT_DOUBLE, cumWt); + } + + static void insertEbppsMaxWeight(final WritableMemory wmem, final double maxWt) { + wmem.putDouble(EBPPS_MAX_WT_DOUBLE, maxWt); + } + + static void insertEbppsRho(final WritableMemory wmem, final double rho) { + wmem.putDouble(EBPPS_RHO_DOUBLE, rho); + } + /** * 
Checks Memory for capacity to hold the preamble and returns the extracted preLongs. * @param mem the given Memory diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java index dc21d5ef7..030728fea 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsSketch.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractK; @@ -229,13 +229,13 @@ public static ReservoirItemsSketch heapify(final Memory srcMem, + Family.RESERVOIR.getMinPreLongs() + " preLong(s)"); } - if (serVer != SER_VER) { + if (serVer != RESERVOIR_SER_VER) { if (serVer == 1) { final short encK = extractEncodedReservoirSize(srcMem); k = ReservoirSize.decodeValue(encK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); } } @@ -479,7 +479,7 @@ public byte[] toByteArray(final ArrayOfItemsSerDe serDe, final Class< // Common header elements PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR.getID()); // Byte 2 if 
(empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java index 9f4bd5f33..74a52e492 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirItemsUnion.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; @@ -109,13 +109,13 @@ public static ReservoirItemsUnion heapify(final Memory srcMem, + Family.RESERVOIR_UNION.getMinPreLongs() + "preLongs"); } - if (serVer != SER_VER) { + if (serVer != RESERVOIR_SER_VER) { if (serVer == 1) { final short encMaxK = extractEncodedReservoirSize(srcMem); maxK = ReservoirSize.decodeValue(encMaxK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); } } @@ -304,7 +304,7 @@ public byte[] toByteArray(final ArrayOfItemsSerDe serDe, final Class clazz // build preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); diff --git 
a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java index acdfd303e..55ca558e4 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsSketch.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractK; @@ -209,13 +209,13 @@ public static ReservoirLongsSketch heapify(final Memory srcMem) { + Family.RESERVOIR.getMinPreLongs() + "preLongs"); } - if (serVer != SER_VER) { + if (serVer != RESERVOIR_SER_VER) { if (serVer == 1) { final short encK = extractEncodedReservoirSize(srcMem); k = ReservoirSize.decodeValue(encK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); } } @@ -407,7 +407,7 @@ public byte[] toByteArray() { // build first preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java 
b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java index f8d9d1eb0..c3ef33957 100644 --- a/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/ReservoirLongsUnion.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.RESERVOIR_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractEncodedReservoirSize; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; @@ -100,13 +100,13 @@ public static ReservoirLongsUnion heapify(final Memory srcMem) { + Family.RESERVOIR_UNION.getMinPreLongs() + "preLongs"); } - if (serVer != SER_VER) { + if (serVer != RESERVOIR_SER_VER) { if (serVer == 1) { final short encMaxK = extractEncodedReservoirSize(srcMem); maxK = ReservoirSize.decodeValue(encMaxK); } else { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + RESERVOIR_SER_VER + ": " + serVer); } } @@ -257,7 +257,7 @@ public byte[] toByteArray() { // construct header PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, RESERVOIR_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.RESERVOIR_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); // Byte 3 diff --git a/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java b/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java index e43ff6907..c0c56ba3e 100644 --- 
a/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java +++ b/src/main/java/org/apache/datasketches/sampling/VarOptItemsSketch.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.GADGET_FLAG_MASK; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.VAROPT_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.TOTAL_WEIGHT_R_DOUBLE; import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_EMPTY; import static org.apache.datasketches.sampling.PreambleUtil.VO_PRELONGS_FULL; @@ -291,9 +291,9 @@ public static VarOptItemsSketch heapify(final Memory srcMem, + " or " + VO_PRELONGS_FULL + " for a non-empty sketch. Found: " + numPreLongs); } } - if (serVer != SER_VER) { + if (serVer != VAROPT_SER_VER) { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer); } final int reqFamilyId = Family.VAROPT.getID(); if (familyId != reqFamilyId) { @@ -587,7 +587,7 @@ public byte[] toByteArray(final ArrayOfItemsSerDe serDe, final Class< // build first preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 PreambleUtil.insertLgResizeFactor(mem, rf_.lg()); - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, VAROPT_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.VAROPT.getID()); // Byte 2 PreambleUtil.insertFlags(mem, flags); // Byte 3 PreambleUtil.insertK(mem, k_); // Bytes 4-7 diff --git a/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java b/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java index d21c716aa..1f1215d70 100644 --- 
a/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java +++ b/src/main/java/org/apache/datasketches/sampling/VarOptItemsUnion.java @@ -22,7 +22,7 @@ import static org.apache.datasketches.common.Util.LS; import static org.apache.datasketches.sampling.PreambleUtil.EMPTY_FLAG_MASK; import static org.apache.datasketches.sampling.PreambleUtil.FAMILY_BYTE; -import static org.apache.datasketches.sampling.PreambleUtil.SER_VER; +import static org.apache.datasketches.sampling.PreambleUtil.VAROPT_SER_VER; import static org.apache.datasketches.sampling.PreambleUtil.extractFlags; import static org.apache.datasketches.sampling.PreambleUtil.extractMaxK; import static org.apache.datasketches.sampling.PreambleUtil.extractN; @@ -165,9 +165,9 @@ public static VarOptItemsUnion heapify(final Memory srcMem, outerTauDenom = extractOuterTauDenominator(srcMem); } - if (serVer != SER_VER) { + if (serVer != VAROPT_SER_VER) { throw new SketchesArgumentException( - "Possible Corruption: Ser Ver must be " + SER_VER + ": " + serVer); + "Possible Corruption: Ser Ver must be " + VAROPT_SER_VER + ": " + serVer); } final boolean preLongsEqMin = (numPreLongs == Family.VAROPT_UNION.getMinPreLongs()); @@ -332,7 +332,7 @@ public byte[] toByteArray(final ArrayOfItemsSerDe serDe, final Class clazz // build preLong PreambleUtil.insertPreLongs(mem, preLongs); // Byte 0 - PreambleUtil.insertSerVer(mem, SER_VER); // Byte 1 + PreambleUtil.insertSerVer(mem, VAROPT_SER_VER); // Byte 1 PreambleUtil.insertFamilyID(mem, Family.VAROPT_UNION.getID()); // Byte 2 if (empty) { PreambleUtil.insertFlags(mem, EMPTY_FLAG_MASK); diff --git a/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java b/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java new file mode 100644 index 000000000..e00c216ae --- /dev/null +++ b/src/test/java/org/apache/datasketches/sampling/EbppsSampleTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.sampling; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Random; + +import org.testng.annotations.Test; + +public class EbppsSampleTest { + private static final double EPS = 1e-14; + + @Test + public void basicInitialization() { + EbppsItemsSample sample = new EbppsItemsSample<>(0); + assertEquals(sample.getC(), 0.0); + assertEquals(sample.getNumRetainedItems(), 0); + assertNull(sample.getSample()); + } + + @Test + public void initializeWithData() { + final double theta1 = 1.0; + EbppsItemsSample sample = new EbppsItemsSample<>(1); + sample.replaceContent(-1, theta1); + assertEquals(sample.getC(), theta1); + assertEquals(sample.getNumRetainedItems(), 1); + assertEquals(sample.getSample().size(), 1); + assertEquals(sample.getSample().get(0), -1); + assertFalse(sample.hasPartialItem()); + + final double theta2 = 1e-300; + sample.replaceContent(-2, theta2); + assertEquals(sample.getC(), theta2); + assertEquals(sample.getNumRetainedItems(), 1); + // next check assumes random number is > 1e-300 + 
assertNull(sample.getSample()); + assertTrue(sample.hasPartialItem()); + } + + @Test + public void downsampleToZeroOrOneItem() { + EbppsItemsSample sample = new EbppsItemsSample<>(1); + sample.replaceContent("a", 1.0); + + sample.downsample(2.0); // no-op + assertEquals(sample.getC(), 1.0); + assertEquals(sample.getNumRetainedItems(), 1); + assertEquals(sample.getSample().get(0), "a"); + assertFalse(sample.hasPartialItem()); + + // downsample and result in an empty sample + ArrayList items = new ArrayList<>(Arrays.asList("a", "b")); + sample = new EbppsItemsSample<>(items, null, 1.8); + sample.replaceRandom(new Random(85942)); + sample.downsample(0.5); + assertEquals(sample.getC(), 0.9); + assertEquals(sample.getNumRetainedItems(), 0); + assertNull(sample.getSample()); + assertFalse(sample.hasPartialItem()); + + // downsample and result in a sample with a partial item + // create a new ArrayList each time to be sure it's clean + items = new ArrayList<>(Arrays.asList("a", "b")); + sample = new EbppsItemsSample<>(items, null, 1.5); + sample.replaceRandom(new Random(15)); + sample.downsample(0.5); + assertEquals(sample.getC(), 0.75); + assertEquals(sample.getNumRetainedItems(), 1); + assertTrue(sample.hasPartialItem()); + for (String s : sample.getSample()) { + assertTrue("a".equals(s) || "b".equals(s)); + } + } + + @Test + public void downsampleMultipleItems() { + // downsample to an exact integer c (7.5 * 0.8 = 6.0) + ArrayList items = new ArrayList<>(Arrays.asList("a", "b", "c", "d", "e", "f", "g")); + String partial = "h"; + ArrayList referenceItems = new ArrayList<>(items); // copy of inputs + referenceItems.add("h"); // include the partial item + + EbppsItemsSample sample = new EbppsItemsSample<>(items, partial, 7.5); + sample.downsample(0.8); + assertEquals(sample.getC(), 6.0); + assertEquals(sample.getNumRetainedItems(), 6); + assertFalse(sample.hasPartialItem()); + for (String s : sample.getSample()) { + assertTrue(referenceItems.contains(s)); + } + + // 
downsample to c > 1 with partial item + items = new ArrayList<>(referenceItems); // includes previous optional + partial = "i"; + referenceItems.add("i"); + sample = new EbppsItemsSample<>(items, partial, 8.5); + sample.downsample(0.8); + assertEquals(sample.getC(), 6.8, EPS); + assertEquals(sample.getNumRetainedItems(), 7); + assertTrue(sample.hasPartialItem()); + for (String s : sample.getSample()) { + assertTrue(referenceItems.contains(s)); + } + } + + @Test + public void mergeUnitSamples() { + int k = 8; + EbppsItemsSample sample = new EbppsItemsSample<>(k); + EbppsItemsSample s = new EbppsItemsSample<>(1); + + for (int i = 1; i <= k; ++i) { + s.replaceContent(i, 1.0); + sample.merge(s); + assertEquals(sample.getC(), (double) i); + assertEquals(sample.getNumRetainedItems(), i); + } + + sample.reset(); + assertEquals(sample.getC(), 0.0); + assertEquals(sample.getNumRetainedItems(), 0); + assertFalse(sample.hasPartialItem()); + } +} diff --git a/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java b/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java new file mode 100644 index 000000000..dde0db669 --- /dev/null +++ b/src/test/java/org/apache/datasketches/sampling/EbppsSketchTest.java @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.datasketches.sampling; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +import java.util.ArrayList; + +import org.apache.datasketches.common.ArrayOfLongsSerDe; +import org.apache.datasketches.common.ArrayOfStringsSerDe; +import org.apache.datasketches.common.SketchesArgumentException; +import org.apache.datasketches.memory.Memory; +import org.apache.datasketches.memory.WritableMemory; +import org.testng.annotations.Test; + +public class EbppsSketchTest { + private static final double EPS = 1e-13; + + static EbppsItemsSketch createUnweightedSketch(int k, long n) { + EbppsItemsSketch sk = new EbppsItemsSketch<>(k); + for (long i = 0; i < n; ++i) { + sk.update((int) i); + } + return sk; + } + + static void checkIfEqual(EbppsItemsSketch sk1, EbppsItemsSketch sk2) { + assertEquals(sk1.getK(), sk2.getK()); + assertEquals(sk1.getN(), sk2.getN()); + assertEquals(sk1.getC(), sk2.getC()); + assertEquals(sk1.getCumulativeWeight(), sk2.getCumulativeWeight()); + + // results may validly differ in size based on presence of partial items + ArrayList sample1 = sk1.getResult(); + ArrayList sample2 = sk2.getResult(); + + if (sk1.getC() < 1.0) { + if (sample1 != null && sample2 != null) { + assertEquals(sample1.size(), sample2.size()); + assertEquals(sample1.get(0), sample2.get(0)); + } + // nothing to test if one is null and the other isn't + } else { + // sk1.getC() >= 1.0 and sk2.getC() >= 1.0 (they're equal per above) + // so the samples shouldn't be null + assertTrue(sample1 != null && sample2 != null); + final int len = Math.min(sample1.size(), sample2.size()); + for (int i = 0; i < len; ++i) { + assertEquals(sample1.get(i), sample2.get(i)); + } + assertTrue((len == Math.floor(sk1.getC()) || len == Math.ceil(sk1.getC()))); + + // if c != floor(c) one 
sketch may not have reached the end, + // but that's not reliably testable from the external API + } + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void checkZeroK() { + new EbppsItemsSketch(0); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void checkTooBigK() { + new EbppsItemsSketch(Integer.MAX_VALUE); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void checkNegativeWeight() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(1); + sk.update("a", -1.0); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void checkInfiniteWeight() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(1); + sk.update("a", Double.POSITIVE_INFINITY); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void checkNaNWeight() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(1); + sk.update("a", Double.NaN); + } + + @Test + public void insertItems() { + int n = 0; + final int k = 5; + + // empty sketch + EbppsItemsSketch sk = createUnweightedSketch(k, n); + assertEquals(sk.getK(), k); + assertEquals(sk.getN(), 0); + assertEquals(sk.getC(), 0.0); + assertEquals(sk.getCumulativeWeight(), 0.0); + assertTrue(sk.isEmpty()); + + // exact mode + n = k; + sk = createUnweightedSketch(k, n); + assertFalse(sk.isEmpty()); + assertEquals(sk.getN(), n); + assertEquals(sk.getC(), (double) k); + assertEquals(sk.getCumulativeWeight(), (double) n); + assertEquals(sk.getResult().size(), sk.getK()); + for (Integer val : sk.getResult()) + assertTrue(val < n); + + // sampling mode with uniform eights + n = k * 10; + sk = createUnweightedSketch(k, n); + assertFalse(sk.isEmpty()); + assertEquals(sk.getN(), n); + assertEquals(sk.getCumulativeWeight(), (double) n); + assertEquals(sk.getC(), (double) k, EPS); + assertEquals(sk.getResult().size(), sk.getK()); + for (Integer val : sk.getResult()) + assertTrue(val < n); + + // add a very heavy item + sk.update(n, (double) n); + 
assertTrue(sk.getC() < sk.getK()); + } + + @Test + public void mergeSmallIntoLarge() { + final int k = 100; + + final EbppsItemsSketch sk1 = createUnweightedSketch(k, k); + final EbppsItemsSketch sk2 = new EbppsItemsSketch<>(k / 2); + sk2.update(-1, k / 10.0); // on eheavy item, but less than sk1 weight + + sk1.merge(sk2); + assertEquals(sk1.getK(), k / 2); + assertEquals(sk1.getN(), k + 1); + assertTrue(sk1.getC() < k); + assertEquals(sk1.getCumulativeWeight(), 1.1 * k, EPS); + } + + @Test + public void mergeLargeIntoSmall() { + final int k = 100; + + EbppsItemsSketch sk1 = new EbppsItemsSketch<>(k / 2); + sk1.update(-1, k / 4.0); + sk1.update(-2, k / 8.0); + EbppsItemsSketch sk2 = createUnweightedSketch(k, k); + assertEquals(sk2.getN(), k); + assertEquals(sk2.getC(), k, EPS); + + sk1.merge(sk2); + assertEquals(sk1.getK(), k / 2); + assertEquals(sk1.getN(), k + 2); + assertTrue(sk1.getC() < k); + // cumulative weight is now (1 + 0.25 + 0.125)k = 1.375k + assertEquals(sk1.getCumulativeWeight(), 1.375 * k, EPS); + } + + @Test + public void serializeDeserializeString() { + // since C <= k we don't have the usual sketch notion of exact vs estimation + // mode at any time. 
The only real serializaiton cases are empty and non-empty + // with and without a partial item + final int k = 10; + EbppsItemsSketch sk = new EbppsItemsSketch<>(k); + + // empty + byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); + Memory mem = Memory.wrap(bytes); + EbppsItemsSketch sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + checkIfEqual(sk, sk_heapify); + + // add uniform items + for (int i = 0; i < k; ++i) + sk.update(Integer.toString(i)); + + // non-empty, no partial item + bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); + mem = Memory.wrap(bytes); + sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + checkIfEqual(sk, sk_heapify); + + // non-empty with partial item + sk.update(Integer.toString(2 * k), 2.5); + assertEquals(sk.getCumulativeWeight(), k + 2.5, EPS); + bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + assertEquals(bytes.length, sk.getSerializedSizeBytes(new ArrayOfStringsSerDe())); + mem = Memory.wrap(bytes); + sk_heapify = EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + checkIfEqual(sk, sk_heapify); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeZeroK() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertK(mem, 0); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeTooLargeK() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertK(mem, Integer.MAX_VALUE); + 
EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeBadSerVer() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertSerVer(mem, -1); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeBadFamily() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertFamilyID(mem, 0); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeNegativeN() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertN(mem, -1000); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeNaNCumulativeWeight() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertEbppsCumulativeWeight(mem, Double.NaN); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeInfiniteMaxWeight() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); + 
final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertEbppsMaxWeight(mem, Double.POSITIVE_INFINITY); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeNegativeRho() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + PreambleUtil.insertEbppsRho(mem, -0.1); + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeNegativeC() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (int i = 0; i < 10; ++i) sk.update(Integer.toString(i)); + final byte[] bytes = sk.toByteArray(new ArrayOfStringsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + mem.putDouble(40, -2.0); // from the defined spec + EbppsItemsSketch.heapify(mem, new ArrayOfStringsSerDe()); + } + + @Test(expectedExceptions = SketchesArgumentException.class) + public void deserializeTooShort() { + EbppsItemsSketch sk = new EbppsItemsSketch<>(5); + for (long i = 0; i < 10; ++i) sk.update(i); + final byte[] bytes = sk.toByteArray(new ArrayOfLongsSerDe()); + final WritableMemory mem = WritableMemory.writableWrap(bytes); + final Memory shortMem = mem.region(0, mem.getCapacity() - 1); + EbppsItemsSketch.heapify(shortMem, new ArrayOfStringsSerDe()); + } +}