Skip to content

Commit

Permalink
Merge pull request #29 from yaxia/master
Browse files Browse the repository at this point in the history
Storage Client Library - 0.4.0
  • Loading branch information
veena-udayabhanu committed Oct 30, 2014
2 parents 341df62 + 02b8826 commit 8984bee
Show file tree
Hide file tree
Showing 20 changed files with 1,891 additions and 193 deletions.
13 changes: 13 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
Note: This is an Azure Storage only package. The all up Azure node sdk still has the old storage bits in there. In a future release, those storage bits will be removed and an npm dependency to this storage node sdk will
be taken. This is a CTP v1 release and the changes described below indicate the changes from the Azure node SDK 0.9.8 available here - https://github.com/Azure/azure-sdk-for-node.

2014.10.28 Version 0.4.0

ALL
* Provide an option to enable/disable nagling. Nagling is disabled by default. It can be enabled by setting options.useNagleAlgorithm to true.
* Added batch operation callback in sequence mode.

BLOB
* Added support to download a single blob in parallel similar to upload. You can set �parallelOperationThreadCount� option for api�s that download a blob to indicate number of parallel operations to use for download.
* Added speed summary in blob downloading.

FILE
* Fixed an issue which caused an invalid resource name error when the directory name starts or ends with a '/'

2014.08.20 Version 0.3.3

BLOB
Expand Down
2 changes: 1 addition & 1 deletion lib/common/common.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require('./util/patch-xmlbuilder');

var nodeVersion = azureutil.getNodeVersion();
if (nodeVersion.major === 0 && nodeVersion.minor > 8 && !(nodeVersion.minor > 10 || (nodeVersion.minor === 10 && nodeVersion.patch >= 3))) {
throw new Error('The Windows Azure node SDK does not work with node versions > 0.8.22 and < 0.10.3. Please upgrade to node >= 0.10.3');
throw new Error('The Microsoft Azure node SDK does not work with node versions > 0.9.0 and < 0.10.3. Please upgrade to node >= 0.10.3');
}

exports.xmlbuilder = require('xmlbuilder');
Expand Down
25 changes: 20 additions & 5 deletions lib/common/services/storageserviceclient.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ var defaultRequestLocationMode = Constants.RequestLocationMode.PRIMARY_ONLY;

var Logger = require('../diagnostics/logger');

var moduleVersion = require('../../../package.json').version;

