From 25a4d691264b8aa129d9ea5f2e5a4fd7055863bb Mon Sep 17 00:00:00 2001 From: Grigorii Berezin Date: Mon, 13 May 2024 12:53:11 +0300 Subject: [PATCH 1/2] feat: stm quickstart --- build.sbt | 4 +- zio-quickstart-stm/build.sbt | 8 + .../scala/dev/zio/quickstart/CacheItem.scala | 3 + .../scala/dev/zio/quickstart/LRUCache.scala | 11 ++ .../dev/zio/quickstart/LRUCacheSTM.scala | 142 ++++++++++++++++++ .../UseLRUCacheWithMultipleFibers.scala | 50 ++++++ 6 files changed, 217 insertions(+), 1 deletion(-) create mode 100644 zio-quickstart-stm/build.sbt create mode 100644 zio-quickstart-stm/src/main/scala/dev/zio/quickstart/CacheItem.scala create mode 100644 zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCache.scala create mode 100644 zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCacheSTM.scala create mode 100644 zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala diff --git a/build.sbt b/build.sbt index 05a8389..6bb1bbc 100644 --- a/build.sbt +++ b/build.sbt @@ -34,7 +34,8 @@ lazy val root = `zio-quickstart-streams`, `zio-quickstart-encode-decode-json`, `zio-quickstart-cache`, - `zio-quickstart-prelude` + `zio-quickstart-prelude`, + `zio-quickstart-stm` ) lazy val `zio-quickstart-hello-world` = project @@ -52,3 +53,4 @@ lazy val `zio-quickstart-encode-decode-json` = project lazy val `zio-quickstart-reloadable-services` = project lazy val `zio-quickstart-cache` = project lazy val `zio-quickstart-prelude` = project +lazy val `zio-quickstart-stm` = project diff --git a/zio-quickstart-stm/build.sbt b/zio-quickstart-stm/build.sbt new file mode 100644 index 0000000..98c2c4a --- /dev/null +++ b/zio-quickstart-stm/build.sbt @@ -0,0 +1,8 @@ +scalaVersion := "2.13.13" + +libraryDependencies ++= Seq( + "dev.zio" %% "zio" % "2.0.19", + "dev.zio" %% "zio-macros" % "2.0.19" +) + +scalacOptions += "-Ymacro-annotations" diff --git a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/CacheItem.scala b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/CacheItem.scala new file mode 100644 index 0000000..7166d1a --- /dev/null +++ b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/CacheItem.scala @@ -0,0 +1,3 @@ +package dev.zio.quickstart + +case class CacheItem[K, V](value: V, left: Option[K], right: Option[K]) diff --git a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCache.scala b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCache.scala new file mode 100644 index 0000000..c341547 --- /dev/null +++ b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCache.scala @@ -0,0 +1,11 @@ +package dev.zio.quickstart + +import zio._ +import zio.macros.accessible + +@accessible +trait LRUCache[K, V] { + def get(key: K): IO[NoSuchElementException, V] + def put(key: K, value: V): UIO[Unit] + def getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])] +} diff --git a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCacheSTM.scala b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCacheSTM.scala new file mode 100644 index 0000000..d421c34 --- /dev/null +++ b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCacheSTM.scala @@ -0,0 +1,142 @@ +package dev.zio.quickstart + +import zio._ +import zio.stm._ + +case class LRUCacheSTM[K, V] private ( + private val capacity: Int, + private val items: TMap[K, CacheItem[K, V]], + private val startRef: TRef[Option[K]], + private val endRef: TRef[Option[K]] +) extends LRUCache[K, V] { self => + def get(key: K): IO[NoSuchElementException, V] = + (for { + optionItem <- self.items.get(key) + item <- STM + .fromOption(optionItem) + .mapError(_ => new NoSuchElementException(s"Key does not exist: $key")) + _ <- removeKeyFromList(key) *> addKeyToStartOfList(key) + } yield item.value).commitEither + + def put(key: K, value: V): UIO[Unit] = + STM + .ifSTM(self.items.contains(key))( + updateItem(key, value), + addNewItem(key, value) + ) + .commitEither + + val getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])] = + (for { + items <- (items.keys zipWith items.values) { case (keys, values) => + (keys zip values).toMap + } + optionStart <- startRef.get + optionEnd <- endRef.get + } yield (items, optionStart, optionEnd)).commit + + private def updateItem(key: K, value: V): USTM[Unit] = + removeKeyFromList(key) *> + self.items.put(key, CacheItem(value, None, None)) *> + addKeyToStartOfList(key) + + private def addNewItem(key: K, value: V): USTM[Unit] = { + val newCacheItem = CacheItem[K, V](value, None, None) + STM.ifSTM(self.items.keys.map(_.length < self.capacity))( + self.items.put(key, newCacheItem) *> addKeyToStartOfList(key), + replaceEndCacheItem(key, newCacheItem) + ) + } + + private def replaceEndCacheItem( + key: K, + newCacheItem: CacheItem[K, V] + ): USTM[Unit] = + endRef.get.someOrElseSTM(STM.dieMessage(s"End is not defined!")).flatMap { + end => + removeKeyFromList(end) *> self.items.delete(end) *> self.items + .put(key, newCacheItem) *> addKeyToStartOfList(key) + } + + private def addKeyToStartOfList(key: K): USTM[Unit] = + for { + oldOptionStart <- self.startRef.get + _ <- getExistingCacheItem(key).flatMap { cacheItem => + self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart)) + } + _ <- oldOptionStart match { + case Some(oldStart) => + getExistingCacheItem(oldStart).flatMap { oldStartCacheItem => + self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key))) + } + case None => STM.unit + } + _ <- self.startRef.set(Some(key)) + _ <- self.endRef.updateSome { case None => Some(key) } + } yield () + + private def removeKeyFromList(key: K): USTM[Unit] = + for { + cacheItem <- getExistingCacheItem(key) + optionLeftKey = cacheItem.left + optionRightKey = cacheItem.right + _ <- (optionLeftKey, optionRightKey) match { + case (Some(l), Some(r)) => + updateLeftAndRightCacheItems(l, r) + case (Some(l), None) => + setNewEnd(l) + case (None, Some(r)) => + setNewStart(r) + case (None, None) => + clearStartAndEnd + } + } yield () + + private def updateLeftAndRightCacheItems(l: K, r: K): USTM[Unit] = + for { + leftCacheItem <- getExistingCacheItem(l) + rightCacheItem <- getExistingCacheItem(r) + _ <- self.items.put(l, leftCacheItem.copy(right = Some(r))) + _ <- self.items.put(r, rightCacheItem.copy(left = Some(l))) + } yield () + + private def setNewEnd(newEnd: K): USTM[Unit] = + for { + cacheItem <- getExistingCacheItem(newEnd) + _ <- self.items.put(newEnd, cacheItem.copy(right = None)) *> self.endRef + .set(Some(newEnd)) + } yield () + + private def setNewStart(newStart: K): USTM[Unit] = + for { + cacheItem <- getExistingCacheItem(newStart) + _ <- self.items.put( + newStart, + cacheItem.copy(left = None) + ) *> self.startRef.set(Some(newStart)) + } yield () + + private val clearStartAndEnd: STM[Nothing, Unit] = + self.startRef.set(None) *> self.endRef.set(None) + + private def getExistingCacheItem(key: K): USTM[CacheItem[K, V]] = + self.items + .get(key) + .someOrElseSTM(STM.dieMessage(s"Key $key does not exist, but it should!")) +} + +object LRUCacheSTM { + def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] = + ZLayer { + if (capacity > 0) { + (for { + itemsRef <- TMap.empty[K, CacheItem[K, V]] + startRef <- TRef.make(Option.empty[K]) + endRef <- TRef.make(Option.empty[K]) + } yield LRUCacheSTM[K, V](capacity, itemsRef, startRef, endRef)).commit + } else + ZIO.die( + new IllegalArgumentException("Capacity must be a positive number!") + ) + } +} diff --git a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala new file mode 100644 index 0000000..1b93875 --- /dev/null +++ b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala @@ -0,0 +1,50 @@ +package dev.zio.quickstart + +import zio._ + +// thanks jorge-vasquez-2301 for this example +object UseLRUCacheWithMultipleFibers extends ZIOAppDefault { + lazy val run = + (for { + fiberReporter <- reporter.forever.fork + fiberProducers <- startWorkers(producer) + fiberConsumers <- startWorkers(consumer) + _ <- + Console.readLine.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt + } yield ()).provideLayer(layer) + + lazy val layer = LRUCacheSTM.layer[Int, Int](capacity = 3) + + def startWorkers(worker: URIO[LRUCache[Int, Int], Unit]) = + ZIO.forkAll { + ZIO.replicate(100) { + worker.forever.catchAllCause(cause => + Console.printLineError(cause.prettyPrint) + ) + } + } + + lazy val producer: URIO[LRUCache[Int, Int], Unit] = + for { + number <- Random.nextIntBounded(100) + _ <- Console.printLine(s"Producing ($number, $number)").orDie *> LRUCache + .put(number, number) + } yield () + + lazy val consumer: URIO[LRUCache[Int, Int], Unit] = + (for { + key <- Random.nextIntBounded(100) + value <- Console + .printLine(s"Consuming key: $key") *> LRUCache.get[Int, Int](key) + _ <- Console.printLine(s"Consumed value: $value") + } yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie) + + lazy val reporter: URIO[LRUCache[Int, Int], Unit] = + for { + status <- LRUCache.getStatus[Int, Int] + (items, optionStart, optionEnd) = status + _ <- Console + .printLine(s"Items: $items, Start: $optionStart, End: $optionEnd") + .orDie + } yield () +} From 99c5eefe83ea0f54b6070c818dd0d1fc2af33fd4 Mon Sep 17 00:00:00 2001 From: Grigorii Berezin Date: Mon, 20 May 2024 08:53:08 +0300 Subject: [PATCH 2/2] feat: add list of quickstarts to README and add thanks for example --- README.md | 19 +++++++++++++++++++ .../UseLRUCacheWithMultipleFibers.scala | 1 - 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 88e922d..6613f76 100644 --- a/README.md +++ b/README.md @@ -18,3 +18,22 @@ Once you are inside the project directory, run the application: ```bash $ sbt run ``` + +## List of Quickstarts + +- [ZIO Cache](zio-quickstart-cache) +- [ZIO Json](zio-quickstart-encode-decode-json) +- [ZIO GraphQL Webservice](zio-quickstart-graphql-webservice) +- [ZIO Hello World](zio-quickstart-hello-world) +- [ZIO JUnit Tests](zio-quickstart-junit-integration) +- [ZIO Kafka](zio-quickstart-kafka) +- [ZIO Prelude](zio-quickstart-prelude) +- [ZIO Reloadable Service](zio-quickstart-reloadable-services) +- [ZIO RESTful webservice](zio-quickstart-restful-webservice) +- [ZIO RESTful webservice with configs](zio-quickstart-restful-webservice-configurable-app) +- [ZIO RESTful webservice with default logger](zio-quickstart-restful-webservice-logging) +- [ZIO RESTful webservice with custom logger](zio-quickstart-restful-webservice-custom-logger) +- [ZIO RESTful webservice with docker](zio-quickstart-restful-webservice-dockerize) +- [ZIO RESTful webservice with metrics](zio-quickstart-restful-webservice-metrics) +- [ZIO STM](zio-quickstart-stm) - many thanks to [@jorge-vasquez-2301](https://github.com/jorge-vasquez-2301) and his [article](https://scalac.io/blog/how-to-write-a-completely-lock-free-concurrent-lru-cache-with-zio-stm/) for this example +- [ZIO Streams](zio-quickstart-streams) diff --git a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala index 1b93875..2aa6d5f 100644 --- a/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala +++ b/zio-quickstart-stm/src/main/scala/dev/zio/quickstart/UseLRUCacheWithMultipleFibers.scala @@ -2,7 +2,6 @@ package dev.zio.quickstart import zio._ -// thanks jorge-vasquez-2301 for this example object UseLRUCacheWithMultipleFibers extends ZIOAppDefault { lazy val run = (for {