Skip to content

Commit

Permalink
Add: Fix for duplicate events and added support for events batching. (#…
Browse files Browse the repository at this point in the history
…18)

- Add: Support for batching
- Add: eventQueueSize and fetchTimeoutMS config option
- Refactor: Increase max request per batch to 100
- Refactor: Fix the duplicates by clearing jobs
- Refactor: Added support for batching events on read
- Refactor: Ensure that the promise for sending events to Moesif resolves faster
- Refactor: Remove additional logs
- Refactor: Change MAX_BATCH_WAIT_TIME_MS to 1 second
  • Loading branch information
keyur9 authored Jun 27, 2023
1 parent 7754c1a commit 0db6efd
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 42 deletions.
7 changes: 7 additions & 0 deletions install.json
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@
"description": "Print debug messages to console which may be helpful when contacting Moesif support.",
"type": "boolean",
"default": false
},
"fetchTimeoutMS": {
"order": 120,
"title": "Fetch Timeout milliseconds",
"description": "Fetch timeout in milliseconds so that Moesif can log the call even if the origin server doesn't respond.",
"type": "integer",
"default": 120000
}
}
}
Expand Down
131 changes: 89 additions & 42 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ if (typeof INSTALL_OPTIONS === 'undefined') {

// Print debug messages to console.
// Enable to share debug logs with Moesif support staff for quicker debug.
"debug": false
"debug": false,

// Fetch timeout in milliseconds so that Moesif can log the call even if the origin server doesn't respond
"fetchTimeoutMS": 120000
};
INSTALL_TYPE = 'custom';
}
Expand All @@ -76,7 +79,8 @@ let {
urlPatterns = [],
logIncomingRequests,
logOutgoingRequests,
debug
debug,
fetchTimeoutMS
} = INSTALL_OPTIONS;

