Skip to content

Commit

Permalink
lwc-events: add log for max groups limit (#1736)
Browse files Browse the repository at this point in the history
Log expression hitting max groups limit.
  • Loading branch information
brharrington authored Dec 10, 2024
1 parent 7bfee25 commit 0e0e604
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
case expr: EventExpr.Sample =>
val converter = DatapointConverter(
sub.id,
sub.expression,
expr.dataExpr,
clock,
sub.step,
Expand Down Expand Up @@ -93,7 +94,8 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
// Analytics based on events
diff.added.timeSeries.foreach { sub =>
val expr = ExprUtils.parseDataExpr(sub.expression)
val converter = DatapointConverter(sub.id, expr, clock, sub.step, None, submit)
val converter =
DatapointConverter(sub.id, sub.expression, expr, clock, sub.step, None, submit)
val q = ExprUtils.toSpectatorQuery(removeValueClause(expr.query))
val handler = EventHandler(
sub,
Expand Down Expand Up @@ -123,7 +125,8 @@ abstract class AbstractLwcEventClient(clock: Clock) extends LwcEventClient {
diff.added.traceTimeSeries.foreach { sub =>
val tq = ExprUtils.parseTraceTimeSeriesQuery(sub.expression)
val dataExpr = tq.expr.expr.dataExprs.head
val converter = DatapointConverter(sub.id, dataExpr, clock, sub.step, None, submit)
val converter =
DatapointConverter(sub.id, sub.expression, dataExpr, clock, sub.step, None, submit)
val q = ExprUtils.toSpectatorQuery(removeValueClause(dataExpr.query))
val handler = EventHandler(
sub,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.netflix.iep.config.ConfigManager
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.histogram.PercentileBuckets
import com.netflix.spectator.impl.StepDouble
import com.typesafe.scalalogging.StrictLogging

import java.time.Duration
import java.util.Locale
Expand Down Expand Up @@ -56,6 +57,7 @@ private[events] object DatapointConverter {

def apply(
id: String,
rawExpr: String,
expr: DataExpr,
clock: Clock,
step: Long,
Expand All @@ -64,7 +66,8 @@ private[events] object DatapointConverter {
): DatapointConverter = {
val tags = Query.tags(expr.query)
val mapper = createValueMapper(tags)
val params = Params(id, Query.tags(expr.query), clock, step, mapper, sampleMapper, consumer)
val params =
Params(id, rawExpr, Query.tags(expr.query), clock, step, mapper, sampleMapper, consumer)
toConverter(expr, params)
}

Expand Down Expand Up @@ -144,6 +147,7 @@ private[events] object DatapointConverter {

case class Params(
id: String,
rawExpr: String,
tags: Map[String, String],
clock: Clock,
step: Long,
Expand Down Expand Up @@ -278,7 +282,9 @@ private[events] object DatapointConverter {
}

/** Compute set of data points, one for each distinct group. */
case class GroupBy(by: DataExpr.GroupBy, params: Params) extends DatapointConverter {
case class GroupBy(by: DataExpr.GroupBy, params: Params)
extends DatapointConverter
with StrictLogging {

private val groups = new ConcurrentHashMap[Map[String, String], DatapointConverter]()

Expand Down Expand Up @@ -366,7 +372,10 @@ private[events] object DatapointConverter {
if (converter.hasNoData)
it.remove()
}
maxGroupsExceeded = groups.size() >= MaxGroupBySize
if (groups.size() >= MaxGroupBySize) {
maxGroupsExceeded = true
logger.debug(s"max groups exceeded for expression: ${params.rawExpr}")
}
}

override def hasNoData: Boolean = groups.isEmpty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class RemoteLwcEventClient(registry: Registry, config: Config)
if (!events.isEmpty) {
import scala.jdk.CollectionConverters.*

// Write out datapoints that need to be batch by timestamp
// Write out datapoints that need to be batched by timestamp
val ds = events.asScala.collect {
case Event(_, e: DatapointEvent) => e
}.toList
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - sum") {
val expr = DataExpr.Sum(Query.True)
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("value" -> i))
converter.update(event)
Expand All @@ -71,6 +72,7 @@ class DatapointConverterSuite extends FunSuite {
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter(
"id",
expr.toString,
expr,
clock,
step,
Expand All @@ -91,7 +93,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - sum custom value") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -106,7 +109,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - count") {
val expr = DataExpr.Count(Query.Equal("value", "responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -121,7 +125,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - max custom value") {
val expr = DataExpr.Max(Query.Equal("value", "responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -136,7 +141,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - max negative value") {
val expr = DataExpr.Max(Query.Equal("value", "responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> -(i + 10)))
converter.update(event)
Expand All @@ -151,7 +157,8 @@ class DatapointConverterSuite extends FunSuite {
test("counter - min negative value") {
val expr = DataExpr.Min(Query.Equal("value", "responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> -i))
converter.update(event)
Expand All @@ -170,7 +177,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - sum of totalTime") {
val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalTime")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> i))
converter.update(event)
Expand All @@ -185,7 +193,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - sum of count") {
val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("count")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> i))
converter.update(event)
Expand All @@ -200,7 +209,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - sum of totalOfSquares") {
val expr = DataExpr.Sum(Query.Equal("value", "latency").and(stat("totalOfSquares")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> i))
converter.update(event)
Expand All @@ -215,7 +225,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - dist-max") {
val expr = DataExpr.Max(Query.Equal("value", "latency").and(stat("max")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> i))
converter.update(event)
Expand All @@ -230,7 +241,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - dist-min") {
val expr = DataExpr.Min(Query.Equal("value", "latency").and(stat("max")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> i))
converter.update(event)
Expand All @@ -245,7 +257,8 @@ class DatapointConverterSuite extends FunSuite {
test("timer - percentile") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "latency")), List("percentile"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("latency" -> Duration.ofMillis(i)))
converter.update(event)
Expand All @@ -260,7 +273,8 @@ class DatapointConverterSuite extends FunSuite {
test("dist - sum of totalAmount") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalAmount")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -276,7 +290,8 @@ class DatapointConverterSuite extends FunSuite {
val stat = Query.In("statistic", List("totalTime", "totalAmount"))
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -291,7 +306,8 @@ class DatapointConverterSuite extends FunSuite {
test("dist - sum of count") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("count")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -306,7 +322,8 @@ class DatapointConverterSuite extends FunSuite {
test("dist - sum of totalOfSquares") {
val expr = DataExpr.Sum(Query.Equal("value", "responseSize").and(stat("totalOfSquares")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -321,7 +338,8 @@ class DatapointConverterSuite extends FunSuite {
test("dist - dist-max") {
val expr = DataExpr.Max(Query.Equal("value", "responseSize").and(stat("max")))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -339,7 +357,8 @@ class DatapointConverterSuite extends FunSuite {
List("percentile")
)
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
val event = LwcEvent(Map("responseSize" -> i))
converter.update(event)
Expand All @@ -354,7 +373,8 @@ class DatapointConverterSuite extends FunSuite {
test("groupBy - sum") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))
(0 until 5).foreach { i =>
converter.update(LwcEvent(Map("responseSize" -> i, "app" -> "a")))
converter.update(LwcEvent(Map("responseSize" -> i * 2, "app" -> "b")))
Expand All @@ -379,7 +399,8 @@ class DatapointConverterSuite extends FunSuite {
val expr =
DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("responseSize"))
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter("id", expr, clock, step, None, (_, e) => events.addOne(e))
val converter =
DatapointConverter("id", expr.toString, expr, clock, step, None, (_, e) => events.addOne(e))

(0 until 10_000).foreach { i =>
converter.update(LwcEvent(Map("responseSize" -> i.toString, "app" -> "a")))
Expand All @@ -404,6 +425,7 @@ class DatapointConverterSuite extends FunSuite {
val events = List.newBuilder[LwcEvent]
val converter = DatapointConverter(
"id",
expr.toString,
expr,
clock,
step,
Expand Down

0 comments on commit 0e0e604

Please sign in to comment.