Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store / recover previous jextctxt #1691

Merged
merged 17 commits into from
Jan 31, 2025
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
- Add: store (and recover) previous jexlctxt and make available in current jexlctxt (#1690)
- Fix: set polling and transport for autoprovisioned devices
- Add: option to force to use CB flow control with new API field useCBflowControl at group and device device level (#1420)
- Add: useCBflowControl config setting (IOTA_CB_FLOW_CONTROL env var) to set CB flow control behaviour at instance level (#1420)
Expand Down
10 changes: 6 additions & 4 deletions doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ expression. In all cases the following data is available to all expressions:
- `service`: device service (`Fiware-Service`)
- `subservice`: device subservice (`Fiware-ServicePath`)
- `staticAttributes`: static attributes defined in the device or config group
- `oldCtxt`: previous JEXL context (related to last processed measure)

Additionally, for attribute expressions (`expression`, `entity_name`), `entityNameExp` and metadata expressions
(`expression`) the following is available in the **context** used to evalute:
Expand Down Expand Up @@ -1998,10 +1999,11 @@ the API resource fields and the same fields in the database model.
| `internal_attributes` | ✓ | `array` | | List of internal attributes with free format for specific IoT Agent configuration. |
| `explicitAttrs` | ✓ | `boolean` | ✓ | Field to support selective ignore of measures so that IOTA doesn’t progress. See details in [specific section](#explicitly-defined-attributes-explicitattrs) |
| `ngsiVersion` | ✓ | `string` | | string value used in mixed mode to switch between **NGSI-v2** and **NGSI-LD** payloads. The default is `v2`. When not running in mixed mode, this field is ignored. |
| `payloadType` | ✓ | `string` | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |
| `storeLastMeasure` | ✓ | `boolean` | | Store in device last measure received. See more info [in this section](admin.md#storelastmeasure). False by default. |
| `lastMeasure` | ✓ | `object` | | last measure stored on device when `storeLastMeasure` is enabled. See more info [in this section](admin.md#storelastmeasure). This field can be cleared using `{}` in a device update request. In that case, `lastMeasure` is removed from device (until a next measure is received and `lastMesuare` gets created again). |
| `useCBflowControl` | ✓ | `boolean` | | Use Context Broker flow control. See more info [in this section](admin.md#useCBflowControl). False by default. |
| `payloadType` | ✓ | `string` | | optional string value used to switch between **IoTAgent**, **NGSI-v2** and **NGSI-LD** measure payloads types. Possible values are: `iotagent`, `ngsiv2` or `ngsild`. The default is `iotagent`. |
| `storeLastMeasure` | ✓ | `boolean` | | Store in device last measure received. Useful just for debugging purpose. See more info [in this section](admin.md#storelastmeasure). False by default. |
| `lastMeasure` | ✓ | `object` | | last measure stored on device when `storeLastMeasure` is enabled. See more info [in this section](admin.md#storelastmeasure). This field can be cleared using `{}` in a device update request. In that case, `lastMeasure` is removed from device (until a next measure is received and `lastMesuare` gets created again). |
| `useCBflowControl` | ✓ | `boolean` | | Use Context Broker flow control. See more info [in this section](admin.md#useCBflowControl). False by default. |
Comment on lines -2001 to +2005
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any actual change here? Or it it's just indent/whitespace?

Copy link
Member Author

@AlvaroVega AlvaroVega Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes: "Useful just for debugging purpose."


### Device operations

#### Retrieve devices /iot/devices `GET /iot/devices`
Expand Down
3 changes: 2 additions & 1 deletion lib/model/Device.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ const Device = new Schema({
payloadType: String,
useCBflowControl: Boolean,
storeLastMeasure: Boolean,
lastMeasure: Object
lastMeasure: Object,
oldCtxt: Object
});

function load() {
Expand Down
10 changes: 7 additions & 3 deletions lib/services/devices/deviceRegistryMemory.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ function getDevicesByAttribute(name, value, service, subservice, callback) {
}
}

function storeLastMeasure(measure, typeInformation, callback) {
function storeDeviceField(fieldName, fieldValue, typeInformation, callback) {
if (
typeInformation &&
typeInformation.id &&
Expand All @@ -226,7 +226,11 @@ function storeLastMeasure(measure, typeInformation, callback) {
function (error, data) {
if (!error) {
if (data) {
data.lastMeasure = measure;
if (fieldName === 'lastMeasure') {
data.lastMeasure = { timestamp: new Date().toISOString(), measure: fieldValue };
} else {
data[fieldName] = fieldValue;
}
}
if (callback) {
callback(null, data);
Expand All @@ -243,7 +247,7 @@ function storeLastMeasure(measure, typeInformation, callback) {

exports.getDevicesByAttribute = getDevicesByAttribute;
exports.store = storeDevice;
exports.storeLastMeasure = storeLastMeasure;
exports.storeDeviceField = storeDeviceField;
exports.update = update;
exports.remove = removeDevice;
exports.list = listDevices;
Expand Down
10 changes: 7 additions & 3 deletions lib/services/devices/deviceRegistryMongoDB.js
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ function getDevicesByAttribute(name, value, service, subservice, callback) {
});
}

function storeLastMeasure(measure, typeInformation, callback) {
function storeDeviceField(fieldName, fieldValue, typeInformation, callback) {
if (
typeInformation &&
typeInformation.id &&
Expand All @@ -414,7 +414,11 @@ function storeLastMeasure(measure, typeInformation, callback) {
if (error) {
callback(error);
} else {
data.lastMeasure = { timestamp: new Date().toISOString(), measure };
if (fieldName === 'lastMeasure') {
data.lastMeasure = { timestamp: new Date().toISOString(), measure: fieldValue };
} else {
data[fieldName] = fieldValue;
}
/* eslint-disable-next-line new-cap */
const deviceObj = new Device.model(data);
deviceObj.isNew = false;
Expand All @@ -441,7 +445,7 @@ function storeLastMeasure(measure, typeInformation, callback) {

exports.getDevicesByAttribute = alarmsInt(constants.MONGO_ALARM, getDevicesByAttribute);
exports.store = alarmsInt(constants.MONGO_ALARM, storeDevice);
exports.storeLastMeasure = alarmsInt(constants.MONGO_ALARM, storeLastMeasure);
exports.storeDeviceField = alarmsInt(constants.MONGO_ALARM, storeDeviceField);
exports.update = alarmsInt(constants.MONGO_ALARM, update);
exports.remove = alarmsInt(constants.MONGO_ALARM, removeDevice);
exports.list = alarmsInt(constants.MONGO_ALARM, listDevices);
Expand Down
6 changes: 3 additions & 3 deletions lib/services/devices/deviceService.js
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,8 @@ function retrieveDevice(deviceId, apiKey, callback) {
}
}

function storeLastMeasure(measure, typeInformation, callback) {
config.getRegistry().storeLastMeasure(measure, typeInformation, callback);
function storeDeviceField(fieldName, fieldValue, typeInformation, callback) {
config.getRegistry().storeDeviceField(fieldName, fieldValue, typeInformation, callback);
}

exports.listDevices = intoTrans(context, checkRegistry)(listDevices);
Expand All @@ -737,5 +737,5 @@ exports.retrieveDevice = intoTrans(context, checkRegistry)(retrieveDevice);
exports.mergeDeviceWithConfiguration = mergeDeviceWithConfiguration;
exports.findOrCreate = findOrCreate;
exports.findConfigurationGroup = findConfigurationGroup;
exports.storeLastMeasure = storeLastMeasure;
exports.storeDeviceField = storeDeviceField;
exports.init = init;
8 changes: 8 additions & 0 deletions lib/services/ngsi/entities-NGSI-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ function sendUpdateValueNgsi2(entityName, originMeasures, originTypeInformation,
//metadata static attributes
jexlctxt = reduceMetadataAttrToPlainObject(typeInformation.staticAttributes, jexlctxt);

//recover oldctxt
if (typeInformation.oldCtxt) {
jexlctxt.oldCtxt = typeInformation.oldCtxt;
}
logger.debug(
context,
'sendUpdateValueNgsi2 loop with: entityName=%s, measures=%j, typeInformation=%j, initial jexlContext=%j, timestamp=%j',
Expand Down Expand Up @@ -652,6 +656,10 @@ function sendUpdateValueNgsi2(entityName, originMeasures, originTypeInformation,
}
}
}
//Add jexlctxt to typeInformation without oldCtxt
const oldCJexlctxt = { ...jexlctxt };
delete oldCJexlctxt.oldCtxt;
originTypeInformation['oldCtxt'] = oldCJexlctxt;
} // end for (let measures of originMeasures)
let url = '/v2/op/update';
if (originTypeInformation.useCBflowControl) {
Expand Down
30 changes: 26 additions & 4 deletions lib/services/ngsi/ngsiService.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,37 @@ function init() {
* @param {String} token User token to identify against the PEP Proxies (optional).
*/
function sendUpdateValue(entityName, attributes, typeInformation, token, callback) {
// check config about store last measure
const newCallback = statsRegistry.withStats('updateEntityRequestsOk', 'updateEntityRequestsError', callback);
const additionalCallback = (data, next) => {
if (typeInformation.oldCtxt) {
logger.debug(context, 'StoreOldCtxt %j', typeInformation.oldCtxt);
deviceService.storeDeviceField('oldCtxt', typeInformation.oldCtxt, typeInformation, function () {
next(null, data);
});
} else {
next(null, data);
}
};
const wrappedNewCallback = (err, result) => {
if (err) {
newCallback(err);
} else {
additionalCallback(result, (additionalErr, modifiedResult) => {
if (additionalErr) {
newCallback(additionalErr);
}
newCallback(null, modifiedResult || result);
});
}
};
// check config about store last measure
if (typeInformation.storeLastMeasure) {
logger.debug(context, 'StoreLastMeasure for %j', typeInformation);
deviceService.storeLastMeasure(attributes, typeInformation, function () {
return entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, newCallback);
deviceService.storeDeviceField('lastMeasure', attributes, typeInformation, function () {
return entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, wrappedNewCallback);
});
} else {
entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, newCallback);
entityHandler.sendUpdateValue(entityName, attributes, typeInformation, token, wrappedNewCallback);
}
}

Expand Down