-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaggregator.js
63 lines (50 loc) · 1.77 KB
/
aggregator.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
var config = require('./config');
var _ = require('lodash');
var winston = require('winston');
var moment = require('moment');
function logError(err) {
winston.error(err.stack ? err.stack : (err.message ? err.message : err));
}
var Aggregator = function (db)
{
this.db = db;
this.updateAlways = false;
this.aggregationInterval = config.stats.aggregation_interval * 1000;
};
Aggregator.prototype.process = function (timestamp, lastTimestamp, callback)
{
var self = this;
if ("function" !== typeof callback) callback = function (err) {
if (err) winston.error(err);
};
if (!this.updateAlways &&
lastTimestamp instanceof Date) {
var timeDelta = timestamp.getTime() - lastTimestamp.getTime();
if (timeDelta < this.aggregationInterval) {
// We're within the aggregationInterval, don't aggregate
return false;
}
}
var periodStart = new Date(
Math.floor(timestamp.getTime() / this.aggregationInterval)
* this.aggregationInterval);
var periodEnd = new Date(
periodStart.getTime() + this.aggregationInterval - 1);
winston.info("Aggregating " + moment(periodStart).format("M/D/YY HH:mm") +
" to " + moment(periodEnd).format("M/D/YY HH:mm"));
updateLedgersAggregate();
function updateLedgersAggregate() {
self.db.query("REPLACE INTO ledgers_aggregate "+
"(time, ledger_first, ledger_last, txs, accounts_delta) "+
"SELECT ? AS srctime, MIN(id), MAX(id), SUM(txs), SUM(accounts_delta) "+
"FROM ledgers "+
"WHERE time >= ? AND time <= ? ",
[periodStart, periodStart, periodEnd],
function(err) {
if (err) callback(err);
else callback();
});
}
return true;
};
exports.Aggregator = Aggregator;