Skip to content

Commit

Permalink
Merge pull request #5 from ELDEpendenci/develop
Browse files Browse the repository at this point in the history
  • Loading branch information
eric2788 authored May 11, 2022
2 parents 2946c19 + 5087069 commit 2d8db22
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 113 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ on:
jobs:
build:
runs-on: ubuntu-latest
permissions:
contents: write
steps:
- name: Checkout Source Code
id: checkout-source
Expand Down
2 changes: 1 addition & 1 deletion eldependenci-rpc-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>eldependenci-rpc-module</artifactId>
<groupId>org.eldependenci</groupId>
<version>0.0.2</version>
<version>0.0.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import com.ericlam.mc.eld.misc.DebugLogger;
import com.ericlam.mc.eld.services.LoggingService;
import com.ericlam.mc.eld.services.ScheduleService;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.google.inject.Singleton;
Expand All @@ -15,12 +14,10 @@
import org.eldependenci.rpc.JsonMapperFactory;
import org.eldependenci.rpc.config.RPCConfig;
import org.eldependenci.rpc.context.*;
import org.eldependenci.rpc.exception.ServiceException;
import org.eldependenci.rpc.protocol.RPCServiceable;
import org.eldependenci.rpc.protocol.ServiceHandler;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -40,11 +37,6 @@ public final class JavalinServiceable implements RPCServiceable {
@Inject
private ELDependenciRPC plugin;


@Inject
private ScheduleService scheduleService;


@Inject
public JavalinServiceable(LoggingService loggingService, JsonMapperFactory factory) {
this.logger = loggingService.getLogger(JavalinServiceable.class);
Expand Down Expand Up @@ -119,7 +111,7 @@ public synchronized void StartService(ServiceHandler handler) {

try {

var future = handlePayload(rpcPayload, handler);
var future = handler.handlePayload(rpcPayload, ctx.queryParam("debug") != null);
ctx.future(future.thenApply(result -> new RPCResponse<>(rpcPayload.id(), result instanceof RPCError, result)));

} catch (Exception e) {
Expand Down Expand Up @@ -171,7 +163,7 @@ private CompletableFuture<Void> handleWS(WsContext ctx, ServiceHandler handler)

try {

var future = handlePayload(rpcPayload, handler);
var future = handler.handlePayload(rpcPayload, false);

return future.thenAcceptAsync(result -> {

Expand Down Expand Up @@ -201,55 +193,6 @@ private CompletableFuture<Void> handleWS(WsContext ctx, ServiceHandler handler)

}

private CompletableFuture<Object> handlePayload(RPCPayload rpcPayload, ServiceHandler handler) throws Exception {

var async = handler.shouldCallAsync(rpcPayload);

CompletableFuture<Object> future = async ? toFuture(rpcPayload, handler) : new CompletableFuture<>();

if (!async) {

var returned = handler.invokes(rpcPayload);

if (returned.result() instanceof ScheduleService.BukkitPromise<?> promise) {

logger.debug("method {0} in service {1} is returning bukkit promise", rpcPayload.method(), rpcPayload.service());

promise.thenRunAsync(re -> {

var result = objectMapper.convertValue(re, Object.class);
logger.debug("method {0} in service {1} returning result: {2}", rpcPayload.method(), rpcPayload.service(), result);
future.complete(new RPCResult(rpcPayload.method(), rpcPayload.service(), result));

}).joinWithCatch(future::completeExceptionally);

} else {
future.complete(new RPCResult(rpcPayload.method(), rpcPayload.service(), handler.finalizeType(returned.result(), returned.returnType())));
}
}

return future;
}

private CompletableFuture<Object> toFuture(RPCPayload rpcPayload, ServiceHandler handler) {
return CompletableFuture.supplyAsync(() -> {

try {
var returned = handler.invokes(rpcPayload);
return new RPCResult(rpcPayload.method(), rpcPayload.service(), handler.finalizeType(returned.result(), returned.returnType()));
} catch (Exception e) {
String[] errors = new String[0];

if (!(e instanceof ServiceException)) {
errors = Arrays.stream(e.getStackTrace()).map(StackTraceElement::toString).toArray(String[]::new);
e.printStackTrace();
}

return new RPCError(400, e.getMessage(), errors);
}
});
}


@Override
public synchronized void StopService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
import javax.annotation.Nullable;
import javax.inject.Named;
import java.io.IOException;
import java.net.ConnectException;
import java.net.http.HttpTimeoutException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -32,11 +30,12 @@ public final class OkHttpRequester implements RPCRequester {
private @Nullable String serviceName;

@Override
public void initialize(RPCInfo client) {
public CompletableFuture<Void> initialize(RPCInfo client) {
this.serviceName = client.serviceName();
this.hosts = client.fallbackHosts();
// insert main host as first
this.hosts.add(0, new RPCInfo.FallbackHost(client.host(), client.useTLS(), client.authToken()));
return CompletableFuture.completedFuture(null);
}

public Object invokeRequest(RPCPayload payload, RPCInfo.FallbackHost host) throws Exception {
Expand Down Expand Up @@ -69,7 +68,7 @@ public Object invokeRequest(RPCPayload payload, RPCInfo.FallbackHost host) throw


@Override
public CompletableFuture<Object> offerRequest(RPCPayload payload) throws Exception {
public CompletableFuture<Object> offerRequest(RPCPayload payload) {
CompletableFuture<Object> future = new CompletableFuture<>();

for (RPCInfo.FallbackHost host : this.hosts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import okhttp3.*;
import okhttp3.internal.ws.RealWebSocket;
import okio.ByteString;
import org.bukkit.Bukkit;
import org.eldependenci.rpc.context.*;
import org.eldependenci.rpc.protocol.RPCRequester;

Expand All @@ -20,7 +19,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand Down Expand Up @@ -55,21 +53,13 @@ public OkWebSocketRequester(LoggingService loggingService) {
private RPCInfo info;

@Override
public void initialize(RPCInfo client) {
public CompletableFuture<Void> initialize(RPCInfo client) {
this.info = client;
this.serviceName = client.serviceName();
this.hosts = client.fallbackHosts();
hosts.add(new RPCInfo.FallbackHost(client.host(), client.useTLS(), client.authToken()));
/*
launchWebSockets(iterator, client.locate()).whenComplete((v, ex) -> {
if (ex != null) {
logger.warn(ex, "Error while launching service {0}: {1}", client.locate(), ex.getMessage());
}
});
*/

httpClient.connectionPool().evictAll();
return launchWebSockets(hosts.iterator(), client.locate());
}


Expand Down Expand Up @@ -162,7 +152,7 @@ public void onMessage(WebSocket webSocket, ByteString bytes) {
}

@Override
public CompletableFuture<Object> offerRequest(RPCPayload payload) throws Exception {
public CompletableFuture<Object> offerRequest(RPCPayload payload) {
var previousFuture = webSocket == null ? launchWebSockets(hosts.iterator(), info.locate()).thenApply(v -> null) : CompletableFuture.completedFuture(null);
return previousFuture.thenCompose(v -> {
var future = new CompletableFuture<Object>();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package org.eldependenci.rpc.remote;

import org.eldependenci.rpc.config.RPCConfig;
import org.eldependenci.rpc.context.RPCInfo;
import org.eldependenci.rpc.context.RPCPayload;
import org.eldependenci.rpc.protocol.RPCRequester;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class RemoteInvocationHandler implements InvocationHandler {

private final RPCRequester requester;

private final Class<?> service;
private final RequesterManager requesterManager;

private final String serviceName;
Expand All @@ -23,7 +21,7 @@ public RemoteInvocationHandler(
RequesterManager requesterManager
) {
this.requesterManager = requesterManager;
this.requester = requesterManager.getRequester(service);
this.service = service;
var rpcInfo = requesterManager.findInfo(service);
this.serviceName = rpcInfo.map(RPCInfo::serviceName).orElse(service.getSimpleName());
this.token = rpcInfo.map(RPCInfo::authToken).orElse("");
Expand All @@ -36,8 +34,9 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
} else {
args = args != null ? args : new String[0];
var id = System.nanoTime();
var payload = new RPCPayload(id, method.getName(), serviceName, args, token);
var future = this.requester.offerRequest(payload);
var payload = new RPCPayload(id, method.getName(), serviceName, args, token);

var future = requesterManager.getRequester(service).thenCompose(requester -> requester.offerRequest(payload));
return requesterManager.handleFuture(future, method.getGenericReturnType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ public final class RemoteManager {
@Inject
private RequesterManager requestManager;

@Inject
private RPCConfig config;

public <T> T getRemoteService(Class<T> service) {
return (T) Optional.ofNullable(proxyMap.get(service)).orElseGet(() -> createService(service));
}
Expand All @@ -39,7 +36,6 @@ private <T> T createService(Class<T> service) {

public <T> T createServiceDynamically(Class<T> service, RPCInfo info) {
var serviceName = Optional.ofNullable(info.serviceName()).orElse(service.getSimpleName());
var requester = requestManager.getRequesterDynamically(info);
return (T) Proxy.newProxyInstance(
service.getClassLoader(),
new Class[]{service},
Expand All @@ -50,7 +46,7 @@ public <T> T createServiceDynamically(Class<T> service, RPCInfo info) {
args = args != null ? args : new String[0];
var id = System.nanoTime();
var payload = new RPCPayload(id, method.getName(), serviceName, args, info.authToken());
var future = requester.offerRequest(payload);
var future = requestManager.getRequesterDynamically(info).thenCompose(requester -> requester.offerRequest(payload));
return this.requestManager.handleFuture(future, method.getGenericReturnType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,16 @@ public Optional<RPCInfo> findInfo(Class<?> service) {
.findAny();
}

public RPCRequester getRequesterDynamically(RPCInfo info) {
public CompletableFuture<RPCRequester> getRequesterDynamically(RPCInfo info) {
var requester = this.requesterMap.get(info.protocol());
if (requester == null) {
throw new IllegalArgumentException("Protocol " + info.protocol() + " is not supported");
}
var ins = injector.getInstance(requester);
ins.initialize(info);
return ins;
return ins.initialize(info).thenApply(v -> ins);
}

public RPCRequester getRequester(Class<?> service) {
public CompletableFuture<RPCRequester> getRequester(Class<?> service) {

var info = findInfo(service).orElseThrow(() -> new IllegalStateException("Can't find remote config for " + service.getName() + ", have you defined it on remotes.yml?"));

Expand All @@ -80,12 +79,17 @@ public RPCRequester getRequester(Class<?> service) {
if (instances == null) {
throw new IllegalArgumentException("Protocol " + protocol + " is not supported");
}
return Optional.ofNullable(instances.get(service)).orElseGet(() -> {
var requester = this.requesterMap.get(protocol);
var ins = injector.getInstance(requester);
ins.initialize(info);
instances.put(service, ins);
return ins;

var ins = instances.get(service);
if (ins != null){
return CompletableFuture.completedFuture(ins);
}

var requester = this.requesterMap.get(protocol);
var newIns = injector.getInstance(requester);
return newIns.initialize(info).thenApply(v -> {
instances.put(service, newIns);
return newIns;
});
}

Expand Down
Loading

0 comments on commit 2d8db22

Please sign in to comment.