Skip to content

Commit

Permalink
optimize number of buffer allocations (#11879)
Browse files Browse the repository at this point in the history
Currently this improves 2 flows

1. Known length message which length is greater than 1Mb. Previously the
first buffer was 1Mb, and then many buffers of 4096 bytes (from
CodedOutputStream), now subsequent buffers are also up to 1Mb

2. In case of compression, the first write is always 10 bytes buffer
(gzip header), but worth allocating more space
  • Loading branch information
panchenko authored Feb 14, 2025
1 parent 7585b16 commit 5a7f350
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 37 deletions.
19 changes: 12 additions & 7 deletions core/src/main/java/io/grpc/internal/MessageFramer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ void deliverFrame(
// effectively final. Can only be set once.
private int maxOutboundMessageSize = NO_MAX_OUTBOUND_MESSAGE_SIZE;
private WritableBuffer buffer;
/**
* if > 0 - the number of bytes to allocate for the current known-length message.
*/
private int knownLengthPendingAllocation;
private Compressor compressor = Codec.Identity.NONE;
private boolean messageCompression = true;
private final OutputStreamAdapter outputStreamAdapter = new OutputStreamAdapter();
Expand Down Expand Up @@ -222,9 +226,7 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength)
headerScratch.put(UNCOMPRESSED).putInt(messageLength);
// Allocate the initial buffer chunk based on frame header + payload length.
// Note that the allocator may allocate a buffer larger or smaller than this length
if (buffer == null) {
buffer = bufferAllocator.allocate(headerScratch.position() + messageLength);
}
knownLengthPendingAllocation = HEADER_LENGTH + messageLength;
writeRaw(headerScratch.array(), 0, headerScratch.position());
return writeToOutputStream(message, outputStreamAdapter);
}
Expand Down Expand Up @@ -288,8 +290,9 @@ private void writeRaw(byte[] b, int off, int len) {
commitToSink(false, false);
}
if (buffer == null) {
// Request a buffer allocation using the message length as a hint.
buffer = bufferAllocator.allocate(len);
checkState(knownLengthPendingAllocation > 0, "knownLengthPendingAllocation reached 0");
buffer = bufferAllocator.allocate(knownLengthPendingAllocation);
knownLengthPendingAllocation -= min(knownLengthPendingAllocation, buffer.writableBytes());
}
int toWrite = min(len, buffer.writableBytes());
buffer.write(b, off, toWrite);
Expand Down Expand Up @@ -388,6 +391,8 @@ public void write(byte[] b, int off, int len) {
* {@link OutputStream}.
*/
private final class BufferChainOutputStream extends OutputStream {
private static final int FIRST_BUFFER_SIZE = 4096;

private final List<WritableBuffer> bufferList = new ArrayList<>();
private WritableBuffer current;

Expand All @@ -397,7 +402,7 @@ private final class BufferChainOutputStream extends OutputStream {
* {@link #write(byte[], int, int)}.
*/
@Override
public void write(int b) throws IOException {
public void write(int b) {
if (current != null && current.writableBytes() > 0) {
current.write((byte)b);
return;
Expand All @@ -410,7 +415,7 @@ public void write(int b) throws IOException {
public void write(byte[] b, int off, int len) {
if (current == null) {
// Request len bytes initially from the allocator, it may give us more.
current = bufferAllocator.allocate(len);
current = bufferAllocator.allocate(Math.max(FIRST_BUFFER_SIZE, len));
bufferList.add(current);
}
while (len > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import io.grpc.CallOptions;
import io.grpc.Channel;
Expand Down Expand Up @@ -53,8 +55,6 @@
import io.grpc.testing.integration.TestServiceGrpc.TestServiceBlockingStub;
import io.grpc.testing.integration.TransportCompressionTest.Fzip;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -146,25 +146,16 @@ public void tearDown() {
* Parameters for test.
*/
@Parameters
public static Collection<Object[]> params() {
boolean[] bools = new boolean[]{false, true};
List<Object[]> combos = new ArrayList<>(64);
for (boolean enableClientMessageCompression : bools) {
for (boolean clientAcceptEncoding : bools) {
for (boolean clientEncoding : bools) {
for (boolean enableServerMessageCompression : bools) {
for (boolean serverAcceptEncoding : bools) {
for (boolean serverEncoding : bools) {
combos.add(new Object[] {
enableClientMessageCompression, clientAcceptEncoding, clientEncoding,
enableServerMessageCompression, serverAcceptEncoding, serverEncoding});
}
}
}
}
}
}
return combos;
public static Iterable<Object[]> params() {
List<Boolean> bools = Lists.newArrayList(false, true);
return Iterables.transform(Lists.cartesianProduct(
bools, // enableClientMessageCompression
bools, // clientAcceptEncoding
bools, // clientEncoding
bools, // enableServerMessageCompression
bools, // serverAcceptEncoding
bools // serverEncoding
), List::toArray);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ public void writeFrameFutureFailedShouldCancelRpc() {
// Verify that failed SendGrpcFrameCommand results in immediate CancelClientStreamCommand.
inOrder.verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
// Verify that any other failures do not produce another CancelClientStreamCommand in the queue.
inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true));
inOrder.verifyNoMoreInteractions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ public void writeFrameFutureFailedShouldCancelRpc() {
// Verify that failed SendGrpcFrameCommand results in immediate CancelServerStreamCommand.
inOrder.verify(writeQueue).enqueue(any(CancelServerStreamCommand.class), eq(true));
// Verify that any other failures do not produce another CancelServerStreamCommand in the queue.
inOrder.verify(writeQueue, atLeast(1)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue, atLeast(0)).enqueue(any(SendGrpcFrameCommand.class), eq(false));
inOrder.verify(writeQueue).enqueue(any(SendGrpcFrameCommand.class), eq(true));
inOrder.verifyNoMoreInteractions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import okio.Buffer;
import okio.Segment;

/**
* The default allocator for {@link OkHttpWritableBuffer}s used by the OkHttp transport. OkHttp
Expand All @@ -27,9 +28,6 @@
*/
class OkHttpWritableBufferAllocator implements WritableBufferAllocator {

// Use 4k as our minimum buffer size.
private static final int MIN_BUFFER = 4096;

// Set the maximum buffer size to 1MB
private static final int MAX_BUFFER = 1024 * 1024;

Expand All @@ -45,7 +43,9 @@ class OkHttpWritableBufferAllocator implements WritableBufferAllocator {
*/
@Override
public WritableBuffer allocate(int capacityHint) {
capacityHint = Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint));
// okio buffer uses fixed size Segments, round capacityHint up
capacityHint = Math.min(MAX_BUFFER,
(capacityHint + Segment.SIZE - 1) / Segment.SIZE * Segment.SIZE);
return new OkHttpWritableBuffer(new Buffer(), capacityHint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.grpc.internal.WritableBuffer;
import io.grpc.internal.WritableBufferAllocator;
import io.grpc.internal.WritableBufferAllocatorTestBase;
import okio.Segment;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -42,7 +43,7 @@ protected WritableBufferAllocator allocator() {
public void testCapacity() {
WritableBuffer buffer = allocator().allocate(4096);
assertEquals(0, buffer.readableBytes());
assertEquals(4096, buffer.writableBytes());
assertEquals(Segment.SIZE, buffer.writableBytes());
}

@Test
Expand All @@ -54,8 +55,8 @@ public void testInitialCapacityHasMaximum() {

@Test
public void testIsExactBelowMaxCapacity() {
WritableBuffer buffer = allocator().allocate(4097);
WritableBuffer buffer = allocator().allocate(Segment.SIZE + 1);
assertEquals(0, buffer.readableBytes());
assertEquals(4097, buffer.writableBytes());
assertEquals(Segment.SIZE * 2, buffer.writableBytes());
}
}

0 comments on commit 5a7f350

Please sign in to comment.