Skip to content

Commit

Permalink
* sse: support client side "x-trace-id" header
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Mar 4, 2025
1 parent 4d59f4e commit 0abc0f6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* sse: send ErrorResponse to client via "event: error" on exception
* sse: log clientIP on sse:close action
* log-exporter: export action and event in parquet format
* sse: support client side "x-trace-id" header

### 9.1.6 (2/10/2025 - 2/25/2025)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;

class ServerSentEventCloseHandler<T> implements ExchangeCompletionListener {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerSentEventCloseHandler.class);
final ServerSentEventContextImpl<T> context;
private final LogManager logManager;
private final ChannelImpl<T> channel;
private final String clientIP;
private final Map<String, String> logContext;

ServerSentEventCloseHandler(LogManager logManager, ChannelImpl<T> channel, ServerSentEventContextImpl<T> context, String clientIP) {
ServerSentEventCloseHandler(LogManager logManager, ChannelImpl<T> channel, ServerSentEventContextImpl<T> context, Map<String, String> logContext) {
this.logManager = logManager;
this.channel = channel;
this.context = context;
this.clientIP = clientIP;
this.logContext = logContext;
}

@Override
Expand All @@ -36,7 +37,11 @@ public void exchangeEvent(HttpServerExchange exchange, NextListener next) {
List<String> refIds = List.of(channel.refId);
actionLog.refIds = refIds;
actionLog.correlationIds = refIds;
actionLog.context.put("client_ip", List.of(clientIP));

for (Map.Entry<String, String> entry : logContext.entrySet()) {
actionLog.context(entry.getKey(), entry.getValue());
}

if (!channel.groups.isEmpty()) actionLog.context("group", channel.groups.toArray());
context.remove(channel);
channel.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ServerSentEventHandler implements HttpHandler {
// persistent connection, use longer max process time
// though LB backend timeout is to 600s, for long time sse, it should be processed via message queue
static final long MAX_PROCESS_TIME_IN_NANO = Duration.ofSeconds(300).toNanos();
static final HttpString HEADER_TRACE_ID = new HttpString("x-trace-id");

private static final HttpString LAST_EVENT_ID = new HttpString("Last-Event-ID");
private final Logger logger = LoggerFactory.getLogger(ServerSentEventHandler.class);
Expand Down Expand Up @@ -97,9 +99,18 @@ void handle(HttpServerExchange exchange, StreamSinkChannel sink) {

channel = new ChannelImpl<>(exchange, sink, support.context, support.builder, actionLog.id);
actionLog.context("channel", channel.id);

Map<String, String> logContext = new HashMap<>();
logContext.put("client_id", request.clientIP());
String traceId = exchange.getRequestHeaders().getFirst(HEADER_TRACE_ID); // used by frontend to trace request
if (traceId != null) {
actionLog.context.put("trace_id", List.of(traceId));
logContext.put("trace_id", traceId);
}

sink.getWriteSetter().set(channel.writeListener);
support.context.add(channel);
exchange.addExchangeCompleteListener(new ServerSentEventCloseHandler<>(logManager, channel, support.context, request.clientIP()));
exchange.addExchangeCompleteListener(new ServerSentEventCloseHandler<>(logManager, channel, support.context, logContext));

channel.sendBytes(Strings.bytes("retry: 5000\n\n")); // set browser retry to 5s

Expand Down

0 comments on commit 0abc0f6

Please sign in to comment.