From 15093fe9b5d82e665bb82b73d1500fdad2b7f1d4 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 17 May 2024 11:32:30 +0100 Subject: [PATCH 1/2] avro: go back to using input stream when parsing (#24) --- .../pekkohttpavro4s/AvroSupport.scala | 7 ++- .../ByteStringInputStream.scala | 49 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala diff --git a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala index f02f522..1f924b1 100644 --- a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala +++ b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/AvroSupport.scala @@ -112,7 +112,12 @@ trait AvroSupport { Try { if (bs.isEmpty) throw Unmarshaller.NoContentException - AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next() + val builder = + if (ByteStringInputStream.byteStringSupportsAsInputStream) + AvroInputStream.json[A].from(ByteStringInputStream(bs)) + else + AvroInputStream.json[A].from(bs.toArrayUnsafe()) + builder.build(schema).iterator.next() } } } diff --git a/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala new file mode 100644 index 0000000..19e0225 --- /dev/null +++ b/pekko-http-avro4s/src/main/scala/com/github/pjfanning/pekkohttpavro4s/ByteStringInputStream.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.github.pjfanning.pekkohttpavro4s + +import org.apache.pekko.util.ByteString +import org.apache.pekko.util.ByteString.ByteString1C + +import java.io.{ ByteArrayInputStream, InputStream } +import java.lang.invoke.{ MethodHandles, MethodType } +import scala.util.Try + +private[pekkohttpavro4s] object ByteStringInputStream { + private val byteStringInputStreamMethodTypeOpt = Try { + val lookup = MethodHandles.publicLookup() + val inputStreamMethodType = MethodType.methodType(classOf[InputStream]) + lookup.findVirtual(classOf[ByteString], "asInputStream", inputStreamMethodType) + }.toOption + + def apply(bs: ByteString): InputStream = bs match { + case cs: ByteString1C => + getInputStreamUnsafe(cs) + case _ => + if (byteStringInputStreamMethodTypeOpt.isDefined) { + byteStringInputStreamMethodTypeOpt.get.invoke(bs).asInstanceOf[InputStream] + } else { + getInputStreamUnsafe(bs) + } + } + + val byteStringSupportsAsInputStream: Boolean = byteStringInputStreamMethodTypeOpt.isDefined + + private def getInputStreamUnsafe(bs: ByteString): InputStream = + new ByteArrayInputStream(bs.toArrayUnsafe()) +} From a12e702856bc092239a71a5f275de7c731378b9e Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 17 May 2024 11:33:07 +0100 Subject: [PATCH 2/2] jackson: parse using a ByteBuffer (#23) * jackson: parse using a ByteBuffer Update JacksonSupport.scala * Update JacksonSupport.scala --- .../ByteStringInputStream.scala | 51 ------------------- .../pekkohttpjackson/JacksonSupport.scala | 23 ++++++--- 2 files changed, 15 insertions(+), 59 deletions(-) delete mode 100644 pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala diff --git a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala deleted file mode 100644 index ef6ed9e..0000000 --- a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/ByteStringInputStream.scala +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.github.pjfanning.pekkohttpjackson - -import java.io.{ ByteArrayInputStream, InputStream } -import java.lang.invoke.{ MethodHandles, MethodType } - -import scala.util.Try - -import org.apache.pekko -import pekko.util.ByteString -import pekko.util.ByteString.ByteString1C - -private[pekkohttpjackson] object ByteStringInputStream { - private val byteStringInputStreamMethodTypeOpt = Try { - val lookup = MethodHandles.publicLookup() - val inputStreamMethodType = MethodType.methodType(classOf[InputStream]) - lookup.findVirtual(classOf[ByteString], "asInputStream", inputStreamMethodType) - }.toOption - - def apply(bs: ByteString): InputStream = bs match { - case cs: ByteString1C => - getInputStreamUnsafe(cs) - case _ => - if (byteStringInputStreamMethodTypeOpt.isDefined) { - byteStringInputStreamMethodTypeOpt.get.invoke(bs).asInstanceOf[InputStream] - } else { - getInputStreamUnsafe(bs) - } - } - - val byteStringSupportsAsInputStream: Boolean = byteStringInputStreamMethodTypeOpt.isDefined - - private def getInputStreamUnsafe(bs: ByteString): InputStream = - new ByteArrayInputStream(bs.toArrayUnsafe()) -} diff --git a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala index c052e17..4665e8b 100644 --- a/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala +++ b/pekko-http-jackson/src/main/scala/com/github/pjfanning/pekkohttpjackson/JacksonSupport.scala @@ -21,10 +21,12 @@ import com.fasterxml.jackson.core.util.{ BufferRecycler, JsonRecyclerPools, Recy import com.fasterxml.jackson.core.{ JsonFactory, JsonFactoryBuilder, + JsonParser, StreamReadConstraints, StreamReadFeature, StreamWriteConstraints } +import com.fasterxml.jackson.core.async.ByteBufferFeeder import com.fasterxml.jackson.databind.{ Module, ObjectMapper } import com.fasterxml.jackson.databind.json.JsonMapper import com.fasterxml.jackson.module.scala.{ ClassTagExtensions, JavaTypeable } @@ -204,14 +206,19 @@ trait JacksonSupport { implicit def fromByteStringUnmarshaller[A: JavaTypeable](implicit objectMapper: ObjectMapper with ClassTagExtensions = defaultObjectMapper ): Unmarshaller[ByteString, A] = - if (ByteStringInputStream.byteStringSupportsAsInputStream) { - Unmarshaller { _ => bs => - Future.fromTry(Try(objectMapper.readValue[A](ByteStringInputStream(bs)))) - } - } else { - Unmarshaller { _ => bs => - Future.fromTry(Try(objectMapper.readValue[A](bs.toArrayUnsafe()))) - } + Unmarshaller { _ => bs => + Future.fromTry(Try { + val parser = objectMapper.getFactory + .createNonBlockingByteBufferParser() + .asInstanceOf[JsonParser with ByteBufferFeeder] + bs match { + case bs: ByteString.ByteStrings => + bs.asByteBuffers.foreach(parser.feedInput) + case bytes => + parser.feedInput(bytes.asByteBuffer) + } + objectMapper.readValue[A](parser) + }) } /**