Skip to content

Commit

Permalink
Merge pull request #116 from GoogleCloudPlatform/routing_extension
Browse files Browse the repository at this point in the history
Merge routing extension into master
  • Loading branch information
danielbeaudreau authored Nov 9, 2020
2 parents fd7049d + 00b087a commit de09688
Show file tree
Hide file tree
Showing 47 changed files with 2,054 additions and 653 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package com.google.cloud.healthcare.imaging.dicomadapter;

import com.google.cloud.healthcare.IDicomWebClient;
import com.google.cloud.healthcare.imaging.dicomadapter.cstoresender.ICStoreSender;
import com.google.cloud.healthcare.imaging.dicomadapter.cstoresender.ICStoreSenderFactory;
import com.google.cloud.healthcare.imaging.dicomadapter.cmove.ISender;
import com.google.cloud.healthcare.imaging.dicomadapter.cmove.ISenderFactory;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.Event;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.MonitoringService;
import java.io.IOException;
Expand Down Expand Up @@ -45,16 +45,16 @@ public class CMoveService extends BasicCMoveSCP {
private static Logger log = LoggerFactory.getLogger(CMoveService.class);
private final IDicomWebClient dicomWebClient;
private final AetDictionary aets;
private final ICStoreSenderFactory cstoreSenderFactory;
private final ISenderFactory senderFactory;

CMoveService(
IDicomWebClient dicomWebClient,
AetDictionary aets,
ICStoreSenderFactory cstoreSenderFactory) {
ISenderFactory senderFactory) {
super(UID.StudyRootQueryRetrieveInformationModelMOVE);
this.dicomWebClient = dicomWebClient;
this.aets = aets;
this.cstoreSenderFactory = cstoreSenderFactory;
this.senderFactory = senderFactory;
}

@Override
Expand Down Expand Up @@ -84,7 +84,7 @@ private CMoveTask(Association as, PresentationContext pc,
@Override
public void run() {
List<String> failedInstanceUids = new ArrayList<>();
ICStoreSender cstoreSender = null;
ISender sender = null;
try {
if (canceled) {
throw new CancellationException();
Expand Down Expand Up @@ -126,7 +126,7 @@ public void run() {
return;
}

cstoreSender = cstoreSenderFactory.create();
sender = senderFactory.create();

int successfullInstances = 0;
int remainingInstances = qidoResult.length();
Expand All @@ -149,7 +149,7 @@ public void run() {

try {
MonitoringService.addEvent(Event.CMOVE_CSTORE_REQUEST);
long bytesSent = cstoreSender.cstore(cstoreTarget, studyUid, seriesUid,
long bytesSent = sender.cmove(cstoreTarget, studyUid, seriesUid,
instanceUid, classUid);
successfullInstances++;
MonitoringService.addEvent(Event.CMOVE_CSTORE_BYTES, bytesSent);
Expand Down Expand Up @@ -182,9 +182,9 @@ public void run() {
int msgId = cmd.getInt(Tag.MessageID, -1);
as.removeCancelRQHandler(msgId);

if (cstoreSender != null) {
if (sender != null) {
try {
cstoreSender.close();
sender.close();
} catch (IOException e) {
log.error("Failure closing cstoreSender: ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@

package com.google.cloud.healthcare.imaging.dicomadapter;

import com.google.cloud.healthcare.IDicomWebClient;
import com.google.cloud.healthcare.IDicomWebClient.DicomWebException;
import com.google.cloud.healthcare.deid.redactor.DicomRedactor;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.BackupState;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.BackupUploadService;
import com.google.cloud.healthcare.imaging.dicomadapter.backupuploader.IBackupUploader;
import com.google.cloud.healthcare.imaging.dicomadapter.cstore.DicomStreamUtil;
import com.google.cloud.healthcare.imaging.dicomadapter.cstore.destination.IDestinationClientFactory;
import com.google.cloud.healthcare.imaging.dicomadapter.cstore.destination.DestinationHolder;
import com.google.cloud.healthcare.imaging.dicomadapter.cstore.multipledest.IMultipleDestinationUploadService;
import com.google.cloud.healthcare.imaging.dicomadapter.cstore.multipledest.IMultipleDestinationUploadService.MultipleDestinationUploadServiceException;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.Event;
import com.google.cloud.healthcare.imaging.dicomadapter.monitoring.MonitoringService;
import com.google.common.io.CountingInputStream;
Expand All @@ -30,18 +31,15 @@
import java.io.PipedOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.atomic.AtomicReference;

import org.dcm4che3.data.Attributes;
import org.dcm4che3.data.Tag;
import org.dcm4che3.data.VR;
import org.dcm4che3.imageio.codec.Transcoder;
import org.dcm4che3.io.DicomInputStream;
import org.dcm4che3.net.Association;
import org.dcm4che3.net.PDVInputStream;
import org.dcm4che3.net.Status;
Expand All @@ -59,23 +57,19 @@ public class CStoreService extends BasicCStoreSCP {

private static Logger log = LoggerFactory.getLogger(CStoreService.class);

private final IDicomWebClient defaultDicomWebClient;
private final Map<DestinationFilter, IDicomWebClient> destinationMap;
private final IDestinationClientFactory destinationClientFactory;
private final IMultipleDestinationUploadService multipleSendService;
private final DicomRedactor redactor;
private final String transcodeToSyntax;
private final BackupUploadService backupUploadService;

CStoreService(IDicomWebClient defaultDicomWebClient,
Map<DestinationFilter, IDicomWebClient> destinationMap,
DicomRedactor redactor, String transcodeToSyntax, BackupUploadService backupUploadService) {
this.defaultDicomWebClient = defaultDicomWebClient;
this.destinationMap =
destinationMap != null && destinationMap.size() > 0 ? destinationMap : null;
this.redactor = redactor;
this.transcodeToSyntax =
transcodeToSyntax != null && transcodeToSyntax.length() > 0 ? transcodeToSyntax : null;

this.backupUploadService = backupUploadService;
CStoreService(IDestinationClientFactory destinationClientFactory,
DicomRedactor redactor,
String transcodeToSyntax,
IMultipleDestinationUploadService multipleSendService) {
this.destinationClientFactory = destinationClientFactory;
this.redactor = redactor;
this.transcodeToSyntax = transcodeToSyntax != null && transcodeToSyntax.length() > 0 ? transcodeToSyntax : null;
this.multipleSendService = multipleSendService;

if(this.transcodeToSyntax != null) {
log.info("Transcoding to: " + transcodeToSyntax);
Expand All @@ -90,116 +84,80 @@ protected void store(
PDVInputStream inPdvStream,
Attributes response)
throws IOException {
try {
MonitoringService.addEvent(Event.CSTORE_REQUEST);

AtomicReference<BackupState> backupState = new AtomicReference<>();
AtomicReference<IDicomWebClient> destinationClient = new AtomicReference<>();
long uploadedBytesCount = 0;

try {
MonitoringService.addEvent(Event.CSTORE_REQUEST);

String sopClassUID = request.getString(Tag.AffectedSOPClassUID);
String sopInstanceUID = request.getString(Tag.AffectedSOPInstanceUID);
String transferSyntax = presentationContext.getTransferSyntax();

validateParam(sopClassUID, "AffectedSOPClassUID");
validateParam(sopInstanceUID, "AffectedSOPInstanceUID");

final CountingInputStream countingStream;

if(destinationMap != null){
DicomInputStream inDicomStream = new DicomInputStream(inPdvStream);
inDicomStream.mark(Integer.MAX_VALUE);
Attributes attrs = inDicomStream.readDataset(-1, Tag.PixelData);
inDicomStream.reset();

countingStream = new CountingInputStream(inDicomStream);
destinationClient.set(selectDestinationClient(association.getAAssociateAC().getCallingAET(), attrs));
} else {
countingStream = new CountingInputStream(inPdvStream);
destinationClient.set(defaultDicomWebClient);
}
String sopClassUID = request.getString(Tag.AffectedSOPClassUID);
String sopInstanceUID = request.getString(Tag.AffectedSOPInstanceUID);
String transferSyntax = presentationContext.getTransferSyntax();

InputStream inWithHeader =
DicomStreamUtil.dicomStreamWithFileMetaHeader(
sopInstanceUID, sopClassUID, transferSyntax, countingStream);
validateParam(sopClassUID, "AffectedSOPClassUID");
validateParam(sopInstanceUID, "AffectedSOPInstanceUID");

List<StreamProcessor> processorList = new ArrayList<>();
if (redactor != null) {
processorList.add(redactor::redact);
}
DestinationHolder destinationHolder =
destinationClientFactory.create(association.getAAssociateAC().getCallingAET(), inPdvStream);

if(transcodeToSyntax != null && !transcodeToSyntax.equals(transferSyntax)) {
processorList.add((inputStream, outputStream) -> {
try (Transcoder transcoder = new Transcoder(inputStream)) {
transcoder.setIncludeFileMetaInformation(true);
transcoder.setDestinationTransferSyntax(transcodeToSyntax);
transcoder.transcode((transcoder1, dataset) -> outputStream);
}
});
}
final CountingInputStream countingStream = destinationHolder.getCountingInputStream();

if (backupUploadService != null) {
processorList.add((inputStream, outputStream) -> {
backupState.set(backupUploadService.createBackup(inputStream, sopInstanceUID));
backupUploadService.getBackupStream(sopInstanceUID).transferTo(outputStream);
});
}
List<StreamProcessor> processorList = new ArrayList<>();
if (redactor != null) {
processorList.add(redactor::redact);
}

if(transcodeToSyntax != null && !transcodeToSyntax.equals(transferSyntax)) {
processorList.add((inputStream, outputStream) -> {
destinationClient.get().stowRs(inputStream);
try (Transcoder transcoder = new Transcoder(inputStream)) {
transcoder.setIncludeFileMetaInformation(true);
transcoder.setDestinationTransferSyntax(transcodeToSyntax);
transcoder.transcode((transcoder1, dataset) -> outputStream);
}
});

processStream(association.getApplicationEntity().getDevice().getExecutor(), inWithHeader, processorList);
uploadedBytesCount = countingStream.getCount();

updateResponseToSuccess(response, uploadedBytesCount);

if (backupUploadService != null && backupState.get() != null) {
backupUploadService.removeBackup(backupState.get().getUniqueFileName());
}
} catch (DicomWebException dwe) {
if (backupUploadService != null
&& backupUploadService.filterHttpCode(dwe.getHttpStatus())
&& backupState.get().getAttemptsCountdown() > 0) {
log.error("C-STORE request failed. Trying to resend...", dwe);
resendWithDelayRecursivelyExceptionally(backupState, destinationClient);
updateResponseToSuccess(response, uploadedBytesCount);
return;
}
reportError(dwe);
throw new DicomServiceException(dwe.getStatus(), dwe);
} catch (IBackupUploader.BackupException e) {
MonitoringService.addEvent(Event.CSTORE_BACKUP_ERROR);
log.error("Backup io processing during C-STORE request is failed: ", e);
reportError(e);
throw new DicomServiceException(Status.ProcessingFailure, e);
} catch (DicomServiceException e) {
reportError(e);
throw e;
} catch (Throwable e) {
reportError(e);
throw new DicomServiceException(Status.ProcessingFailure, e);
}
}

private void updateResponseToSuccess(Attributes response, long countingStreamCount) {
response.setInt(Tag.Status, VR.US, Status.Success);
MonitoringService.addEvent(Event.CSTORE_BYTES, countingStreamCount);
}
if (multipleSendService != null) {
processorList.add((inputStream, outputStream) -> {
multipleSendService.start(
destinationHolder.getHealthcareDestinations(),
destinationHolder.getDicomDestinations(),
inputStream,
sopClassUID,
sopInstanceUID
);
});
} else {
processorList.add((inputStream, outputStream) -> {
destinationHolder.getSingleDestination().stowRs(inputStream);
});
}

private void resendWithDelayRecursivelyExceptionally(AtomicReference<BackupState> backupState, AtomicReference<IDicomWebClient> destinationClient)
throws DicomServiceException {
try {
backupUploadService.startUploading(destinationClient.get(), backupState.get());
} catch (IBackupUploader.BackupException bae) {
MonitoringService.addEvent(Event.CSTORE_BACKUP_ERROR);
throw new DicomServiceException(bae.getDicomStatus() != null ? bae.getDicomStatus() : Status.ProcessingFailure, bae);
InputStream inWithHeader =
DicomStreamUtil.dicomStreamWithFileMetaHeader(
sopInstanceUID, sopClassUID, transferSyntax, countingStream);

processStream(association.getApplicationEntity().getDevice().getExecutor(),
inWithHeader, processorList);

response.setInt(Tag.Status, VR.US, Status.Success);
MonitoringService.addEvent(Event.CSTORE_BYTES, countingStream.getCount());
} catch (DicomWebException e) {
reportError(e, Event.CSTORE_ERROR);
throw new DicomServiceException(e.getStatus(), e);
} catch (DicomServiceException e) {
reportError(e, Event.CSTORE_ERROR);
throw e;
} catch (MultipleDestinationUploadServiceException me) {
reportError(me, null);
throw new DicomServiceException(me.getDicomStatus() != null ? me.getDicomStatus() : Status.ProcessingFailure, me);
} catch (Throwable e) {
reportError(e, Event.CSTORE_ERROR);
throw new DicomServiceException(Status.ProcessingFailure, e);
}
}

private void reportError(Throwable e) {
MonitoringService.addEvent(Event.CSTORE_ERROR);
private void reportError(Throwable e, Event event) {
if (event != null) {
MonitoringService.addEvent(event);
}
log.error("C-STORE request failed: ", e);
}

Expand All @@ -209,15 +167,6 @@ private void validateParam(String value, String name) throws DicomServiceExcepti
}
}

private IDicomWebClient selectDestinationClient(String callingAet, Attributes attrs){
for(DestinationFilter filter: destinationMap.keySet()){
if(filter.matches(callingAet, attrs)){
return destinationMap.get(filter);
}
}
return defaultDicomWebClient;
}

private void processStream(Executor underlyingExecutor, InputStream inputStream,
List<StreamProcessor> processorList) throws Throwable {
if (processorList.size() == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public class Flags {
)
List<Integer> httpErrorCodesToRetry = new ArrayList<>();

@Parameter(
names = {"--send_to_all_matching_destinations"},
description = "If true, when processing C-STORE requests with a destination config specified, the adapter will " +
"send to all matching destinations rather than the first matching destination."
)
Boolean sendToAllMatchingDestinations = false;

public Flags() {
}
}
Loading

0 comments on commit de09688

Please sign in to comment.