From 73362f8f476510799b72e9f09e87a82a37a47462 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 16 May 2024 18:57:49 +0100 Subject: [PATCH] do json parsing asynchronously on ExecutionContext add back imports --- .../pekkohttpargonaut/ArgonautSupport.scala | 2 ++ .../pjfanning/pekkohttpavro4s/AvroSupport.scala | 15 ++++++--------- .../pjfanning/pekkohttpcirce/CirceSupport.scala | 10 +++++++++- .../pekkohttpjackson/JacksonSupport.scala | 9 +++++---- .../pjfanning/pekkohttpjson4s/Json4sSupport.scala | 6 +++--- .../JsoniterScalaSupport.scala | 3 +-- .../pjfanning/pekkohttpninny/NinnySupport.scala | 13 +++++++++++-- .../pekkohttpplayjson/PlayJsonSupport.scala | 6 +++--- .../UpickleCustomizationSupport.scala | 3 +-- 9 files changed, 41 insertions(+), 26 deletions(-) diff --git a/pekko-http-argonaut/src/main/scala/com/github/pjfanning/pekkohttpargonaut/ArgonautSupport.scala b/pekko-http-argonaut/src/main/scala/com/github/pjfanning/pekkohttpargonaut/ArgonautSupport.scala index 98af236..30a3419 100644 --- a/pekko-http-argonaut/src/main/scala/com/github/pjfanning/pekkohttpargonaut/ArgonautSupport.scala +++ b/pekko-http-argonaut/src/main/scala/com/github/pjfanning/pekkohttpargonaut/ArgonautSupport.scala @@ -35,6 +35,8 @@ import org.apache.pekko.http.scaladsl.util.FastFuture import org.apache.pekko.stream.scaladsl.{ Flow, Source } import org.apache.pekko.util.ByteString import argonaut.{ DecodeJson, EncodeJson, Json, Parse, PrettyParams } + +import scala.collection.immutable.Seq import scala.concurrent.Future import scala.util.control.NonFatal diff --git a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala index f02f522..f2f0428 100644 --- a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala +++ b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala @@ -37,10 +37,10 @@ import com.sksamuel.avro4s.{ Encoder, SchemaFor } + import java.io.ByteArrayOutputStream import scala.collection.immutable.Seq import scala.concurrent.Future -import scala.util.Try import scala.util.control.NonFatal /** @@ -106,15 +106,12 @@ trait AvroSupport { * unmarshaller for any `A` value */ implicit def fromByteStringUnmarshaller[A: SchemaFor: Decoder]: Unmarshaller[ByteString, A] = - Unmarshaller { _ => bs => - Future.fromTry { + Unmarshaller { ec => bs => + Future { val schema = AvroSchema[A] - - Try { - if (bs.isEmpty) throw Unmarshaller.NoContentException - AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next() - } - } + if (bs.isEmpty) throw Unmarshaller.NoContentException + AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next() + }(ec) } /** diff --git a/pekko-http-circe/src/main/scala/com/github/pjfanning/pekkohttpcirce/CirceSupport.scala b/pekko-http-circe/src/main/scala/com/github/pjfanning/pekkohttpcirce/CirceSupport.scala index 432bfcf..8867d71 100644 --- a/pekko-http-circe/src/main/scala/com/github/pjfanning/pekkohttpcirce/CirceSupport.scala +++ b/pekko-http-circe/src/main/scala/com/github/pjfanning/pekkohttpcirce/CirceSupport.scala @@ -40,6 +40,7 @@ import cats.syntax.either.catsSyntaxEither import cats.syntax.show.toShow import io.circe.{ Decoder, DecodingFailure, Encoder, Json, Printer, jawn } import io.circe.parser.parse + import scala.collection.immutable.Seq import scala.concurrent.Future import scala.util.control.NonFatal @@ -200,7 +201,14 @@ trait BaseCirceSupport { implicit def unmarshaller[A: Decoder]: FromEntityUnmarshaller[A] def byteStringJsonUnmarshaller: Unmarshaller[ByteString, Json] = - Unmarshaller(_ => bs => Future.fromTry(jawn.parseByteBuffer(bs.asByteBuffer).toTry)) + Unmarshaller { ec => bs => + Future { + jawn.parseByteBuffer(bs.asByteBuffer) match { + case Right(json) => json + case Left(pf) => throw pf + } + }(ec) + } /** * HTTP entity => `Source[A, _]` diff --git a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala index c052e17..2a94032 100644 --- a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala +++ b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala @@ -49,6 +49,7 @@ import org.apache.pekko.http.scaladsl.util.FastFuture import org.apache.pekko.stream.scaladsl.{ Flow, Source } import org.apache.pekko.util.ByteString +import scala.collection.immutable.Seq import scala.concurrent.Future import scala.util.Try import scala.util.control.NonFatal @@ -205,12 +206,12 @@ trait JacksonSupport { objectMapper: ObjectMapper with ClassTagExtensions = defaultObjectMapper ): Unmarshaller[ByteString, A] = if (ByteStringInputStream.byteStringSupportsAsInputStream) { - Unmarshaller { _ => bs => - Future.fromTry(Try(objectMapper.readValue[A](ByteStringInputStream(bs)))) + Unmarshaller { ec => bs => + Future(objectMapper.readValue[A](ByteStringInputStream(bs)))(ec) } } else { - Unmarshaller { _ => bs => - Future.fromTry(Try(objectMapper.readValue[A](bs.toArrayUnsafe()))) + Unmarshaller { ec => bs => + Future(objectMapper.readValue[A](bs.toArrayUnsafe()))(ec) } } diff --git a/pekko-http-json4s/src/main/scala/com/github/pjfanning/pekkohttpjson4s/Json4sSupport.scala b/pekko-http-json4s/src/main/scala/com/github/pjfanning/pekkohttpjson4s/Json4sSupport.scala index 56ce814..9b392a0 100644 --- a/pekko-http-json4s/src/main/scala/com/github/pjfanning/pekkohttpjson4s/Json4sSupport.scala +++ b/pekko-http-json4s/src/main/scala/com/github/pjfanning/pekkohttpjson4s/Json4sSupport.scala @@ -36,11 +36,11 @@ import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.{ Flow, Source } import org.apache.pekko.util.ByteString import com.github.pjfanning.pekkohttpjson4s.Json4sSupport.ShouldWritePretty.False -import java.lang.reflect.InvocationTargetException import org.json4s.{ Formats, MappingException, Serialization } + +import java.lang.reflect.InvocationTargetException import scala.collection.immutable.Seq import scala.concurrent.{ ExecutionContext, Future } -import scala.util.Try import scala.util.control.NonFatal /** @@ -167,7 +167,7 @@ trait Json4sSupport { formats: Formats ): Unmarshaller[ByteString, A] = { val result: Unmarshaller[ByteString, A] = - Unmarshaller(_ => bs => Future.fromTry(Try(s.read(bs.utf8String)))) + Unmarshaller(ec => bs => Future(s.read(bs.utf8String))(ec)) result.recover(throwCause) } diff --git a/pekko-http-jsoniter-scala/src/main/scala/com/github/pjfanning/pekkohttpjsoniterscala/JsoniterScalaSupport.scala b/pekko-http-jsoniter-scala/src/main/scala/com/github/pjfanning/pekkohttpjsoniterscala/JsoniterScalaSupport.scala index 65c3abe..4e25413 100644 --- a/pekko-http-jsoniter-scala/src/main/scala/com/github/pjfanning/pekkohttpjsoniterscala/JsoniterScalaSupport.scala +++ b/pekko-http-jsoniter-scala/src/main/scala/com/github/pjfanning/pekkohttpjsoniterscala/JsoniterScalaSupport.scala @@ -39,7 +39,6 @@ import com.github.plokhotnyuk.jsoniter_scala.core._ import scala.collection.immutable.Seq import scala.concurrent.Future -import scala.util.Try import scala.util.control.NonFatal /** @@ -137,7 +136,7 @@ trait JsoniterScalaSupport { codec: JsonValueCodec[A], config: ReaderConfig = defaultReaderConfig ): Unmarshaller[ByteString, A] = - Unmarshaller(_ => bs => Future.fromTry(Try(readFromByteBuffer(bs.asByteBuffer, config)))) + Unmarshaller(ec => bs => Future(readFromByteBuffer(bs.asByteBuffer, config))(ec)) /** * HTTP entity => `Source[A, _]` diff --git a/pekko-http-ninny/src/main/scala/com/github/pjfanning/pekkohttpninny/NinnySupport.scala b/pekko-http-ninny/src/main/scala/com/github/pjfanning/pekkohttpninny/NinnySupport.scala index 6491886..20093f4 100644 --- a/pekko-http-ninny/src/main/scala/com/github/pjfanning/pekkohttpninny/NinnySupport.scala +++ b/pekko-http-ninny/src/main/scala/com/github/pjfanning/pekkohttpninny/NinnySupport.scala @@ -36,8 +36,9 @@ import org.apache.pekko.stream.scaladsl.{ Flow, Source } import org.apache.pekko.util.ByteString import nrktkt.ninny._ -import scala.concurrent.Future import scala.collection.immutable.Seq +import scala.concurrent.Future +import scala.util.{ Failure, Success } import scala.util.control.NonFatal trait NinnySupport { @@ -89,7 +90,15 @@ trait NinnySupport { * unmarshaller for any `A` value */ implicit def fromByteStringUnmarshaller[A: FromJson]: Unmarshaller[ByteString, A] = - Unmarshaller(_ => bs => Future.fromTry(Json.parse(bs.utf8String).to[A])) + Unmarshaller(ec => + bs => + Future { + Json.parse(bs.utf8String).to[A] match { + case Success(value) => value + case Failure(exception) => throw exception + } + }(ec) + ) /** * HTTP entity => `Source[A, _]` diff --git a/pekko-http-play-json/src/main/scala/com/github/pjfanning/pekkohttpplayjson/PlayJsonSupport.scala b/pekko-http-play-json/src/main/scala/com/github/pjfanning/pekkohttpplayjson/PlayJsonSupport.scala index 2d843f5..c274fae 100644 --- a/pekko-http-play-json/src/main/scala/com/github/pjfanning/pekkohttpplayjson/PlayJsonSupport.scala +++ b/pekko-http-play-json/src/main/scala/com/github/pjfanning/pekkohttpplayjson/PlayJsonSupport.scala @@ -31,9 +31,9 @@ import org.apache.pekko.http.scaladsl.util.FastFuture import org.apache.pekko.stream.scaladsl.{ Flow, Source } import org.apache.pekko.util.ByteString import play.api.libs.json.{ JsError, JsResultException, JsValue, Json, Reads, Writes } + import scala.collection.immutable.Seq import scala.concurrent.Future -import scala.util.Try import scala.util.control.NonFatal /** @@ -141,9 +141,9 @@ trait PlayJsonSupport { */ implicit def fromByteStringUnmarshaller[A: Reads]: Unmarshaller[ByteString, A] = if (ByteStringInputStream.byteStringSupportsAsInputStream) - Unmarshaller(_ => bs => Future.fromTry(Try(Json.parse(ByteStringInputStream(bs)).as[A]))) + Unmarshaller(ec => bs => Future(Json.parse(ByteStringInputStream(bs)).as[A])(ec)) else - Unmarshaller(_ => bs => Future.fromTry(Try(Json.parse(bs.toArrayUnsafe()).as[A]))) + Unmarshaller(ec => bs => Future(Json.parse(bs.toArrayUnsafe()).as[A])(ec)) /** * HTTP entity => `Source[A, _]` diff --git a/pekko-http-upickle/src/main/scala/com/github/pjfanning/pekkohttpupickle/UpickleCustomizationSupport.scala b/pekko-http-upickle/src/main/scala/com/github/pjfanning/pekkohttpupickle/UpickleCustomizationSupport.scala index 5880c4e..f872558 100644 --- a/pekko-http-upickle/src/main/scala/com/github/pjfanning/pekkohttpupickle/UpickleCustomizationSupport.scala +++ b/pekko-http-upickle/src/main/scala/com/github/pjfanning/pekkohttpupickle/UpickleCustomizationSupport.scala @@ -38,7 +38,6 @@ import org.apache.pekko.util.ByteString import scala.collection.immutable.Seq import scala.concurrent.Future -import scala.util.Try import scala.util.control.NonFatal // This companion object only exists for binary compatibility as adding methods with default implementations @@ -110,7 +109,7 @@ trait UpickleCustomizationSupport { * unmarshaller for any `A` value */ implicit def fromByteStringUnmarshaller[A: api.Reader]: Unmarshaller[ByteString, A] = - Unmarshaller(_ => bs => Future.fromTry(Try(api.read(bs.toArrayUnsafe())))) + Unmarshaller(ec => bs => Future(api.read(bs.toArrayUnsafe()))(ec)) /** * HTTP entity => `A`