Skip to content

Commit

Permalink
* log-exporter: export action and event in parquet format
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Mar 3, 2025
1 parent e35fad8 commit 7bb4022
Show file tree
Hide file tree
Showing 15 changed files with 312 additions and 79 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

* sse: send ErrorResponse to client via "event: error" on exception
* sse: log clientIP on sse:close action
* log-exporter: export action and event in parquet format

### 9.1.6 (2/10/2025 - 2/25/2025)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ public class EventMessage {
public String errorCode;
@Property(name = "error_message")
public String errorMessage;
@Property(name = "elapsed")
public Long elapsed;
@Property(name = "context")
public Map<String, String> context;
@Property(name = "stats")
public Map<String, Double> stats;
@Property(name = "info")
public Map<String, String> info;
@Property(name = "elapsed")
public Long elapsed;
}
11 changes: 11 additions & 0 deletions ext/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,17 @@ project("log-exporter") {
apply(plugin = "app")
dependencies {
implementation(project(":core-ng"))

// for parquet
compileOnly("org.apache.hadoop:hadoop-annotations:3.4.1")
implementation("org.apache.parquet:parquet-avro:1.15.0")
implementation("org.apache.avro:avro:1.12.0")
implementation("org.apache.hadoop:hadoop-common:3.4.1@jar")
runtimeOnly("commons-collections:commons-collections:3.2.2@jar")
runtimeOnly("com.fasterxml.woodstox:woodstox-core:5.4.0@jar")
runtimeOnly("org.codehaus.woodstox:stax2-api:4.2.1@jar")
runtimeOnly("org.apache.hadoop.thirdparty:hadoop-shaded-guava:1.2.0@jar")

testImplementation(project(":core-ng-test"))
}
}
Expand Down
9 changes: 9 additions & 0 deletions ext/log-exporter/src/main/java/core/log/LogExporterApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import core.framework.log.message.EventMessage;
import core.framework.log.message.LogTopics;
import core.framework.module.App;
import core.log.domain.ActionLogSchema;
import core.log.domain.EventSchema;
import core.log.job.ProcessLogJob;
import core.log.kafka.ActionLogMessageHandler;
import core.log.kafka.EventMessageHandler;
import core.log.service.ArchiveService;
import core.log.service.UploadService;
import core.log.web.UploadController;
import core.log.web.UploadRequest;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.specific.SpecificData;

