Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Weighted updates for kll items sketch #498

Merged
merged 3 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,5 +111,5 @@ public int sizeOf(final T[] items) {
* Returns the concrete class of type T
* @return the concrete class of type T
*/
public abstract Class<?> getClassOfT();
public abstract Class<T> getClassOfT();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllDoublesSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllDoublesHelper {

/**
Expand Down Expand Up @@ -338,6 +338,7 @@ static void updateDouble(final KllDoublesSketch dblSk, final double item, final
} else {
dblSk.updateMinMax(item);
final KllHeapDoublesSketch tmpSk = new KllHeapDoublesSketch(dblSk.getK(), DEFAULT_M, item, weight);

dblSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif

workLevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self",
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[0], workBuf, workLevels[0], selfPopZero);
Expand All @@ -481,7 +482,7 @@ private static void populateDoubleWorkArrays( //workBuf and workLevels are modif
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
workLevels[lvl + 1] = workLevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurDoubleItemsArr, myCurLevelsArr[lvl], workBuf, workLevels[lvl], selfPop);
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/org/apache/datasketches/kll/KllFloatsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@

import org.apache.datasketches.memory.WritableMemory;

//
//
/**
* Static methods to support KllFloatsSketch
* @author Kevin Lang
* @author Alexander Saydakov
*/
//
final class KllFloatsHelper {

/**
Expand Down Expand Up @@ -338,6 +338,7 @@ static void updateFloat(final KllFloatsSketch fltSk, final float item, final int
} else {
fltSk.updateMinMax(item);
final KllHeapFloatsSketch tmpSk = new KllHeapFloatsSketch(fltSk.getK(), DEFAULT_M, item, weight);

fltSk.merge(tmpSk);
}
}
Expand Down Expand Up @@ -471,7 +472,7 @@ private static void populateFloatWorkArrays(

worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurFloatItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
Expand All @@ -481,14 +482,14 @@ private static void populateFloatWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurFloatItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
}
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherFloatItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
}
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedFloatArrays(
myCurFloatItemsArr, myCurLevelsArr[lvl], selfPop,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapDoublesSketch extends KllDoublesSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ final class KllHeapFloatsSketch extends KllFloatsSketch {
*
* @param k parameter that controls size of the sketch and accuracy of estimates.
* <em>k</em> can be between <em>m</em> and 65535, inclusive.
* The default <em>k</em> = 200 results in a normalized rank error of about 1.65%.
* Larger <em>k</em> will have smaller error but the sketch will be larger (and slower).
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
Expand Down
50 changes: 39 additions & 11 deletions src/main/java/org/apache/datasketches/kll/KllHeapItemsSketch.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_EMPTY;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_FULL;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.COMPACT_SINGLE;
import static org.apache.datasketches.kll.KllSketch.SketchStructure.UPDATABLE;

import java.lang.reflect.Array;
import java.util.Comparator;
Expand All @@ -34,6 +35,14 @@
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.memory.WritableMemory;

/**
* This class implements an on-heap generic items KllSketch.
*
* <p>Please refer to the documentation in the package-info:<br>
* {@link org.apache.datasketches.kll}</p>
*
* @author Lee Rhodes, Kevin Lang
*/
@SuppressWarnings("unchecked")
final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private final int k; // configured size of K.
Expand All @@ -46,14 +55,17 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
private Object[] itemsArr;

/**
* Constructs a new empty instance of this sketch on the Java heap.
* New instance heap constructor.
* @param k parameter that controls size of the sketch and accuracy of estimates.
* @param m parameter that controls the minimum level width in items. It can be 2, 4, 6 or 8.
* The DEFAULT_M, which is 8, is recommended. Other sizes of <em>m</em> should be considered
* experimental as they have not been as well characterized.
* @param comparator user specified comparator of type T.
* @param serDe serialization / deserialization class
*/
KllHeapItemsSketch(
final int k,
final int m,
final Comparator<? super T> comparator,
KllHeapItemsSketch(final int k, final int m, final Comparator<? super T> comparator,
final ArrayOfItemsSerDe<T> serDe) {
super(SketchStructure.UPDATABLE, comparator, serDe);
super(UPDATABLE, comparator, serDe);
KllHelper.checkM(m);
KllHelper.checkK(k, m);
this.levelsArr = new int[] {k, k};
Expand All @@ -69,11 +81,27 @@ final class KllHeapItemsSketch<T> extends KllItemsSketch<T> {
}

/**
* The Heapify constructor, which constructs an image of this sketch from
* a Memory (or WritableMemory) object that was created by this sketch
* and has a type T consistent with the given comparator and serDe.
* Once the data from the given Memory has been transferred into this heap sketch,
* the reference to the Memory object is no longer retained.
* Used for creating a temporary sketch for use with weighted updates.
*/
KllHeapItemsSketch(final int k, final int m, final T item, final int weight, final Comparator<? super T> comparator,
    final ArrayOfItemsSerDe<T> serDe) {
  super(UPDATABLE, comparator, serDe);
  //validate m before k, because the k check takes m as an argument
  KllHelper.checkM(m);
  KllHelper.checkK(k, m);

  //configuration
  this.k = k;
  this.minK = k;
  this.m = m;
  this.readOnly = false;

  //the one given item is both the minimum and the maximum of this sketch
  this.minItem = item;
  this.maxItem = item;

  //state derived from the given weight
  this.n = weight;
  this.isLevelZeroSorted = false;
  this.levelsArr = KllHelper.createLevelsArray(weight);
  this.itemsArr = KllItemsHelper.createItemsArray(serDe.getClassOfT(), item, weight);
}

/**
* The Heapify constructor
* @param srcMem the Source Memory image that contains data.
* @param comparator the comparator for this sketch and given Memory.
* @param serDe the serializer / deserializer for this sketch and the given Memory.
Expand Down
88 changes: 58 additions & 30 deletions src/main/java/org/apache/datasketches/kll/KllItemsHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.lang.reflect.Array.newInstance;
import static org.apache.datasketches.common.Util.isEven;
import static org.apache.datasketches.common.Util.isOdd;
import static org.apache.datasketches.kll.KllHelper.findLevelToCompact;
import static org.apache.datasketches.kll.KllSketch.DEFAULT_M;

import java.util.Arrays;
import java.util.Comparator;
Expand All @@ -37,7 +39,21 @@
* @author Lee Rhodes
*/
@SuppressWarnings("unchecked")
final class KllItemsHelper<T> {
final class KllItemsHelper {

/**
 * Builds the items array for a sketch that holds one item with a given weight.
 * Used with weighted update only.
 *
 * <p>The returned array has one slot for each one-bit in the binary representation of
 * <i>weight</i>, that is {@code Integer.bitCount(weight)} slots, and every slot refers
 * to the same given item.</p>
 *
 * @param <T> the item type
 * @param clazz the class of T, used to allocate an array with the proper component type
 * @param item the given item
 * @param weight the given weight
 * @return the Items Array.
 */
static <T> T[] createItemsArray(final Class<T> clazz, final T item, final int weight) {
  final int numSlots = Integer.bitCount(weight);
  final T[] filled = (T[]) newInstance(clazz, numSlots);
  for (int slot = 0; slot < numSlots; slot++) {
    filled[slot] = item;
  }
  return filled;
}

/**
* The following code is only valid in the special case of exactly reaching capacity while updating.
Expand Down Expand Up @@ -126,12 +142,12 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

//MERGE: update this sketch with level0 items from the other sketch
if (otherItmSk.isCompactSingleItem()) {
updateItem(mySketch, otherItmSk.getSingleItem(), comp);
updateItem(mySketch, otherItmSk.getSingleItem());
otherItemsArr = new Object[0];
} else {
otherItemsArr = otherItmSk.getTotalItemsArray();
for (int i = otherLevelsArr[0]; i < otherLevelsArr[1]; i++) {
updateItem(mySketch, otherItemsArr[i], comp);
updateItem(mySketch, otherItemsArr[i]);
}
}

Expand All @@ -150,12 +166,13 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,
final int tmpSpaceNeeded = mySketch.getNumRetained()
+ KllHelper.getNumRetainedAboveLevelZero(otherNumLevels, otherLevelsArr);
final Object[] workbuf = new Object[tmpSpaceNeeded];
final int ub = KllHelper.ubOnNumLevels(finalN);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

final int provisionalNumLevels = max(myCurNumLevels, otherNumLevels);

final int ub = max(KllHelper.ubOnNumLevels(finalN), provisionalNumLevels);
final int[] worklevels = new int[ub + 2]; // ub+1 does not work
final int[] outlevels = new int[ub + 2];

populateItemWorkArrays(workbuf, worklevels, provisionalNumLevels,
myCurNumLevels, myCurLevelsArr, myCurItemsArr,
otherNumLevels, otherLevelsArr, otherItemsArr, comp);
Expand Down Expand Up @@ -195,7 +212,11 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,

//MEMORY SPACE MANAGEMENT
//not used
}
//extra spaces to make comparison with other helpers easier
//
//
//
} //end of updating levels above level 0

//Update Preamble:
mySketch.setN(finalN);
Expand All @@ -221,7 +242,7 @@ static <T> void mergeItemImpl(final KllItemsSketch<T> mySketch,
assert KllHelper.sumTheSampleWeights(mySketch.getNumLevels(), mySketch.levelsArr) == mySketch.getN();
}

private static <T> void mergeSortedItemsArrays(
private static <T> void mergeSortedItemsArrays( //only bufC is modified
final Object[] bufA, final int startA, final int lenA,
final Object[] bufB, final int startB, final int lenB,
final Object[] bufC, final int startC, final Comparator<? super T> comp) {
Expand Down Expand Up @@ -295,30 +316,34 @@ private static void randomlyHalveUpItems(final Object[] buf, final int start, fi
}

//Called from KllItemsSketch::update and from other methods of this helper class
static <T> void updateItem(final KllItemsSketch<T> itmSk,
final Object item, final Comparator<? super T> comp) {
if (item == null) { return; } //ignore
if (itmSk.isEmpty()) {
itmSk.setMinItem(item);
itmSk.setMaxItem(item);
} else {
itmSk.setMinItem(Util.minT(itmSk.getMinItem(), item, comp));
itmSk.setMaxItem(Util.maxT(itmSk.getMaxItem(), item, comp));
}
int level0space = itmSk.levelsArr[0];
assert level0space >= 0;
if (level0space == 0) {
static <T> void updateItem(final KllItemsSketch<T> itmSk, final Object item) {
itmSk.updateMinMax((T)item);
int freeSpace = itmSk.levelsArr[0];
assert freeSpace >= 0;
if (freeSpace == 0) {
compressWhileUpdatingSketch(itmSk);
level0space = itmSk.levelsArr[0];
assert (level0space > 0);
freeSpace = itmSk.levelsArr[0];
assert (freeSpace > 0);
}
itmSk.incN();
itmSk.setLevelZeroSorted(false);
final int nextPos = level0space - 1;
final int nextPos = freeSpace - 1;
itmSk.setLevelsArrayAt(0, nextPos);
itmSk.setItemsArrayAt(nextPos, item);
}

//Weighted update. Called from KllItemsSketch::update with weight.
//A weight smaller than the current levelsArr[0] value is applied as repeated single-item updates.
//Otherwise a temporary sketch holding the weighted item is built and merged into the given sketch.
static <T> void updateItem(final KllItemsSketch<T> itmSk, final T item, final int weight) {
  final boolean mergeViaTempSketch = weight >= itmSk.levelsArr[0];
  if (mergeViaTempSketch) {
    itmSk.updateMinMax(item);
    itmSk.merge(
        new KllHeapItemsSketch<>(itmSk.getK(), DEFAULT_M, item, weight, itmSk.comparator, itmSk.serDe));
    return;
  }
  for (int remaining = weight; remaining > 0; remaining--) {
    updateItem(itmSk, item);
  }
}

/**
* Compression algorithm used to merge higher levels.
* <p>Here is what we do for each level:</p>
Expand Down Expand Up @@ -448,7 +473,8 @@ private static <T> void populateItemWorkArrays(
final Comparator<? super T> comp) {
worklevels[0] = 0;

// Note: the level zero data from "other" was already inserted into "self"
// Note: the level zero data from "other" was already inserted into "self".
// This copies into workbuf.
final int selfPopZero = KllHelper.currentLevelSizeItems(0, myCurNumLevels, myCurLevelsArr);
System.arraycopy( myCurItemsArr, myCurLevelsArr[0], workbuf, worklevels[0], selfPopZero);
worklevels[1] = worklevels[0] + selfPopZero;
Expand All @@ -457,12 +483,15 @@ private static <T> void populateItemWorkArrays(
final int selfPop = KllHelper.currentLevelSizeItems(lvl, myCurNumLevels, myCurLevelsArr);
final int otherPop = KllHelper.currentLevelSizeItems(lvl, otherNumLevels, otherLevelsArr);
worklevels[lvl + 1] = worklevels[lvl] + selfPop + otherPop;

if (selfPop > 0 && otherPop == 0) {
assert selfPop >= 0 && otherPop >= 0;
if (selfPop == 0 && otherPop == 0) { continue; }
else if (selfPop > 0 && otherPop == 0) {
System.arraycopy(myCurItemsArr, myCurLevelsArr[lvl], workbuf, worklevels[lvl], selfPop);
} else if (selfPop == 0 && otherPop > 0) {
}
else if (selfPop == 0 && otherPop > 0) {
System.arraycopy(otherItemsArr, otherLevelsArr[lvl], workbuf, worklevels[lvl], otherPop);
} else if (selfPop > 0 && otherPop > 0) {
}
else if (selfPop > 0 && otherPop > 0) {
mergeSortedItemsArrays(
myCurItemsArr, myCurLevelsArr[lvl], selfPop,
otherItemsArr, otherLevelsArr[lvl], otherPop,
Expand All @@ -486,4 +515,3 @@ private static <T> void populateItemWorkArrays(
// }

}

Loading
Loading