Skip to content

Commit

Permalink
* log-exporter: export action and event in parquet format
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Mar 4, 2025
1 parent 533a8fe commit b70deab
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.conf.PlainParquetConfiguration;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.LocalOutputFile;
Expand Down Expand Up @@ -62,11 +63,14 @@ public void uploadArchive(LocalDate date) throws IOException {
Path convertToParquet(Path sourcePath) throws IOException {
var watch = new StopWatch();
var targetPath = sourcePath.resolveSibling(sourcePath.getFileName() + "." + Randoms.alphaNumeric(5) + ".parquet");
var config = new PlainParquetConfiguration();
config.setBoolean("parquet.avro.write-old-list-structure", false);
try (DataFileReader<GenericData.Record> reader = new DataFileReader<>(sourcePath.toFile(), new GenericDatumReader<>());
ParquetWriter<GenericData.Record> writer = AvroParquetWriter
.<GenericData.Record>builder(new LocalOutputFile(targetPath))
.withSchema(reader.getSchema())
.withCompressionCodec(CompressionCodecName.ZSTD)
.withConf(config)
.build()) {

for (GenericData.Record record : reader) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package core.log;

import core.framework.log.message.ActionLogMessage;
import core.framework.log.message.PerformanceStatMessage;

import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Test-fixture factory producing a fully populated {@link ActionLogMessage}
 * with deterministic values, shared by the archive/handler integration tests.
 */
public final class ActionLogMessageFactory {
    private ActionLogMessageFactory() {
        // utility class — static factory only, no instances
    }

    /**
     * Creates a message with fixed date/id/app/action/result/host/elapsed values,
     * a context entry whose list contains {@code null} (exercises null handling in
     * avro/parquet serialization), and two performance-stat entries — one with
     * {@code null} read/write entry counts.
     *
     * @return a new, fully populated {@link ActionLogMessage} (traceLog not set)
     */
    public static ActionLogMessage create() {
        var message = new ActionLogMessage();
        message.date = Instant.parse("2022-11-07T00:00:00Z");
        message.id = "id";
        message.app = "app";
        message.action = "action";
        message.result = "OK";
        message.host = "host";
        message.elapsed = 1000L;
        // List.of() rejects null elements, so build the null-containing list manually
        List<String> keys = new ArrayList<>();
        keys.add(null);
        message.context = Map.of("customer_id", List.of("customer_id1", "customer_id2"), "key", keys);
        message.performanceStats = Map.of("kafka", perfStats(1, 1000L, 10, 5),
            "http", perfStats(2, 2000L, null, null));
        return message;
    }

    // builds one performance-stat entry; readEntries/writeEntries may be null
    private static PerformanceStatMessage perfStats(int count, long totalElapsed, Integer readEntries, Integer writeEntries) {
        final PerformanceStatMessage stats = new PerformanceStatMessage();
        stats.count = count;
        stats.totalElapsed = totalElapsed;
        stats.readEntries = readEntries;
        stats.writeEntries = writeEntries;
        return stats;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import core.framework.inject.Inject;
import core.framework.kafka.Message;
import core.framework.log.message.ActionLogMessage;
import core.framework.log.message.PerformanceStatMessage;
import core.framework.util.Files;
import core.log.ActionLogMessageFactory;
import core.log.IntegrationTest;
import core.log.domain.ActionLogSchema;
import core.log.service.ArchiveService;
Expand All @@ -13,10 +12,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* @author neo
Expand All @@ -41,30 +37,10 @@ void cleanup() {

@Test
void handle() throws IOException {
var message = new ActionLogMessage();
message.date = Instant.parse("2022-11-07T00:00:00Z");
message.id = "id";
message.app = "app";
message.action = "action";
message.result = "OK";
message.host = "host";
message.elapsed = 1000L;
List<String> keys = new ArrayList<>();
keys.add(null);
message.context = Map.of("customer_id", List.of("customer_id1", "customer_id2"), "key", keys);
message.performanceStats = Map.of("kafka", perfStats(1, 1000L, 10, 5),
"http", perfStats(2, 2000L, null, null));
var message = ActionLogMessageFactory.create();
message.traceLog = "trace";

handler.handle(List.of(new Message<>("key", message)));
handler.handle(List.of(new Message<>("key", message)));
}

private PerformanceStatMessage perfStats(int count, long totalElapsed, Integer readEntries, Integer writeEntries) {
final PerformanceStatMessage stats = new PerformanceStatMessage();
stats.count = count;
stats.totalElapsed = totalElapsed;
stats.readEntries = readEntries;
stats.writeEntries = writeEntries;
return stats;
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package core.log.service;

import core.framework.inject.Inject;
import core.framework.log.message.EventMessage;
import core.framework.util.Files;
import core.log.ActionLogMessageFactory;
import core.log.IntegrationTest;
import core.log.domain.EventSchema;
import core.log.domain.ActionLogSchema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificDatumWriter;
Expand All @@ -16,7 +16,6 @@

import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDate;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -26,7 +25,7 @@
*/
class ArchiveServiceTest extends IntegrationTest {
@Inject
EventSchema schema;
ActionLogSchema schema;
private ArchiveService archiveService;

@BeforeEach
Expand Down Expand Up @@ -77,15 +76,9 @@ void uploadActionLog() throws IOException {

@Test
void convertToParquet() throws IOException {
var message = new EventMessage();
message.date = Instant.parse("2022-11-07T00:00:00Z");
message.id = "id";
message.app = "app";
message.action = "action";
message.result = "OK";
message.elapsed = 1000L;

Path avroPath = archiveService.localEventFilePath(LocalDate.parse("2022-11-07"));
var message = ActionLogMessageFactory.create();

Path avroPath = archiveService.localActionLogFilePath(LocalDate.parse("2022-11-07"));
archiveService.createParentDir(avroPath);

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
Expand Down

0 comments on commit b70deab

Please sign in to comment.