From eeb9246cdbfc41ad5236dc7a3d5ceb3a47e1d270 Mon Sep 17 00:00:00 2001 From: Axel Oehmichen Date: Fri, 24 Nov 2017 15:08:58 +0000 Subject: [PATCH] refactoring code because the set to running should be done just before the actual running not 10 years before! Adding a commented out section in push model in case the test shows it requres further wait. --- src/jobExecutorAbstract.js | 70 +++++++++++++++++--------------------- src/jobExecutorPip.js | 26 +++++++++----- src/jobExecutorPython.js | 26 +++++++++----- src/jobExecutorR.js | 26 +++++++++----- 4 files changed, 86 insertions(+), 62 deletions(-) diff --git a/src/jobExecutorAbstract.js b/src/jobExecutorAbstract.js index 05607e7..da70d85 100644 --- a/src/jobExecutorAbstract.js +++ b/src/jobExecutorAbstract.js @@ -56,6 +56,9 @@ JobExecutorAbstract.prototype.pushModel = function() { let _this = this; return new Promise(function(resolve, reject) { + // while(_this._model.statusLock){ + // setTimeout(function() { _this.fetchModel(); }, 500); // we wait for 500 ms before refreshing again + // } let replacementData = _this._model; delete replacementData._id; // Cleanup MongoDB managed _id field, if any _this._jobCollection.findOneAndReplace({ _id : _this._jobID }, replacementData, { upsert: true, returnOriginal: false }) @@ -109,48 +112,39 @@ JobExecutorAbstract.prototype._exec = function(command, args, options) { }); // Post execution error }; // end_fn - //Clean model for execution - _this._model.stdout = ''; - _this._model.stderr = ''; - _this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING); - _this._model.startDate = new Date(); - _this.pushModel().then(function() { - _this._preExecution().then(function() { - //Fork a process on the machine - _this._child_process = child_process.spawn(command, args, options); - - //Stores stdout - _this._child_process.stdout.on('data', function (stdout_data) { - _this._model.stdout += stdout_data; - }); + _this._preExecution().then(function() { + //Fork a process on the machine + _this._child_process = child_process.spawn(command, args, options); - //Stores stderr - _this._child_process.stderr.on('data', function (stderr_data) { - _this._model.stderr += stderr_data; - }); + //Stores stdout + _this._child_process.stdout.on('data', function (stdout_data) { + _this._model.stdout += stdout_data; + }); - //Handle spawn errors - _this._child_process.on('error', function (error) { - end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Spawn - ' + error.toString()); - }); + //Stores stderr + _this._child_process.stderr.on('data', function (stderr_data) { + _this._model.stderr += stderr_data; + }); - //Handle child termination - _this._child_process.on('exit', function (code, signal) { - if (code !== null) { //Successful run or interruption - end_fn(Constants.EAE_JOB_STATUS_DONE, code, 'Exit success'); - } - else if (signal === 'SIGTERM') { - end_fn(Constants.EAE_JOB_STATUS_CANCELLED, 1, 'Interrupt success'); - } - else { - end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Exit error'); - } - }); - }, function (error) { - end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Pre-exec - ' + error.toString()); + //Handle spawn errors + _this._child_process.on('error', function (error) { + end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Spawn - ' + error.toString()); + }); + + //Handle child termination + _this._child_process.on('exit', function (code, signal) { + if (code !== null) { //Successful run or interruption + end_fn(Constants.EAE_JOB_STATUS_DONE, code, 'Exit success'); + } + else if (signal === 'SIGTERM') { + end_fn(Constants.EAE_JOB_STATUS_CANCELLED, 1, 'Interrupt success'); + } + else { + end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Exit error'); + } }); - }, function(error) { - end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Prepare -' + error.toString()); + }, function (error) { + end_fn(Constants.EAE_JOB_STATUS_ERROR, 1, 'Pre-exec - ' + error.toString()); }); }; diff --git a/src/jobExecutorPip.js b/src/jobExecutorPip.js index 14f2fea..e84ec14 100644 --- a/src/jobExecutorPip.js +++ b/src/jobExecutorPip.js @@ -1,5 +1,6 @@ const process = require('process'); const JobExecutorAbstract = require('./jobExecutorAbstract.js'); +const {Constants} = require('eae-utils'); /** * @class JobExecutorPip @@ -57,14 +58,23 @@ JobExecutorPip.prototype.startExecution = function(callback) { _this._callback = callback; _this.fetchModel().then(function() { - let cmd = 'pip'; - let args = _this._model.params; - let opts = { - cwd: process.cwd(), - end: process.env, - shell: true - }; - _this._exec(cmd, args, opts); + //Clean model for execution + _this._model.stdout = ''; + _this._model.stderr = ''; + _this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING); + _this._model.startDate = new Date(); + _this.pushModel().then(function() { + let cmd = 'pip'; + let args = _this._model.params; + let opts = { + cwd: process.cwd(), + end: process.env, + shell: true + }; + _this._exec(cmd, args, opts); + }, function(error) { + throw error; + }); }, function(error) { throw error; }); diff --git a/src/jobExecutorPython.js b/src/jobExecutorPython.js index 983f118..f198bf9 100644 --- a/src/jobExecutorPython.js +++ b/src/jobExecutorPython.js @@ -2,6 +2,7 @@ const process = require('process'); const fs = require('fs'); const path = require('path'); const os = require('os'); +const {Constants} = require('eae-utils'); const JobExecutorAbstract = require('./jobExecutorAbstract.js'); const { SwiftHelper, ErrorHelper } = require('eae-utils'); @@ -169,14 +170,23 @@ JobExecutorPython.prototype.startExecution = function(callback) { _this._tmpDirectory = directoryPath; //Save tmp dir _this.fetchModel().then(function () { - let cmd = 'python ' + _this._model.main; - let args = _this._model.params; - let opts = { - cwd: _this._tmpDirectory + '/input', - end: process.env, - shell: true - }; - _this._exec(cmd, args, opts); + //Clean model for execution + _this._model.stdout = ''; + _this._model.stderr = ''; + _this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING); + _this._model.startDate = new Date(); + _this.pushModel().then(function() { + let cmd = 'python ' + _this._model.main; + let args = _this._model.params; + let opts = { + cwd: _this._tmpDirectory + '/input', + end: process.env, + shell: true + }; + _this._exec(cmd, args, opts); + }, function(error) { + throw error; + }); }, function (error) { callback(error); }); diff --git a/src/jobExecutorR.js b/src/jobExecutorR.js index 78c1c47..c6691cc 100644 --- a/src/jobExecutorR.js +++ b/src/jobExecutorR.js @@ -2,6 +2,7 @@ const process = require('process'); const fs = require('fs'); const path = require('path'); const os = require('os'); +const {Constants} = require('eae-utils'); const JobExecutorAbstract = require('./jobExecutorAbstract.js'); const { SwiftHelper, ErrorHelper } = require('eae-utils'); @@ -169,14 +170,23 @@ JobExecutorR.prototype.startExecution = function(callback) { _this._tmpDirectory = directoryPath; //Save tmp dir _this.fetchModel().then(function () { - let cmd = 'Rscript --vanilla ' + _this._model.main; - let args = _this._model.params; - let opts = { - cwd: _this._tmpDirectory + '/input', - end: process.env, - shell: true - }; - _this._exec(cmd, args, opts); + //Clean model for execution + _this._model.stdout = ''; + _this._model.stderr = ''; + _this._model.status.unshift(Constants.EAE_JOB_STATUS_RUNNING); + _this._model.startDate = new Date(); + _this.pushModel().then(function() { + let cmd = 'Rscript --vanilla ' + _this._model.main; + let args = _this._model.params; + let opts = { + cwd: _this._tmpDirectory + '/input', + end: process.env, + shell: true + }; + _this._exec(cmd, args, opts); + }, function(error) { + throw error; + }); }, function (error) { callback(error); });