From f7990c2947a18b81fa467dfc6c77f34ebd2ad6a6 Mon Sep 17 00:00:00 2001 From: aubelm Date: Fri, 14 Jun 2024 14:14:10 +0200 Subject: [PATCH] https://github.com/atomikos/transactions-essentials/issues/213 Builder for Rest Client of remote-transactions --- .../icatch/provider/ConfigProperties.java | 8 ++- .../remoting/twopc/AtomikosRestPort.java | 2 + .../twopc/DefaultRestClientBuilder.java | 18 +++++++ .../remoting/twopc/ParticipantAdapter.java | 51 ++++++++++++++---- .../remoting/twopc/RestClientBuilder.java | 53 +++++++++++++++++++ .../transactions-defaults.properties | 1 + 6 files changed, 123 insertions(+), 10 deletions(-) create mode 100644 public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/DefaultRestClientBuilder.java create mode 100644 public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/RestClientBuilder.java diff --git a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java index 3a61392d..65747456 100644 --- a/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java +++ b/public/transactions-api/src/main/java/com/atomikos/icatch/provider/ConfigProperties.java @@ -38,7 +38,8 @@ public final class ConfigProperties { public static final String LOG_LOCK_ACQUISITION_MAX_ATTEMPTS = "com.atomikos.icatch.log_lock_acquisition_max_attempts"; public static final String LOG_LOCK_ACQUISITION_RETRY_DELAY = "com.atomikos.icatch.log_lock_acquisition_retry_delay"; - + public static final String REST_CLIENT_BUILDER = "com.atomikos.remoting.rest_client_builder"; + /** * Replace ${...} sequence with the referenced value from the given properties or * (if not found) the system properties - @@ -272,6 +273,11 @@ public String getJvmId() { return getProperty(JVM_ID_PROPERTY_NAME); } + + public String getRestClientBuilder() { + return getProperty(REST_CLIENT_BUILDER); + + } } diff --git a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/AtomikosRestPort.java b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/AtomikosRestPort.java index 0606922b..dd09597b 100644 --- a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/AtomikosRestPort.java +++ b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/AtomikosRestPort.java @@ -17,6 +17,7 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; @@ -46,6 +47,7 @@ @Path("/atomikos") @Consumes(HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON) +@Produces("text/plain") public class AtomikosRestPort { public static final String REST_URL_PROPERTY_NAME = "com.atomikos.icatch.rest_port_url"; diff --git a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/DefaultRestClientBuilder.java b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/DefaultRestClientBuilder.java new file mode 100644 index 00000000..7a54258f --- /dev/null +++ b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/DefaultRestClientBuilder.java @@ -0,0 +1,18 @@ +package com.atomikos.remoting.twopc; + +import static javax.ws.rs.client.ClientBuilder.newClient; + +import javax.ws.rs.client.Client; + +/** + * Default provider for standard Jaxrs Client without connection pool + */ +public class DefaultRestClientBuilder extends RestClientBuilder { + + public Client build() { + Client client = newClient(); + client.property("jersey.config.client.suppressHttpComplianceValidation", true); + client.register(ParticipantsProvider.class); + return client; + } +} diff --git a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/ParticipantAdapter.java b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/ParticipantAdapter.java index 7081ec28..c08ef292 100644 --- a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/ParticipantAdapter.java +++ b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/ParticipantAdapter.java @@ -29,6 +29,7 @@ import com.atomikos.icatch.Participant; import com.atomikos.icatch.RollbackException; import com.atomikos.icatch.SysException; +import com.atomikos.icatch.config.Configuration; import com.atomikos.logging.Logger; import com.atomikos.logging.LoggerFactory; import com.atomikos.remoting.support.HeaderNames; @@ -42,20 +43,29 @@ public class ParticipantAdapter implements Participant { private static final Logger LOGGER = LoggerFactory.createLogger(ParticipantAdapter.class); - private final WebTarget target; + private static Client client; + + private final URI uri; private final Map cascadeList = new HashMap<>(); public ParticipantAdapter(URI uri) { - Client client = newClient(); - client.property("jersey.config.client.suppressHttpComplianceValidation", true); - client.register(ParticipantsProvider.class); - target = client.target(uri); + if (client == null) { + String className = Configuration.getConfigProperties().getRestClientBuilder(); + try { + Class builderClass = Thread.currentThread().getContextClassLoader().loadClass(className); + RestClientBuilder restClientBuilder = (RestClientBuilder)builderClass.newInstance(); + client = restClientBuilder.build(); + } catch (Exception e) { + throw new IllegalArgumentException(e); + } + } + this.uri = uri; } @Override public String getURI() { - return target.getUri().toASCIIString(); + return uri.toASCIIString(); } @Override @@ -74,7 +84,7 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc LOGGER.logDebug("Calling prepare on " + getURI()); } try { - int result = target.request() + int result = client.target(uri).request() .buildPost(Entity.entity(cascadeList, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)) .invoke(Integer.class); if (LOGGER.isTraceEnabled()) { @@ -84,9 +94,16 @@ public int prepare() throws RollbackException, HeurHazardException, HeurMixedExc } catch (WebApplicationException e) { int status = e.getResponse().getStatus(); if (status == 404) { + // 404 writes a String entity - we have to consume it + consumeStringEntity(e.getResponse()); LOGGER.logWarning("Remote participant not available - any remote work will rollback...", e); throw new RollbackException(); } else { + if (status == 409) { + // 409 writes a String entity - we have to consume it + consumeStringEntity(e.getResponse()); + e.getResponse().close(); + } LOGGER.logWarning("Unexpected error during prepare - see stacktrace for more details...", e); throw new HeurHazardException(); } @@ -100,17 +117,21 @@ public void commit(boolean onePhase) LOGGER.logDebug("Calling commit on " + getURI()); } - Response r = target.path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke(); + Response r = client.target(uri).path(String.valueOf(onePhase)).request().buildPut(Entity.entity("", HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON)).invoke(); if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) { int status = r.getStatus(); switch (status) { case 404: + // 404 writes a String entity - we have to consume it + consumeStringEntity(r); if (onePhase) { LOGGER.logWarning("Remote participant not available - default outcome will be rollback"); throw new RollbackException(); } case 409: + // 409 writes a String entity - we have to consume it + consumeStringEntity(r); LOGGER.logWarning("Unexpected 409 error on commit"); throw new HeurMixedException(); default: @@ -127,12 +148,14 @@ public void rollback() throws HeurCommitException, HeurMixedException, HeurHazar LOGGER.logDebug("Calling rollback on " + getURI()); } - Response r = target.request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete(); + Response r = client.target(uri).request().header(HttpHeaders.CONTENT_TYPE, HeaderNames.MimeType.APPLICATION_VND_ATOMIKOS_JSON).delete(); if (r.getStatusInfo().getFamily() != Family.SUCCESSFUL) { int status = r.getStatus(); switch (status) { case 409: + // 409 writes a String entity - we have to consume it + consumeStringEntity(r); LOGGER.logWarning("Unexpected 409 error on rollback"); throw new HeurMixedException(); case 404: @@ -174,5 +197,15 @@ public int hashCode() { public String toString() { return "ParticipantAdapter for: " + getURI(); } + + private void consumeStringEntity(Response r) { + // the entity body has to be consumed to allow pooling of http connections. + // see https://stackoverflow.com/questions/27063667/httpclient-4-3-blocking-on-connection-pool + try { + r.readEntity(String.class); + } catch (Exception e) { + // catch exception. we only want to be sure that all content was cosumed + } + } } diff --git a/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/RestClientBuilder.java b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/RestClientBuilder.java new file mode 100644 index 00000000..c67d60d2 --- /dev/null +++ b/public/transactions-remoting/src/main/java/com/atomikos/remoting/twopc/RestClientBuilder.java @@ -0,0 +1,53 @@ +package com.atomikos.remoting.twopc; + +import javax.ws.rs.client.Client; + +/** + * Abstract Builder for creation of Rest Client + * + * You can create a sublcass if you need some special handling like connection polling, timeouts, ... + * Implementation is defined by the property com.atomikos.remoting.rest_client_builder + * Default in transaction-default.properties is + * com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder + * + * Here some example: + *
+ * 
+ * public class PooledRestClientBuilder extends RestClientBuilder {
+
+	@Override
+	public Client build() {
+		ResteasyClientBuilder builder = new ResteasyClientBuilder();
+		
+		ConfigProperties configProperties = Configuration.getConfigProperties();
+		String connectionPoolSizeProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectionPoolSize");
+		int connectionPoolSize = 20;
+		if (connectionPoolSizeProperty != null)
+			connectionPoolSize = Integer.valueOf(connectionPoolSizeProperty);
+		
+		String connectTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.connectTimeout");
+		int connectTimeout = 10;
+		if (connectTimeoutProperty != null)
+			connectTimeout = Integer.valueOf(connectTimeoutProperty);
+
+		String readTimeoutProperty = configProperties.getProperty("com.atomikos.remoting.twopc.ParticipantAdapter.readTimeout");
+		int readTimeout = 60;
+		if (readTimeoutProperty != null)
+			readTimeout = Integer.valueOf(readTimeoutProperty);
+
+		builder.connectTimeout(connectTimeout, TimeUnit.SECONDS);
+		builder.readTimeout(readTimeout, TimeUnit.SECONDS);
+		Client c = builder.connectionPoolSize(connectionPoolSize).build(); 
+		c.property("jersey.config.client.suppressHttpComplianceValidation", true);
+		c.register(ParticipantsProvider.class);
+		return c;
+	}
+}
+ * 
+ * 
+ */ +public abstract class RestClientBuilder { + + public abstract Client build(); + +} diff --git a/public/transactions/src/main/resources/transactions-defaults.properties b/public/transactions/src/main/resources/transactions-defaults.properties index f1c2113c..f2fc9862 100644 --- a/public/transactions/src/main/resources/transactions-defaults.properties +++ b/public/transactions/src/main/resources/transactions-defaults.properties @@ -24,3 +24,4 @@ com.atomikos.icatch.logcloud_datasource_name=logCloudDS com.atomikos.icatch.throw_on_heuristic=false com.atomikos.icatch.log_lock_acquisition_max_attempts=3 com.atomikos.icatch.log_lock_acquisition_retry_delay=1000 +com.atomikos.remoting.rest_client_builder=com.atomikos.remoting.twopc.DefaultRestClientBuilder