Skip to content

Commit

Permalink
Bug 36037585 - [36022945->22.06.7] Accessing a cache from a gRPC clie…
Browse files Browse the repository at this point in the history
…nt, AsyncNamedMap or AsyncNamedCache does not trigger read through

(merge 14.1.1.2206 -> ce/22.06 104813)

[git-p4: depot-paths = "//dev/coherence-ce/release/coherence-ce-v22.06/": change = 104817]
  • Loading branch information
thegridman committed Nov 22, 2023
1 parent 77e48a1 commit ad60dae
Show file tree
Hide file tree
Showing 11 changed files with 1,134 additions and 528 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public DistributedAsyncNamedCache(NamedCache<K, V> cache, Option[] options)
public CompletableFuture<V> get(K key)
{
InvocableMap.EntryProcessor processor = BinaryProcessors.get();
CompletableFuture<Binary> future = invoke(key, processor);
CompletableFuture<Binary> future = invoke(key, processor);
return future.thenApply(f_valueFromInternalConverter::convert);
}

Expand Down Expand Up @@ -225,6 +225,15 @@ public CompletableFuture<V> putIfAbsent(K key, V value)
.thenApply(bin -> bin == null ? null : f_valueFromInternalConverter.convert((Binary) bin));
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
public CompletableFuture<V> remove(K key)
{
InvocableMap.EntryProcessor processor = BinaryProcessors.remove();
CompletableFuture<Binary> future = invoke(key, processor);
return future.thenApply(f_valueFromInternalConverter::convert);
}

@Override
public CompletableFuture<Void> removeAll(Collection<? extends K> colKeys)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2023, Oracle and/or its affiliates.
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
Expand Down Expand Up @@ -130,6 +130,16 @@ public static InvocableMap.EntryProcessor<Binary, Binary, Binary> putIfAbsent(Bi
return new BinaryPutIfAbsentProcessor(value, ttl);
}

/**
* Obtain an instance of the {@link BinaryRemoveProcessor}.
*
* @return an instance of the {@link BinaryRemoveProcessor}
*/
public static InvocableMap.EntryProcessor<Binary, Binary, Binary> remove()
{
return BinaryRemoveProcessor.INSTANCE;
}

// ----- class: BaseProcessor -------------------------------------------

/**
Expand Down Expand Up @@ -887,6 +897,14 @@ public Binary process(InvocableMap.Entry<Binary, Binary> entry)
{
prevValue = ((BinaryEntry<Binary, Binary>) entry).getBinaryValue();
}
else
{
entry.getValue(); // maybe trigger a CacheStore load
if (entry.isPresent())
{
prevValue = ((BinaryEntry<Binary, Binary>) entry).getBinaryValue();
}
}
return prevValue;
}

Expand All @@ -908,6 +926,14 @@ public Map<Binary, Binary> processAll(Set<? extends InvocableMap.Entry<Binary, B
{
mapResults.put(((BinaryEntry<Binary, Binary>) entry).getBinaryKey(), this.process(entry));
}
else
{
entry.getValue(); // maybe trigger a CacheStore load
if (entry.isPresent())
{
mapResults.put(((BinaryEntry<Binary, Binary>) entry).getBinaryKey(), this.process(entry));
}
}
iter.remove();
if (ctxGuard != null)
{
Expand Down Expand Up @@ -946,6 +972,15 @@ public Binary process(InvocableMap.Entry<Binary, Binary> entry)
prevValue = ((BinaryEntry<Binary, Binary>) entry).getBinaryValue();
entry.remove(false);
}
else
{
entry.getValue(); // maybe trigger a CacheStore load...
if (entry.isPresent())
{
prevValue = ((BinaryEntry<Binary, Binary>) entry).getBinaryValue();
entry.remove(false);
}
}
return prevValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import java.io.DataOutput;
import java.io.IOException;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -815,6 +814,10 @@ public static class RemoveBlind<K, V>
{
public Boolean process(InvocableMap.Entry<K, V> entry)
{
if (!entry.isPresent())
{
entry.getValue(); // maybe trigger a CacheStore load
}
boolean fRemoved = entry.isPresent();
entry.remove(false);
return fRemoved;
Expand Down Expand Up @@ -850,6 +853,10 @@ public RemoveNoResults(boolean fSynthetic)

public Void process(InvocableMap.Entry<K, V> entry)
{
if (!entry.isPresent())
{
entry.getValue(); // Maybe trigger a CacheStore load so the remove triggers an erase
}
entry.remove(m_fSynthetic);
return null;
}
Expand All @@ -862,6 +869,10 @@ public Map<K, Void> processAll(Set<? extends InvocableMap.Entry<K, V>> setEntrie
for (Iterator<? extends InvocableMap.Entry<K, V>> iter = setEntries.iterator(); iter.hasNext(); )
{
InvocableMap.Entry<K, V> entry = iter.next();
if (!entry.isPresent())
{
entry.getValue(); // Maybe trigger a CacheStore load so the remove triggers an erase
}
entry.remove(m_fSynthetic);
iter.remove();
if (ctxGuard != null)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
/*
* Copyright (c) 2000, 2022, Oracle and/or its affiliates.
* Copyright (c) 2000, 2023, Oracle and/or its affiliates.
*
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
package cache;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.Cluster;
import com.tangosol.net.Coherence;
import com.tangosol.net.NamedCache;

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.tangosol.net.Session;
import org.junit.BeforeClass;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/**
* Tests for COH-20907 AsyncNamedCache putAll causing CacheLoader
Expand All @@ -25,44 +30,139 @@
*/
public class AsyncNamedCacheWithCacheStoreTests
{
@BeforeClass
public static void setup() throws Exception
{
System.setProperty("coherence.ttl", "0");
System.setProperty("coherence.wka", "127.0.0.1");
System.setProperty("coherence.localhost", "127.0.0.1");
System.setProperty("coherence.cacheconfig", FILE_CFG_CACHE);

Coherence coherence = Coherence.clusterMember().start().get(5, TimeUnit.MINUTES);
s_session = coherence.getSession();
}

@Test
public void testAsyncPutAll()
public void testAsyncGetCallsLoadOnly() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-get");

Cluster cluster = null;
TestCacheStore.clear();
cache.async().get(1).get(1, TimeUnit.MINUTES);

try
{
String cacheConfig = this.getClass().getClassLoader()
.getResource(FILE_CFG_CACHE).getFile();
System.setProperty("coherence.cacheconfig", cacheConfig);
cluster = CacheFactory.ensureCluster();

NamedCache cache = CacheFactory.getCache("test-coh20907");
Map<Integer, Integer> map = new HashMap<>();

for (int i = 0; i < 10; ++i)
{
map.put(i, i);
}

try
{
cache.async().putAll(map);
}
catch (Throwable t)
{
fail("No Exception should have been thrown! Got Exception: " + t);
}
}
finally
assertThat(TestCacheStore.getErases().isEmpty(), is(true));
assertThat(TestCacheStore.getStores().isEmpty(), is(true));
Queue<?> loads = TestCacheStore.getLoads();
assertThat(loads.size(), is(1));
assertThat(loads.contains(1), is(true));
}

