Skip to content

Commit

Permalink
Enh 37387065 - [37381796->25.03] Topics: general refactoring and hard…
Browse files Browse the repository at this point in the history
…ening

(merge main -> ce/main 113425)

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 113428]
  • Loading branch information
thegridman committed Jan 13, 2025
1 parent 524816f commit 902e769
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,6 @@ protected PublisherChannelConnector<V> ensureRunningChannelConnector()
{
PublisherChannelConnector<V> connector = m_channelConnector;
TopicService service = getTopicService();
if (!service.isRunning())
{
System.out.println();
}
if (!service.isRunning() || connector == null || !connector.isActive())
{
f_lock.lock();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2023, Oracle and/or its affiliates.
* Copyright (c) 2020, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -99,6 +99,25 @@ else if (t.getClass().getName().equals("javax.ejb.EJBException"))
}
}

/**
* Return the root cause of an exception.
*
* @param t the exception to find the root cause in
*
* @return the root cause of the exception
*/
public static Throwable getRootCause(Throwable t)
{
Throwable rootCause = t;
Throwable cause = t.getCause();
while (cause != null)
{
rootCause = cause;
cause = cause.getCause();
}
return rootCause;
}

/**
* Re-throw the specified exception if it is a fatal unrecoverable exception.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2000, 2024, Oracle and/or its affiliates.
* Copyright (c) 2000, 2025, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -103,7 +103,7 @@
*
* @author Jonathan Knight 2024.11.26
*/
@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue"})
@SuppressWarnings({"rawtypes", "PatternVariableCanBeUsed", "SameParameterValue", "SimplifyStreamApiCallChains"})
public class NamedTopicSubscriber<V>
implements Subscriber<V>, SubscriberConnector.ConnectedSubscriber<V>, SubscriberStatistics, AutoCloseable
{
Expand Down Expand Up @@ -414,7 +414,7 @@ public CompletableFuture<Map<Integer, CommitResult>> commitAsync(Map<Integer, Po
.map(e -> commitInternal(e.getKey(), e.getValue(), mapResult))
.toArray(CompletableFuture[]::new);

return CompletableFuture.allOf(aFuture).handle((_void, err) -> mapResult);
return CompletableFuture.allOf(aFuture).thenApply(_void -> mapResult);
}

/**
Expand Down Expand Up @@ -1379,16 +1379,13 @@ private CompletableFuture<CommitResult> commitInternal(int nChannel, Position po
{
TopicChannel channel = m_aChannel[nChannel];
return f_connector.commit(this, nChannel, position)
.handle((result, err) ->
.thenApply(result ->
{
if (err == null)
if (mapResult != null)
{
if (mapResult != null)
{
mapResult.put(nChannel, result);
}
channel.committed(position);
mapResult.put(nChannel, result);
}
channel.committed(position);
return result;
});
}
Expand Down Expand Up @@ -1458,7 +1455,7 @@ private Map<Integer, Position> seekInternal(Map<Integer, Position> mapPosition)

List<Integer> listUnallocated = mapPosition.keySet().stream()
.filter(c -> !isOwner(c))
.toList();
.collect(Collectors.toList());

if (!listUnallocated.isEmpty())
{
Expand Down Expand Up @@ -1501,7 +1498,7 @@ private Map<Integer, Position> seekInternalToTimestamps(Map<Integer, Instant> ma

List<Integer> listUnallocated = mapInstant.keySet().stream()
.filter(c -> !isOwner(c))
.toList();
.collect(Collectors.toList());

if (!listUnallocated.isEmpty())
{
Expand Down Expand Up @@ -1538,7 +1535,7 @@ private void ensureActiveAnOwnedChannels(int... anChannel)
List<Integer> listUnallocated = Arrays.stream(anChannel)
.filter(c -> !isOwner(c))
.boxed()
.toList();
.collect(Collectors.toList());

if (!listUnallocated.isEmpty())
{
Expand Down

0 comments on commit 902e769

Please sign in to comment.