diff --git a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala index f2f0428..1985e66 100644 --- a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala +++ b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala @@ -110,7 +110,12 @@ trait AvroSupport { Future { val schema = AvroSchema[A] if (bs.isEmpty) throw Unmarshaller.NoContentException - AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next() + val builder = + if (ByteStringInputStream.byteStringSupportsAsInputStream) + AvroInputStream.json[A].from(ByteStringInputStream(bs)) + else + AvroInputStream.json[A].from(bs.toArrayUnsafe()) + builder.build(schema).iterator.next() }(ec) } diff --git a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala similarity index 89% rename from pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala rename to pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala index ef6ed9e..19e0225 100644 --- a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala +++ b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala @@ -15,18 +15,16 @@ * limitations under the License. */ -package com.github.pjfanning.pekkohttpjackson +package com.github.pjfanning.pekkohttpavro4s + +import org.apache.pekko.util.ByteString +import org.apache.pekko.util.ByteString.ByteString1C import java.io.{ ByteArrayInputStream, InputStream } import java.lang.invoke.{ MethodHandles, MethodType } - import scala.util.Try -import org.apache.pekko -import pekko.util.ByteString -import pekko.util.ByteString.ByteString1C - -private[pekkohttpjackson] object ByteStringInputStream { +private[pekkohttpavro4s] object ByteStringInputStream { private val byteStringInputStreamMethodTypeOpt = Try { val lookup = MethodHandles.publicLookup() val inputStreamMethodType = MethodType.methodType(classOf[InputStream]) diff --git a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala index 2a94032..5668a61 100644 --- a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala +++ b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala @@ -21,10 +21,12 @@ import com.fasterxml.jackson.core.util.{ BufferRecycler, JsonRecyclerPools, Recy import com.fasterxml.jackson.core.{ JsonFactory, JsonFactoryBuilder, + JsonParser, StreamReadConstraints, StreamReadFeature, StreamWriteConstraints } +import com.fasterxml.jackson.core.async.ByteBufferFeeder import com.fasterxml.jackson.databind.{ Module, ObjectMapper } import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.{ ClassTagExtensions, JavaTypeable } @@ -205,14 +207,19 @@ trait JacksonSupport { implicit def fromByteStringUnmarshaller[A: JavaTypeable](implicit objectMapper: ObjectMapper with ClassTagExtensions = defaultObjectMapper ): Unmarshaller[ByteString, A] = - if (ByteStringInputStream.byteStringSupportsAsInputStream) { - Unmarshaller { ec => bs => - Future(objectMapper.readValue[A](ByteStringInputStream(bs)))(ec) - } - } else { - Unmarshaller { ec => bs => - Future(objectMapper.readValue[A](bs.toArrayUnsafe()))(ec) - } + Unmarshaller { ec => bs => + Future { + val parser = objectMapper.getFactory + .createNonBlockingByteBufferParser() + .asInstanceOf[JsonParser with ByteBufferFeeder] + bs match { + case bs: ByteString.ByteStrings => + bs.asByteBuffers.foreach(parser.feedInput) + case bytes => + parser.feedInput(bytes.asByteBuffer) + } + objectMapper.readValue[A](parser) + }(ec) } /**