Skip to content

Commit

Permalink
BREAKING: Improve retry mechanism (#311)
Browse files Browse the repository at this point in the history
- Implemented retry mechanism with exponential backoff algorithm
- (**BREAKING**) Removed `RedisConnectionOptions.retryInterval`
- Added `RedisConnectionOptions.backoff`
  • Loading branch information
uki00a authored Jun 4, 2022
1 parent 84184f8 commit f85edec
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 84 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,10 @@ To consume a redis cluster, you can use

### Retriable connection

By default, a client's connection will retry a command execution if the server
dies or the network becomes unavailable. You can change the maximum number of
retries by setting `maxRetryCount` (It's default to `10`):
By default, a client's connection will retry a command execution based on
exponential backoff algorithm if the server dies or the network becomes
unavailable. You can change the maximum number of retries by setting
`maxRetryCount` (It's default to `10`):

```ts
import { connect } from "https://deno.land/x/redis/mod.ts";
Expand Down
32 changes: 32 additions & 0 deletions backoff.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
export interface Backoff {
/**
* Returns the next backoff interval in milliseconds
*/
(attempts: number): number;
}

export interface ExponentialBackoffOptions {
/**
* @default 2
*/
multiplier: number;
/**
* The maximum backoff interval in milliseconds
* @default 5000
*/
maxInterval: number;
/**
* The minimum backoff interval in milliseconds
* @default 500
*/
minInterval: number;
}

export function exponentialBackoff({
multiplier = 2,
maxInterval = 5000,
minInterval = 500,
}: Partial<ExponentialBackoffOptions> = {}): Backoff {
return (attempts) =>
Math.min(maxInterval, minInterval * (multiplier ** (attempts - 1)));
}
159 changes: 80 additions & 79 deletions connection.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import { sendCommand } from "./protocol/mod.ts";
import type { Raw, RedisValue } from "./protocol/mod.ts";
import type { Backoff } from "./backoff.ts";
import { exponentialBackoff } from "./backoff.ts";
import { ErrorReplyError } from "./errors.ts";
import {
BufReader,
BufWriter,
} from "./vendor/https/deno.land/std/io/buffer.ts";
import { delay } from "./vendor/https/deno.land/std/async/delay.ts";
type Closer = Deno.Closer;

export interface Connection {
closer: Closer;
reader: BufReader;
writer: BufWriter;
maxRetryCount: number;
retryInterval: number;
isClosed: boolean;
isConnected: boolean;
isRetriable: boolean;
Expand All @@ -26,9 +29,11 @@ export interface RedisConnectionOptions {
password?: string;
username?: string;
name?: string;
/**
* @default 10
*/
maxRetryCount?: number;
// TODO: Provide more flexible retry strategy
retryInterval?: number;
backoff?: Backoff;
}

export class RedisConnection implements Connection {
Expand All @@ -37,12 +42,13 @@ export class RedisConnection implements Connection {
reader!: BufReader;
writer!: BufWriter;
maxRetryCount = 10;
retryInterval = 1200;

private readonly hostname: string;
private readonly port: number | string;
private retryCount = 0;
private _isClosed = false;
private _isConnected = false;
private connectThunkified: () => Promise<RedisConnection>;
private backoff: Backoff;

get isClosed(): boolean {
return this._isClosed;
Expand All @@ -61,62 +67,34 @@ export class RedisConnection implements Connection {
port: number | string,
private options: RedisConnectionOptions,
) {
this.connectThunkified = this.thunkifyConnect(hostname, port, options);
}

private thunkifyConnect(
hostname: string,
port: string | number,
options: RedisConnectionOptions,
): () => Promise<RedisConnection> {
return async () => {
const dialOpts: Deno.ConnectOptions = {
hostname,
port: parsePortLike(port),
};
const conn: Deno.Conn = options?.tls
? await Deno.connectTls(dialOpts)
: await Deno.connect(dialOpts);

if (options.name) {
this.name = options.name;
}
if (options.maxRetryCount) {
this.maxRetryCount = options.maxRetryCount;
}
if (options.retryInterval) {
this.retryInterval = options.retryInterval;
}

this.closer = conn;
this.reader = new BufReader(conn);
this.writer = new BufWriter(conn);
this._isClosed = false;
this._isConnected = true;

try {
if (options?.password != null) {
await this.authenticate(options?.username, options?.password);
}
if (options?.db) {
await this.selectDb(options.db);
}
} catch (error) {
this.close();
throw error;
}

return this as RedisConnection;
};
this.hostname = hostname;
this.port = port;
if (options.name) {
this.name = options.name;
}
if (options.maxRetryCount != null) {
this.maxRetryCount = options.maxRetryCount;
}
this.backoff = options.backoff ?? exponentialBackoff();
}

private async authenticate(
username: string | undefined,
password: string,
): Promise<void> {
password && username
? await this.sendCommand("AUTH", username, password)
: await this.sendCommand("AUTH", password);
try {
password && username
? await this.sendCommand("AUTH", username, password)
: await this.sendCommand("AUTH", password);
} catch (error) {
if (error instanceof ErrorReplyError) {
throw new AuthenticationError("Authentication failed", {
cause: error,
});
} else {
throw error;
}
}
}

private async selectDb(
Expand All @@ -138,7 +116,48 @@ export class RedisConnection implements Connection {
* Connect to Redis server
*/
async connect(): Promise<void> {
await this.connectThunkified();
try {
const dialOpts: Deno.ConnectOptions = {
hostname: this.hostname,
port: parsePortLike(this.port),
};
const conn: Deno.Conn = this.options?.tls
? await Deno.connectTls(dialOpts)
: await Deno.connect(dialOpts);

this.closer = conn;
this.reader = new BufReader(conn);
this.writer = new BufWriter(conn);
this._isClosed = false;
this._isConnected = true;

try {
if (this.options.password != null) {
await this.authenticate(this.options.username, this.options.password);
}
if (this.options.db) {
await this.selectDb(this.options.db);
}
} catch (error) {
this.close();
throw error;
}
this.retryCount = 0;
} catch (error) {
if (error instanceof AuthenticationError) {
this.retryCount = 0;
throw (error.cause ?? error);
}

if (this.retryCount++ >= this.maxRetryCount) {
this.retryCount = 0;
throw error;
}

const backoff = this.backoff(this.retryCount);
await delay(backoff);
await this.connect();
}
}

close() {
Expand All @@ -159,33 +178,15 @@ export class RedisConnection implements Connection {
await this.sendCommand("PING");
this._isConnected = true;
} catch (_error) { // TODO: Maybe we should log this error.
this._isConnected = false;
return new Promise((resolve, reject) => {
const _interval = setInterval(async () => {
if (this.retryCount > this.maxRetryCount) {
this.close();
clearInterval(_interval);
reject(new Error("Could not reconnect"));
}
try {
this.close();
await this.connect();
await this.sendCommand("PING");
this._isConnected = true;
this.retryCount = 0;
clearInterval(_interval);
resolve();
} catch (_err) {
// retrying
} finally {
this.retryCount++;
}
}, this.retryInterval);
});
this.close();
await this.connect();
await this.sendCommand("PING");
}
}
}

class AuthenticationError extends Error {}

function parsePortLike(port: string | number | undefined): number {
let parsedPort: number;
if (typeof port === "string") {
Expand Down
1 change: 1 addition & 0 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export {
InvalidStateError,
SubscriptionClosedError,
} from "./errors.ts";
export type { Backoff, ExponentialBackoffOptions } from "./backoff.ts";
export type {
ACLLogMode,
BitfieldOpts,
Expand Down
24 changes: 24 additions & 0 deletions tests/backoff_test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { assertEquals } from "../vendor/https/deno.land/std/testing/asserts.ts";
import { describe, it } from "../vendor/https/deno.land/std/testing/bdd.ts";

import { exponentialBackoff } from "../backoff.ts";

describe("backoff", {
permissions: "none",
}, () => {
describe("exponentialBackoff", () => {
it("should return exponentially increasing backoff intervals", () => {
const backoff = exponentialBackoff({
multiplier: 2,
maxInterval: 5000,
minInterval: 1000,
});

assertEquals(backoff(1), 1000);
assertEquals(backoff(2), 2000);
assertEquals(backoff(3), 4000);
assertEquals(backoff(4), 5000);
assertEquals(backoff(5), 5000);
});
});
});
2 changes: 1 addition & 1 deletion tests/commands/general.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export function generalTests(
it(`invalid port: ${v}`, async () => {
await assertRejects(
async () => {
await newClient({ hostname: "127.0.0.1", port: v });
await newClient({ hostname: "127.0.0.1", port: v, maxRetryCount: 0 });
},
Error,
"invalid",
Expand Down
3 changes: 2 additions & 1 deletion tests/commands/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ export function pubsubTests(
const port = nextPort();
let tempServer = await startRedis({ port });
const client = await newClient({ ...opts, port });
const pub = await newClient({ ...opts, maxRetryCount: 10, port });
const backoff = () => 1200;
const pub = await newClient({ ...opts, backoff, maxRetryCount: 10, port });
const sub = await client.psubscribe("ps*");
const it = sub.receive();

Expand Down

0 comments on commit f85edec

Please sign in to comment.