From 2e1d2de4e6ca22533874109ce6abd90ff7d52155 Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Mon, 10 Jun 2024 16:16:02 -0300 Subject: [PATCH 1/4] feat: support cancelling queued downloads if the UI is closed Close #127 --- .../ConcurrentStreamResourceWriter.java | 71 +++++++++++++------ .../addons/gridexporter/GridExporter.java | 58 +++++++++------ ...gurableConcurrentStreamResourceWriter.java | 11 +++ .../VaadinServiceInitListenerImpl.java | 2 + 4 files changed, 101 insertions(+), 41 deletions(-) diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java index 4bfd1ff..76b7574 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java @@ -1,11 +1,13 @@ package com.flowingcode.vaadin.addons.gridexporter; +import com.vaadin.flow.component.UI; import com.vaadin.flow.server.StreamResourceWriter; import com.vaadin.flow.server.VaadinSession; import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; import java.nio.channels.InterruptedByTimeoutException; +import java.util.Optional; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.function.IntFunction; @@ -30,6 +32,8 @@ abstract class ConcurrentStreamResourceWriter implements StreamResourceWriter { private static volatile boolean enabled; + private static volatile boolean failOnUiChange; + private final StreamResourceWriter delegate; private static final class ConfigurableSemaphore extends Semaphore { @@ -89,6 +93,10 @@ public static void setLimit(float limit) { } } + static void setFailOnUiChange(boolean failOnUiChange) { + ConcurrentStreamResourceWriter.failOnUiChange = failOnUiChange; + } + /** * Returns the limit for the number of concurrent downloads. * @@ -147,6 +155,22 @@ public float getCost(VaadinSession session) { return DEFAULT_COST; } + /** + * Returns the UI associated with the current download. + *

+ * This method is used to ensure that the UI is still attached to the current session when a + * download is initiated. Implementations should return the appropriate UI instance. + *

