Skip to content

Commit

Permalink
fix :sdes to handle streaming updates
Browse files Browse the repository at this point in the history
Refactors the `:sdes` operator to properly handle evaluation
with incremental updates. Previously, `pos` was never getting
updated, so the first value would be set to `NaN` and all
others would be left unmodified.

As part of this change, the code was refactored a bit so that
the `OnlineSlidingDes` instance is now part of the state object.
This means that the state passed into `eval` will get modified.
For now that is OK, but the plan is to clean it up in future commits
as part of making the state serializable.
  • Loading branch information
skandragon authored and brharrington committed Jul 5, 2016
1 parent ed361fb commit 7e8db84
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ object StatefulExpr {
trainingSize: Int,
alpha: Double,
beta: Double) extends StatefulExpr {
import com.netflix.atlas.core.model.StatefulExpr.Des._
import com.netflix.atlas.core.model.StatefulExpr.SlidingDes._

def dataExprs: List[DataExpr] = expr.dataExprs
override def toString: String = s"$expr,$trainingSize,$alpha,$beta,:sdes"
Expand All @@ -152,27 +152,23 @@ object StatefulExpr {

def groupByKey(tags: Map[String, String]): Option[String] = expr.groupByKey(tags)

private def eval(ts: ArrayTimeSeq, s: State, skip: Int): State = {
val desF = new OnlineSlidingDes(trainingSize, alpha, beta)
desF.reset()
desF.currentSample = s.currentSample

private def eval(ts: ArrayTimeSeq, s: State): State = {
var desF : OnlineSlidingDes = s.desF
var skipUpTo = s.skipUpTo
val data = ts.data
var pos = s.pos
var pos = 0
while (pos < data.length) {
if (pos < skip) {
if (ts.start + pos * ts.step < skipUpTo) {
data(pos) = Double.NaN
} else {
val yn = data(pos)
data(pos) = desF.next(yn)
}
pos += 1
}
State(pos, desF.currentSample, 0.0, 0.0)
State(skipUpTo, desF)
}

private def newState: State = State(0, 0, 0.0, 0.0)

private def getAlignedStartTime(context: EvalContext): Long = {
val trainingStep = context.step * trainingSize
if (context.start % trainingStep == 0)
Expand All @@ -182,21 +178,25 @@ object StatefulExpr {
}

def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = {
val alignedSkip = (getAlignedStartTime(context) - context.start) / context.step
val rs = expr.eval(context, data)
val state = rs.state.getOrElse(this, new StateMap).asInstanceOf[StateMap]
val newData = rs.data.map { t =>
val bounded = t.data.bounded(context.start, context.end)
val s = state.getOrElse(t.id, newState)
state(t.id) = eval(bounded, s, alignedSkip.toInt)
val s = state.getOrElse(t.id, {
val alignedStart = getAlignedStartTime(context)
val desF = new OnlineSlidingDes(trainingSize, alpha, beta)
desF.reset()
State(alignedStart, desF)
})
state(t.id) = eval(bounded, s)
TimeSeries(t.tags, s"sdes(${t.label})", bounded)
}
ResultSet(this, newData, rs.state + (this -> state))
}
}

object SlidingDes {
case class State(pos: Int, currentSample: Int, sp: Double, bp: Double)
case class State(skipUpTo: Long, desF: OnlineSlidingDes)

type StateMap = scala.collection.mutable.AnyRefMap[BigInteger, State]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class DesSuite extends FunSuite {

val step = 60000L
val dataTags = Map("name" -> "cpu", "node" -> "i-1")
val stream = List(

val alignedStream = List(
List(Datapoint(dataTags, 0L * step, 1.0)),
List(Datapoint(dataTags, 1L * step, 1.5)),
List(Datapoint(dataTags, 2L * step, 1.6)),
Expand All @@ -38,10 +39,27 @@ class DesSuite extends FunSuite {
List(Datapoint(dataTags, 12L * step, 1.2)),
List(Datapoint(dataTags, 13L * step, 1.2))
)

val inputTS = TimeSeries(dataTags, new ArrayTimeSeq(DsType.Gauge, 0L, step,
val alignedInputTS = TimeSeries(dataTags, new ArrayTimeSeq(DsType.Gauge, 0L, step,
Array[Double](1.0, 1.5, 1.6, 1.7, 1.4, 1.3, 1.2, 1.0, 0.0, 0.0, 1.0, 1.1, 1.2, 1.2)))

// Streaming input that begins at 1 * step instead of 0: one single-datapoint
// batch per step, covering timestamps 1 * step through 13 * step.
val unalignedStream =
  List(1.5, 1.6, 1.7, 1.4, 1.3, 1.2, 1.0, 0.0, 0.0, 1.0, 1.1, 1.2, 1.2)
    .zipWithIndex
    .map { case (value, idx) =>
      // idx is zero-based while the first datapoint sits at 1 * step
      List(Datapoint(dataTags, (idx + 1L) * step, value))
    }
// The same values as a single gauge series starting at 1 * step; used as the
// input for the one-shot (global) evaluation in the unaligned test.
val unalignedInputTS = {
  val values = Array[Double](1.5, 1.6, 1.7, 1.4, 1.3, 1.2, 1.0, 0.0, 0.0, 1.0, 1.1, 1.2, 1.2)
  val seq = new ArrayTimeSeq(DsType.Gauge, 1L * step, step, values)
  TimeSeries(dataTags, seq)
}

// Expressions under test; both wrap a sum over the series matching name=cpu.
// For SlidingDes the trailing arguments are trainingSize = 2, alpha = 0.1 and
// beta = 0.02. Des is given the same values, presumably with the same meaning
// (its parameter list is not visible here; TODO confirm).
val des = StatefulExpr.Des(DataExpr.Sum(Query.Equal("name" , "cpu")), 2, 0.1, 0.02)
val sdes = StatefulExpr.SlidingDes(DataExpr.Sum(Query.Equal("name" , "cpu")), 2, 0.1, 0.02)

Expand All @@ -60,9 +78,9 @@ class DesSuite extends FunSuite {
val s = 0L
val e = 14L * step
val context = EvalContext(s, e, step, Map.empty)
val expected = des.eval(context, List(inputTS)).data.head.data.bounded(s, e).data
val expected = des.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data

val result = eval(des, stream)
val result = eval(des, alignedStream)
result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) =>
assert(ts.size === 1)
ts.foreach { t =>
Expand All @@ -75,13 +93,13 @@ class DesSuite extends FunSuite {
}
}

ignore("sdes: incremental exec matches global") {
test("sdes: aligned incremental exec matches global") {
val s = 0L
val e = 14L * step
val context = EvalContext(s, e, step, Map.empty)
val expected = sdes.eval(context, List(inputTS)).data.head.data.bounded(s, e).data
val expected = sdes.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data

val result = eval(sdes, stream)
val result = eval(sdes, alignedStream)
result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) =>
assert(ts.size === 1)
ts.foreach { t =>
Expand All @@ -93,4 +111,25 @@ class DesSuite extends FunSuite {
}
}
}

test("sdes: unaligned incremental exec matches global") {
  // The evaluation window starts one step in, i.e. half of the two-step
  // training window has already elapsed when the first datapoint arrives.
  val start = 1L * step
  val end = 14L * step
  val ctx = EvalContext(start, end, step, Map.empty)

  // Reference values: evaluate the whole series in a single call.
  val global = sdes.eval(ctx, List(unalignedInputTS))
  val expected = global.data.head.data.bounded(start, end).data

  // Values under test: feed the same data one datapoint at a time.
  val incremental = eval(sdes, unalignedStream)

  incremental.zip(expected).zipWithIndex.foreach { case ((series, expectedValue), idx) =>
    assert(series.size === 1)
    series.foreach { t =>
      // The stream begins at 1 * step, so position idx is at time (idx + 1) * step.
      val actual = t.data((idx + 1) * step)
      if (idx > 2)
        assert(expectedValue === actual +- 0.00001)
      else
        assert(actual.isNaN) // the first three outputs are expected to be NaN
    }
  }
}
}

0 comments on commit 7e8db84

Please sign in to comment.