Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Avoid multiple GuiceLocator instances, and handle potential multiple KapuaMessageListeners #4066

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@
*******************************************************************************/
package org.eclipse.kapua.broker.artemis.plugin.security;

import com.google.inject.Provides;
import java.util.UUID;

import javax.inject.Named;
import javax.inject.Singleton;
import javax.jms.JMSException;

import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
import org.eclipse.kapua.broker.artemis.plugin.security.context.SecurityContext;
import org.eclipse.kapua.broker.artemis.plugin.security.metric.LoginMetric;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSetting;
import org.eclipse.kapua.broker.artemis.plugin.security.setting.BrokerSettingKey;
import org.eclipse.kapua.client.security.MessageListener;
import org.eclipse.kapua.client.security.KapuaMessageListener;
import org.eclipse.kapua.client.security.ServiceClient;
import org.eclipse.kapua.client.security.ServiceClientMessagingImpl;
import org.eclipse.kapua.client.security.amqpclient.Client;
Expand All @@ -28,12 +33,10 @@
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;

import javax.inject.Named;
import javax.inject.Singleton;
import javax.jms.JMSException;
import java.util.UUID;
import com.google.inject.Provides;

public class ArtemisSecurityModule extends AbstractKapuaModule {

@Override
protected void configureModule() {
bind(ServerContext.class).in(Singleton.class);
Expand All @@ -46,9 +49,9 @@ protected void configureModule() {
@Provides
@Singleton
SecurityContext securityContext(LoginMetric loginMetric,
BrokerSetting brokerSettings,
MetricsSecurityPlugin metricsSecurityPlugin,
RunWithLock runWithLock) {
BrokerSetting brokerSettings,
MetricsSecurityPlugin metricsSecurityPlugin,
RunWithLock runWithLock) {
return new SecurityContext(loginMetric,
brokerSettings.getBoolean(BrokerSettingKey.PRINT_SECURITY_CONTEXT_REPORT, false),
new LocalCache<>(
Expand All @@ -74,14 +77,14 @@ SecurityContext securityContext(LoginMetric loginMetric,
@Singleton
@Provides
ServiceClient authServiceClient(
MessageListener messageListener,
KapuaMessageListener messageListener,
@Named("clusterName") String clusterName,
@Named("brokerHost") String brokerHost,
SystemSetting systemSetting) {
return new ServiceClientMessagingImpl(messageListener, buildClient(systemSetting, clusterName, brokerHost, messageListener));
}

public Client buildClient(SystemSetting systemSetting, String clusterName, String brokerHost, MessageListener messageListener) {
public Client buildClient(SystemSetting systemSetting, String clusterName, String brokerHost, KapuaMessageListener messageListener) {
//TODO change configuration (use service event broker for now)
String clientId = "svc-ath-" + UUID.randomUUID().toString();
String host = systemSetting.getString(SystemSettingKey.SERVICE_BUS_HOST, "events-broker");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import javax.inject.Singleton;

import org.eclipse.kapua.client.security.metric.AuthLoginMetricFactory;
import org.eclipse.kapua.client.security.metric.AuthMetric;
import org.eclipse.kapua.commons.core.AbstractKapuaModule;

import javax.inject.Singleton;
import com.google.inject.Provides;

public class ClientSecurityModule extends AbstractKapuaModule {

@Override
protected void configureModule() {
bind(MetricsClientSecurity.class).in(Singleton.class);
bind(MessageListener.class).in(Singleton.class);
bind(AuthMetric.class).in(Singleton.class);
bind(AuthLoginMetricFactory.class).in(Singleton.class);
}

@Provides
@Singleton
KapuaMessageListener messageListener(MetricsClientSecurity metricsClientSecurity) {
return new KapuaMessageListener(metricsClientSecurity);

Check warning on line 35 in client/security/src/main/java/org/eclipse/kapua/client/security/ClientSecurityModule.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/ClientSecurityModule.java#L35

Added line #L35 was not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import javax.inject.Singleton;
import javax.jms.JMSException;
import javax.jms.Message;

import org.apache.qpid.jms.message.JmsTextMessage;
import org.eclipse.kapua.KapuaErrorCodes;
import org.eclipse.kapua.KapuaRuntimeException;
Expand All @@ -27,48 +35,54 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import javax.inject.Singleton;
import javax.jms.JMSException;
import javax.jms.Message;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

/**
* This class is responsible to correlate request/response messages. Only one instance of this must be present at any given time!
*/
@Singleton
public class MessageListener extends ClientMessageListener {
public class KapuaMessageListener extends ClientMessageListener implements Closeable {

protected static Logger logger = LoggerFactory.getLogger(MessageListener.class);

private final Map<String, ResponseContainer<?>> callbacks;//is not needed the synchronization
protected static Logger logger = LoggerFactory.getLogger(KapuaMessageListener.class);

Check warning on line 47 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L47

Added line #L47 was not covered by tests
//Should only be one
private static final AtomicInteger INSTANCES = new AtomicInteger();

Check warning on line 49 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L49

Added line #L49 was not covered by tests
private final int currentInstanceNumber;
//Hate to use a static here, but at least in case of multiple listeners they will be able to correlate messages
private static final Map<String, ResponseContainer<?>> CALLBACKS = new ConcurrentHashMap<>();

Check warning on line 52 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L52

Added line #L52 was not covered by tests
//is not needed the synchronization
private static ObjectMapper mapper = new ObjectMapper();
private static ObjectReader reader = mapper.reader();//check if it's thread safe

private MetricsClientSecurity metrics;

@Inject
public MessageListener(MetricsClientSecurity metricsClientSecurity) {
logger.debug("Starting MessageListener");
KapuaMessageListener(MetricsClientSecurity metricsClientSecurity) {
currentInstanceNumber = INSTANCES.incrementAndGet();

Check warning on line 60 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L59-L60

Added lines #L59 - L60 were not covered by tests
if (currentInstanceNumber != 1) {
logger.warn("Starting KapuaMessageListener, instance number {}! Is this right?!?!?", currentInstanceNumber);

Check warning on line 62 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L62

Added line #L62 was not covered by tests
} else {
logger.debug("Starting KapuaMessageListener, instance {}", currentInstanceNumber);

Check warning on line 64 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L64

Added line #L64 was not covered by tests
}
this.metrics = metricsClientSecurity;
callbacks = new ConcurrentHashMap<>();
}

@Override
public void onMessage(Message message) {
logger.debug("KapuaMessageListener processing message, instance {} responding", currentInstanceNumber);

Check warning on line 71 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L71

Added line #L71 was not covered by tests
try {
SecurityAction securityAction = SecurityAction.valueOf(message.getStringProperty(MessageConstants.HEADER_ACTION));
switch (securityAction) {
case brokerConnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case brokerDisconnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;
case getEntity:
updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message));
break;
default:
throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action");
case brokerConnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;

Check warning on line 77 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L76-L77

Added lines #L76 - L77 were not covered by tests
case brokerDisconnect:
updateResponseContainer(buildAuthResponseFromMessage((JmsTextMessage) message));
break;

Check warning on line 80 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L79-L80

Added lines #L79 - L80 were not covered by tests
case getEntity:
updateResponseContainer(buildAccountResponseFromMessage((JmsTextMessage) message));
break;

Check warning on line 83 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L82-L83

Added lines #L82 - L83 were not covered by tests
default:
throw new KapuaRuntimeException(KapuaErrorCodes.ILLEGAL_ARGUMENT, "action");

Check warning on line 85 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L85

Added line #L85 was not covered by tests
}
} catch (JMSException | IOException e) {
metrics.getLoginCallbackError().inc();
Expand All @@ -77,8 +91,8 @@
}

private <R extends Response> void updateResponseContainer(R response) throws JMSException, IOException {
logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, callbacks.size());
ResponseContainer<R> responseContainer = (ResponseContainer<R>) callbacks.get(response.getRequestId());
logger.debug("update callback {} on instance {}, map size: {}", response.getRequestId(), this, CALLBACKS.size());
ResponseContainer<R> responseContainer = (ResponseContainer<R>) CALLBACKS.get(response.getRequestId());

Check warning on line 95 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L94-L95

Added lines #L94 - L95 were not covered by tests
if (responseContainer == null) {
//internal error
logger.error("Cannot find request container for requestId {}", response.getRequestId());
Expand All @@ -102,13 +116,17 @@
}

public void registerCallback(String requestId, ResponseContainer<?> responseContainer) {
callbacks.put(requestId, responseContainer);
logger.debug("registered callback {} on instance {}, map size: {}", requestId, this, callbacks.size());
CALLBACKS.put(requestId, responseContainer);
logger.debug("registered callback {} on instance {}, map size: {}", requestId, this, CALLBACKS.size());

Check warning on line 120 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L119-L120

Added lines #L119 - L120 were not covered by tests
}

public void removeCallback(String requestId) {
callbacks.remove(requestId);
logger.debug("removed callback {} from instance {}, map size: {}", requestId, this, callbacks.size());
CALLBACKS.remove(requestId);
logger.debug("removed callback {} from instance {}, map size: {}", requestId, this, CALLBACKS.size());

Check warning on line 125 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L124-L125

Added lines #L124 - L125 were not covered by tests
}

@Override
public void close() throws IOException {
INSTANCES.decrementAndGet();
}

Check warning on line 131 in client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/KapuaMessageListener.java#L130-L131

Added lines #L130 - L131 were not covered by tests
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*******************************************************************************/
package org.eclipse.kapua.client.security;

import com.fasterxml.jackson.core.JsonProcessingException;
import javax.jms.JMSException;

import org.eclipse.kapua.client.security.amqpclient.Client;
import org.eclipse.kapua.client.security.bean.AuthRequest;
Expand All @@ -24,7 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.JMSException;
import com.fasterxml.jackson.core.JsonProcessingException;

/**
* Security service. Implementation through AMQP messaging layer.
Expand All @@ -34,17 +34,18 @@
private static final Logger logger = LoggerFactory.getLogger(ServiceClientMessagingImpl.class);

private static final int TIMEOUT = 5000;
private final MessageListener messageListener;
private final KapuaMessageListener messageListener;

private Client client;

public ServiceClientMessagingImpl(MessageListener messageListener, Client client) {
public ServiceClientMessagingImpl(KapuaMessageListener messageListener, Client client) {

Check warning on line 41 in client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java

View check run for this annotation

Codecov / codecov/patch

client/security/src/main/java/org/eclipse/kapua/client/security/ServiceClientMessagingImpl.java#L41

Added line #L41 was not covered by tests
this.messageListener = messageListener;
this.client = client;
}

@Override
public AuthResponse brokerConnect(AuthRequest authRequest) throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
public AuthResponse brokerConnect(AuthRequest authRequest)
throws InterruptedException, JMSException, JsonProcessingException {//TODO review exception when Kapua code will be linked (throw KapuaException)
String requestId = MessageHelper.getNewRequestId();
authRequest.setRequestId(requestId);
authRequest.setAction(SecurityAction.brokerConnect.name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*******************************************************************************/
package org.eclipse.kapua.client.security.bean;

import org.eclipse.kapua.client.security.MessageListener;
import org.eclipse.kapua.client.security.KapuaMessageListener;

public class ResponseContainer<O extends Response> {

Expand All @@ -35,7 +35,7 @@ public String getRequestId() {
return requestId;
}

public static <O extends Response> ResponseContainer<O> createAnRegisterNewMessageContainer(MessageListener messageListener, Request request) {
public static <O extends Response> ResponseContainer<O> createAnRegisterNewMessageContainer(KapuaMessageListener messageListener, Request request) {
ResponseContainer<O> messageContainer = new ResponseContainer<>(request.getRequestId());
messageListener.registerCallback(request.getRequestId(), messageContainer);
return messageContainer;
Expand Down
4 changes: 0 additions & 4 deletions commons/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,6 @@
<groupId>com.google.inject</groupId>
<artifactId>guice</artifactId>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-multibindings</artifactId>
</dependency>
<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
*******************************************************************************/
package org.eclipse.kapua.commons.jpa;

import com.zaxxer.hikari.HikariDataSource;
import org.eclipse.kapua.commons.setting.system.SystemSetting;
import org.eclipse.kapua.commons.setting.system.SystemSettingKey;

import com.zaxxer.hikari.HikariDataSource;

public final class DataSource {

private static HikariDataSource hikariDataSource;
Expand Down
Loading
Loading