lwc-events: improve data point conversion
Updates to support common statistics for timer and distribution
summary. Aggregates the values locally to reduce the number of
messages.
brharrington committed Mar 27, 2024
1 parent 3501fc8 commit 9d162d5
Showing 6 changed files with 601 additions and 62 deletions.
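The heart of the change is the new DatapointConverter helper: instead of emitting a
DatapointEvent for every matching event, values are accumulated in a step buffer and
flushed as a single data point per step. A minimal usage sketch, assuming a hypothetical
expression, subscription id, and consumer (the apply signature and parameter names are
taken from the diff below):

    import com.netflix.spectator.api.Clock

    // Hypothetical subscription: sum of matching events, 60s step.
    val converter = DatapointConverter(
      id = "sub-1",
      expr = ExprUtils.parseDataExpr("app,www,:eq,:sum"),
      clock = Clock.SYSTEM,
      step = 60000L,
      consumer = (id, event) => println(s"$id => $event") // one call per flush
    )
    val events: Seq[LwcEvent] = Seq.empty // stand-in for matched events
    events.foreach(converter.update)      // aggregate locally, no messages sent
    converter.flush(Clock.SYSTEM.wallTime()) // emits a single DatapointEvent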
AbstractLwcEventClient.scala
@@ -19,19 +19,24 @@ import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.EventExpr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.core.model.TraceQuery
-import com.netflix.atlas.core.util.SortedTagMap
+import com.netflix.spectator.api.Clock
 import com.netflix.spectator.api.NoopRegistry
 import com.netflix.spectator.atlas.impl.QueryIndex
 
-abstract class AbstractLwcEventClient extends LwcEventClient {
+import java.util.concurrent.atomic.AtomicLong
+
+abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
 
   import AbstractLwcEventClient.*
 
+  @volatile private var handlers: List[EventHandler] = _
+
   @volatile private var index: QueryIndex[EventHandler] = QueryIndex.newInstance(new NoopRegistry)
 
   @volatile private var traceHandlers: Map[Subscription, TraceQuery.SpanFilter] = Map.empty
 
   protected def sync(subscriptions: Subscriptions): Unit = {
+    val flushableHandlers = List.newBuilder[EventHandler]
     val idx = QueryIndex.newInstance[EventHandler](new NoopRegistry)
 
     // Pass-through events
@@ -48,20 +53,18 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
     // Analytics based on events
     subscriptions.analytics.foreach { sub =>
       val expr = ExprUtils.parseDataExpr(sub.expression)
+      val converter = DatapointConverter(sub.id, expr, clock, sub.step, submit)
       val q = removeValueClause(expr.query)
-      val queryTags = Query.tags(expr.query)
       val handler = EventHandler(
         sub,
         event => {
-          val tags = groupTags(expr, event)
-          tags.fold(List.empty[LwcEvent]) { ts =>
-            val tags = ts ++ queryTags
-            val v = dataValue(tags, event)
-            List(DatapointEvent(sub.id, SortedTagMap(tags), event.timestamp, v))
-          }
-        }
+          converter.update(event)
+          Nil
+        },
+        Some(converter)
       )
       idx.add(q, handler)
+      flushableHandlers += handler
     }
 
     // Trace pass-through
@@ -70,6 +73,7 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
     }.toMap
 
     index = idx
+    handlers = flushableHandlers.result()
   }
 
   private def removeValueClause(query: Query): SpectatorQuery = {
@@ -81,39 +85,13 @@ abstract class AbstractLwcEventClient extends LwcEventClient {
     ExprUtils.toSpectatorQuery(Query.simplify(q, ignore = true))
   }
 
-  private def groupTags(expr: DataExpr, event: LwcEvent): Option[Map[String, String]] = {
-    val tags = Map.newBuilder[String, String]
-    val it = expr.finalGrouping.iterator
-    while (it.hasNext) {
-      val k = it.next()
-      val v = event.tagValue(k)
-      if (v == null)
-        return None
-      else
-        tags += k -> v
-    }
-    Some(tags.result())
-  }
-
-  private def dataValue(tags: Map[String, String], event: LwcEvent): Double = {
-    tags.get("value").fold(1.0) { v =>
-      event.extractValue(v) match {
-        case b: Boolean if b => 1.0
-        case _: Boolean => 0.0
-        case n: Byte => n.toDouble
-        case n: Short => n.toDouble
-        case n: Int => n.toDouble
-        case n: Long => n.toDouble
-        case n: Float => n.toDouble
-        case n: Double => n
-        case n: Number => n.doubleValue()
-        case _ => 1.0
-      }
-    }
-  }
-
   override def process(event: LwcEvent): Unit = {
-    index.forEachMatch(k => event.tagValue(k), h => handleMatch(event, h))
+    event match {
+      case LwcEvent.HeartbeatLwcEvent(timestamp) =>
+        handlers.foreach(_.flush(timestamp))
+      case _ =>
+        index.forEachMatch(k => event.tagValue(k), h => handleMatch(event, h))
+    }
   }
 
   private def handleMatch(event: LwcEvent, handler: EventHandler): Unit = {
@@ -136,5 +114,20 @@ abstract class AbstractLwcEventClient extends LwcEventClient {

 object AbstractLwcEventClient {
 
-  private case class EventHandler(subscription: Subscription, mapper: LwcEvent => List[LwcEvent])
+  private case class EventHandler(
+    subscription: Subscription,
+    mapper: LwcEvent => List[LwcEvent],
+    converter: Option[DatapointConverter] = None
+  ) {
+
+    private val lastFlushTimestamp = new AtomicLong(0L)
+
+    def flush(timestamp: Long): Unit = {
+      val stepTime = timestamp / subscription.step
+      if (stepTime > lastFlushTimestamp.get()) {
+        converter.foreach(_.flush(timestamp))
+        lastFlushTimestamp.set(stepTime)
+      }
+    }
+  }
 }
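With these changes, flushing is driven by heartbeats: process() routes
LwcEvent.HeartbeatLwcEvent to every analytics handler, and EventHandler.flush forwards to
the converter at most once per step boundary. The dedup is plain integer division on the
timestamp, e.g. with a 60s step:

    // stepTime = timestamp / subscription.step, step = 60000ms
    60000L / 60000L  // 1: first heartbeat in the step triggers a flush
    119999L / 60000L // 1: later heartbeats in the same step are skipped
    120000L / 60000L // 2: the next step flushes again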
DatapointConverter.scala (new file)
@@ -0,0 +1,243 @@
/*
 * Copyright 2014-2024 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.atlas.lwc.events

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.Query
import com.netflix.spectator.api.Clock
import com.netflix.spectator.impl.AtomicDouble
import com.netflix.spectator.impl.StepDouble

import java.util.concurrent.ConcurrentHashMap

/**
 * Helper to convert a sequence of events into a data point.
 */
private[events] trait DatapointConverter {

  def update(event: LwcEvent): Unit

  def flush(timestamp: Long): Unit
}

private[events] object DatapointConverter {

  def apply(
    id: String,
    expr: DataExpr,
    clock: Clock,
    step: Long,
    consumer: (String, LwcEvent) => Unit
  ): DatapointConverter = {
    val tags = Query.tags(expr.query)
    val mapper = createValueMapper(tags, expr.finalGrouping)
    val params = Params(id, Query.tags(expr.query), clock, step, mapper, consumer)
    toConverter(expr, params)
  }

  private def toConverter(expr: DataExpr, params: Params): DatapointConverter = {
    expr match {
      case _: DataExpr.Sum => Sum(params)
      case _: DataExpr.Count => Count(params)
      case _: DataExpr.Max => Max(params)
      case _: DataExpr.Min => Min(params)
      case by: DataExpr.GroupBy => GroupBy(by, params)
      case _ => Sum(params)
    }
  }

  /**
   * Extract value and map as needed based on the type. Uses statistic and grouping to
   * coerce events so they structurally work like spectator composite types.
   */
  def createValueMapper(tags: Map[String, String], grouping: List[String]): LwcEvent => Double = {
    tags.get("value") match {
      case Some(k) =>
        tags.get("statistic") match {
          case Some("count") => _ => 1.0
          case Some("totalOfSquares") => event => squared(event.extractValue(k))
          case _ => event => toDouble(event.extractValue(k))
        }
      case None =>
        _ => 1.0
    }
  }
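  // Coercion examples (hypothetical tag values): statistic=count maps every
  // event to 1.0, statistic=totalOfSquares maps an extracted value v to v * v,
  // any other statistic uses the extracted value as-is, and with no value tag
  // each event simply counts as 1.0.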

  private def squared(value: Any): Double = {
    val v = toDouble(value)
    v * v
  }

  def toDouble(value: Any): Double = {
    value match {
      case v: Boolean => if (v) 1.0 else 0.0
      case v: Byte => v.toDouble
      case v: Short => v.toDouble
      case v: Int => v.toDouble
      case v: Long => v.toDouble
      case v: Float => v.toDouble
      case v: Double => v
      case v: Number => v.doubleValue()
      case v: String => parseDouble(v)
      case _ => 1.0
    }
  }

  private def parseDouble(v: String): Double = {
    try {
      java.lang.Double.parseDouble(v)
    } catch {
      case _: Exception => Double.NaN
    }
  }

  case class Params(
    id: String,
    tags: Map[String, String],
    clock: Clock,
    step: Long,
    valueMapper: LwcEvent => Double,
    consumer: (String, LwcEvent) => Unit
  ) {

    val buffer = new StepDouble(0.0, clock, step)
  }

  /** Compute sum for a counter as a rate per second. */
  case class Sum(params: Params) extends DatapointConverter {

    override def update(event: LwcEvent): Unit = {
      val value = params.valueMapper(event)
      if (value.isFinite && value >= 0.0) {
        params.buffer.getCurrent.addAndGet(value)
      }
    }

    override def flush(timestamp: Long): Unit = {
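      // pollAsRate divides the accumulated sum by the step size in seconds,
      // e.g. a sum of 600 over a 60s step is reported as 10.0.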
      val value = params.buffer.pollAsRate(timestamp)
      if (value.isFinite) {
        val ts = timestamp / params.step * params.step
        val event = DatapointEvent(params.id, params.tags, ts, value)
        params.consumer(params.id, event)
      }
    }
  }

  /** Compute count of contributing events. */
  case class Count(params: Params) extends DatapointConverter {

    override def update(event: LwcEvent): Unit = {
      params.buffer.getCurrent.addAndGet(1.0)
    }

    override def flush(timestamp: Long): Unit = {
      val value = params.buffer.poll(timestamp)
      if (value.isFinite) {
        val ts = timestamp / params.step * params.step
        val event = DatapointEvent(params.id, params.tags, ts, value)
        params.consumer(params.id, event)
      }
    }
  }

  /** Compute max value from contributing events. */
  case class Max(params: Params) extends DatapointConverter {

    override def update(event: LwcEvent): Unit = {
      val value = params.valueMapper(event)
      if (value.isFinite && value >= 0.0) {
        params.buffer.getCurrent.max(value)
      }
    }

    override def flush(timestamp: Long): Unit = {
      val value = params.buffer.poll(timestamp)
      if (value.isFinite) {
        val ts = timestamp / params.step * params.step
        val event = DatapointEvent(params.id, params.tags, ts, value)
        params.consumer(params.id, event)
      }
    }
  }

  /** Compute min value from contributing events. */
  case class Min(params: Params) extends DatapointConverter {

    override def update(event: LwcEvent): Unit = {
      val value = params.valueMapper(event)
      if (value.isFinite && value >= 0.0) {
        min(params.buffer.getCurrent, value)
      }
    }

    private def min(current: AtomicDouble, value: Double): Unit = {
      if (value.isFinite) {
        var min = current.get()
        while (isLessThan(value, min) && !current.compareAndSet(min, value)) {
          min = current.get()
        }
      }
    }
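    // The CAS loop relies on isLessThan treating NaN as larger than any value,
    // so a real sample always replaces a NaN in the accumulator.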

    private def isLessThan(v1: Double, v2: Double): Boolean = {
      v1 < v2 || v2.isNaN
    }

    override def flush(timestamp: Long): Unit = {
      val value = params.buffer.poll(timestamp)
      if (value.isFinite) {
        val ts = timestamp / params.step * params.step
        val event = DatapointEvent(params.id, params.tags, ts, value)
        params.consumer(params.id, event)
      }
    }
  }

  /** Compute set of data points, one for each distinct group. */
  case class GroupBy(by: DataExpr.GroupBy, params: Params) extends DatapointConverter {

    private val groups = new ConcurrentHashMap[Map[String, String], DatapointConverter]()

    override def update(event: LwcEvent): Unit = {
      // Ignore events that are missing dimensions used in the grouping
      val values = by.keys.map(event.tagValue).filterNot(_ == null)
      if (by.keys.size == values.size) {
        val value = params.valueMapper(event)
        if (value.isFinite) {
          val tags = by.keys.zip(values).toMap
          val converter = groups.computeIfAbsent(tags, groupConverter)
          converter.update(event)
        }
      }
    }

    private def groupConverter(tags: Map[String, String]): DatapointConverter = {
      val ps = params.copy(consumer = (id, event) => groupConsumer(tags, id, event))
      toConverter(by.af, ps)
    }

    private def groupConsumer(tags: Map[String, String], id: String, event: LwcEvent): Unit = {
      event match {
        case dp: DatapointEvent => params.consumer(id, dp.copy(tags = dp.tags ++ tags))
        case ev => params.consumer(id, ev)
      }
    }

    override def flush(timestamp: Long): Unit = {
      groups.values().forEach(_.flush(timestamp))
    }
  }
}
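To make the grouping behavior concrete, a sketch under the same assumptions as the earlier
example (the expression string, the consumer, and the eventWithTag helper are hypothetical):
each distinct value of the group key gets its own converter, and flushed data points have
the group tags merged into the result.

    val consumer: (String, LwcEvent) => Unit = (id, dp) => println(s"$id => $dp")
    val byNode = ExprUtils.parseDataExpr("name,requests,:eq,:sum,(,node,),:by")
    val groupBy = DatapointConverter("sub-2", byNode, Clock.SYSTEM, 60000L, consumer)
    groupBy.update(eventWithTag("node", "i-1")) // creates a converter for node=i-1
    groupBy.update(eventWithTag("node", "i-2")) // separate converter for node=i-2
    groupBy.flush(Clock.SYSTEM.wallTime())      // two data points, one per node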