+ * + * @return the {@link UI} instance associated with the current download, or {@code null} if no UI + * is available. + */ + protected abstract UI getUI(); + + private UI getAttachedUI() { + return Optional.ofNullable(getUI()).filter(UI::isAttached).orElse(null); + } + /** * Callback method that is invoked when a timeout occurs while trying to acquire a permit for * starting a download. @@ -207,36 +231,41 @@ public float getCost(VaadinSession session) { public final void accept(OutputStream stream, VaadinSession session) throws IOException { onAccept(); try { - if (!enabled) { - delegate.accept(stream, session); - } else { + if (!enabled) { + delegate.accept(stream, session); + } else { - try { + try { + int permits; + float cost = getCost(session); + synchronized (semaphore) { + permits = costToPermits(cost, semaphore.maxPermits); + } - int permits; - float cost = getCost(session); - synchronized (semaphore) { - permits = costToPermits(cost, semaphore.maxPermits); - } + UI ui = failOnUiChange ? getAttachedUI() : null; - if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) { - try { - delegate.accept(stream, session); - } finally { - semaphore.release(permits); + if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) { + try { + if (ui != null && getAttachedUI()!=ui) { + // The UI has changed or was detached after acquirig the semaphore + throw new IOException("Detached UI"); } - } else { - onTimeout(); - throw new InterruptedByTimeoutException(); + delegate.accept(stream, session); + } finally { + semaphore.release(permits); } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw (IOException) new InterruptedIOException().initCause(e); + } else { + onTimeout(); + throw new InterruptedByTimeoutException(); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw (IOException) new InterruptedIOException().initCause(e); } + } } finally { onFinish(); - } + } } } \ No newline at end of file diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java index 1650c78..3604d14 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java @@ -24,6 +24,7 @@ import com.vaadin.flow.component.Component; import com.vaadin.flow.component.ComponentUtil; import com.vaadin.flow.component.HasEnabled; +import com.vaadin.flow.component.UI; import com.vaadin.flow.component.grid.ColumnPathRenderer; import com.vaadin.flow.component.grid.Grid; import com.vaadin.flow.component.grid.Grid.Column; @@ -41,6 +42,7 @@ import com.vaadin.flow.server.StreamResourceWriter; import com.vaadin.flow.server.VaadinSession; import com.vaadin.flow.shared.Registration; +import java.io.IOException; import java.io.Serializable; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; @@ -354,27 +356,32 @@ private class GridExporterConcurrentStreamResourceWriter extends ConcurrentStrea private Component button; - @Override - public float getCost(VaadinSession session) { - return concurrentDownloadCost; - } + @Override + public float getCost(VaadinSession session) { + return concurrentDownloadCost; + } - @Override - public long getTimeout() { - // It would have been possible to specify a different timeout for each instance but I cannot - // figure out a good use case for that. The timeout returned herebecomes relevant when the - // semaphore has been acquired by any other download, so the timeout must reflect how long - // it is reasonable to wait for "any other download" to complete and release the semaphore. - // - // Since the reasonable timeout would depend on the duration of "any other download", it - // makes sense that it's a global setting instead of a per-instance setting. - return concurrentDownloadTimeoutNanos; - } + @Override + public long getTimeout() { + // It would have been possible to specify a different timeout for each instance but I cannot + // figure out a good use case for that. The timeout returned herebecomes relevant when the + // semaphore has been acquired by any other download, so the timeout must reflect how long + // it is reasonable to wait for "any other download" to complete and release the semaphore. + // + // Since the reasonable timeout would depend on the duration of "any other download", it + // makes sense that it's a global setting instead of a per-instance setting. + return concurrentDownloadTimeoutNanos; + } - @Override - protected void onTimeout() { - fireConcurrentDownloadTimeout(); - } + @Override + protected UI getUI() { + return grid.getUI().orElse(null); + } + + @Override + protected void onTimeout() { + fireConcurrentDownloadTimeout(); + } @Override protected void onAccept() { @@ -386,7 +393,7 @@ protected void onAccept() { @Override protected void onFinish() { setButtonEnabled(true); - } + } private void setButtonEnabled(boolean enabled) { if (button instanceof HasEnabled) { @@ -474,6 +481,17 @@ public static float getConcurrentDownloadLimit() { return ConcurrentStreamResourceWriter.getLimit(); } + /** + * Configures the behavior of the stream operation when the UI changes during execution. + * + * @param failOnUiChange If {@code true}, the operation will throw an {@link IOException} if the + * UI changes (e.g., becomes detached) after acquiring the semaphore. If {@code false}, the + * operation will proceed regardless of any UI changes. + */ + public static void setFailOnUiChange(boolean failOnUiChange) { + ConcurrentStreamResourceWriter.setFailOnUiChange(failOnUiChange); + } + /** * Sets the timeout for acquiring a permit to start a download when the * {@linkplain #setConcurrentDownloadLimit(int) maximum number of concurrent downloads} is diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java index 8a3d685..0b32a94 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/ConfigurableConcurrentStreamResourceWriter.java @@ -1,5 +1,6 @@ package com.flowingcode.vaadin.addons.gridexporter; +import com.vaadin.flow.component.UI; import com.vaadin.flow.server.StreamResourceWriter; import com.vaadin.flow.server.VaadinSession; @@ -13,6 +14,7 @@ public ConfigurableConcurrentStreamResourceWriter(StreamResourceWriter delegate) private float cost = GridExporter.DEFAULT_COST; private long timeout = 0L; + private UI ui; @Override public float getCost(VaadinSession session) { @@ -32,4 +34,13 @@ public void setTimeout(long timeout) { this.timeout = timeout; } + @Override + public UI getUI() { + return ui; + } + + public void setUi(UI ui) { + this.ui = ui; + } + } diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java index 5f959a2..7be7e88 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/VaadinServiceInitListenerImpl.java @@ -20,6 +20,8 @@ public void serviceInit(ServiceInitEvent event) { }); // end-block + GridExporter.setFailOnUiChange(true); + } } From 7eba81a59e3f962e5a6845c3f1f6b89da1723bad Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Wed, 11 Sep 2024 09:45:43 -0300 Subject: [PATCH 2/4] docs: fix javadocs --- .../vaadin/addons/gridexporter/GridExporter.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java index 3604d14..9efb925 100644 --- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java +++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java @@ -494,7 +494,7 @@ public static void setFailOnUiChange(boolean failOnUiChange) { /** * Sets the timeout for acquiring a permit to start a download when the - * {@linkplain #setConcurrentDownloadLimit(int) maximum number of concurrent downloads} is + * {@linkplain #setConcurrentDownloadLimit(float) maximum number of concurrent downloads} is * reached. If the timeout is less than or equal to zero, the downloads will fail immediately if * no enough permits can be acquired. * @@ -687,7 +687,7 @@ public void setExportValue(Column column, ValueProvider vp) { * Configure if the column is exported or not * * @param column - * @param export: true will be included in the exported file, false will not be included + * @param export true will be included in the exported file, false will not be included */ public void setExportColumn(Column column, boolean export) { ComponentUtil.setData(column, COLUMN_EXPORTED_PROVIDER_DATA, export); @@ -755,7 +755,7 @@ public void setDateColumnFormat(Column column, DateFormat dateFormat, String * * @param column * @param dateFormat - * @param excelFormat + * @param excelFormatProvider */ public void setDateColumnFormatProvider(Column column, DateFormat dateFormat, ValueProvider excelFormatProvider) { @@ -838,8 +838,6 @@ public List> getColumns() { /** * Get columns in the positions specified by {@link GridExporter.setColumnPosition} - * - * @return */ public List> getColumnsOrdered() { return columns == null From ae690cfe8e34522176aab20c5677bf42e1e59371 Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:04:52 -0300 Subject: [PATCH 3/4] test: interrupt threads after each test When a test fails, there's a chance a thread is still waiting on the semaphore. Also, make sure the tests run sequentially, as each test modifies static fields. --- .../test/ConcurrentExportTests.java | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java index 0e442e1..b61f88a 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java @@ -13,6 +13,8 @@ import com.vaadin.flow.server.VaadinSession; import java.io.IOException; import java.nio.channels.InterruptedByTimeoutException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; @@ -21,11 +23,13 @@ import java.util.concurrent.locks.ReentrantLock; import org.hamcrest.Matcher; import org.hamcrest.Matchers; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @SuppressWarnings("serial") +@net.jcip.annotations.NotThreadSafe public class ConcurrentExportTests { private static final int TEST_TIMEOUT = 5000; @@ -63,6 +67,8 @@ protected void onFinish() { } private CyclicBarrier barrier; + private final static List threads = new ArrayList<>(); + private final static Lock lock = new ReentrantLock(); private void initializeCyclicBarrier(int parties) { barrier = new CyclicBarrier(parties); @@ -71,6 +77,17 @@ private void initializeCyclicBarrier(int parties) { @Before public void before() { barrier = null; + if (!lock.tryLock()) { + throw new IllegalStateException( + this.getClass().getSimpleName() + "test cannot be run in parallel"); + } + threads.forEach(Thread::interrupt); + threads.clear(); + } + + @After + public void after() { + lock.unlock(); } @SuppressWarnings("unchecked") @@ -104,7 +121,7 @@ private MockDownload newDownload() { Exchanger exchanger = new Exchanger<>(); - Thread thread = new Thread(() -> { + Thread thread = newThread(() -> { Throwable throwable = null; try { @@ -117,10 +134,12 @@ private MockDownload newDownload() { try { exchanger.exchange(throwable); } catch (InterruptedException e) { - e.printStackTrace(); + return; } }); + threads.add(thread); + return new MockDownload() { @Override public Throwable get() throws InterruptedException { From 4940b76a0c31f11fadb12d23eaea978c2f7097cf Mon Sep 17 00:00:00 2001 From: Javier Godoy <11554739+javier-godoy@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:52:34 -0300 Subject: [PATCH 4/4] test: add unit test for setFailOnUiChange(true) --- .../test/ConcurrentExportTests.java | 81 ++++++++++++++++++- 1 file changed, 79 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java index b61f88a..c43807d 100644 --- a/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java +++ b/src/test/java/com/flowingcode/vaadin/addons/gridexporter/test/ConcurrentExportTests.java @@ -1,12 +1,15 @@ package com.flowingcode.vaadin.addons.gridexporter.test; import static org.apache.commons.io.output.NullOutputStream.NULL_OUTPUT_STREAM; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import com.flowingcode.vaadin.addons.gridexporter.ConfigurableConcurrentStreamResourceWriter; import com.flowingcode.vaadin.addons.gridexporter.GridExporter; +import com.vaadin.flow.component.UI; import com.vaadin.flow.server.StreamResourceWriter; import com.vaadin.flow.server.VaadinService; import com.vaadin.flow.server.VaadinServletService; @@ -15,6 +18,7 @@ import java.nio.channels.InterruptedByTimeoutException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Exchanger; @@ -76,6 +80,7 @@ private void initializeCyclicBarrier(int parties) { @Before public void before() { + GridExporter.setFailOnUiChange(false); barrier = null; if (!lock.tryLock()) { throw new IllegalStateException( @@ -100,6 +105,8 @@ private interface MockDownload { MockDownload withCost(float cost); + void detach(); + Throwable get() throws InterruptedException; MockDownload await() throws InterruptedException; @@ -107,10 +114,21 @@ private interface MockDownload { MockDownload start(); boolean wasInterruptedByTimeout(); + + boolean isFinished(); + + boolean isAccepted(); + } + + private Thread newThread(Runnable target) { + Thread thread = new Thread(target); + threads.add(thread); + return thread; } private MockDownload newDownload() { + CyclicBarrier barrier = this.barrier; CountDownLatch latch = new CountDownLatch(1); ConcurrentStreamResourceWriter writer = @@ -119,6 +137,7 @@ private MockDownload newDownload() { await(barrier); }); + writer.setUi(new UI()); Exchanger exchanger = new Exchanger<>(); Thread thread = newThread(() -> { @@ -138,8 +157,6 @@ private MockDownload newDownload() { } }); - threads.add(thread); - return new MockDownload() { @Override public Throwable get() throws InterruptedException { @@ -185,10 +202,25 @@ public MockDownload withCost(float cost) { return this; } + @Override + public void detach() { + writer.setUi(null); + } + @Override public boolean wasInterruptedByTimeout() { return writer.interruptedByTimeout; } + + @Override + public boolean isAccepted() { + return writer.accepted; + } + + @Override + public boolean isFinished() { + return writer.finished; + } }; } @@ -329,4 +361,49 @@ public void testInterruptedByTimeout3() assertThat(q3.get(), throwsInterruptedByTimeout()); } + + @Test(timeout = TEST_TIMEOUT) + public void testAcceptFinish() throws InterruptedException { + ConcurrentStreamResourceWriter.setLimit(2); + initializeCyclicBarrier(2); + var q1 = newDownload().await(); + assertTrue("Download has not been accepted", q1.isAccepted()); + assertFalse("Download has finished too early", q1.isFinished()); + var q2 = newDownload().await(); + assertTrue("Download has not been accepted", q2.isAccepted()); + assertThat(q1.get(), nullValue()); + assertThat(q2.get(), nullValue()); + assertTrue("Download has not finished", q1.isFinished()); + assertTrue("Download has not finished", q2.isFinished()); + } + + @Test(timeout = TEST_TIMEOUT) + public void testFailOnUiClose() throws InterruptedException, BrokenBarrierException { + GridExporter.setFailOnUiChange(true); + ConcurrentStreamResourceWriter.setLimit(1); + + initializeCyclicBarrier(2); + CyclicBarrier b1 = barrier; + var q1 = newDownload().await(); + assertTrue("Download has not been accepted", q1.isAccepted()); + assertFalse("Download has finished too early", q1.isFinished()); + + initializeCyclicBarrier(2); + var q2 = newDownload().withTimeout(TEST_TIMEOUT).start(); + assertTrue("Download has not been accepted", q1.isAccepted()); + assertFalse("Download has finished too early", q1.isFinished()); + + // detach while the semaphore is held by q1 + q2.detach(); + + // await on b1 so that q1 releases the semaphore + b1.await(); + assertThat(q1.get(), nullValue()); + + // with "FailOnUiChange" the other thread never arrives at the barrier + assertThat(barrier.getNumberWaiting(), equalTo(0)); + assertThat(q2.get(), instanceOf(IOException.class)); + + } + } \ No newline at end of file