Skip to content

Commit c1ae2db

Browse files
author
arnett, stu
committed
v2.0.7
1 parent bdff87c commit c1ae2db

9 files changed

+485
-41
lines changed

src/main/java/com/emc/object/s3/LargeFileUploader.java

100644100755
+75-38
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.emc.object.s3.bean.MultipartPartETag;
3434
import com.emc.object.s3.request.*;
3535
import com.emc.object.util.InputStreamSegment;
36+
import com.emc.object.util.ProgressInputStream;
37+
import com.emc.object.util.ProgressListener;
3638
import com.emc.rest.util.SizedInputStream;
3739
import org.apache.log4j.Logger;
3840

@@ -47,6 +49,7 @@
4749
import java.util.concurrent.ExecutorService;
4850
import java.util.concurrent.Executors;
4951
import java.util.concurrent.Future;
52+
import java.util.concurrent.atomic.AtomicLong;
5053

5154
/**
5255
* Convenience class to facilitate multipart upload for large files. This class will split the file
@@ -74,8 +77,9 @@ public class LargeFileUploader implements Runnable {
7477
private Long partSize = DEFAULT_PART_SIZE;
7578
private int threads = DEFAULT_THREADS;
7679
private ExecutorService executorService;
80+
private AtomicLong bytesTransferred = new AtomicLong();
81+
private ProgressListener progressListener;
7782

78-
private long bytesTransferred;
7983
private String eTag;
8084

8185
/**
@@ -113,22 +117,14 @@ public void doMultipartUpload() {
113117
String uploadId = s3Client.initiateMultipartUpload(initRequest).getUploadId();
114118

115119
List<Future<MultipartPartETag>> futures = new ArrayList<Future<MultipartPartETag>>();
116-
List<SizedInputStream> segmentStreams = new ArrayList<SizedInputStream>();
117120
try {
118121
// submit all upload tasks
119122
int partNumber = 1;
120123
long offset = 0, length = partSize;
121124
while (offset < fullSize) {
122125
if (offset + length > fullSize) length = fullSize - offset;
123126

124-
SizedInputStream segmentStream = file != null
125-
? new InputStreamSegment(new FileInputStream(file), offset, length)
126-
: new SizedInputStream(stream, length);
127-
segmentStreams.add(segmentStream);
128-
129-
UploadPartRequest partRequest = new UploadPartRequest(bucket, key, uploadId, partNumber++, segmentStream);
130-
partRequest.setContentLength(length);
131-
futures.add(executorService.submit(new UploadPartTask(partRequest)));
127+
futures.add(executorService.submit(new UploadPartTask(uploadId, partNumber++, offset, length)));
132128

133129
offset += length;
134130
}
@@ -155,10 +151,6 @@ public void doMultipartUpload() {
155151
if (e instanceof RuntimeException) throw (RuntimeException) e;
156152
throw new RuntimeException("error during upload", e);
157153
} finally {
158-
for (SizedInputStream segmentStream : segmentStreams) {
159-
bytesTransferred += segmentStream.getRead();
160-
}
161-
162154
// make sure all spawned threads are shut down
163155
executorService.shutdown();
164156

@@ -184,22 +176,13 @@ public void doByteRangeUpload() {
184176
s3Client.putObject(request);
185177

186178
List<Future<String>> futures = new ArrayList<Future<String>>();
187-
List<SizedInputStream> segmentStreams = new ArrayList<SizedInputStream>();
188179
try {
189180
// submit all upload tasks
190-
PutObjectRequest rangeRequest;
191181
long offset = 0, length = partSize;
192182
while (offset < fullSize) {
193183
if (offset + length > fullSize) length = fullSize - offset;
194-
Range range = Range.fromOffsetLength(offset, length);
195184

196-
SizedInputStream segmentStream = file != null
197-
? new InputStreamSegment(new FileInputStream(file), offset, length)
198-
: new SizedInputStream(stream, length);
199-
segmentStreams.add(segmentStream);
200-
201-
rangeRequest = new PutObjectRequest(bucket, key, segmentStream).withRange(range);
202-
futures.add(executorService.submit(new PutObjectTask(rangeRequest)));
185+
futures.add(executorService.submit(new PutObjectTask(offset, length)));
203186

204187
offset += length;
205188
}
@@ -219,10 +202,6 @@ public void doByteRangeUpload() {
219202
if (e instanceof RuntimeException) throw (RuntimeException) e;
220203
throw new RuntimeException("error during upload", e);
221204
} finally {
222-
for (SizedInputStream segmentStream : segmentStreams) {
223-
bytesTransferred += segmentStream.getRead();
224-
}
225-
226205
// make sure all spawned threads are shut down
227206
executorService.shutdown();
228207

@@ -300,7 +279,7 @@ public long getFullSize() {
300279
}
301280

302281
public long getBytesTransferred() {
303-
return bytesTransferred;
282+
return bytesTransferred.get();
304283
}
305284

306285
public String getETag() {
@@ -339,6 +318,14 @@ public void setCloseStream(boolean closeStream) {
339318
this.closeStream = closeStream;
340319
}
341320

321+
public ProgressListener getProgressListener() {
322+
return progressListener;
323+
}
324+
325+
public void setProgressListener(ProgressListener progressListener) {
326+
this.progressListener = progressListener;
327+
}
328+
342329
public long getPartSize() {
343330
return partSize;
344331
}
@@ -368,6 +355,14 @@ public ExecutorService getExecutorService() {
368355
return executorService;
369356
}
370357

358+
private void updateBytesTransferred(long count) {
359+
long totalTransferred = bytesTransferred.addAndGet(count);
360+
361+
if(progressListener != null) {
362+
progressListener.progress(totalTransferred, fullSize);
363+
}
364+
}
365+
371366
/**
372367
* Allows for providing a custom thread executor (i.e. for custom thread factories). Note that if
373368
* you set a custom executor service, the <code>threads</code> property will be ignored.
@@ -411,29 +406,71 @@ public LargeFileUploader withExecutorService(ExecutorService executorService) {
411406
return this;
412407
}
413408

414-
protected class UploadPartTask implements Callable<MultipartPartETag> {
415-
private UploadPartRequest request;
409+
public LargeFileUploader withProgressListener(ProgressListener progressListener) {
410+
setProgressListener(progressListener);
411+
return this;
412+
}
413+
414+
private class UploadPartTask implements Callable<MultipartPartETag> {
415+
private String uploadId;
416+
private int partNumber;
417+
private long offset;
418+
private long length;
416419

417-
public UploadPartTask(UploadPartRequest request) {
418-
this.request = request;
420+
public UploadPartTask(String uploadId, int partNumber, long offset, long length) {
421+
this.uploadId = uploadId;
422+
this.partNumber = partNumber;
423+
this.offset = offset;
424+
this.length = length;
419425
}
420426

421427
@Override
422428
public MultipartPartETag call() throws Exception {
423-
return s3Client.uploadPart(request);
429+
SizedInputStream segmentStream;
430+
if (file != null) {
431+
segmentStream = new InputStreamSegment(new ProgressInputStream(new FileInputStream(file), progressListener), offset, length);
432+
} else {
433+
segmentStream = new SizedInputStream(new ProgressInputStream(stream, progressListener), length);
434+
}
435+
436+
UploadPartRequest request = new UploadPartRequest(bucket, key, uploadId, partNumber++, segmentStream);
437+
request.setContentLength(length);
438+
439+
MultipartPartETag etag = s3Client.uploadPart(request);
440+
updateBytesTransferred(length);
441+
return etag;
424442
}
425443
}
426444

427445
protected class PutObjectTask implements Callable<String> {
428-
private PutObjectRequest request;
446+
private long offset;
447+
private long length;
429448

430-
public PutObjectTask(PutObjectRequest request) {
431-
this.request = request;
449+
public PutObjectTask(long offset, long length) {
450+
this.offset = offset;
451+
this.length = length;
432452
}
433453

434454
@Override
435455
public String call() throws Exception {
436-
return s3Client.putObject(request).getETag();
456+
Range range = Range.fromOffsetLength(offset, length);
457+
458+
SizedInputStream segmentStream = file != null
459+
? new InputStreamSegment(new ProgressInputStream(new FileInputStream(file), progressListener),
460+
offset, length) : new SizedInputStream(new ProgressInputStream(stream, progressListener),
461+
length);
462+
463+
PutObjectRequest request = new PutObjectRequest(bucket, key, segmentStream).withRange(range);
464+
465+
String etag = s3Client.putObject(request).getETag();
466+
long length = 0;
467+
if(request.getRange() != null) {
468+
length = request.getRange().getLast() - request.getRange().getFirst() + 1;
469+
} else if(request.getContentLength() != null) {
470+
length = request.getContentLength();
471+
}
472+
updateBytesTransferred(length);
473+
return etag;
437474
}
438475
}
439476
}

src/main/java/com/emc/object/s3/jersey/AuthorizationFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public ClientResponse handle(ClientRequest request) throws ClientHandlerExceptio
5353

5454
// if no identity is provided, this is an anonymous client
5555
if (s3Config.getIdentity() != null) {
56-
Map<String, String> parameters = RestUtil.getQueryParameterMap(request.getURI().getQuery());
56+
Map<String, String> parameters = RestUtil.getQueryParameterMap(request.getURI().getRawQuery());
5757
String resource = RestUtil.getEncodedPath(request.getURI());
5858

5959
// check if bucket is in hostname
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright (c) 2015, EMC Corporation.
3+
* Redistribution and use in source and binary forms, with or without modification,
4+
* are permitted provided that the following conditions are met:
5+
*
6+
* + Redistributions of source code must retain the above copyright notice,
7+
* this list of conditions and the following disclaimer.
8+
* + Redistributions in binary form must reproduce the above copyright
9+
* notice, this list of conditions and the following disclaimer in the
10+
* documentation and/or other materials provided with the distribution.
11+
* + The name of EMC Corporation may not be used to endorse or promote
12+
* products derived from this software without specific prior written
13+
* permission.
14+
*
15+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18+
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
19+
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25+
* POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
package com.emc.object.util;
28+
29+
import java.io.FilterInputStream;
30+
import java.io.IOException;
31+
import java.io.InputStream;
32+
33+
/**
34+
* InputStream wrapper class that fires events when data is read so realtime performance can be measured.
35+
*/
36+
public class ProgressInputStream extends FilterInputStream {
37+
private final ProgressListener listener;
38+
39+
public ProgressInputStream(InputStream wrappedStream, ProgressListener listener) {
40+
super(wrappedStream);
41+
this.listener = listener;
42+
}
43+
44+
@Override
45+
public int read() throws IOException {
46+
// This is a really, really bad idea.
47+
throw new RuntimeException("No, I'm not going to let you kill performance.");
48+
}
49+
50+
@Override
51+
public int read(byte[] b) throws IOException {
52+
int count = in.read(b);
53+
if(listener != null) listener.transferred(count);
54+
return count;
55+
}
56+
57+
@Override
58+
public int read(byte[] b, int off, int len) throws IOException {
59+
int count = in.read(b, off, len);
60+
if(listener != null) listener.transferred(count);
61+
return count;
62+
}
63+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2015, EMC Corporation.
3+
* Redistribution and use in source and binary forms, with or without modification,
4+
* are permitted provided that the following conditions are met:
5+
*
6+
* + Redistributions of source code must retain the above copyright notice,
7+
* this list of conditions and the following disclaimer.
8+
* + Redistributions in binary form must reproduce the above copyright
9+
* notice, this list of conditions and the following disclaimer in the
10+
* documentation and/or other materials provided with the distribution.
11+
* + The name of EMC Corporation may not be used to endorse or promote
12+
* products derived from this software without specific prior written
13+
* permission.
14+
*
15+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18+
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
19+
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25+
* POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
package com.emc.object.util;
28+
29+
/**
30+
* Interface for reporting progress on large file transfers.
31+
*/
32+
public interface ProgressListener {
33+
/**
34+
* Provides feedback on the number of bytes completed and the total number of bytes to transfer.
35+
* @param completed bytes completely transferred
36+
* @param total total number of bytes to transfer
37+
*/
38+
void progress(long completed, long total);
39+
40+
/**
41+
* Reports that some bytes have been transferred. This is a raw method that will be called frequently and can
42+
* be used for computing current transfer rate. Note that if data is retried, the sum of this method's events
43+
* may be more than the total object size. For reporting on percent complete, use the progress method instead.
44+
* @param size number of bytes transferred
45+
*/
46+
void transferred(long size);
47+
}

0 commit comments

Comments
 (0)