Skip to content

Commit

Permalink
L-1147 Add batchSizeKiB option, add default size for Browser (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
PetrHeinz authored Jan 5, 2024
1 parent c3f2778 commit 4060622
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 5 deletions.
41 changes: 41 additions & 0 deletions packages/browser/src/browser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,45 @@ describe("browser tests", () => {
const message: string = String(Math.random);
await expect(browser.log(message)).rejects.toThrow();
});

it("should split large logs into multiple batches to avoid 64 KiB limit on keepalive requests", async () => {
let calledCount = 0;

nock("https://in.logs.betterstack.com")
.post("/")
.twice()
.reply(201, () => {
calledCount++;
});

const browser = new Browser("valid source token", {
throwExceptions: true,
batchInterval: 100,
});

// 6 logs, each over 12 KiB (each logs also contains context, datetime, etc.)
const over12KiB = "X".repeat(13000);
await Promise.all([...Array(6)].map(() => browser.log(over12KiB)));

expect(calledCount).toEqual(2);
});

it("should be able to sent 100 small logs in a single batch", async () => {
let calledCount = 0;

nock("https://in.logs.betterstack.com")
.post("/")
.reply(201, () => {
calledCount++;
});

const browser = new Browser("valid source token", {
throwExceptions: true,
batchInterval: 100,
});

await Promise.all([...Array(100)].map(() => browser.log("small")));

expect(calledCount).toEqual(1);
});
});
3 changes: 2 additions & 1 deletion packages/browser/src/browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import { Base } from "@logtail/core";

