diff --git a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
index 54ecec00f..e5fcaa325 100644
--- a/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
+++ b/atlas-lwc-events/src/main/scala/com/netflix/atlas/lwc/events/DatapointConverter.scala
@@ -28,7 +28,6 @@ import com.typesafe.scalalogging.StrictLogging
 import java.time.Duration
 import java.util.Locale
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.locks.ReentrantLock
 
 /**
  * Helper to convert a sequence of events into a data point.
@@ -162,25 +161,16 @@ private[events] object DatapointConverter {
 
     private val buffer = new StepDouble(Double.NaN, params.clock, params.step)
 
-    // Lock to avoid race condition between update and flush that can otherwise result
-    // in the samples occasionally being missing for an aggregate.
-    private val lock = new ReentrantLock()
     private val sampleMapper: LwcEvent => List[Any] = params.sampleMapper.orNull
-    private var sample: List[Any] = Nil
+    @volatile private var sample: List[Any] = Nil
+    @volatile private var sampleStale: Boolean = true
 
     override def update(event: LwcEvent): Unit = {
-      if (sampleMapper == null) {
-        update(params.valueMapper(event))
-      } else {
-        lock.lock()
-        try {
-          update(params.valueMapper(event))
-          if (sample.isEmpty)
-            sample = sampleMapper(event)
-        } finally {
-          lock.unlock()
-        }
+      if (sampleMapper != null && sampleStale) {
+        sampleStale = false
+        sample = sampleMapper(event)
       }
+      update(params.valueMapper(event))
     }
 
     override def update(value: Double): Unit = {
@@ -193,14 +183,10 @@ private[events] object DatapointConverter {
       if (sampleMapper == null) {
         Nil
       } else {
-        lock.lock()
-        try {
-          val s = sample
-          sample = Nil
-          List(s)
-        } finally {
-          lock.unlock()
-        }
+        // Mark the sample as stale so it will get refreshed roughly once per
+        // step interval.
+        sampleStale = true
+        List(sample)
       }
     }
 
diff --git a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
index 6d7f2f4b4..2b7da4ec2 100644
--- a/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
+++ b/atlas-lwc-events/src/test/scala/com/netflix/atlas/lwc/events/DatapointConverterSuite.scala
@@ -457,4 +457,35 @@ class DatapointConverterSuite extends FunSuite {
       }
     }
   }
+
+  test("groupBy - sum with sample ordering") {
+    val expr = DataExpr.GroupBy(DataExpr.Sum(Query.Equal("value", "responseSize")), List("app"))
+    val events = List.newBuilder[LwcEvent]
+    val converter = DatapointConverter(
+      "id",
+      expr.toString,
+      expr,
+      clock,
+      step,
+      Some(e => List(e.extractValue("app"))),
+      (_, e) => events.addOne(e)
+    )
+    converter.update(LwcEvent(Map("responseSize" -> 100, "app" -> "a")))
+    clock.setWallTime(step + 1)
+
+    // Update after passing step boundary and before flush
+    converter.update(LwcEvent(Map("responseSize" -> 100, "app" -> "a")))
+    converter.flush(clock.wallTime())
+    var results = events.result()
+    assertEquals(results.size, 1)
+    assertEquals(results.head.asInstanceOf[DatapointEvent].samples, List(List("a")))
+
+    // Flush again with no update
+    clock.setWallTime(step * 2 + 1)
+    events.clear()
+    converter.flush(clock.wallTime())
+    results = events.result()
+    assertEquals(results.size, 1)
+    assertEquals(results.head.asInstanceOf[DatapointEvent].samples, List(List("a")))
+  }
 }
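
Note: the patch replaces the ReentrantLock with two @volatile fields and deliberately accepts a benign race: if two update calls in the same step both observe sampleStale as true, both write the sample, and either value is an acceptable representative for the aggregate. The sketch below distills that pattern in isolation; SampleHolder and its simplified types are hypothetical stand-ins for the aggregator inside DatapointConverter, which also maintains a StepDouble buffer for the numeric value.

// Minimal sketch of the lock-free sampling pattern, under the assumptions
// stated above. Not the actual DatapointConverter implementation.
final class SampleHolder[E](mapper: E => List[Any]) {

  // @volatile gives cross-thread visibility without locking. The
  // check-then-set on stale is intentionally racy: concurrent updates may
  // each write the sample, and any of the written values is acceptable.
  @volatile private var sample: List[Any] = Nil
  @volatile private var stale: Boolean = true

  def update(event: E): Unit = {
    if (stale) {
      stale = false
      sample = mapper(event)
    }
  }

  // Unlike the locked version, flush retains the last sample instead of
  // clearing it, so a step with no updates still reports one. Setting the
  // stale flag causes the next update to refresh it, so the sample is
  // renewed roughly once per step interval.
  def flush(): List[List[Any]] = {
    stale = true
    List(sample)
  }
}

object SampleHolderDemo {

  def main(args: Array[String]): Unit = {
    val holder = new SampleHolder[Map[String, Any]](e => List(e("app")))
    holder.update(Map("app" -> "a", "responseSize" -> 100))
    println(holder.flush()) // List(List(a))
    println(holder.flush()) // List(List(a)): retained with no new update
    holder.update(Map("app" -> "b", "responseSize" -> 200))
    println(holder.flush()) // List(List(b)): refreshed after the flush
  }
}

The trade-off relative to the locked version is that a sample can lag by up to one step behind the values it accompanies, which the second half of the new test exercises: a flush with no intervening update still reports the previously captured sample.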