From 240bcd209ea2fbc9c4a7031b0853d1ad63c018f6 Mon Sep 17 00:00:00 2001 From: Ladinu Chandrasinghe Date: Sat, 14 Sep 2024 02:39:12 -0700 Subject: [PATCH] Log PeerClosedStreamExceptions as warnings instead of errors (#360) --- .../pekko/grpc/javadsl/GrpcExceptionHandler.scala | 10 ++++++++-- .../pekko/grpc/scaladsl/GrpcExceptionHandler.scala | 9 +++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/GrpcExceptionHandler.scala b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/GrpcExceptionHandler.scala index 76ec34c3..65b890f1 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/GrpcExceptionHandler.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/javadsl/GrpcExceptionHandler.scala @@ -26,6 +26,8 @@ import pekko.http.javadsl.model.HttpResponse import pekko.japi.{ Function => jFunction } import io.grpc.{ Status, StatusRuntimeException } +import org.apache.pekko.http.scaladsl.model.http2.PeerClosedStreamException + import scala.concurrent.ExecutionException import pekko.event.Logging @@ -40,6 +42,8 @@ object GrpcExceptionHandler { default(system) } + private def log(system: ActorSystem) = Logging(system, "org.apache.pekko.grpc.javadsl.GrpcExceptionHandler") + /** INTERNAL API */ @InternalApi private def default(system: ActorSystem): jFunction[Throwable, Trailers] = @@ -57,9 +61,11 @@ object GrpcExceptionHandler { case e: NotImplementedError => Trailers(Status.UNIMPLEMENTED.withDescription(e.getMessage)) case e: UnsupportedOperationException => Trailers(Status.UNIMPLEMENTED.withDescription(e.getMessage)) case e: StatusRuntimeException => Trailers(e.getStatus, new GrpcMetadataImpl(e.getTrailers)) + case e: PeerClosedStreamException => + log(system).warning(e, "Peer closed the stream: [{}]", e.getMessage) + INTERNAL case other => - val log = Logging(system, "org.apache.pekko.grpc.javadsl.GrpcExceptionHandler") - log.error(other, "Unhandled error: [{}]", other.getMessage) + log(system).error(other, "Unhandled error: [{}]", other.getMessage) INTERNAL } } diff --git a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionHandler.scala b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionHandler.scala index 157e21ee..232404a6 100644 --- a/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionHandler.scala +++ b/runtime/src/main/scala/org/apache/pekko/grpc/scaladsl/GrpcExceptionHandler.scala @@ -22,6 +22,7 @@ import pekko.grpc.GrpcProtocol.GrpcProtocolWriter import pekko.grpc.internal.{ GrpcMetadataImpl, GrpcResponseHelpers, MissingParameterException } import pekko.http.scaladsl.model.HttpResponse import io.grpc.{ Status, StatusRuntimeException } +import org.apache.pekko.http.scaladsl.model.http2.PeerClosedStreamException import scala.concurrent.{ ExecutionException, Future } import pekko.event.Logging @@ -31,6 +32,8 @@ object GrpcExceptionHandler { private val INTERNAL = Trailers(Status.INTERNAL) private val INVALID_ARGUMENT = Trailers(Status.INVALID_ARGUMENT) + private def log(system: ActorSystem) = Logging(system, "org.apache.pekko.grpc.scaladsl.GrpcExceptionHandler") + def defaultMapper(system: ActorSystem): PartialFunction[Throwable, Trailers] = { case e: ExecutionException => if (e.getCause == null) INTERNAL @@ -42,9 +45,11 @@ object GrpcExceptionHandler { case e: StatusRuntimeException => val meta = Option(e.getTrailers).getOrElse(new io.grpc.Metadata()) Trailers(e.getStatus, new GrpcMetadataImpl(meta)) + case e: PeerClosedStreamException => + log(system).warning(e, "Peer closed the stream: [{}]", e.getMessage) + INTERNAL case other => - val log = Logging(system, "org.apache.pekko.grpc.scaladsl.GrpcExceptionHandler") - log.error(other, "Unhandled error: [{}]", other.getMessage) + log(system).error(other, "Unhandled error: [{}]", other.getMessage) INTERNAL }