Skip to content

Commit

Permalink
do json parsing asynchronously on ExecutionContext
Browse files Browse the repository at this point in the history
add back imports
  • Loading branch information
pjfanning committed May 17, 2024
1 parent 33cf12d commit 73362f8
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.apache.pekko.http.scaladsl.util.FastFuture
import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import org.apache.pekko.util.ByteString
import argonaut.{ DecodeJson, EncodeJson, Json, Parse, PrettyParams }

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.control.NonFatal

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import com.sksamuel.avro4s.{
Encoder,
SchemaFor
}

import java.io.ByteArrayOutputStream
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -106,15 +106,12 @@ trait AvroSupport {
* unmarshaller for any `A` value
*/
implicit def fromByteStringUnmarshaller[A: SchemaFor: Decoder]: Unmarshaller[ByteString, A] =
Unmarshaller { _ => bs =>
Future.fromTry {
Unmarshaller { ec => bs =>
Future {
val schema = AvroSchema[A]

Try {
if (bs.isEmpty) throw Unmarshaller.NoContentException
AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next()
}
}
if (bs.isEmpty) throw Unmarshaller.NoContentException
AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next()
}(ec)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import cats.syntax.either.catsSyntaxEither
import cats.syntax.show.toShow
import io.circe.{ Decoder, DecodingFailure, Encoder, Json, Printer, jawn }
import io.circe.parser.parse

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.control.NonFatal
Expand Down Expand Up @@ -200,7 +201,14 @@ trait BaseCirceSupport {
implicit def unmarshaller[A: Decoder]: FromEntityUnmarshaller[A]

def byteStringJsonUnmarshaller: Unmarshaller[ByteString, Json] =
Unmarshaller(_ => bs => Future.fromTry(jawn.parseByteBuffer(bs.asByteBuffer).toTry))
Unmarshaller { ec => bs =>
Future {
jawn.parseByteBuffer(bs.asByteBuffer) match {
case Right(json) => json
case Left(pf) => throw pf
}
}(ec)
}

/**
* HTTP entity => `Source[A, _]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.apache.pekko.http.scaladsl.util.FastFuture
import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import org.apache.pekko.util.ByteString

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -205,12 +206,12 @@ trait JacksonSupport {
objectMapper: ObjectMapper with ClassTagExtensions = defaultObjectMapper
): Unmarshaller[ByteString, A] =
if (ByteStringInputStream.byteStringSupportsAsInputStream) {
Unmarshaller { _ => bs =>
Future.fromTry(Try(objectMapper.readValue[A](ByteStringInputStream(bs))))
Unmarshaller { ec => bs =>
Future(objectMapper.readValue[A](ByteStringInputStream(bs)))(ec)
}
} else {
Unmarshaller { _ => bs =>
Future.fromTry(Try(objectMapper.readValue[A](bs.toArrayUnsafe())))
Unmarshaller { ec => bs =>
Future(objectMapper.readValue[A](bs.toArrayUnsafe()))(ec)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ import org.apache.pekko.stream.Materializer
import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import org.apache.pekko.util.ByteString
import com.github.pjfanning.pekkohttpjson4s.Json4sSupport.ShouldWritePretty.False
import java.lang.reflect.InvocationTargetException
import org.json4s.{ Formats, MappingException, Serialization }

import java.lang.reflect.InvocationTargetException
import scala.collection.immutable.Seq
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -167,7 +167,7 @@ trait Json4sSupport {
formats: Formats
): Unmarshaller[ByteString, A] = {
val result: Unmarshaller[ByteString, A] =
Unmarshaller(_ => bs => Future.fromTry(Try(s.read(bs.utf8String))))
Unmarshaller(ec => bs => Future(s.read(bs.utf8String))(ec))

result.recover(throwCause)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import com.github.plokhotnyuk.jsoniter_scala.core._

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -137,7 +136,7 @@ trait JsoniterScalaSupport {
codec: JsonValueCodec[A],
config: ReaderConfig = defaultReaderConfig
): Unmarshaller[ByteString, A] =
Unmarshaller(_ => bs => Future.fromTry(Try(readFromByteBuffer(bs.asByteBuffer, config))))
Unmarshaller(ec => bs => Future(readFromByteBuffer(bs.asByteBuffer, config))(ec))

/**
* HTTP entity => `Source[A, _]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import org.apache.pekko.util.ByteString
import nrktkt.ninny._

import scala.concurrent.Future
import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal

trait NinnySupport {
Expand Down Expand Up @@ -89,7 +90,15 @@ trait NinnySupport {
* unmarshaller for any `A` value
*/
implicit def fromByteStringUnmarshaller[A: FromJson]: Unmarshaller[ByteString, A] =
Unmarshaller(_ => bs => Future.fromTry(Json.parse(bs.utf8String).to[A]))
Unmarshaller(ec =>
bs =>
Future {
Json.parse(bs.utf8String).to[A] match {
case Success(value) => value
case Failure(exception) => throw exception
}
}(ec)
)

/**
* HTTP entity => `Source[A, _]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ import org.apache.pekko.http.scaladsl.util.FastFuture
import org.apache.pekko.stream.scaladsl.{ Flow, Source }
import org.apache.pekko.util.ByteString
import play.api.libs.json.{ JsError, JsResultException, JsValue, Json, Reads, Writes }

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

/**
Expand Down Expand Up @@ -141,9 +141,9 @@ trait PlayJsonSupport {
*/
implicit def fromByteStringUnmarshaller[A: Reads]: Unmarshaller[ByteString, A] =
if (ByteStringInputStream.byteStringSupportsAsInputStream)
Unmarshaller(_ => bs => Future.fromTry(Try(Json.parse(ByteStringInputStream(bs)).as[A])))
Unmarshaller(ec => bs => Future(Json.parse(ByteStringInputStream(bs)).as[A])(ec))
else
Unmarshaller(_ => bs => Future.fromTry(Try(Json.parse(bs.toArrayUnsafe()).as[A])))
Unmarshaller(ec => bs => Future(Json.parse(bs.toArrayUnsafe()).as[A])(ec))

/**
* HTTP entity => `Source[A, _]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import org.apache.pekko.util.ByteString

import scala.collection.immutable.Seq
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

// This companion object only exists for binary compatibility as adding methods with default implementations
Expand Down Expand Up @@ -110,7 +109,7 @@ trait UpickleCustomizationSupport {
* unmarshaller for any `A` value
*/
implicit def fromByteStringUnmarshaller[A: api.Reader]: Unmarshaller[ByteString, A] =
Unmarshaller(_ => bs => Future.fromTry(Try(api.read(bs.toArrayUnsafe()))))
Unmarshaller(ec => bs => Future(api.read(bs.toArrayUnsafe()))(ec))

/**
* HTTP entity => `A`
Expand Down

0 comments on commit 73362f8

Please sign in to comment.