From 6a1f1db1cafeb88d39d92817590378b1cf6aee44 Mon Sep 17 00:00:00 2001 From: brharrington Date: Thu, 17 Oct 2024 17:16:40 -0500 Subject: [PATCH] lwcapi: improve performance for index updates (#1708) Now that we are off the legacy query index, refactor the update to modify the index with changes rather than recreate each time the subscription set changes. --- .../atlas/lwcapi/SubscriptionManager.scala | 24 +++++++++---------- .../atlas/lwcapi/ExpressionApiSuite.scala | 4 ++-- .../lwcapi/SubscriptionManagerSuite.scala | 12 +++++----- 3 files changed, 19 insertions(+), 21 deletions(-) diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala index 648a35f5a..5b7e17892 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscriptionManager.scala @@ -46,35 +46,33 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { private val subHandlers = new ConcurrentHashMap[String, ConcurrentSet[T]]() @volatile private var subscriptionsList = List.empty[Subscription] - @volatile private var queryIndex = newIndex(Nil) @volatile private var queryListChanged = false + private val queryIndex = QueryIndex.newInstance[Subscription](registry) // Background process for updating the query index. It is not done inline because rebuilding // the index can be computationally expensive. private val ex = new ScheduledThreadPoolExecutor(1, ThreadPools.threadFactory("ExpressionDatabase")) - ex.scheduleWithFixedDelay(() => regenerateQueryIndex(), 1, 1, TimeUnit.SECONDS) + ex.scheduleWithFixedDelay(() => updateQueryIndex(), 1, 1, TimeUnit.SECONDS) ex.scheduleAtFixedRate(() => updateGauges(), 1, 1, TimeUnit.MINUTES) - private def newIndex(subs: List[Subscription]): QueryIndex[Subscription] = { - val idx = QueryIndex.newInstance[Subscription](registry) - subs.foreach { sub => - idx.add(sub.query, sub) - } - idx - } - /** Rebuild the query index if there have been changes since it was last created. */ - private[lwcapi] def regenerateQueryIndex(): Unit = { + private[lwcapi] def updateQueryIndex(): Unit = { if (queryListChanged) { queryListChanged = false + val previous = subscriptionsList.toSet subscriptionsList = registrations .values() .asScala .flatMap(_.subscriptions) .toList .distinct - queryIndex = newIndex(subscriptionsList) + + val current = subscriptionsList.toSet + val added = current.diff(previous) + val removed = previous.diff(current) + added.foreach(s => queryIndex.add(s.query, s)) + removed.foreach(s => queryIndex.remove(s.query, s)) } } @@ -285,7 +283,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging { logger.debug("clearing all subscriptions") registrations.clear() queryListChanged = true - regenerateQueryIndex() + updateQueryIndex() } } diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala index 63b4867ea..367dda923 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala @@ -107,7 +107,7 @@ class ExpressionApiSuite extends MUnitRouteSuite { splits.foreach { s => sm.subscribe("a", s) } - sm.regenerateQueryIndex() + sm.updateQueryIndex() Get("/lwc/api/v1/expressions/skan") ~> endpoint.routes ~> check { val expected = s"""{"expressions":[$skanCount,$skanSum]}""" assertEquals(unzip(responseAs[Array[Byte]]), expected) @@ -121,7 +121,7 @@ class ExpressionApiSuite extends MUnitRouteSuite { splits.foreach { s => sm.subscribe("a", s) } - sm.regenerateQueryIndex() + sm.updateQueryIndex() Get("/lwc/api/v1/expressions") ~> endpoint.routes ~> check { val expected = s"""{"expressions":[$brhMax,$skanCount,$skanSum]}""" assertEquals(unzip(responseAs[Array[Byte]]), expected) diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala index 329a9c73a..85d976957 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/SubscriptionManagerSuite.scala @@ -74,11 +74,11 @@ class SubscriptionManagerSuite extends FunSuite { val exp1 = sub("name,exp1,:eq") sm.subscribe(meta.streamId, exp1) - sm.regenerateQueryIndex() + sm.updateQueryIndex() assertEquals(sm.subscriptions, List(exp1)) assert(!sm.register(meta, 1)) - sm.regenerateQueryIndex() + sm.updateQueryIndex() assertEquals(sm.subscriptions, List(exp1)) } @@ -89,7 +89,7 @@ class SubscriptionManagerSuite extends FunSuite { val subs = List(sub("name,exp1,:eq"), sub("name,exp2,:eq")) sm.subscribe(meta.streamId, subs) - sm.regenerateQueryIndex() + sm.updateQueryIndex() assertEquals(sm.subscriptions.toSet, subs.toSet) } @@ -102,7 +102,7 @@ class SubscriptionManagerSuite extends FunSuite { val s = sub("name,exp1,:eq") sm.subscribe(meta.streamId, s) sm.subscribe(meta.streamId, s) - sm.regenerateQueryIndex() + sm.updateQueryIndex() assertEquals(sm.subscriptions, List(s)) } @@ -117,7 +117,7 @@ class SubscriptionManagerSuite extends FunSuite { val s = sub("name,exp1,:eq") sm.subscribe(a.streamId, s) sm.subscribe(b.streamId, s) - sm.regenerateQueryIndex() + sm.updateQueryIndex() assertEquals(sm.subscriptions, List(s)) assertEquals(sm.subscriptionsForStream(a.streamId), List(s)) @@ -133,7 +133,7 @@ class SubscriptionManagerSuite extends FunSuite { val s = sub(expr) sm.register(StreamMetadata("a"), 1) sm.subscribe("a", s) - sm.regenerateQueryIndex() + sm.updateQueryIndex() sm.subscriptionsForCluster(cluster).map(_.metadata.expression) }