Skip to content
This repository has been archived by the owner on May 19, 2020. It is now read-only.

Commit

Permalink
refactoring code because the set to running should be done just befor…
Browse files Browse the repository at this point in the history
…e the actual running not 10 years before!

Adding a commented out section in push model in case the test shows it requres further wait.
  • Loading branch information
aoehmichen committed Nov 24, 2017
1 parent 6de685d commit eeb9246
Showing 4 changed files with 86 additions and 62 deletions.
70 changes: 32 additions & 38 deletions src/jobExecutorAbstract.js
Original file line number Diff line number Diff line change
@@ -56,6 +56,9 @@ JobExecutorAbstract.prototype.pushModel = function() {
let _this = this;

return new Promise(function(resolve, reject) {
// while(_this._model.statusLock){
// setTimeout(function() { _this.fetchModel(); }, 500); // we wait for 500 ms before refreshing again
// }
let replacementData = _this._model;
delete replacementData._id; // Cleanup MongoDB managed _id field, if any
_this._jobCollection.findOneAndReplace({ _id : _this._jobID }, replacementData, { upsert: true, returnOriginal: false })
@@ -109,48 +112,39 @@ JobExecutorAbstract.prototype._exec = function(command, args, options) {
}); // Post execution error
}; // end_fn

//Clean model for execution
_this._model.stdout = '';
_this._model.stderr = '';
_this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING);
_this._model.startDate = new Date();
_this.pushModel().then(function() {
_this._preExecution().then(function() {
//Fork a process on the machine
_this._child_process = child_process.spawn(command, args, options);

//Stores stdout
_this._child_process.stdout.on('data', function (stdout_data) {
_this._model.stdout += stdout_data;
});
_this._preExecution().then(function() {
//Fork a process on the machine
_this._child_process = child_process.spawn(command, args, options);

//Stores stderr
_this._child_process.stderr.on('data', function (stderr_data) {
_this._model.stderr += stderr_data;
});
//Stores stdout
_this._child_process.stdout.on('data', function (stdout_data) {
_this._model.stdout += stdout_data;
});

//Handle spawn errors
_this._child_process.on('error', function (error) {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Spawn - ' + error.toString());
});
//Stores stderr
_this._child_process.stderr.on('data', function (stderr_data) {
_this._model.stderr += stderr_data;
});

//Handle child termination
_this._child_process.on('exit', function (code, signal) {
if (code !== null) { //Successful run or interruption
end_fn(Constants.EAE_JOB_STATUS_DONE, code, 'Exit success');
}
else if (signal === 'SIGTERM') {
end_fn(Constants.EAE_JOB_STATUS_CANCELLED, 1, 'Interrupt success');
}
else {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Exit error');
}
});
}, function (error) {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Pre-exec - ' + error.toString());
//Handle spawn errors
_this._child_process.on('error', function (error) {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Spawn - ' + error.toString());
});

//Handle child termination
_this._child_process.on('exit', function (code, signal) {
if (code !== null) { //Successful run or interruption
end_fn(Constants.EAE_JOB_STATUS_DONE, code, 'Exit success');
}
else if (signal === 'SIGTERM') {
end_fn(Constants.EAE_JOB_STATUS_CANCELLED, 1, 'Interrupt success');
}
else {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Exit error');
}
});
}, function(error) {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Prepare -' + error.toString());
}, function (error) {
end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Pre-exec - ' + error.toString());
});
};

26 changes: 18 additions & 8 deletions src/jobExecutorPip.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const process = require('process');
const JobExecutorAbstract = require('./jobExecutorAbstract.js');
const {Constants} = require('eae-utils');

/**
* @class JobExecutorPip
@@ -57,14 +58,23 @@ JobExecutorPip.prototype.startExecution = function(callback) {

_this._callback = callback;
_this.fetchModel().then(function() {
let cmd = 'pip';
let args = _this._model.params;
let opts = {
cwd: process.cwd(),
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
//Clean model for execution
_this._model.stdout = '';
_this._model.stderr = '';
_this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING);
_this._model.startDate = new Date();
_this.pushModel().then(function() {
let cmd = 'pip';
let args = _this._model.params;
let opts = {
cwd: process.cwd(),
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
}, function(error) {
throw error;
});
}, function(error) {
throw error;
});
26 changes: 18 additions & 8 deletions src/jobExecutorPython.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ const process = require('process');
const fs = require('fs');
const path = require('path');
const os = require('os');
const {Constants} = require('eae-utils');

const JobExecutorAbstract = require('./jobExecutorAbstract.js');
const { SwiftHelper, ErrorHelper } = require('eae-utils');
@@ -169,14 +170,23 @@ JobExecutorPython.prototype.startExecution = function(callback) {
_this._tmpDirectory = directoryPath; //Save tmp dir

_this.fetchModel().then(function () {
let cmd = 'python ' + _this._model.main;
let args = _this._model.params;
let opts = {
cwd: _this._tmpDirectory + '/input',
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
//Clean model for execution
_this._model.stdout = '';
_this._model.stderr = '';
_this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING);
_this._model.startDate = new Date();
_this.pushModel().then(function() {
let cmd = 'python ' + _this._model.main;
let args = _this._model.params;
let opts = {
cwd: _this._tmpDirectory + '/input',
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
}, function(error) {
throw error;
});
}, function (error) {
callback(error);
});
26 changes: 18 additions & 8 deletions src/jobExecutorR.js
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ const process = require('process');
const fs = require('fs');
const path = require('path');
const os = require('os');
const {Constants} = require('eae-utils');

const JobExecutorAbstract = require('./jobExecutorAbstract.js');
const { SwiftHelper, ErrorHelper } = require('eae-utils');
@@ -169,14 +170,23 @@ JobExecutorR.prototype.startExecution = function(callback) {
_this._tmpDirectory = directoryPath; //Save tmp dir

_this.fetchModel().then(function () {
let cmd = 'Rscript --vanilla ' + _this._model.main;
let args = _this._model.params;
let opts = {
cwd: _this._tmpDirectory + '/input',
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
//Clean model for execution
_this._model.stdout = '';
_this._model.stderr = '';
_this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING);
_this._model.startDate = new Date();
_this.pushModel().then(function() {
let cmd = 'Rscript --vanilla ' + _this._model.main;
let args = _this._model.params;
let opts = {
cwd: _this._tmpDirectory + '/input',
end: process.env,
shell: true
};
_this._exec(cmd, args, opts);
}, function(error) {
throw error;
});
}, function (error) {
callback(error);
});

0 comments on commit eeb9246

Please sign in to comment.