Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add Retry function for throttling #3

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ dependencies {
exclude group: "commons-logging", module: "commons-logging"
}

implementation "org.embulk:embulk-util-retryhelper:0.8.2"

implementation("org.embulk:embulk-util-aws-credentials:0.4.1") {
// They conflict with embulk-core. They are once excluded here,
// and added explicitly with versions exactly the same with embulk-core:0.10.29.
Expand Down Expand Up @@ -125,6 +127,7 @@ dependencies {
implementation "com.sun.xml.bind:jaxb-impl:2.2.11"

testImplementation "junit:junit:4.13.2"
testImplementation "org.embulk:embulk-deps:0.10.37"
testImplementation "org.embulk:embulk-core:0.10.37"
testImplementation "org.embulk:embulk-junit4:0.10.37"
}
Expand Down
1 change: 1 addition & 0 deletions gradle.lockfile
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ org.embulk:embulk-spi:0.10.37=compileClasspath
org.embulk:embulk-util-aws-credentials:0.4.1=compileClasspath,runtimeClasspath
org.embulk:embulk-util-config:0.3.2=compileClasspath,runtimeClasspath
org.embulk:embulk-util-file:0.1.3=compileClasspath,runtimeClasspath
org.embulk:embulk-util-retryhelper:0.8.2=compileClasspath,runtimeClasspath
org.msgpack:msgpack-core:0.8.11=compileClasspath
org.slf4j:jcl-over-slf4j:1.7.12=compileClasspath,runtimeClasspath
org.slf4j:slf4j-api:1.7.30=compileClasspath
Expand Down
207 changes: 207 additions & 0 deletions src/main/java/org/embulk/output/s3/DefaultRetryable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
* Copyright 2018 The Embulk project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.embulk.output.s3;

import com.amazonaws.AmazonServiceException;
import org.apache.http.HttpStatus;
import org.embulk.util.retryhelper.RetryExecutor;
import org.embulk.util.retryhelper.RetryGiveupException;
import org.embulk.util.retryhelper.Retryable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;

import static java.lang.String.format;

/**
* Retryable utility, regardless the occurred exceptions,
* Also provide a default approach for exception propagation.
*/
public class DefaultRetryable<T> implements Retryable<T>
{
private static final Logger log = LoggerFactory.getLogger(DefaultRetryable.class);
private static final Set<Integer> NONRETRYABLE_STATUS_CODES = new HashSet<Integer>(2);
private static final Set<String> NONRETRYABLE_ERROR_CODES = new HashSet<String>(1);
private String operationName;
private Callable<T> callable;

static {
NONRETRYABLE_STATUS_CODES.add(HttpStatus.SC_FORBIDDEN);
NONRETRYABLE_STATUS_CODES.add(HttpStatus.SC_METHOD_NOT_ALLOWED);
NONRETRYABLE_ERROR_CODES.add("ExpiredToken");
}

/**
* @param operationName the name that will be referred on logging
*/
public DefaultRetryable(String operationName)
{
this.operationName = operationName;
}

/**
* @param operationName the name that will be referred on logging
* @param callable the operation, either define this at construction time or override the call() method
*/
public DefaultRetryable(String operationName, Callable<T> callable)
{
this.operationName = operationName;
this.callable = callable;
}

public DefaultRetryable()
{
this("Anonymous operation");
}

public DefaultRetryable(Callable<T> callable)
{
this("Anonymous operation", callable);
}

@Override
public T call() throws Exception
{
if (callable != null) {
return callable.call();
}
else {
throw new IllegalStateException("Either override call() or construct with a Runnable");
}
}

@Override
public boolean isRetryableException(Exception exception)
{
// No retry on a subset of service exceptions
if (exception instanceof AmazonServiceException) {
AmazonServiceException ase = (AmazonServiceException) exception;
return !NONRETRYABLE_STATUS_CODES.contains(ase.getStatusCode()) && !NONRETRYABLE_ERROR_CODES.contains(ase.getErrorCode());
}
return true;
}

@Override
public void onRetry(Exception exception, int retryCount, int retryLimit, int retryWait)
{
String message = format("%s failed. Retrying %d/%d after %d seconds. Message: %s",
operationName, retryCount, retryLimit, retryWait / 1000, exception.getMessage());
if (retryCount % retryLimit == 0) {
log.warn(message, exception);
}
else {
log.warn(message);
}
}

@Override
public void onGiveup(Exception firstException, Exception lastException)
{
// Exceptions would be propagated, so it's up to the caller to handle, this is just warning
log.warn("Giving up on retrying for {}, first exception is [{}], last exception is [{}]",
operationName, firstException.getMessage(), lastException.getMessage());
}

/**
* Run itself by the supplied executor,
*
* This propagates all exceptions (as unchecked) and unwrap RetryGiveupException for the original cause.
* If the original exception already is a RuntimeException, it will be propagated as is. If not, it will
* be wrapped around with a RuntimeException.
*
* For convenient, it execute normally without retrying when executor is null.
*
* @throws RuntimeException the original cause
*/
public T executeWith(RetryExecutor executor)
{
if (executor == null) {
try {
return this.call();
}
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}
}

try {
return executor.runInterruptible(this);
}
catch (RetryGiveupException e) {
final Exception cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

/**
* Run itself by the supplied executor,
*
* Same as `executeWith`, this propagates all original exceptions. But `propagateAsIsException` will
* be re-throw without being wrapped on a RuntimeException, whether it is a checked or unchecked exception.
*
* For convenient, it execute normally without retrying when executor is null.
*
* @throws X whatever checked exception that you decided to propagate directly
* @throws RuntimeException wrap around whatever the original cause of failure (potentially thread interruption)
*/
public <X extends Throwable> T executeWithCheckedException(RetryExecutor executor,
Class<X> propagateAsIsException) throws X
{
if (executor == null) {
try {
return this.call();
}
catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
}
}

try {
return executor.runInterruptible(this);
}
catch (RetryGiveupException e) {
final Exception cause = e.getCause();
if (cause != null) {
if (propagateAsIsException.isInstance(cause)) {
throw propagateAsIsException.cast(cause);
}
}
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
throw new RuntimeException(cause);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
16 changes: 14 additions & 2 deletions src/main/java/org/embulk/output/s3/S3FileOutputPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Buffer;
import org.embulk.spi.Exec;
import org.embulk.spi.FileOutput;
import org.embulk.spi.FileOutputPlugin;
import org.embulk.spi.TransactionalFileOutput;
import org.embulk.util.config.TaskMapper;
import org.embulk.util.retryhelper.RetryExecutor;
import org.slf4j.Logger;

import com.amazonaws.ClientConfiguration;
Expand Down Expand Up @@ -301,11 +301,23 @@ private String buildCurrentKey()

private void putFile(Path from, String key)
{
RetryExecutor retryExecutor = RetryExecutor.builder()
.withRetryLimit(5)
.build();

PutObjectRequest request = new PutObjectRequest(bucket, key, from.toFile());
if (cannedAccessControlListOptional.isPresent()) {
request.withCannedAcl(cannedAccessControlListOptional.get());
}
client.putObject(request);

new DefaultRetryable<Object>("Put file to s3") {
@Override
public Object call()
{
client.putObject(request);
return null;
}
}.executeWith(retryExecutor);
}

private void closeCurrent()
Expand Down
Loading