/**
* Creates a new StorageServiceClient object.
*
Expand Down Expand Up @@ -114,7 +112,7 @@ StorageServiceClient._getDefaultXml2jsSettings = function() {
xml2jsSettings.charkey = Constants.TableConstants.XML_VALUE_MARKER;

// from xml2js guide: always put child nodes in an array if true; otherwise an array is created only if there is more than one.
xml2jsSettings.explicitArray = false;
xml2jsSettings.explicitArray = false;

return xml2jsSettings;
};
Expand All @@ -129,7 +127,7 @@ StorageServiceClient.prototype.setHost = function (host) {
var parsedHost;
if(!azureutil.objectIsNull(hostUri)) {
if(hostUri.indexOf('http') === -1 && hostUri.indexOf('//') !== 0){
hostUri = '//' + hostUri;
hostUri = '//' + hostUri;
}
parsedHost = url.parse(hostUri, false, true);

Expand Down Expand Up @@ -244,6 +242,11 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
options.requestLocationMode = defaultRequestLocationMode;
}

// Initialize whether nagling is used or not.
if(azureutil.objectIsNull(options.useNagleAlgorithm)) {
options.useNagleAlgorithm = this.useNagleAlgorithm;
}

this._initializeLocation(options);

// Initialize the operationExpiryTime
Expand Down Expand Up @@ -283,6 +286,7 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
callback(responseObject, next);
};

var endResponse;
var buildRequest = function (headersOnly) {
// Build request (if body was set before, request will process immediately, if not it'll wait for the piping to happen
var requestStream;
Expand All @@ -299,6 +303,7 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op

if (headersOnly) {
requestStream = requestWithDefaults(finalRequestOptions);

requestStream.on('error', processResponseCallback);
requestStream.on('response', function (response) {
var responseLength = 0;
Expand All @@ -315,13 +320,20 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op
}

response.length = responseLength;
processResponseCallback(null, response);
endResponse = response;
});
});
} else {
requestStream = requestWithDefaults(finalRequestOptions, processResponseCallback);
}

//If useNagleAlgorithm is not set or the value is set and is false, setNoDelay is set to true.
if (azureutil.objectIsNull(options.useNagleAlgorithm) || options.useNagleAlgorithm === false) {
requestStream.on('request', function(httpRequest) {
httpRequest.setNoDelay(true);
});
}

// Workaround to avoid request from potentially setting unwanted (rejected) headers by the service
var oldEnd = requestStream.end;
requestStream.end = function () {
Expand All @@ -348,6 +360,9 @@ StorageServiceClient.prototype._performRequest = function (webResource, body, op

// Pipe any input / output streams
if (body && body.inputStream) {
body.inputStream.on('finish', function () {
processResponseCallback(null, endResponse);
});
buildRequest(true).pipe(body.inputStream);
} else if (body && body.outputStream) {
var sendUnchunked = function () {
Expand Down
166 changes: 150 additions & 16 deletions lib/common/streams/batchoperation.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ var nodeVersion = azureutil.getNodeVersion();
var enableReuseSocket = nodeVersion.major >= 0 && nodeVersion.minor >= 10;

/**
* Concurrently execute batch operations.
* Concurrently execute batch operations and call operation callback randomly or in sequence.
* Random mode is for uploading.
* 1. Fire user callback when the operation is done.
* Sequence mode is for downloading.
* 1. Fire user callback when the operation is done and all previous operations and callback has finished.
* 2. BatchOperation guarantees the user callback is fired one by one.
* 3. The next user callback can't be fired until the current one is completed.
*/
function BatchOperation(name, options) {
if (!options) {
Expand All @@ -42,12 +48,15 @@ function BatchOperation(name, options) {
this.name = name;
this.logger = options.logger || new Logger(Logger.LogLevels.INFO);
this.operationMemoryUsage = options.operationMemoryUsage || DEFAULT_OPERATION_MEMORY_USAGE;
this.callbackInOrder = options.callbackInOrder === true;
this._currentOperationId = this.callbackInOrder ? 1 : -1;
this.concurrency = DEFAULT_GLOBAL_CONCURRENCY;

this._emitter = new EventEmitter();
this._enableComplete = false;
this._ended = false;
this._error = null;
this._paused = false;

//Total operations count(queued and active and connected)
this._totalOperation = 0;
Expand All @@ -69,6 +78,7 @@ var OperationState = {
INITED : 'inited',
QUEUED : 'queued',
RUNNING : 'running',
COMPLETE : 'complete',
CALLBACK : 'callback',
ERROR : 'error'
};
Expand Down Expand Up @@ -134,6 +144,23 @@ BatchOperation.prototype.enableComplete = function() {
this._tryEmitEndEvent();
};

/**
* Stop firing user call back
*/
BatchOperation.prototype.pause = function () {
this._paused = true;
};

/**
* Start firing user call back
*/
BatchOperation.prototype.resume = function () {
if (this._paused) {
this._paused = false;
this._fireOperationUserCallback();
}
};

/**
* Add event listener
*/
Expand Down Expand Up @@ -180,29 +207,65 @@ BatchOperation.prototype.getBatchOperationCallback = function (operation) {
}

operation._callbackArguments = arguments;
self._fireOperationUserCallback(operation);
if (self._paused) {
operation.status = OperationState.CALLBACK;
self.logger.debug(util.format('Batch operation paused and Operation %d wait for firing callback', operation.operationId));
} else if (self.callbackInOrder) {
if (self._currentOperationId === operation.operationId) {
self._fireOperationUserCallback(operation);
} else if (self._currentOperationId > operation.operationId) {
throw new Error('Debug error: current callback operation id cannot be larger than operation id');
} else {
operation.status = OperationState.CALLBACK;
self.logger.debug(util.format('Operation %d is waiting for firing callback %s', operation.operationId, self._currentOperationId));
}
} else {
self._fireOperationUserCallback(operation);
}

self._tryEmitDrainEvent();
operation = null;
self = null;
};
};

/**
* Fire user's call back
*/
BatchOperation.prototype._fireOperationUserCallback = function (operation) {
// fire the callback, if exists
if(operation._userCallback) {
this.logger.debug(util.format('Fire user call back for operation %d', operation.operationId));
operation._fireUserCallback();
}

// remove the operation from the array and decrement the counter
var index = this._operations.indexOf(operation);
this._operations.splice(index, 1);
this._activeOperation--;

// check if batch has ended and if so emit end event
this._tryEmitEndEvent();
if (!operation) {
var index = this._getCallbackOperationIndex();
if (index != -1) {
operation = this._operations[index];
}
}

if (operation && !this._paused) {
// fire the callback, if exists
if (operation._userCallback) {
this.logger.debug(util.format('Fire user call back for operation %d', operation.operationId));
// make sure UserCallback is a sync operation in sequence mode.
// both async and sync operations are available for random mode.
operation._fireUserCallback();
}

// remove the operation from the array and decrement the counter
this._operations.splice(index, 1);
this._activeOperation--;
operation.status = OperationState.COMPLETE;
index = operation = null;

if (this.callbackInOrder) {
this._currentOperationId++;
}

this._fireOperationUserCallback();
} else if (this._paused) {
this._tryEmitDrainEvent();
} else {
// check if batch has ended and if so emit end event
this._tryEmitEndEvent();
}
};

/**
Expand All @@ -212,7 +275,7 @@ BatchOperation.prototype._fireOperationUserCallback = function (operation) {
BatchOperation.prototype._tryEmitEndEvent = function () {
if(this._enableComplete && this._activeOperation === 0 && this._operations.length === 0) {
this._ended = true;
this.logger.debug(util.format('Batch operation %s emit the end event', this.name));
this.logger.debug(util.format('Batch operation %s emits the end event', this.name));
this._emitter.emit('end', this._error, null);
return true;
}
Expand All @@ -223,13 +286,51 @@ BatchOperation.prototype._tryEmitEndEvent = function () {
* Try to emit the drain event
*/
BatchOperation.prototype._tryEmitDrainEvent = function () {
if (!this._emitter) return false;
if(!this.IsWorkloadHeavy() || this._activeOperation < this.concurrency) {
this._emitter.emit('drain');
return true;
}
return false;
};

/**
* Get the current active operation index.
* Only the active operation could call user's callback in sequence model.
* The other finished but not active operations should wait for wake up.
*/
BatchOperation.prototype._getCallbackOperationIndex = function () {
var operation = null;
for (var i = 0; i < this._operations.length; i++) {
operation = this._operations[i];
if (this.callbackInOrder) {
//Sequence mode
if (operation.operationId == this._currentOperationId) {
if (operation.status === OperationState.CALLBACK) {
return i;
} else {
return -1;
}
}
} else {
//Random mode
if (operation.status === OperationState.CALLBACK) {
return i;
}
}
}
return -1;
};

/**
* Do nothing and directly call the callback.
* In random mode, the user callback will be called immediately
* In sequence mode, the user callback will be called after the previous callback has been called
*/
BatchOperation.noOperation = function (cb) {
cb();
};

/**
* Rest operation in sdk
*/
Expand Down Expand Up @@ -270,4 +371,37 @@ function RestOperation(serviceClient, operation) {

BatchOperation.RestOperation = RestOperation;

/**
* Common operation wrapper
*/
function CommonOperation(operationFunc, callback) {
this.status = OperationState.Inited;
this.operationId = -1;
this._callbackArguments = null;
var sliceStart = 2;
if (azureutil.objectIsFunction(callback)) {
this._userCallback = callback;
} else {
this._userCallback = null;
sliceStart = 1;
}
var operationArguments = Array.prototype.slice.call(arguments).slice(sliceStart);
this.run = function (cb) {
if (!cb) cb = this._userCallback;
operationArguments.push(cb);
this.status = OperationState.RUNNING;
operationFunc.apply(null, operationArguments);
operationArguments = operationFunc = null;
};

this._fireUserCallback = function () {
if (this._userCallback) {
this._userCallback.apply(null, this._callbackArguments);
}
this._userCallback = this._callbackArguments = null;
};
}

BatchOperation.CommonOperation = CommonOperation;

module.exports = BatchOperation;
Loading

0 comments on commit 8984bee

Please sign in to comment.