export class Browser extends Base {
public constructor(sourceToken: string, options?: Partial<ILogtailOptions>) {
super(sourceToken, options);
// After reaching 48KiB, the batch will get flushed automatically to avoid 64KiB body limit for keepalive requests
super(sourceToken, { batchSizeKiB: 48, ...options });

// Sync function
const sync = async (logs: ILogtailLog[]): Promise<ILogtailLog[]> => {
Expand Down
15 changes: 14 additions & 1 deletion packages/core/src/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ import {
Middleware,
Sync,
} from "@logtail/types";
import { makeBatch, makeBurstProtection, makeThrottle } from "@logtail/tools";
import {
makeBatch,
makeBurstProtection,
makeThrottle,
calculateJsonLogSizeBytes,
} from "@logtail/tools";
import { serializeError } from "serialize-error";

// Types
Expand All @@ -21,6 +26,9 @@ const defaultOptions: ILogtailOptions = {
// Maximum number of logs to sync in a single request to Better Stack
batchSize: 1000,

// Size of logs (in KiB) to trigger sync to Better Stack (0 to disable)
batchSizeKiB: 0,

// Max interval (in milliseconds) before a batch of logs proceeds to syncing
batchInterval: 1000,

Expand Down Expand Up @@ -60,6 +68,9 @@ const defaultOptions: ILogtailOptions = {

// If true, all logs will be sent to Better Stack
sendLogsToBetterStack: true,

// Function to be used to calculate size of logs in bytes (to evaluate batchSizeLimitKiB)
calculateLogSizeBytes: calculateJsonLogSizeBytes,
};

/**
Expand Down Expand Up @@ -138,6 +149,8 @@ class Logtail {
this._options.batchInterval,
this._options.retryCount,
this._options.retryBackoff,
this._options.batchSizeKiB * 1024,
this._options.calculateLogSizeBytes,
);

this._batch = batcher.initPusher((logs: any) => {
Expand Down
48 changes: 47 additions & 1 deletion packages/tools/src/batch.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import nock from "nock";
import fetch from "cross-fetch";
import { ILogtailLog, LogLevel } from "@logtail/types";
import makeBatch from "./batch";
import makeBatch, { calculateJsonLogSizeBytes } from "./batch";
import makeThrottle from "./throttle";

/**
Expand Down Expand Up @@ -184,4 +184,50 @@ describe("batch tests", () => {
}
expect(called).toHaveBeenCalledTimes(1);
});

it("should send large logs in multiple batches", async () => {
const called = jest.fn();
const size = 1000;
const sendTimeout = 1000;
const retryCount = 0;
const retryBackoff = 0;

// Every log is calculated to have 50B and there's 500B limit
const sizeBytes = 500;
const calculateSize = (_log: ILogtailLog) => 50;

const batcher = makeBatch(
size,
sendTimeout,
retryCount,
retryBackoff,
sizeBytes,
calculateSize,
);
const logger = batcher.initPusher(async (_batch: ILogtailLog[]) => {
called();
});

// 100 logs with 50B each is 5000B in total - expecting 10 batches of 500B
await Promise.all(logNumberTimes(logger, 100)).catch(e => {
throw e;
});
expect(called).toHaveBeenCalledTimes(10);
});
});

describe("JSON log size calculator", () => {
it("should calculate log size as JSON length", async () => {
const log: ILogtailLog = {
dt: new Date(),
level: LogLevel.Info,
message: "My message",
};

const actualLogSizeBytes = calculateJsonLogSizeBytes(log);
const expectedLogSizeBytes = '{"dt":"????-??-??T??:??:??.???Z","level":"INFO","message":"My message"},'
.length;

expect(actualLogSizeBytes).toEqual(expectedLogSizeBytes);
});
});
25 changes: 24 additions & 1 deletion packages/tools/src/batch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,36 @@ const DEFAULT_RETRY_COUNT = 3;
*/
const DEFAULT_RETRY_BACKOFF = 100;

/*
* Default function for computing log size (serialized JSON length + 1 for comma)
*/
export const calculateJsonLogSizeBytes = (log: ILogtailLog) =>
JSON.stringify(log).length + 1;

/**
* batch the buffer coming in, process them and then resolve
*
* @param size - Number
* @param flushTimeout - Number
* @param retryCount - Number
* @param retryBackoff - Number
* @param sizeBytes - Size of the batch (in bytes) that triggers flushing. Set to 0 to disable.
* @param calculateLogSizeBytes - Function to calculate size of a single ILogtailLog instance (in bytes).
*/
export default function makeBatch(
size: number = DEFAULT_BUFFER_SIZE,
flushTimeout: number = DEFAULT_FLUSH_TIMEOUT,
retryCount: number = DEFAULT_RETRY_COUNT,
retryBackoff: number = DEFAULT_RETRY_BACKOFF,
sizeBytes: number = 0,
calculateLogSizeBytes: (
log: ILogtailLog,
) => number = calculateJsonLogSizeBytes,
) {
let timeout: NodeJS.Timeout | null;
let cb: Function;
let buffer: IBuffer[] = [];
let bufferSizeBytes = 0;
let retry: number = 0;
// Wait until the minimum retry backoff time has passed before retrying
let minRetryBackoff: number = 0;
Expand All @@ -61,7 +74,9 @@ export default function makeBatch(
timeout = null;

const currentBuffer = buffer;
const currentBufferSizeKB = bufferSizeBytes;
buffer = [];
bufferSizeBytes = 0;

try {
await cb(currentBuffer.map(d => d.log));
Expand All @@ -72,6 +87,7 @@ export default function makeBatch(
retry++;
minRetryBackoff = Date.now() + retryBackoff;
buffer = buffer.concat(currentBuffer);
bufferSizeBytes += currentBufferSizeKB;
await setupTimeout();
return;
}
Expand Down Expand Up @@ -111,10 +127,17 @@ export default function makeBatch(
return async function(log: ILogtailLog): Promise<ILogtailLog> {
return new Promise<ILogtailLog>(async (resolve, reject) => {
buffer.push({ log, resolve, reject });
// We can skip log size calculation if there is no max size set
if (sizeBytes > 0) {
bufferSizeBytes += calculateLogSizeBytes(log);
}

// If the buffer is full enough, flush it
// Unless we're still waiting for the minimum retry backoff time
if (buffer.length >= size && Date.now() > minRetryBackoff) {
const isBufferFullEnough =
buffer.length >= size ||
(sizeBytes > 0 && bufferSizeBytes >= sizeBytes);
if (isBufferFullEnough && Date.now() > minRetryBackoff) {
await flush();
} else {
await setupTimeout();
Expand Down
3 changes: 2 additions & 1 deletion packages/tools/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IQueue } from "./types";
import Queue from "./queue";
import { base64Encode } from "./encode";
import makeBatch from "./batch";
import makeBatch, { calculateJsonLogSizeBytes } from "./batch";
import makeBurstProtection from "./burstProtection";
import makeThrottle from "./throttle";

Expand All @@ -15,4 +15,5 @@ export {
makeBatch,
makeBurstProtection,
makeThrottle,
calculateJsonLogSizeBytes,
};
10 changes: 10 additions & 0 deletions packages/types/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ export interface ILogtailOptions {
*/
batchSize: number;

/**
* Size of logs (in KiB) to trigger sync to Better Stack (0 to disable)
*/
batchSizeKiB: number;

/**
* Max interval (in milliseconds) before a batch of logs proceeds to syncing
*/
Expand Down Expand Up @@ -79,6 +84,11 @@ export interface ILogtailOptions {
* If true, all logs will be sent to Better Stack
**/
sendLogsToBetterStack: boolean;

/**
* Function to be used to calculate size of logs in bytes (to evaluate batchSizeKiB). JSON length by default.
**/
calculateLogSizeBytes: (logs: ILogtailLog) => number;
}
export interface ILogtailEdgeOptions extends ILogtailOptions {
/**
Expand Down

0 comments on commit 4060622

Please sign in to comment.