From fc80466dd77d33a2387b5a35e4951c056265da38 Mon Sep 17 00:00:00 2001
From: brharrington
Date: Wed, 27 Mar 2024 08:14:30 -0500
Subject: [PATCH] lwc-events: improve data point conversion (#1631)

Updates to support the common statistics for timers and distribution
summaries. Aggregates the values locally to reduce the number of
messages.
---
 .../lwc/events/AbstractLwcEventClient.scala   |  79 +++---
 .../atlas/lwc/events/DatapointConverter.scala | 243 +++++++++++++++++
 .../netflix/atlas/lwc/events/LwcEvent.scala   |  25 ++
 .../atlas/lwc/events/LwcEventClient.scala     |  18 +-
 .../lwc/events/DatapointConverterSuite.scala  | 255 ++++++++++++++++++
 .../lwc/events/LwcEventClientSuite.scala      |  43 +--
 6 files changed, 601 insertions(+), 62 deletions(-)
 create mode 100644 atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
 create mode 100644 atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala

diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
index 6e0f59a4d..d7d18de8b 100644
--- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/AbstractLwcEventClient.scala
@@ -19,19 +19,24 @@ import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.EventExpr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.core.model.TraceQuery
-import com.netflix.atlas.core.util.SortedTagMap
+import com.netflix.spectator.api.Clock
 import com.netflix.spectator.api.NoopRegistry
 import com.netflix.spectator.atlas.impl.QueryIndex
 
-abstract class AbstractLwcEventClient extends LwcEventClient {
+import java.util.concurrent.atomic.AtomicLong
+
+abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
 
   import AbstractLwcEventClient.*
 
+  @volatile private var handlers: List[EventHandler] = _
+
   @volatile private var index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)
 
   @volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty
 
   protected def sync(subscriptions: Subscriptions): Unit = {
+    val flushableHandlers = List.newBuilder[EventHandler]
     val idx = QueryIndex.newInstance[EventHandler](new NoopRegistry)
 
     // Pass-through events
@@ -48,20 +53,18 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
     // Analytics based on events
     subscriptions.analytics.foreach { sub =>
       val expr = ExprUtils.parseDataExpr(sub.expression)
+      val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit)
       val q = removeValueClause(expr.query)
-      val queryTags = Query.tags(expr.query)
       val handler = EventHandler(
         sub,
         event => {
-          val tags = groupTags(expr, event)
-          tags.fold(List.empty[LwcEvent]) { ts =>
-            val tags = ts ++ queryTags
-            val v = dataValue(tags, event)
-            List(DatapointEvent(sub.id, SortedTagMap(tags), event.timestamp, v))
-          }
-        }
+          converter.update(event)
+          Nil
+        },
+        Some(converter)
       )
       idx.add(q, handler)
+      flushableHandlers += handler
     }
 
     // Trace pass-through
@@ -70,6 +73,7 @@
     }.toMap
 
     index = idx
+    handlers = flushableHandlers.result()
   }
 
   private def removeValueClause(query: Query): SpectatorQuery = {
@@ -81,39 +85,13 @@
     ExprUtils.toSpectatorQuery(Query.simplify(q, ignore = true))
   }
 
-  private def groupTags(expr: 
DataExpr, event: LwcEvent): Option[Map[String, String]] = { - val tags = Map.newBuilder[String, String] - val it = expr.finalGrouping.iterator - while (it.hasNext) { - val k = it.next() - val v = event.tagValue(k) - if (v == null) - return None - else - tags += k -> v - } - Some(tags.result()) - } - - private def dataValue(tags: Map[String, String], event: LwcEvent): Double = { - tags.get("value").fold(1.0) { v => - event.extractValue(v) match { - case b: Boolean if b => 1.0 - case _: Boolean => 0.0 - case n: Byte => n.toDouble - case n: Short => n.toDouble - case n: Int => n.toDouble - case n: Long => n.toDouble - case n: Float => n.toDouble - case n: Double => n - case n: Number => n.doubleValue() - case _ => 1.0 - } - } - } - override def process(event: LwcEvent): Unit = { - index.forEachMatch(k => event.tagValue(k), h => handleMatch(event, h)) + event match { + case LwcEvent.HeartbeatLwcEvent(timestamp) => + handlers.foreach(_.flush(timestamp)) + case _ => + index.forEachMatch(k => event.tagValue(k), h => handleMatch(event, h)) + } } private def handleMatch(event: LwcEvent, handler: EventHandler): Unit = { @@ -136,5 +114,20 @@ abstract class AbstractLwcEventClient extends LwcEventClient { object AbstractLwcEventClient { - private case class EventHandler(subscription: Subscription, mapper: LwcEvent => List[LwcEvent]) + private case class EventHandler( + subscription: Subscription, + mapper: LwcEvent => List[LwcEvent], + converter: Option[DatapointConverter] = None + ) { + + private val lastFlushTimestamp = new AtomicLong(0L) + + def flush(timestamp: Long): Unit = { + val stepTime = timestamp / subscription.step + if (stepTime > lastFlushTimestamp.get()) { + converter.foreach(_.flush(timestamp)) + lastFlushTimestamp.set(stepTime) + } + } + } } diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala new file mode 100644 index 000000000..d2bff24d2 --- /dev/null +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala @@ -0,0 +1,243 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.lwc.events + +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.Query +import com.netflix.spectator.api.Clock +import com.netflix.spectator.impl.AtomicDouble +import com.netflix.spectator.impl.StepDouble + +import java.util.concurrent.ConcurrentHashMap + +/** + * Helper to convert a sequence of events into a data point. 
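+ *
+ * A minimal usage sketch (hypothetical id, step, and consumer values, mirroring
+ * the unit tests in this patch; `ExprUtils.parseDataExpr` is from this module):
+ *
+ * {{{
+ * val expr = ExprUtils.parseDataExpr("app,www,:eq,:sum")
+ * val converter = DatapointConverter("id", expr, Clock.SYSTEM, 5000L, (_, e) => println(e))
+ * converter.update(LwcEvent(Map("app" -> "www")))
+ * converter.flush(System.currentTimeMillis()) // emits one DatapointEvent per step
+ * }}}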
+ */ +private[events] trait DatapointConverter { + + def update(event: LwcEvent): Unit + + def flush(timestamp: Long): Unit +} + +private[events] object DatapointConverter { + + def apply( + id: String, + expr: DataExpr, + clock: Clock, + step: Long, + consumer: (String, LwcEvent) => Unit + ): DatapointConverter = { + val tags = Query.tags(expr.query) + val mapper = createValueMapper(tags, expr.finalGrouping) + val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer) + toConverter(expr, params) + } + + private def toConverter(expr: DataExpr, params: Params): DatapointConverter = { + expr match { + case _: DataExpr.Sum => Sum(params) + case _: DataExpr.Count => Count(params) + case _: DataExpr.Max => Max(params) + case _: DataExpr.Min => Min(params) + case by: DataExpr.GroupBy => GroupBy(by, params) + case _ => Sum(params) + } + } + + /** + * Extract value and map as needed based on the type. Uses statistic and grouping to + * coerce events so they structurally work like spectator composite types. + */ + def createValueMapper(tags: Map[String, String], grouping: List[String]): LwcEvent => Double = { + tags.get("value") match { + case Some(k) => + tags.get("statistic") match { + case Some("count") => _ => 1.0 + case Some("totalOfSquares") => event => squared(event.extractValue(k)) + case _ => event => toDouble(event.extractValue(k)) + } + case None => + _ => 1.0 + } + } + + private def squared(value: Any): Double = { + val v = toDouble(value) + v * v + } + + def toDouble(value: Any): Double = { + value match { + case v: Boolean => if (v) 1.0 else 0.0 + case v: Byte => v.toDouble + case v: Short => v.toDouble + case v: Int => v.toDouble + case v: Long => v.toDouble + case v: Float => v.toDouble + case v: Double => v + case v: Number => v.doubleValue() + case v: String => parseDouble(v) + case _ => 1.0 + } + } + + private def parseDouble(v: String): Double = { + try { + java.lang.Double.parseDouble(v) + } catch { + case _: Exception => Double.NaN + } + } + + case class Params( + id: String, + tags: Map[String, String], + clock: Clock, + step: Long, + valueMapper: LwcEvent => Double, + consumer: (String, LwcEvent) => Unit + ) { + + val buffer = new StepDouble(0.0, clock, step) + } + + /** Compute sum for a counter as a rate per second. */ + case class Sum(params: Params) extends DatapointConverter { + + override def update(event: LwcEvent): Unit = { + val value = params.valueMapper(event) + if (value.isFinite && value >= 0.0) { + params.buffer.getCurrent.addAndGet(value) + } + } + + override def flush(timestamp: Long): Unit = { + val value = params.buffer.pollAsRate(timestamp) + if (value.isFinite) { + val ts = timestamp / params.step * params.step + val event = DatapointEvent(params.id, params.tags, ts, value) + params.consumer(params.id, event) + } + } + } + + /** Compute count of contributing events. */ + case class Count(params: Params) extends DatapointConverter { + + override def update(event: LwcEvent): Unit = { + params.buffer.getCurrent.addAndGet(1.0) + } + + override def flush(timestamp: Long): Unit = { + val value = params.buffer.poll(timestamp) + if (value.isFinite) { + val ts = timestamp / params.step * params.step + val event = DatapointEvent(params.id, params.tags, ts, value) + params.consumer(params.id, event) + } + } + } + + /** Compute max value from contributing events. 
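+   * For example, if values 1, 5, and 3 are recorded within a step, the data
+   * point emitted for that step is 5.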
*/ + case class Max(params: Params) extends DatapointConverter { + + override def update(event: LwcEvent): Unit = { + val value = params.valueMapper(event) + if (value.isFinite && value >= 0.0) { + params.buffer.getCurrent.max(value) + } + } + + override def flush(timestamp: Long): Unit = { + val value = params.buffer.poll(timestamp) + if (value.isFinite) { + val ts = timestamp / params.step * params.step + val event = DatapointEvent(params.id, params.tags, ts, value) + params.consumer(params.id, event) + } + } + } + + /** Compute min value from contributing events. */ + case class Min(params: Params) extends DatapointConverter { + + override def update(event: LwcEvent): Unit = { + val value = params.valueMapper(event) + if (value.isFinite && value >= 0.0) { + min(params.buffer.getCurrent, value) + } + } + + private def min(current: AtomicDouble, value: Double): Unit = { + if (value.isFinite) { + var min = current.get() + while (isLessThan(value, min) && !current.compareAndSet(min, value)) { + min = current.get() + } + } + } + + private def isLessThan(v1: Double, v2: Double): Boolean = { + v1 < v2 || v2.isNaN + } + + override def flush(timestamp: Long): Unit = { + val value = params.buffer.poll(timestamp) + if (value.isFinite) { + val ts = timestamp / params.step * params.step + val event = DatapointEvent(params.id, params.tags, ts, value) + params.consumer(params.id, event) + } + } + } + + /** Compute set of data points, one for each distinct group. */ + case class GroupBy(by: DataExpr.GroupBy, params: Params) extends DatapointConverter { + + private val groups = new ConcurrentHashMap[Map[String, String], DatapointConverter]() + + override def update(event: LwcEvent): Unit = { + // Ignore events that are missing dimensions used in the grouping + val values = by.keys.map(event.tagValue).filterNot(_ == null) + if (by.keys.size == values.size) { + val value = params.valueMapper(event) + if (value.isFinite) { + val tags = by.keys.zip(values).toMap + val converter = groups.computeIfAbsent(tags, groupConverter) + converter.update(event) + } + } + } + + private def groupConverter(tags: Map[String, String]): DatapointConverter = { + val ps = params.copy(consumer = (id, event) => groupConsumer(tags, id, event)) + toConverter(by.af, ps) + } + + private def groupConsumer(tags: Map[String, String], id: String, event: LwcEvent): Unit = { + event match { + case dp: DatapointEvent => params.consumer(id, dp.copy(tags = dp.tags ++ tags)) + case ev => params.consumer(id, ev) + } + } + + override def flush(timestamp: Long): Unit = { + groups.values().forEach(_.flush(timestamp)) + } + } +} diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEvent.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEvent.scala index c7e7d9eac..3f3da1d1b 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEvent.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEvent.scala @@ -99,6 +99,14 @@ trait LwcEvent { object LwcEvent { + /** + * Wrap a Map as an LWC event. The timestamp will be the time when the event is + * wrapped. + */ + def apply(event: Map[String, Any]): LwcEvent = { + apply(event, k => event.getOrElse(k, null)) + } + /** * Wrap an object as an LWC event. The timestamp will be the time when the event is * wrapped. @@ -115,6 +123,23 @@ object LwcEvent { BasicLwcEvent(rawEvent, extractor) } + /** + * Event used to indicate a heartbeat. It just consists of a timestamp and can be used to + * ensure regular traffic. 
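+   * Clients can inject it periodically, e.g.
+   * `client.process(LwcEvent.HeartbeatLwcEvent(System.currentTimeMillis()))` on an
+   * `LwcEventClient` instance.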
Some activity such as flushing analytic data points or cleanup + * may be triggered by the heartbeat. + */ + case class HeartbeatLwcEvent(timestamp: Long) extends LwcEvent { + + /** Raw event object that is being considered. */ + override def rawEvent: Any = timestamp + + /** + * Extract a value from the raw event for a given key. This method should be consistent + * with the `tagValue` method for keys that can be considered tags. + */ + override def extractValue(key: String): Any = null + } + private case class BasicLwcEvent(rawEvent: Any, extractor: String => Any) extends LwcEvent { override val timestamp: Long = System.currentTimeMillis() diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventClient.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventClient.scala index 7105a3c8e..c1a6eec51 100644 --- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventClient.scala +++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/LwcEventClient.scala @@ -16,6 +16,7 @@ package com.netflix.atlas.lwc.events import com.netflix.atlas.json.Json +import com.netflix.spectator.api.Clock import java.io.StringWriter import scala.util.Using @@ -58,15 +59,24 @@ object LwcEventClient { * Set of subscriptions to match with the events. * @param consumer * Function that will receive the output. + * @param clock + * Clock to use for timing events. * @return * Client instance. */ - def apply(subscriptions: Subscriptions, consumer: String => Unit): LwcEventClient = { - new LocalLwcEventClient(subscriptions, consumer) + def apply( + subscriptions: Subscriptions, + consumer: String => Unit, + clock: Clock = Clock.SYSTEM + ): LwcEventClient = { + new LocalLwcEventClient(subscriptions, consumer, clock) } - private class LocalLwcEventClient(subscriptions: Subscriptions, consumer: String => Unit) - extends AbstractLwcEventClient { + private class LocalLwcEventClient( + subscriptions: Subscriptions, + consumer: String => Unit, + clock: Clock + ) extends AbstractLwcEventClient(clock) { sync(subscriptions) diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala new file mode 100644 index 000000000..814787127 --- /dev/null +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala @@ -0,0 +1,255 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.atlas.lwc.events + +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.model.Query +import com.netflix.spectator.api.ManualClock +import munit.FunSuite + +import java.util.concurrent.atomic.AtomicLong + +class DatapointConverterSuite extends FunSuite { + + private val clock = new ManualClock() + private val step = 5_000L + + override def beforeEach(context: BeforeEach): Unit = { + clock.setWallTime(0L) + clock.setMonotonicTime(0L) + } + + test("toDouble") { + assertEquals(DatapointConverter.toDouble(false), 0.0) + assertEquals(DatapointConverter.toDouble(true), 1.0) + assertEquals(DatapointConverter.toDouble(42), 42.0) + assertEquals(DatapointConverter.toDouble(42L), 42.0) + assertEquals(DatapointConverter.toDouble(42.5f), 42.5) + assertEquals(DatapointConverter.toDouble(42.5), 42.5) + assertEquals(DatapointConverter.toDouble(new AtomicLong(42)), 42.0) + assertEquals(DatapointConverter.toDouble("42"), 42.0) + assertEquals(DatapointConverter.toDouble("42e3"), 42e3) + assert(DatapointConverter.toDouble("foo").isNaN) + } + + test("counter - sum") { + val expr = DataExpr.Sum(Query.True) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("value" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Map.empty, step, 1.0)) + } + + test("counter - sum custom value") { + val expr = DataExpr.Sum(Query.Equal("value", "responseSize")) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Map("value" -> "responseSize"), step, 2.0)) + } + + test("counter - count") { + val expr = DataExpr.Count(Query.Equal("value", "responseSize")) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Map("value" -> "responseSize"), step, 5.0)) + } + + private def stat(name: String): Query = { + Query.Equal("statistic", name) + } + + test("timer - sum of totalTime") { + val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalTime"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("latency" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 2.0)) + } + + test("timer - sum of count") { + val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("count"))) + val events = List.newBuilder[LwcEvent] + val converter = 
DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("latency" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 1.0)) + } + + test("timer - sum of totalOfSquares") { + val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalOfSquares"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("latency" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 6.0)) + } + + test("timer - dist-max") { + val expr = DataExpr.Max(Query.Equal("value", "latency").and(stat("max"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("latency" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 4.0)) + } + + test("timer - dist-min") { + val expr = DataExpr.Min(Query.Equal("value", "latency").and(stat("max"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("latency" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 0.0)) + } + + test("dist - sum of totalTime") { + val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalTime"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 2.0)) + } + + test("dist - sum of count") { + val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("count"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 1.0)) + } + + test("dist - sum of totalOfSquares") { + val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalOfSquares"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i 
=> + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 6.0)) + } + + test("dist - dist-max") { + val expr = DataExpr.Max(Query.Equal("value", "responseSize").and(stat("max"))) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + val event = LwcEvent(Map("responseSize" -> i)) + converter.update(event) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 1) + assertEquals(results.head, DatapointEvent("id", Query.tags(expr.query), step, 4.0)) + } + + test("groupBy - sum") { + val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app")) + val events = List.newBuilder[LwcEvent] + val converter = DatapointConverter("id", expr, clock, step, (_, e) => events.addOne(e)) + (0 until 5).foreach { i => + converter.update(LwcEvent(Map("responseSize" -> i, "app" -> "a"))) + converter.update(LwcEvent(Map("responseSize" -> i * 2, "app" -> "b"))) + converter.update(LwcEvent(Map("responseSize" -> i * 3, "app" -> "c"))) + } + clock.setWallTime(step + 1) + converter.flush(clock.wallTime()) + val results = events.result() + assertEquals(results.size, 3) + results.foreach { event => + val dp = event.asInstanceOf[DatapointEvent] + assert(dp.tags.contains("app")) + dp.tags("app") match { + case "a" => assertEqualsDouble(dp.value, 2.0, 1e-12) + case "b" => assertEqualsDouble(dp.value, 4.0, 1e-12) + case "c" => assertEqualsDouble(dp.value, 6.0, 1e-12) + } + } + } +} diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala index 80542bf8b..f039e7b3e 100644 --- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala +++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/LwcEventClientSuite.scala @@ -16,12 +16,21 @@ package com.netflix.atlas.lwc.events import com.netflix.atlas.core.util.SortedTagMap +import com.netflix.spectator.api.ManualClock import munit.FunSuite class LwcEventClientSuite extends FunSuite { import LwcEventSuite.* + private val clock = new ManualClock() + private val step = 5_000L + + override def beforeEach(context: BeforeEach): Unit = { + clock.setWallTime(0L) + clock.setMonotonicTime(0L) + } + private val sampleSpan: TestEvent = { TestEvent(SortedTagMap("app" -> "www", "node" -> "i-123"), 42L) } @@ -47,48 +56,52 @@ class LwcEventClientSuite extends FunSuite { test("analytics, basic aggregate") { val subs = Subscriptions(analytics = List( - Subscription("1", 60000, "app,foo,:eq,:sum"), - Subscription("2", 60000, "app,www,:eq,:sum") + Subscription("1", step, "app,foo,:eq,:sum"), + Subscription("2", step, "app,www,:eq,:sum") ) ) val output = List.newBuilder[String] - val client = LwcEventClient(subs, output.addOne) + val client = LwcEventClient(subs, output.addOne, clock) client.process(sampleLwcEvent) + clock.setWallTime(step) + client.process(LwcEvent.HeartbeatLwcEvent(step)) val vs = output.result() - assertEquals(1, vs.size) - assert(vs.forall(_.contains(""""tags":{"app":"www"}"""))) - assert(vs.forall(_.contains(""""value":1.0"""))) + assertEquals(vs.size, 2) } 
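+
+  // Values are normalized to a per-second rate when flushed, so the duration
+  // of 42 observed during a 5s step is reported as 42 / 5 = 8.4 below.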
test("analytics, basic aggregate extract value") { val subs = Subscriptions(analytics = List( - Subscription("1", 60000, "app,www,:eq,value,duration,:eq,:and,:sum") + Subscription("1", step, "app,www,:eq,value,duration,:eq,:and,:sum") ) ) val output = List.newBuilder[String] - val client = LwcEventClient(subs, output.addOne) + val client = LwcEventClient(subs, output.addOne, clock) client.process(sampleLwcEvent) + clock.setWallTime(step) + client.process(LwcEvent.HeartbeatLwcEvent(step)) val vs = output.result() - assertEquals(1, vs.size) + assertEquals(vs.size, 1) assert(vs.forall(_.contains(""""tags":{"app":"www","value":"duration"}"""))) - assert(vs.forall(_.contains(""""value":42.0"""))) + assert(vs.forall(_.contains(""""value":8.4"""))) } test("analytics, group by") { val subs = Subscriptions(analytics = List( - Subscription("1", 60000, "app,foo,:eq,:sum,(,node,),:by"), - Subscription("2", 60000, "app,www,:eq,:sum,(,node,),:by") + Subscription("1", step, "app,foo,:eq,:sum,(,node,),:by"), + Subscription("2", step, "app,www,:eq,:sum,(,node,),:by") ) ) val output = List.newBuilder[String] - val client = LwcEventClient(subs, output.addOne) + val client = LwcEventClient(subs, output.addOne, clock) client.process(sampleLwcEvent) + clock.setWallTime(step) + client.process(LwcEvent.HeartbeatLwcEvent(step)) val vs = output.result() - assertEquals(1, vs.size) + assertEquals(vs.size, 1) assert(vs.forall(_.contains(""""tags":{"app":"www","node":"i-123"}"""))) - assert(vs.forall(_.contains(""""value":1.0"""))) + assert(vs.forall(_.contains(""""value":0.2"""))) } test("analytics, group by missing key") {