From 2407a63ad14ae97a18b55a9480493d06333e8af4 Mon Sep 17 00:00:00 2001 From: baranowb Date: Wed, 25 Oct 2023 13:45:08 +0200 Subject: [PATCH 01/30] [UNDERTOW-2333] introduce WebSocket IO specific timeouts --- .../main/java/io/undertow/UndertowLogger.java | 6 +++++- .../main/java/io/undertow/UndertowOptions.java | 15 +++++++++++++++ .../ResponseCachingStreamSinkConduit.java | 2 +- .../websockets/core/WebSocketChannel.java | 18 ++++++++++++++++++ 4 files changed, 39 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/undertow/UndertowLogger.java b/core/src/main/java/io/undertow/UndertowLogger.java index f771ed8ab9..7fd2147acd 100644 --- a/core/src/main/java/io/undertow/UndertowLogger.java +++ b/core/src/main/java/io/undertow/UndertowLogger.java @@ -484,4 +484,8 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String @LogMessage(level = WARN) @Message(id = 5106, value = "Content mismatch for '%s'. Expected length '%s', but was '%s'.") void contentEntryMismatch(Object key, long indicatedSize, long written); -} + + @LogMessage(level = WARN) + @Message(id = 5107, value = "Failed to set web socket timeout.") + void failedToSetWSTimeout(@Cause Exception e); +} \ No newline at end of file diff --git a/core/src/main/java/io/undertow/UndertowOptions.java b/core/src/main/java/io/undertow/UndertowOptions.java index acefed2d03..60fb51fade 100644 --- a/core/src/main/java/io/undertow/UndertowOptions.java +++ b/core/src/main/java/io/undertow/UndertowOptions.java @@ -19,6 +19,9 @@ package io.undertow; import org.xnio.Option; +import org.xnio.Options; +import org.xnio.channels.ReadTimeoutException; +import org.xnio.channels.WriteTimeoutException; /** * @author Stuart Douglas @@ -440,6 +443,18 @@ public class UndertowOptions { */ public static final Option MAX_RST_FRAMES_PER_WINDOW = Option.simple(UndertowOptions.class, "MAX_RST_STREAMS_PER_TIME_WINDOW", Integer.class); + /** + * Configure a read timeout for a web socket, in milliseconds. If its present it will override {@link org.xnio.Options.READ_TIMEOUT}. If the given amount of time elapses without + * a successful read taking place, the socket's next read will throw a {@link ReadTimeoutException}. + */ + public static final Option WEB_SOCKETS_READ_TIMEOUT = Option.simple(Options.class, "WEB_SOCKETS_READ_TIMEOUT", Integer.class); + + /** + * Configure a write timeout for a web socket, in milliseconds. If its present it will override {@link org.xnio.Options.WRITE_TIMEOUT}. If the given amount of time elapses without + * a successful write taking place, the socket's next write will throw a {@link WriteTimeoutException}. + */ + public static final Option WEB_SOCKETS_WRITE_TIMEOUT = Option.simple(Options.class, "WEB_SOCKETS_WRITE_TIMEOUT", Integer.class); + private UndertowOptions() { } diff --git a/core/src/main/java/io/undertow/server/handlers/cache/ResponseCachingStreamSinkConduit.java b/core/src/main/java/io/undertow/server/handlers/cache/ResponseCachingStreamSinkConduit.java index 0206ce7623..6d4008d1e6 100644 --- a/core/src/main/java/io/undertow/server/handlers/cache/ResponseCachingStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/server/handlers/cache/ResponseCachingStreamSinkConduit.java @@ -167,4 +167,4 @@ public long writeFinal(ByteBuffer[] srcs, int offset, int length) throws IOExcep public int writeFinal(ByteBuffer src) throws IOException { return Conduits.writeFinalBasic(this, src); } -} +} \ No newline at end of file diff --git a/core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java b/core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java index 4b76338091..7b0c3b568a 100644 --- a/core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java +++ b/core/src/main/java/io/undertow/websockets/core/WebSocketChannel.java @@ -17,6 +17,8 @@ */ package io.undertow.websockets.core; +import io.undertow.UndertowLogger; +import io.undertow.UndertowOptions; import io.undertow.conduits.IdleTimeoutConduit; import io.undertow.server.protocol.framed.AbstractFramedChannel; import io.undertow.server.protocol.framed.AbstractFramedStreamSourceChannel; @@ -28,6 +30,8 @@ import org.xnio.ChannelListeners; import org.xnio.IoUtils; import org.xnio.OptionMap; +import org.xnio.Options; + import io.undertow.connector.ByteBufferPool; import io.undertow.connector.PooledByteBuffer; import org.xnio.StreamConnection; @@ -106,6 +110,20 @@ protected WebSocketChannel(final StreamConnection connectedStreamChannel, ByteBu this.hasReservedOpCode = extensionFunction.hasExtensionOpCode(); this.subProtocol = subProtocol; this.peerConnections = peerConnections; + if(options.contains(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT)) { + try { + this.setOption(Options.READ_TIMEOUT, options.get(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT)); + } catch (IOException e) { + UndertowLogger.ROOT_LOGGER.failedToSetWSTimeout(e); + } + } + if(options.contains(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT)) { + try { + this.setOption(Options.WRITE_TIMEOUT, options.get(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT)); + } catch (IOException e) { + UndertowLogger.ROOT_LOGGER.failedToSetWSTimeout(e); + } + } addCloseTask(new ChannelListener() { @Override public void handleEvent(WebSocketChannel channel) { From 7bd8b5096e66c75b1cbad06f7851e61a92337b6f Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 7 Nov 2023 11:19:53 +0100 Subject: [PATCH 02/30] [UNDERTOW-2333] Add websocket timeout testcase --- .../io/undertow/testutils/DefaultServer.java | 16 +- .../protocol/WebSocketTimeoutTestCase.java | 182 ++++++++++++++++++ .../websockets/utils/WebSocketTestClient.java | 21 ++ 3 files changed, 211 insertions(+), 8 deletions(-) create mode 100644 core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java diff --git a/core/src/test/java/io/undertow/testutils/DefaultServer.java b/core/src/test/java/io/undertow/testutils/DefaultServer.java index a04bd949a0..75525d48c3 100644 --- a/core/src/test/java/io/undertow/testutils/DefaultServer.java +++ b/core/src/test/java/io/undertow/testutils/DefaultServer.java @@ -443,7 +443,7 @@ public static boolean startServer() { } else { server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), 7777 + PROXY_OFFSET), acceptListener, serverOptions); - proxyOpenListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + proxyOpenListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener)); proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions); loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER) @@ -466,7 +466,7 @@ public static boolean startServer() { server = ssl.createSslConnectionServer(worker, new InetSocketAddress(getHostAddress("default"), 7777 + PROXY_OFFSET), acceptListener, serverOptions); server.resumeAccepts(); - proxyOpenListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + proxyOpenListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener)); proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions); loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER) @@ -488,13 +488,13 @@ public static boolean startServer() { proxyOpenListener.setRootHandler(proxyHandler); proxyServer.resumeAccepts(); } else if (h2c || h2cUpgrade) { - openListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.ENABLE_HTTP2, true, UndertowOptions.HTTP2_PADDING_SIZE, 10)); + openListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.ENABLE_HTTP2, true).set(UndertowOptions.HTTP2_PADDING_SIZE, 10).getMap()); acceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(openListener)); InetSocketAddress targetAddress = new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT) + PROXY_OFFSET); server = worker.createStreamConnectionServer(targetAddress, acceptListener, serverOptions); - proxyOpenListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + proxyOpenListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener)); proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions); loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER) @@ -519,13 +519,13 @@ public static boolean startServer() { } else if (https) { XnioSsl clientSsl = new UndertowXnioSsl(xnio, OptionMap.EMPTY, SSL_BUFFER_POOL, createClientSslContext()); - openListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + openListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); acceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(openListener)); server = ssl.createSslConnectionServer(worker, new InetSocketAddress(getHostAddress("default"), 7777 + PROXY_OFFSET), acceptListener, serverOptions); server.getAcceptSetter().set(acceptListener); server.resumeAccepts(); - proxyOpenListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + proxyOpenListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener)); proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions); loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER) @@ -545,7 +545,7 @@ public static boolean startServer() { if (h2) { UndertowLogger.ROOT_LOGGER.error("HTTP2 selected but Netty ALPN was not on the boot class path"); } - openListener = new HttpOpenListener(pool, OptionMap.builder().set(UndertowOptions.BUFFER_PIPELINED_DATA, true).set(UndertowOptions.ENABLE_CONNECTOR_STATISTICS, true).set(UndertowOptions.REQUIRE_HOST_HTTP11, true).getMap()); + openListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).set(UndertowOptions.ENABLE_CONNECTOR_STATISTICS, true).set(UndertowOptions.REQUIRE_HOST_HTTP11, true).getMap()); acceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(openListener)); if (!proxy) { server = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), acceptListener, serverOptions); @@ -553,7 +553,7 @@ public static boolean startServer() { InetSocketAddress targetAddress = new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT) + PROXY_OFFSET); server = worker.createStreamConnectionServer(targetAddress, acceptListener, serverOptions); - proxyOpenListener = new HttpOpenListener(pool, OptionMap.create(UndertowOptions.BUFFER_PIPELINED_DATA, true)); + proxyOpenListener = new HttpOpenListener(pool, OptionMap.builder().addAll(serverOptions).set(UndertowOptions.BUFFER_PIPELINED_DATA, true).getMap()); proxyAcceptListener = ChannelListeners.openListenerAdapter(wrapOpenListener(proxyOpenListener)); proxyServer = worker.createStreamConnectionServer(new InetSocketAddress(Inet4Address.getByName(getHostAddress(DEFAULT)), getHostPort(DEFAULT)), proxyAcceptListener, serverOptions); loadBalancingProxyClient = new LoadBalancingProxyClient(GSSAPIAuthenticationMechanism.EXCLUSIVITY_CHECKER) diff --git a/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java new file mode 100644 index 0000000000..a020d3c6e3 --- /dev/null +++ b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java @@ -0,0 +1,182 @@ +/* + * JBoss, Home of Professional Open Source. + * Copyright 2023 Red Hat, Inc., and individual contributors + * as indicated by the @author tags. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.undertow.websockets.core.protocol; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.xnio.FutureResult; +import org.xnio.OptionMap; +import org.xnio.Options; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.handler.codec.http.websocketx.WebSocketVersion; +import io.netty.util.CharsetUtil; +import io.undertow.UndertowOptions; +import io.undertow.testutils.DefaultServer; +import io.undertow.testutils.HttpOneOnly; +import io.undertow.util.NetworkUtils; +import io.undertow.websockets.WebSocketConnectionCallback; +import io.undertow.websockets.WebSocketProtocolHandshakeHandler; +import io.undertow.websockets.core.AbstractReceiveListener; +import io.undertow.websockets.core.BufferedTextMessage; +import io.undertow.websockets.core.WebSocketChannel; +import io.undertow.websockets.core.WebSockets; +import io.undertow.websockets.spi.WebSocketHttpExchange; +import io.undertow.websockets.utils.FrameChecker; +import io.undertow.websockets.utils.WebSocketTestClient; + +@RunWith(DefaultServer.class) +@HttpOneOnly +public class WebSocketTimeoutTestCase { + + protected void beforeTest(int regularTimeouts, int wsReadTimeout, int wsWriteTimeout) { + DefaultServer.stopServer(); + DefaultServer.setServerOptions(OptionMap.builder() + .set(Options.READ_TIMEOUT, regularTimeouts) + .set(Options.WRITE_TIMEOUT, regularTimeouts) + .set(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT, wsReadTimeout) + .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, wsWriteTimeout).getMap()); + + DefaultServer.setUndertowOptions(OptionMap.builder() + .set(Options.READ_TIMEOUT, regularTimeouts) + .set(Options.WRITE_TIMEOUT, regularTimeouts) + .set(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT, wsReadTimeout) + .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, wsWriteTimeout).getMap()); + DefaultServer.startServer(); + SCHEDULER = Executors.newScheduledThreadPool(2); + } + + @After + public void afterTest() { + DefaultServer.stopServer(); + DefaultServer.setServerOptions(OptionMap.EMPTY); + DefaultServer.setUndertowOptions(OptionMap.EMPTY); + SCHEDULER.shutdown(); + } + + protected static final int TESTABLE_TIMEOUT_VALUE = 2000; + protected static final int NON_TESTABLE_TIMEOUT_VALUE = 30180; + protected static final int DEFAULTS_IO_TIMEOTU_VALUE = 500; + private ScheduledExecutorService SCHEDULER; + + protected WebSocketVersion getVersion() { + return WebSocketVersion.V13; + } + + + @Test + public void testServerReadTimeout() throws Exception { + beforeTest(DEFAULTS_IO_TIMEOTU_VALUE, TESTABLE_TIMEOUT_VALUE, NON_TESTABLE_TIMEOUT_VALUE); + final AtomicBoolean connected = new AtomicBoolean(false); + DefaultServer.setRootHandler(new WebSocketProtocolHandshakeHandler(new WebSocketConnectionCallback() { + @Override + public void onConnect(final WebSocketHttpExchange exchange, final WebSocketChannel channel) { + connected.set(true); + channel.getReceiveSetter().set(new AbstractReceiveListener() { + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException { + String string = message.getData(); + + if (string.equals("hello")) { + WebSockets.sendText("world", channel, null); + } else { + WebSockets.sendText(string, channel, null); + } + } + }); + channel.resumeReceives(); + } + })); + + final FutureResult latch = new FutureResult(); + WebSocketTestClient client = new WebSocketTestClient(getVersion(), new URI("ws://" + NetworkUtils.formatPossibleIpv6Address(DefaultServer.getHostAddress("default")) + ":" + DefaultServer.getHostPort("default") + "/")); + client.connect(); + client.send(new TextWebSocketFrame(Unpooled.copiedBuffer("hello", CharsetUtil.US_ASCII)), new FrameChecker(TextWebSocketFrame.class, "world".getBytes(CharsetUtil.US_ASCII), latch)); + latch.getIoFuture().get(); + + final long watchStart = System.currentTimeMillis(); + final long watchTimeout = System.currentTimeMillis()+TESTABLE_TIMEOUT_VALUE+500; + final FutureResult timeoutLatch = new FutureResult(); + ReadTimeoutChannelGuard readTimeoutChannelGuard = new ReadTimeoutChannelGuard(client, timeoutLatch, watchTimeout); + + final ScheduledFuture sf = SCHEDULER.scheduleAtFixedRate(readTimeoutChannelGuard, 0, 50, TimeUnit.MILLISECONDS); + readTimeoutChannelGuard.setTaskScheduledFuture(sf); + + final Long watchTimeEnd = timeoutLatch.getIoFuture().get(); + if(watchTimeEnd == -1) { + Assert.fail("Timeout did not happen... in time. Were waiting '"+watchTimeout+"' ms, timeout should happen in '"+TESTABLE_TIMEOUT_VALUE+"' ms."); + } else { + long timeSpent = watchTimeEnd - watchStart; + //lets be generous and give 150ms diff( there is "fuzz" coded for 50ms in undertow as well + if(!(timeSpent<=TESTABLE_TIMEOUT_VALUE+150)) { + Assert.fail("Timeout did not happen... in time. Socket timeout out in '"+timeSpent+"' ms, supposed to happen in '"+TESTABLE_TIMEOUT_VALUE+"' ms."); + } + } + } + + private static class ReadTimeoutChannelGuard implements Runnable{ + private final WebSocketTestClient channel; + private final FutureResult resultHandler; + private final long watchEnd; + private ScheduledFuture sf; + + ReadTimeoutChannelGuard(final WebSocketTestClient channel, final FutureResult resultHandler, final long watchEnd) { + super(); + this.channel = channel; + this.resultHandler = resultHandler; + this.watchEnd = watchEnd; + } + + public void setTaskScheduledFuture(ScheduledFuture sf2) { + this.sf = sf2; + } + + @Override + public void run() { + if(System.currentTimeMillis() > watchEnd) { + sf.cancel(false); + if(channelActive()) { + resultHandler.setResult(new Long(-1)); + } else { + resultHandler.setResult(System.currentTimeMillis()); + } + } else { + if(!channelActive()) { + sf.cancel(false); + resultHandler.setResult(System.currentTimeMillis()); + } + } + } + + private boolean channelActive() { + return channel.isOpen(); + } + + } + } diff --git a/core/src/test/java/io/undertow/websockets/utils/WebSocketTestClient.java b/core/src/test/java/io/undertow/websockets/utils/WebSocketTestClient.java index 373c186b55..8345338783 100644 --- a/core/src/test/java/io/undertow/websockets/utils/WebSocketTestClient.java +++ b/core/src/test/java/io/undertow/websockets/utils/WebSocketTestClient.java @@ -173,6 +173,27 @@ public void onError(Throwable t) { } } + public boolean isActive() { + if(this.ch != null) { + return this.ch.isActive(); + } + return false; + } + + public boolean isOpen() { + if(this.ch != null) { + return this.ch.isOpen(); + } + return false; + } + + public boolean isWritable() { + if(this.ch != null) { + return this.ch.isWritable(); + } + return false; + } + public interface FrameListener { /** * Is called if an WebSocketFrame was received From 3f771bfa94352455b6ff1504d03863d0fb510957 Mon Sep 17 00:00:00 2001 From: Flavia Rainone Date: Sun, 20 Oct 2024 12:19:36 -0300 Subject: [PATCH 03/30] [UNDERTOW-2518] Tidy up WebSocketTimeoutTestCase and make it uset @BeforeServerStarts and @AfterServerStops for simpler before and after methods Signed-off-by: Flavia Rainone --- .../protocol/WebSocketTimeoutTestCase.java | 116 ++++++++---------- 1 file changed, 53 insertions(+), 63 deletions(-) diff --git a/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java index a020d3c6e3..08a4573550 100644 --- a/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java +++ b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java @@ -17,22 +17,6 @@ */ package io.undertow.websockets.core.protocol; -import java.io.IOException; -import java.net.URI; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.xnio.FutureResult; -import org.xnio.OptionMap; -import org.xnio.Options; - import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketVersion; @@ -47,72 +31,78 @@ import io.undertow.websockets.core.BufferedTextMessage; import io.undertow.websockets.core.WebSocketChannel; import io.undertow.websockets.core.WebSockets; -import io.undertow.websockets.spi.WebSocketHttpExchange; import io.undertow.websockets.utils.FrameChecker; import io.undertow.websockets.utils.WebSocketTestClient; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.xnio.FutureResult; +import org.xnio.OptionMap; +import org.xnio.Options; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @RunWith(DefaultServer.class) @HttpOneOnly public class WebSocketTimeoutTestCase { - protected void beforeTest(int regularTimeouts, int wsReadTimeout, int wsWriteTimeout) { - DefaultServer.stopServer(); - DefaultServer.setServerOptions(OptionMap.builder() - .set(Options.READ_TIMEOUT, regularTimeouts) - .set(Options.WRITE_TIMEOUT, regularTimeouts) - .set(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT, wsReadTimeout) - .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, wsWriteTimeout).getMap()); + protected static final int TESTABLE_TIMEOUT_VALUE = 2000; + protected static final int NON_TESTABLE_TIMEOUT_VALUE = 30180; + protected static final int DEFAULTS_IO_TIMEOUT_VALUE = 500; + private static ScheduledExecutorService SCHEDULER = null; + @DefaultServer.BeforeServerStarts + public static void beforeTest() { + DefaultServer.setServerOptions(OptionMap.builder() + .set(Options.READ_TIMEOUT, DEFAULTS_IO_TIMEOUT_VALUE) + .set(Options.WRITE_TIMEOUT, DEFAULTS_IO_TIMEOUT_VALUE) + .set(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT, TESTABLE_TIMEOUT_VALUE) + .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, NON_TESTABLE_TIMEOUT_VALUE).getMap()); +/* DefaultServer.setUndertowOptions(OptionMap.builder() .set(Options.READ_TIMEOUT, regularTimeouts) .set(Options.WRITE_TIMEOUT, regularTimeouts) .set(UndertowOptions.WEB_SOCKETS_READ_TIMEOUT, wsReadTimeout) - .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, wsWriteTimeout).getMap()); - DefaultServer.startServer(); + .set(UndertowOptions.WEB_SOCKETS_WRITE_TIMEOUT, wsWriteTimeout).getMap());*/ SCHEDULER = Executors.newScheduledThreadPool(2); } - @After - public void afterTest() { - DefaultServer.stopServer(); - DefaultServer.setServerOptions(OptionMap.EMPTY); - DefaultServer.setUndertowOptions(OptionMap.EMPTY); + @DefaultServer.AfterServerStops + public static void afterTest() { SCHEDULER.shutdown(); + DefaultServer.setServerOptions(OptionMap.EMPTY); } - protected static final int TESTABLE_TIMEOUT_VALUE = 2000; - protected static final int NON_TESTABLE_TIMEOUT_VALUE = 30180; - protected static final int DEFAULTS_IO_TIMEOTU_VALUE = 500; - private ScheduledExecutorService SCHEDULER; - protected WebSocketVersion getVersion() { return WebSocketVersion.V13; } - @Test public void testServerReadTimeout() throws Exception { - beforeTest(DEFAULTS_IO_TIMEOTU_VALUE, TESTABLE_TIMEOUT_VALUE, NON_TESTABLE_TIMEOUT_VALUE); final AtomicBoolean connected = new AtomicBoolean(false); - DefaultServer.setRootHandler(new WebSocketProtocolHandshakeHandler(new WebSocketConnectionCallback() { - @Override - public void onConnect(final WebSocketHttpExchange exchange, final WebSocketChannel channel) { - connected.set(true); - channel.getReceiveSetter().set(new AbstractReceiveListener() { - @Override - protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException { - String string = message.getData(); - - if (string.equals("hello")) { - WebSockets.sendText("world", channel, null); - } else { - WebSockets.sendText(string, channel, null); + DefaultServer.setRootHandler(new WebSocketProtocolHandshakeHandler( + (WebSocketConnectionCallback) (exchange, channel) -> { + connected.set(true); + channel.getReceiveSetter().set(new AbstractReceiveListener() { + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage message) throws IOException { + String string = message.getData(); + + if (string.equals("hello")) { + WebSockets.sendText("world", channel, null); + } else { + WebSockets.sendText(string, channel, null); + } } - } - }); - channel.resumeReceives(); - } - })); + }); + channel.resumeReceives(); + })); final FutureResult latch = new FutureResult(); WebSocketTestClient client = new WebSocketTestClient(getVersion(), new URI("ws://" + NetworkUtils.formatPossibleIpv6Address(DefaultServer.getHostAddress("default")) + ":" + DefaultServer.getHostPort("default") + "/")); @@ -121,7 +111,7 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage m latch.getIoFuture().get(); final long watchStart = System.currentTimeMillis(); - final long watchTimeout = System.currentTimeMillis()+TESTABLE_TIMEOUT_VALUE+500; + final long watchTimeout = System.currentTimeMillis() + TESTABLE_TIMEOUT_VALUE + 500; final FutureResult timeoutLatch = new FutureResult(); ReadTimeoutChannelGuard readTimeoutChannelGuard = new ReadTimeoutChannelGuard(client, timeoutLatch, watchTimeout); @@ -130,17 +120,17 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage m final Long watchTimeEnd = timeoutLatch.getIoFuture().get(); if(watchTimeEnd == -1) { - Assert.fail("Timeout did not happen... in time. Were waiting '"+watchTimeout+"' ms, timeout should happen in '"+TESTABLE_TIMEOUT_VALUE+"' ms."); + Assert.fail("Timeout did not happen... in time. Were waiting '" + watchTimeout + "' ms, timeout should happen in '" + TESTABLE_TIMEOUT_VALUE + "' ms."); } else { long timeSpent = watchTimeEnd - watchStart; - //lets be generous and give 150ms diff( there is "fuzz" coded for 50ms in undertow as well - if(!(timeSpent<=TESTABLE_TIMEOUT_VALUE+150)) { - Assert.fail("Timeout did not happen... in time. Socket timeout out in '"+timeSpent+"' ms, supposed to happen in '"+TESTABLE_TIMEOUT_VALUE+"' ms."); + //let's be generous and give 150ms diff( there is "fuzz" coded for 50ms in undertow as well + if(!(timeSpent <= TESTABLE_TIMEOUT_VALUE + 150)) { + Assert.fail("Timeout did not happen... in time. Socket timeout out in '" + timeSpent + "' ms, supposed to happen in '" + TESTABLE_TIMEOUT_VALUE + "' ms."); } } } - private static class ReadTimeoutChannelGuard implements Runnable{ + private static class ReadTimeoutChannelGuard implements Runnable { private final WebSocketTestClient channel; private final FutureResult resultHandler; private final long watchEnd; @@ -159,7 +149,7 @@ public void setTaskScheduledFuture(ScheduledFuture sf2) { @Override public void run() { - if(System.currentTimeMillis() > watchEnd) { + if (System.currentTimeMillis() > watchEnd) { sf.cancel(false); if(channelActive()) { resultHandler.setResult(new Long(-1)); From f2d44241b84a0219ffa03b5e9148787070bec75b Mon Sep 17 00:00:00 2001 From: Flavia Rainone Date: Sun, 20 Oct 2024 12:20:52 -0300 Subject: [PATCH 04/30] [UNDERTOW-2518] At WebSocketTimeoutTestCase, use a longer watch timeout to verify if the timeout is working, and increase the interval of tolerance for the time spent This may work better on slower machines when running this test. Signed-off-by: Flavia Rainone --- .../websockets/core/protocol/WebSocketTimeoutTestCase.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java index 08a4573550..b6ed4dd8c0 100644 --- a/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java +++ b/core/src/test/java/io/undertow/websockets/core/protocol/WebSocketTimeoutTestCase.java @@ -111,7 +111,7 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage m latch.getIoFuture().get(); final long watchStart = System.currentTimeMillis(); - final long watchTimeout = System.currentTimeMillis() + TESTABLE_TIMEOUT_VALUE + 500; + final long watchTimeout = System.currentTimeMillis() + TESTABLE_TIMEOUT_VALUE + 1000; final FutureResult timeoutLatch = new FutureResult(); ReadTimeoutChannelGuard readTimeoutChannelGuard = new ReadTimeoutChannelGuard(client, timeoutLatch, watchTimeout); @@ -124,7 +124,7 @@ protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage m } else { long timeSpent = watchTimeEnd - watchStart; //let's be generous and give 150ms diff( there is "fuzz" coded for 50ms in undertow as well - if(!(timeSpent <= TESTABLE_TIMEOUT_VALUE + 150)) { + if(!(timeSpent <= TESTABLE_TIMEOUT_VALUE + 250)) { Assert.fail("Timeout did not happen... in time. Socket timeout out in '" + timeSpent + "' ms, supposed to happen in '" + TESTABLE_TIMEOUT_VALUE + "' ms."); } } From ab5ec01bc414fecc9ea7c4bd91e0f2df17d040f9 Mon Sep 17 00:00:00 2001 From: Flavia Rainone Date: Wed, 16 Oct 2024 05:49:17 -0300 Subject: [PATCH 05/30] Revert "[UNDERTOW-2279] Re-enable LotsOfHeadersResponseTestCase in Windows" The tests were disabled in CI by mistake and this prevented us seeing that those tests still need to be fixed so they can work in Windows This reverts commit abc70e2601b69cc28d167d129bc71e29ec566cbe. --- .../undertow/server/handlers/LotsOfHeadersResponseTestCase.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/io/undertow/server/handlers/LotsOfHeadersResponseTestCase.java b/core/src/test/java/io/undertow/server/handlers/LotsOfHeadersResponseTestCase.java index b8d7927127..1d7d3d3b12 100644 --- a/core/src/test/java/io/undertow/server/handlers/LotsOfHeadersResponseTestCase.java +++ b/core/src/test/java/io/undertow/server/handlers/LotsOfHeadersResponseTestCase.java @@ -67,6 +67,8 @@ public void handleRequest(final HttpServerExchange exchange) { @Test public void testLotsOfHeadersInResponse() throws IOException { + // FIXME UNDERTOW-2279 + Assume.assumeFalse(System.getProperty("os.name").startsWith("Windows")); TestHttpClient client = new TestHttpClient(); try { HttpGet get = new HttpGet(DefaultServer.getDefaultServerURL() + "/path"); From e7be76bbc5d2e1c8dcc3a519ea14d67e4a2a35ad Mon Sep 17 00:00:00 2001 From: xjusko Date: Tue, 3 Sep 2024 17:13:24 +0200 Subject: [PATCH 06/30] [UNDERTOW-2404] Add default sorting by type and name in directory listing view; enable clickable name and size column headers for custom sorting --- .../handlers/resource/DirectoryUtils.java | 174 ++++++++++++++---- .../file/FileHandlerIndexTestCase.java | 5 +- 2 files changed, 140 insertions(+), 39 deletions(-) diff --git a/core/src/main/java/io/undertow/server/handlers/resource/DirectoryUtils.java b/core/src/main/java/io/undertow/server/handlers/resource/DirectoryUtils.java index 9fd3212090..cfdc0523f7 100644 --- a/core/src/main/java/io/undertow/server/handlers/resource/DirectoryUtils.java +++ b/core/src/main/java/io/undertow/server/handlers/resource/DirectoryUtils.java @@ -39,8 +39,14 @@ import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.text.SimpleDateFormat; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.FormatStyle; +import java.util.ArrayList; +import java.util.Comparator; import java.util.Date; +import java.util.List; import java.util.Locale; import java.util.Map; @@ -109,32 +115,72 @@ public static StringBuilder renderDirectoryListing(final HttpServerExchange exch path += "/"; } - String relative = null; + String relative = determineRelativePath(exchange, path); + + String sortColumn = "name"; + String currentSortOrder = "asc"; + if (exchange != null) { - final Map context = exchange.getAttachment(Predicate.PREDICATE_CONTEXT); - if (context != null) { - final PathPrefixPredicate.PathPrefixMatchRecord trans = (PathPrefixMatchRecord) context - .get(PathPrefixPredicate.PREFIX_MATCH_RECORD); - if (trans != null) { - if (trans.isOverWritten()) { - relative = trans.getPrefix(); - if (!relative.endsWith("/") && !path.startsWith("/")) { - relative += "/"; - } - } - } + if (exchange.getQueryParameters().get("sort") != null) { + sortColumn = exchange.getQueryParameters().get("sort").getFirst(); + } + if (exchange.getQueryParameters().get("order") != null) { + currentSortOrder = exchange.getQueryParameters().get("order").getFirst(); } } - StringBuilder builder = new StringBuilder(); - builder.append("\n\n\n") - .append("\n\n"); - builder.append("\n\n\n"); - builder.append("\n") - .append("\n\n") - .append("\n\n\n\n"); + String newSortOrder = "asc".equals(currentSortOrder) ? "desc" : "asc"; + String sortUrl = relative == null ? path : relative + path; + + StringBuilder builder = buildDirectoryListingTable(sortUrl, sortColumn, newSortOrder); int state = 0; + String parent = getParentPath(path, state); + + int i = 0; + if (parent != null) { + i++; + appendParentDirectory(resource, builder, relative, parent); + } + + List directories = new ArrayList<>(); + List files = new ArrayList<>(); + separateDirectoriesAndFiles(resource, directories, files); + + Comparator comparator = getComparator(sortColumn, currentSortOrder); + directories.sort(comparator); + files.sort(comparator); + + appendDirectories(directories, builder, i, sortUrl); + appendFiles(files, builder, i, sortUrl); + + builder.append("\n
Directory Listing - ").append(relative == null ? path : relative + path).append("
NameLast ModifiedSize
Powered by Undertow
\n\n"); + + return builder; + + } + + private static String formatLastModified(Date lastModified) { + if (lastModified == null) { + return "-"; + } + ZonedDateTime lastModifiedTime = ZonedDateTime.ofInstant( + lastModified.toInstant(), + ZoneId.systemDefault() + ); + DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.MEDIUM) + .withLocale(Locale.getDefault()); + + return formatter.format(lastModifiedTime); + } + + private static void appendParentDirectory(Resource resource, StringBuilder builder, String relative, String parent) { + builder.append("[..]"); + builder.append(formatLastModified(resource.getLastModified())) + .append("--\n"); + } + + private static String getParentPath(String path, int state) { String parent = null; if(path.length() > 1) { for (int i = path.length() - 1; i >= 0; i--) { @@ -154,33 +200,89 @@ public static StringBuilder renderDirectoryListing(final HttpServerExchange exch parent = "/"; } } + return parent; + } - SimpleDateFormat format = new SimpleDateFormat("MMM dd, yyyy HH:mm:ss", Locale.US); - int i = 0; - if (parent != null) { - i++; - builder.append("[..]"); - builder.append(format.format((resource.getLastModified() == null ? new Date(0L) : resource.getLastModified()))) + private static void appendFiles(List files, StringBuilder builder, int i, String sortUrl) { + for (Resource entry : files) { + builder.append("") + .append(entry.getName()).append("") + .append(formatLastModified(entry.getLastModified())) + .append(""); + formatSize(builder, entry.getContentLength()); + builder.append("\n"); + } + } + + private static void appendDirectories(List directories, StringBuilder builder, int i, String sortUrl) { + for (Resource entry : directories) { + builder.append("") + .append(entry.getName()).append("") + .append(formatLastModified(entry.getLastModified())) .append("--\n"); } + } + private static Comparator getComparator(String sortColumn, String currentSortOrder) { + Comparator comparator; + if ("lastModified".equals(sortColumn)) { + comparator = Comparator.comparing( + entry -> (entry.getLastModified() == null) ? new Date(0L) : entry.getLastModified() + ); + } else { + comparator = Comparator.comparing(Resource::getName); + } + + if ("desc".equals(currentSortOrder)) { + comparator = comparator.reversed(); + } + return comparator; + } + + private static void separateDirectoriesAndFiles(Resource resource, List directories, List files) { for (Resource entry : resource.list()) { - builder.append("").append(entry.getName()).append(""); - builder.append(format.format((entry.getLastModified() == null) ? new Date(0L) : entry.getLastModified())) - .append(""); if (entry.isDirectory()) { - builder.append("--"); + directories.add(entry); } else { - formatSize(builder, entry.getContentLength()); + files.add(entry); } - builder.append("\n"); } - builder.append("\n\n\n"); + } + private static StringBuilder buildDirectoryListingTable(String sortUrl, String sortColumn, String newSortOrder) { + StringBuilder builder = new StringBuilder(); + builder.append("\n\n\n") + .append("\n\n"); + builder.append("\n\n\n"); + builder.append("\n") + .append("") + .append("") + .append("") + .append("\n\n"); + builder.append("\n\n\n\n"); return builder; + } + private static String determineRelativePath(HttpServerExchange exchange, String path) { + String relative = null; + if (exchange != null) { + final Map context = exchange.getAttachment(Predicate.PREDICATE_CONTEXT); + if (context != null) { + final PathPrefixMatchRecord trans = (PathPrefixMatchRecord) context + .get(PathPrefixPredicate.PREFIX_MATCH_RECORD); + if (trans != null) { + if (trans.isOverWritten()) { + relative = trans.getPrefix(); + if (!relative.endsWith("/") && !path.startsWith("/")) { + relative += "/"; + } + } + } + } + } + return relative; } public static void renderDirectoryListing(HttpServerExchange exchange, Resource resource) { diff --git a/core/src/test/java/io/undertow/server/handlers/file/FileHandlerIndexTestCase.java b/core/src/test/java/io/undertow/server/handlers/file/FileHandlerIndexTestCase.java index 101aea2138..3de1f1ab9f 100644 --- a/core/src/test/java/io/undertow/server/handlers/file/FileHandlerIndexTestCase.java +++ b/core/src/test/java/io/undertow/server/handlers/file/FileHandlerIndexTestCase.java @@ -24,7 +24,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.text.SimpleDateFormat; -import java.util.Date; import java.util.Locale; import io.undertow.server.handlers.CanonicalPathHandler; @@ -103,9 +102,9 @@ public void testDirectoryIndex() throws IOException, URISyntaxException { Assert.assertEquals("text/html; charset=UTF-8", headers[0].getValue()); Assert.assertTrue(response, response.contains("page.html")); Assert.assertTrue(response, response.contains("tmp2")); - // All invalid symlinks have their date set to epoch + // All invalid symlinks have their date set to "-" SimpleDateFormat format = new SimpleDateFormat("MMM dd, yyyy HH:mm:ss", Locale.US); - Assert.assertTrue(response, response.contains(format.format((new Date(0L))))); + Assert.assertTrue(response, response.contains("-")); } finally { client.getConnectionManager().shutdown(); if (badSymlink != null) { From c05e8eba5289ecb84ac6b3ca625db9091a87fc7b Mon Sep 17 00:00:00 2001 From: chalsuligesriniv Date: Fri, 4 Oct 2024 12:55:25 -0700 Subject: [PATCH 07/30] [UNDERTOW-2462] Adding default constant for ALLOW_ENCODED_SLASH --- core/src/main/java/io/undertow/UndertowOptions.java | 7 ++++++- core/src/main/java/io/undertow/util/URLUtils.java | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/io/undertow/UndertowOptions.java b/core/src/main/java/io/undertow/UndertowOptions.java index 60fb51fade..ebde0ca5b5 100644 --- a/core/src/main/java/io/undertow/UndertowOptions.java +++ b/core/src/main/java/io/undertow/UndertowOptions.java @@ -115,13 +115,18 @@ public class UndertowOptions { */ public static final Option MAX_COOKIES = Option.simple(UndertowOptions.class, "MAX_COOKIES", Integer.class); + /** + * Default value of {@link #ALLOW_ENCODED_SLASH} option. + */ + public static final Boolean DEFAULT_ALLOW_ENCODED_SLASH = Boolean.FALSE; + /** * If a request comes in with encoded / characters (i.e. %2F), will these be decoded. *

* This can cause security problems if a front end proxy does not perform the same decoding, and as a result * this is disabled by default. *

- * Defaults to false + * Defaults to {@link #DEFAULT_ALLOW_ENCODED_SLASH} *

* See CVE-2007-0450 * @deprecated - this option was interpreted improperly. diff --git a/core/src/main/java/io/undertow/util/URLUtils.java b/core/src/main/java/io/undertow/util/URLUtils.java index 18d49b3050..56c20a0bcc 100644 --- a/core/src/main/java/io/undertow/util/URLUtils.java +++ b/core/src/main/java/io/undertow/util/URLUtils.java @@ -354,7 +354,7 @@ public static boolean isAbsoluteUrl(String location) { } public static boolean getSlashDecodingFlag(final OptionMap options) { - final boolean allowEncodedSlash = options.get(UndertowOptions.ALLOW_ENCODED_SLASH, false); + final boolean allowEncodedSlash = options.get(UndertowOptions.ALLOW_ENCODED_SLASH, UndertowOptions.DEFAULT_ALLOW_ENCODED_SLASH); final Boolean decodeSlash = options.get(UndertowOptions.DECODE_SLASH); return getSlashDecodingFlag(allowEncodedSlash, decodeSlash); } From 18afeb23cd9c9fc71d42f6b0f86368cb046f0188 Mon Sep 17 00:00:00 2001 From: lvydra Date: Thu, 17 Oct 2024 12:22:17 +0200 Subject: [PATCH 08/30] [UNDERTOW-2505] Review anonymous classes in Undertow io.undertow.websockets.jsr.test.suspendresume --- .../suspendresume/SuspendResumeTestCase.java | 50 +++++++++++-------- 1 file changed, 30 insertions(+), 20 deletions(-) diff --git a/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/suspendresume/SuspendResumeTestCase.java b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/suspendresume/SuspendResumeTestCase.java index 23907d2465..26eeb24956 100644 --- a/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/suspendresume/SuspendResumeTestCase.java +++ b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/suspendresume/SuspendResumeTestCase.java @@ -109,26 +109,7 @@ public void testConnectionWaitsForMessageEnd() throws Exception { final AtomicReference message = new AtomicReference<>(); WebSocketChannel channel = WebSocketClient.connectionBuilder(DefaultServer.getWorker(), DefaultServer.getBufferPool(), new URI(DefaultServer.getDefaultServerURL() + "/")) .connect().get(); - channel.getReceiveSetter().set(new AbstractReceiveListener() { - @Override - protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage msg) { - message.set(msg.getData()); - done.countDown(); - } - - @Override - protected void onError(WebSocketChannel channel, Throwable error) { - error.printStackTrace(); - message.set("error"); - done.countDown(); - } - - @Override - protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) { - message.getData().free(); - done.countDown(); - } - }); + channel.getReceiveSetter().set(new ReceiveListener(message, done)); channel.resumeReceives(); Assert.assertTrue(channel.isOpen()); WebSockets.sendText("Hello World", channel, null); @@ -194,4 +175,33 @@ public void testRejectWhenSuspended() throws Exception { } } + + private static class ReceiveListener extends AbstractReceiveListener { + private final CountDownLatch done; + private final AtomicReference message; + + ReceiveListener(AtomicReference message, CountDownLatch done) { + this.message = message; + this.done = done; + } + + @Override + protected void onFullTextMessage(WebSocketChannel channel, BufferedTextMessage msg) { + message.set(msg.getData()); + done.countDown(); + } + + @Override + protected void onError(WebSocketChannel channel, Throwable error) { + error.printStackTrace(); + message.set("error"); + done.countDown(); + } + + @Override + protected void onFullCloseMessage(WebSocketChannel channel, BufferedBinaryMessage message) { + message.getData().free(); + done.countDown(); + } + } } From 02136bebb3b0b7f1afb5d6084f6e38c754550b34 Mon Sep 17 00:00:00 2001 From: baranowb Date: Fri, 18 Oct 2024 11:27:45 +0200 Subject: [PATCH 09/30] [UNDERTOW-2509] Add 413 response code to overflowing multipart in default handler --- .../form/MultiPartParserDefinition.java | 4 + .../form/MultipartFormDataParserTestCase.java | 94 +++++++++++++++---- 2 files changed, 82 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/io/undertow/server/handlers/form/MultiPartParserDefinition.java b/core/src/main/java/io/undertow/server/handlers/form/MultiPartParserDefinition.java index 2a78e203b6..66269c64fd 100644 --- a/core/src/main/java/io/undertow/server/handlers/form/MultiPartParserDefinition.java +++ b/core/src/main/java/io/undertow/server/handlers/form/MultiPartParserDefinition.java @@ -480,6 +480,10 @@ public void handleEvent(StreamSourceChannel channel) { pooled.close(); } + } catch(FileTooLargeException e) { + UndertowLogger.REQUEST_IO_LOGGER.debug("Exception parsing data", e); + exchange.setStatusCode(StatusCodes.REQUEST_ENTITY_TOO_LARGE); + exchange.endExchange(); } catch (Throwable e) { UndertowLogger.REQUEST_IO_LOGGER.debug("Exception parsing data", e); exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); diff --git a/core/src/test/java/io/undertow/server/handlers/form/MultipartFormDataParserTestCase.java b/core/src/test/java/io/undertow/server/handlers/form/MultipartFormDataParserTestCase.java index c6ab4c792b..7d6d02bce4 100644 --- a/core/src/test/java/io/undertow/server/handlers/form/MultipartFormDataParserTestCase.java +++ b/core/src/test/java/io/undertow/server/handlers/form/MultipartFormDataParserTestCase.java @@ -49,6 +49,7 @@ import io.undertow.server.HttpHandler; import io.undertow.server.HttpServerExchange; import io.undertow.server.handlers.BlockingHandler; +import io.undertow.server.handlers.form.MultiPartParserDefinition.FileTooLargeException; import io.undertow.testutils.DefaultServer; import io.undertow.testutils.HttpClientUtils; import io.undertow.testutils.TestHttpClient; @@ -198,33 +199,44 @@ public void testFileUploadWithEagerParsingAndNonASCIIFilename() throws Exception client.getConnectionManager().shutdown(); } } - private static HttpHandler createInMemoryReadingHandler(final long fileSizeThreshold) { + return createInMemoryReadingHandler(fileSizeThreshold, -1, null); + } + + private static HttpHandler createInMemoryReadingHandler(final long fileSizeThreshold, final long maxInvidualFileThreshold, final HttpHandler async) { return new HttpHandler() { @Override public void handleRequest(final HttpServerExchange exchange) throws Exception { MultiPartParserDefinition multiPartParserDefinition = new MultiPartParserDefinition(); multiPartParserDefinition.setFileSizeThreshold(fileSizeThreshold); + multiPartParserDefinition.setMaxIndividualFileSize(maxInvidualFileThreshold); final FormDataParser parser = FormParserFactory.builder(false) .addParsers(new FormEncodedDataDefinition(), multiPartParserDefinition) .build().createParser(exchange); - try { - FormData data = parser.parseBlocking(); - exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); - if (data.getFirst("formValue").getValue().equals("myValue")) { - FormData.FormValue file = data.getFirst("file"); - if (file.isFileItem()) { - exchange.setStatusCode(StatusCodes.OK); - logResult(exchange, file.getFileItem().isInMemory(), file.getFileName(), stream2String(file)); + if (async == null) { + try { + FormData data = parser.parseBlocking(); + exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); + if (data.getFirst("formValue").getValue().equals("myValue")) { + FormData.FormValue file = data.getFirst("file"); + if (file.isFileItem()) { + exchange.setStatusCode(StatusCodes.OK); + logResult(exchange, file.getFileItem().isInMemory(), file.getFileName(), stream2String(file)); + } } + exchange.endExchange(); + } catch (FileTooLargeException e) { + exchange.setStatusCode(StatusCodes.REQUEST_ENTITY_TOO_LARGE); + exchange.endExchange(); + } catch (Throwable e) { + e.printStackTrace(); + exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); + exchange.endExchange(); + } finally { + IoUtils.safeClose(parser); } - exchange.endExchange(); - } catch (Throwable e) { - e.printStackTrace(); - exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR); - exchange.endExchange(); - } finally { - IoUtils.safeClose(parser); + } else { + parser.parse(async); } } @@ -373,6 +385,56 @@ public void testLargeContentWithoutFileNameWithSmallFileSizeThreshold() throws E } } + @Test + public void testFileUploadWithFileSizeThresholdOverflow_Sync() throws Exception { + DefaultServer.setRootHandler(new BlockingHandler(createInMemoryReadingHandler(10, 1, null))); + + TestHttpClient client = new TestHttpClient(); + try { + + HttpPost post = new HttpPost(DefaultServer.getDefaultServerURL() + "/path"); + MultipartEntity entity = new MultipartEntity(HttpMultipartMode.BROWSER_COMPATIBLE); + + entity.addPart("formValue", new StringBody("myValue", "text/plain", StandardCharsets.UTF_8)); + entity.addPart("file", new FileBody(new File(MultipartFormDataParserTestCase.class.getResource("uploadfile.txt").getFile()))); + + post.setEntity(entity); + HttpResponse result = client.execute(post); + Assert.assertEquals(StatusCodes.REQUEST_ENTITY_TOO_LARGE, result.getStatusLine().getStatusCode()); + + } finally { + client.getConnectionManager().shutdown(); + } + } + + @Test + public void testFileUploadWithFileSizeThresholdOverflow_ASync() throws Exception { + DefaultServer.setRootHandler(new BlockingHandler(createInMemoryReadingHandler(10, 1, new HttpHandler() { + + @Override + public void handleRequest(HttpServerExchange exchange) throws Exception { + throw new Exception(); + } + }))); + + TestHttpClient client = new TestHttpClient(); + try { + + HttpPost post = new HttpPost(DefaultServer.getDefaultServerURL() + "/path"); + MultipartEntity entity = new MultipartEntity(HttpMultipartMode.BROWSER_COMPATIBLE); + + entity.addPart("formValue", new StringBody("myValue", "text/plain", StandardCharsets.UTF_8)); + entity.addPart("file", new FileBody(new File(MultipartFormDataParserTestCase.class.getResource("uploadfile.txt").getFile()))); + + post.setEntity(entity); + HttpResponse result = client.execute(post); + Assert.assertEquals(StatusCodes.REQUEST_ENTITY_TOO_LARGE, result.getStatusLine().getStatusCode()); + + } finally { + client.getConnectionManager().shutdown(); + } + } + private void writeLargeFileContent(File file, int size) throws IOException { int textLength = "content".getBytes().length; FileOutputStream fos = new FileOutputStream(file); From 9012f0707bf222597a0eea4d17e4b47707c1cdad Mon Sep 17 00:00:00 2001 From: lvydra Date: Fri, 18 Oct 2024 10:04:52 +0200 Subject: [PATCH 10/30] [UNDERTOW-2504] Review anonymous classes in Undertow io.undertow.websockets.jsr.test.security --- .../security/WebsocketBasicAuthTestCase.java | 42 ++++++++++--------- 1 file changed, 22 insertions(+), 20 deletions(-) diff --git a/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/security/WebsocketBasicAuthTestCase.java b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/security/WebsocketBasicAuthTestCase.java index a155c164bf..0497723a63 100644 --- a/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/security/WebsocketBasicAuthTestCase.java +++ b/websockets-jsr/src/test/java/io/undertow/websockets/jsr/test/security/WebsocketBasicAuthTestCase.java @@ -43,7 +43,6 @@ import jakarta.websocket.ContainerProvider; import jakarta.websocket.Endpoint; import jakarta.websocket.EndpointConfig; -import jakarta.websocket.MessageHandler; import jakarta.websocket.OnOpen; import jakarta.websocket.Session; import jakarta.websocket.server.ServerEndpoint; @@ -147,12 +146,7 @@ public static void cleanup() throws ServletException { @Test public void testAuthenticatedWebsocket() throws Exception { ProgramaticClientEndpoint endpoint = new ProgramaticClientEndpoint(); - ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(new ClientConfigurator(){ - @Override - public void beforeRequest(Map> headers) { - headers.put(AUTHORIZATION.toString(), Collections.singletonList(BASIC + " " + FlexBase64.encodeString("user1:password1".getBytes(), false))); - } - }).build(); + ClientEndpointConfig clientEndpointConfig = ClientEndpointConfig.Builder.create().configurator(new CustomClientConfigurator()).build(); ContainerProvider.getWebSocketContainer().connectToServer(endpoint, clientEndpointConfig, new URI("ws://" + DefaultServer.getHostAddress("default") + ":" + DefaultServer.getHostPort("default") + "/servletContext/secured")); assertEquals("user1", endpoint.getResponses().poll(15, TimeUnit.SECONDS)); endpoint.session.close(); @@ -179,13 +173,7 @@ public static class ProgramaticClientEndpoint extends Endpoint { @Override public void onOpen(Session session, EndpointConfig config) { this.session = session; - session.addMessageHandler(new MessageHandler.Whole() { - - @Override - public void onMessage(String message) { - responses.add(message); - } - }); + session.addMessageHandler(String.class, (message) -> responses.add(message)); } @Override @@ -217,12 +205,7 @@ public void init(FilterConfig filterConfig) { @Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException { - filterChain.doFilter(new HttpServletRequestWrapper((HttpServletRequest) servletRequest) { - @Override - public Principal getUserPrincipal() { - return () -> "wrapped"; - } - }, servletResponse); + filterChain.doFilter(new ServletRequestWrapper((HttpServletRequest) servletRequest), servletResponse); } @Override @@ -231,4 +214,23 @@ public void destroy() { } } + private static class ServletRequestWrapper extends HttpServletRequestWrapper { + + ServletRequestWrapper(HttpServletRequest request) { + super(request); + } + + @Override + public Principal getUserPrincipal() { + return () -> "wrapped"; + } + } + + private static class CustomClientConfigurator extends ClientConfigurator { + + @Override + public void beforeRequest(Map> headers) { + headers.put(AUTHORIZATION.toString(), Collections.singletonList(BASIC + " " + FlexBase64.encodeString("user1:password1".getBytes(), false))); + } + } } From 9d80a6e0a3a0f4a8d3852ba117da077f7a5b3d6e Mon Sep 17 00:00:00 2001 From: baranowb Date: Thu, 31 Oct 2024 10:39:18 +0100 Subject: [PATCH 11/30] [UNDERTOW-2356] Add logger method --- core/src/main/java/io/undertow/UndertowLogger.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/java/io/undertow/UndertowLogger.java b/core/src/main/java/io/undertow/UndertowLogger.java index 7fd2147acd..4f29c03a10 100644 --- a/core/src/main/java/io/undertow/UndertowLogger.java +++ b/core/src/main/java/io/undertow/UndertowLogger.java @@ -488,4 +488,8 @@ void nodeConfigCreated(URI connectionURI, String balancer, String domain, String @LogMessage(level = WARN) @Message(id = 5107, value = "Failed to set web socket timeout.") void failedToSetWSTimeout(@Cause Exception e); + + @LogMessage(level = WARN) + @Message(id = 5108, value = "Failed to transition to '%s' state in '%s'.") + void failedToTransitionToState(String state, Object src); } \ No newline at end of file From 6f665d1a89c4cc473752b7a72c569533705821e3 Mon Sep 17 00:00:00 2001 From: baranowb Date: Thu, 31 Oct 2024 10:39:44 +0100 Subject: [PATCH 12/30] [UNDERTOW-2356] AjpClientConnection --- .../client/ajp/AjpClientConnection.java | 35 +++++++++++-------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java b/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java index 899d25a450..83433b41d1 100644 --- a/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java +++ b/core/src/main/java/io/undertow/client/ajp/AjpClientConnection.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.client.ClientStatistics; import org.jboss.logging.Logger; @@ -103,23 +104,30 @@ public void handleEvent(AjpClientResponseStreamSourceChannel channel) { private static final int CLOSE_REQ = 1 << 30; private static final int CLOSED = 1 << 31; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientConnection.class, "state"); private final ChannelListener.SimpleSetter closeSetter = new ChannelListener.SimpleSetter<>(); private final ClientStatistics clientStatistics; private final List> closeListeners = new CopyOnWriteArrayList<>(); - AjpClientConnection(final AjpClientChannel connection, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) { + AjpClientConnection(final AjpClientChannel channel, final OptionMap options, final ByteBufferPool bufferPool, ClientStatistics clientStatistics) { this.clientStatistics = clientStatistics; this.options = options; - this.connection = connection; + this.connection = channel; this.bufferPool = bufferPool; - connection.addCloseTask(new ChannelListener() { + channel.addCloseTask(new ChannelListener() { @Override public void handleEvent(AjpClientChannel channel) { log.debugf("connection to %s closed", getPeerAddress()); - AjpClientConnection.this.state |= CLOSED; + final int oldVal = stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag); + if(anyAreSet(oldVal, CLOSED)) { + //this was closed already? + UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", AjpClientConnection.this); + return; + } ChannelListeners.invokeChannelListener(AjpClientConnection.this, closeSetter.get()); for(ChannelListener listener : closeListeners) { listener.handleEvent(AjpClientConnection.this); @@ -135,8 +143,8 @@ public void handleEvent(AjpClientChannel channel) { } } }); - connection.getReceiveSetter().set(new ClientReceiveListener()); - connection.resumeReceives(); + channel.getReceiveSetter().set(new ClientReceiveListener()); + channel.resumeReceives(); } @Override @@ -236,7 +244,7 @@ public void sendRequest(final ClientRequest request, final ClientCallback currentState | flag); } } else if (request.getProtocol() != Protocols.HTTP_1_1) { - state |= CLOSE_REQ; + stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } if (request.getRequestHeaders().contains(UPGRADE)) { - state |= UPGRADE_REQUESTED; + stateUpdater.getAndAccumulate(AjpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } long length = 0; @@ -327,7 +335,7 @@ public void close() throws IOException { if (anyAreSet(state, CLOSED)) { return; } - state |= CLOSED | CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag); connection.close(); } @@ -336,7 +344,6 @@ public void close() throws IOException { */ public void requestDone() { currentRequest = null; - if (anyAreSet(state, CLOSE_REQ)) { safeClose(connection); } else if (anyAreSet(state, UPGRADE_REQUESTED)) { @@ -352,7 +359,7 @@ public void requestDone() { } public void requestClose() { - state |= CLOSE_REQ; + stateUpdater.getAndAccumulate(AjpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } From 04b2ffaac8bc56f7fc7df06f8c1a8af1c6ba5235 Mon Sep 17 00:00:00 2001 From: baranowb Date: Thu, 31 Oct 2024 10:54:11 +0100 Subject: [PATCH 13/30] [UNDERTOW-2356] AjpClientExchange --- .../undertow/client/ajp/AjpClientExchange.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java b/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java index 53536882b6..f6958deeab 100644 --- a/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java +++ b/core/src/main/java/io/undertow/client/ajp/AjpClientExchange.java @@ -35,6 +35,7 @@ import org.xnio.channels.StreamSourceChannel; import java.io.IOException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.anyAreSet; @@ -58,7 +59,9 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange { private AjpClientResponseStreamSourceChannel responseChannel; private AjpClientRequestClientStreamSinkChannel requestChannel; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state = 0; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(AjpClientExchange.class, "state"); private static final int REQUEST_TERMINATED = 1; private static final int RESPONSE_TERMINATED = 1 << 1; @@ -78,9 +81,9 @@ class AjpClientExchange extends AbstractAttachable implements ClientExchange { } void terminateRequest() { - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); if(!clientConnection.isOpen()) { - state |= RESPONSE_TERMINATED; + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); } if (anyAreSet(state, RESPONSE_TERMINATED)) { clientConnection.requestDone(); @@ -88,9 +91,9 @@ void terminateRequest() { } void terminateResponse() { - state |= RESPONSE_TERMINATED; + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); if(!clientConnection.isOpen()) { - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); } if (anyAreSet(state, REQUEST_TERMINATED)) { clientConnection.requestDone(); @@ -155,7 +158,7 @@ public StreamSinkChannel getRequestChannel() { return new DetachableStreamSinkChannel(requestChannel) { @Override protected boolean isFinished() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(AjpClientExchange.this.state, REQUEST_TERMINATED); } }; } @@ -165,7 +168,7 @@ public StreamSourceChannel getResponseChannel() { return new DetachableStreamSourceChannel(responseChannel) { @Override protected boolean isFinished() { - return anyAreSet(state, RESPONSE_TERMINATED); + return anyAreSet(AjpClientExchange.this.state, RESPONSE_TERMINATED); } }; } From 9b5ef8c02d87a95efaca770fd2e3b77cd24aa478 Mon Sep 17 00:00:00 2001 From: baranowb Date: Mon, 4 Nov 2024 09:41:28 +0100 Subject: [PATCH 14/30] [UNDERTOW-2356] HttpClientConnection --- .../client/http/HttpClientConnection.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/io/undertow/client/http/HttpClientConnection.java b/core/src/main/java/io/undertow/client/http/HttpClientConnection.java index 7d66368cf2..5824b6ffe4 100644 --- a/core/src/main/java/io/undertow/client/http/HttpClientConnection.java +++ b/core/src/main/java/io/undertow/client/http/HttpClientConnection.java @@ -83,6 +83,7 @@ import java.util.List; import java.util.Locale; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static io.undertow.client.UndertowClientMessages.MESSAGES; import static org.xnio.Bits.allAreClear; @@ -132,7 +133,9 @@ public void handleEvent(StreamSourceConduit channel) { private static final int CLOSE_REQ = 1 << 30; private static final int CLOSED = 1 << 31; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientConnection.class, "state"); private final ChannelListener.SimpleSetter closeSetter = new ChannelListener.SimpleSetter<>(); private final ClientStatistics clientStatistics; @@ -178,7 +181,13 @@ public void activity(long bytes) { public void handleEvent(StreamConnection channel) { log.debugf("connection to %s closed", getPeerAddress()); - HttpClientConnection.this.state |= CLOSED; + final int oldVal = stateUpdater.getAndAccumulate(HttpClientConnection.this, CLOSED, (currentState, flag)-> currentState | flag); + if(anyAreSet(oldVal, CLOSED)) { + //this was closed already? + UndertowLogger.ROOT_LOGGER.failedToTransitionToState("CLOSED", HttpClientConnection.this); + //NOTE: cant do that.... + //return; + } ChannelListeners.invokeChannelListener(HttpClientConnection.this, closeSetter.get()); try { if (pooledBuffer != null) { @@ -256,7 +265,7 @@ public boolean isOpen() { if(http2Delegate != null) { return http2Delegate.isOpen(); } - return connection.isOpen() && allAreClear(state, CLOSE_REQ | CLOSED); + return connection.isOpen() && allAreClear(state, CLOSED | CLOSE_REQ); } @Override @@ -348,10 +357,10 @@ public void sendRequest(final ClientRequest request, final ClientCallback currentState | flag); } else if (Headers.UPGRADE.equalToString(connectionString)) { - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } } else if (request.getProtocol() != Protocols.HTTP_1_1) { - state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } if (request.getRequestHeaders().contains(Headers.UPGRADE)) { - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } if(request.getMethod().equals(Methods.CONNECT)) { //we treat CONNECT like upgrade requests - state |= UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, UPGRADE_REQUESTED, (currentState, flag)-> currentState | flag); } //setup the client request conduits @@ -476,7 +485,7 @@ public StreamConnection performUpgrade() throws IOException { if (allAreSet(state, UPGRADED | CLOSE_REQ | CLOSED)) { throw new IOException(UndertowClientMessages.MESSAGES.connectionClosed()); } - state |= UPGRADED; + stateUpdater.accumulateAndGet(this, UPGRADED, (currentState, flag)-> currentState | flag); connection.getSinkChannel().setConduit(originalSinkConduit); connection.getSourceChannel().setConduit(pushBackStreamSourceConduit); return connection; @@ -490,7 +499,7 @@ public void close() throws IOException { if (anyAreSet(state, CLOSED)) { return; } - state |= CLOSED | CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSED | CLOSE_REQ, (currentState, flag)-> currentState | flag); ConnectionUtils.cleanClose(connection); } @@ -508,7 +517,7 @@ public void exchangeDone() { if (anyAreSet(state, CLOSE_REQ)) { currentRequest = null; pendingResponse = null; - this.state |= CLOSED; + stateUpdater.accumulateAndGet(this, CLOSED, (currentState, flag)-> currentState | flag); safeClose(connection); } else if (anyAreSet(state, UPGRADE_REQUESTED)) { connection.getSourceChannel().suspendReads(); @@ -630,7 +639,7 @@ public void handleEvent(StreamSourceChannel channel) { if ((connectionString == null || !Headers.UPGRADE.equalToString(connectionString)) && !response.getResponseHeaders().contains(Headers.UPGRADE)) { if(!currentRequest.getRequest().getMethod().equals(Methods.CONNECT) || response.getResponseCode() != 200) { //make sure it was not actually a connect request //just unset the upgrade requested flag - HttpClientConnection.this.state &= ~UPGRADE_REQUESTED; + stateUpdater.accumulateAndGet(HttpClientConnection.this, ~UPGRADE_REQUESTED, (currentState, flag)-> currentState & flag); } } } @@ -647,7 +656,7 @@ public void handleEvent(StreamSourceChannel channel) { close = true; } if(close) { - HttpClientConnection.this.state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); //we are going to close, kill any queued connections HttpClientExchange ex = pendingQueue.poll(); while (ex != null) { @@ -673,7 +682,7 @@ public void handleEvent(StreamSourceChannel channel) { currentRequest.setResponse(response); if(response.getResponseCode() == StatusCodes.EXPECTATION_FAILED) { if(HttpContinue.requiresContinueResponse(currentRequest.getRequest().getRequestHeaders())) { - HttpClientConnection.this.state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(HttpClientConnection.this, CLOSE_REQ, (currentState, flag)-> currentState | flag); ConduitStreamSinkChannel sinkChannel = HttpClientConnection.this.connection.getSinkChannel(); sinkChannel.shutdownWrites(); if(!sinkChannel.flush()) { @@ -749,7 +758,7 @@ private void prepareResponseChannel(ClientResponse response, ClientExchange exch connection.getSourceChannel().setConduit(new FixedLengthStreamSourceConduit(connection.getSourceChannel().getConduit(), 0, responseFinishedListener)); } else { connection.getSourceChannel().setConduit(new FinishableStreamSourceConduit(connection.getSourceChannel().getConduit(), responseFinishedListener)); - state |= CLOSE_REQ; + stateUpdater.accumulateAndGet(this, CLOSE_REQ, (currentState, flag)-> currentState | flag); } } From 7b899d435257bd51ac04ee68584a15509b90b589 Mon Sep 17 00:00:00 2001 From: baranowb Date: Mon, 4 Nov 2024 10:18:57 +0100 Subject: [PATCH 15/30] [UNDERTOW-2356] HttpClientExchange --- .../client/http/HttpClientExchange.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/undertow/client/http/HttpClientExchange.java b/core/src/main/java/io/undertow/client/http/HttpClientExchange.java index 77834ad2ac..b8ad08598c 100644 --- a/core/src/main/java/io/undertow/client/http/HttpClientExchange.java +++ b/core/src/main/java/io/undertow/client/http/HttpClientExchange.java @@ -34,6 +34,7 @@ import org.xnio.channels.StreamSourceChannel; import java.io.IOException; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.anyAreSet; @@ -57,7 +58,9 @@ class HttpClientExchange extends AbstractAttachable implements ClientExchange { private IOException failedReason; private HttpRequestConduit requestConduit; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HttpClientExchange.class, "state"); private static final int REQUEST_TERMINATED = 1; private static final int RESPONSE_TERMINATED = 1 << 1; @@ -81,28 +84,28 @@ public void setRequestConduit(HttpRequestConduit requestConduit) { } void terminateRequest() { - if(anyAreSet(state, REQUEST_TERMINATED)) { + if(anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) { return; } log.debugf("request terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath()); - state |= REQUEST_TERMINATED; + stateUpdater.accumulateAndGet(this, REQUEST_TERMINATED, (currentState, flag)-> currentState | flag); clientConnection.requestDataSent(); - if (anyAreSet(state, RESPONSE_TERMINATED)) { + if (anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) { clientConnection.exchangeDone(); } } boolean isRequestDataSent() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED); } void terminateResponse() { - if(anyAreSet(state, RESPONSE_TERMINATED)) { + if(anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED)) { return; } log.debugf("response terminated for request to %s %s", clientConnection.getPeerAddress(), getRequest().getPath()); - state |= RESPONSE_TERMINATED; - if (anyAreSet(state, REQUEST_TERMINATED)) { + stateUpdater.accumulateAndGet(this, RESPONSE_TERMINATED, (currentState, flag)-> currentState | flag); + if (anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED)) { clientConnection.exchangeDone(); } } @@ -168,7 +171,7 @@ public StreamSinkChannel getRequestChannel() { return new DetachableStreamSinkChannel(clientConnection.getConnection().getSinkChannel()) { @Override protected boolean isFinished() { - return anyAreSet(state, REQUEST_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, REQUEST_TERMINATED); } }; } @@ -178,7 +181,7 @@ public StreamSourceChannel getResponseChannel() { return new DetachableStreamSourceChannel(clientConnection.getConnection().getSourceChannel()) { @Override protected boolean isFinished() { - return anyAreSet(state, RESPONSE_TERMINATED); + return anyAreSet(HttpClientExchange.this.state, RESPONSE_TERMINATED); } }; } From 94c8d14c6fe656e7ff6ae9eb949983fca5b3a1f7 Mon Sep 17 00:00:00 2001 From: baranowb Date: Mon, 4 Nov 2024 12:14:54 +0100 Subject: [PATCH 16/30] [UNDERTOW-2356] ChunkedStreamSinkConduit --- .../conduits/ChunkedStreamSinkConduit.java | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java index ec8a8e3a61..f874c3ea97 100644 --- a/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java @@ -25,6 +25,7 @@ import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.Supplier; import io.undertow.UndertowLogger; @@ -82,7 +83,9 @@ public class ChunkedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater(ChunkedStreamSinkConduit.class, "state"); private int chunkleft = 0; private final ByteBuffer chunkingBuffer = ByteBuffer.allocate(12); //12 is the most @@ -136,7 +139,7 @@ int doWrite(final ByteBuffer src) throws IOException { if(src.remaining() == 0) { return 0; } - this.state |= FLAG_FIRST_DATA_WRITTEN; + stateUpdater.accumulateAndGet(this, FLAG_FIRST_DATA_WRITTEN, (currentState, flag)-> currentState | flag); int oldLimit = src.limit(); boolean dataRemaining = false; //set to true if there is data in src that still needs to be written out if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) { @@ -147,7 +150,7 @@ int doWrite(final ByteBuffer src) throws IOException { chunkingSepBuffer.clear(); chunkingSepBuffer.put(CRLF); chunkingSepBuffer.flip(); - state |= FLAG_WRITTEN_FIRST_CHUNK; + stateUpdater.accumulateAndGet(this, FLAG_WRITTEN_FIRST_CHUNK, (currentState, flag)-> currentState | flag); chunkleft = src.remaining(); } else { if (src.remaining() > chunkleft) { @@ -172,10 +175,10 @@ int doWrite(final ByteBuffer src) throws IOException { result = next.write(buf, 0, buf.length); } if (!src.hasRemaining()) { - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); } if (!lastChunkBuffer.getBuffer().hasRemaining()) { - state |= FLAG_NEXT_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); lastChunkBuffer.close(); } } @@ -265,7 +268,7 @@ public boolean flush() throws IOException { if (anyAreSet(state, FLAG_FINISHED)) { return true; } - this.state |= FLAG_FIRST_DATA_WRITTEN; + stateUpdater.accumulateAndGet(this, FLAG_FIRST_DATA_WRITTEN, (currentState, flag)-> currentState | flag); if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) { if (anyAreSet(state, FLAG_NEXT_SHUTDOWN)) { boolean val = next.flush(); @@ -280,7 +283,7 @@ public boolean flush() throws IOException { if (anyAreSet(config, CONF_FLAG_PASS_CLOSE)) { next.terminateWrites(); } - state |= FLAG_NEXT_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); boolean val = next.flush(); if (val && allAreClear(state, FLAG_FINISHED)) { invokeFinishListener(); @@ -296,7 +299,7 @@ public boolean flush() throws IOException { } private void invokeFinishListener() { - state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); if (finishListener != null) { finishListener.handleEvent(this); } @@ -313,7 +316,7 @@ public void terminateWrites() throws IOException { //todo: should we make this behaviour configurable? responseHeaders.put(Headers.CONTENT_LENGTH, "0"); //according to the spec we don't actually need this, but better to be safe responseHeaders.remove(Headers.TRANSFER_ENCODING); - state |= FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_NEXT_SHUTDOWN | FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); try { flush(); } catch (IOException ignore) { @@ -325,7 +328,7 @@ public void terminateWrites() throws IOException { } } else { createLastChunk(false); - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (currentState, flag)-> currentState | flag); try { flush(); } catch (IOException ignore) { From a55efee16397f9b1ea2a3eb5f083aa7774d42b00 Mon Sep 17 00:00:00 2001 From: baranowb Date: Mon, 4 Nov 2024 15:42:33 +0100 Subject: [PATCH 17/30] [UNDERTOW-2356] DeflatingStreamSinkConduit --- .../conduits/DeflatingStreamSinkConduit.java | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java index 24acce45dc..6dad5b21f5 100644 --- a/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/DeflatingStreamSinkConduit.java @@ -27,6 +27,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.zip.Deflater; import io.undertow.server.Connectors; @@ -77,7 +78,9 @@ public class DeflatingStreamSinkConduit implements StreamSinkConduit { */ private ByteBuffer trailerBuffer; - private int state = 0; + @SuppressWarnings("unused") + private volatile int state = 0; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(DeflatingStreamSinkConduit.class, "state"); private static final int SHUTDOWN = 1; private static final int NEXT_SHUTDOWN = 1 << 1; @@ -220,7 +223,7 @@ public XnioWorker getWorker() { @Override public void suspendWrites() { if (next == null) { - state = state & ~WRITES_RESUMED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~WRITES_RESUMED, (currentState, flag)-> currentState & flag); } else { next.suspendWrites(); } @@ -248,7 +251,7 @@ public void wakeupWrites() { @Override public void resumeWrites() { if (next == null) { - state |= WRITES_RESUMED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, WRITES_RESUMED, (currentState, flag)-> currentState | flag); queueWriteListener(); } else { next.resumeWrites(); @@ -278,7 +281,7 @@ public void terminateWrites() throws IOException { if (deflater != null) { deflater.finish(); } - state |= SHUTDOWN; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, SHUTDOWN, (currentState, flag)-> currentState | flag); } @Override @@ -343,7 +346,7 @@ public boolean flush() throws IOException { } final ByteBuffer buffer = currentBuffer.getBuffer(); if (allAreClear(state, WRITTEN_TRAILER)) { - state |= WRITTEN_TRAILER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, WRITTEN_TRAILER, (currentState, flag)-> currentState | flag); byte[] data = getTrailer(); if (data != null) { Connectors.updateResponseBytesSent(exchange, data.length); @@ -364,14 +367,14 @@ public boolean flush() throws IOException { //ok the deflater is flushed, now we need to flush the buffer if (!anyAreSet(state, FLUSHING_BUFFER)) { buffer.flip(); - state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); if (next == null) { nextCreated = true; this.next = createNextChannel(); } } if (performFlushIfRequired()) { - state |= NEXT_SHUTDOWN; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, NEXT_SHUTDOWN, (currentState, flag)-> currentState | flag); freeBuffer(); next.terminateWrites(); return next.flush(); @@ -389,7 +392,7 @@ public boolean flush() throws IOException { if(allAreClear(state, FLUSHING_BUFFER)) { //deflateData can cause this to be change currentBuffer.getBuffer().flip(); - this.state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); } } if(!performFlushIfRequired()) { @@ -450,7 +453,7 @@ private boolean performFlushIfRequiredSingleBuffer() throws IOException { } while (total < totalLength); } currentBuffer.getBuffer().clear(); - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); return true; } @@ -472,7 +475,7 @@ private boolean performFlushIfRequiredAdditionalBuffer() throws IOException { } trailerBuffer = null; currentBuffer.getBuffer().clear(); - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); return true; } @@ -514,7 +517,7 @@ private void deflateData(boolean force) throws IOException { Connectors.updateResponseBytesSent(exchange, count); if (!outputBuffer.hasRemaining()) { outputBuffer.flip(); - this.state |= FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, FLUSHING_BUFFER, (currentState, flag)-> currentState | flag); if (next == null) { nextCreated = true; this.next = createNextChannel(); @@ -540,7 +543,7 @@ private void deflateData(boolean force) throws IOException { @Override public void truncateWrites() throws IOException { freeBuffer(); - state |= CLOSED; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, CLOSED, (currentState, flag)-> currentState | flag); next.truncateWrites(); } @@ -548,7 +551,7 @@ private void freeBuffer() { if (currentBuffer != null) { currentBuffer.close(); currentBuffer = null; - state = state & ~FLUSHING_BUFFER; + stateUpdater.getAndAccumulate(DeflatingStreamSinkConduit.this, ~FLUSHING_BUFFER, (currentState, flag)-> currentState & flag); } if (deflater != null) { deflater = null; From fcfa9d88514e9daefcc50ab86b1863c75da2b62c Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 5 Nov 2024 11:30:00 +0100 Subject: [PATCH 18/30] [UNDERTOW-2356] UndertowInputStream UndertowOutputStream --- .../java/io/undertow/io/UndertowInputStream.java | 13 ++++++++----- .../java/io/undertow/io/UndertowOutputStream.java | 13 ++++++++----- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/undertow/io/UndertowInputStream.java b/core/src/main/java/io/undertow/io/UndertowInputStream.java index 576c715dcf..17e17d174f 100644 --- a/core/src/main/java/io/undertow/io/UndertowInputStream.java +++ b/core/src/main/java/io/undertow/io/UndertowInputStream.java @@ -30,6 +30,7 @@ import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static io.undertow.UndertowLogger.REQUEST_IO_LOGGER; import static io.undertow.UndertowOptions.DEFAULT_READ_TIMEOUT; @@ -56,7 +57,9 @@ public class UndertowInputStream extends InputStream { private static final int FLAG_CLOSED = 1; private static final int FLAG_FINISHED = 1 << 1; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(UndertowInputStream.class, "state"); private PooledByteBuffer pooled; public UndertowInputStream(final HttpServerExchange exchange) { @@ -133,7 +136,7 @@ private void readIntoBuffer() throws IOException { int res = Channels.readBlocking(channel, pooled.getBuffer(), readTimeout, TimeUnit.MILLISECONDS); pooled.getBuffer().flip(); if (res == -1) { - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); pooled.close(); pooled = null; } else if (res == 0) { @@ -153,7 +156,7 @@ private void readIntoBufferNonBlocking() throws IOException { } pooled.getBuffer().flip(); if (res == -1) { - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); pooled.close(); pooled = null; } @@ -180,7 +183,7 @@ public void close() throws IOException { if (anyAreSet(state, FLAG_CLOSED)) { return; } - state |= FLAG_CLOSED; + stateUpdater.getAndAccumulate(this, FLAG_CLOSED, (currentState, flag)-> currentState | flag); try { while (allAreClear(state, FLAG_FINISHED)) { readIntoBuffer(); @@ -195,7 +198,7 @@ public void close() throws IOException { pooled = null; } channel.shutdownReads(); - state |= FLAG_FINISHED; + stateUpdater.getAndAccumulate(this, FLAG_FINISHED, (currentState, flag)-> currentState | flag); } } } diff --git a/core/src/main/java/io/undertow/io/UndertowOutputStream.java b/core/src/main/java/io/undertow/io/UndertowOutputStream.java index 22b7b49709..eaeff45be7 100644 --- a/core/src/main/java/io/undertow/io/UndertowOutputStream.java +++ b/core/src/main/java/io/undertow/io/UndertowOutputStream.java @@ -22,6 +22,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.UndertowMessages; import io.undertow.server.HttpServerExchange; @@ -51,7 +52,9 @@ public class UndertowOutputStream extends OutputStream implements BufferWritable private ByteBuffer buffer; private PooledByteBuffer pooledBuffer; private StreamSinkChannel channel; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(UndertowOutputStream.class, "state"); private long written; private final long contentLength; @@ -230,7 +233,7 @@ public void write(ByteBuffer[] buffers) throws IOException { channel = exchange.getResponseChannel(); } Channels.writeBlocking(channel, buffers, 0, buffers.length); - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } else { ByteBuffer buffer = buffer(); if (len < buffer.remaining()) { @@ -249,7 +252,7 @@ public void write(ByteBuffer[] buffers) throws IOException { Channels.writeBlocking(channel, newBuffers, 0, newBuffers.length); buffer.clear(); } - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } } updateWritten(len); @@ -304,7 +307,7 @@ private void writeBufferBlocking(final boolean writeFinal) throws IOException { } } buffer.clear(); - state |= FLAG_WRITE_STARTED; + stateUpdater.getAndAccumulate(this, FLAG_WRITE_STARTED, (currentState, flag)-> currentState | flag); } @Override @@ -330,7 +333,7 @@ public void transferFrom(FileChannel source) throws IOException { public void close() throws IOException { if (anyAreSet(state, FLAG_CLOSED)) return; try { - state |= FLAG_CLOSED; + stateUpdater.getAndAccumulate(this, FLAG_CLOSED, (currentState, flag)-> currentState | flag); if (anyAreClear(state, FLAG_WRITE_STARTED) && channel == null && !isHeadRequestWithContentLength(exchange)) { From 10787161d12850eada9e88fa1f00454565fed507 Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 5 Nov 2024 11:40:50 +0100 Subject: [PATCH 19/30] [UNDERTOW-2356] HeadStreamSinkConduit --- .../conduits/HeadStreamSinkConduit.java | 38 ++++++++----------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java index aad1232380..0a3b78deab 100644 --- a/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/HeadStreamSinkConduit.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.xnio.IoUtils; import org.xnio.channels.StreamSourceChannel; @@ -43,7 +44,9 @@ public final class HeadStreamSinkConduit extends AbstractStreamSinkConduit finishListener; - private int state; + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(HeadStreamSinkConduit.class, "state"); private final boolean shutdownDelegate; private static final int FLAG_CLOSE_REQUESTED = 1; @@ -122,29 +125,26 @@ public long transferFrom(final StreamSourceChannel source, final long count, fin } public boolean flush() throws IOException { - int val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return true; } boolean flushed = false; try { return flushed = next.flush(); } finally { - exitFlush(val, flushed); + exitFlush(flushed); } } public void suspendWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.suspendWrites(); } public void resumeWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.resumeWrites(); @@ -156,37 +156,31 @@ public boolean isWriteResumed() { } public void wakeupWrites() { - long val = state; - if (anyAreSet(val, FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_COMPLETE)) { return; } next.wakeupWrites(); } public void terminateWrites() throws IOException { - int oldVal, newVal; - oldVal = state; - if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { + if (anyAreSet(state, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { // no action necessary return; } - newVal = oldVal | FLAG_CLOSE_REQUESTED; - state = newVal; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); if(shutdownDelegate) { next.terminateWrites(); } } - private void exitFlush(int oldVal, boolean flushed) { - int newVal = oldVal; + private void exitFlush( boolean flushed) { boolean callFinish = false; - if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED) && flushed) { - newVal |= FLAG_CLOSE_COMPLETE; - if (!anyAreSet(oldVal, FLAG_FINISHED_CALLED)) { - newVal |= FLAG_FINISHED_CALLED; + if (anyAreSet(state, FLAG_CLOSE_REQUESTED) && flushed) { + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_COMPLETE, (currentState, flag)-> currentState | flag); + if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (currentState, flag)-> currentState | flag); callFinish = true; } - state = newVal; if (callFinish) { if (finishListener != null) { finishListener.handleEvent(this); From 15c128d9d92d84c5fe022722268ec9e773f908d6 Mon Sep 17 00:00:00 2001 From: baranowb Date: Wed, 6 Nov 2024 08:59:20 +0100 Subject: [PATCH 20/30] [UNDERTOW-2356] HttpServerExchange --- .../undertow/server/HttpServerExchange.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/undertow/server/HttpServerExchange.java b/core/src/main/java/io/undertow/server/HttpServerExchange.java index 3515907dd1..1e4a579a5a 100644 --- a/core/src/main/java/io/undertow/server/HttpServerExchange.java +++ b/core/src/main/java/io/undertow/server/HttpServerExchange.java @@ -1674,7 +1674,7 @@ HttpServerExchange setRequestStartTime(long requestStartTime) { * If the exchange is already complete this method is a noop */ public HttpServerExchange endExchange() { - final int state = this.state; + //TODO: check if this required or even desired? if (allAreSet(state, FLAG_REQUEST_TERMINATED | FLAG_RESPONSE_TERMINATED)) { if(blockingHttpExchange != null) { //we still have to close the blocking exchange in this case, @@ -1819,7 +1819,7 @@ public void handleEvent(final StreamSinkChannel channel) { channel.suspendWrites(); channel.getWriteSetter().set(null); //defensive programming, should never happen - if (anyAreClear(state, FLAG_RESPONSE_TERMINATED)) { + if (anyAreClear(HttpServerExchange.this.state, FLAG_RESPONSE_TERMINATED)) { //make sure the listeners have been invoked invokeExchangeCompleteListeners(); UndertowLogger.ROOT_LOGGER.responseWasNotTerminated(connection, HttpServerExchange.this); @@ -2070,14 +2070,14 @@ private class WriteDispatchChannel extends DetachableStreamSinkChannel implement @Override protected boolean isFinished() { - return allAreSet(state, FLAG_RESPONSE_TERMINATED); + return allAreSet(HttpServerExchange.this.state, FLAG_RESPONSE_TERMINATED); } @Override public void resumeWrites() { if (isInCall()) { setFlags(FLAG_SHOULD_RESUME_WRITES); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else if(!isFinished()){ @@ -2099,7 +2099,7 @@ public void wakeupWrites() { if (isInCall()) { wakeup = true; setFlags(FLAG_SHOULD_RESUME_WRITES); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else { @@ -2109,7 +2109,7 @@ public void wakeupWrites() { @Override public boolean isWriteResumed() { - return anyAreSet(state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed(); + return anyAreSet(HttpServerExchange.this.state, FLAG_SHOULD_RESUME_WRITES) || super.isWriteResumed(); } public void runResume() { @@ -2242,7 +2242,7 @@ private final class ReadDispatchChannel extends DetachableStreamSourceChannel im @Override protected boolean isFinished() { - return allAreSet(state, FLAG_REQUEST_TERMINATED); + return allAreSet(HttpServerExchange.this.state, FLAG_REQUEST_TERMINATED); } @Override @@ -2250,7 +2250,7 @@ public void resumeReads() { readsResumed = true; if (isInCall()) { setFlags(FLAG_SHOULD_RESUME_READS); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else if (!isFinished()) { @@ -2263,7 +2263,7 @@ public void wakeupReads() { if (isInCall()) { wakeup = true; setFlags(FLAG_SHOULD_RESUME_READS); - if(anyAreSet(state, FLAG_DISPATCHED)) { + if(anyAreSet(HttpServerExchange.this.state, FLAG_DISPATCHED)) { throw UndertowMessages.MESSAGES.resumedAndDispatched(); } } else { @@ -2445,7 +2445,7 @@ public boolean isReadResumed() { if(isFinished()) { return false; } - return anyAreSet(state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed(); + return anyAreSet(HttpServerExchange.this.state, FLAG_SHOULD_RESUME_READS) || super.isReadResumed(); } @Override From d3882545e55d3316982c54dead36af116d179822 Mon Sep 17 00:00:00 2001 From: baranowb Date: Wed, 6 Nov 2024 09:01:49 +0100 Subject: [PATCH 21/30] [UNDERTOW-2356] GracefulShutdownHandler - just suppress warnin --- .../io/undertow/server/handlers/GracefulShutdownHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java b/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java index 263b79565f..d65b9106c9 100644 --- a/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java +++ b/core/src/main/java/io/undertow/server/handlers/GracefulShutdownHandler.java @@ -63,6 +63,7 @@ public class GracefulShutdownHandler implements HttpHandler { private final Object lock = new Object(); + @SuppressWarnings("unused") private volatile long state = 0; private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater(GracefulShutdownHandler.class, "state"); From 6a69d27f73ca192c35a73c90c084e077ec607609 Mon Sep 17 00:00:00 2001 From: baranowb Date: Wed, 6 Nov 2024 11:36:57 +0100 Subject: [PATCH 22/30] [UNDERTOW-2356] PipeliningBufferingStreamSinkConduit --- .../PipeliningBufferingStreamSinkConduit.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java b/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java index 5f8fe4a574..c826076751 100644 --- a/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/server/protocol/http/PipeliningBufferingStreamSinkConduit.java @@ -24,6 +24,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import io.undertow.UndertowLogger; import io.undertow.server.HttpServerExchange; @@ -59,8 +60,9 @@ public class PipeliningBufferingStreamSinkConduit extends AbstractStreamSinkCond private static final int DELEGATE_SHUTDOWN = 1 << 1; private static final int FLUSHING = 1 << 3; - private int state; - + @SuppressWarnings("unused") + private volatile int state; + private static final AtomicIntegerFieldUpdater stateUpdater = AtomicIntegerFieldUpdater.newUpdater(PipeliningBufferingStreamSinkConduit.class, "state"); private final ByteBufferPool pool; private PooledByteBuffer buffer; @@ -157,7 +159,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, } if (!anyAreSet(state, FLUSHING)) { - state |= FLUSHING; + stateUpdater.accumulateAndGet(this, FLUSHING, (current, flag) -> current | flag); byteBuffer.flip(); } int originalBufferedRemaining = byteBuffer.remaining(); @@ -178,7 +180,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, if (written > originalBufferedRemaining) { buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return written - originalBufferedRemaining; } return 0; @@ -186,7 +188,7 @@ private long flushBufferWithUserData(final ByteBuffer[] byteBuffers, int offset, } while (written < toWrite); buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return written - originalBufferedRemaining; } @@ -221,7 +223,7 @@ private boolean flushBuffer() throws IOException { } final ByteBuffer byteBuffer = buffer.getBuffer(); if (!anyAreSet(state, FLUSHING)) { - state |= FLUSHING; + stateUpdater.accumulateAndGet(this, FLUSHING, (current, flag) -> current | flag); byteBuffer.flip(); } while (byteBuffer.hasRemaining()) { @@ -234,7 +236,7 @@ private boolean flushBuffer() throws IOException { } buffer.close(); this.buffer = null; - state &= ~FLUSHING; + stateUpdater.accumulateAndGet(this, ~FLUSHING, (current, flag) -> current & flag); return true; } @@ -266,7 +268,7 @@ public boolean flush() throws IOException { } if (anyAreSet(state, SHUTDOWN) && anyAreClear(state, DELEGATE_SHUTDOWN)) { - state |= DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, DELEGATE_SHUTDOWN, (current, flag) -> current | flag); next.terminateWrites(); } return next.flush(); @@ -276,9 +278,9 @@ public boolean flush() throws IOException { @Override public void terminateWrites() throws IOException { - state |= SHUTDOWN; + stateUpdater.accumulateAndGet(this, SHUTDOWN, (current, flag) -> current | flag); if (buffer == null) { - state |= DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, DELEGATE_SHUTDOWN, (current, flag) -> current | flag); next.terminateWrites(); } } From 8ad1f12fecfd150d3c0cb3a9adc1cdab3bb0df2f Mon Sep 17 00:00:00 2001 From: baranowb Date: Thu, 7 Nov 2024 17:02:44 +0100 Subject: [PATCH 23/30] [UNDERTOW-2356] HttpRequestConduit --- .../client/http/HttpRequestConduit.java | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java b/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java index f25ad40c88..22ccbfd1d4 100644 --- a/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java +++ b/core/src/main/java/io/undertow/client/http/HttpRequestConduit.java @@ -52,8 +52,6 @@ final class HttpRequestConduit extends AbstractStreamSinkConduit nameIterator; private String string; private HttpString headerName; @@ -79,6 +77,7 @@ final class HttpRequestConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( HttpRequestConduit.class, "state"); @@ -465,7 +464,7 @@ private int doProcessWrite(int state, final ByteBuffer userData) throws IOExcept if (res == 0) { log.trace("Continuation"); this.charIndex = i; - this.state = STATE_URL; + stateUpdater.set(this, STATE_URL) ; return STATE_URL; } } while (buffer.hasRemaining()); @@ -534,14 +533,14 @@ public int write(final ByteBuffer src) throws IOException { } return alreadyWritten; } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldState & ~MASK_STATE | state; + stateUpdater.set(this, oldState & ~MASK_STATE | state); } } @@ -576,14 +575,14 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t } return length == 1 ? next.write(srcs[offset]) : next.write(srcs, offset, length); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -630,7 +629,7 @@ public long transferFrom(final FileChannel src, final long position, final long } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -661,14 +660,14 @@ public long transferFrom(final StreamSourceChannel source, final long count, fin } return next.transferFrom(source, count, throughBuffer); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -698,14 +697,14 @@ public boolean flush() throws IOException { log.trace("Delegating flush"); return next.flush(); } catch (IOException | RuntimeException | Error e) { - this.state |= FLAG_SHUTDOWN; + setFlags(FLAG_SHUTDOWN); if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; } throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -717,7 +716,7 @@ public void terminateWrites() throws IOException { next.terminateWrites(); return; } - this.state = oldVal | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal | FLAG_SHUTDOWN); } public void truncateWrites() throws IOException { @@ -734,7 +733,7 @@ public void truncateWrites() throws IOException { } return; } - this.state = oldVal & ~MASK_STATE | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal & ~MASK_STATE | FLAG_SHUTDOWN); throw new TruncatedResponseException(); } @@ -746,7 +745,24 @@ public void freeBuffers() { if(pooledBuffer != null) { pooledBuffer.close(); pooledBuffer = null; - this.state = state & ~MASK_STATE | FLAG_SHUTDOWN; + //this.state = state & ~MASK_STATE | FLAG_SHUTDOWN; + clearFlags(MASK_STATE); + setFlags(FLAG_SHUTDOWN); } } + + + private void setFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old | flags)); + } + + private void clearFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old & ~flags)); + } } From d02948d1364e6876f49cab52e6a1e886b4a5975f Mon Sep 17 00:00:00 2001 From: baranowb Date: Fri, 8 Nov 2024 12:14:07 +0100 Subject: [PATCH 24/30] [UNDERTOW-2356] HttpResponseConduit --- .../protocol/http/HttpResponseConduit.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java b/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java index f8fdfe916c..49c605d8f2 100644 --- a/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java +++ b/core/src/main/java/io/undertow/server/protocol/http/HttpResponseConduit.java @@ -41,6 +41,7 @@ import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.allAreSet; @@ -57,8 +58,6 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + HttpResponseConduit.class, "state"); + HttpResponseConduit(final StreamSinkConduit next, final ByteBufferPool pool, HttpServerConnection connection) { super(next); this.pool = pool; @@ -102,9 +105,8 @@ final class HttpResponseConduit extends AbstractStreamSinkConduit= 50; // append protocol HttpString protocol = exchange.getProtocol(); @@ -232,7 +234,7 @@ private int processWrite(int state, final Object userData, int pos, int length) this.headerValues = headerValues; this.valueIdx = valueIdx; this.charIndex = 0; - this.state = STATE_HDR_NAME; + stateUpdater.set(this, STATE_HDR_NAME); buffer.flip(); return processStatefulWrite(STATE_HDR_NAME, userData, pos, length); } @@ -247,7 +249,7 @@ private int processWrite(int state, final Object userData, int pos, int length) this.headerValues = headerValues; this.valueIdx = valueIdx; this.charIndex = 0; - this.state = STATE_HDR_VAL; + stateUpdater.set(this, STATE_HDR_VAL); buffer.flip(); return processStatefulWrite(STATE_HDR_VAL, userData, pos, length); } @@ -298,7 +300,7 @@ private int processWrite(int state, final Object userData, int pos, int length) } finally { if (buffer != null) { bufferDone(); - this.state &= ~POOLED_BUFFER_IN_USE; + clearFlags(POOLED_BUFFER_IN_USE); } } } @@ -642,7 +644,7 @@ public int write(final ByteBuffer src) throws IOException { } return alreadyWritten; } finally { - this.state = oldState & ~MASK_STATE | state; + stateUpdater.set(this, oldState & ~MASK_STATE | state); } } catch(IOException|RuntimeException|Error e) { IoUtils.safeClose(connection); @@ -681,7 +683,7 @@ public long write(final ByteBuffer[] srcs, final int offset, final int length) t IoUtils.safeClose(connection); throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -784,7 +786,7 @@ public boolean flush() throws IOException { IoUtils.safeClose(connection); throw e; } finally { - this.state = oldVal & ~MASK_STATE | state; + stateUpdater.set(this, oldVal & ~MASK_STATE | state); } } @@ -796,7 +798,7 @@ public void terminateWrites() throws IOException { next.terminateWrites(); return; } - this.state = oldVal | FLAG_SHUTDOWN; + stateUpdater.set(this, oldVal | FLAG_SHUTDOWN); } catch (IOException | RuntimeException | Error e) { IoUtils.safeClose(connection); throw e; @@ -834,4 +836,18 @@ void freeBuffers() { pooledFileTransferBuffer = null; } } + + private void setFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old | flags)); + } + + private void clearFlags(int flags) { + int old; + do { + old = state; + } while (!stateUpdater.compareAndSet(this, old, old & ~flags)); + } } From e1fd79b77ee1cb562ef83d4a17da4e84e31c13ae Mon Sep 17 00:00:00 2001 From: baranowb Date: Fri, 8 Nov 2024 14:24:05 +0100 Subject: [PATCH 25/30] [UNDERTOW-2356] ChunkReader - minor, use local copy for check --- core/src/main/java/io/undertow/conduits/ChunkReader.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/ChunkReader.java b/core/src/main/java/io/undertow/conduits/ChunkReader.java index f4816e3f92..8f127e8abd 100644 --- a/core/src/main/java/io/undertow/conduits/ChunkReader.java +++ b/core/src/main/java/io/undertow/conduits/ChunkReader.java @@ -71,7 +71,7 @@ public long readChunk(final ByteBuffer buf) throws IOException { long oldVal = state; long chunkRemaining = state & MASK_COUNT; - if (chunkRemaining > 0 && !anyAreSet(state, FLAG_READING_AFTER_LAST | FLAG_READING_LENGTH | FLAG_READING_NEWLINE | FLAG_READING_TILL_END_OF_LINE)) { + if (chunkRemaining > 0 && !anyAreSet(oldVal, FLAG_READING_AFTER_LAST | FLAG_READING_LENGTH | FLAG_READING_NEWLINE | FLAG_READING_TILL_END_OF_LINE)) { return chunkRemaining; } long newVal = oldVal & ~MASK_COUNT; @@ -159,10 +159,11 @@ public long getChunkRemaining() { } public void setChunkRemaining(final long remaining) { - if (remaining < 0 || anyAreSet(state, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { + long old = state; + if (remaining < 0 || anyAreSet(old, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { return; } - long old = state; + long oldRemaining = old & MASK_COUNT; if (remaining == 0 && oldRemaining != 0) { //if oldRemaining is zero it could be that no data has been read yet From cb49088924386f2165977273a67bdc31f196134d Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 12 Nov 2024 10:34:16 +0100 Subject: [PATCH 26/30] [UNDERTOW-2356] AbstractFixedLengthStreamSinkConduit --- .../AbstractFixedLengthStreamSinkConduit.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java index c8379ef24c..c8d324b49f 100644 --- a/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/AbstractFixedLengthStreamSinkConduit.java @@ -19,6 +19,7 @@ package io.undertow.conduits; import io.undertow.UndertowLogger; + import org.xnio.Buffers; import org.xnio.channels.FixedLengthOverflowException; import org.xnio.channels.StreamSourceChannel; @@ -31,6 +32,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static java.lang.Math.min; import static org.xnio.Bits.allAreClear; @@ -46,7 +48,9 @@ public abstract class AbstractFixedLengthStreamSinkConduit extends AbstractStreamSinkConduit { private int config; - private long state; + @SuppressWarnings("unused") + private volatile long state; + protected static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater(AbstractFixedLengthStreamSinkConduit.class, "state"); private boolean broken = false; @@ -78,7 +82,7 @@ public AbstractFixedLengthStreamSinkConduit(final StreamSinkConduit next, final } protected void reset(long contentLength, boolean propagateClose) { - this.state = contentLength; + stateUpdater.set(this, contentLength); if (propagateClose) { config |= CONF_FLAG_PASS_CLOSE; } else { @@ -256,7 +260,7 @@ public void terminateWrites() throws IOException { next.truncateWrites(); } finally { if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { - state |= FLAG_FINISHED_CALLED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (current, flag)-> current | flag); channelFinished(); } } @@ -270,7 +274,7 @@ public void terminateWrites() throws IOException { public void truncateWrites() throws IOException { try { if (!anyAreSet(state, FLAG_FINISHED_CALLED)) { - state |= FLAG_FINISHED_CALLED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED_CALLED, (current, flag)-> current | flag); channelFinished(); } } finally { @@ -298,7 +302,7 @@ public long getRemaining() { private void exitWrite(long oldVal, long consumed) { long newVal = oldVal - consumed; - state = newVal; + stateUpdater.set(this, newVal); } private void exitFlush(long oldVal, boolean flushed) { @@ -311,7 +315,7 @@ private void exitFlush(long oldVal, boolean flushed) { newVal |= FLAG_FINISHED_CALLED; callFinish = true; } - state = newVal; + stateUpdater.set(this, newVal); if (callFinish) { channelFinished(); } @@ -322,18 +326,20 @@ protected void channelFinished() { } private long enterShutdown() { - long oldVal, newVal; - oldVal = state; + long oldVal; + oldVal = stateUpdater.get(this); if (anyAreSet(oldVal, FLAG_CLOSE_REQUESTED | FLAG_CLOSE_COMPLETE)) { // no action necessary return oldVal; } - newVal = oldVal | FLAG_CLOSE_REQUESTED; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); if (anyAreSet(oldVal, MASK_COUNT)) { // error: channel not filled. set both close flags. - newVal |= FLAG_CLOSE_COMPLETE; + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); + } else { + stateUpdater.accumulateAndGet(this, FLAG_CLOSE_REQUESTED, (currentState, flag)-> currentState | flag); } - state = newVal; + //state = newVal; return oldVal; } From 6d017d640cde4234ac7caa8ecbe4486850f0dc04 Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 12 Nov 2024 11:09:42 +0100 Subject: [PATCH 27/30] [UNDERTOW-2356] FixedLengthStreamSourceConduit --- .../FixedLengthStreamSourceConduit.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java b/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java index 1757210309..b8a6214073 100644 --- a/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java +++ b/core/src/main/java/io/undertow/conduits/FixedLengthStreamSourceConduit.java @@ -30,6 +30,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import static java.lang.Math.min; import static org.xnio.Bits.allAreClear; @@ -60,7 +61,9 @@ public final class FixedLengthStreamSourceConduit extends AbstractStreamSourceCo private final ConduitListener finishListener; @SuppressWarnings("unused") - private long state; + private volatile long state; + private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater( + FixedLengthStreamSourceConduit.class, "state"); private static final long FLAG_CLOSED = 1L << 63L; private static final long FLAG_FINISHED = 1L << 62L; @@ -169,11 +172,11 @@ private void checkMaxSize(long state) throws IOException { Connectors.terminateRequest(exchange); exchange.setPersistent(false); finishListener.handleEvent(this); - this.state |= FLAG_FINISHED | FLAG_CLOSED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED | FLAG_CLOSED, (current, flag ) -> current | flag ); throw UndertowMessages.MESSAGES.requestEntityWasTooLarge(exchange.getMaxEntitySize()); } } - this.state |= FLAG_LENGTH_CHECKED; + stateUpdater.accumulateAndGet(this, FLAG_LENGTH_CHECKED, (current, flag ) -> current | flag ); } } @@ -336,7 +339,7 @@ private long enterShutdownReads() { return oldVal; } newVal = oldVal | FLAG_CLOSED; - state = newVal; + stateUpdater.set(this, newVal); return oldVal; } @@ -360,7 +363,7 @@ private void exitRead(long consumed, Throwable readError) throws IOException { if(consumed == -1) { if (anyAreSet(oldVal, MASK_COUNT)) { invokeFinishListener(); - state &= ~MASK_COUNT; + stateUpdater.accumulateAndGet(this, ~MASK_COUNT, (current, flag ) -> current & flag ); final IOException couldNotReadAll = UndertowMessages.MESSAGES.couldNotReadContentLengthData(); if (readError != null) { couldNotReadAll.addSuppressed(readError); @@ -370,11 +373,11 @@ private void exitRead(long consumed, Throwable readError) throws IOException { return; } long newVal = oldVal - consumed; - state = newVal; + stateUpdater.set(this, newVal); } private void invokeFinishListener() { - this.state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (current, flag ) -> current | flag ); finishListener.handleEvent(this); } From 8d9e8493291e69b5c3a74cf9606af426742d0b96 Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 12 Nov 2024 12:18:31 +0100 Subject: [PATCH 28/30] [UNDERTOW-2356 ChunkReader] --- .../java/io/undertow/conduits/ChunkReader.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/ChunkReader.java b/core/src/main/java/io/undertow/conduits/ChunkReader.java index 8f127e8abd..79eeb111f3 100644 --- a/core/src/main/java/io/undertow/conduits/ChunkReader.java +++ b/core/src/main/java/io/undertow/conduits/ChunkReader.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.xnio.conduits.Conduit; import io.undertow.UndertowMessages; @@ -44,12 +45,17 @@ class ChunkReader { private static final long FLAG_READING_TILL_END_OF_LINE = 1L << 60L; private static final long FLAG_READING_NEWLINE = 1L << 59L; private static final long FLAG_READING_AFTER_LAST = 1L << 58L; + private static final long FLAG_COMPOUND = FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST; private static final long MASK_COUNT = longBitMask(0, 56); private static final long LIMIT = Long.MAX_VALUE >> 4; - private long state; + @SuppressWarnings("unused") + private volatile long state; + private static final AtomicLongFieldUpdater stateUpdater = AtomicLongFieldUpdater.newUpdater( + ChunkReader.class, "state"); + private final Attachable attachable; private final AttachmentKey trailerAttachmentKey; /** @@ -144,7 +150,7 @@ public long readChunk(final ByteBuffer buf) throws IOException { } return chunkRemaining; } finally { - state = newVal | chunkRemaining; + stateUpdater.set(this, newVal | chunkRemaining); } } @@ -152,7 +158,7 @@ public long getChunkRemaining() { if (anyAreSet(state, FLAG_FINISHED)) { return -1; } - if (anyAreSet(state, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { + if (anyAreSet(state, FLAG_COMPOUND)) { return 0; } return state & MASK_COUNT; @@ -160,7 +166,7 @@ public long getChunkRemaining() { public void setChunkRemaining(final long remaining) { long old = state; - if (remaining < 0 || anyAreSet(old, FLAG_READING_LENGTH | FLAG_READING_TILL_END_OF_LINE | FLAG_READING_NEWLINE | FLAG_READING_AFTER_LAST)) { + if (remaining < 0 || anyAreSet(old, FLAG_COMPOUND)) { return; } @@ -170,7 +176,7 @@ public void setChunkRemaining(final long remaining) { //and the correct state is READING_LENGTH old |= FLAG_READING_NEWLINE; } - state = (old & ~MASK_COUNT) | remaining; + stateUpdater.set(this, (old & ~MASK_COUNT) | remaining); } private int handleChunkedRequestEnd(ByteBuffer buffer) throws IOException { From 77c90e0afd73b001709215a0c75e3ad291b87f4c Mon Sep 17 00:00:00 2001 From: baranowb Date: Tue, 12 Nov 2024 13:19:41 +0100 Subject: [PATCH 29/30] [UNDERTOW-2356] PreChunkedStreamSinkConduit --- .../undertow/conduits/PreChunkedStreamSinkConduit.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java index 2255e6fdd4..942267836a 100644 --- a/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/PreChunkedStreamSinkConduit.java @@ -33,6 +33,7 @@ import java.nio.channels.ClosedChannelException; import java.nio.channels.FileChannel; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.anyAreSet; @@ -52,7 +53,9 @@ public class PreChunkedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + PreChunkedStreamSinkConduit.class, "state"); final ChunkReader chunkReader; /** @@ -183,7 +186,7 @@ public boolean flush() throws IOException { } private void invokeFinishListener() { - state |= FLAG_FINISHED; + stateUpdater.accumulateAndGet(this, FLAG_FINISHED, (current, flag) -> current | flag); if (finishListener != null) { finishListener.handleEvent(this); } @@ -197,7 +200,7 @@ public void terminateWrites() throws IOException { if (chunkReader.getChunkRemaining() != -1) { throw UndertowMessages.MESSAGES.chunkedChannelClosedMidChunk(); } - state |= FLAG_WRITES_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_SHUTDOWN, (current, flag) -> current | flag); } @Override From da7f3e72909a05441dfd3229c449bfccdfff312a Mon Sep 17 00:00:00 2001 From: baranowb Date: Wed, 13 Nov 2024 09:01:31 +0100 Subject: [PATCH 30/30] [UNDERTOW-2356] AbstractFramedStreamSinkConduit --- .../conduits/AbstractFramedStreamSinkConduit.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java b/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java index 8ca6ed87af..34ae8cb727 100644 --- a/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java +++ b/core/src/main/java/io/undertow/conduits/AbstractFramedStreamSinkConduit.java @@ -19,6 +19,7 @@ package io.undertow.conduits; import io.undertow.UndertowMessages; + import org.xnio.Buffers; import org.xnio.IoUtils; import io.undertow.connector.PooledByteBuffer; @@ -33,6 +34,7 @@ import java.nio.channels.FileChannel; import java.util.ArrayDeque; import java.util.Deque; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static org.xnio.Bits.allAreClear; import static org.xnio.Bits.anyAreSet; @@ -58,8 +60,9 @@ public class AbstractFramedStreamSinkConduit extends AbstractStreamSinkConduit stateUpdater = AtomicIntegerFieldUpdater.newUpdater( + AbstractFramedStreamSinkConduit.class, "state"); private static final int FLAG_WRITES_TERMINATED = 1; private static final int FLAG_DELEGATE_SHUTDOWN = 2; @@ -191,11 +194,12 @@ public void terminateWrites() throws IOException { return; } queueCloseFrames(); - state |= FLAG_WRITES_TERMINATED; if (queuedData == 0) { - state |= FLAG_DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_WRITES_TERMINATED | FLAG_DELEGATE_SHUTDOWN, (current, flag) -> current | flag); doTerminateWrites(); finished(); + } else { + stateUpdater.accumulateAndGet(this, FLAG_WRITES_TERMINATED, (current, flag) -> current | flag); } } @@ -212,7 +216,7 @@ protected boolean flushQueuedData() throws IOException { } if (anyAreSet(state, FLAG_WRITES_TERMINATED) && allAreClear(state, FLAG_DELEGATE_SHUTDOWN)) { doTerminateWrites(); - state |= FLAG_DELEGATE_SHUTDOWN; + stateUpdater.accumulateAndGet(this, FLAG_DELEGATE_SHUTDOWN, (current, flag) -> current | flag); finished(); } return next.flush();

Directory Listing - ").append(sortUrl).append("
NameLast ModifiedSize
Powered by Undertow