Skip to content

Commit

Permalink
Compress pub/sub message payload (#76)
Browse files Browse the repository at this point in the history
* Compress pub/sub message payload
* Use Zstd instead of GZIP
  • Loading branch information
narape authored Aug 11, 2023
1 parent b46d9ac commit e090376
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
1 change: 1 addition & 0 deletions plugin/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies {
implementation platform('com.google.cloud:libraries-bom:26.22.0')
implementation 'com.google.cloud:google-cloud-pubsub'
implementation 'io.airlift:stats:235'
implementation 'io.airlift:aircompressor:0.25'
testImplementation 'org.hamcrest:hamcrest-core:2.2'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package dev.regadas.trino.pubsub.listener;

import com.google.protobuf.Message;
import dev.regadas.trino.pubsub.listener.Encoder.MessageEncoder;
import io.airlift.compress.zstd.ZstdOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Objects;

public class CompressingMessageEncoder implements MessageEncoder {

private final MessageEncoder delegate;

public CompressingMessageEncoder(MessageEncoder delegate) {
this.delegate = Objects.requireNonNull(delegate);
}

@Override
public byte[] encode(Message value) throws Exception {
var uncompressedBytes = delegate.encode(value);
try (var in = new ByteArrayInputStream(uncompressedBytes);
var bao = new ByteArrayOutputStream()) {
// ZstdOutputStream compress and flushes on close,
// so we wrap it on its own try with resources
try (var zout = new ZstdOutputStream(bao)) {
in.transferTo(zout);
}
return bao.toByteArray();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.protobuf.Message;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import dev.regadas.trino.pubsub.listener.CompressingMessageEncoder;
import dev.regadas.trino.pubsub.listener.Encoder;
import dev.regadas.trino.pubsub.listener.Encoder.Encoding;
import dev.regadas.trino.pubsub.listener.Encoder.MessageEncoder;
Expand Down Expand Up @@ -50,7 +51,7 @@ public static PubSubPublisher create(
.setBatchingSettings(batchingSettings)
.build();

var encoder = MessageEncoder.create(encoding);
var encoder = new CompressingMessageEncoder(MessageEncoder.create(encoding));
return new PubSubPublisher(publisher, encoder);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package dev.regadas.trino.pubsub.listener;

import static java.util.stream.Collectors.joining;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.lessThan;

import com.google.protobuf.Message;
import dev.regadas.trino.pubsub.listener.Encoder.MessageEncoder;
import dev.regadas.trino.pubsub.listener.proto.Test.TestMessage;
import io.airlift.compress.zstd.ZstdInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.stream.Stream;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class CompressingMessageEncoderTest {

private static final String TEXT = Stream.generate(() -> "a").limit(1000).collect(joining());
private static final TestMessage MESSAGE = TestMessage.newBuilder().setText(TEXT).build();
private static final ProtoMessageEncoder DELEGATE = new ProtoMessageEncoder();
private CompressingMessageEncoder encoder;

@BeforeEach
void setUp() {
encoder = new CompressingMessageEncoder(DELEGATE);
}

@Test
void testEncodeActuallyCompress() throws Exception {
byte[] uncompressed = DELEGATE.encode(MESSAGE);

byte[] compressed = encoder.encode(MESSAGE);

assertThat(compressed.length, lessThan(uncompressed.length));
}

@Test
void testEncodeCompressionRoundTrip() throws Exception {
byte[] compressed = encoder.encode(MESSAGE);

byte[] decompressed = decompress(compressed);
assertThat(TestMessage.parseFrom(decompressed), equalTo(MESSAGE));
}

public static byte[] decompress(byte[] uncompressedBytes) throws IOException {
try (var zin = new ZstdInputStream(new ByteArrayInputStream(uncompressedBytes));
var bao = new ByteArrayOutputStream()) {
zin.transferTo(bao);
return bao.toByteArray();
}
}

static class ProtoMessageEncoder implements MessageEncoder {

@Override
public byte[] encode(Message value) {
return value.toByteArray();
}
}
}
6 changes: 6 additions & 0 deletions plugin/src/test/proto/test.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
syntax = "proto3";
package dev.regadas.trino.pubsub.listener.proto;

message TestMessage {
string text = 1;
}

0 comments on commit e090376

Please sign in to comment.