Skip to content

Commit

Permalink
Enh 37387065 - [37381796->25.03] Topics: general refactoring and hard…
Browse files Browse the repository at this point in the history
…ening

(merge main -> ce/main 113310)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 113314]
  • Loading branch information
thegridman committed Jan 9, 2025
1 parent 1761c02 commit cef7be6
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 55 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.coherence.component.util.safeNamedTopic;

import com.oracle.coherence.common.base.Logger;
import com.tangosol.coherence.component.util.SafeNamedTopic;

import com.tangosol.internal.net.topic.NamedTopicPublisher;
Expand All @@ -25,7 +26,7 @@
import java.util.Arrays;
import java.util.List;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -160,12 +161,24 @@ public void removeListener(NamedTopicPublisher.PublisherListener listener)
protected PublisherConnector<V> ensureRunningConnector()
{
PublisherConnector<V> connector = m_connector;
if (connector == null || !connector.isActive())
TopicService service = getTopicService();
if (!service.isRunning() || connector == null || !connector.isActive())
{
f_lock.lock();
try
{
connector = m_connector;

if (connector != null)
{
service = getTopicService();
if (!service.isRunning())
{
Logger.info("Restarting Publisher connector, topic=" + getTopicName());
connector = null;
}
}

if (connector == null || !connector.isActive())
{
if (isReleased() || isDestroyed())
Expand Down Expand Up @@ -260,7 +273,7 @@ public void ensureConnected()
}

@Override
public CompletableFuture<?> initialize()
public CompletionStage<?> initialize()
{
return ensureRunningChannelConnector().initialize();
}
Expand All @@ -272,7 +285,7 @@ public void offer(Object oCookie, List<Binary> listBinary, int nNotifyPostFull,
}

@Override
public CompletableFuture<?> prepareOfferRetry(Object oCookie)
public CompletionStage<?> prepareOfferRetry(Object oCookie)
{
return ensureRunningChannelConnector().prepareOfferRetry(oCookie);
}
Expand All @@ -283,17 +296,38 @@ public TopicDependencies getTopicDependencies()
return SafePublisherConnector.this.getTopicDependencies();
}

@Override
public TopicService getTopicService()
{
return f_safeTopic.getTopicService();
}

// ----- helper methods ---------------------------------------------

protected PublisherChannelConnector<V> ensureRunningChannelConnector()
{
PublisherChannelConnector<V> connector = m_channelConnector;
if (connector == null || !connector.isActive())
TopicService service = getTopicService();
if (!service.isRunning())
{
System.out.println();
}
if (!service.isRunning() || connector == null || !connector.isActive())
{
f_lock.lock();
try
{
connector = m_channelConnector;
if (connector != null)
{
service = getTopicService();
if (!service.isRunning())
{
Logger.info("Restarting Publisher channel connector, topic=" + getTopicName() + ", channel=" + f_nChannel);
connector = null;
}
}

if (connector == null || !connector.isActive())
{
connector = m_channelConnector = ensureRunningConnector().createChannelConnector(f_nChannel);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand All @@ -18,7 +18,7 @@
import java.util.List;
import java.util.Map;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;

import java.util.function.BiConsumer;
Expand Down Expand Up @@ -195,7 +195,7 @@ public void ensureConnected()
}

@Override
public CompletableFuture<?> initialize()
public CompletionStage<?> initialize()
{
return f_channelConnector.initialize();
}
Expand All @@ -209,7 +209,7 @@ public void offer(Object oCookie, List<Binary> listBinary, int nNotifyPostFull,
}

@Override
public CompletableFuture<?> prepareOfferRetry(Object oCookie)
public CompletionStage<?> prepareOfferRetry(Object oCookie)
{
return f_channelConnector.prepareOfferRetry(oCookie);
}
Expand All @@ -220,6 +220,12 @@ public TopicDependencies getTopicDependencies()
return f_channelConnector.getTopicDependencies();
}

@Override
public TopicService getTopicService()
{
return f_channelConnector.getTopicService();
}

// ----- data members -----------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -36,6 +36,8 @@

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -115,52 +117,67 @@ public CompletableFuture<Publisher.Status> publish(Binary binValue)
*/
private void ensureConnected()
{
TopicDependencies dependencies = m_connector.getTopicDependencies();
long retry = dependencies.getReconnectRetryMillis();
long now = System.currentTimeMillis();
long timeout = now + dependencies.getReconnectTimeoutMillis();
Throwable error = null;
if (m_state != State.Active || m_connector == null)
{
// we're closed
return;
}

while (now < timeout)
f_lock.lock();
try
{
if (m_state != State.Active || m_connector == null)
{
// we're closed
return;
}
TopicDependencies dependencies = m_connector.getTopicDependencies();
long retry = dependencies.getReconnectRetryMillis();
long now = System.currentTimeMillis();
long timeout = now + dependencies.getReconnectTimeoutMillis();
Throwable error = null;

try
while (now < timeout)
{
m_connector.ensureConnected();
break;
}
catch (Throwable thrown)
{
error = thrown;
if (error instanceof TopicException)
if (m_state != State.Active || m_connector == null)
{
break;
// we're closed
return;
}
}
now = System.currentTimeMillis();
if (now < timeout)
{
Logger.finer("Failed to reconnect publisher, will retry in "
+ retry + " millis " + this + " due to " + error.getMessage());

try
{
Thread.sleep(retry);
m_connector.ensureConnected();
error = null;
break;
}
catch (InterruptedException e)
catch (Throwable thrown)
{
// ignored
error = thrown;
if (error instanceof TopicException)
{
break;
}
}
now = System.currentTimeMillis();
if (now < timeout)
{
Logger.finer("Failed to reconnect publisher, will retry in "
+ retry + " millis " + this + " due to " + error.getMessage());
try
{
Thread.sleep(retry);
}
catch (InterruptedException e)
{
// ignored
}
}
}
}

if (error != null)
if (error != null)
{
throw Exceptions.ensureRuntimeException(error);
}
}
finally
{
throw Exceptions.ensureRuntimeException(error);
f_lock.unlock();
}
}

Expand Down Expand Up @@ -598,10 +615,12 @@ public void run()

// ----- data members ---------------------------------------------------

private final Lock f_lock = new ReentrantLock();

/**
* The publisher connector to use to connect to back end resources.
*/
private PublisherChannelConnector<V> m_connector;
private PublisherChannelConnector<V> m_connector;

/**
* The identifier of the parent publisher.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/

package com.tangosol.internal.net.topic;

import com.tangosol.net.TopicService;
import com.tangosol.net.topic.TopicDependencies;

import com.tangosol.util.Binary;

import java.util.List;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import java.util.function.BiConsumer;

Expand Down Expand Up @@ -64,7 +65,7 @@ public interface PublisherChannelConnector<V>
* @return an opaque cookie to pass to the {@link #offer(Object, List, int, BiConsumer)} method.
*/
@SuppressWarnings("TypeParameterExplicitlyExtendsObject")
CompletableFuture<? extends Object> initialize();
CompletionStage<? extends Object> initialize();

/**
* Publish a value to the underlying topic.
Expand All @@ -79,17 +80,23 @@ public interface PublisherChannelConnector<V>
/**
* Perform any set-up required to retry an offer.
*
* @param oCookie the cookie used by the connector.
*
* @return a {@link CompletableFuture} that completes when the set-up is completed
* @param oCookie the cookie used by the connector.
* @return a {@link CompletionStage} that completes when the set-up is completed
*/
@SuppressWarnings("TypeParameterExplicitlyExtendsObject")
CompletableFuture<? extends Object> prepareOfferRetry(Object oCookie);
CompletionStage<? extends Object> prepareOfferRetry(Object oCookie);

/**
* Return the dependencies for the topic.
*
* @return the dependencies for the topic
*/
TopicDependencies getTopicDependencies();

/**
* Return the underlying topic service.
*
* @return the underlying topic service
*/
TopicService getTopicService();
}
Loading

0 comments on commit cef7be6

Please sign in to comment.