Skip to content

Commit

Permalink
review and tweak sse server and client side handling
Browse files Browse the repository at this point in the history
Signed-off-by: neo <1100909+neowu@users.noreply.github.com>
  • Loading branch information
neowu committed Aug 1, 2024
1 parent c397006 commit dff7f45
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 24 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

* ws/sse: updated max process time
* kafka: update to 3.8.0
* http_client: preliminary support for sse
* http_client: preliminary sse support
> example usage:
```
Expand Down
1 change: 1 addition & 0 deletions core-ng/src/main/java/core/framework/http/EventSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public EventSource(int statusCode, Map<String, String> headers, ResponseBody bod

@Override
public void close() {
LOGGER.debug("[sse] close sse connection");
ActionLogContext.track("sse", elapsed, responseBodyLength, requestBodyLength);
body.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ public final class HTTPClientException extends RuntimeException implements Error

private final String errorCode;

public HTTPClientException(String message, String errorCode) {
super(message);
this.errorCode = errorCode;
}

public HTTPClientException(String message, String errorCode, Throwable cause) {
super(message, cause);
this.errorCode = errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,15 @@ public EventSource sse(HTTPRequest request) {
int statusCode = httpResponse.code();
logger.debug("[response] status={}", statusCode);
Map<String, String> headers = headers(httpResponse);
String contentType = headers.get(HTTPHeaders.CONTENT_TYPE);
if (statusCode != 200 || !"text/event-stream".equals(contentType)) {
byte[] body = body(httpResponse, statusCode);
logger.debug("[response] body={}", BodyLogParam.of(body, contentType == null ? null : ContentType.parse(contentType)));
throw new HTTPClientException(Strings.format("invalid sse response, statusCode={}, content-type={}", statusCode, contentType), "HTTP_REQUEST_FAILED");
}
return new EventSource(statusCode, headers, httpResponse.body(), requestBodyLength, watch.elapsed());
} catch (IOException e) {
throw new HTTPClientException(Strings.format("http request failed, uri={}, error={}", request.uri, e.getMessage()), "HTTP_REQUEST_FAILED", e);
throw new HTTPClientException(Strings.format("sse request failed, uri={}, error={}", request.uri, e.getMessage()), "HTTP_REQUEST_FAILED", e);
} finally {
long elapsed = watch.elapsed();
logger.debug("sse, elapsed={}", elapsed);
Expand All @@ -97,19 +103,8 @@ public EventSource sse(HTTPRequest request) {
HTTPResponse response(Response httpResponse) throws IOException {
int statusCode = httpResponse.code();
logger.debug("[response] status={}", statusCode);

Map<String, String> headers = headers(httpResponse);

byte[] body;
if (statusCode == 204) {
// refer to https://tools.ietf.org/html/rfc7230#section-3.3.2, with 204, server won't send body and content-length, hence no need to read it, and body will be quietly closed by response.close()
body = new byte[0];
} else {
ResponseBody responseBody = httpResponse.body();
if (responseBody == null) throw new Error("unexpected response body"); // refer to okhttp3.Response.body(), call.execute always return non-null body except for cachedResponse/networkResponse
body = responseBody.bytes();
}

byte[] body = body(httpResponse, statusCode);
var response = new HTTPResponse(statusCode, headers, body);
logger.debug("[response] body={}", BodyLogParam.of(body, response.contentType));
return response;
Expand Down Expand Up @@ -157,6 +152,19 @@ Request httpRequest(HTTPRequest request) {
return builder.build();
}

private byte[] body(Response httpResponse, int statusCode) throws IOException {
byte[] body;
if (statusCode == 204) {
// refer to https://tools.ietf.org/html/rfc7230#section-3.3.2, with 204, server won't send body and content-length, hence no need to read it, and body will be quietly closed by response.close()
body = new byte[0];
} else {
ResponseBody responseBody = httpResponse.body();
if (responseBody == null) throw new Error("unexpected response body"); // refer to okhttp3.Response.body(), call.execute always return non-null body except for cachedResponse/networkResponse
body = responseBody.bytes();
}
return body;
}

private Map<String, String> headers(Response httpResponse) {
Map<String, String> headers = new TreeMap<>(CASE_INSENSITIVE_ORDER);
Headers httpHeaders = httpResponse.headers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ public class HTTPIOHandler implements HttpHandler {

@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
if (HEALTH_CHECK_PATH.equals(exchange.getRequestPath())) { // not treat health-check as action
String path = exchange.getRequestPath();
if (HEALTH_CHECK_PATH.equals(path)) { // not treat health-check as action
handler.addKeepAliveHeader(exchange);
exchange.endExchange(); // end exchange will send 200 / content-length=0
return;
Expand All @@ -53,7 +54,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {

HttpString method = exchange.getRequestMethod();
HeaderMap headers = exchange.getRequestHeaders();
boolean sse = sseHandler != null && sseHandler.check(method, headers);
boolean sse = sseHandler != null && sseHandler.check(method, headers, path);
boolean ws = webSocketHandler != null && webSocketHandler.check(method, headers);
boolean active = !sse && !ws;
boolean shutdown = shutdownHandler.handle(exchange, active);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import core.framework.internal.web.session.ReadOnlySession;
import core.framework.internal.web.session.SessionManager;
import core.framework.module.ServerSentEventConfig;
import core.framework.web.exception.NotFoundException;
import core.framework.web.sse.ChannelListener;
import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
Expand Down Expand Up @@ -42,13 +41,13 @@ public ServerSentEventHandler(LogManager logManager, SessionManager sessionManag
this.handlerContext = handlerContext;
}

public boolean check(HttpString method, HeaderMap headers) {
return Methods.GET.equals(method) && "text/event-stream".equals(headers.getFirst(Headers.ACCEPT));
public boolean check(HttpString method, HeaderMap headers, String path) {
return Methods.GET.equals(method) && "text/event-stream".equals(headers.getFirst(Headers.ACCEPT)) && supports.containsKey(path);
}

@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/event-stream; charset=UTF-8");
exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/event-stream");
exchange.setPersistent(false);
StreamSinkChannel sink = exchange.getResponseChannel();
if (sink.flush()) {
Expand All @@ -64,7 +63,6 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
}
}

@SuppressWarnings("PMD.ExceptionAsFlowControl")
void handle(HttpServerExchange exchange, StreamSinkChannel sink) {
VirtualThread.COUNT.increase();
long httpDelay = System.nanoTime() - exchange.getRequestStartTime();
Expand All @@ -80,9 +78,7 @@ void handle(HttpServerExchange exchange, StreamSinkChannel sink) {
actionLog.warningContext.maxProcessTimeInNano(MAX_PROCESS_TIME_IN_NANO);
String path = request.path();
@SuppressWarnings("unchecked")
ChannelSupport<Object> support = (ChannelSupport<Object>) supports.get(path);
if (support == null) throw new NotFoundException("not found, path=" + path, "PATH_NOT_FOUND");

ChannelSupport<Object> support = (ChannelSupport<Object>) supports.get(path); // ServerSentEventHandler.check() ensures path exists
actionLog.action("sse:" + path + ":open");
handlerContext.rateControl.validateRate(ServerSentEventConfig.SSE_OPEN_GROUP, request.clientIP());

Expand Down

0 comments on commit dff7f45

Please sign in to comment.