Skip to content

Commit

Permalink
lwcapi: improve performance for index updates (#1708)
Browse files Browse the repository at this point in the history
Now that we are off the legacy query index, refactor the
update to modify the index with changes rather than recreate
each time the subscription set changes.
  • Loading branch information
brharrington authored Oct 17, 2024
1 parent 946926a commit 6a1f1db
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,35 +46,33 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging {
private val subHandlers = new ConcurrentHashMap[String, ConcurrentSet[T]]()

@volatile private var subscriptionsList = List.empty[Subscription]
@volatile private var queryIndex = newIndex(Nil)
@volatile private var queryListChanged = false
private val queryIndex = QueryIndex.newInstance[Subscription](registry)

// Background process for updating the query index. It is not done inline because rebuilding
// the index can be computationally expensive.
private val ex =
new ScheduledThreadPoolExecutor(1, ThreadPools.threadFactory("ExpressionDatabase"))
ex.scheduleWithFixedDelay(() => regenerateQueryIndex(), 1, 1, TimeUnit.SECONDS)
ex.scheduleWithFixedDelay(() => updateQueryIndex(), 1, 1, TimeUnit.SECONDS)
ex.scheduleAtFixedRate(() => updateGauges(), 1, 1, TimeUnit.MINUTES)

private def newIndex(subs: List[Subscription]): QueryIndex[Subscription] = {
val idx = QueryIndex.newInstance[Subscription](registry)
subs.foreach { sub =>
idx.add(sub.query, sub)
}
idx
}

/** Rebuild the query index if there have been changes since it was last created. */
private[lwcapi] def regenerateQueryIndex(): Unit = {
private[lwcapi] def updateQueryIndex(): Unit = {
if (queryListChanged) {
queryListChanged = false
val previous = subscriptionsList.toSet
subscriptionsList = registrations
.values()
.asScala
.flatMap(_.subscriptions)
.toList
.distinct
queryIndex = newIndex(subscriptionsList)

val current = subscriptionsList.toSet
val added = current.diff(previous)
val removed = previous.diff(current)
added.foreach(s => queryIndex.add(s.query, s))
removed.foreach(s => queryIndex.remove(s.query, s))
}
}

Expand Down Expand Up @@ -285,7 +283,7 @@ class SubscriptionManager[T](registry: Registry) extends StrictLogging {
logger.debug("clearing all subscriptions")
registrations.clear()
queryListChanged = true
regenerateQueryIndex()
updateQueryIndex()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class ExpressionApiSuite extends MUnitRouteSuite {
splits.foreach { s =>
sm.subscribe("a", s)
}
sm.regenerateQueryIndex()
sm.updateQueryIndex()
Get("/lwc/api/v1/expressions/skan") ~> endpoint.routes ~> check {
val expected = s"""{"expressions":[$skanCount,$skanSum]}"""
assertEquals(unzip(responseAs[Array[Byte]]), expected)
Expand All @@ -121,7 +121,7 @@ class ExpressionApiSuite extends MUnitRouteSuite {
splits.foreach { s =>
sm.subscribe("a", s)
}
sm.regenerateQueryIndex()
sm.updateQueryIndex()
Get("/lwc/api/v1/expressions") ~> endpoint.routes ~> check {
val expected = s"""{"expressions":[$brhMax,$skanCount,$skanSum]}"""
assertEquals(unzip(responseAs[Array[Byte]]), expected)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ class SubscriptionManagerSuite extends FunSuite {

val exp1 = sub("name,exp1,:eq")
sm.subscribe(meta.streamId, exp1)
sm.regenerateQueryIndex()
sm.updateQueryIndex()
assertEquals(sm.subscriptions, List(exp1))

assert(!sm.register(meta, 1))
sm.regenerateQueryIndex()
sm.updateQueryIndex()
assertEquals(sm.subscriptions, List(exp1))
}

Expand All @@ -89,7 +89,7 @@ class SubscriptionManagerSuite extends FunSuite {

val subs = List(sub("name,exp1,:eq"), sub("name,exp2,:eq"))
sm.subscribe(meta.streamId, subs)
sm.regenerateQueryIndex()
sm.updateQueryIndex()

assertEquals(sm.subscriptions.toSet, subs.toSet)
}
Expand All @@ -102,7 +102,7 @@ class SubscriptionManagerSuite extends FunSuite {
val s = sub("name,exp1,:eq")
sm.subscribe(meta.streamId, s)
sm.subscribe(meta.streamId, s)
sm.regenerateQueryIndex()
sm.updateQueryIndex()

assertEquals(sm.subscriptions, List(s))
}
Expand All @@ -117,7 +117,7 @@ class SubscriptionManagerSuite extends FunSuite {
val s = sub("name,exp1,:eq")
sm.subscribe(a.streamId, s)
sm.subscribe(b.streamId, s)
sm.regenerateQueryIndex()
sm.updateQueryIndex()

assertEquals(sm.subscriptions, List(s))
assertEquals(sm.subscriptionsForStream(a.streamId), List(s))
Expand All @@ -133,7 +133,7 @@ class SubscriptionManagerSuite extends FunSuite {
val s = sub(expr)
sm.register(StreamMetadata("a"), 1)
sm.subscribe("a", s)
sm.regenerateQueryIndex()
sm.updateQueryIndex()
sm.subscriptionsForCluster(cluster).map(_.metadata.expression)
}

Expand Down

0 comments on commit 6a1f1db

Please sign in to comment.