Skip to content

Commit

Permalink
Bug 37512818 - [37512368->14.1.2.0.2] Topics: Subscribers are not pro…
Browse files Browse the repository at this point in the history
…perly unsubscribed when the service senior dies

(merge 14.1.2.0 -> ce/14.1.2.0 113733)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v14.1.2.0/": change = 113734]
  • Loading branch information
thegridman committed Jan 23, 2025
1 parent 492e72f commit 4de44a4
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,8 +266,10 @@ public void closeSubscription(ConnectedSubscriber<V> subscriber, boolean fDestro
// We need to ensure that the subscription has really gone.
// During a fail-over situation the subscriber may still exist in the configmap
// so we need to repeat the closure notification
TopicSubscription subscription = getSubscription(subscriber, m_subscriptionId);
while (subscription != null && subscription.getSubscriberTimestamp(f_subscriberId) != Long.MAX_VALUE)
String sTopic = f_caches.getTopicName();
PagedTopicService service = f_caches.getService();
Set<SubscriberId> setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId);
while (setSubscriber.contains(f_subscriberId))
{
try
{
Expand All @@ -279,7 +281,7 @@ public void closeSubscription(ConnectedSubscriber<V> subscriber, boolean fDestro
}
Logger.fine("Repeating subscriber closed notification for topic subscriber: " + subscriber);
PagedTopicSubscription.notifyClosed(f_caches.Subscriptions, f_subscriberGroupId, m_subscriptionId, f_subscriberId);
subscription = getSubscription(subscriber, m_subscriptionId);
setSubscriber = service.getSubscribers(sTopic, f_subscriberGroupId);
}
}

Expand Down Expand Up @@ -1040,7 +1042,7 @@ public void entryDeleted(MapEvent evt)
}

// ----- inner class: Channel -------------------------------------------

/**
* Channel is a data structure which represents the state of a channel as known
* by this subscriber.
Expand All @@ -1054,13 +1056,13 @@ public PagedTopicChannel(int nChannel)
m_nChannel = nChannel;
m_head = new PagedPosition(HEAD_UNKNOWN, -1);
}

@Override
public int getId()
{
return m_nChannel;
}

@Override
public PagedPosition getHead()
{
Expand All @@ -1070,9 +1072,9 @@ public PagedPosition getHead()
}
return (PagedPosition) m_head;
}

// ----- Object methods ---------------------------------------------

public String toString()
{
return "Channel=" + getId() +
Expand All @@ -1089,22 +1091,22 @@ public String toString()
", lastTimestamp=" + m_lastPolledTimestamp +
", contended=" + m_fContended;
}

// ----- constants --------------------------------------------------

/**
* A page id value to indicate that the head page is unknown.
*/
public static final int HEAD_UNKNOWN = -1;

// ----- data members -----------------------------------------------

/**
* The channel identifier.
*/
private final int m_nChannel;
}

// ----- data members ---------------------------------------------------

/**
Expand Down

0 comments on commit 4de44a4

Please sign in to comment.