Skip to content

Commit

Permalink
feat(exporter): migrate to Zeebe 0.17.0
Browse files Browse the repository at this point in the history
  • Loading branch information
saig0 committed Apr 3, 2019
1 parent 51537d8 commit 55da7b9
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 83 deletions.
74 changes: 23 additions & 51 deletions connector-java/src/test/java/io/zeebe/hazelcast/ExporterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.tuple;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.core.HazelcastInstance;
Expand All @@ -23,6 +18,10 @@
import io.zeebe.model.bpmn.BpmnModelInstance;
import io.zeebe.test.ZeebeTestRule;
import io.zeebe.test.util.TestUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
Expand All @@ -34,7 +33,7 @@ public class ExporterTest {
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeTaskType("test").zeebeInput("$.foo", "$.bar"))
.serviceTask("task", s -> s.zeebeTaskType("test").zeebeInput("foo", "bar"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();
Expand Down Expand Up @@ -68,30 +67,19 @@ public void shouldExportWorkflowInstanceEvents() {
final ITopic<byte[]> topic = hz.getTopic(CONFIGURATION.workflowInstanceTopic);
topic.addMessageListener(new WorkflowInstanceEventListener(events::add));

client
.workflowClient()
.newDeployCommand()
.addWorkflowModel(WORKFLOW, "process.bpmn")
.send()
.join();
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

client
.workflowClient()
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.send()
.join();
client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join();

TestUtil.waitUntil(() -> events.size() >= 4);

assertThat(events)
.extracting(r -> tuple(r.getElementId(), r.getMetadata().getIntent()))
.containsSequence(
tuple("process", "ELEMENT_READY"),
tuple("process", "ELEMENT_ACTIVATING"),
tuple("process", "ELEMENT_ACTIVATED"),
tuple("start", "START_EVENT_OCCURRED"),
tuple("to-task", "SEQUENCE_FLOW_TAKEN"));
tuple("start", "ELEMENT_ACTIVATING"),
tuple("start", "ELEMENT_ACTIVATED"));
}

@Test
Expand All @@ -101,12 +89,7 @@ public void shouldExportDeploymentEvents() {
final ITopic<byte[]> topic = hz.getTopic(CONFIGURATION.deploymentTopic);
topic.addMessageListener(new DeploymentEventListener(events::add));

client
.workflowClient()
.newDeployCommand()
.addWorkflowModel(WORKFLOW, "process.bpmn")
.send()
.join();
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

TestUtil.waitUntil(() -> events.size() >= 2);

Expand All @@ -123,25 +106,22 @@ public void shouldExportJobEvents() {
final ITopic<byte[]> topic = hz.getTopic(CONFIGURATION.jobTopic);
topic.addMessageListener(new JobEventListener(events::add));

client
.workflowClient()
.newDeployCommand()
.addWorkflowModel(WORKFLOW, "process.bpmn")
.send()
.join();
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

client
.workflowClient()
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.payload(Collections.singletonMap("foo", 123))
.variables(Collections.singletonMap("foo", 123))
.send()
.join();

TestUtil.waitUntil(() -> events.size() >= 1);

assertThat(events).hasSize(1).extracting(r -> r.getMetadata().getIntent()).containsExactly("CREATED");
assertThat(events)
.hasSize(1)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATED");
}

@Test
Expand All @@ -151,23 +131,15 @@ public void shouldExportIncidentEvents() {
final ITopic<byte[]> topic = hz.getTopic(CONFIGURATION.incidentTopic);
topic.addMessageListener(new IncidentEventListener(events::add));

client
.workflowClient()
.newDeployCommand()
.addWorkflowModel(WORKFLOW, "process.bpmn")
.send()
.join();
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

client
.workflowClient()
.newCreateInstanceCommand()
.bpmnProcessId("process")
.latestVersion()
.send()
.join();
client.newCreateInstanceCommand().bpmnProcessId("process").latestVersion().send().join();

TestUtil.waitUntil(() -> events.size() >= 1);

assertThat(events).hasSize(1).extracting(r -> r.getMetadata().getIntent()).containsExactly("CREATED");
assertThat(events)
.hasSize(1)
.extracting(r -> r.getMetadata().getIntent())
.containsExactly("CREATED");
}
}
2 changes: 1 addition & 1 deletion exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zb-exporter-api</artifactId>
<artifactId>zeebe-exporter-api</artifactId>
<scope>provided</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package io.zeebe.hazelcast.exporter;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.EnumMap;

import com.google.protobuf.GeneratedMessageV3;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import io.zeebe.exporter.context.Context;
import io.zeebe.exporter.context.Controller;
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.exporter.api.record.Record;
import io.zeebe.exporter.api.record.RecordMetadata;
import io.zeebe.exporter.api.spi.Exporter;
import io.zeebe.exporter.proto.RecordTransformer;
import io.zeebe.exporter.record.Record;
import io.zeebe.exporter.record.RecordMetadata;
import io.zeebe.exporter.spi.Exporter;
import io.zeebe.protocol.clientapi.RecordType;
import io.zeebe.protocol.clientapi.ValueType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.EnumMap;
import org.slf4j.Logger;

public class HazelcastExporter implements Exporter {
Expand Down Expand Up @@ -77,15 +76,14 @@ public void export(Record record) {
private byte[] transformRecord(Record record) {
final GeneratedMessageV3 dto = RecordTransformer.toProtobufMessage(record);

try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream())
{
dto.writeTo(outputStream);
return outputStream.toByteArray();
} catch (IOException ioe)
{
final String exceptionMsg = String.format("Failed to write %s to byte array output stream.", dto.toString());
logger.error(exceptionMsg, ioe);
throw new RuntimeException(exceptionMsg, ioe);
}
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
dto.writeTo(outputStream);
return outputStream.toByteArray();
} catch (IOException ioe) {
final String exceptionMsg =
String.format("Failed to write %s to byte array output stream.", dto.toString());
logger.error(exceptionMsg, ioe);
throw new RuntimeException(exceptionMsg, ioe);
}
}
}
10 changes: 2 additions & 8 deletions exporter/src/test/java/io/zeebe/hazelcast/ExporterTest.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.zeebe.hazelcast;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.hazelcast.client.HazelcastClient;
import com.hazelcast.client.config.ClientConfig;
Expand All @@ -28,7 +27,7 @@ public class ExporterTest {
Bpmn.createExecutableProcess("process")
.startEvent("start")
.sequenceFlowId("to-task")
.serviceTask("task", s -> s.zeebeTaskType("test").zeebeInput("$.foo", "$.bar"))
.serviceTask("task", s -> s.zeebeTaskType("test"))
.sequenceFlowId("to-end")
.endEvent("end")
.done();
Expand Down Expand Up @@ -62,12 +61,7 @@ public void shouldExportEventsAsProtobuf() throws Exception {
final ITopic<byte[]> topic = hz.getTopic(CONFIGURATION.deploymentTopic);
topic.addMessageListener(m -> messages.add(m.getMessageObject()));

client
.workflowClient()
.newDeployCommand()
.addWorkflowModel(WORKFLOW, "process.bpmn")
.send()
.join();
client.newDeployCommand().addWorkflowModel(WORKFLOW, "process.bpmn").send().join();

TestUtil.waitUntil(() -> messages.size() > 0);

Expand Down
13 changes: 9 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>
<name>Zeebe Hazelcast Root</name>
Expand All @@ -16,9 +18,9 @@
</parent>

<properties>
<version.zeebe>0.14.0</version.zeebe>
<version.zeebe>0.17.0</version.zeebe>
<version.hazelcast>3.11</version.hazelcast>
<version.exporter.protobuf>0.1.0</version.exporter.protobuf>
<version.exporter.protobuf>0.4.0</version.exporter.protobuf>
<version.jackson>2.9.6</version.jackson>

<!-- release parent settings -->
Expand Down Expand Up @@ -51,7 +53,7 @@

<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zb-bom</artifactId>
<artifactId>zeebe-bom</artifactId>
<version>${version.zeebe}</version>
<scope>import</scope>
<type>pom</type>
Expand All @@ -69,16 +71,19 @@
<version>${version.hazelcast}</version>
</dependency>


<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.10.0</version>
<scope>test</scope>
</dependency>

<dependency>
Expand Down

0 comments on commit 55da7b9

Please sign in to comment.