diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/DatapointsTuple.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/DatapointsTuple.scala
new file mode 100644
index 000000000..d4b3fe1e2
--- /dev/null
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/DatapointsTuple.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.eval.model
+
+import com.netflix.atlas.eval.stream.Evaluator
+
+/**
+ * Pairs a set of data points with other arbitrary messages to pass through to the
+ * consumer.
+ */
+case class DatapointsTuple(
+  data: List[AggrDatapoint],
+  messages: List[Evaluator.MessageEnvelope] = Nil
+) {
+
+  def step: Long = {
+    if (data.nonEmpty) data.head.step else 0L
+  }
+
+  def groupByStep: List[DatapointsTuple] = {
+    val dps = data.groupBy(_.step).map(t => DatapointsTuple(t._2)).toList
+    if (messages.nonEmpty)
+      DatapointsTuple(Nil, messages) :: dps
+    else
+      dps
+  }
+}
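A quick sketch of the `groupByStep` contract (reviewer note, not part of the patch); `dp5s`, `dp60s`, and `env` are hypothetical values: two datapoints with 5s and 60s steps and one pass-through envelope.

```scala
val tuple = DatapointsTuple(List(dp5s, dp60s), List(env))

// One tuple per distinct step, with pass-through messages emitted in
// their own step-less tuple (its step is 0L since its data list is empty):
val parts = tuple.groupByStep
// parts == DatapointsTuple(Nil, List(env)) ::
//   one DatapointsTuple(List(dp5s)) and one DatapointsTuple(List(dp60s)),
//   in unspecified map order
```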
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java
index b56290ace..81520c43c 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/ExprType.java
@@ -19,17 +19,31 @@ public enum ExprType {
 
   /** Expression to select a set of events to be passed through. */
-  EVENTS,
+  EVENTS(false),
 
   /**
    * Time series expression such as used with Atlas Graph API. Can also be used for analytics
    * queries on top of event data.
    */
-  TIME_SERIES,
+  TIME_SERIES(true),
 
   /** Expression to select a set of traces to be passed through. */
-  TRACE_EVENTS,
+  TRACE_EVENTS(false),
 
-  /** Time series expression based on data extraced from traces. */
-  TRACE_TIME_SERIES
+  /** Time series expression based on data extracted from traces. */
+  TRACE_TIME_SERIES(true);
+
+  private final boolean timeSeries;
+
+  ExprType(boolean timeSeries) {
+    this.timeSeries = timeSeries;
+  }
+
+  public boolean isEventType() {
+    return !timeSeries;
+  }
+
+  public boolean isTimeSeriesType() {
+    return timeSeries;
+  }
 }
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroupsTuple.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroupsTuple.scala
new file mode 100644
index 000000000..1217208a4
--- /dev/null
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroupsTuple.scala
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.eval.model
+
+import com.netflix.atlas.eval.stream.Evaluator
+
+/**
+ * Pairs a set of time groups with other arbitrary messages to pass through to the
+ * consumer.
+ */
+case class TimeGroupsTuple(
+  groups: List[TimeGroup],
+  messages: List[Evaluator.MessageEnvelope] = Nil
+) {
+
+  def step: Long = {
+    if (groups.nonEmpty) groups.head.step else 0L
+  }
+
+  def groupByStep: List[TimeGroupsTuple] = {
+    val gps = groups.groupBy(_.step).map(t => TimeGroupsTuple(t._2)).toList
+    if (messages.nonEmpty)
+      TimeGroupsTuple(Nil, messages) :: gps
+    else
+      gps
+  }
+}
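The new flag is what later stages branch on when routing subscriptions (see `updateStateV2` in `LwcToAggrDatapoint` further down); from Scala:

```scala
ExprType.TIME_SERIES.isTimeSeriesType       // true
ExprType.TRACE_TIME_SERIES.isTimeSeriesType // true
ExprType.EVENTS.isEventType                 // true
ExprType.TRACE_EVENTS.isEventType           // true
```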
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
index 76f11da3f..7a922334a 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala
@@ -51,6 +51,7 @@ import com.netflix.atlas.eval.model.ExprType
 import com.netflix.atlas.eval.model.LwcExpression
 import com.netflix.atlas.eval.model.LwcMessages
 import com.netflix.atlas.eval.model.TimeGroup
+import com.netflix.atlas.eval.model.TimeGroupsTuple
 import com.netflix.atlas.eval.stream.EurekaSource.Instance
 import com.netflix.atlas.eval.stream.Evaluator.DataSource
 import com.netflix.atlas.eval.stream.Evaluator.DataSources
@@ -244,12 +245,11 @@
     val intermediateEval = createInputFlow(context)
       .via(context.monitorFlow("10_InputBatches"))
       .via(new LwcToAggrDatapoint(context))
-      .flatMapConcat { vs =>
-        Source(vs.groupBy(_.step).map(_._2.toList))
+      .flatMapConcat { t =>
+        Source(t.groupByStep)
       }
-      .groupBy(Int.MaxValue, _.head.step, allowClosedSubstreamRecreation = true)
+      .groupBy(Int.MaxValue, _.step, allowClosedSubstreamRecreation = true)
       .via(new TimeGrouped(context))
-      .flatMapConcat(Source.apply)
       .mergeSubstreams
       .via(context.monitorFlow("11_GroupedDatapoints"))
@@ -362,7 +362,7 @@
         aggregator match {
           case Some(aggr) if aggr.limitExceeded =>
-            context.logDatapointsExceeded(group.timestamp, t._1)
+            context.logDatapointsExceeded(group.timestamp, t._1.toString)
             AggrValuesInfo(Nil, t._2.size)
           case Some(aggr) =>
             AggrValuesInfo(aggr.datapoints, t._2.size)
@@ -389,9 +389,9 @@
    * the objects so the FinalExprEval stage will only see a single step.
    */
   private def stepSize: PartialFunction[AnyRef, Long] = {
-    case ds: DataSources => ds.stepSize()
-    case grp: TimeGroup  => grp.step
-    case v               => throw new IllegalArgumentException(s"unexpected value in stream: $v")
+    case ds: DataSources    => ds.stepSize()
+    case t: TimeGroupsTuple => t.step
+    case v                  => throw new IllegalArgumentException(s"unexpected value in stream: $v")
   }
 
   /**
@@ -408,6 +408,8 @@
             new DataSources(sources.asJava)
           }
           .toList
+      case t: TimeGroupsTuple =>
+        t.groupByStep
       case _ =>
         List(value)
     }
@@ -506,8 +508,10 @@
   private def toExprSet(dss: DataSources, interpreter: ExprInterpreter): Set[LwcExpression] = {
     dss.sources.asScala.flatMap { dataSource =>
-      interpreter.eval(Uri(dataSource.uri)).exprs.map { expr =>
-        LwcExpression(expr.toString, ExprType.TIME_SERIES, dataSource.step.toMillis)
+      val (exprType, exprs) = interpreter.parseQuery(Uri(dataSource.uri))
+      exprs.map { expr =>
+        val step = if (exprType.isTimeSeriesType) dataSource.step.toMillis else 0L
+        LwcExpression(expr.toString, exprType, step)
       }
     }.toSet
   }
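Reviewer note on the reworked intermediate flow: `groupByStep` splits each tuple before the `groupBy`, so the substream key `_.step` is well defined and `TimeGrouped` only ever sees a single step per substream. A condensed view of the stages above:

```scala
createInputFlow(context)
  .via(new LwcToAggrDatapoint(context))      // List[AnyRef] => DatapointsTuple
  .flatMapConcat(t => Source(t.groupByStep)) // one step (or only messages) per tuple
  .groupBy(Int.MaxValue, _.step, allowClosedSubstreamRecreation = true)
  .via(new TimeGrouped(context))             // DatapointsTuple => TimeGroupsTuple
  .mergeSubstreams
```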
IllegalArgumentException(s"invalid value on stack; $value") + } + + private def evalEvents(uri: Uri): List[EventExpr] = { + uri.query().get("q") match { + case Some(query) => + eventInterpreter.execute(query).stack.map { + case ModelExtractors.EventExprType(t) => t + case value => throw invalidValue(value) + } + case None => + throw new IllegalArgumentException(s"missing required parameter: q ($uri)") + } + } + + private def evalTraceEvents(uri: Uri): List[TraceQuery.SpanFilter] = { + uri.query().get("q") match { + case Some(query) => + traceInterpreter.execute(query).stack.map { + case ModelExtractors.TraceFilterType(t) => t + case value => throw invalidValue(value) + } + case None => + throw new IllegalArgumentException(s"missing required parameter: q ($uri)") + } + } + + private def evalTraceTimeSeries(uri: Uri): List[TraceQuery.SpanTimeSeries] = { + uri.query().get("q") match { + case Some(query) => + traceInterpreter.execute(query).stack.map { + case ModelExtractors.TraceTimeSeriesType(t) => t + case value => throw invalidValue(value) + } + case None => + throw new IllegalArgumentException(s"missing required parameter: q ($uri)") + } + } + + def parseQuery(uri: Uri): (ExprType, List[Expr]) = { + val exprType = determineExprType(uri) + val exprs = exprType match { + case ExprType.TIME_SERIES => eval(uri).exprs + case ExprType.EVENTS => evalEvents(uri) + case ExprType.TRACE_EVENTS => evalTraceEvents(uri) + case ExprType.TRACE_TIME_SERIES => evalTraceTimeSeries(uri) + } + exprType -> exprs.distinct + } + + def dataExprs(uri: Uri): List[String] = { + val exprs = determineExprType(uri) match { + case ExprType.TIME_SERIES => eval(uri).exprs.flatMap(_.expr.dataExprs) + case ExprType.EVENTS => evalEvents(uri) + case ExprType.TRACE_EVENTS => evalTraceEvents(uri) + case ExprType.TRACE_TIME_SERIES => evalTraceTimeSeries(uri) + } + exprs.map(_.toString).distinct + } + + def determineExprType(uri: Uri): ExprType = { + val reversed = reversedPath(uri.path) + if (reversed.startsWith(eventsPrefix)) + ExprType.EVENTS + else if (reversed.startsWith(traceEventsPrefix)) + ExprType.TRACE_EVENTS + else if (reversed.startsWith(traceTimeSeriesPrefix)) + ExprType.TRACE_TIME_SERIES + else + ExprType.TIME_SERIES + } + + private def reversedPath(path: Uri.Path): Uri.Path = { + val reversed = path.reverse + if (reversed.startsWithSlash) + reversed.tail + else + reversed + } +} + +private[stream] object ExprInterpreter { + + private val eventsPrefix = Uri.Path("events") + private val traceEventsPrefix = Uri.Path("traces") + private val traceTimeSeriesPrefix = Uri.Path("graph") / "traces" } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index 2797c5e22..7aa055a8c 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.StyleExpr import com.netflix.atlas.core.model.TimeSeries import com.netflix.atlas.core.util.IdentityMap import com.netflix.atlas.eval.model.TimeGroup +import com.netflix.atlas.eval.model.TimeGroupsTuple import com.netflix.atlas.eval.model.TimeSeriesMessage import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope @@ -173,7 +174,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter) // Perform the final evaluation and create a source 
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala
index 2797c5e22..7aa055a8c 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala
@@ -33,6 +33,7 @@ import com.netflix.atlas.core.model.StyleExpr
 import com.netflix.atlas.core.model.TimeSeries
 import com.netflix.atlas.core.util.IdentityMap
 import com.netflix.atlas.eval.model.TimeGroup
+import com.netflix.atlas.eval.model.TimeGroupsTuple
 import com.netflix.atlas.eval.model.TimeSeriesMessage
 import com.netflix.atlas.eval.stream.Evaluator.DataSources
 import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
@@ -173,7 +174,7 @@ private[stream] class FinalExprEval(exprInterpreter: ExprInterpreter)
 
   // Perform the final evaluation and create a source with the TimeSeriesMessages
   // addressed to each recipient
-  private def handleData(group: TimeGroup): Unit = {
+  private def handleData(group: TimeGroup): List[MessageEnvelope] = {
     // Finalize the DataExprs, needed as input for further evaluation
     val timestamp = group.timestamp
     val groupedDatapoints = group.dataExprValues
@@ -236,14 +237,26 @@
         case (id, rate) => new MessageEnvelope(id, rate)
       }.toList
 
-    push(out, Source(output ++ rateMessages))
+    output ++ rateMessages
+  }
+
+  private def handleSingleGroup(g: TimeGroup): Unit = {
+    push(out, Source(handleData(g)))
+  }
+
+  private def handleGroups(t: TimeGroupsTuple): Unit = {
+    val msgs = List.newBuilder[MessageEnvelope]
+    msgs ++= t.messages
+    msgs ++= t.groups.flatMap(handleData)
+    push(out, Source(msgs.result()))
   }
 
   override def onPush(): Unit = {
     grab(in) match {
-      case ds: DataSources => handleDataSources(ds)
-      case data: TimeGroup => handleData(data)
-      case v               => throw new MatchError(v)
+      case ds: DataSources    => handleDataSources(ds)
+      case data: TimeGroup    => handleSingleGroup(data)
+      case t: TimeGroupsTuple => handleGroups(t)
+      case v                  => throw new MatchError(v)
     }
   }
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
index 57256ea38..07d8d6fac 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapoint.scala
@@ -33,56 +33,74 @@ import com.netflix.atlas.eval.model.LwcDiagnosticMessage
 import com.netflix.atlas.eval.model.LwcEvent
 import com.netflix.atlas.eval.model.LwcHeartbeat
 import com.netflix.atlas.eval.model.LwcSubscription
+import com.netflix.atlas.eval.model.LwcSubscriptionV2
+import com.netflix.atlas.eval.model.DatapointsTuple
 
 /**
  * Process the SSE output from an LWC service and convert it into a stream of
  * [[AggrDatapoint]]s that can be used for evaluation.
  */
 private[stream] class LwcToAggrDatapoint(context: StreamContext)
-    extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] {
+    extends GraphStage[FlowShape[List[AnyRef], DatapointsTuple]] {
 
   import LwcToAggrDatapoint.*
 
   private val unknown = context.registry.counter("atlas.eval.unknownMessages")
 
   private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in")
-  private val out = Outlet[List[AggrDatapoint]]("LwcToAggrDatapoint.out")
+  private val out = Outlet[DatapointsTuple]("LwcToAggrDatapoint.out")
 
-  override val shape: FlowShape[List[AnyRef], List[AggrDatapoint]] = FlowShape(in, out)
+  override val shape: FlowShape[List[AnyRef], DatapointsTuple] = FlowShape(in, out)
 
   override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
     new GraphStageLogic(shape) with InHandler with OutHandler {
 
-      private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, DatapointMetadata]
+      private val tsState = scala.collection.mutable.AnyRefMap.empty[String, DatapointMetadata]
+      private val eventState = scala.collection.mutable.AnyRefMap.empty[String, String]
 
       override def onPush(): Unit = {
-        val builder = List.newBuilder[AggrDatapoint]
+        val dpBuilder = List.newBuilder[AggrDatapoint]
+        val msgBuilder = List.newBuilder[Evaluator.MessageEnvelope]
         grab(in).foreach {
           case sb: LwcSubscription      => updateState(sb)
-          case dp: LwcDatapoint         => builder ++= pushDatapoint(dp)
-          case ev: LwcEvent             => pushEvent(ev)
-          case dg: LwcDiagnosticMessage => pushDiagnosticMessage(dg)
-          case hb: LwcHeartbeat         => builder += pushHeartbeat(hb)
+          case sb: LwcSubscriptionV2    => updateStateV2(sb)
+          case dp: LwcDatapoint         => dpBuilder ++= pushDatapoint(dp)
+          case ev: LwcEvent             => msgBuilder ++= pushEvent(ev)
+          case dg: LwcDiagnosticMessage => msgBuilder ++= pushDiagnosticMessage(dg)
+          case hb: LwcHeartbeat         => dpBuilder += pushHeartbeat(hb)
           case _                        =>
         }
-        val datapoints = builder.result()
-        if (datapoints.isEmpty)
+        val datapoints = dpBuilder.result()
+        val messages = msgBuilder.result()
+        if (datapoints.isEmpty && messages.isEmpty)
          pull(in)
        else
-          push(out, datapoints)
+          push(out, DatapointsTuple(datapoints, messages))
       }
 
       private def updateState(sub: LwcSubscription): Unit = {
         sub.metrics.foreach { m =>
-          if (!state.contains(m.id)) {
+          if (!tsState.contains(m.id)) {
             val expr = parseExpr(m.expression)
-            state.put(m.id, DatapointMetadata(m.expression, expr, m.step))
+            tsState.put(m.id, DatapointMetadata(m.expression, expr, m.step))
+          }
+        }
+      }
+
+      private def updateStateV2(sub: LwcSubscriptionV2): Unit = {
+        sub.subExprs.foreach { s =>
+          if (sub.exprType.isTimeSeriesType && !tsState.contains(s.id)) {
+            val expr = parseExpr(s.expression)
+            tsState.put(s.id, DatapointMetadata(s.expression, expr, s.step))
+          }
+          if (sub.exprType.isEventType && !eventState.contains(s.id)) {
+            eventState.put(s.id, s.expression)
           }
         }
       }
 
       private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
-        state.get(dp.id) match {
+        tsState.get(dp.id) match {
           case Some(sub) =>
             val expr = sub.dataExpr
             val step = sub.step
@@ -93,17 +111,24 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
         }
       }
 
-      private def pushEvent(event: LwcEvent): Unit = {
-        state.get(event.id) match {
-          case Some(sub) => context.log(sub.dataExpr, EventMessage(event.payload))
-          case None      => unknown.increment()
+      private def pushEvent(event: LwcEvent): List[Evaluator.MessageEnvelope] = {
+        eventState.get(event.id) match {
+          case Some(sub) => context.messagesForDataSource(sub, EventMessage(event.payload))
+          case None      => unknown.increment(); Nil
         }
       }
 
-      private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
-        state.get(diagMsg.id) match {
-          case Some(sub) => context.log(sub.dataExpr, diagMsg.message)
-          case None      => unknown.increment()
+      private def pushDiagnosticMessage(
+        diagMsg: LwcDiagnosticMessage
+      ): List[Evaluator.MessageEnvelope] = {
+        tsState.get(diagMsg.id) match {
+          case Some(sub) =>
+            context.messagesForDataSource(sub.dataExprStr, diagMsg.message)
+          case None =>
+            eventState.get(diagMsg.id) match {
+              case Some(sub) => context.messagesForDataSource(sub, diagMsg.message)
+              case None      => unknown.increment(); Nil
+            }
         }
       }
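Driving the reworked stage now looks like the updated `LwcToAggrDatapointSuite` later in the patch: datapoints and heartbeats land in `tuple.data`, while events and diagnostics come out as pre-addressed envelopes in `tuple.messages`. A sketch, with `sseMessages` and `handle` hypothetical:

```scala
Source(sseMessages)                // List[String] of LWC payloads
  .map(LwcMessages.parse)
  .map(msg => List(msg))
  .via(new LwcToAggrDatapoint(context))
  .flatMapConcat { t =>
    t.messages.foreach(handle)     // envelopes for events/diagnostics
    Source(t.data)                 // AggrDatapoints for aggregation
  }
  .runWith(Sink.seq[AggrDatapoint])
```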
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala
index 0c079a075..e3add16a3 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/StreamContext.scala
@@ -121,7 +121,7 @@ private[stream] class StreamContext(
   @volatile private var dataSources: DataSources = DataSources.empty()
 
   /** Map of DataExpr to data sources for being able to quickly log information. */
-  @volatile private var dataExprMap: Map[DataExpr, List[DataSource]] = Map.empty
+  @volatile private var dataExprMap: Map[String, List[DataSource]] = Map.empty
 
   def setDataSources(ds: DataSources): Unit = {
     if (dataSources != ds) {
@@ -221,7 +221,7 @@ private[stream] class StreamContext(
    * Emit an error to the sources where the number of input
    * or intermediate datapoints exceed for an expression.
    */
-  def logDatapointsExceeded(timestamp: Long, dataExpr: DataExpr): Unit = {
+  def logDatapointsExceeded(timestamp: Long, dataExpr: String): Unit = {
     val diagnosticMessage = DiagnosticMessage.error(
       s"expression: $dataExpr exceeded the configured max input datapoints limit" +
       s" '$maxInputDatapointsPerExpression' or max intermediate" +
@@ -234,12 +234,21 @@ private[stream] class StreamContext(
   /**
    * Send a diagnostic message to all data sources that use a particular data expression.
    */
-  def log(expr: DataExpr, msg: JsonSupport): Unit = {
+  def log(expr: String, msg: JsonSupport): Unit = {
     dataExprMap.get(expr).foreach { ds =>
       ds.foreach(s => dsLogger(s, msg))
     }
   }
 
+  /**
+   * Creates a set of messages for each data source that uses a given expression.
+   */
+  def messagesForDataSource(expr: String, msg: JsonSupport): List[Evaluator.MessageEnvelope] = {
+    dataExprMap.get(expr).toList.flatMap { ds =>
+      ds.map(s => new Evaluator.MessageEnvelope(s.id, msg))
+    }
+  }
+
   /**
    * Returns a simple http client flow that will log the request using the provided name.
   */
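The difference from `log` is that `messagesForDataSource` returns the envelopes instead of side-effecting through `dsLogger`, which is what lets `LwcToAggrDatapoint` emit them in-stream. A sketch, assuming `expr` is an expression string used by two registered data sources:

```scala
// One envelope per data source whose query uses `expr`, each addressed
// with that data source's id; empty if the expression is unknown.
val envelopes: List[Evaluator.MessageEnvelope] =
  context.messagesForDataSource(expr, diagnosticMessage)
```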
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala
index 38db1aae3..3c05b5689 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/SyntheticDataSource.scala
@@ -15,19 +15,27 @@
  */
 package com.netflix.atlas.eval.stream
 
+import com.fasterxml.jackson.databind.JsonNode
 import org.apache.pekko.NotUsed
 import org.apache.pekko.http.scaladsl.model.Uri
 import org.apache.pekko.stream.IOResult
 import org.apache.pekko.stream.scaladsl.Source
 import org.apache.pekko.util.ByteString
 import com.netflix.atlas.core.model.DataExpr
+import com.netflix.atlas.core.model.EventExpr
+import com.netflix.atlas.core.model.Expr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.core.model.StyleExpr
+import com.netflix.atlas.core.model.TraceQuery
 import com.netflix.atlas.core.util.Strings
+import com.netflix.atlas.eval.model.ExprType
 import com.netflix.atlas.eval.model.LwcDataExpr
 import com.netflix.atlas.eval.model.LwcDatapoint
+import com.netflix.atlas.eval.model.LwcEvent
 import com.netflix.atlas.eval.model.LwcSubscription
+import com.netflix.atlas.eval.model.LwcSubscriptionV2
 import com.netflix.atlas.json.Json
+import com.netflix.spectator.impl.Hash64
 
 import java.util.concurrent.TimeUnit
 import scala.concurrent.Future
@@ -57,10 +65,10 @@ object SyntheticDataSource {
 
   def apply(interpreter: ExprInterpreter, uri: Uri): Source[ByteString, Future[IOResult]] = {
     val settings = getSettings(uri)
-    val exprs = interpreter.eval(uri).exprs
+    val (exprType, exprs) = interpreter.parseQuery(uri)
     val promise = Promise[IOResult]()
     Source(exprs)
-      .flatMapMerge(Int.MaxValue, expr => source(settings, expr))
+      .flatMapMerge(Int.MaxValue, expr => source(settings, exprType, expr))
       .via(new OnUpstreamFinish[ByteString](promise.success(IOResult.createSuccessful(0L))))
       .mapMaterializedValue(_ => promise.future)
   }
@@ -75,6 +83,23 @@ object SyntheticDataSource {
     )
   }
 
+  private def source(
+    settings: Settings,
+    exprType: ExprType,
+    expr: Expr
+  ): Source[ByteString, NotUsed] = {
+    exprType match {
+      case ExprType.EVENTS =>
+        source(settings, expr.asInstanceOf[EventExpr])
+      case ExprType.TIME_SERIES =>
+        source(settings, expr.asInstanceOf[StyleExpr])
+      case ExprType.TRACE_EVENTS =>
+        Source.empty
+      case ExprType.TRACE_TIME_SERIES =>
+        source(settings, expr.asInstanceOf[TraceQuery.SpanTimeSeries].expr)
+    }
+  }
+
   private def source(settings: Settings, styleExpr: StyleExpr): Source[ByteString, NotUsed] = {
     val subMessage = LwcSubscription(
       styleExpr.toString,
@@ -130,5 +155,36 @@ object SyntheticDataSource {
     }
   }
 
+  private def source(settings: Settings, expr: EventExpr): Source[ByteString, NotUsed] = {
+    val id = computeId(ExprType.EVENTS, expr, 0L)
+    val dataExpr = LwcDataExpr(id, expr.toString, 0L)
+    val subMessage = LwcSubscriptionV2(expr.toString, ExprType.EVENTS, List(dataExpr))
+
+    val exprSource = Source(0 until settings.numStepIntervals)
+      .throttle(1, FiniteDuration(settings.step, TimeUnit.MILLISECONDS))
+      .flatMapConcat { i =>
+        Source(0 until settings.inputDataSize)
+          .map { j =>
+            val data = Map(
+              "tags" -> Query.tags(expr.query),
+              "i" -> i,
+              "j" -> j
+            )
+            val json = Json.decode[JsonNode](Json.encode(data))
+            LwcEvent(id, json)
+          }
+      }
+
+    Source
+      .single(subMessage)
+      .concat(exprSource)
+      .map(msg => ByteString(Json.encode(msg)))
+  }
+
+  private def computeId(exprType: ExprType, expr: Expr, step: Long): String = {
+    val key = s"$exprType:$expr:$step"
+    java.lang.Long.toString(new Hash64().updateString(key).compute(), 16)
+  }
+
   case class Settings(step: Long, numStepIntervals: Int, inputDataSize: Int, outputDataSize: Int)
 }
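The events branch can be exercised through the same synthetic URIs the tests use; settings are parsed from the query string and the payload shape follows the `source(settings, expr: EventExpr)` method above:

```scala
// Mirrors the "publisher, events" test in EvaluatorSuite below:
val uri = "synthetic://test/events?q=name,cpu,:eq&step=1ms&numStepIntervals=5"
Source.fromPublisher(evaluator.createPublisher(uri))
// emits one LwcSubscriptionV2, then numStepIntervals batches of
// inputDataSize LwcEvents carrying the query tags plus (i, j) counters
```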
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala
index 15c2a49a4..89d59414a 100644
--- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala
+++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala
@@ -26,7 +26,9 @@ import org.apache.pekko.stream.stage.OutHandler
 import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.eval.model.AggrDatapoint
 import com.netflix.atlas.eval.model.AggrValuesInfo
+import com.netflix.atlas.eval.model.DatapointsTuple
 import com.netflix.atlas.eval.model.TimeGroup
+import com.netflix.atlas.eval.model.TimeGroupsTuple
 
 /**
  * Operator for grouping data into buckets by time. The expectation is that the data
@@ -38,7 +40,7 @@ import com.netflix.atlas.eval.model.TimeGroup
  */
 private[stream] class TimeGrouped(
   context: StreamContext
-) extends GraphStage[FlowShape[List[AggrDatapoint], List[TimeGroup]]] {
+) extends GraphStage[FlowShape[DatapointsTuple, TimeGroupsTuple]] {
 
   type AggrMap = java.util.HashMap[DataExpr, AggrDatapoint.Aggregator]
 
@@ -60,10 +62,10 @@ private[stream] class TimeGrouped(
     context.registry
   )
 
-  private val in = Inlet[List[AggrDatapoint]]("TimeGrouped.in")
-  private val out = Outlet[List[TimeGroup]]("TimeGrouped.out")
+  private val in = Inlet[DatapointsTuple]("TimeGrouped.in")
+  private val out = Outlet[TimeGroupsTuple]("TimeGrouped.out")
 
-  override val shape: FlowShape[List[AggrDatapoint], List[TimeGroup]] = FlowShape(in, out)
+  override val shape: FlowShape[DatapointsTuple, TimeGroupsTuple] = FlowShape(in, out)
 
   private val metricName = "atlas.eval.datapoints"
   private val registry = context.registry
@@ -138,7 +140,7 @@ private[stream] class TimeGrouped(
     val aggregateMapForExpWithinLimits = aggrMap.asScala
       .filter {
         case (expr, aggr) if aggr.limitExceeded =>
-          context.logDatapointsExceeded(ts, expr)
+          context.logDatapointsExceeded(ts, expr.toString)
           false
         case _ =>
           true
@@ -153,7 +155,8 @@ private[stream] class TimeGrouped(
 
       override def onPush(): Unit = {
         val builder = List.newBuilder[TimeGroup]
-        grab(in).foreach { v =>
+        val tuple = grab(in)
+        tuple.data.foreach { v =>
           val t = v.timestamp
           val now = clock.wallTime()
           step = v.step
@@ -177,10 +180,10 @@ private[stream] class TimeGrouped(
           }
         }
         val groups = builder.result()
-        if (groups.isEmpty)
+        if (groups.isEmpty && tuple.messages.isEmpty)
           pull(in)
         else
-          push(out, groups)
+          push(out, TimeGroupsTuple(groups, tuple.messages))
       }
 
       override def onPull(): Unit = {
@@ -203,7 +206,7 @@ private[stream] class TimeGrouped(
 
       private def flushPending(): Unit = {
        if (pending.nonEmpty && isAvailable(out)) {
-          push(out, pending)
+          push(out, TimeGroupsTuple(pending))
           pending = Nil
         }
         if (pending.isEmpty) {
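One subtle behavior worth calling out: pass-through messages are never buffered with the time groups, so even a tuple whose datapoints do not yet complete a group still triggers a push when it carries messages. A sketch in the style of `TimeGroupedSuite` at the end of the patch, with `datapoints` and `messages` hypothetical:

```scala
Source
  .single(DatapointsTuple(datapoints, messages))
  .via(new TimeGrouped(context))
  .flatMapConcat(t => Source(t.messages)) // collect only pass-through messages
  .runWith(Sink.seq)
```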
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
index aff00075e..e4364c3f2 100644
--- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala
@@ -741,4 +741,35 @@
     val json = """{"id":"_","uri":":true"}"""
     assertEquals(Json.decode[Evaluator.DataSource](json), ds)
   }
+
+  test("publisher, time series") {
+    val evaluator = new Evaluator(config, registry, system)
+
+    val uri =
+      "synthetic://test/graph?q=name,cpu,:eq,nf.app,foo,:eq,:and,:max&step=1ms&numStepIntervals=5"
+    val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq)
+    val result = Await.result(future, scala.concurrent.duration.Duration.Inf)
+    // 5 data points and 5 messages indicating data volumes
+    assertEquals(result.size, 10)
+  }
+
+  test("publisher, events") {
+    val evaluator = new Evaluator(config, registry, system)
+
+    val uri =
+      "synthetic://test/events?q=name,cpu,:eq,nf.app,foo,:eq,:and&step=1ms&numStepIntervals=5"
+    val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq)
+    val result = Await.result(future, scala.concurrent.duration.Duration.Inf)
+    assertEquals(result.size, 5000)
+  }
+
+  test("publisher, trace time series") {
+    val evaluator = new Evaluator(config, registry, system)
+
+    val uri =
+      "synthetic://test/traces/graph?q=name,cpu,:eq,nf.app,foo,:eq,:and&step=1ms&numStepIntervals=5"
+    val future = Source.fromPublisher(evaluator.createPublisher(uri)).runWith(Sink.seq)
+    val result = Await.result(future, scala.concurrent.duration.Duration.Inf)
+    assertEquals(result.size, 5)
+  }
 }
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/ExprInterpreterSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/ExprInterpreterSuite.scala
new file mode 100644
index 000000000..eb81a1a5c
--- /dev/null
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/ExprInterpreterSuite.scala
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014-2024 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.atlas.eval.stream
+
+import com.netflix.atlas.eval.model.ExprType
+import com.typesafe.config.ConfigFactory
+import munit.FunSuite
+import org.apache.pekko.http.scaladsl.model.Uri
+
+class ExprInterpreterSuite extends FunSuite {
+
+  private val interpreter = new ExprInterpreter(ConfigFactory.load())
+
+  test("determineExprType time series") {
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/graph")), ExprType.TIME_SERIES)
+    assertEquals(interpreter.determineExprType(Uri("/api/v2/fetch")), ExprType.TIME_SERIES)
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/graph/")), ExprType.TIME_SERIES)
+    assertEquals(interpreter.determineExprType(Uri("/graph")), ExprType.TIME_SERIES)
+  }
+
+  test("determineExprType events") {
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/events")), ExprType.EVENTS)
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/events/")), ExprType.EVENTS)
+    assertEquals(interpreter.determineExprType(Uri("/events")), ExprType.EVENTS)
+  }
+
+  test("determineExprType trace events") {
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/traces")), ExprType.TRACE_EVENTS)
+    assertEquals(interpreter.determineExprType(Uri("/api/v1/traces/")), ExprType.TRACE_EVENTS)
+    assertEquals(interpreter.determineExprType(Uri("/traces")), ExprType.TRACE_EVENTS)
+  }
+
+  test("determineExprType trace time series") {
+    assertEquals(
+      interpreter.determineExprType(Uri("/api/v1/traces/graph")),
+      ExprType.TRACE_TIME_SERIES
+    )
+    assertEquals(
+      interpreter.determineExprType(Uri("/api/v1/traces/graph/")),
+      ExprType.TRACE_TIME_SERIES
+    )
+    assertEquals(interpreter.determineExprType(Uri("/traces/graph")), ExprType.TRACE_TIME_SERIES)
+  }
+}
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala
index 22d85eaca..aaac0a913 100644
--- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala
@@ -66,13 +66,13 @@ class LwcToAggrDatapointSuite extends FunSuite {
     """{"type":"diagnostic","id":"sum","message":{"type":"error","message":"2"}}"""
   )
 
-  private val logMessages = new ArrayBlockingQueue[(DataSource, JsonSupport)](10)
+  private val logMessages = new ArrayBlockingQueue[Evaluator.MessageEnvelope](10)
 
   private val context = new StreamContext(
     ConfigFactory.load(),
     null,
     materializer,
-    dsLogger = (ds, msg) => logMessages.add(ds -> msg)
+    dsLogger = (_, _) => ()
   )
 
   context.setDataSources(
@@ -87,7 +87,10 @@ class LwcToAggrDatapointSuite extends FunSuite {
       .map(LwcMessages.parse)
      .map(msg => List(msg))
       .via(new LwcToAggrDatapoint(context))
-      .flatMapConcat(Source.apply)
+      .flatMapConcat { t =>
+        t.messages.foreach(m => logMessages.add(m))
+        Source(t.data)
+      }
       .runWith(Sink.seq[AggrDatapoint])
     Await.result(future, Duration.Inf).toList
   }
@@ -112,12 +115,19 @@ class LwcToAggrDatapointSuite extends FunSuite {
     eval(input)
     assertEquals(logMessages.size(), 2)
     List("1", "2").foreach { i =>
+      // https://github.com/lampepfl/dotty/issues/15661 ?
+      // On 3.4.0 there is an error if using `v` instead of `null`
       logMessages.poll() match {
-        case (_, msg: DiagnosticMessage) =>
-          assertEquals(msg.`type`, "error")
-          assertEquals(msg.message, i)
-        case v =>
-          fail(s"unexpected message: $v")
+        case env: Evaluator.MessageEnvelope =>
+          env.message match {
+            case msg: DiagnosticMessage =>
+              assertEquals(msg.`type`, "error")
+              assertEquals(msg.message, i)
+            case v =>
+              fail(s"unexpected message: $v")
+          }
+        case null =>
+          fail(s"unexpected type: null")
       }
     }
   }
diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala
index ac3ce049d..15adf3c49 100644
--- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala
+++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala
@@ -22,6 +22,7 @@ import com.netflix.atlas.core.model.DataExpr
 import com.netflix.atlas.core.model.Query
 import com.netflix.atlas.eval.model.AggrDatapoint
 import com.netflix.atlas.eval.model.AggrValuesInfo
+import com.netflix.atlas.eval.model.DatapointsTuple
 import com.netflix.atlas.eval.model.TimeGroup
 import com.netflix.spectator.api.DefaultRegistry
 import munit.FunSuite
@@ -55,9 +56,9 @@ class TimeGroupedSuite extends FunSuite {
 
   private def run(data: List[AggrDatapoint]): List[TimeGroup] = {
     val future = Source
-      .single(data)
+      .single(DatapointsTuple(data))
      .via(new TimeGrouped(context))
-      .flatMapConcat(Source.apply)
+      .flatMapConcat(t => Source(t.groups))
       .runFold(List.empty[TimeGroup])((acc, g) => g :: acc)
     result(future)
   }