Skip to content

Commit

Permalink
Merge branch 'main' into use-async
Browse files Browse the repository at this point in the history
  • Loading branch information
pjfanning authored May 17, 2024
2 parents 73362f8 + a12e702 commit 149da5a
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ trait AvroSupport {
Future {
val schema = AvroSchema[A]
if (bs.isEmpty) throw Unmarshaller.NoContentException
AvroInputStream.json[A].from(bs.toByteBuffer).build(schema).iterator.next()
val builder =
if (ByteStringInputStream.byteStringSupportsAsInputStream)
AvroInputStream.json[A].from(ByteStringInputStream(bs))
else
AvroInputStream.json[A].from(bs.toArrayUnsafe())
builder.build(schema).iterator.next()
}(ec)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,16 @@
* limitations under the License.
*/

package com.github.pjfanning.pekkohttpjackson
package com.github.pjfanning.pekkohttpavro4s

import org.apache.pekko.util.ByteString
import org.apache.pekko.util.ByteString.ByteString1C

import java.io.{ ByteArrayInputStream, InputStream }
import java.lang.invoke.{ MethodHandles, MethodType }

import scala.util.Try

import org.apache.pekko
import pekko.util.ByteString
import pekko.util.ByteString.ByteString1C

private[pekkohttpjackson] object ByteStringInputStream {
private[pekkohttpavro4s] object ByteStringInputStream {
private val byteStringInputStreamMethodTypeOpt = Try {
val lookup = MethodHandles.publicLookup()
val inputStreamMethodType = MethodType.methodType(classOf[InputStream])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import com.fasterxml.jackson.core.util.{ BufferRecycler, JsonRecyclerPools, Recy
import com.fasterxml.jackson.core.{
JsonFactory,
JsonFactoryBuilder,
JsonParser,
StreamReadConstraints,
StreamReadFeature,
StreamWriteConstraints
}
import com.fasterxml.jackson.core.async.ByteBufferFeeder
import com.fasterxml.jackson.databind.{ Module, ObjectMapper }
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.{ ClassTagExtensions, JavaTypeable }
Expand Down Expand Up @@ -205,14 +207,19 @@ trait JacksonSupport {
implicit def fromByteStringUnmarshaller[A: JavaTypeable](implicit
objectMapper: ObjectMapper with ClassTagExtensions = defaultObjectMapper
): Unmarshaller[ByteString, A] =
if (ByteStringInputStream.byteStringSupportsAsInputStream) {
Unmarshaller { ec => bs =>
Future(objectMapper.readValue[A](ByteStringInputStream(bs)))(ec)
}
} else {
Unmarshaller { ec => bs =>
Future(objectMapper.readValue[A](bs.toArrayUnsafe()))(ec)
}
Unmarshaller { ec => bs =>
Future {
val parser = objectMapper.getFactory
.createNonBlockingByteBufferParser()
.asInstanceOf[JsonParser with ByteBufferFeeder]
bs match {
case bs: ByteString.ByteStrings =>
bs.asByteBuffers.foreach(parser.feedInput)
case bytes =>
parser.feedInput(bytes.asByteBuffer)
}
objectMapper.readValue[A](parser)
}(ec)
}

/**
Expand Down

0 comments on commit 149da5a

Please sign in to comment.