From 7bb40225b45e82aa5387b0595a52c79d80da59cc Mon Sep 17 00:00:00 2001
From: neo <1100909+neowu@users.noreply.github.com>
Date: Mon, 3 Mar 2025 16:09:34 +0800
Subject: [PATCH] * log-exporter: export action and event in parquet format

Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
---
 CHANGELOG.md                                       |  1 +
 .../framework/log/message/EventMessage.java        |  4 +-
 ext/build.gradle.kts                               | 11 +++
 .../main/java/core/log/LogExporterApp.java         |  9 +++
 .../java/core/log/domain/ActionLogSchema.java      | 65 ++++++++++++++++
 .../java/core/log/domain/EventSchema.java          | 46 ++++++++++++
 .../main/java/core/log/job/ProcessLogJob.java      |  3 +-
 .../log/kafka/ActionLogMessageHandler.java         | 54 +++++---------
 .../core/log/kafka/EventMessageHandler.java        | 29 +++++---
 .../java/core/log/service/ArchiveService.java      | 74 +++++++++++++++----
 .../java/core/log/job/ProcessLogJobTest.java       |  2 +-
 .../kafka/ActionLogMessageHandlerTest.java         | 15 +++-
 .../log/kafka/EventMessageHandlerTest.java         | 12 ++-
 .../core/log/service/ArchiveServiceTest.java       | 63 ++++++++++++++--
 .../core/log/web/UploadControllerTest.java         |  3 +-
 15 files changed, 312 insertions(+), 79 deletions(-)
 create mode 100644 ext/log-exporter/src/main/java/core/log/domain/ActionLogSchema.java
 create mode 100644 ext/log-exporter/src/main/java/core/log/domain/EventSchema.java

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3cd27012b..daa365690 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,7 @@
 
 * sse: send ErrorResponse to client via "event: error" on exception
 * sse: log clientIP on sse:close action
+* log-exporter: export action and event in parquet format
 
 ### 9.1.6 (2/10/2025 - 2/25/2025)
 
diff --git a/core-ng/src/main/java/core/framework/log/message/EventMessage.java b/core-ng/src/main/java/core/framework/log/message/EventMessage.java
index e46529e77..72715ed5a 100644
--- a/core-ng/src/main/java/core/framework/log/message/EventMessage.java
+++ b/core-ng/src/main/java/core/framework/log/message/EventMessage.java
@@ -25,12 +25,12 @@ public class EventMessage {
     public String errorCode;
     @Property(name = "error_message")
     public String errorMessage;
+    @Property(name = "elapsed")
+    public Long elapsed;
     @Property(name = "context")
     public Map<String, String> context;
     @Property(name = "stats")
     public Map<String, Double> stats;
     @Property(name = "info")
     public Map<String, String> info;
-    @Property(name = "elapsed")
-    public Long elapsed;
 }
diff --git a/ext/build.gradle.kts b/ext/build.gradle.kts
index 83deb13ed..c5422ee90 100644
--- a/ext/build.gradle.kts
+++ b/ext/build.gradle.kts
@@ -24,6 +24,17 @@ project("log-exporter") {
     apply(plugin = "app")
     dependencies {
         implementation(project(":core-ng"))
+
+        // for parquet
+        compileOnly("org.apache.hadoop:hadoop-annotations:3.4.1")
+        implementation("org.apache.parquet:parquet-avro:1.15.0")
+        implementation("org.apache.avro:avro:1.12.0")
+        implementation("org.apache.hadoop:hadoop-common:3.4.1@jar")
+        runtimeOnly("commons-collections:commons-collections:3.2.2@jar")
+        runtimeOnly("com.fasterxml.woodstox:woodstox-core:5.4.0@jar")
+        runtimeOnly("org.codehaus.woodstox:stax2-api:4.2.1@jar")
+        runtimeOnly("org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0@jar")
+
         testImplementation(project(":core-ng-test"))
     }
 }
diff --git a/ext/log-exporter/src/main/java/core/log/LogExporterApp.java b/ext/log-exporter/src/main/java/core/log/LogExporterApp.java
index bff0a9ae9..ff0bc04e4 100644
--- a/ext/log-exporter/src/main/java/core/log/LogExporterApp.java
+++ b/ext/log-exporter/src/main/java/core/log/LogExporterApp.java
@@ -5,6 +5,8 @@
 import core.framework.log.message.EventMessage;
 import core.framework.log.message.LogTopics;
 import core.framework.module.App;
