From 871472074a3c6f9e4e1366d4d22be66d63b75cdd Mon Sep 17 00:00:00 2001 From: AlexK Date: Sun, 10 Jul 2022 19:10:44 +0600 Subject: [PATCH 1/2] update dependency versions for preventing known security issues --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index ac38cf0..7e5c203 100644 --- a/pom.xml +++ b/pom.xml @@ -84,13 +84,13 @@ org.apache.httpcomponents httpclient - 4.5.12 + 4.5.13 org.apache.httpcomponents httpcore - 4.4.13 + 4.4.15 @@ -102,7 +102,7 @@ com.google.guava guava - 29.0-jre + 31.1-jre org.reflections From 424287e3e5e8940afad25d529b7c68abb33315fe Mon Sep 17 00:00:00 2001 From: AlexK Date: Sun, 10 Jul 2022 23:57:50 +0600 Subject: [PATCH 2/2] try to recreate websocket connection which closed by server --- .../com/sirius/sdk/base/ReadOnlyChannel.java | 1 - .../java/com/sirius/sdk/base/Retryer.java | 176 +++++++++++++++++ .../sirius/sdk/base/WebSocketConnector.java | 179 ++++-------------- .../java/com/sirius/sdk/base/RetryerTest.java | 43 +++++ 4 files changed, 260 insertions(+), 139 deletions(-) create mode 100644 src/main/java/com/sirius/sdk/base/Retryer.java create mode 100644 src/test/java/com/sirius/sdk/base/RetryerTest.java diff --git a/src/main/java/com/sirius/sdk/base/ReadOnlyChannel.java b/src/main/java/com/sirius/sdk/base/ReadOnlyChannel.java index 81fae7a..37c4c99 100644 --- a/src/main/java/com/sirius/sdk/base/ReadOnlyChannel.java +++ b/src/main/java/com/sirius/sdk/base/ReadOnlyChannel.java @@ -8,7 +8,6 @@ public interface ReadOnlyChannel { /** * Read message packet - * @param timeout Operation timeout is sec * @return chunk of data stream */ CompletableFuture read(); diff --git a/src/main/java/com/sirius/sdk/base/Retryer.java b/src/main/java/com/sirius/sdk/base/Retryer.java new file mode 100644 index 0000000..f846994 --- /dev/null +++ b/src/main/java/com/sirius/sdk/base/Retryer.java @@ -0,0 +1,176 @@ +package com.sirius.sdk.base; + +import com.sirius.sdk.utils.Pair; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class Retryer { + + private static final Logger log = Logger.getLogger(Retryer.class.getName()); + + private final Policy policy; + + private final Context context; + + public Retryer(Policy policy, Context context) { + this.policy = policy; + this.context = context; + } + + public static Builder builder() { + return new Builder(); + } + + public R retry(Callable callable) throws Exception { + while (policy.shouldRetry(context)) { + try { + return callable.call(); + } catch (Exception e) { + context.setInvocationTimestamp(Instant.now()); + log.log(Level.WARNING, "try " + context.getInvocationNumber() + " failed: " + e.getMessage()); + context.incrementInvocationNumber(); + context.setResult(Pair.pair(null, e)); + Duration delay = policy.nextRunDelay(context); + if (delay != null && !delay.isZero()) { + try { + Thread.sleep(delay.toMillis()); + } catch (InterruptedException ex) { + break; + } + } + } + } + if (context.getResult().second != null) { + throw context.getResult().second; + } else { + return (R) context.getResult().first; + } + + } + + public static class Context { + + private final AtomicInteger invocationNumber = new AtomicInteger(0); + + private final AtomicReference invocationTimestamp = new AtomicReference<>(); + + private final AtomicReference> result = new AtomicReference<>(); + + public Pair getResult() { + return result.get(); + } + + public void setResult(Pair result) { + this.result.set(result); + } + + public int getInvocationNumber() { + return invocationNumber.get(); + } + + public int incrementInvocationNumber() { + return invocationNumber.incrementAndGet(); + } + + public Instant getInvocationTimestamp() { + return invocationTimestamp.get(); + } + + public void setInvocationTimestamp(Instant invocationTimestamp) { + this.invocationTimestamp.set(invocationTimestamp); + } + } + + public interface Policy { + + int DEFAULT_RETRY_COUNT = 3; + Duration DEFAULT_DELAY = Duration.ZERO; + + default boolean shouldRetry(Context context) { + return context.getInvocationNumber() < DEFAULT_RETRY_COUNT; + } + + default Duration nextRunDelay(Context context) { + return DEFAULT_DELAY; + } + + class Builder { + + private int maxAttempts = Policy.DEFAULT_RETRY_COUNT; + private Duration waitDuration = Policy.DEFAULT_DELAY; + private Function intervalFunction; + + public Builder maxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public Builder waitDuration(Duration waitDuration) { + this.waitDuration = waitDuration; + this.intervalFunction = null; + return this; + } + + public Builder intervalFunction(Function intervalFunction) { + this.intervalFunction = intervalFunction; + this.waitDuration = null; + return this; + } + + private Policy build() { + return new Policy() { + @Override + public boolean shouldRetry(Context context) { + return context.getInvocationNumber() < maxAttempts; + } + + @Override + public Duration nextRunDelay(Context context) { + if (waitDuration != null) { + return waitDuration; + } else if (intervalFunction != null) { + return intervalFunction.apply(context.getInvocationNumber()); + } else { + return Policy.super.nextRunDelay(context); + } + } + }; + } + + } + } + + public static class Builder { + + private final Policy.Builder retryPolicy = new Policy.Builder(); + + public Builder maxAttempts(int maxAttempts) { + retryPolicy.maxAttempts(maxAttempts); + return this; + } + + public Builder waitDuration(Duration waitDuration) { + retryPolicy.waitDuration(waitDuration); + return this; + } + + public Builder intervalFunction(Function intervalFunction) { + retryPolicy.intervalFunction(intervalFunction); + return this; + } + + public Retryer build() { + return new Retryer(retryPolicy.build(), new Context()); + } + + } + + +} diff --git a/src/main/java/com/sirius/sdk/base/WebSocketConnector.java b/src/main/java/com/sirius/sdk/base/WebSocketConnector.java index 59ea8e7..56f48cd 100644 --- a/src/main/java/com/sirius/sdk/base/WebSocketConnector.java +++ b/src/main/java/com/sirius/sdk/base/WebSocketConnector.java @@ -1,16 +1,20 @@ package com.sirius.sdk.base; -import com.neovisionaries.ws.client.*; +import com.neovisionaries.ws.client.WebSocket; +import com.neovisionaries.ws.client.WebSocketAdapter; +import com.neovisionaries.ws.client.WebSocketException; +import com.neovisionaries.ws.client.WebSocketExtension; +import com.neovisionaries.ws.client.WebSocketFactory; +import com.neovisionaries.ws.client.WebSocketFrame; +import com.neovisionaries.ws.client.WebSocketListener; +import com.sirius.sdk.errors.sirius_exceptions.SiriusConnectionClosed; import com.sirius.sdk.messaging.Message; import com.sirius.sdk.utils.StringUtils; import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Map; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.function.Function; import java.util.logging.Level; @@ -23,7 +27,7 @@ public class WebSocketConnector extends BaseConnector { Charset encoding = StandardCharsets.UTF_8; String serverAddress; String path; - byte[] credentials = null; + byte[] credentials; WebSocket webSocket; public Function readCallback = null; @@ -44,158 +48,56 @@ public WebSocketConnector(String serverAddress, String path, byte[] credentials) initWebSocket(); } - WebSocketListener webSocketListener = new WebSocketListener() { - @Override - public void onStateChanged(WebSocket webSocket, WebSocketState webSocketState) throws Exception { - - } + WebSocketListener webSocketListener = new WebSocketAdapter() { @Override - public void onConnected(WebSocket webSocket, Map> map) throws Exception { - //log.log(Level.INFO, "Connected"); - } - - @Override - public void onConnectError(WebSocket webSocket, WebSocketException e) throws Exception { + public void onConnectError(WebSocket webSocket, WebSocketException e) { log.log(Level.WARNING, "Connect error"); } @Override - public void onDisconnected(WebSocket webSocket, WebSocketFrame webSocketFrame, WebSocketFrame webSocketFrame1, boolean b) throws Exception { + public void onDisconnected(WebSocket websocket, + WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, + boolean closedByServer) throws Exception { + if (closedByServer) { + throw new SiriusConnectionClosed("Socket is closed by server"); + } //log.log(Level.INFO, "Disconnected"); } @Override - public void onFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onContinuationFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onTextFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - read(webSocketFrame, null, defTimeout); - } - - @Override - public void onBinaryFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - read(webSocketFrame, null, defTimeout); - } - - @Override - public void onCloseFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onPingFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - + public void onTextFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) { + read(webSocketFrame); } @Override - public void onPongFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onTextMessage(WebSocket webSocket, String s) throws Exception { - - } - - @Override - public void onTextMessage(WebSocket webSocket, byte[] bytes) throws Exception { - - } - - @Override - public void onBinaryMessage(WebSocket webSocket, byte[] bytes) throws Exception { - - } - - @Override - public void onSendingFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onFrameSent(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onFrameUnsent(WebSocket webSocket, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onThreadCreated(WebSocket webSocket, ThreadType threadType, Thread thread) throws Exception { - - } - - @Override - public void onThreadStarted(WebSocket webSocket, ThreadType threadType, Thread thread) throws Exception { - - } - - @Override - public void onThreadStopping(WebSocket webSocket, ThreadType threadType, Thread thread) throws Exception { - - } - - @Override - public void onError(WebSocket webSocket, WebSocketException e) throws Exception { - - } - - @Override - public void onFrameError(WebSocket webSocket, WebSocketException e, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onMessageError(WebSocket webSocket, WebSocketException e, List list) throws Exception { - - } - - @Override - public void onMessageDecompressionError(WebSocket webSocket, WebSocketException e, byte[] bytes) throws Exception { - - } - - @Override - public void onTextMessageError(WebSocket webSocket, WebSocketException e, byte[] bytes) throws Exception { - - } - - @Override - public void onSendError(WebSocket webSocket, WebSocketException e, WebSocketFrame webSocketFrame) throws Exception { - - } - - @Override - public void onUnexpectedError(WebSocket webSocket, WebSocketException e) throws Exception { - + public void onBinaryFrame(WebSocket webSocket, WebSocketFrame webSocketFrame) { + read(webSocketFrame); } @Override public void handleCallbackError(WebSocket webSocket, Throwable throwable) throws Exception { - - } - - @Override - public void onSendingHandshake(WebSocket webSocket, String s, List list) throws Exception { - + if (throwable instanceof SiriusConnectionClosed) { + log.log(Level.WARNING, throwable.getMessage()); + Retryer retryer = Retryer.builder() + .maxAttempts(3) + .waitDuration(Duration.ofMillis(100L)) + .build(); + retryer.retry(() -> { + WebSocketConnector.this.close(); + WebSocketConnector.this.initWebSocket(); + return null; + }); + } } }; public void initWebSocket() { String url = serverAddress + "/" + path; - while (url.endsWith("/")) - url = url.substring(0, url.length()-1); + while (url.endsWith("/")) { + url = url.substring(0, url.length() - 1); + } try { webSocket = new WebSocketFactory() .setVerifyHostname(false) @@ -204,7 +106,7 @@ public void initWebSocket() { .addListener(webSocketListener) .addExtension(WebSocketExtension.PERMESSAGE_DEFLATE) .setPingInterval(60 * 3 * 1000). - addHeader("origin", serverAddress); + addHeader("origin", serverAddress); if (this.credentials != null) { webSocket.addHeader("credentials", StringUtils.bytesToString(credentials)); } @@ -249,11 +151,12 @@ public CompletableFuture read() { } - private byte[] read(WebSocketFrame frame, WebSocketException exception, int timeout) { + private byte[] read(WebSocketFrame frame) { if (frame != null) { readFuture.complete(frame.getPayload()); - if (readCallback != null) + if (readCallback != null) { readCallback.apply(frame.getPayload()); + } return frame.getPayload(); } return null; diff --git a/src/test/java/com/sirius/sdk/base/RetryerTest.java b/src/test/java/com/sirius/sdk/base/RetryerTest.java new file mode 100644 index 0000000..dee03b9 --- /dev/null +++ b/src/test/java/com/sirius/sdk/base/RetryerTest.java @@ -0,0 +1,43 @@ +package com.sirius.sdk.base; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +public class RetryerTest { + + @Test + public void retryResult() throws Exception { + String expected = "test"; + Retryer retryer = Retryer.builder().build(); + String actual = retryer.retry(() -> "test"); + Assert.assertEquals(expected, actual); + + } + + @Test + public void retryCount() { + int retryCount = 5; + AtomicInteger actualCount = new AtomicInteger(0); + Retryer retryer = Retryer.builder().maxAttempts(retryCount).build(); + try { + retryer.retry(() -> { + int currentTry = actualCount.getAndIncrement(); + throw new TestRetryException("test call " + currentTry); + }); + } catch (Exception e) { + Assert.assertTrue("Unexpected exception was thrown", e instanceof TestRetryException); + Assert.assertEquals("wrong retry count", retryCount, actualCount.get()); + } + + } + + public static class TestRetryException extends RuntimeException { + + public TestRetryException(String message) { + super(message); + } + } + +} \ No newline at end of file