diff --git a/core/src/main/java/io/grpc/internal/MessageFramer.java b/core/src/main/java/io/grpc/internal/MessageFramer.java index 5e75fa2e6fe..8b5ccb864a4 100644 --- a/core/src/main/java/io/grpc/internal/MessageFramer.java +++ b/core/src/main/java/io/grpc/internal/MessageFramer.java @@ -75,6 +75,10 @@ void deliverFrame( // effectively final. Can only be set once. private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE; private WritableBuffer buffer; + /** + * if > 0 - the number of bytes to allocate for the current known-length message. + */ + private int knownLengthPendingAllocation; private Compressor compressor = Codec.Identity.NONE; private boolean messageCompression = true; private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter(); @@ -222,9 +226,7 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength) headerScratch.put(UNCOMPRESSED).putInt(messageLength); // Allocate the initial buffer chunk based on frame header + payload length. // Note that the allocator may allocate a buffer larger or smaller than this length - if (buffer == null) { - buffer = bufferAllocator.allocate(headerScratch.position() + messageLength); - } + knownLengthPendingAllocation = HEADER_LENGTH + messageLength; writeRaw(headerScratch.array(), 0, headerScratch.position()); return writeToOutputStream(message, outputStreamAdapter); } @@ -288,8 +290,9 @@ private void writeRaw(byte[] b, int off, int len) { commitToSink(false, false); } if (buffer == null) { - // Request a buffer allocation using the message length as a hint. - buffer = bufferAllocator.allocate(len); + checkState(knownLengthPendingAllocation > 0, "knownLengthPendingAllocation reached 0"); + buffer = bufferAllocator.allocate(knownLengthPendingAllocation); + knownLengthPendingAllocation -= min(knownLengthPendingAllocation, buffer.writableBytes()); } int toWrite = min(len, buffer.writableBytes()); buffer.write(b, off, toWrite); @@ -388,6 +391,8 @@ public void write(byte[] b, int off, int len) { * {@link OutputStream}. */ private final class BufferChainOutputStream extends OutputStream { + private static final int FIRST_BUFFER_SIZE = 4096; + private final List bufferList = new ArrayList<>(); private WritableBuffer current; @@ -397,7 +402,7 @@ private final class BufferChainOutputStream extends OutputStream { * {@link #write(byte[], int, int)}. */ @Override - public void write(int b) throws IOException { + public void write(int b) { if (current != null && current.writableBytes() > 0) { current.write((byte)b); return; @@ -410,7 +415,7 @@ public void write(int b) throws IOException { public void write(byte[] b, int off, int len) { if (current == null) { // Request len bytes initially from the allocator, it may give us more. - current = bufferAllocator.allocate(len); + current = bufferAllocator.allocate(Math.max(FIRST_BUFFER_SIZE, len)); bufferList.add(current); } while (len > 0) { diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java index 208eb40c438..5307c26949b 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/CompressionTest.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import io.grpc.CallOptions; import io.grpc.Channel; @@ -53,8 +55,6 @@ import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub; import io.grpc.testing.integration.TransportCompressionTest.Fzip; import java.nio.charset.Charset; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -146,25 +146,16 @@ public void tearDown() { * Parameters for test. */ @Parameters - public static Collection params() { - boolean[] bools = new boolean[]{false, true}; - List combos = new ArrayList<>(64); - for (boolean enableClientMessageCompression : bools) { - for (boolean clientAcceptEncoding : bools) { - for (boolean clientEncoding : bools) { - for (boolean enableServerMessageCompression : bools) { - for (boolean serverAcceptEncoding : bools) { - for (boolean serverEncoding : bools) { - combos.add(new Object[] { - enableClientMessageCompression, clientAcceptEncoding, clientEncoding, - enableServerMessageCompression, serverAcceptEncoding, serverEncoding}); - } - } - } - } - } - } - return combos; + public static Iterable params() { + List bools = Lists.newArrayList(false, true); + return Iterables.transform(Lists.cartesianProduct( + bools, // enableClientMessageCompression + bools, // clientAcceptEncoding + bools, // clientEncoding + bools, // enableServerMessageCompression + bools, // serverAcceptEncoding + bools // serverEncoding + ), List::toArray); } @Test diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index a44a196ac8c..4dd24c3fd4d 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -233,7 +233,7 @@ public void writeFrameFutureFailedShouldCancelRpc() { // Verify that failed SendGrpcFrameCommand results in immediate CancelClientStreamCommand. inOrder.verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true)); // Verify that any other failures do not produce another CancelClientStreamCommand in the queue. - inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); + inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true)); inOrder.verifyNoMoreInteractions(); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java index 0723e359752..2f2933ae103 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java @@ -158,7 +158,7 @@ public void writeFrameFutureFailedShouldCancelRpc() { // Verify that failed SendGrpcFrameCommand results in immediate CancelServerStreamCommand. inOrder.verify(writeQueue).enqueue(any(CancelServerStreamCommand.class), eq(true)); // Verify that any other failures do not produce another CancelServerStreamCommand in the queue. - inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); + inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false)); inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true)); inOrder.verifyNoMoreInteractions(); } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java index 481ada61c96..f5d490818ba 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBufferAllocator.java @@ -19,6 +19,7 @@ import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBufferAllocator; import okio.Buffer; +import okio.Segment; /** * The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport. OkHttp @@ -27,9 +28,6 @@ */ class OkHttpWritableBufferAllocator implements WritableBufferAllocator { - // Use 4k as our minimum buffer size. - private static final int MIN_BUFFER = 4096; - // Set the maximum buffer size to 1MB private static final int MAX_BUFFER = 1024 * 1024; @@ -45,7 +43,9 @@ class OkHttpWritableBufferAllocator implements WritableBufferAllocator { */ @Override public WritableBuffer allocate(int capacityHint) { - capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint)); + // okio buffer uses fixed size Segments, round capacityHint up + capacityHint = Math.min(MAX_BUFFER, + (capacityHint + Segment.SIZE - 1) / Segment.SIZE * Segment.SIZE); return new OkHttpWritableBuffer(new Buffer(), capacityHint); } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java index e606b6b9a50..c444e0ee11d 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpWritableBufferAllocatorTest.java @@ -21,6 +21,7 @@ import io.grpc.internal.WritableBuffer; import io.grpc.internal.WritableBufferAllocator; import io.grpc.internal.WritableBufferAllocatorTestBase; +import okio.Segment; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,7 +43,7 @@ protected WritableBufferAllocator allocator() { public void testCapacity() { WritableBuffer buffer = allocator().allocate(4096); assertEquals(0, buffer.readableBytes()); - assertEquals(4096, buffer.writableBytes()); + assertEquals(Segment.SIZE, buffer.writableBytes()); } @Test @@ -54,8 +55,8 @@ public void testInitialCapacityHasMaximum() { @Test public void testIsExactBelowMaxCapacity() { - WritableBuffer buffer = allocator().allocate(4097); + WritableBuffer buffer = allocator().allocate(Segment.SIZE + 1); assertEquals(0, buffer.readableBytes()); - assertEquals(4097, buffer.writableBytes()); + assertEquals(Segment.SIZE * 2, buffer.writableBytes()); } }