Skip to content

Commit

Permalink
Merge pull request #111 from GoogleCloudPlatform/retries
Browse files Browse the repository at this point in the history
Merge Retries into master
  • Loading branch information
danielbeaudreau authored Sep 25, 2020
2 parents 5d86302 + 44d1db3 commit f7889e7
Show file tree
Hide file tree
Showing 29 changed files with 1,812 additions and 76 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Table of Contents
* [Deployment using Data Protection Toolkit](#deployment-using-data-protection-toolkit)
* [Building from source](#building-from-source)
* [Building and publishing Docker Images](#building-and-publishing-docker-images)
* [Backup and retry upload](#backup-and-retry-upload)
* [Wiki](#wiki)
* [Troubleshooting](#troubleshooting)

Expand Down Expand Up @@ -287,6 +288,36 @@ TAG=gcr.io/${PROJECT}/dicom-export-adapter
gradle dockerBuildImage -Pdocker_tag=${TAG}
docker push ${TAG}
```
## Backup and retry upload
In C-STORE to STOW-RS mode, the Import Adapter can use additional flags to improve the reliability of file uploading. Before uploading a file to the dicomweb endpoint, the file is saved to temporary storage, from which the Import Adapter tries to upload the file the specified number of times. The file can be saved locally or uploaded to a GCS bucket.
After a successful upload, the temporary file is deleted. The user can also independently configure a TTL to automatically delete files from GCS.

The following arguments are used to configure the mode:
* --persistent_file_storage_location: temporary location for storing files before sending.
* --persistent_file_upload_retry_amount: number of upload retry attempts.
* --min_upload_delay: minimum delay before uploading a backup file (ms) (default 100ms).
* --max_waiting_time_between_uploads: maximum waiting time between uploads (ms) (default 5000ms).

If the flag --persistent_file_storage_location is not used then loading occurs without writing a temporary file.

If the value for the flag --persistent_file_storage_location is specified in the format gs://bucket-id/some-directory,
the file will be written to the specified project bucket. It is also necessary to set the flag --gcs_backup_project_id=GCS_PROJECT_ID, where GCS_PROJECT_ID is the GCS project ID in which the bucket was created.

If you are using Kubernetes, then in the file `dicom_adapter.yaml` change the arguments in the file as follows:

```yaml
args:
- "--persistent_file_storage_location=/tmp/backupfile"
- "--persistent_file_upload_retry_amount=5"
- "--min_upload_delay=100"
- "--max_waiting_time_between_uploads=5000"
```

If you are running the Import Adapter locally:
```shell
gradle run -Dexec.args="--dimse_aet=IMPORTADAPTER --dimse_port=4008 --dicomweb_address=http://localhost:80 --persistent_file_storage_location=/tmp/backupfile --persistent_file_upload_retry_amount=5 --min_upload_delay=100 --max_waiting_time_between_uploads=5000"
```

## Wiki

For addition documentation please check out the [Wiki](https://github.com/GoogleCloudPlatform/healthcare-dicom-dicomweb-adapter/wiki).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public interface IDicomWebClient {
class DicomWebException extends Exception {

private int status = Status.ProcessingFailure;
private int httpStatus;

public DicomWebException(String message, int status) {
super(message);
Expand All @@ -55,6 +56,7 @@ public DicomWebException(
int defaultDicomStatus) {
super(message);
this.status = httpStatusToDicomStatus(httpStatus, defaultDicomStatus);
this.httpStatus = httpStatus;
}

public DicomWebException(
Expand All @@ -64,6 +66,7 @@ public DicomWebException(
int defaultDicomStatus) {
super(message, cause);
this.status = httpStatusToDicomStatus(httpStatus, defaultDicomStatus);
this.httpStatus = httpStatus;
}

public DicomWebException(String message) {
Expand All @@ -78,6 +81,10 @@ public int getStatus() {
return status;
}

/**
 * Returns the raw HTTP status code supplied when this exception was constructed,
 * or 0 when the exception was created without an HTTP status.
 */
public int getHttpStatus() {
  return this.httpStatus;
}

private int httpStatusToDicomStatus(int httpStatus, int defaultStatus) {
switch (httpStatus) {
case HttpStatusCodes.STATUS_CODE_SERVICE_UNAVAILABLE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private static class MultipartHandler implements MultiPartParser.Handler {
ByteBuffer lastItem;

@Override
public boolean content(ByteBuffer item, boolean last) {
public boolean content(ByteBuffer item, boolean last) {
lastItem = item;

// Returning true seems to trigger early abort of the parser.
Expand Down
4 changes: 4 additions & 0 deletions import/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ dependencies {

compile "com.github.red1408:deid-redactor:233908ca"

compile "com.google.cloud:google-cloud-storage:1.98.0"
compile 'com.google.cloud:google-cloud-nio:0.116.0-alpha'

testCompile "com.google.truth:truth:1.0"
testCompile "org.mockito:mockito-core:3.+"
testCompile "junit:junit:4.2"
testCompile project(path: ":dicom_util", configuration: 'testArtifacts')
testCompile project(path: ":util", configuration: 'testArtifacts')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import com.google.cloud.healthcare.IDicomWebClient;
import com.google.cloud.healthcare.IDicomWebClient.DicomWebException;
import com.google.cloud.healthcare.deid.redactor.DicomRedactor;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.BackupState;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.BackupUploadService;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.IBackupUploader;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.Event;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.MonitoringService;
import com.google.common.io.CountingInputStream;
Expand All @@ -32,6 +35,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.atomic.AtomicReference;

import org.dcm4che3.data.Attributes;
import org.dcm4che3.data.Tag;
import org.dcm4che3.data.VR;
Expand All @@ -58,17 +63,20 @@ public class CStoreService extends BasicCStoreSCP {
private final Map<DestinationFilter, IDicomWebClient> destinationMap;
private final DicomRedactor redactor;
private final String transcodeToSyntax;
private final BackupUploadService backupUploadService;

CStoreService(IDicomWebClient defaultDicomWebClient,
Map<DestinationFilter, IDicomWebClient> destinationMap,
DicomRedactor redactor, String transcodeToSyntax) {
DicomRedactor redactor, String transcodeToSyntax, BackupUploadService backupUploadService) {
this.defaultDicomWebClient = defaultDicomWebClient;
this.destinationMap =
destinationMap != null && destinationMap.size() > 0 ? destinationMap : null;
this.redactor = redactor;
this.transcodeToSyntax =
transcodeToSyntax != null && transcodeToSyntax.length() > 0 ? transcodeToSyntax : null;

this.backupUploadService = backupUploadService;

if(this.transcodeToSyntax != null) {
log.info("Transcoding to: " + transcodeToSyntax);
}
Expand All @@ -81,71 +89,112 @@ protected void store(
Attributes request,
PDVInputStream inPdvStream,
Attributes response)
throws DicomServiceException, IOException {
try {
MonitoringService.addEvent(Event.CSTORE_REQUEST);

String sopClassUID = request.getString(Tag.AffectedSOPClassUID);
String sopInstanceUID = request.getString(Tag.AffectedSOPInstanceUID);
String transferSyntax = presentationContext.getTransferSyntax();

validateParam(sopClassUID, "AffectedSOPClassUID");
validateParam(sopInstanceUID, "AffectedSOPInstanceUID");

final CountingInputStream countingStream;
final IDicomWebClient destinationClient;
if(destinationMap != null){
DicomInputStream inDicomStream = new DicomInputStream(inPdvStream);
inDicomStream.mark(Integer.MAX_VALUE);
Attributes attrs = inDicomStream.readDataset(-1, Tag.PixelData);
inDicomStream.reset();

countingStream = new CountingInputStream(inDicomStream);
destinationClient = selectDestinationClient(association.getAAssociateAC().getCallingAET(), attrs);
} else {
countingStream = new CountingInputStream(inPdvStream);
destinationClient = defaultDicomWebClient;
}
throws IOException {

InputStream inWithHeader =
DicomStreamUtil.dicomStreamWithFileMetaHeader(
sopInstanceUID, sopClassUID, transferSyntax, countingStream);
AtomicReference<BackupState> backupState = new AtomicReference<>();
AtomicReference<IDicomWebClient> destinationClient = new AtomicReference<>();
long uploadedBytesCount = 0;

List<StreamProcessor> processorList = new ArrayList<>();
if (redactor != null) {
processorList.add(redactor::redact);
}
try {
MonitoringService.addEvent(Event.CSTORE_REQUEST);

String sopClassUID = request.getString(Tag.AffectedSOPClassUID);
String sopInstanceUID = request.getString(Tag.AffectedSOPInstanceUID);
String transferSyntax = presentationContext.getTransferSyntax();

validateParam(sopClassUID, "AffectedSOPClassUID");
validateParam(sopInstanceUID, "AffectedSOPInstanceUID");

final CountingInputStream countingStream;

if(destinationMap != null){
DicomInputStream inDicomStream = new DicomInputStream(inPdvStream);
inDicomStream.mark(Integer.MAX_VALUE);
Attributes attrs = inDicomStream.readDataset(-1, Tag.PixelData);
inDicomStream.reset();

countingStream = new CountingInputStream(inDicomStream);
destinationClient.set(selectDestinationClient(association.getAAssociateAC().getCallingAET(), attrs));
} else {
countingStream = new CountingInputStream(inPdvStream);
destinationClient.set(defaultDicomWebClient);
}

InputStream inWithHeader =
DicomStreamUtil.dicomStreamWithFileMetaHeader(
sopInstanceUID, sopClassUID, transferSyntax, countingStream);

List<StreamProcessor> processorList = new ArrayList<>();
if (redactor != null) {
processorList.add(redactor::redact);
}

if(transcodeToSyntax != null && !transcodeToSyntax.equals(transferSyntax)) {
processorList.add((inputStream, outputStream) -> {
try (Transcoder transcoder = new Transcoder(inputStream)) {
transcoder.setIncludeFileMetaInformation(true);
transcoder.setDestinationTransferSyntax(transcodeToSyntax);
transcoder.transcode((transcoder1, dataset) -> outputStream);
}
});
}

if (backupUploadService != null) {
processorList.add((inputStream, outputStream) -> {
backupState.set(backupUploadService.createBackup(inputStream, sopInstanceUID));
backupUploadService.getBackupStream(sopInstanceUID).transferTo(outputStream);
});
}

if(transcodeToSyntax != null && !transcodeToSyntax.equals(transferSyntax)) {
processorList.add((inputStream, outputStream) -> {
try (Transcoder transcoder = new Transcoder(inputStream)) {
transcoder.setIncludeFileMetaInformation(true);
transcoder.setDestinationTransferSyntax(transcodeToSyntax);
transcoder.transcode((transcoder1, dataset) -> outputStream);
}
destinationClient.get().stowRs(inputStream);
});

processStream(association.getApplicationEntity().getDevice().getExecutor(), inWithHeader, processorList);
uploadedBytesCount = countingStream.getCount();

updateResponseToSuccess(response, uploadedBytesCount);

if (backupUploadService != null && backupState.get() != null) {
backupUploadService.removeBackup(backupState.get().getUniqueFileName());
}
} catch (DicomWebException dwe) {
if (backupUploadService != null
&& backupUploadService.filterHttpCode(dwe.getHttpStatus())
&& backupState.get().getAttemptsCountdown() > 0) {
log.error("C-STORE request failed. Trying to resend...", dwe);
resendWithDelayRecursivelyExceptionally(backupState, destinationClient);
updateResponseToSuccess(response, uploadedBytesCount);
return;
}
reportError(dwe);
throw new DicomServiceException(dwe.getStatus(), dwe);
} catch (IBackupUploader.BackupException e) {
MonitoringService.addEvent(Event.CSTORE_BACKUP_ERROR);
log.error("Backup io processing during C-STORE request is failed: ", e);
reportError(e);
throw new DicomServiceException(Status.ProcessingFailure, e);
} catch (DicomServiceException e) {
reportError(e);
throw e;
} catch (Throwable e) {
reportError(e);
throw new DicomServiceException(Status.ProcessingFailure, e);
}
}

processorList.add((inputStream, outputStream) -> {
destinationClient.stowRs(inputStream);
});

processStream(association.getApplicationEntity().getDevice().getExecutor(),
inWithHeader, processorList);

response.setInt(Tag.Status, VR.US, Status.Success);
MonitoringService.addEvent(Event.CSTORE_BYTES, countingStream.getCount());
} catch (DicomWebException e) {
reportError(e);
DicomServiceException serviceException = new DicomServiceException(e.getStatus(), e);
serviceException.setErrorComment(e.getMessage());
throw serviceException;
} catch (DicomServiceException e) {
reportError(e);
throw e;
} catch (Throwable e) {
reportError(e);
throw new DicomServiceException(Status.ProcessingFailure, e);
/**
 * Marks the C-STORE response as successful and records the uploaded byte count.
 *
 * @param response the DIMSE response attributes to mutate; its Status tag is set to Success
 * @param countingStreamCount number of bytes read from the inbound stream, reported as a
 *     CSTORE_BYTES monitoring event
 */
private void updateResponseToSuccess(Attributes response, long countingStreamCount) {
response.setInt(Tag.Status, VR.US, Status.Success);
MonitoringService.addEvent(Event.CSTORE_BYTES, countingStreamCount);
}

/**
 * Re-attempts upload of the backed-up instance to the destination dicomweb client.
 *
 * NOTE(review): despite the name, no delay or recursion is visible here — the retry/delay
 * scheduling is presumably inside {@code backupUploadService.startUploading}; confirm there.
 *
 * @param backupState holder for the backup metadata (unique file name, attempts countdown)
 * @param destinationClient holder for the dicomweb client to retry the upload against
 * @throws DicomServiceException when the backup upload ultimately fails; the DICOM status is
 *     taken from the BackupException when present, otherwise ProcessingFailure
 */
private void resendWithDelayRecursivelyExceptionally(AtomicReference<BackupState> backupState, AtomicReference<IDicomWebClient> destinationClient)
throws DicomServiceException {
try {
backupUploadService.startUploading(destinationClient.get(), backupState.get());
} catch (IBackupUploader.BackupException bae) {
// Count the failure for monitoring before surfacing it as a DIMSE-level error.
MonitoringService.addEvent(Event.CSTORE_BACKUP_ERROR);
throw new DicomServiceException(bae.getDicomStatus() != null ? bae.getDicomStatus() : Status.ProcessingFailure, bae);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;

import java.util.ArrayList;
import java.util.List;

@Parameters(separators = "= ")
public class Flags {

Expand Down Expand Up @@ -149,6 +152,42 @@ public class Flags {
)
Boolean fuzzyMatching = false;

// Local path or gs://bucket/dir location used to persist incoming files before upload;
// empty string (the default) means the backup/retry mechanism is disabled.
@Parameter(
names = {"--persistent_file_storage_location"},
description = "temporary location for storing files before send"
)
String persistentFileStorageLocation = "";

// GCP project that owns the GCS backup bucket; required when the storage location is a gs:// URI.
@Parameter(
names = {"--gcs_backup_project_id"},
description = "Google Cloud project ID"
)
String gcsBackupProjectId = "";

// Number of times a failed upload is retried from the backup copy (default 0 = no retries).
@Parameter(
names = {"--persistent_file_upload_retry_amount"},
description = "upload retry amount"
)
Integer persistentFileUploadRetryAmount = 0;

// Minimum delay in milliseconds before re-uploading a backed-up file.
@Parameter(
names = {"--min_upload_delay"},
description = "minimum delay before upload backup file (ms)"
)
Integer minUploadDelay = 100;

// Upper bound in milliseconds on the wait between successive upload attempts.
@Parameter(
names = {"--max_waiting_time_between_uploads"},
description = "maximum waiting time between uploads (ms)"
)
Integer maxWaitingTimeBetweenUploads = 5000;

// Additional HTTP status codes (below 500) that should trigger a retry;
// NOTE(review): 5xx handling is presumably built in elsewhere — confirm in BackupUploadService.
@Parameter(
names = {"--http_error_codes_to_retry"},
description = "http codes list to retry that less than 500."
)
List<Integer> httpErrorCodesToRetry = new ArrayList<>();

// No-arg constructor — presumably required by JCommander for flag injection; TODO confirm.
public Flags() {
}
}
Loading

0 comments on commit f7889e7

Please sign in to comment.