+import core.log.domain.ActionLogSchema;
+import core.log.domain.EventSchema;
 import core.log.job.ProcessLogJob;
 import core.log.kafka.ActionLogMessageHandler;
 import core.log.kafka.EventMessageHandler;
@@ -12,6 +14,8 @@
 import core.log.service.UploadService;
 import core.log.web.UploadController;
 import core.log.web.UploadRequest;
+import org.apache.avro.data.TimeConversions;
+import org.apache.avro.specific.SpecificData;
 
 import java.time.Duration;
 import java.time.LocalTime;
@@ -31,6 +35,11 @@
     protected void initialize() {
         kafka().minPoll(1024 * 1024, Duration.ofMillis(5000)); // try to get at least 1M message, and can wait longer
         kafka().maxPoll(3000, 3 * 1024 * 1024); // get 3M message at max
 
+        SpecificData specificData = SpecificData.get();
+        specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
+        bind(ActionLogSchema.class);
+        bind(EventSchema.class);
+
         bind(new UploadService(requiredProperty("app.log.bucket")));
         bind(ArchiveService.class);
diff --git a/ext/log-exporter/src/main/java/core/log/domain/ActionLogSchema.java b/ext/log-exporter/src/main/java/core/log/domain/ActionLogSchema.java
new file mode 100644
index 000000000..58c2678ec
--- /dev/null
+++ b/ext/log-exporter/src/main/java/core/log/domain/ActionLogSchema.java
@@ -0,0 +1,65 @@
+package core.log.domain;
+
+import core.framework.log.message.ActionLogMessage;
+import core.framework.log.message.PerformanceStatMessage;
+import core.framework.util.Maps;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+
+import java.util.Map;
+
+public class ActionLogSchema {
+    public final Schema schema;
+
+    public ActionLogSchema() {
+        schema = SchemaBuilder.record("action")
+            .fields()
+            .requiredString("id")
+            .name("date").type().optional().type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
+            .requiredString("app")
+            .requiredString("host")
+            .requiredString("result")
+            .requiredString("action")
+            .name("correlation_ids").type().optional().array().items().stringType()
+            .name("client").type().optional().array().items().stringType()
+            .name("ref_ids").type().optional().array().items().stringType()
+            .optionalString("error_code")
+            .optionalString("error_message")
+            .requiredLong("elapsed")
+            .name("context").type().optional().type(SchemaBuilder.map().values().stringType())
+            .name("stats").type().optional().map().values().doubleType()
+            .name("perf_stats").type().optional().map().values().longType()
+            .endRecord();
+    }
+
+    public GenericData.Record record(ActionLogMessage message) {
+        var record = new GenericData.Record(schema);
+        record.put("id", message.id);
+        record.put("date", message.date);
+        record.put("app", message.app);
+        record.put("host", message.host);
+        record.put("result", message.result);
+        record.put("action", message.action);
+        record.put("correlation_ids", message.correlationIds);
+        record.put("client", message.clients);
+        record.put("ref_ids", message.refIds);
+        record.put("error_code", message.errorCode);
+        record.put("error_message", message.errorMessage);
+        record.put("elapsed", message.elapsed);
+        record.put("context", message.context);
+        record.put("stats", message.stats);
+        Map<String, Long> perfStats = Maps.newHashMapWithExpectedSize(message.performanceStats.size() * 3);
+        for (Map.Entry<String, PerformanceStatMessage> entry : message.performanceStats.entrySet()) {
+            String key = entry.getKey();
+            PerformanceStatMessage stat = entry.getValue();
+            perfStats.put(key + ".count", (long) stat.count);
+            perfStats.put(key + ".total_elapsed", stat.totalElapsed);
+            if (stat.readEntries != null) perfStats.put(key + ".read_entries", (long) stat.readEntries);
+            if (stat.writeEntries != null) perfStats.put(key + ".write_entries", (long) stat.writeEntries);
+        }
+        record.put("perf_stats", perfStats);
+        return record;
+    }
+}
diff --git a/ext/log-exporter/src/main/java/core/log/domain/EventSchema.java b/ext/log-exporter/src/main/java/core/log/domain/EventSchema.java
new file mode 100644
index 000000000..25386c0d8
--- /dev/null
+++ b/ext/log-exporter/src/main/java/core/log/domain/EventSchema.java
@@ -0,0 +1,46 @@
+package core.log.domain;
+
+import core.framework.log.message.EventMessage;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+
+public class EventSchema {
+    public final Schema schema;
+
+    public EventSchema() {
+        schema = SchemaBuilder.record("event")
+            .fields()
+            .requiredString("id")
+            .name("date").type().optional().type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
+            .requiredString("app")
+            .name("received_time").type().optional().type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
+            .requiredString("result")
+            .requiredString("action")
+            .optionalString("error_code")
+            .optionalString("error_message")
+            .requiredLong("elapsed")
+            .name("context").type().optional().type(SchemaBuilder.map().values().stringType())
+            .name("stats").type().optional().map().values().doubleType()
+            .name("info").type().optional().type(SchemaBuilder.map().values().stringType())
+            .endRecord();
+    }
+
+    public GenericData.Record record(EventMessage message) {
+        var record = new GenericData.Record(schema);
+        record.put("id", message.id);
+        record.put("date", message.date);
+        record.put("app", message.app);
+        record.put("received_time", message.receivedTime);
+        record.put("result", message.result);
+        record.put("action", message.action);
+        record.put("error_code", message.errorCode);
+        record.put("error_message", message.errorMessage);
+        record.put("elapsed", message.elapsed);
+        record.put("context", message.context);
+        record.put("stats", message.stats);
+        record.put("info", message.info);
+        return record;
+    }
+}
diff --git a/ext/log-exporter/src/main/java/core/log/job/ProcessLogJob.java b/ext/log-exporter/src/main/java/core/log/job/ProcessLogJob.java
index f1fcfffcf..c77936451 100644
--- a/ext/log-exporter/src/main/java/core/log/job/ProcessLogJob.java
+++ b/ext/log-exporter/src/main/java/core/log/job/ProcessLogJob.java
@@ -5,6 +5,7 @@
 import core.framework.scheduler.JobContext;
 import core.log.service.ArchiveService;
 
+import java.io.IOException;
 import java.time.LocalDate;
 
 /**
@@ -15,7 +16,7 @@ public class ProcessLogJob implements Job {
     ArchiveService archiveService;
 
     @Override
-    public void execute(JobContext context) {
+    public void execute(JobContext context) throws IOException {
         LocalDate today = context.scheduledTime.toLocalDate();
         archiveService.cleanupArchive(today.minusDays(5)); // cleanup first, to free disk space when possible
         archiveService.uploadArchive(today.minusDays(1));
diff --git a/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java b/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java
index d69ec7bc2..8ccf0092f 100644
--- a/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java
+++ b/ext/log-exporter/src/main/java/core/log/kafka/ActionLogMessageHandler.java
@@ -1,64 +1,46 @@
 package core.log.kafka;
 
 import core.framework.inject.Inject;
-import core.framework.internal.json.JSONWriter;
 import core.framework.kafka.BulkMessageHandler;
 import core.framework.kafka.Message;
 import core.framework.log.message.ActionLogMessage;
-import core.log.domain.ActionLogEntry;
+import core.log.domain.ActionLogSchema;
 import core.log.service.ArchiveService;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificDatumWriter;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.time.LocalDateTime;
+import java.time.LocalDate;
 import java.util.List;
 
-import static java.nio.file.StandardOpenOption.APPEND;
-import static java.nio.file.StandardOpenOption.CREATE;
-
 /**
  * @author neo
  */
 public class ActionLogMessageHandler implements BulkMessageHandler<ActionLogMessage> {
-    private final JSONWriter<ActionLogEntry> writer = new JSONWriter<>(ActionLogEntry.class);
-
     @Inject
     ArchiveService archiveService;
+    @Inject
+    ActionLogSchema schema;
 
     @Override
    public void handle(List<Message<ActionLogMessage>> messages) throws IOException {
-        LocalDateTime now = LocalDateTime.now();
+        LocalDate now = LocalDate.now();
 
-        Path path = archiveService.initializeLogFilePath(archiveService.actionLogPath(now.toLocalDate()));
-        try (BufferedOutputStream stream = new BufferedOutputStream(Files.newOutputStream(path, CREATE, APPEND), 3 * 1024 * 1024)) {
-            for (Message<ActionLogMessage> message : messages) {
-                ActionLogEntry entry = entry(message.value);
+        Path path = archiveService.localActionLogFilePath(now);
+        archiveService.createParentDir(path);
 
-                stream.write(writer.toJSON(entry));
-                stream.write('\n');
+        try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
+            if (!Files.exists(path)) {
+                writer.create(schema.schema, path.toFile());
+            } else {
+                writer.appendTo(path.toFile());
+            }
+            for (Message<ActionLogMessage> message : messages) {
+                writer.append(schema.record(message.value));
             }
         }
     }
-
-    private ActionLogEntry entry(ActionLogMessage message) {
-        var entry = new ActionLogEntry();
-        entry.id = message.id;
-        entry.date = message.date;
-        entry.app = message.app;
-        entry.host = message.host;
-        entry.result = message.result;
-        entry.action = message.action;
-        entry.correlationIds = message.correlationIds;
-        entry.clients = message.clients;
-        entry.refIds = message.refIds;
-        entry.errorCode = message.errorCode;
-        entry.errorMessage = message.errorMessage;
-        entry.elapsed = message.elapsed;
-        entry.context = message.context;
-        entry.stats = message.stats;
-        entry.performanceStats = message.performanceStats;
-        return entry;
-    }
 }
diff --git a/ext/log-exporter/src/main/java/core/log/kafka/EventMessageHandler.java b/ext/log-exporter/src/main/java/core/log/kafka/EventMessageHandler.java
index 1de6a7753..761077dc2 100644
--- a/ext/log-exporter/src/main/java/core/log/kafka/EventMessageHandler.java
+++ b/ext/log-exporter/src/main/java/core/log/kafka/EventMessageHandler.java
@@ -1,40 +1,45 @@
 package core.log.kafka;
 
 import core.framework.inject.Inject;
-import core.framework.internal.json.JSONWriter;
 import core.framework.kafka.BulkMessageHandler;
 import core.framework.kafka.Message;
 import core.framework.log.message.EventMessage;
+import core.log.domain.EventSchema;
 import core.log.service.ArchiveService;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificDatumWriter;
 
-import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.time.LocalDate;
 import java.util.List;
 
-import static java.nio.file.StandardOpenOption.APPEND;
-import static java.nio.file.StandardOpenOption.CREATE;
-
 /**
  * @author neo
 */
 public class EventMessageHandler implements BulkMessageHandler<EventMessage> {
-    private final JSONWriter<EventMessage> writer = new JSONWriter<>(EventMessage.class);
-
     @Inject
     ArchiveService archiveService;
+    @Inject
+    EventSchema schema;
 
     @Override
     public void handle(List<Message<EventMessage>> messages) throws IOException {
-        LocalDate date = LocalDate.now();
+        LocalDate now = LocalDate.now();
 
-        Path path = archiveService.initializeLogFilePath(archiveService.eventPath(date));
-        try (BufferedOutputStream stream = new BufferedOutputStream(Files.newOutputStream(path, CREATE, APPEND), 3 * 1024 * 1024)) {
+        Path path = archiveService.localEventFilePath(now);
+        archiveService.createParentDir(path);
+
+        try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
+            if (!Files.exists(path)) {
+                writer.create(schema.schema, path.toFile());
+            } else {
+                writer.appendTo(path.toFile());
+            }
             for (Message<EventMessage> message : messages) {
-                stream.write(writer.toJSON(message.value));
-                stream.write('\n');
+                writer.append(schema.record(message.value));
             }
         }
     }
diff --git a/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java b/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java
index e138bac8d..f5a4aa42e 100644
--- a/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java
+++ b/ext/log-exporter/src/main/java/core/log/service/ArchiveService.java
@@ -2,8 +2,18 @@
 
 import core.framework.crypto.Hash;
 import core.framework.inject.Inject;
+import core.framework.util.Files;
 import core.framework.util.Network;
+import core.framework.util.Randoms;
+import core.framework.util.StopWatch;
 import core.framework.util.Strings;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,46 +37,78 @@ public class ArchiveService {
     @Inject
     UploadService uploadService;
 
-    public void uploadArchive(LocalDate date) {
+    public void uploadArchive(LocalDate date) throws IOException {
         logger.info("uploading begin, date={}", date);
 
-        String actionLogPath = actionLogPath(date);
-        Path actionLogFilePath = Path.of(logDir.toString(), actionLogPath);
+        Path actionLogFilePath = localActionLogFilePath(date);
         if (exists(actionLogFilePath)) {
-            uploadService.upload(actionLogFilePath, actionLogPath);
+            Path actionLogParquetFilePath = convertToParquet(actionLogFilePath);
+            String remoteActionLogPath = remoteActionLogPath(date);
+            uploadService.upload(actionLogParquetFilePath, remoteActionLogPath);
+            Files.delete(actionLogParquetFilePath);
         }
 
-        String eventPath = eventPath(date);
-        Path eventFilePath = Path.of(logDir.toString(), eventPath);
+        Path eventFilePath = localEventFilePath(date);
         if (exists(eventFilePath)) {
-            uploadService.upload(eventFilePath, eventPath);
+            Path eventParquetFilePath = convertToParquet(eventFilePath);
+            String remoteEventPath = remoteEventPath(date);
+            uploadService.upload(eventParquetFilePath, remoteEventPath);
+            Files.delete(eventParquetFilePath);
         }
 
         logger.info("uploading end, date={}", date);
     }
 
+    Path convertToParquet(Path sourcePath) throws IOException {
+        var watch = new StopWatch();
+        var targetPath = sourcePath.resolveSibling(sourcePath.getFileName() + "." + Randoms.alphaNumeric(5) + ".parquet");
+        try (DataFileReader<GenericData.Record> reader = new DataFileReader<>(sourcePath.toFile(), new GenericDatumReader<>());
+             ParquetWriter<GenericData.Record> writer = AvroParquetWriter
+                 .<GenericData.Record>builder(new LocalOutputFile(targetPath))
+                 .withSchema(reader.getSchema())
+                 .withCompressionCodec(CompressionCodecName.ZSTD)
+                 .build()) {
+
+            for (GenericData.Record record : reader) {
+                writer.write(record);
+            }
+
+        } finally {
+            logger.info("convert avro to parquet, source={}, target={}, elapsed={}", sourcePath, targetPath, watch.elapsed());
+        }
+        return targetPath;
+    }
+
     public void cleanupArchive(LocalDate date) {
         logger.info("cleaning up archives, date={}", date);
-        Path actionLogFilePath = Path.of(logDir.toString(), actionLogPath(date));
+        Path actionLogFilePath = localActionLogFilePath(date);
         shell.execute("rm", "-f", actionLogFilePath.toString());
-        Path eventFilePath = Path.of(logDir.toString(), eventPath(date));
+        Path eventFilePath = localEventFilePath(date);
         shell.execute("rm", "-f", eventFilePath.toString());
     }
 
-    public String actionLogPath(LocalDate date) {
-        return Strings.format("/action/{}/action-{}-{}.ndjson", date.getYear(), date, hash);
+    public String remoteActionLogPath(LocalDate date) {
+        return Strings.format("/action/{}/action-{}-{}.parquet", date.getYear(), date, hash);
+    }
+
+    public String remoteEventPath(LocalDate date) {
+        return Strings.format("/event/{}/event-{}-{}.parquet", date.getYear(), date, hash);
+    }
+
+    public Path localActionLogFilePath(LocalDate date) {
+        String path = Strings.format("/action/{}/action-{}-{}.avro", date.getYear(), date, hash);
+        return Path.of(logDir.toString(), path);
     }
 
-    public String eventPath(LocalDate date) {
-        return Strings.format("/event/{}/event-{}-{}.ndjson", date.getYear(), date, hash);
+    public Path localEventFilePath(LocalDate date) {
+        String path = Strings.format("/event/{}/event-{}-{}.avro", date.getYear(), date, hash);
+        return Path.of(logDir.toString(), path);
     }
 
-    public Path initializeLogFilePath(String logPath) throws IOException {
-        Path path = Path.of(logDir.toString(), logPath);
+    public void createParentDir(Path path) throws IOException {
         Path parent = path.getParent();
         if (parent != null && !exists(parent)) createDirectories(parent);
-        return path;
     }
 }
diff --git a/ext/log-exporter/src/test/java/core/log/job/ProcessLogJobTest.java b/ext/log-exporter/src/test/java/core/log/job/ProcessLogJobTest.java
index 88694917d..9b4dedacf 100644
--- a/ext/log-exporter/src/test/java/core/log/job/ProcessLogJobTest.java
+++ b/ext/log-exporter/src/test/java/core/log/job/ProcessLogJobTest.java
@@ -29,7 +29,7 @@ void createProcessLogJob() {
     }
 
     @Test
-    void execute() {
+    void execute() throws Exception {
         job.execute(new JobContext("job", ZonedDateTime.parse("2022-11-07T01:00:00Z")));
 
         verify(archiveService).uploadArchive(LocalDate.parse("2022-11-06"));
diff --git a/ext/log-exporter/src/test/java/core/log/kafka/ActionLogMessageHandlerTest.java b/ext/log-exporter/src/test/java/core/log/kafka/ActionLogMessageHandlerTest.java
index a2c85899f..8700e9e51 100644
--- a/ext/log-exporter/src/test/java/core/log/kafka/ActionLogMessageHandlerTest.java
+++ b/ext/log-exporter/src/test/java/core/log/kafka/ActionLogMessageHandlerTest.java
@@ -1,8 +1,11 @@
 package core.log.kafka;
 
+import core.framework.inject.Inject;
 import core.framework.kafka.Message;
 import core.framework.log.message.ActionLogMessage;
 import core.framework.util.Files;
+import core.log.IntegrationTest;
+import core.log.domain.ActionLogSchema;
 import core.log.service.ArchiveService;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -11,18 +14,22 @@
 import java.io.IOException;
 import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @author neo
 */
-class ActionLogMessageHandlerTest {
+class ActionLogMessageHandlerTest extends IntegrationTest {
     private ActionLogMessageHandler handler;
+    @Inject
+    ActionLogSchema schema;
 
     @BeforeEach
     void createActionLogMessageHandler() {
         handler = new ActionLogMessageHandler();
         handler.archiveService = new ArchiveService();
         handler.archiveService.logDir = Files.tempDir();
+        handler.schema = schema;
     }
 
     @AfterEach
@@ -36,7 +43,13 @@ void handle() throws IOException {
         message.date = Instant.parse("2022-11-07T00:00:00Z");
         message.id = "id";
         message.app = "app";
+        message.action = "action";
+        message.result = "OK";
+        message.host = "host";
+        message.elapsed = 1000L;
+        message.performanceStats = Map.of();
         message.traceLog = "trace";
         handler.handle(List.of(new Message<>("key", message)));
+        handler.handle(List.of(new Message<>("key", message)));
     }
 }
diff --git a/ext/log-exporter/src/test/java/core/log/kafka/EventMessageHandlerTest.java b/ext/log-exporter/src/test/java/core/log/kafka/EventMessageHandlerTest.java
index e89949e70..42ae47876 100644
--- a/ext/log-exporter/src/test/java/core/log/kafka/EventMessageHandlerTest.java
+++ b/ext/log-exporter/src/test/java/core/log/kafka/EventMessageHandlerTest.java
@@ -1,8 +1,11 @@
 package core.log.kafka;
 
+import core.framework.inject.Inject;
 import core.framework.kafka.Message;
 import core.framework.log.message.EventMessage;
 import core.framework.util.Files;
+import core.log.IntegrationTest;
+import core.log.domain.EventSchema;
 import core.log.service.ArchiveService;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -15,14 +18,17 @@
 /**
  * @author neo
 */
-class EventMessageHandlerTest {
+class EventMessageHandlerTest extends IntegrationTest {
     private EventMessageHandler handler;
+    @Inject
+    EventSchema schema;
 
     @BeforeEach
     void createEventMessageHandler() {
         handler = new EventMessageHandler();
         handler.archiveService = new ArchiveService();
         handler.archiveService.logDir = Files.tempDir();
+        handler.schema = schema;
     }
 
     @AfterEach
@@ -36,6 +42,10 @@ void handle() throws IOException {
         message.date = Instant.parse("2022-11-07T00:00:00Z");
         message.id = "id";
         message.app = "app";
+        message.action = "action";
+        message.result = "OK";
+        message.elapsed = 1000L;
+        handler.handle(List.of(new Message<>("key", message)));
         handler.handle(List.of(new Message<>("key", message)));
     }
 }
diff --git a/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java b/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java
index 1a3a900b1..132c1e21e 100644
--- a/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java
+++ b/ext/log-exporter/src/test/java/core/log/service/ArchiveServiceTest.java
@@ -1,12 +1,22 @@
 package core.log.service;
 
+import core.framework.inject.Inject;
+import core.framework.log.message.EventMessage;
 import core.framework.util.Files;
+import core.log.IntegrationTest;
+import core.log.domain.EventSchema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificDatumWriter;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Instant;
 import java.time.LocalDate;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -14,7 +24,9 @@
 /**
  * @author neo
 */
-class ArchiveServiceTest {
+class ArchiveServiceTest extends IntegrationTest {
+    @Inject
+    EventSchema schema;
     private ArchiveService archiveService;
 
     @BeforeEach
@@ -29,15 +41,27 @@ void cleanup() {
     }
 
     @Test
-    void actionLogPath() {
-        assertThat(archiveService.actionLogPath(LocalDate.parse("2022-11-03")))
-            .matches("/action/2022/action-2022-11-03-[a-z0-9]*.ndjson");
+    void remoteActionLogPath() {
+        assertThat(archiveService.remoteActionLogPath(LocalDate.parse("2022-11-03")))
+            .matches("/action/2022/action-2022-11-03-[a-z0-9]*.parquet");
     }
 
     @Test
-    void eventPath() {
-        assertThat(archiveService.eventPath(LocalDate.parse("2022-11-03")))
-            .matches("/event/2022/event-2022-11-03-[a-z0-9]*.ndjson");
+    void remoteEventPath() {
+        assertThat(archiveService.remoteEventPath(LocalDate.parse("2022-11-03")))
+            .matches("/event/2022/event-2022-11-03-[a-z0-9]*.parquet");
+    }
+
+    @Test
+    void localActionLogFilePath() {
+        assertThat(archiveService.localActionLogFilePath(LocalDate.parse("2022-11-03")).toString())
+            .matches(".*/action/2022/action-2022-11-03-[a-z0-9]*.avro");
+    }
+
+    @Test
+    void localEventFilePath() {
+        assertThat(archiveService.localEventFilePath(LocalDate.parse("2022-11-03")).toString())
+            .matches(".*/event/2022/event-2022-11-03-[a-z0-9]*.avro");
     }
 
     @Test
@@ -47,7 +71,30 @@ void cleanupArchive() {
     }
 
     @Test
-    void uploadActionLog() {
+    void uploadActionLog() throws IOException {
         archiveService.uploadArchive(LocalDate.now());
     }
+
+    @Test
+    void convertToParquet() throws IOException {
+        var message = new EventMessage();
+        message.date = Instant.parse("2022-11-07T00:00:00Z");
+        message.id = "id";
+        message.app = "app";
+        message.action = "action";
+        message.result = "OK";
+        message.elapsed = 1000L;
+
+        Path avroPath = archiveService.localEventFilePath(LocalDate.parse("2022-11-07"));
+        archiveService.createParentDir(avroPath);
+
+        try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
+            writer.create(schema.schema, avroPath.toFile());
+            writer.append(schema.record(message));
+            writer.append(schema.record(message));
+        }
+
+        Path parquetPath = archiveService.convertToParquet(avroPath);
+        assertThat(parquetPath).exists();
+    }
 }
diff --git a/ext/log-exporter/src/test/java/core/log/web/UploadControllerTest.java b/ext/log-exporter/src/test/java/core/log/web/UploadControllerTest.java
index b599741d1..006bb4381 100644
--- a/ext/log-exporter/src/test/java/core/log/web/UploadControllerTest.java
+++ b/ext/log-exporter/src/test/java/core/log/web/UploadControllerTest.java
@@ -13,6 +13,7 @@
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 
+import java.io.IOException;
 import java.time.LocalDate;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -42,7 +43,7 @@ void createUploadController() {
     }
 
     @Test
-    void execute() {
+    void execute() throws IOException {
         Response response = controller.execute(request);
 
         assertThat(response.status()).isEqualTo(HTTPStatus.NO_CONTENT);