Skip to content

Commit

Permalink
Add Retry policy to Google Storage
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
pditommaso committed Nov 20, 2023
1 parent 4c6f2e8 commit d2dd216
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 12 deletions.
19 changes: 13 additions & 6 deletions plugins/nf-google/src/main/nextflow/cloud/google/GoogleOpts.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import groovy.transform.Memoized
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.cloud.google.config.GoogleStorageOpts
import nextflow.exception.AbortOperationException
import nextflow.util.Duration
/**
Expand All @@ -46,13 +47,24 @@ class GoogleOpts {
private boolean enableRequesterPaysBuckets
private Duration httpConnectTimeout
private Duration httpReadTimeout
private GoogleStorageOpts storageOpts

/** @return the Google project id held by this object */
String getProjectId() {
    return projectId
}

/** @return the credentials file held by this object */
File getCredsFile() {
    return credsFile
}

/** @return the configured location, or {@code DEFAULT_LOCATION} when none (or an empty one) is set */
String getLocation() {
    if( location )
        return location
    return DEFAULT_LOCATION
}

/** @return the value of the {@code enableRequesterPaysBuckets} flag */
boolean getEnableRequesterPaysBuckets() {
    return enableRequesterPaysBuckets
}

/** @return the HTTP connect timeout held by this object */
Duration getHttpConnectTimeout() {
    return httpConnectTimeout
}

/** @return the HTTP read timeout held by this object */
Duration getHttpReadTimeout() {
    return httpReadTimeout
}

/** @return the storage-specific options held by this object */
GoogleStorageOpts getStorageOpts() {
    return storageOpts
}

/**
 * Creates the options object from a settings map.
 *
 * Keys read: {@code project}, {@code location}, {@code enableRequesterPaysBuckets},
 * {@code httpConnectTimeout}, {@code httpReadTimeout} and {@code storage}.
 * Both HTTP timeouts fall back to {@code 60s} when the corresponding entry is
 * missing or evaluates to false.
 *
 * @param opts the settings map; it is dereferenced directly, so it must not be null
 */
GoogleOpts(Map opts) {
    // look up the optional entries once, then apply the fallbacks
    final connectTimeout = opts.httpConnectTimeout
    final readTimeout = opts.httpReadTimeout
    final Map storageSettings = opts.storage as Map

    this.projectId = opts.project as String
    this.location = opts.location as String
    this.enableRequesterPaysBuckets = opts.enableRequesterPaysBuckets as boolean
    this.httpConnectTimeout = connectTimeout ? connectTimeout as Duration : Duration.of('60s')
    this.httpReadTimeout = readTimeout ? readTimeout as Duration : Duration.of('60s')
    this.storageOpts = new GoogleStorageOpts( storageSettings ?: Map.of() )
}

