diff --git a/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java b/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java index 69f15ff535fc7..d855e5f18283f 100644 --- a/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java +++ b/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -7,6 +7,7 @@ package com.tangosol.coherence.component.util.safeNamedTopic; +import com.oracle.coherence.common.base.Logger; import com.tangosol.coherence.component.util.SafeNamedTopic; import com.tangosol.internal.net.topic.NamedTopicPublisher; @@ -25,7 +26,7 @@ import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -160,12 +161,24 @@ public void removeListener(NamedTopicPublisher.PublisherListener listener) protected PublisherConnector ensureRunningConnector() { PublisherConnector connector = m_connector; - if (connector == null || !connector.isActive()) + TopicService service = getTopicService(); + if (!service.isRunning() || connector == null || !connector.isActive()) { f_lock.lock(); try { connector = m_connector; + + if (connector != null) + { + service = getTopicService(); + if (!service.isRunning()) + { + Logger.info("Restarting Publisher connector, topic=" + getTopicName()); + connector = null; + } + } + if (connector == null || !connector.isActive()) { if (isReleased() || isDestroyed()) @@ -260,7 +273,7 @@ public void ensureConnected() } @Override - public CompletableFuture initialize() + public CompletionStage initialize() { return ensureRunningChannelConnector().initialize(); } @@ -272,7 +285,7 @@ public void offer(Object oCookie, List listBinary, int nNotifyPostFull, } @Override - public CompletableFuture prepareOfferRetry(Object oCookie) + public CompletionStage prepareOfferRetry(Object oCookie) { return ensureRunningChannelConnector().prepareOfferRetry(oCookie); } @@ -283,17 +296,38 @@ public TopicDependencies getTopicDependencies() return SafePublisherConnector.this.getTopicDependencies(); } + @Override + public TopicService getTopicService() + { + return f_safeTopic.getTopicService(); + } + // ----- helper methods --------------------------------------------- protected PublisherChannelConnector ensureRunningChannelConnector() { PublisherChannelConnector connector = m_channelConnector; - if (connector == null || !connector.isActive()) + TopicService service = getTopicService(); + if (!service.isRunning()) + { + System.out.println(); + } + if (!service.isRunning() || connector == null || !connector.isActive()) { f_lock.lock(); try { connector = m_channelConnector; + if (connector != null) + { + service = getTopicService(); + if (!service.isRunning()) + { + Logger.info("Restarting Publisher channel connector, topic=" + getTopicName() + ", channel=" + f_nChannel); + connector = null; + } + } + if (connector == null || !connector.isActive()) { connector = m_channelConnector = ensureRunningConnector().createChannelConnector(f_nChannel); diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/ConverterPublisherConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/ConverterPublisherConnector.java index 696958c12094b..876fd9bb3e386 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/ConverterPublisherConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/ConverterPublisherConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -18,7 +18,7 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; @@ -195,7 +195,7 @@ public void ensureConnected() } @Override - public CompletableFuture initialize() + public CompletionStage initialize() { return f_channelConnector.initialize(); } @@ -209,7 +209,7 @@ public void offer(Object oCookie, List listBinary, int nNotifyPostFull, } @Override - public CompletableFuture prepareOfferRetry(Object oCookie) + public CompletionStage prepareOfferRetry(Object oCookie) { return f_channelConnector.prepareOfferRetry(oCookie); } @@ -220,6 +220,12 @@ public TopicDependencies getTopicDependencies() return f_channelConnector.getTopicDependencies(); } + @Override + public TopicService getTopicService() + { + return f_channelConnector.getTopicService(); + } + // ----- data members ----------------------------------------------- /** diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicPublisherChannel.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicPublisherChannel.java index 03680bb94f303..9051ed21bb626 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicPublisherChannel.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicPublisherChannel.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -36,6 +36,8 @@ import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -115,52 +117,67 @@ public CompletableFuture publish(Binary binValue) */ private void ensureConnected() { - TopicDependencies dependencies = m_connector.getTopicDependencies(); - long retry = dependencies.getReconnectRetryMillis(); - long now = System.currentTimeMillis(); - long timeout = now + dependencies.getReconnectTimeoutMillis(); - Throwable error = null; + if (m_state != State.Active || m_connector == null) + { + // we're closed + return; + } - while (now < timeout) + f_lock.lock(); + try { - if (m_state != State.Active || m_connector == null) - { - // we're closed - return; - } + TopicDependencies dependencies = m_connector.getTopicDependencies(); + long retry = dependencies.getReconnectRetryMillis(); + long now = System.currentTimeMillis(); + long timeout = now + dependencies.getReconnectTimeoutMillis(); + Throwable error = null; - try + while (now < timeout) { - m_connector.ensureConnected(); - break; - } - catch (Throwable thrown) - { - error = thrown; - if (error instanceof TopicException) + if (m_state != State.Active || m_connector == null) { - break; + // we're closed + return; } - } - now = System.currentTimeMillis(); - if (now < timeout) - { - Logger.finer("Failed to reconnect publisher, will retry in " - + retry + " millis " + this + " due to " + error.getMessage()); + try { - Thread.sleep(retry); + m_connector.ensureConnected(); + error = null; + break; } - catch (InterruptedException e) + catch (Throwable thrown) { - // ignored + error = thrown; + if (error instanceof TopicException) + { + break; + } + } + now = System.currentTimeMillis(); + if (now < timeout) + { + Logger.finer("Failed to reconnect publisher, will retry in " + + retry + " millis " + this + " due to " + error.getMessage()); + try + { + Thread.sleep(retry); + } + catch (InterruptedException e) + { + // ignored + } } } - } - if (error != null) + if (error != null) + { + throw Exceptions.ensureRuntimeException(error); + } + } + finally { - throw Exceptions.ensureRuntimeException(error); + f_lock.unlock(); } } @@ -598,10 +615,12 @@ public void run() // ----- data members --------------------------------------------------- + private final Lock f_lock = new ReentrantLock(); + /** * The publisher connector to use to connect to back end resources. */ - private PublisherChannelConnector m_connector; + private PublisherChannelConnector m_connector; /** * The identifier of the parent publisher. diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/PublisherChannelConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/PublisherChannelConnector.java index f2dfd6d142238..ded7ef3095b92 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/PublisherChannelConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/PublisherChannelConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -7,13 +7,14 @@ package com.tangosol.internal.net.topic; +import com.tangosol.net.TopicService; import com.tangosol.net.topic.TopicDependencies; import com.tangosol.util.Binary; import java.util.List; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; @@ -64,7 +65,7 @@ public interface PublisherChannelConnector * @return an opaque cookie to pass to the {@link #offer(Object, List, int, BiConsumer)} method. */ @SuppressWarnings("TypeParameterExplicitlyExtendsObject") - CompletableFuture initialize(); + CompletionStage initialize(); /** * Publish a value to the underlying topic. @@ -79,12 +80,11 @@ public interface PublisherChannelConnector /** * Perform any set-up required to retry an offer. * - * @param oCookie the cookie used by the connector. - * - * @return a {@link CompletableFuture} that completes when the set-up is completed + * @param oCookie the cookie used by the connector. + * @return a {@link CompletionStage} that completes when the set-up is completed */ @SuppressWarnings("TypeParameterExplicitlyExtendsObject") - CompletableFuture prepareOfferRetry(Object oCookie); + CompletionStage prepareOfferRetry(Object oCookie); /** * Return the dependencies for the topic. @@ -92,4 +92,11 @@ public interface PublisherChannelConnector * @return the dependencies for the topic */ TopicDependencies getTopicDependencies(); + + /** + * Return the underlying topic service. + * + * @return the underlying topic service + */ + TopicService getTopicService(); } diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicChannelPublisherConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicChannelPublisherConnector.java index 6a6f632675675..f40681ab972c9 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicChannelPublisherConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicChannelPublisherConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -26,6 +26,7 @@ import com.tangosol.net.PagedTopicService; +import com.tangosol.net.TopicService; import com.tangosol.net.partition.KeyPartitioningStrategy; import com.tangosol.net.topic.Publisher; @@ -40,6 +41,7 @@ import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.BiConsumer; /** @@ -50,7 +52,6 @@ * * @author Jonathan Knight 2024.11.26 */ -@SuppressWarnings("TypeParameterExplicitlyExtendsObject") public class PagedTopicChannelPublisherConnector implements PublisherChannelConnector { @@ -175,7 +176,7 @@ public void ensureConnected() } @Override - public CompletableFuture initialize() + public CompletionStage initialize() { return ensurePageId(); } @@ -204,7 +205,7 @@ public void offer(Object oCookie, List listBinary, int nNotifyPostFull, } @Override - public CompletableFuture prepareOfferRetry(Object oCookie) + public CompletionStage prepareOfferRetry(Object oCookie) { Long lPageId = (Long) oCookie; return moveToNextPage(lPageId); @@ -216,6 +217,12 @@ public TopicDependencies getTopicDependencies() return m_caches.getDependencies(); } + @Override + public TopicService getTopicService() + { + return m_caches.getService(); + } + /** * Returns {@code true} if this publisher is active, otherwise returns {@code false}. *