@Test
public void testAsyncGetAllCallsLoadOnly() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-getAll");
Set<Integer> setKey = Set.of(1, 2, 3, 4);

TestCacheStore.clear();
cache.async().getAll(setKey).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getErases().isEmpty(), is(true));
assertThat(TestCacheStore.getStores().isEmpty(), is(true));
Set<?> loads = TestCacheStore.getLoadsAsSet();
assertThat(loads, is(setKey));
}

@Test
public void testAsyncPutCallsStoreOnly() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-put");

TestCacheStore.clear();
cache.async().put(1, 100).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getErases().isEmpty(), is(true));
assertThat(TestCacheStore.getLoads().isEmpty(), is(true));
assertThat(TestCacheStore.getStores().get(1), is(100));
}

@Test
public void testAsyncPutAllCallsStoreOnly() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-putAll");
Map<Integer, Integer> map = new HashMap<>();

for (int i = 0; i < 10; ++i)
{
if (cluster != null)
{
cluster.shutdown();
}
map.put(i, i);
}

TestCacheStore.clear();
cache.async().putAll(map).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getErases().isEmpty(), is(true));
assertThat(TestCacheStore.getLoads().isEmpty(), is(true));
assertThat(TestCacheStore.getStores(), is(map));
}

@Test
public void testAsyncRemoveCallsLoadAndErase() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-remove");
cache.clear();

TestCacheStore.clear();
TestCacheStore.put(1, 100);
cache.async().remove(1).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getStores().isEmpty(), is(true));

Queue<?> loads = TestCacheStore.getLoads();
assertThat(loads.size(), is(1));
assertThat(loads.contains(1), is(true));

Queue<?> erases = TestCacheStore.getErases();
assertThat(erases.size(), is(1));
assertThat(erases.contains(1), is(true));
}

@Test
public void testAsyncRemoveKeyAndValueCallsLoadAndErase() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-removeKeyAndValue");
cache.clear();

TestCacheStore.clear();
TestCacheStore.put(1, 100);
cache.async().remove(1, 100).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getStores().isEmpty(), is(true));

Queue<?> loads = TestCacheStore.getLoads();
assertThat(loads.size(), is(1));
assertThat(loads.contains(1), is(true));

Queue<?> erases = TestCacheStore.getErases();
assertThat(erases.size(), is(1));
assertThat(erases.contains(1), is(true));
}

@Test
public void testAsyncRemoveAllCallsLoadAndErase() throws Exception
{
NamedCache<Integer, Integer> cache = s_session.getCache("test-coh20907-removeAll");
Map<Integer, Integer> map = Map.of(1, 100, 2, 200, 3, 300, 4, 400);

TestCacheStore.clear();
TestCacheStore.putAll(map);
cache.async().removeAll(map.keySet()).get(1, TimeUnit.MINUTES);

assertThat(TestCacheStore.getStores().isEmpty(), is(true));

Set<?> loads = TestCacheStore.getLoadsAsSet();
assertThat(loads, is(map.keySet()));

Set<?> erases = TestCacheStore.getErasesAsSet();
assertThat(erases, is(map.keySet()));
}

// ----- constants and data members -------------------------------------
Expand All @@ -72,4 +172,5 @@ public void testAsyncPutAll()
*/
public static final String FILE_CFG_CACHE = "server-cache-config.xml";

private static Session s_session;
}
Loading

0 comments on commit ad60dae

Please sign in to comment.