diff --git a/install.json b/install.json index 8aa2ccf..078bdf2 100644 --- a/install.json +++ b/install.json @@ -92,6 +92,13 @@ "description": "Print debug messages to console which may be helpful when contacting Moesif support.", "type": "boolean", "default": false + }, + "fetchTimeoutMS": { + "order": 120, + "title": "Fetch Timeout milliseconds", + "description": "Fetch timeout in milliseconds so that Moesif can log the call even if origin server doesnt respond.", + "type": "integer", + "default": 120000 } } } diff --git a/src/index.js b/src/index.js index 3c197a3..d1da5ef 100644 --- a/src/index.js +++ b/src/index.js @@ -61,7 +61,10 @@ if (typeof INSTALL_OPTIONS === 'undefined') { // Print debug messages to console. // Enable to share debug logs with Moesif support staff for quicker debug. - "debug": false + "debug": false, + + // Fetch timeout in milliseconds so that Moesif can log the call even if origin server doesnt respond + "fetchTimeoutMS": 120000 }; INSTALL_TYPE = 'custom'; } @@ -76,7 +79,8 @@ let { urlPatterns = [], logIncomingRequests, logOutgoingRequests, - debug + debug, + fetchTimeoutMS } = INSTALL_OPTIONS; /**** @@ -120,10 +124,13 @@ const maskContent = moesifEvent => { return moesifEvent; }; -const MAX_REQUESTS_PER_BATCH = 10; +const EVENT_QUEUE_SIZE = 100000; // 100K +const MAX_REQUESTS_PER_BATCH = 100; const BATCH_DURATION = 1000; // ms const TRANSACTION_ID_HEADER = 'X-Moesif-Transaction-Id'; let samplingPercentage = 100; +let MAX_BATCH_WAIT_TIME_MS = 1000; // MS +let lastBatchSentDate = new Date(new Date() - MAX_BATCH_WAIT_TIME_MS) if (typeof INSTALL_ID === 'undefined') { INSTALL_ID = undefined; @@ -407,49 +414,69 @@ async function handleBatch() { moesifLog(`handleBatch start`) if (!batchRunning) { - batchRunning = true; await sleep(BATCH_DURATION); - - if (jobs.length) await batch(); - - batchRunning = false; + if (jobs.length) { + const jobsForBatching = structuredClone(jobs); // clone it + jobs.length = 0; // empty the jobs as it's been cloned + await batch(jobsForBatching); + } } } -function batch() { - moesifLog(`batch start`) - - if (jobs.length > 0) { - const applicationIdMap = {}; - moesifLog(`batch has jobs`) - - jobs.forEach(({ appId, moesifEvent }) => { - if (!(appId in applicationIdMap)) { - applicationIdMap[appId] = []; - } - - if ((moesifEvent.direction === 'Outgoing' && logOutgoingRequests) || - (moesifEvent.direction === 'Incoming' && logIncomingRequests)) { - applicationIdMap[appId].push(moesifEvent); +function batch(jobsForBatching) { + + // ------------------------ + // Important + // This should be the first statement in the function. + // ------------------------ + batchRunning = true; + + moesifLog(`batch start job size ${jobsForBatching.length}`); + + const applicationIdMap = {}; + // e.g; + // jobs = [ + // { "appId": "aid-1" , "moesifEvent": {"direction": "Outgoing" , "request" : [1, 2, 3] } }, + // { "appId": "aid-2" , "moesifEvent": {"direction": "Incoming" , "response": [21, 22, 23] } }, + // { "appId": "aid-3" , "moesifEvent": {"direction": "Outgoing" , "request" : [31, 32, 33] } } + // ]; + moesifLog(`batch has jobs`); + + // Group events by appId + jobsForBatching.forEach(({ appId, moesifEvent }) => { + + if ((moesifEvent.direction === 'Outgoing' && logOutgoingRequests) || + (moesifEvent.direction === 'Incoming' && logIncomingRequests)) { + + // Add event to specific appId. + try { + applicationIdMap[appId].push(moesifEvent); // Add it + } catch(e) { + // Object does not exist. Add it. + applicationIdMap[appId] = [moesifEvent]; // Initialize it } - }); + } + }); - let promises = []; + const promises = []; + let batchCounter = 0; - Object.keys(applicationIdMap).forEach(appId => { + Object.keys(applicationIdMap).forEach((appId) => { + const batchEvents = applicationIdMap[appId]; + if (batchEvents.length) { const moesifHeaders = { 'Accept': 'application/json; charset=utf-8', 'X-Moesif-Application-Id': appId, 'User-Agent': 'moesif-cloudflare', 'X-Moesif-Cf-Install-Id': INSTALL_ID, 'X-Moesif-Cf-Install-Product': (INSTALL_PRODUCT && INSTALL_PRODUCT.id), - 'X-Moesif-Cf-Install-Type': INSTALL_TYPE, + 'X-Moesif-Cf-Install-Type': INSTALL_TYPE } moesifLog(JSON.stringify(moesifHeaders)); - const body = JSON.stringify(applicationIdMap[appId]); + const body = JSON.stringify(batchEvents); moesifLog(body); const options = { @@ -458,15 +485,33 @@ function batch() { body: body }; + promises.push(fetch(BATCH_URL, options)); - }); - - jobs = []; - - return Promise.all(promises); + batchCounter++; + } + }); + + moesifLog(`Total batches: ${batchCounter}`); + + // ------------------------ + // Important + // This reset should be the last statement in the function before return. + // ------------------------ + batchRunning = false; + lastBatchSentDate = new Date(); + + if (promises.length) { + // Add a sleep/wait promise too - to make sure promise is resolved in max BATCH_DURATION + // because we want to use "fire and forget" approach for sending events to moesif. + promises.push(sleep(BATCH_DURATION)); + return Promise.race(promises); } } +function hasLastBatchSentTimeExpired() { + return new Date() - lastBatchSentDate > MAX_BATCH_WAIT_TIME_MS; +} + async function tryTrackRequest(event, request, response, before, after, txId, requestBody, userId, companyId) { if (!isMoesif(request) && !runHook(() => skip(request, response), 'skip', false)) { moesifLog(`tryTrackRequest start url=${request.url}`) @@ -479,15 +524,17 @@ async function tryTrackRequest(event, request, response, before, after, txId, re // only track this if there's an associated applicationId // services may want to not report certain requests - jobs.push({ - appId, - moesifEvent - }); + if (jobs.length >= EVENT_QUEUE_SIZE) { + moesifLog(`Queue is full, skipping new events`) + } else { + jobs.push({ + appId, + moesifEvent + }); + } - if (jobs.length >= MAX_REQUESTS_PER_BATCH) { - // let's send everything right now - event.waitUntil(batch()); - } else if (!batchRunning) { + // Log the events to moesif if batch is available or max batch time has expired. + if (jobs.length && (jobs.length >= MAX_REQUESTS_PER_BATCH || hasLastBatchSentTimeExpired()) ) { // wait until the next batch job // event.waitUntil(sleep(BATCH_DURATION)); event.waitUntil(handleBatch()); @@ -537,7 +584,7 @@ async function logRequest(event) { const race = Promise.race([ fetch(request), - sleep(10000), + sleep(fetchTimeoutMS), ]); event.waitUntil(race); moesifLog(`logging request url=${request.url}`)