From 72ffa600125c2e5ca8969b52733873cc920c4074 Mon Sep 17 00:00:00 2001 From: Jonathan Knight Date: Mon, 13 Jan 2025 11:43:12 -0500 Subject: [PATCH] Enh 37387064 - [37381796->24.09.2] Topics: general refactoring and hardening (merge ce/main -> ce/24.09 113428) [git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v24.09/": change = 113429] --- .../SafePublisherConnector.java | 4 ---- .../coherence/common/base/Exceptions.java | 21 ++++++++++++++++- .../net/topic/NamedTopicSubscriber.java | 23 ++++++++----------- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java b/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java index d855e5f18283f..1f6a8ece83b11 100644 --- a/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java +++ b/prj/coherence-core-components/src/main/java/com/tangosol/coherence/component/util/safeNamedTopic/SafePublisherConnector.java @@ -308,10 +308,6 @@ protected PublisherChannelConnector ensureRunningChannelConnector() { PublisherChannelConnector connector = m_channelConnector; TopicService service = getTopicService(); - if (!service.isRunning()) - { - System.out.println(); - } if (!service.isRunning() || connector == null || !connector.isActive()) { f_lock.lock(); diff --git a/prj/coherence-core/src/main/java/com/oracle/coherence/common/base/Exceptions.java b/prj/coherence-core/src/main/java/com/oracle/coherence/common/base/Exceptions.java index 66b78503d2731..7420966f428cd 100644 --- a/prj/coherence-core/src/main/java/com/oracle/coherence/common/base/Exceptions.java +++ b/prj/coherence-core/src/main/java/com/oracle/coherence/common/base/Exceptions.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, 2023, Oracle and/or its affiliates. + * Copyright (c) 2020, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -99,6 +99,25 @@ else if (t.getClass().getName().equals("javax.ejb.EJBException")) } } + /** + * Return the root cause of an exception. + * + * @param t the exception to find the root cause in + * + * @return the root cause of the exception + */ + public static Throwable getRootCause(Throwable t) + { + Throwable rootCause = t; + Throwable cause = t.getCause(); + while (cause != null) + { + rootCause = cause; + cause = cause.getCause(); + } + return rootCause; + } + /** * Re-throw the specified exception if it is a fatal unrecoverable exception. * diff --git a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java index 6eba26ea2ef7a..686b0745a5163 100644 --- a/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java +++ b/prj/coherence-core/src/main/java/com/tangosol/internal/net/topic/NamedTopicSubscriber.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2000, 2024, Oracle and/or its affiliates. + * Copyright (c) 2000, 2025, Oracle and/or its affiliates. * * Licensed under the Universal Permissive License v 1.0 as shown at * https://oss.oracle.com/licenses/upl. @@ -103,7 +103,7 @@ * * @author Jonathan Knight 2024.11.26 */ -@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue"}) +@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue", "SimplifyStreamApiCallChains"}) public class NamedTopicSubscriber implements Subscriber, SubscriberConnector.ConnectedSubscriber, SubscriberStatistics, AutoCloseable { @@ -414,7 +414,7 @@ public CompletableFuture> commitAsync(Map commitInternal(e.getKey(), e.getValue(), mapResult)) .toArray(CompletableFuture[]::new); - return CompletableFuture.allOf(aFuture).handle((_void, err) -> mapResult); + return CompletableFuture.allOf(aFuture).thenApply(_void -> mapResult); } /** @@ -1379,16 +1379,13 @@ private CompletableFuture commitInternal(int nChannel, Position po { TopicChannel channel = m_aChannel[nChannel]; return f_connector.commit(this, nChannel, position) - .handle((result, err) -> + .thenApply(result -> { - if (err == null) + if (mapResult != null) { - if (mapResult != null) - { - mapResult.put(nChannel, result); - } - channel.committed(position); + mapResult.put(nChannel, result); } + channel.committed(position); return result; }); } @@ -1458,7 +1455,7 @@ private Map seekInternal(Map mapPosition) List listUnallocated = mapPosition.keySet().stream() .filter(c -> !isOwner(c)) - .toList(); + .collect(Collectors.toList()); if (!listUnallocated.isEmpty()) { @@ -1501,7 +1498,7 @@ private Map seekInternalToTimestamps(Map ma List listUnallocated = mapInstant.keySet().stream() .filter(c -> !isOwner(c)) - .toList(); + .collect(Collectors.toList()); if (!listUnallocated.isEmpty()) { @@ -1538,7 +1535,7 @@ private void ensureActiveAnOwnedChannels(int... anChannel) List listUnallocated = Arrays.stream(anChannel) .filter(c -> !isOwner(c)) .boxed() - .toList(); + .collect(Collectors.toList()); if (!listUnallocated.isEmpty()) {