Skip to content

Commit

Permalink
Bug 37512822 - [37512368->25.03] Topics: Subscribers are not properly…
Browse files Browse the repository at this point in the history
… unsubscribed when the service senior dies

(merge main -> ce/main 113802)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 113809]
  • Loading branch information
thegridman committed Jan 28, 2025
1 parent 52edbfd commit dfa21ef
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -29,7 +29,6 @@
import com.tangosol.internal.net.topic.impl.paged.model.Subscription;
import com.tangosol.internal.net.topic.impl.paged.model.Usage;

import com.tangosol.internal.util.Daemons;
import com.tangosol.io.ClassLoaderAware;
import com.tangosol.io.Serializer;

Expand All @@ -40,7 +39,6 @@
import com.tangosol.net.MemberListener;
import com.tangosol.net.NamedCache;

import com.tangosol.net.NamedMap;
import com.tangosol.net.PagedTopicService;
import com.tangosol.net.cache.TypeAssertion;

Expand Down Expand Up @@ -76,9 +74,12 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import java.util.function.BiFunction;
import java.util.function.Consumer;

Expand Down Expand Up @@ -1158,7 +1159,6 @@ private void destroyCache(NamedCache<?, ?> cache)
if (cache.isActive() && !cache.isDestroyed())
{
f_topicService.destroyCache(cache);
// cache.destroy();
}
}

Expand Down Expand Up @@ -1513,48 +1513,82 @@ public void entryDeleted(MapEvent evt)
{
// destroy event
m_state = State.Destroyed;
//removeListeners();
Daemons.commonPool().execute(() ->
Set<NamedTopicDeactivationListener> setListener = m_mapListener.keySet();
for (NamedTopicDeactivationListener listener : setListener)
{
Set<NamedTopicDeactivationListener> setListener = m_mapListener.keySet();
for (NamedTopicDeactivationListener listener : setListener)
try
{
try
{
listener.onDestroy();
}
catch (Throwable t)
{
Logger.err(t);
}
listener.onDestroy();
}
catch (Throwable t)
{
Logger.err(t);
}
});
}
}
}

@Override
public void memberLeft(MemberEvent evt)
{
DistributedCacheService service = (DistributedCacheService) evt.getService();
if (evt.isLocal())
{
Logger.fine("Detected local member disconnect in service " + PagedTopicCaches.this);
disconnected();
}
else
f_lock.lock();
try
{
service.getOwnershipSenior();
if (service.getOwnershipEnabledMembers().isEmpty())
DistributedCacheService service = (DistributedCacheService) evt.getService();
if (evt.isLocal())
{
Logger.fine("Detected loss of all storage members in service " + PagedTopicCaches.this);
Logger.fine("Detected local member disconnect in service " + PagedTopicCaches.this);
m_fDisconnected = true;
disconnected();
}
else
{
service.getOwnershipSenior();
if (service.getOwnershipEnabledMembers().isEmpty())
{
Logger.fine("Detected loss of all storage members in service " + PagedTopicCaches.this);
m_fDisconnected = true;
disconnected();
}
}
}
finally
{
f_lock.unlock();
}
}

@Override
public void memberJoined(MemberEvent evt)
{
f_lock.lock();
try
{
if (m_fDisconnected)
{
DistributedCacheService service = (DistributedCacheService) evt.getService();
if (!service.getOwnershipEnabledMembers().isEmpty())
{
m_fDisconnected = false;
}
Set<NamedTopicDeactivationListener> setListener = m_mapListener.keySet();
for (NamedTopicDeactivationListener listener : setListener)
{
try
{
listener.onConnect();
}
catch (Throwable t)
{
Logger.err(t);
}
}
}
}
finally
{
f_lock.unlock();
}
}

@Override
Expand All @@ -1573,6 +1607,12 @@ public int hashCode()
{
return System.identityHashCode(this);
}

// ----- data members -----------------------------------------------

private final Lock f_lock = new ReentrantLock();

private boolean m_fDisconnected = false;
}

// ----- inner interface: Listener --------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand All @@ -18,6 +18,7 @@
import com.tangosol.net.topic.Publisher;
import com.tangosol.net.topic.Subscriber;
import com.tangosol.util.Filter;
import com.tangosol.util.SynchronousListener;
import com.tangosol.util.ValueExtractor;

import java.util.Objects;
Expand Down Expand Up @@ -224,8 +225,9 @@ protected void ensureActive()

// ----- inner class: TopicListener -------------------------------------

@SuppressWarnings("rawtypes")
private class TopicListener
implements PagedTopicCaches.Listener
implements PagedTopicCaches.Listener, SynchronousListener
{
@Override
public void onDestroy()
Expand Down
Loading

0 comments on commit dfa21ef

Please sign in to comment.