From dfa21ef4f38c1766bc009f048d893c8f0a297df9 Mon Sep 17 00:00:00 2001 From: Jonathan Knight Date: Tue, 28 Jan 2025 10:48:40 -0500 Subject: [PATCH] Bug 37512822 - [37512368->25.03] Topics: Subscribers are not properly unsubscribed when the service senior dies (merge main -> ce/main 113802) [git-p4: depot-paths = "//dev/coherence-ce/main/": change = 113809] --- .../topic/impl/paged/PagedTopicCaches.java | 96 +++++--- .../topic/impl/paged/PagedTopicConnector.java | 6 +- .../paged/PagedTopicSubscriberConnector.java | 209 ++++++++++++++---- 3 files changed, 232 insertions(+), 79 deletions(-) diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java index 82e38fbd2228e..8db7306a023c7 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicCaches.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -29,7 +29,6 @@ import com.tangosol.internal.net.topic.impl.paged.model.Subscription; import com.tangosol.internal.net.topic.impl.paged.model.Usage; -import com.tangosol.internal.util.Daemons; import com.tangosol.io.ClassLoaderAware; import com.tangosol.io.Serializer; @@ -40,7 +39,6 @@ import com.tangosol.net.MemberListener; import com.tangosol.net.NamedCache; -import com.tangosol.net.NamedMap; import com.tangosol.net.PagedTopicService; import com.tangosol.net.cache.TypeAssertion; @@ -76,9 +74,12 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; - import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; + +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import java.util.function.BiFunction; import java.util.function.Consumer; @@ -1158,7 +1159,6 @@ private void destroyCache(NamedCache cache) if (cache.isActive() && !cache.isDestroyed()) { f_topicService.destroyCache(cache); -// cache.destroy(); } } @@ -1513,48 +1513,82 @@ public void entryDeleted(MapEvent evt) { // destroy event m_state = State.Destroyed; - //removeListeners(); - Daemons.commonPool().execute(() -> + Set setListener = m_mapListener.keySet(); + for (NamedTopicDeactivationListener listener : setListener) { - Set setListener = m_mapListener.keySet(); - for (NamedTopicDeactivationListener listener : setListener) + try { - try - { - listener.onDestroy(); - } - catch (Throwable t) - { - Logger.err(t); - } + listener.onDestroy(); + } + catch (Throwable t) + { + Logger.err(t); } - }); + } } } @Override public void memberLeft(MemberEvent evt) { - DistributedCacheService service = (DistributedCacheService) evt.getService(); - if (evt.isLocal()) - { - Logger.fine("Detected local member disconnect in service " + PagedTopicCaches.this); - disconnected(); - } - else + f_lock.lock(); + try { - service.getOwnershipSenior(); - if (service.getOwnershipEnabledMembers().isEmpty()) + DistributedCacheService service = (DistributedCacheService) evt.getService(); + if (evt.isLocal()) { - Logger.fine("Detected loss of all storage members in service " + PagedTopicCaches.this); + Logger.fine("Detected local member disconnect in service " + PagedTopicCaches.this); + m_fDisconnected = true; disconnected(); } + else + { + service.getOwnershipSenior(); + if (service.getOwnershipEnabledMembers().isEmpty()) + { + Logger.fine("Detected loss of all storage members in service " + PagedTopicCaches.this); + m_fDisconnected = true; + disconnected(); + } + } + } + finally + { + f_lock.unlock(); } } @Override public void memberJoined(MemberEvent evt) { + f_lock.lock(); + try + { + if (m_fDisconnected) + { + DistributedCacheService service = (DistributedCacheService) evt.getService(); + if (!service.getOwnershipEnabledMembers().isEmpty()) + { + m_fDisconnected = false; + } + Set setListener = m_mapListener.keySet(); + for (NamedTopicDeactivationListener listener : setListener) + { + try + { + listener.onConnect(); + } + catch (Throwable t) + { + Logger.err(t); + } + } + } + } + finally + { + f_lock.unlock(); + } } @Override @@ -1573,6 +1607,12 @@ public int hashCode() { return System.identityHashCode(this); } + + // ----- data members ----------------------------------------------- + + private final Lock f_lock = new ReentrantLock(); + + private boolean m_fDisconnected = false; } // ----- inner interface: Listener -------------------------------------- diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicConnector.java index f7491c9d86e84..05afa1aa4c047 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicConnector.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -18,6 +18,7 @@ import com.tangosol.net.topic.Publisher; import com.tangosol.net.topic.Subscriber; import com.tangosol.util.Filter; +import com.tangosol.util.SynchronousListener; import com.tangosol.util.ValueExtractor; import java.util.Objects; @@ -224,8 +225,9 @@ protected void ensureActive() // ----- inner class: TopicListener ------------------------------------- + @SuppressWarnings("rawtypes") private class TopicListener - implements PagedTopicCaches.Listener + implements PagedTopicCaches.Listener, SynchronousListener { @Override public void onDestroy() diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java index dd0b58f433e78..bf2c1294f97ce 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java @@ -237,51 +237,90 @@ public void close() @Override public void closeSubscription(ConnectedSubscriber subscriber, boolean fDestroyed) { - if (!fDestroyed) + State state = m_state; + if (m_state != State.Closing && state != State.Closed) { - // caches have not been destroyed, so we're just closing this subscriber - unregisterDeactivationListener(); - unregisterChannelAllocationListener(); - unregisterNotificationListener(); - PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); - removeSubscriberEntry(subscriber.getKey()); - } - else - { - PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); + f_lockState.lock(); + try + { + if (m_state != State.Closed) + { + m_state = State.Closing; + } + } + finally + { + f_lockState.unlock(); + } } - if (!fDestroyed && f_subscriberGroupId.isAnonymous()) + if (m_state == State.Closing) { - // this subscriber is anonymous and thus non-durable and must be destroyed upon close - // Note: if close isn't the cluster will eventually destroy this subscriber once it - // identifies the associated member has left the cluster. - // If an application creates a lot of subscribers and does not close them when finished - // then this will cause heap consumption to rise. - // There used to be a To-Do comment here about cleaning up in a finalizer, but as - // finalizers in the JVM are not reliable that is probably not such a good idea. - destroy(f_caches, f_subscriberGroupId, m_subscriptionId); - } + Logger.finest("Closing subscription for topic subscriber: fDestroyed=" + fDestroyed + " subscriber=" + subscriber); + if (!fDestroyed) + { + // caches have not been destroyed, so we're just closing this subscriber + PagedTopicService service = f_caches.getService(); + boolean fActive = f_caches.isActive() + && service.isRunning() + && !service.getOwnershipEnabledMembers().isEmpty(); + if (fActive) + { + unregisterDeactivationListener(); + unregisterChannelAllocationListener(); + unregisterNotificationListener(); + } + PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); + if (fActive) + { + removeSubscriberEntry(subscriber.getKey()); + } + } + else + { + PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); + } - // We need to ensure that the subscription has really gone. - // During a fail-over situation the subscriber may still exist in the configmap - // so we need to repeat the closure notification - String sTopic = f_caches.getTopicName(); - PagedTopicService service = f_caches.getService(); - Set setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); - while (setSubscriber.contains(f_subscriberId)) - { - try + if (!fDestroyed && f_subscriberGroupId.isAnonymous()) { - Blocking.sleep(100); + // this subscriber is anonymous and thus non-durable and must be destroyed upon close + // Note: if close isn't the cluster will eventually destroy this subscriber once it + // identifies the associated member has left the cluster. + // If an application creates a lot of subscribers and does not close them when finished + // then this will cause heap consumption to rise. + // There used to be a To-Do comment here about cleaning up in a finalizer, but as + // finalizers in the JVM are not reliable that is probably not such a good idea. + try + { + destroy(f_caches, f_subscriberGroupId, m_subscriptionId); + } + catch (Exception e) + { + Logger.err(e); + } } - catch (InterruptedException e) + + // We need to ensure that the subscription has really gone. + // During a fail-over situation the subscriber may still exist in the configmap + // so we need to repeat the closure notification + String sTopic = f_caches.getTopicName(); + PagedTopicService service = f_caches.getService(); + Set setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); + while (setSubscriber.contains(f_subscriberId)) { - break; + Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber); + try + { + Blocking.sleep(100); + } + catch (InterruptedException e) + { + break; + } + PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); + setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); } - Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber); - PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); - setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); + m_state = State.Closed; } } @@ -328,6 +367,10 @@ public boolean isCommitted(SubscriberGroupId groupId, int nChannel, Position pos public void ensureConnected() { f_caches.ensureConnected(); + if (f_caches.isActive() && (m_state == State.Initial || m_state == State.Disconnected)) + { + m_state = State.Connected; + } } @Override @@ -912,16 +955,34 @@ protected void registerNotificationListener(ConnectedSubscriber subscriber) @SuppressWarnings({"unchecked", "rawtypes"}) protected void unregisterNotificationListener() { - // un-register the subscriber listener in each partition - SimpleMapListener listener = m_listenerNotification; - Filter filter = m_filterNotification; - if (f_caches.Notifications.isActive() && listener != null) + try + { + // un-register the subscriber listener in each partition + SimpleMapListener listener = m_listenerNotification; + Filter filter = m_filterNotification; + if (f_caches.Notifications.isActive() && listener != null) + { + f_caches.Notifications.removeMapListener(listener, filter); + } + } + catch (Exception e) { - f_caches.Notifications.removeMapListener(listener, filter); + // intentionally empty } } - // ----- inner class: SubscriptionListener -------------------------- + // ----- inner enum: State ---------------------------------------------- + + public enum State + { + Initial, + Connected, + Disconnected, + Closing, + Closed, + } + + // ----- inner class: SubscriptionListener ------------------------------ protected class SubscriptionListener implements PagedTopicSubscription.Listener @@ -998,27 +1059,75 @@ protected class DeactivationListener @Override public void onConnect() { + f_lockState.lock(); + try + { + if (m_state == State.Disconnected || m_state == State.Initial) + { + m_state = State.Connected; + } + } + finally + { + f_lockState.unlock(); + } } @Override public void onDisconnect() { - SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Disconnected); - event.dispatch(f_listeners); + f_lockState.lock(); + try + { + if (m_state == State.Connected || m_state == State.Initial) + { + m_state = State.Disconnected; + SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Disconnected); + event.dispatch(f_listeners); + } + } + finally + { + f_lockState.unlock(); + } } @Override public void onDestroy() { - SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Destroyed); - event.dispatch(f_listeners); + f_lockState.lock(); + try + { + if (m_state != State.Closed && m_state != State.Closing) + { + m_state = State.Closed; + SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Destroyed); + event.dispatch(f_listeners); + } + } + finally + { + f_lockState.unlock(); + } } @Override public void onRelease() { - SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Released); - event.dispatch(f_listeners); + f_lockState.lock(); + try + { + if (m_state != State.Closed && m_state != State.Closing) + { + m_state = State.Closed; + SubscriberEvent event = new SubscriberEvent(PagedTopicSubscriberConnector.this, SubscriberEvent.Type.Released); + event.dispatch(f_listeners); + } + } + finally + { + f_lockState.unlock(); + } } } @@ -1106,9 +1215,11 @@ public String toString() */ private final int m_nChannel; } - + // ----- data members --------------------------------------------------- + private State m_state = State.Initial; + /** * The {@link PagedTopicCaches} to use to invoke cache operations. */