diff --git a/README.md b/README.md index a3229385..57055f1a 100644 --- a/README.md +++ b/README.md @@ -87,9 +87,10 @@ To consume a redis cluster, you can use ### Retriable connection -By default, a client's connection will retry a command execution if the server -dies or the network becomes unavailable. You can change the maximum number of -retries by setting `maxRetryCount` (It's default to `10`): +By default, a client's connection will retry a command execution based on +exponential backoff algorithm if the server dies or the network becomes +unavailable. You can change the maximum number of retries by setting +`maxRetryCount` (It's default to `10`): ```ts import { connect } from "https://deno.land/x/redis/mod.ts"; diff --git a/backoff.ts b/backoff.ts new file mode 100644 index 00000000..3a55e60a --- /dev/null +++ b/backoff.ts @@ -0,0 +1,32 @@ +export interface Backoff { + /** + * Returns the next backoff interval in milliseconds + */ + (attempts: number): number; +} + +export interface ExponentialBackoffOptions { + /** + * @default 2 + */ + multiplier: number; + /** + * The maximum backoff interval in milliseconds + * @default 5000 + */ + maxInterval: number; + /** + * The minimum backoff interval in milliseconds + * @default 500 + */ + minInterval: number; +} + +export function exponentialBackoff({ + multiplier = 2, + maxInterval = 5000, + minInterval = 500, +}: Partial = {}): Backoff { + return (attempts) => + Math.min(maxInterval, minInterval * (multiplier ** (attempts - 1))); +} diff --git a/connection.ts b/connection.ts index e97726fc..7cad3e57 100644 --- a/connection.ts +++ b/connection.ts @@ -1,9 +1,13 @@ import { sendCommand } from "./protocol/mod.ts"; import type { Raw, RedisValue } from "./protocol/mod.ts"; +import type { Backoff } from "./backoff.ts"; +import { exponentialBackoff } from "./backoff.ts"; +import { ErrorReplyError } from "./errors.ts"; import { BufReader, BufWriter, } from "./vendor/https/deno.land/std/io/buffer.ts"; +import { delay } from "./vendor/https/deno.land/std/async/delay.ts"; type Closer = Deno.Closer; export interface Connection { @@ -11,7 +15,6 @@ export interface Connection { reader: BufReader; writer: BufWriter; maxRetryCount: number; - retryInterval: number; isClosed: boolean; isConnected: boolean; isRetriable: boolean; @@ -26,9 +29,11 @@ export interface RedisConnectionOptions { password?: string; username?: string; name?: string; + /** + * @default 10 + */ maxRetryCount?: number; - // TODO: Provide more flexible retry strategy - retryInterval?: number; + backoff?: Backoff; } export class RedisConnection implements Connection { @@ -37,12 +42,13 @@ export class RedisConnection implements Connection { reader!: BufReader; writer!: BufWriter; maxRetryCount = 10; - retryInterval = 1200; + private readonly hostname: string; + private readonly port: number | string; private retryCount = 0; private _isClosed = false; private _isConnected = false; - private connectThunkified: () => Promise; + private backoff: Backoff; get isClosed(): boolean { return this._isClosed; @@ -61,62 +67,34 @@ export class RedisConnection implements Connection { port: number | string, private options: RedisConnectionOptions, ) { - this.connectThunkified = this.thunkifyConnect(hostname, port, options); - } - - private thunkifyConnect( - hostname: string, - port: string | number, - options: RedisConnectionOptions, - ): () => Promise { - return async () => { - const dialOpts: Deno.ConnectOptions = { - hostname, - port: parsePortLike(port), - }; - const conn: Deno.Conn = options?.tls - ? await Deno.connectTls(dialOpts) - : await Deno.connect(dialOpts); - - if (options.name) { - this.name = options.name; - } - if (options.maxRetryCount) { - this.maxRetryCount = options.maxRetryCount; - } - if (options.retryInterval) { - this.retryInterval = options.retryInterval; - } - - this.closer = conn; - this.reader = new BufReader(conn); - this.writer = new BufWriter(conn); - this._isClosed = false; - this._isConnected = true; - - try { - if (options?.password != null) { - await this.authenticate(options?.username, options?.password); - } - if (options?.db) { - await this.selectDb(options.db); - } - } catch (error) { - this.close(); - throw error; - } - - return this as RedisConnection; - }; + this.hostname = hostname; + this.port = port; + if (options.name) { + this.name = options.name; + } + if (options.maxRetryCount != null) { + this.maxRetryCount = options.maxRetryCount; + } + this.backoff = options.backoff ?? exponentialBackoff(); } private async authenticate( username: string | undefined, password: string, ): Promise { - password && username - ? await this.sendCommand("AUTH", username, password) - : await this.sendCommand("AUTH", password); + try { + password && username + ? await this.sendCommand("AUTH", username, password) + : await this.sendCommand("AUTH", password); + } catch (error) { + if (error instanceof ErrorReplyError) { + throw new AuthenticationError("Authentication failed", { + cause: error, + }); + } else { + throw error; + } + } } private async selectDb( @@ -138,7 +116,48 @@ export class RedisConnection implements Connection { * Connect to Redis server */ async connect(): Promise { - await this.connectThunkified(); + try { + const dialOpts: Deno.ConnectOptions = { + hostname: this.hostname, + port: parsePortLike(this.port), + }; + const conn: Deno.Conn = this.options?.tls + ? await Deno.connectTls(dialOpts) + : await Deno.connect(dialOpts); + + this.closer = conn; + this.reader = new BufReader(conn); + this.writer = new BufWriter(conn); + this._isClosed = false; + this._isConnected = true; + + try { + if (this.options.password != null) { + await this.authenticate(this.options.username, this.options.password); + } + if (this.options.db) { + await this.selectDb(this.options.db); + } + } catch (error) { + this.close(); + throw error; + } + this.retryCount = 0; + } catch (error) { + if (error instanceof AuthenticationError) { + this.retryCount = 0; + throw (error.cause ?? error); + } + + if (this.retryCount++ >= this.maxRetryCount) { + this.retryCount = 0; + throw error; + } + + const backoff = this.backoff(this.retryCount); + await delay(backoff); + await this.connect(); + } } close() { @@ -159,33 +178,15 @@ export class RedisConnection implements Connection { await this.sendCommand("PING"); this._isConnected = true; } catch (_error) { // TODO: Maybe we should log this error. - this._isConnected = false; - return new Promise((resolve, reject) => { - const _interval = setInterval(async () => { - if (this.retryCount > this.maxRetryCount) { - this.close(); - clearInterval(_interval); - reject(new Error("Could not reconnect")); - } - try { - this.close(); - await this.connect(); - await this.sendCommand("PING"); - this._isConnected = true; - this.retryCount = 0; - clearInterval(_interval); - resolve(); - } catch (_err) { - // retrying - } finally { - this.retryCount++; - } - }, this.retryInterval); - }); + this.close(); + await this.connect(); + await this.sendCommand("PING"); } } } +class AuthenticationError extends Error {} + function parsePortLike(port: string | number | undefined): number { let parsedPort: number; if (typeof port === "string") { diff --git a/mod.ts b/mod.ts index bda3f3e9..3c62da36 100644 --- a/mod.ts +++ b/mod.ts @@ -8,6 +8,7 @@ export { InvalidStateError, SubscriptionClosedError, } from "./errors.ts"; +export type { Backoff, ExponentialBackoffOptions } from "./backoff.ts"; export type { ACLLogMode, BitfieldOpts, diff --git a/tests/backoff_test.ts b/tests/backoff_test.ts new file mode 100644 index 00000000..143faaaa --- /dev/null +++ b/tests/backoff_test.ts @@ -0,0 +1,24 @@ +import { assertEquals } from "../vendor/https/deno.land/std/testing/asserts.ts"; +import { describe, it } from "../vendor/https/deno.land/std/testing/bdd.ts"; + +import { exponentialBackoff } from "../backoff.ts"; + +describe("backoff", { + permissions: "none", +}, () => { + describe("exponentialBackoff", () => { + it("should return exponentially increasing backoff intervals", () => { + const backoff = exponentialBackoff({ + multiplier: 2, + maxInterval: 5000, + minInterval: 1000, + }); + + assertEquals(backoff(1), 1000); + assertEquals(backoff(2), 2000); + assertEquals(backoff(3), 4000); + assertEquals(backoff(4), 5000); + assertEquals(backoff(5), 5000); + }); + }); +}); diff --git a/tests/commands/general.ts b/tests/commands/general.ts index 3d453540..7a682dac 100644 --- a/tests/commands/general.ts +++ b/tests/commands/general.ts @@ -99,7 +99,7 @@ export function generalTests( it(`invalid port: ${v}`, async () => { await assertRejects( async () => { - await newClient({ hostname: "127.0.0.1", port: v }); + await newClient({ hostname: "127.0.0.1", port: v, maxRetryCount: 0 }); }, Error, "invalid", diff --git a/tests/commands/pubsub.ts b/tests/commands/pubsub.ts index 1cdff599..602f3e34 100644 --- a/tests/commands/pubsub.ts +++ b/tests/commands/pubsub.ts @@ -82,7 +82,8 @@ export function pubsubTests( const port = nextPort(); let tempServer = await startRedis({ port }); const client = await newClient({ ...opts, port }); - const pub = await newClient({ ...opts, maxRetryCount: 10, port }); + const backoff = () => 1200; + const pub = await newClient({ ...opts, backoff, maxRetryCount: 10, port }); const sub = await client.psubscribe("ps*"); const it = sub.receive();