Skip to content

Commit

Permalink
Merge branch 'master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
khajavi authored May 20, 2024
2 parents 0a03839 + aab319b commit 9597fab
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 1 deletion.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,22 @@ Once you are inside the project directory, run the application:
```bash
$ sbt run
```

## List of Quickstarts

- [ZIO Cache](zio-quickstart-cache)
- [ZIO Json](zio-quickstart-encode-decode-json)
- [ZIO GraphQL Webservice](zio-quickstart-graphql-webservice)
- [ZIO Hello World](zio-quickstart-hello-world)
- [ZIO JUnit Tests](zio-quickstart-junit-integration)
- [ZIO Kafka](zio-quickstart-kafka)
- [ZIO Prelude](zio-quickstart-prelude)
- [ZIO Reloadable Service](zio-quickstart-reloadable-services)
- [ZIO RESTful webservice](zio-quickstart-restful-webservice)
- [ZIO RESTful webservice with configs](zio-quickstart-restful-webservice-configurable-app)
- [ZIO RESTful webservice with default logger](zio-quickstart-restful-webservice-logging)
- [ZIO RESTful webservice with custom logger](zio-quickstart-restful-webservice-custom-logger)
- [ZIO RESTful webservice with docker](zio-quickstart-restful-webservice-dockerize)
- [ZIO RESTful webservice with metrics](zio-quickstart-restful-webservice-metrics)
- [ZIO STM](zio-quickstart-stm) - many thanks to [@jorge-vasquez-2301](https://github.com/jorge-vasquez-2301) and his [article](https://scalac.io/blog/how-to-write-a-completely-lock-free-concurrent-lru-cache-with-zio-stm/) for this example
- [ZIO Streams](zio-quickstart-streams)
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ lazy val root =
`zio-quickstart-streams`,
`zio-quickstart-encode-decode-json`,
`zio-quickstart-cache`,
`zio-quickstart-prelude`
`zio-quickstart-prelude`,
`zio-quickstart-stm`
)

lazy val `zio-quickstart-hello-world` = project
Expand All @@ -52,3 +53,4 @@ lazy val `zio-quickstart-encode-decode-json` = project
lazy val `zio-quickstart-reloadable-services` = project
lazy val `zio-quickstart-cache` = project
lazy val `zio-quickstart-prelude` = project
lazy val `zio-quickstart-stm` = project
8 changes: 8 additions & 0 deletions zio-quickstart-stm/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
scalaVersion := "2.13.13"

libraryDependencies ++= Seq(
"dev.zio" %% "zio" % "2.0.19",
"dev.zio" %% "zio-macros" % "2.0.19"
)

scalacOptions += "-Ymacro-annotations"
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package dev.zio.quickstart

case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package dev.zio.quickstart

import zio._
import zio.macros.accessible

@accessible
trait LRUCache[K, V] {
def get(key: K): IO[NoSuchElementException, V]
def put(key: K, value: V): UIO[Unit]
def getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])]
}
142 changes: 142 additions & 0 deletions zio-quickstart-stm/src/main/scala/dev/zio/quickstart/LRUCacheSTM.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package dev.zio.quickstart

import zio._
import zio.stm._

case class LRUCacheSTM[K, V] private (
private val capacity: Int,
private val items: TMap[K, CacheItem[K, V]],
private val startRef: TRef[Option[K]],
private val endRef: TRef[Option[K]]
) extends LRUCache[K, V] { self =>
def get(key: K): IO[NoSuchElementException, V] =
(for {
optionItem <- self.items.get(key)
item <- STM
.fromOption(optionItem)
.mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
_ <- removeKeyFromList(key) *> addKeyToStartOfList(key)
} yield item.value).commitEither

def put(key: K, value: V): UIO[Unit] =
STM
.ifSTM(self.items.contains(key))(
updateItem(key, value),
addNewItem(key, value)
)
.commitEither

val getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])] =
(for {
items <- (items.keys zipWith items.values) { case (keys, values) =>
(keys zip values).toMap
}
optionStart <- startRef.get
optionEnd <- endRef.get
} yield (items, optionStart, optionEnd)).commit

private def updateItem(key: K, value: V): USTM[Unit] =
removeKeyFromList(key) *>
self.items.put(key, CacheItem(value, None, None)) *>
addKeyToStartOfList(key)

private def addNewItem(key: K, value: V): USTM[Unit] = {
val newCacheItem = CacheItem[K, V](value, None, None)
STM.ifSTM(self.items.keys.map(_.length < self.capacity))(
self.items.put(key, newCacheItem) *> addKeyToStartOfList(key),
replaceEndCacheItem(key, newCacheItem)
)
}