@Memoized
static GoogleOpts fromSession(Session session) {
Expand All @@ -66,12 +78,7 @@ class GoogleOpts {
}

protected static GoogleOpts fromSession0(Map config) {
final result = new GoogleOpts()
result.projectId = config.navigate("google.project") as String
result.location = config.navigate("google.location") as String
result.enableRequesterPaysBuckets = config.navigate('google.enableRequesterPaysBuckets') as boolean
result.httpConnectTimeout = config.navigate('google.httpConnectTimeout', '60s') as Duration
result.httpReadTimeout = config.navigate('google.httpReadTimeout', '60s') as Duration
final result = new GoogleOpts( config.google as Map ?: Map.of() )

if( result.enableRequesterPaysBuckets && !result.projectId )
throw new IllegalArgumentException("Config option 'google.enableRequesterPaysBuckets' cannot be honoured because the Google project Id has not been specified - Provide it by adding the option 'google.project' in the nextflow.config file")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cloud.google.config

import groovy.transform.CompileStatic
import nextflow.util.Duration

/**
 * Model Google storage retry settings.
 *
 * Holds three read-only values: the maximum number of attempts, the delay
 * multiplier and the maximum delay. Each one is taken from the map given to
 * the constructor, or set to a built-in default when the entry is absent.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
@CompileStatic
class GoogleRetryOpts {

    /** Maximum number of attempts; {@code 10} when not configured */
    final int maxAttempts

    /** Delay multiplier; {@code 2} when not configured */
    final double multiplier

    /** Maximum delay; {@code 90s} when not configured */
    final Duration maxDelay

    /**
     * @param opts
     *      Map that may define {@code maxAttempts}, {@code multiplier} and {@code maxDelay}.
     *      An entry that is missing or evaluates to false under Groovy truth
     *      (for instance {@code 0}) is replaced by the default value.
     */
    GoogleRetryOpts(Map opts) {
        final Object attempts = opts.get('maxAttempts')
        final Object factor = opts.get('multiplier')
        final Object delay = opts.get('maxDelay')

        this.maxAttempts = attempts ? attempts as int : 10
        this.multiplier = factor ? factor as double : 2d
        this.maxDelay = delay ? delay as Duration : Duration.of('90s')
    }

    /**
     * @return the {@code seconds} value of {@link #maxDelay}
     */
    long maxDelaySecs() {
        maxDelay.seconds
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cloud.google.config

/**
 * Model Google storage settings. At present it only wraps the retry policy.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class GoogleStorageOpts {

    /** Retry settings built from the {@code retryPolicy} entry of the constructor map */
    final GoogleRetryOpts retryPolicy

    /**
     * @param opts
     *      Map that may hold a {@code retryPolicy} sub-map; when that entry is
     *      missing or empty the retry options are created from an empty map
     */
    GoogleStorageOpts(Map opts) {
        final Map retrySettings = opts.retryPolicy as Map
        this.retryPolicy = new GoogleRetryOpts( retrySettings ?: Map.of() )
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.cloud.google.util

import java.nio.file.Path

import com.google.api.gax.retrying.RetrySettings
import com.google.cloud.storage.StorageOptions
import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration
import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem
Expand Down Expand Up @@ -57,15 +58,24 @@ class GsPathFactory extends FileSystemPathFactory {
return builder.build()
}

static protected StorageOptions getCloudStorageOptions(GoogleOpts googleOpts) {
static protected StorageOptions getCloudStorageOptions(GoogleOpts opts) {
final transportOptions = StorageOptions.getDefaultHttpTransportOptions().toBuilder()
if( googleOpts.httpConnectTimeout )
transportOptions.setConnectTimeout( (int)googleOpts.httpConnectTimeout.toMillis() )
if( googleOpts.httpReadTimeout )
transportOptions.setReadTimeout( (int)googleOpts.httpReadTimeout.toMillis() )
if( opts.httpConnectTimeout )
transportOptions.setConnectTimeout( (int)opts.httpConnectTimeout.toMillis() )
if( opts.httpReadTimeout )
transportOptions.setReadTimeout( (int)opts.httpReadTimeout.toMillis() )

RetrySettings retrySettings =
StorageOptions.getDefaultRetrySettings()
.toBuilder()
.setMaxAttempts(opts.storageOpts.retryPolicy.maxAttempts)
.setRetryDelayMultiplier(opts.storageOpts.retryPolicy.multiplier)
.setTotalTimeout(org.threeten.bp.Duration.ofSeconds(opts.storageOpts.retryPolicy.maxDelaySecs()))
.build()

return StorageOptions.getDefaultInstance().toBuilder()
.setTransportOptions(transportOptions.build())
.setRetrySettings(retrySettings)
.build()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.cloud.google.config

import nextflow.util.Duration
import spock.lang.Specification

/**
 * Checks the default and the explicitly configured values of {@link GoogleRetryOpts}.
 *
 * @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
 */
class GoogleRetryOptsTest extends Specification {

    def 'should get retry opts' () {
        when: 'the options are created from an empty map'
        def defaults = new GoogleRetryOpts([:])
        then: 'the built-in defaults are reported'
        defaults.maxAttempts == 10
        defaults.multiplier == 2.0d
        defaults.maxDelay == Duration.of('90s')

        when: 'every setting is provided'
        def custom = new GoogleRetryOpts([maxAttempts: 5, maxDelay: '5s', multiplier: 10])
        then: 'the provided values are reported'
        custom.maxAttempts == 5
        custom.multiplier == 10d
        custom.maxDelay == Duration.of('5s')
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.google.cloud.storage.StorageOptions
import nextflow.Global
import nextflow.Session
import nextflow.cloud.google.GoogleOpts
import nextflow.cloud.google.config.GoogleRetryOpts
import spock.lang.Specification
import spock.lang.Unroll

Expand Down Expand Up @@ -87,6 +88,14 @@ class GsPathFactoryTest extends Specification {
getConfig() >> [google:[httpConnectTimeout: CONNECT, httpReadTimeout: READ]]
}
and:
def policy = new GoogleRetryOpts([:])
def retrySettings = StorageOptions.getDefaultRetrySettings()
.toBuilder()
.setMaxAttempts(policy.maxAttempts)
.setRetryDelayMultiplier(policy.multiplier)
.setTotalTimeout(org.threeten.bp.Duration.ofSeconds(policy.maxDelaySecs()))
.build()
and:
def opts = GoogleOpts.fromSession(session)
and:
def storageOptions = GsPathFactory.getCloudStorageOptions(opts)
Expand All @@ -98,6 +107,7 @@ class GsPathFactoryTest extends Specification {
expect:
storageOptions == StorageOptions.getDefaultInstance().toBuilder()
.setTransportOptions(transportOptions.build())
.setRetrySettings(retrySettings)
.build()

where:
Expand All @@ -106,4 +116,19 @@ class GsPathFactoryTest extends Specification {
'30s' | 30000 | '30s' | 30000
'60s' | 60000 | '60s' | 60000
}

def 'should apply retry settings' () {
    given: 'a session whose config defines google.storage.retryPolicy'
    def retryConfig = [maxAttempts: 5, maxDelay:'50s', multiplier: 500]
    def session = Mock(Session) {
        getConfig() >> [google:[storage:[retryPolicy: retryConfig]]]
    }

    when: 'the Google options are created from that session'
    def opts = GoogleOpts.fromSession(session)
    def policy = opts.storageOpts.retryPolicy

    then: 'the retry policy reports the configured values'
    policy.maxAttempts == 5
    policy.maxDelaySecs() == 50
    policy.multiplier == 500d

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ class WaveClient {
return false
if( value.startsWith('http://') || value.startsWith('https://') )
return false
if( value.startsWith('/') && !value.contains('/n') )
return true
return value.endsWith('.yaml') || value.endsWith('.yml') || value.endsWith('.txt')
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.SysEnv
import nextflow.container.inspect.ContainerInspectMode
import nextflow.container.inspect.ContainersInspector
import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.processor.TaskRun
Expand Down Expand Up @@ -1215,6 +1214,7 @@ class WaveClientTest extends Specification {
'foo' | false
'foo.yml' | true
'foo.txt' | true
'/foo/bar' | true
'foo\nbar.yml' | false
'http://foo.com' | false
}
Expand Down

0 comments on commit d2dd216

Please sign in to comment.