Skip to content

Commit

Permalink
Fixed Streaming02KafkaToHttpTest
Browse files Browse the repository at this point in the history
  • Loading branch information
Juboy committed Feb 5, 2025
1 parent 87ee97e commit f7d06ba
Showing 1 changed file with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package org.eclipse.edc.samples.transfer.streaming;

import jakarta.json.Json;
import okhttp3.mockwebserver.MockWebServer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -26,16 +25,16 @@
import org.eclipse.edc.junit.extensions.EmbeddedRuntime;
import org.eclipse.edc.junit.extensions.RuntimeExtension;
import org.eclipse.edc.junit.extensions.RuntimePerClassExtension;
import org.eclipse.edc.util.io.Ports;
import org.junit.jupiter.api.BeforeEach;
import org.eclipse.edc.samples.util.HttpRequestLoggerConsumer;
import org.eclipse.edc.samples.util.HttpRequestLoggerContainer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.kafka.ConfluentKafkaContainer;
import org.testcontainers.utility.DockerImageName;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.Properties;
Expand All @@ -57,6 +56,7 @@ public class Streaming02KafkaToHttpTest {
private static final String MAX_DURATION = "PT30S";
private static final String SAMPLE_FOLDER = "transfer/streaming/streaming-02-kafka-to-http";
private static final Duration TIMEOUT = Duration.ofSeconds(30);
private static final HttpRequestLoggerConsumer LOG_CONSUMER = new HttpRequestLoggerConsumer();
private static final StreamingParticipant PROVIDER = StreamingParticipant.Builder.newStreamingInstance()
.name("provider")
.id("provider")
Expand All @@ -71,11 +71,13 @@ public class Streaming02KafkaToHttpTest {
.build();

@Container
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withKraft()
static ConfluentKafkaContainer kafkaContainer = new ConfluentKafkaContainer(DockerImageName.parse(KAFKA_IMAGE_NAME))
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "true")
.withEnv("KAFKA_CREATE_TOPICS", TOPIC.concat(":1:1"));

@Container
public static HttpRequestLoggerContainer httpRequestLoggerContainer = new HttpRequestLoggerContainer(LOG_CONSUMER);

@RegisterExtension
static RuntimeExtension providerConnector = new RuntimePerClassExtension(new EmbeddedRuntime(
"provider",
Expand All @@ -88,12 +90,10 @@ public class Streaming02KafkaToHttpTest {
":transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime"
).configurationProvider(fromPropertiesFile(SAMPLE_FOLDER + "/streaming-02-runtime/consumer.properties")));

private final int httpReceiverPort = Ports.getFreePort();
private final MockWebServer consumerReceiverServer = new MockWebServer();

@BeforeEach
void setUp() throws IOException {
consumerReceiverServer.start(httpReceiverPort);
@BeforeAll
static void setUp() {
httpRequestLoggerContainer.start();
}

@Test
Expand All @@ -108,7 +108,7 @@ void streamData() {

var destination = Json.createObjectBuilder()
.add("type", "HttpData")
.add("baseUrl", "http://localhost:" + httpReceiverPort)
.add("baseUrl", "http://localhost:4000")
.build();

var transferProcessId = CONSUMER.requestAssetFrom("kafka-stream-asset", PROVIDER)
Expand All @@ -122,15 +122,11 @@ void streamData() {
});

var producer = createKafkaProducer();
var message = "message";
var message = "message from producer";
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> producer
.send(new ProducerRecord<>(TOPIC, "key", message)), 0L, 100L, MICROSECONDS);

await().atMost(TIMEOUT).untilAsserted(() -> {
var request = consumerReceiverServer.takeRequest();
assertThat(request).isNotNull();
assertThat(request.getBody().readByteArray()).isEqualTo(message.getBytes());
});
await().atMost(TIMEOUT).untilAsserted(() -> assertThat(LOG_CONSUMER.toUtf8String()).contains(message));

producer.close();
}
Expand Down

0 comments on commit f7d06ba

Please sign in to comment.