private def replaceEndCacheItem(
key: K,
newCacheItem: CacheItem[K, V]
): USTM[Unit] =
endRef.get.someOrElseSTM(STM.dieMessage(s"End is not defined!")).flatMap {
end =>
removeKeyFromList(end) *> self.items.delete(end) *> self.items
.put(key, newCacheItem) *> addKeyToStartOfList(key)
}

private def addKeyToStartOfList(key: K): USTM[Unit] =
for {
oldOptionStart <- self.startRef.get
_ <- getExistingCacheItem(key).flatMap { cacheItem =>
self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart))
}
_ <- oldOptionStart match {
case Some(oldStart) =>
getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key)))
}
case None => STM.unit
}
_ <- self.startRef.set(Some(key))
_ <- self.endRef.updateSome { case None => Some(key) }
} yield ()

private def removeKeyFromList(key: K): USTM[Unit] =
for {
cacheItem <- getExistingCacheItem(key)
optionLeftKey = cacheItem.left
optionRightKey = cacheItem.right
_ <- (optionLeftKey, optionRightKey) match {
case (Some(l), Some(r)) =>
updateLeftAndRightCacheItems(l, r)
case (Some(l), None) =>
setNewEnd(l)
case (None, Some(r)) =>
setNewStart(r)
case (None, None) =>
clearStartAndEnd
}
} yield ()

private def updateLeftAndRightCacheItems(l: K, r: K): USTM[Unit] =
for {
leftCacheItem <- getExistingCacheItem(l)
rightCacheItem <- getExistingCacheItem(r)
_ <- self.items.put(l, leftCacheItem.copy(right = Some(r)))
_ <- self.items.put(r, rightCacheItem.copy(left = Some(l)))
} yield ()

private def setNewEnd(newEnd: K): USTM[Unit] =
for {
cacheItem <- getExistingCacheItem(newEnd)
_ <- self.items.put(newEnd, cacheItem.copy(right = None)) *> self.endRef
.set(Some(newEnd))
} yield ()

private def setNewStart(newStart: K): USTM[Unit] =
for {
cacheItem <- getExistingCacheItem(newStart)
_ <- self.items.put(
newStart,
cacheItem.copy(left = None)
) *> self.startRef.set(Some(newStart))
} yield ()

private val clearStartAndEnd: STM[Nothing, Unit] =
self.startRef.set(None) *> self.endRef.set(None)

private def getExistingCacheItem(key: K): USTM[CacheItem[K, V]] =
self.items
.get(key)
.someOrElseSTM(STM.dieMessage(s"Key $key does not exist, but it should!"))
}

object LRUCacheSTM {
def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] =
ZLayer {
if (capacity > 0) {
(for {
itemsRef <- TMap.empty[K, CacheItem[K, V]]
startRef <- TRef.make(Option.empty[K])
endRef <- TRef.make(Option.empty[K])
} yield LRUCacheSTM[K, V](capacity, itemsRef, startRef, endRef)).commit
} else
ZIO.die(
new IllegalArgumentException("Capacity must be a positive number!")
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dev.zio.quickstart

import zio._

object UseLRUCacheWithMultipleFibers extends ZIOAppDefault {
lazy val run =
(for {
fiberReporter <- reporter.forever.fork
fiberProducers <- startWorkers(producer)
fiberConsumers <- startWorkers(consumer)
_ <-
Console.readLine.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
} yield ()).provideLayer(layer)

lazy val layer = LRUCacheSTM.layer[Int, Int](capacity = 3)

def startWorkers(worker: URIO[LRUCache[Int, Int], Unit]) =
ZIO.forkAll {
ZIO.replicate(100) {
worker.forever.catchAllCause(cause =>
Console.printLineError(cause.prettyPrint)
)
}
}

lazy val producer: URIO[LRUCache[Int, Int], Unit] =
for {
number <- Random.nextIntBounded(100)
_ <- Console.printLine(s"Producing ($number, $number)").orDie *> LRUCache
.put(number, number)
} yield ()

lazy val consumer: URIO[LRUCache[Int, Int], Unit] =
(for {
key <- Random.nextIntBounded(100)
value <- Console
.printLine(s"Consuming key: $key") *> LRUCache.get[Int, Int](key)
_ <- Console.printLine(s"Consumed value: $value")
} yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

lazy val reporter: URIO[LRUCache[Int, Int], Unit] =
for {
status <- LRUCache.getStatus[Int, Int]
(items, optionStart, optionEnd) = status
_ <- Console
.printLine(s"Items: $items, Start: $optionStart, End: $optionEnd")
.orDie
} yield ()
}

0 comments on commit 9597fab

Please sign in to comment.