diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java index 0eae5721a30b..dd0b58f433e7 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/impl/paged/PagedTopicSubscriberConnector.java @@ -266,8 +266,10 @@ public void closeSubscription(ConnectedSubscriber subscriber, boolean fDestro // We need to ensure that the subscription has really gone. // During a fail-over situation the subscriber may still exist in the configmap // so we need to repeat the closure notification - TopicSubscription subscription = getSubscription(subscriber, m_subscriptionId); - while (subscription != null && subscription.getSubscriberTimestamp(f_subscriberId) != Long.MAX_VALUE) + String sTopic = f_caches.getTopicName(); + PagedTopicService service = f_caches.getService(); + Set setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); + while (setSubscriber.contains(f_subscriberId)) { try { @@ -279,7 +281,7 @@ public void closeSubscription(ConnectedSubscriber subscriber, boolean fDestro } Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber); PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId); - subscription = getSubscription(subscriber, m_subscriptionId); + setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId); } } @@ -1040,7 +1042,7 @@ public void entryDeleted(MapEvent evt) } // ----- inner class: Channel ------------------------------------------- - + /** * Channel is a data structure which represents the state of a channel as known * by this subscriber. @@ -1054,13 +1056,13 @@ public PagedTopicChannel(int nChannel) m_nChannel = nChannel; m_head = new PagedPosition(HEAD_UNKNOWN, -1); } - + @Override public int getId() { return m_nChannel; } - + @Override public PagedPosition getHead() { @@ -1070,9 +1072,9 @@ public PagedPosition getHead() } return (PagedPosition) m_head; } - + // ----- Object methods --------------------------------------------- - + public String toString() { return "Channel=" + getId() + @@ -1089,14 +1091,14 @@ public String toString() ", lastTimestamp=" + m_lastPolledTimestamp + ", contended=" + m_fContended; } - + // ----- constants -------------------------------------------------- - + /** * A page id value to indicate that the head page is unknown. */ public static final int HEAD_UNKNOWN = -1; - + // ----- data members ----------------------------------------------- /** @@ -1104,7 +1106,7 @@ public String toString() */ private final int m_nChannel; } - + // ----- data members --------------------------------------------------- /**