From 7e8db84549cd3face03880bd4c638aa5f0dfee32 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Tue, 5 Jul 2016 10:30:19 -0700 Subject: [PATCH] fix :sdes to handle streaming updates Refactors the `:sdes` operator to properly handle evaluation with incremental updates. Previously, `pos` was never updated, so the first value would get set to `NaN` and all others would not be modified. As part of this change, the code was refactored a bit so that the `OnlineSlidingDes` instance is now part of the state object. This means that the state passed in to eval will get modified. For now that is ok, but the plan is to clean it up in future commits as part of making the state serializable. --- .../atlas/core/model/StatefulExpr.scala | 30 +++++----- .../netflix/atlas/core/model/DesSuite.scala | 55 ++++++++++++++++--- 2 files changed, 62 insertions(+), 23 deletions(-) diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala index 65e10cbfd..40a594dff 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/model/StatefulExpr.scala @@ -143,7 +143,7 @@ object StatefulExpr { trainingSize: Int, alpha: Double, beta: Double) extends StatefulExpr { - import com.netflix.atlas.core.model.StatefulExpr.Des._ + import com.netflix.atlas.core.model.StatefulExpr.SlidingDes._ def dataExprs: List[DataExpr] = expr.dataExprs override def toString: String = s"$expr,$trainingSize,$alpha,$beta,:sdes" @@ -152,15 +152,13 @@ object StatefulExpr { def groupByKey(tags: Map[String, String]): Option[String] = expr.groupByKey(tags) - private def eval(ts: ArrayTimeSeq, s: State, skip: Int): State = { - val desF = new OnlineSlidingDes(trainingSize, alpha, beta) - desF.reset() - desF.currentSample = s.currentSample - + private def eval(ts: ArrayTimeSeq, s: State): State = { + var desF : OnlineSlidingDes = s.desF +
var skipUpTo = s.skipUpTo val data = ts.data - var pos = s.pos + var pos = 0 while (pos < data.length) { - if (pos < skip) { + if (ts.start + pos * ts.step < skipUpTo) { data(pos) = Double.NaN } else { val yn = data(pos) @@ -168,11 +166,9 @@ object StatefulExpr { } pos += 1 } - State(pos, desF.currentSample, 0.0, 0.0) + State(skipUpTo, desF) } - private def newState: State = State(0, 0, 0.0, 0.0) - private def getAlignedStartTime(context: EvalContext): Long = { val trainingStep = context.step * trainingSize if (context.start % trainingStep == 0) @@ -182,13 +178,17 @@ object StatefulExpr { } def eval(context: EvalContext, data: Map[DataExpr, List[TimeSeries]]): ResultSet = { - val alignedSkip = (getAlignedStartTime(context) - context.start) / context.step val rs = expr.eval(context, data) val state = rs.state.getOrElse(this, new StateMap).asInstanceOf[StateMap] val newData = rs.data.map { t => val bounded = t.data.bounded(context.start, context.end) - val s = state.getOrElse(t.id, newState) - state(t.id) = eval(bounded, s, alignedSkip.toInt) + val s = state.getOrElse(t.id, { + val alignedStart = getAlignedStartTime(context) + val desF = new OnlineSlidingDes(trainingSize, alpha, beta) + desF.reset() + State(alignedStart, desF) + }) + state(t.id) = eval(bounded, s) TimeSeries(t.tags, s"sdes(${t.label})", bounded) } ResultSet(this, newData, rs.state + (this -> state)) @@ -196,7 +196,7 @@ object StatefulExpr { } object SlidingDes { - case class State(pos: Int, currentSample: Int, sp: Double, bp: Double) + case class State(skipUpTo: Long, desF: OnlineSlidingDes) type StateMap = scala.collection.mutable.AnyRefMap[BigInteger, State] } diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/model/DesSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/model/DesSuite.scala index 8e9c2d6e7..0b7c25b2d 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/model/DesSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/model/DesSuite.scala @@ 
-22,7 +22,8 @@ class DesSuite extends FunSuite { val step = 60000L val dataTags = Map("name" -> "cpu", "node" -> "i-1") - val stream = List( + + val alignedStream = List( List(Datapoint(dataTags, 0L * step, 1.0)), List(Datapoint(dataTags, 1L * step, 1.5)), List(Datapoint(dataTags, 2L * step, 1.6)), @@ -38,10 +39,27 @@ class DesSuite extends FunSuite { List(Datapoint(dataTags, 12L * step, 1.2)), List(Datapoint(dataTags, 13L * step, 1.2)) ) - - val inputTS = TimeSeries(dataTags, new ArrayTimeSeq(DsType.Gauge, 0L, step, + val alignedInputTS = TimeSeries(dataTags, new ArrayTimeSeq(DsType.Gauge, 0L, step, Array[Double](1.0, 1.5, 1.6, 1.7, 1.4, 1.3, 1.2, 1.0, 0.0, 0.0, 1.0, 1.1, 1.2, 1.2))) + val unalignedStream = List( + List(Datapoint(dataTags, 1L * step, 1.5)), + List(Datapoint(dataTags, 2L * step, 1.6)), + List(Datapoint(dataTags, 3L * step, 1.7)), + List(Datapoint(dataTags, 4L * step, 1.4)), + List(Datapoint(dataTags, 5L * step, 1.3)), + List(Datapoint(dataTags, 6L * step, 1.2)), + List(Datapoint(dataTags, 7L * step, 1.0)), + List(Datapoint(dataTags, 8L * step, 0.0)), + List(Datapoint(dataTags, 9L * step, 0.0)), + List(Datapoint(dataTags, 10L * step, 1.0)), + List(Datapoint(dataTags, 11L * step, 1.1)), + List(Datapoint(dataTags, 12L * step, 1.2)), + List(Datapoint(dataTags, 13L * step, 1.2)) + ) + val unalignedInputTS = TimeSeries(dataTags, new ArrayTimeSeq(DsType.Gauge, 1L * step, step, + Array[Double](1.5, 1.6, 1.7, 1.4, 1.3, 1.2, 1.0, 0.0, 0.0, 1.0, 1.1, 1.2, 1.2))) + val des = StatefulExpr.Des(DataExpr.Sum(Query.Equal("name" , "cpu")), 2, 0.1, 0.02) val sdes = StatefulExpr.SlidingDes(DataExpr.Sum(Query.Equal("name" , "cpu")), 2, 0.1, 0.02) @@ -60,9 +78,9 @@ class DesSuite extends FunSuite { val s = 0L val e = 14L * step val context = EvalContext(s, e, step, Map.empty) - val expected = des.eval(context, List(inputTS)).data.head.data.bounded(s, e).data + val expected = des.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data - val result = 
eval(des, stream) + val result = eval(des, alignedStream) result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) => assert(ts.size === 1) ts.foreach { t => @@ -75,13 +93,13 @@ class DesSuite extends FunSuite { } } - ignore("sdes: incremental exec matches global") { + test("sdes: aligned incremental exec matches global") { val s = 0L val e = 14L * step val context = EvalContext(s, e, step, Map.empty) - val expected = sdes.eval(context, List(inputTS)).data.head.data.bounded(s, e).data + val expected = sdes.eval(context, List(alignedInputTS)).data.head.data.bounded(s, e).data - val result = eval(sdes, stream) + val result = eval(sdes, alignedStream) result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) => assert(ts.size === 1) ts.foreach { t => @@ -93,4 +111,25 @@ class DesSuite extends FunSuite { } } } + + test("sdes: unaligned incremental exec matches global") { + val s = 1L * step // offset by one step, half a training window used + val e = 14L * step + val context = EvalContext(s, e, step, Map.empty) + val expected = sdes.eval(context, List(unalignedInputTS)).data.head.data.bounded(s, e).data + + val result = eval(sdes, unalignedStream) + //println(expected.mkString(", ")) + //println(result.map { case v => v(0).data.asInstanceOf[ArrayTimeSeq].data(0) }.mkString(", ")) + result.zip(expected).zipWithIndex.foreach { case ((ts, v), i) => + assert(ts.size === 1) + ts.foreach { t => + val r = t.data((i + 1) * step) // offset step by our skipped data + if (i <= 2) + assert(r.isNaN) + else + assert(v === r +- 0.00001) + } + } + } }