Skip to content

Commit

Permalink
Chore follow up to #39 (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas authored Jun 5, 2023
1 parent ec55a7a commit f2df58d
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public final class PubSubEventListener implements EventListener, AutoCloseable {
PubSubEventListener(PubSubEventListenerConfig config, Publisher publisher) {
this.config = requireNonNull(config, "config is null");
this.publisher = requireNonNull(publisher, "publisher is null");
this.pubSubInfo = new PubSubInfo(config.projectId(), config.topicId());
this.pubSubInfo = PubSubInfo.create(config.projectId(), config.topicId());
}

public static PubSubEventListener create(PubSubEventListenerConfig config) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,87 +1,89 @@
package dev.regadas.trino.pubsub.listener.metrics;

import static java.util.Objects.requireNonNull;

public class PubSubInfo implements PubSubInfoMBean {

private final String projectId;
private final String topicId;
private final PubSubCounters queryCreated;
private final PubSubCounters queryCompleted;
private final PubSubCounters splitCompleted;

public PubSubInfo(String projectId, String topicId) {
this.projectId = requireNonNull(projectId, "projectId is null");
this.topicId = requireNonNull(topicId, "topicId is null");
this.queryCreated = new PubSubCounters();
this.queryCompleted = new PubSubCounters();
this.splitCompleted = new PubSubCounters();
import com.google.auto.value.AutoValue;

@AutoValue
public abstract class PubSubInfo implements PubSubInfoMBean {

public static final PubSubInfo create(String projectId, String topicId) {
return new AutoValue_PubSubInfo.Builder()
.setProjectId(projectId)
.setTopicId(topicId)
.setQueryCreated(new PubSubCounters())
.setQueryCompleted(new PubSubCounters())
.setSplitCompleted(new PubSubCounters())
.build();
}

@Override
public String getProjectId() {
return this.projectId;
}
public abstract String getProjectId();

@Override
public String getTopicId() {
return this.topicId;
}
public abstract String getTopicId();

public PubSubCounters getQueryCreated() {
return queryCreated;
}
public abstract PubSubCounters getQueryCreated();

public PubSubCounters getQueryCompleted() {
return queryCompleted;
}
public abstract PubSubCounters getQueryCompleted();

public PubSubCounters getSplitCompleted() {
return splitCompleted;
}
public abstract PubSubCounters getSplitCompleted();

@Override
public Long getQueryCreatedPublicationAttempts() {
return queryCreated.attempts().get();
return getQueryCreated().attempts().get();
}

@Override
public Long getQueryCreatedPublishedSuccessfully() {
return queryCreated.successful().get();
return getQueryCreated().successful().get();
}

@Override
public Long getQueryCreatedPublicationFailed() {
return queryCreated.failure().get();
return getQueryCreated().failure().get();
}

@Override
public Long getQueryCompletedPublicationAttempts() {
return queryCompleted.attempts().get();
return getQueryCompleted().attempts().get();
}

@Override
public Long getQueryCompletedPublishedSuccessfully() {
return queryCompleted.successful().get();
return getQueryCompleted().successful().get();
}

@Override
public Long getQueryCompletedPublicationFailed() {
return queryCompleted.failure().get();
return getQueryCompleted().failure().get();
}

@Override
public Long getSplitCompletedPublicationAttempts() {
return splitCompleted.attempts().get();
return getSplitCompleted().attempts().get();
}

@Override
public Long getSplitCompletedPublishedSuccessfully() {
return splitCompleted.successful().get();
return getSplitCompleted().successful().get();
}

@Override
public Long getSplitCompletedPublicationFailed() {
return splitCompleted.failure().get();
return getSplitCompleted().failure().get();
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setProjectId(String projectId);

public abstract Builder setTopicId(String topicId);

public abstract Builder setQueryCreated(PubSubCounters queryCreated);

public abstract Builder setQueryCompleted(PubSubCounters queryCompleted);

public abstract Builder setSplitCompleted(PubSubCounters splitCompleted);

public abstract PubSubInfo build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ class PubSubEventListenerTest {
"""
# trackEvent, pubType, expAttempt, expSuccess, expFail
true, success, 1, 1, 0
true, returnFailure, 1, 0, 1
true, throwOnPublish, 1, 0, 1
true, return_failure, 1, 0, 1
true, throw_on_publish, 1, 0, 1
false, success, 0, 0, 0
false, returnFailure, 0, 0, 0
false, throwOnPublish, 0, 0, 0
false, return_failure, 0, 0, 0
false, throw_on_publish, 0, 0, 0
""")
void testCounterForQueryCreated(
boolean trackEvent, String pubType, long expAttempt, long expSuccess, long expFail)
Expand Down Expand Up @@ -71,11 +71,11 @@ void testCounterForQueryCreated(
"""
# trackEvent, pubType, expAttempt, expSuccess, expFail
true, success, 1, 1, 0
true, returnFailure, 1, 0, 1
true, throwOnPublish, 1, 0, 1
true, return_failure, 1, 0, 1
true, throw_on_publish, 1, 0, 1
false, success, 0, 0, 0
false, returnFailure, 0, 0, 0
false, throwOnPublish, 0, 0, 0
false, return_failure, 0, 0, 0
false, throw_on_publish, 0, 0, 0
""")
void testCounterForQueryCompleted(
boolean trackEvent, String pubType, long expAttempt, long expSuccess, long expFail)
Expand Down Expand Up @@ -110,11 +110,11 @@ void testCounterForQueryCompleted(
"""
# trackEvent, pubType, expAttempt, expSuccess, expFail
true, success, 1, 1, 0
true, returnFailure, 1, 0, 1
true, throwOnPublish, 1, 0, 1
true, return_failure, 1, 0, 1
true, throw_on_publish, 1, 0, 1
false, success, 0, 0, 0
false, returnFailure, 0, 0, 0
false, throwOnPublish, 0, 0, 0
false, return_failure, 0, 0, 0
false, throw_on_publish, 0, 0, 0
""")
void testCounterForSplitCompleted(
boolean trackEvent, String pubType, long expAttempt, long expSuccess, long expFail)
Expand Down Expand Up @@ -146,7 +146,15 @@ void testCounterForSplitCompleted(
@Test
void testQueryCreatedPublishCorrespondingMessage() {
var publisher = new SuccessPublisher();
var config = new PubSubEventListenerConfig(true, true, true, "", "", null, Encoding.JSON);
var config =
PubSubEventListenerConfig.builder()
.trackQueryCreatedEvent(true)
.trackQueryCompletedEvent(true)
.trackSplitCompletedEvent(true)
.projectId("")
.topicId("")
.encoding(Encoding.JSON)
.build();
var eventListener = new PubSubEventListener(config, publisher);

eventListener.queryCreated(TestData.FULL_QUERY_CREATED_EVENT);
Expand All @@ -157,7 +165,15 @@ void testQueryCreatedPublishCorrespondingMessage() {
@Test
void testQueryCompletedPublishCorrespondingMessage() {
var publisher = new SuccessPublisher();
var config = new PubSubEventListenerConfig(true, true, true, "", "", null, Encoding.JSON);
var config =
PubSubEventListenerConfig.builder()
.trackQueryCreatedEvent(true)
.trackQueryCompletedEvent(true)
.trackSplitCompletedEvent(true)
.projectId("")
.topicId("")
.encoding(Encoding.JSON)
.build();
var eventListener = new PubSubEventListener(config, publisher);

eventListener.queryCompleted(TestData.FULL_QUERY_COMPLETED_EVENT);
Expand All @@ -168,7 +184,15 @@ void testQueryCompletedPublishCorrespondingMessage() {
@Test
void testSplitCompletedPublishCorrespondingMessage() {
var publisher = new SuccessPublisher();
var config = new PubSubEventListenerConfig(true, true, true, "", "", null, Encoding.JSON);
var config =
PubSubEventListenerConfig.builder()
.trackQueryCreatedEvent(true)
.trackQueryCompletedEvent(true)
.trackSplitCompletedEvent(true)
.projectId("")
.topicId("")
.encoding(Encoding.JSON)
.build();
var eventListener = new PubSubEventListener(config, publisher);

eventListener.splitCompleted(TestData.FULL_SPLIT_COMPLETED_EVENT);
Expand All @@ -177,10 +201,18 @@ void testSplitCompletedPublishCorrespondingMessage() {
}

@ParameterizedTest
@ValueSource(strings = {"success", "throwOnClose"})
@ValueSource(strings = {"success", "throw_on_close"})
void testClosedNormally(String pubType) {
var publisher = TestPublisher.from(pubType);
var config = new PubSubEventListenerConfig(true, true, true, "", "", null, Encoding.JSON);
var config =
PubSubEventListenerConfig.builder()
.trackQueryCreatedEvent(true)
.trackQueryCompletedEvent(true)
.trackSplitCompletedEvent(true)
.projectId("")
.topicId("")
.encoding(Encoding.JSON)
.build();
var eventListener = new PubSubEventListener(config, publisher);

assertDoesNotThrow(() -> eventListener.close());
Expand All @@ -197,6 +229,13 @@ public static <T> void assertThatEventually(Supplier<T> actual, Matcher<? super
}

abstract static class TestPublisher implements Publisher {
enum PubType {
SUCCESS,
RETURN_FAILURE,
THROW_ON_PUBLISH,
THROW_ON_CLOSE
}

Message lastPublishedMessage;
boolean closeCalled;

Expand All @@ -213,12 +252,11 @@ public void close() throws Exception {
}

static TestPublisher from(String pubType) {
return switch (pubType) {
case "success" -> new SuccessPublisher();
case "returnFailure" -> new ReturnFailurePublisher();
case "throwOnPublish" -> new ThrowOnPublishPublisher();
case "throwOnClose" -> new ThrowOnClosePublisher();
default -> throw new IllegalArgumentException("invalid pudType: " + pubType);
return switch (PubType.valueOf(pubType.toUpperCase())) {
case SUCCESS -> new SuccessPublisher();
case RETURN_FAILURE -> new ReturnFailurePublisher();
case THROW_ON_PUBLISH -> new ThrowOnPublishPublisher();
case THROW_ON_CLOSE -> new ThrowOnClosePublisher();
};
}

Expand Down

0 comments on commit f2df58d

Please sign in to comment.