import java.time.Duration;
import java.time.LocalTime;
Expand All @@ -31,6 +35,11 @@ protected void initialize() {
kafka().minPoll(1024 * 1024, Duration.ofMillis(5000)); // try to get at least 1M message, and can wait longer
kafka().maxPoll(3000, 3 * 1024 * 1024); // get 3M message at max

SpecificData specificData = SpecificData.get();
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
bind(ActionLogSchema.class);
bind(EventSchema.class);

bind(new UploadService(requiredProperty("app.log.bucket")));
bind(ArchiveService.class);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package core.log.domain;

import core.framework.log.message.ActionLogMessage;
import core.framework.log.message.PerformanceStatMessage;
import core.framework.util.Maps;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

import java.util.Map;

/**
 * Avro schema for action log messages, used to write action logs as Avro/Parquet records.
 * <p>
 * Thread-safety: the schema is immutable after construction; {@link #record} creates a new
 * record per call and is safe to invoke concurrently.
 */
public class ActionLogSchema {
    public final Schema schema;

    public ActionLogSchema() {
        schema = SchemaBuilder.record("action")
            .fields()
            .requiredString("id")
            .name("date").type().optional().type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
            .requiredString("app")
            .requiredString("host")
            .requiredString("result")
            .requiredString("action")
            .name("correlation_ids").type().optional().array().items().stringType()
            .name("client").type().optional().array().items().stringType()
            .name("ref_ids").type().optional().array().items().stringType()
            .optionalString("error_code")
            .optionalString("error_message")
            .requiredLong("elapsed")
            .name("context").type().optional().type(SchemaBuilder.map().values().stringType())
            .name("stats").type().optional().map().values().doubleType()
            .name("perf_stats").type().optional().map().values().longType()
            .endRecord();
    }

    /**
     * Converts one action log message into an Avro record matching {@link #schema}.
     *
     * @param message the incoming action log message
     * @return a generic Avro record ready to be appended to a data file
     */
    public GenericData.Record record(ActionLogMessage message) {
        var record = new GenericData.Record(schema);
        record.put("id", message.id);
        record.put("date", message.date);
        record.put("app", message.app);
        record.put("host", message.host);
        record.put("result", message.result);
        record.put("action", message.action);
        record.put("correlation_ids", message.correlationIds);
        record.put("client", message.clients);
        record.put("ref_ids", message.refIds);
        record.put("error_code", message.errorCode);
        record.put("error_message", message.errorMessage);
        record.put("elapsed", message.elapsed);
        record.put("context", message.context);
        record.put("stats", message.stats);
        // guard against absent perf stats; the "perf_stats" field is optional in the schema
        if (message.performanceStats != null) {
            // flatten each PerformanceStatMessage into up to 4 scalar entries (count/total_elapsed/read_entries/write_entries)
            Map<String, Long> perfStats = Maps.newHashMapWithExpectedSize(message.performanceStats.size() * 4);
            for (Map.Entry<String, PerformanceStatMessage> entry : message.performanceStats.entrySet()) {
                String key = entry.getKey();
                PerformanceStatMessage stat = entry.getValue();
                perfStats.put(key + ".count", (long) stat.count);
                perfStats.put(key + ".total_elapsed", stat.totalElapsed);
                if (stat.readEntries != null) perfStats.put(key + ".read_entries", (long) stat.readEntries);
                if (stat.writeEntries != null) perfStats.put(key + ".write_entries", (long) stat.writeEntries);
            }
            // fix: store the flattened Map<String, Long>, not the raw Map<String, PerformanceStatMessage>,
            // which would violate the declared map<long> schema and fail on write
            record.put("perf_stats", perfStats);
        }
        return record;
    }
}
46 changes: 46 additions & 0 deletions ext/log-exporter/src/main/java/core/log/domain/EventSchema.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package core.log.domain;

import core.framework.log.message.EventMessage;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;

/**
 * Avro schema for event messages, used to write events as Avro/Parquet records.
 * <p>
 * Thread-safety: the schema is immutable after construction; {@link #record} creates a new
 * record per call and is safe to invoke concurrently.
 */
public class EventSchema {
    public final Schema schema;

    public EventSchema() {
        // both timestamp fields share the same logical type: timestamp-micros backed by a long
        Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
        schema = SchemaBuilder.record("event")
            .fields()
            .requiredString("id")
            .name("date").type().optional().type(timestampMicros)
            .requiredString("app")
            .name("received_time").type().optional().type(timestampMicros)
            .requiredString("result")
            .requiredString("action")
            .optionalString("error_code")
            .optionalString("error_message")
            .requiredLong("elapsed")
            .name("context").type().optional().map().values().stringType()
            .name("stats").type().optional().map().values().doubleType()
            .name("info").type().optional().map().values().stringType()
            .endRecord();
    }

    /**
     * Converts one event message into an Avro record matching {@link #schema}.
     *
     * @param message the incoming event message
     * @return a generic Avro record ready to be appended to a data file
     */
    public GenericData.Record record(EventMessage message) {
        var avroRecord = new GenericData.Record(schema);
        avroRecord.put("id", message.id);
        avroRecord.put("date", message.date);
        avroRecord.put("app", message.app);
        avroRecord.put("received_time", message.receivedTime);
        avroRecord.put("result", message.result);
        avroRecord.put("action", message.action);
        avroRecord.put("error_code", message.errorCode);
        avroRecord.put("error_message", message.errorMessage);
        avroRecord.put("elapsed", message.elapsed);
        avroRecord.put("context", message.context);
        avroRecord.put("stats", message.stats);
        avroRecord.put("info", message.info);
        return avroRecord;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import core.framework.scheduler.JobContext;
import core.log.service.ArchiveService;

import java.io.IOException;
import java.time.LocalDate;

/**
Expand All @@ -15,7 +16,7 @@ public class ProcessLogJob implements Job {
ArchiveService archiveService;

@Override
public void execute(JobContext context) {
public void execute(JobContext context) throws IOException {
LocalDate today = context.scheduledTime.toLocalDate();
archiveService.cleanupArchive(today.minusDays(5)); // cleanup first, to free disk space when possible
archiveService.uploadArchive(today.minusDays(1));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,64 +1,46 @@
package core.log.kafka;

import core.framework.inject.Inject;
import core.framework.internal.json.JSONWriter;
import core.framework.kafka.BulkMessageHandler;
import core.framework.kafka.Message;
import core.framework.log.message.ActionLogMessage;
import core.log.domain.ActionLogEntry;
import core.log.domain.ActionLogSchema;
import core.log.service.ArchiveService;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.LocalDate;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

/**
* @author neo
*/
public class ActionLogMessageHandler implements BulkMessageHandler<ActionLogMessage> {
private final JSONWriter<ActionLogEntry> writer = new JSONWriter<>(ActionLogEntry.class);

@Inject
ArchiveService archiveService;
@Inject
ActionLogSchema schema;

@Override
public void handle(List<Message<ActionLogMessage>> messages) throws IOException {
LocalDateTime now = LocalDateTime.now();
LocalDate now = LocalDate.now();

Path path = archiveService.initializeLogFilePath(archiveService.actionLogPath(now.toLocalDate()));
try (BufferedOutputStream stream = new BufferedOutputStream(Files.newOutputStream(path, CREATE, APPEND), 3 * 1024 * 1024)) {
for (Message<ActionLogMessage> message : messages) {
ActionLogEntry entry = entry(message.value);
Path path = archiveService.localActionLogFilePath(now);
archiveService.createParentDir(path);

stream.write(writer.toJSON(entry));
stream.write('\n');
try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
if (!Files.exists(path)) {
writer.create(schema.schema, path.toFile());
} else {
writer.appendTo(path.toFile());
}
for (Message<ActionLogMessage> message : messages) {
writer.append(schema.record(message.value));
}
}
}

private ActionLogEntry entry(ActionLogMessage message) {
var entry = new ActionLogEntry();
entry.id = message.id;
entry.date = message.date;
entry.app = message.app;
entry.host = message.host;
entry.result = message.result;
entry.action = message.action;
entry.correlationIds = message.correlationIds;
entry.clients = message.clients;
entry.refIds = message.refIds;
entry.errorCode = message.errorCode;
entry.errorMessage = message.errorMessage;
entry.elapsed = message.elapsed;
entry.context = message.context;
entry.stats = message.stats;
entry.performanceStats = message.performanceStats;
return entry;
}
}
Original file line number Diff line number Diff line change
@@ -1,40 +1,45 @@
package core.log.kafka;

import core.framework.inject.Inject;
import core.framework.internal.json.JSONWriter;
import core.framework.kafka.BulkMessageHandler;
import core.framework.kafka.Message;
import core.framework.log.message.EventMessage;
import core.log.domain.EventSchema;
import core.log.service.ArchiveService;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificDatumWriter;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.util.List;

import static java.nio.file.StandardOpenOption.APPEND;
import static java.nio.file.StandardOpenOption.CREATE;

/**
* @author neo
*/
public class EventMessageHandler implements BulkMessageHandler<EventMessage> {
private final JSONWriter<EventMessage> writer = new JSONWriter<>(EventMessage.class);

@Inject
ArchiveService archiveService;
@Inject
EventSchema schema;

@Override
public void handle(List<Message<EventMessage>> messages) throws IOException {
LocalDate date = LocalDate.now();
LocalDate now = LocalDate.now();

Path path = archiveService.initializeLogFilePath(archiveService.eventPath(date));
try (BufferedOutputStream stream = new BufferedOutputStream(Files.newOutputStream(path, CREATE, APPEND), 3 * 1024 * 1024)) {
Path path = archiveService.localEventFilePath(now);
archiveService.createParentDir(path);

try (DataFileWriter<GenericData.Record> writer = new DataFileWriter<>(new SpecificDatumWriter<>(schema.schema))) {
if (!Files.exists(path)) {
writer.create(schema.schema, path.toFile());
} else {
writer.appendTo(path.toFile());
}
for (Message<EventMessage> message : messages) {
stream.write(writer.toJSON(message.value));
stream.write('\n');
writer.append(schema.record(message.value));
}
}
}
Expand Down
Loading

0 comments on commit 7bb4022

Please sign in to comment.