Skip to content

Commit

Permalink
update cache support
Browse files Browse the repository at this point in the history
  • Loading branch information
SamYuan1990 committed May 16, 2020
1 parent e08860d commit 1254c30
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@

import com.github.samyuan1990.FabricJavaPool.FabricJavaPoolConfig;
import com.github.samyuan1990.FabricJavaPool.api.FabricConnection;
import com.github.samyuan1990.FabricJavaPool.cache.CacheProxy;
import com.github.samyuan1990.FabricJavaPool.impl.FabricConnectionImpl;
import com.github.samyuan1990.FabricJavaPool.cache.FabricContractConnectImplCacheProxy;
import com.github.samyuan1990.FabricJavaPool.impl.FabricContractConnectImpl;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
Expand Down Expand Up @@ -62,7 +61,7 @@ public FabricConnection create() throws Exception {
// Obtain a smart contract deployed on the network.
FabricContractConnectImpl fCCI = new FabricContractConnectImpl(gateway.getNetwork(channel));
if (config.isUseCache()) {
CacheProxy proxy = new CacheProxy(fCCI, config.getCacheURL(), userName, channel, config.getCacheTimeout());
FabricContractConnectImplCacheProxy proxy = new FabricContractConnectImplCacheProxy(fCCI, config.getCacheURL(), userName, channel, config.getCacheTimeout());
return (FabricConnection) Proxy.newProxyInstance(FabricContractConnectImpl.class.getClassLoader(), new Class[]{FabricConnection.class}, proxy);
} else {
return fCCI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@

import com.github.samyuan1990.FabricJavaPool.FabricJavaPoolConfig;
import com.github.samyuan1990.FabricJavaPool.api.FabricConnection;
import com.github.samyuan1990.FabricJavaPool.cache.CacheProxy;
import com.github.samyuan1990.FabricJavaPool.cache.FabricConnectionImplCacheProxy;
import com.github.samyuan1990.FabricJavaPool.cache.FabricContractConnectImplCacheProxy;
import com.github.samyuan1990.FabricJavaPool.impl.FabricConnectionImpl;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
Expand Down Expand Up @@ -63,7 +64,7 @@ public FabricConnection create() throws Exception {
myChannel.initialize();
myConnection = new FabricConnectionImpl(hfclient, myChannel, appUser);
if (config.isUseCache()) {
CacheProxy proxy = new CacheProxy(myConnection, config.getCacheURL(), appUser.getName(), channel, config.getCacheTimeout());
FabricConnectionImplCacheProxy proxy = new FabricConnectionImplCacheProxy(myConnection, config.getCacheURL(), appUser.getName(), channel, config.getCacheTimeout());
return (FabricConnection) Proxy.newProxyInstance(FabricConnectionImpl.class.getClassLoader(), new Class[]{FabricConnection.class}, proxy);
} else {
return myConnection;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.github.samyuan1990.FabricJavaPool.cache;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

import com.github.samyuan1990.FabricJavaPool.ExecuteResult;
import org.hyperledger.fabric.protos.ledger.rwset.kvrwset.KvRwset;
import org.hyperledger.fabric.sdk.ProposalResponse;
import org.hyperledger.fabric.sdk.TxReadWriteSetInfo;

public class FabricConnectionImplCacheProxy extends FabricContractConnectImplCacheProxy implements InvocationHandler {


public FabricConnectionImplCacheProxy(Object obj, String cacheURL, String userName, String channelName, int timeout) {
super(obj, cacheURL, userName, channelName, timeout);
}

public FabricConnectionImplCacheProxy(String cacheURL, String user, String mychannel, int timeout) {
super(cacheURL, user, mychannel, timeout);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Object result = null;
if (method.getName().equals("query")) {
String key = genericKey(userName, channelName, args);
result = memcachedClient.get(key);
if (result != null) {
System.out.println("hit");
return result;
}
result = method.invoke(obj, args);
ExecuteResult executeResult = (ExecuteResult) result;
if (executeResult.getPropResp() == null) {
return result;
}
for (ProposalResponse p : executeResult.getPropResp()) {
TxReadWriteSetInfo txReadWriteSetInfo = p.getChaincodeActionResponseReadWriteSetInfo();
for (TxReadWriteSetInfo.NsRwsetInfo nsRwsetInfo : txReadWriteSetInfo.getNsRwsetInfos()) {
KvRwset.KVRWSet rws = nsRwsetInfo.getRwset();
for (KvRwset.KVRead readList : rws.getReadsList()) {
String blockKey = readList.getKey();
memcachedClient.set(blockKey, timeout, key);
}
}
}
memcachedClient.set(key, timeout, result);
return result;
}
if (method.getName().equals("invoke")) {
result = method.invoke(obj, args);
ExecuteResult executeResult = (ExecuteResult) result;
if (executeResult.getPropResp() == null) {
return result;
}
for (ProposalResponse p : executeResult.getPropResp()) {
TxReadWriteSetInfo txReadWriteSetInfo = p.getChaincodeActionResponseReadWriteSetInfo();
for (TxReadWriteSetInfo.NsRwsetInfo nsRwsetInfo : txReadWriteSetInfo.getNsRwsetInfos()) {
KvRwset.KVRWSet rws = nsRwsetInfo.getRwset();
for (KvRwset.KVRead readList : rws.getReadsList()) {
String blockKey = readList.getKey();
String blockCache = memcachedClient.get(blockKey);
if (!blockCache.equals(null)) {
memcachedClient.delete(blockCache);
memcachedClient.delete(blockKey);
}
}
}
}
return result;
}
result = method.invoke(obj, args);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,35 @@
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.utils.AddrUtil;

public class CacheProxy implements InvocationHandler {
public class FabricContractConnectImplCacheProxy implements InvocationHandler {

private Object obj;
private MemcachedClient memcachedClient;
private String cacheURL;
private String userName;
private String channelName;
private int timeout;
Object obj;

public CacheProxy(Object obj, String cacheURL, String userName, String channelName, int timeout) {
public void setMemcachedClient(MemcachedClient memcachedClient) {
this.memcachedClient = memcachedClient;
}

public FabricContractConnectImplCacheProxy(Object obj, String userName, String channelName, int timeout) {
this.timeout = timeout;
this.channelName = channelName;
this.userName = userName;
this.cacheURL = cacheURL;
this.obj = obj;
}

MemcachedClient memcachedClient;
String cacheURL;
String userName;
String channelName;
int timeout;

public FabricContractConnectImplCacheProxy(Object obj, String cacheURL, String userName, String channelName, int timeout) {
this.timeout = timeout;
this.channelName = channelName;
this.userName = userName;
Expand All @@ -34,13 +45,15 @@ public CacheProxy(Object obj, String cacheURL, String userName, String channelNa
this.obj = obj;
}

private String genericKey(String user, String channel, Object[] args) {
String genericKey(String user, String channel, Object[] args) {
String key = "";
key = key.concat(args[0].toString());
key = key.concat(args[1].toString());
String[] list = (String[]) args[2];
for (Object l : list) {
key = key.concat(l.toString());
if (args.length > 2) {
String[] list = (String[]) args[2];
for (Object l : list) {
key = key.concat(l.toString());
}
}
key = key.concat(user.concat(channel));
return key;
Expand All @@ -60,7 +73,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
result = method.invoke(obj, args);
memcachedClient.set(key, timeout, result);
result = memcachedClient.get(key);
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.github.samyuan1990.FabricJavaPool.cache;

import java.lang.reflect.Method;

import com.github.samyuan1990.FabricJavaPool.ExecuteResult;
import net.rubyeye.xmemcached.MemcachedClient;
import org.junit.Assert;
import org.junit.Test;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FabricConnectionImplCacheProxyTest {

static String noQuery() {
return "a";
}

static ExecuteResult query(String a, String b) {
return new ExecuteResult("a", null);
}

static ExecuteResult invoke(String a, String b) {
return new ExecuteResult("a", null);
}

@Test
public void invokeFunctionQuery() throws Throwable {
FabricConnectionImplCacheProxy test = new FabricConnectionImplCacheProxy("test", "user", "mychannel", 10);
Method method = FabricConnectionImplCacheProxyTest.class.getDeclaredMethod("query", String.class, String.class);
MemcachedClient memcachedClient = mock(MemcachedClient.class);
when(memcachedClient.get((String) any())).thenReturn(null);
//when(memcachedClient.set((String) any(), (int) any(), (String) any())).thenReturn(true);
test.setMemcachedClient(memcachedClient);
ExecuteResult rs = (ExecuteResult) test.invoke(null, method, new Object[]{"1", "2"});
Assert.assertEquals("a", rs.getResult());
}

@Test
public void invokeFunctionQuery2() throws Throwable {
FabricConnectionImplCacheProxy test = new FabricConnectionImplCacheProxy("test", "user", "mychannel", 10);
Method method = FabricConnectionImplCacheProxyTest.class.getDeclaredMethod("query", String.class, String.class);
MemcachedClient memcachedClient = mock(MemcachedClient.class);
when(memcachedClient.get((String) any())).thenReturn(new ExecuteResult("b", null));
//when(memcachedClient.set((String) any(), (int) any(), (String) any())).thenReturn(true);
test.setMemcachedClient(memcachedClient);
ExecuteResult rs = (ExecuteResult) test.invoke(null, method, new Object[]{"1", "2"});
Assert.assertEquals("b", rs.getResult());
}

@Test
public void invokeFunctionOthers() throws Throwable {
FabricConnectionImplCacheProxy test = new FabricConnectionImplCacheProxy("test", "test", "test", 10);
Method method = FabricConnectionImplCacheProxyTest.class.getDeclaredMethod("noQuery");
String rs = (String) test.invoke(null, method, null);
Assert.assertEquals("a", rs);
}

@Test
public void invokeFunctionInvoke() throws Throwable {
FabricConnectionImplCacheProxy test = new FabricConnectionImplCacheProxy("test", "user", "mychannel", 10);
Method method = FabricConnectionImplCacheProxyTest.class.getDeclaredMethod("invoke", String.class, String.class);
MemcachedClient memcachedClient = mock(MemcachedClient.class);
when(memcachedClient.get((String) any())).thenReturn("b");
//when(memcachedClient.set((String) any(), (int) any(), (String) any())).thenReturn(true);
test.setMemcachedClient(memcachedClient);
ExecuteResult rs = (ExecuteResult) test.invoke(null, method, new Object[]{"1", "2"});
Assert.assertEquals("a", rs.getResult());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package com.github.samyuan1990.FabricJavaPool.cache;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

import net.rubyeye.xmemcached.MemcachedClient;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mock;

import static org.junit.Assert.*;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FabricContractConnectImplCacheProxyTest {

static String noQuery() {
return "a";
}

static String query(String a, String b) {
return "a";
}

@Test
public void genericKey() {
FabricContractConnectImplCacheProxy test = new FabricContractConnectImplCacheProxy("test", "test", "test", 10);
String rs = test.genericKey("user", "mychannel", new Object[]{"1", "2"});
Assert.assertEquals("12usermychannel", rs);
}

@Test
public void invokeForQuery() throws Throwable {
FabricContractConnectImplCacheProxy test = new FabricContractConnectImplCacheProxy("test", "user", "mychannel", 10);
Method method = FabricContractConnectImplCacheProxyTest.class.getDeclaredMethod("query", String.class, String.class);
MemcachedClient memcachedClient = mock(MemcachedClient.class);
when(memcachedClient.get((String) any())).thenReturn(null);
//when(memcachedClient.set((String) any(), (int) any(), (String) any())).thenReturn(true);
test.setMemcachedClient(memcachedClient);
String rs = (String) test.invoke(null, method, new Object[]{"1", "2"});
Assert.assertEquals("a", rs);
}

@Test
public void invokeForQuery2() throws Throwable {
FabricContractConnectImplCacheProxy test = new FabricContractConnectImplCacheProxy("test", "user", "mychannel", 10);
Method method = FabricContractConnectImplCacheProxyTest.class.getDeclaredMethod("query", String.class, String.class);
MemcachedClient memcachedClient = mock(MemcachedClient.class);
when(memcachedClient.get((String) any())).thenReturn("b");
//when(memcachedClient.set((String) any(), (int) any(), (String) any())).thenReturn(true);
test.setMemcachedClient(memcachedClient);
String rs = (String) test.invoke(null, method, new Object[]{"1", "2"});
Assert.assertEquals("b", rs);
}

@Test
public void invokeNonQuery() throws Throwable {
FabricContractConnectImplCacheProxy test = new FabricContractConnectImplCacheProxy("test", "test", "test", 10);
Method method = FabricContractConnectImplCacheProxyTest.class.getDeclaredMethod("noQuery");
String rs = (String) test.invoke(null, method, null);
Assert.assertEquals("a", rs);
}
}

0 comments on commit 1254c30

Please sign in to comment.