/****
Expand Down Expand Up @@ -120,10 +124,13 @@ const maskContent = moesifEvent => {
return moesifEvent;
};

const MAX_REQUESTS_PER_BATCH = 10;
const EVENT_QUEUE_SIZE = 100000; // 100K
const MAX_REQUESTS_PER_BATCH = 100;
const BATCH_DURATION = 1000; // ms
const TRANSACTION_ID_HEADER = 'X-Moesif-Transaction-Id';
let samplingPercentage = 100;
let MAX_BATCH_WAIT_TIME_MS = 1000; // MS
let lastBatchSentDate = new Date(new Date() - MAX_BATCH_WAIT_TIME_MS)

if (typeof INSTALL_ID === 'undefined') {
INSTALL_ID = undefined;
Expand Down Expand Up @@ -407,49 +414,69 @@ async function handleBatch() {
moesifLog(`handleBatch start`)

if (!batchRunning) {
batchRunning = true;

await sleep(BATCH_DURATION);

if (jobs.length) await batch();

batchRunning = false;
if (jobs.length) {
const jobsForBatching = structuredClone(jobs); // clone it
jobs.length = 0; // empty the jobs as it's been cloned
await batch(jobsForBatching);
}
}
}

function batch() {
moesifLog(`batch start`)

if (jobs.length > 0) {
const applicationIdMap = {};
moesifLog(`batch has jobs`)

jobs.forEach(({ appId, moesifEvent }) => {
if (!(appId in applicationIdMap)) {
applicationIdMap[appId] = [];
}

if ((moesifEvent.direction === 'Outgoing' && logOutgoingRequests) ||
(moesifEvent.direction === 'Incoming' && logIncomingRequests)) {
applicationIdMap[appId].push(moesifEvent);
function batch(jobsForBatching) {

// ------------------------
// Important
// This should be the first statement in the function.
// ------------------------
batchRunning = true;

moesifLog(`batch start job size ${jobsForBatching.length}`);

const applicationIdMap = {};
// e.g;
// jobs = [
// { "appId": "aid-1" , "moesifEvent": {"direction": "Outgoing" , "request" : [1, 2, 3] } },
// { "appId": "aid-2" , "moesifEvent": {"direction": "Incoming" , "response": [21, 22, 23] } },
// { "appId": "aid-3" , "moesifEvent": {"direction": "Outgoing" , "request" : [31, 32, 33] } }
// ];
moesifLog(`batch has jobs`);

// Group events by appId
jobsForBatching.forEach(({ appId, moesifEvent }) => {

if ((moesifEvent.direction === 'Outgoing' && logOutgoingRequests) ||
(moesifEvent.direction === 'Incoming' && logIncomingRequests)) {

// Add event to specific appId.
try {
applicationIdMap[appId].push(moesifEvent); // Add it
} catch(e) {
// Object does not exist. Add it.
applicationIdMap[appId] = [moesifEvent]; // Initialize it
}
});
}
});

let promises = [];
const promises = [];
let batchCounter = 0;

Object.keys(applicationIdMap).forEach(appId => {
Object.keys(applicationIdMap).forEach((appId) => {
const batchEvents = applicationIdMap[appId];

if (batchEvents.length) {
const moesifHeaders = {
'Accept': 'application/json; charset=utf-8',
'X-Moesif-Application-Id': appId,
'User-Agent': 'moesif-cloudflare',
'X-Moesif-Cf-Install-Id': INSTALL_ID,
'X-Moesif-Cf-Install-Product': (INSTALL_PRODUCT && INSTALL_PRODUCT.id),
'X-Moesif-Cf-Install-Type': INSTALL_TYPE,
'X-Moesif-Cf-Install-Type': INSTALL_TYPE
}
moesifLog(JSON.stringify(moesifHeaders));

const body = JSON.stringify(applicationIdMap[appId]);
const body = JSON.stringify(batchEvents);
moesifLog(body);

const options = {
Expand All @@ -458,15 +485,33 @@ function batch() {
body: body
};


promises.push(fetch(BATCH_URL, options));
});

jobs = [];

return Promise.all(promises);
batchCounter++;
}
});

moesifLog(`Total batches: ${batchCounter}`);

// ------------------------
// Important
// This reset should be the last statement in the function before return.
// ------------------------
batchRunning = false;
lastBatchSentDate = new Date();

if (promises.length) {
// Add a sleep/wait promise too - to make sure promise is resolved in max BATCH_DURATION
// because we want to use "fire and forget" approach for sending events to moesif.
promises.push(sleep(BATCH_DURATION));
return Promise.race(promises);
}
}

/**
 * Reports whether more than MAX_BATCH_WAIT_TIME_MS milliseconds have elapsed
 * since lastBatchSentDate.
 *
 * @returns {boolean} true when the elapsed time strictly exceeds the limit.
 */
function hasLastBatchSentTimeExpired() {
// Subtraction coerces the stored Date to its epoch-millisecond value.
const elapsedMs = Date.now() - lastBatchSentDate;
return elapsedMs > MAX_BATCH_WAIT_TIME_MS;
}

async function tryTrackRequest(event, request, response, before, after, txId, requestBody, userId, companyId) {
if (!isMoesif(request) && !runHook(() => skip(request, response), 'skip', false)) {
moesifLog(`tryTrackRequest start url=${request.url}`)
Expand All @@ -479,15 +524,17 @@ async function tryTrackRequest(event, request, response, before, after, txId, re
// only track this if there's an associated applicationId
// services may want to not report certain requests

jobs.push({
appId,
moesifEvent
});
if (jobs.length >= EVENT_QUEUE_SIZE) {
moesifLog(`Queue is full, skipping new events`)
} else {
jobs.push({
appId,
moesifEvent
});
}

if (jobs.length >= MAX_REQUESTS_PER_BATCH) {
// let's send everything right now
event.waitUntil(batch());
} else if (!batchRunning) {
// Log the events to moesif if batch is available or max batch time has expired.
if (jobs.length && (jobs.length >= MAX_REQUESTS_PER_BATCH || hasLastBatchSentTimeExpired()) ) {
// wait until the next batch job
// event.waitUntil(sleep(BATCH_DURATION));
event.waitUntil(handleBatch());
Expand Down Expand Up @@ -537,7 +584,7 @@ async function logRequest(event) {

const race = Promise.race([
fetch(request),
sleep(10000),
sleep(fetchTimeoutMS),
]);
event.waitUntil(race);
moesifLog(`logging request url=${request.url}`)
Expand Down

0 comments on commit 0db6efd

Please sign in to comment.