Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: add new subscription message #1633

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@
package com.netflix.atlas.eval.model

import com.fasterxml.jackson.annotation.JsonAlias
import com.fasterxml.jackson.annotation.JsonIgnore
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.DataVocabulary
import com.netflix.atlas.core.stacklang.Interpreter

/**
* Triple representing the id, data expression, and frequency for a given data
Expand All @@ -35,20 +31,4 @@ import com.netflix.atlas.core.stacklang.Interpreter
* @param step
* The step size used for this stream of data.
*/
case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) {

@JsonIgnore
lazy val expr: DataExpr = LwcDataExpr.parseExpr(expression)
}

object LwcDataExpr {

private val interpreter = Interpreter(DataVocabulary.allWords)

private def parseExpr(input: String): DataExpr = {
interpreter.execute(input).stack match {
case (expr: DataExpr) :: Nil => expr
case _ => throw new IllegalArgumentException(s"invalid expr: $input")
}
}
}
case class LwcDataExpr(id: String, expression: String, @JsonAlias(Array("frequency")) step: Long) {}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ object LwcMessages {

// LwcSubscription
// - expression
var metrics: List[LwcDataExpr] = Nil
// - metrics
// LwcSubscriptionV2
// - expression
// - exprType
// - subExprs
var subExprs: List[LwcDataExpr] = Nil

// LwcDatapoint
var timestamp: Long = -1L
Expand Down Expand Up @@ -97,7 +102,8 @@ object LwcMessages {
case "expression" => expression = nextString(parser)
case "exprType" => exprType = ExprType.valueOf(nextString(parser))
case "step" => step = nextLong(parser)
case "metrics" => metrics = parseDataExprs(parser)
case "metrics" => subExprs = parseDataExprs(parser)
case "subExprs" => subExprs = parseDataExprs(parser)

case "timestamp" => timestamp = nextLong(parser)
case "id" => id = nextString(parser)
Expand All @@ -117,13 +123,14 @@ object LwcMessages {
}

typeDesc match {
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, metrics)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
case "expression" => LwcExpression(expression, exprType, step)
case "subscription" => LwcSubscription(expression, subExprs)
case "subscription-v2" => LwcSubscriptionV2(expression, exprType, subExprs)
case "datapoint" => LwcDatapoint(timestamp, id, tags, value)
case "event" => LwcEvent(id, payload)
case "diagnostic" => LwcDiagnosticMessage(id, diagnosticMessage)
case "heartbeat" => LwcHeartbeat(timestamp, step)
case _ => DiagnosticMessage(typeDesc, message, None)
}
} finally {
parser.close()
Expand Down Expand Up @@ -180,6 +187,7 @@ object LwcMessages {
private val Diagnostic = 4
private val Heartbeat = 5
private val Event = 6
private val SubscriptionV2 = 7

/**
* Encode messages using Jackson's smile format into a ByteString.
Expand Down Expand Up @@ -215,6 +223,17 @@ object LwcMessages {
gen.writeNumber(m.step)
}
gen.writeEndArray()
case msg: LwcSubscriptionV2 =>
gen.writeNumber(SubscriptionV2)
gen.writeString(msg.expression)
gen.writeString(msg.exprType.name())
gen.writeStartArray()
msg.subExprs.foreach { s =>
gen.writeString(s.id)
gen.writeString(s.expression)
gen.writeNumber(s.step)
}
gen.writeEndArray()
case msg: LwcDatapoint =>
gen.writeNumber(Datapoint)
gen.writeNumber(msg.timestamp)
Expand Down Expand Up @@ -288,6 +307,18 @@ object LwcMessages {
)
}
builder += LwcSubscription(expression, dataExprs.result())
case SubscriptionV2 =>
val expression = parser.nextTextValue()
val exprType = ExprType.valueOf(parser.nextTextValue())
val subExprs = List.newBuilder[LwcDataExpr]
foreachItem(parser) {
subExprs += LwcDataExpr(
parser.getText,
parser.nextTextValue(),
parser.nextLongValue(-1L)
)
}
builder += LwcSubscriptionV2(expression, exprType, subExprs.result())
case Datapoint =>
val timestamp = parser.nextLongValue(-1L)
val id = parser.nextTextValue()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2014-2024 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

/**
* Subscription message that is returned by the LWC service.
*
* @param expression
* Expression that was used for the initial subscription.
* @param exprType
* Indicates the type of expression for the subscription. This is typically determined
* based on the endpoint used on the URI.
* @param subExprs
* Data expressions that result from the root expression.
*/
case class LwcSubscriptionV2(expression: String, exprType: ExprType, subExprs: List[LwcDataExpr])
extends JsonSupport {
val `type`: String = "subscription-v2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.DataVocabulary
import com.netflix.atlas.core.stacklang.Interpreter
import org.apache.pekko.stream.Attributes
import org.apache.pekko.stream.FlowShape
import org.apache.pekko.stream.Inlet
Expand All @@ -25,7 +28,6 @@ import org.apache.pekko.stream.stage.InHandler
import org.apache.pekko.stream.stage.OutHandler
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.EventMessage
import com.netflix.atlas.eval.model.LwcDataExpr
import com.netflix.atlas.eval.model.LwcDatapoint
import com.netflix.atlas.eval.model.LwcDiagnosticMessage
import com.netflix.atlas.eval.model.LwcEvent
Expand All @@ -39,6 +41,8 @@ import com.netflix.atlas.eval.model.LwcSubscription
private[stream] class LwcToAggrDatapoint(context: StreamContext)
extends GraphStage[FlowShape[List[AnyRef], List[AggrDatapoint]]] {

import LwcToAggrDatapoint.*

private val unknown = context.registry.counter("atlas.eval.unknownMessages")

private val in = Inlet[List[AnyRef]]("LwcToAggrDatapoint.in")
Expand All @@ -49,7 +53,7 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) with InHandler with OutHandler {

private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, LwcDataExpr]
private[this] val state = scala.collection.mutable.AnyRefMap.empty[String, DatapointMetadata]

override def onPush(): Unit = {
val builder = List.newBuilder[AggrDatapoint]
Expand All @@ -71,15 +75,16 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
private def updateState(sub: LwcSubscription): Unit = {
sub.metrics.foreach { m =>
if (!state.contains(m.id)) {
state.put(m.id, m)
val expr = parseExpr(m.expression)
state.put(m.id, DatapointMetadata(m.expression, expr, m.step))
}
}
}

private def pushDatapoint(dp: LwcDatapoint): Option[AggrDatapoint] = {
state.get(dp.id) match {
case Some(sub) =>
val expr = sub.expr
val expr = sub.dataExpr
val step = sub.step
Some(AggrDatapoint(dp.timestamp, step, expr, "datapoint", dp.tags, dp.value))
case None =>
Expand All @@ -90,14 +95,14 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)

private def pushEvent(event: LwcEvent): Unit = {
state.get(event.id) match {
case Some(sub) => context.log(sub.expr, EventMessage(event.payload))
case Some(sub) => context.log(sub.dataExpr, EventMessage(event.payload))
case None => unknown.increment()
}
}

private def pushDiagnosticMessage(diagMsg: LwcDiagnosticMessage): Unit = {
state.get(diagMsg.id) match {
case Some(sub) => context.log(sub.expr, diagMsg.message)
case Some(sub) => context.log(sub.dataExpr, diagMsg.message)
case None => unknown.increment()
}
}
Expand All @@ -118,3 +123,17 @@ private[stream] class LwcToAggrDatapoint(context: StreamContext)
}
}
}

private[stream] object LwcToAggrDatapoint {

case class DatapointMetadata(dataExprStr: String, dataExpr: DataExpr, step: Long)

private val interpreter = Interpreter(DataVocabulary.allWords)

private def parseExpr(input: String): DataExpr = {
interpreter.execute(input).stack match {
case (expr: DataExpr) :: Nil => expr
case _ => throw new IllegalArgumentException(s"invalid expr: $input")
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,46 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected)
}

test("subscription-v2 time series") {
val expr = "name,cpu,:eq,:avg"
val sum = "name,cpu,:eq,:sum"
val count = "name,cpu,:eq,:count"
val dataExprs = List(LwcDataExpr("a", sum, step), LwcDataExpr("b", count, step))
val expected = LwcSubscriptionV2(expr, ExprType.TIME_SERIES, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 events") {
val raw = "name,cpu,:eq"
val table = "name,cpu,:eq,(,name,value,),:table"
val expr = s"$raw,$table"
val dataExprs = List(LwcDataExpr("a", raw, 0L), LwcDataExpr("b", table, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.EVENTS, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 trace events") {
val q1 = "app,www,:eq,app,db,:eq,:child"
val q2 = "app,www,:eq,app,foo,:eq,:span-and"
val expr = s"$q1,$q2"
val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.TRACE_EVENTS, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("subscription-v2 trace time series") {
val q1 = "app,www,:eq,app,db,:eq,:child"
val q2 = "app,www,:eq,app,foo,:eq,:span-and"
val expr = s"$q1,$q2"
val dataExprs = List(LwcDataExpr("a", q1, 0L), LwcDataExpr("b", q2, 0L))
val expected = LwcSubscriptionV2(expr, ExprType.TRACE_TIME_SERIES, dataExprs)
val actual = LwcMessages.parse(Json.encode(expected))
assertEquals(actual, expected)
}

test("datapoint") {
val expected = LwcDatapoint(step, "a", Map("foo" -> "bar"), 42.0)
val actual = LwcMessages.parse(Json.encode(expected))
Expand Down Expand Up @@ -125,6 +165,21 @@ class LwcMessagesSuite extends FunSuite {
assertEquals(actual, expected.toList)
}

test("batch: subscription-v2") {
val expected = (0 until 10).map { i =>
LwcSubscriptionV2(
"name,cpu,:eq,:avg",
ExprType.TIME_SERIES,
List(
LwcDataExpr(s"$i", "name,cpu,:eq,:sum", i),
LwcDataExpr(s"$i", "name,cpu,:eq,:count", i)
)
)
}
val actual = LwcMessages.parseBatch(LwcMessages.encodeBatch(expected))
assertEquals(actual, expected.toList)
}

test("batch: datapoint") {
val expected = (0 until 10).map { i =>
LwcDatapoint(
Expand Down
Loading