Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry on S3 SlowDown exceptions #958

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
import logging
import os
import traceback

from minio.error import MinioException, S3Error
from tenacity import RetryError, before_log, retry, retry_if_result, stop_after_attempt, \
wait_exponential, wait_random


class ObjectStoreError(Exception):
Expand Down Expand Up @@ -57,23 +59,57 @@ def __init__(self, url=None, username=None, password=None, use_https=False):
'MINIO_SECRET_KEY'],
secure=secure_connection)

def upload_file(self, bucket, object_name, path):
@retry(
stop=stop_after_attempt(3),
retry=retry_if_result(lambda e:
isinstance(e, S3Error)
and hasattr(e, 'code')
and e.code in ['SlowDown', 'ServiceUnavailable', 'Throttling']),
wait=wait_exponential(multiplier=3, exp_base=4) + wait_random(min=1, max=3),
before=before_log(logging.getLogger(__name__), logging.INFO)
)
def _upload_file_with_retry(self, bucket, object_name, path):
"""
Upload a file to the object store with retry logic for certain S3 errors.
To take advantage of tenacity's retry logic based on specific errors, this
function needs to return the exception rather than raise it.
"""
try:
result = self.minio_client.fput_object(bucket_name=bucket,
object_name=object_name,
file_path=path)
except Exception as e:
# retry_if_result needs the exception to be returned and not raised
return e
return result

def upload_file(self, bucket, object_name, path):
try:
result = self._upload_file_with_retry(bucket, object_name, path)

# The retry logic will return the exception rather than raise it
if isinstance(result, Exception):
raise result

self.logger.info("OSM > created object.", extra={
"requestId": bucket, "object": result.object_name})
except S3Error as e:

except (RetryError, S3Error) as e:
# If a non-retryable S3Error is raised it will come here. If we
# ran out of retries it will also come here, but the exception will
# wrapped by tenacity
self.logger.error("S3Error", exc_info=True)
traceback.print_exc()
raise ObjectStoreError(f"Error uploading file to object store: {path}") from e

except MinioException as e:
self.logger.error("Minio error", exc_info=True)
traceback.print_exc()
raise ObjectStoreError(f"Error uploading file to object store: {path}") from e

try:
os.remove(path)
except FileNotFoundError:
pass # Should never happen, but is not fatal if it occurs
finally:
# Delete the file regardless of success or failure
try:
os.remove(path)
except FileNotFoundError:
pass # Should never happen, but is not fatal if it occurs
45 changes: 37 additions & 8 deletions transformer_sidecar/tests/test_object_store_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,50 @@ def test_upload_file(self, mocker):

def test_upload_file_exception(self, mocker):
import minio

def s3_error(code):
"""
Construct an S3Error with a specific code.
"""
return minio.error.S3Error(
response=mocker.Mock(),
code=code,
message='Mocked S3 Error',
resource='test-resource',
request_id='test-request-id',
host_id='test-host-id'
)

mock_minio = mocker.MagicMock(minio.api.Minio)
mock_minio.fput_object = mocker.Mock()
mock_minio.fput_object.side_effect = minio.error.S3Error(
response=mocker.Mock(),
code='TestError',
message='Mocked S3 Error',
resource='test-resource',
request_id='test-request-id',
host_id='test-host-id'
)

# Throw three retryable errors
mock_minio.fput_object.side_effect = [
s3_error('SlowDown'),
s3_error('ServiceUnavailable'),
s3_error('Throttling')
]

mocker.patch('minio.Minio', return_value=mock_minio)
result = ObjectStoreManager('localhost:9999', 'foo', 'bar')
with pytest.raises(ObjectStoreError):
result.upload_file("my-bucket", "foo.txt", "/tmp/foo.txt")
assert mock_minio.fput_object.call_count == 3

# Now throw one retryable exception and one non-retryable which should
# cause the upload to fail after the first retry
mock_minio.fput_object.reset_mock()
mock_minio.fput_object.side_effect = [
s3_error('SlowDown'),
s3_error('ItsDeadJim')
]
with pytest.raises(ObjectStoreError):
result.upload_file("my-bucket", "foo.txt", "/tmp/foo.txt")
assert mock_minio.fput_object.call_count == 2

# Now test out the MinioExceptions which are all non-retryable
mock_minio.fput_object.reset_mock()
mock_minio.fput_object.side_effect = minio.error.MinioException()
with pytest.raises(ObjectStoreError):
result.upload_file("my-bucket", "foo.txt", "/tmp/foo.txt")
assert mock_minio.fput_object.call_count == 1