Skip to content

Commit

Permalink
bump: deno@v0.34.0 (#49)
Browse files Browse the repository at this point in the history
* wip

* Update redis.ts

* fixed reply types for some commands

* Update .denov
  • Loading branch information
keroxp authored Feb 21, 2020
1 parent 5d65f77 commit 58a197f
Show file tree
Hide file tree
Showing 14 changed files with 506 additions and 269 deletions.
2 changes: 1 addition & 1 deletion .denov
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v0.33.0
v0.34.0
15 changes: 9 additions & 6 deletions io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { BufReader, BufWriter } from "./vendor/https/deno.land/std/io/bufio.ts";
import Buffer = Deno.Buffer;
import { ErrorReplyError } from "./errors.ts";

export type BulkResult = string | undefined;
export type RedisRawReply =
| ["status", string]
| ["integer", number]
| ["bulk", string]
| ["bulk", BulkResult]
| ["array", any[]]
| ["error", ErrorReplyError];

Expand Down Expand Up @@ -39,7 +40,7 @@ export async function sendCommand(
writer: BufWriter,
reader: BufReader,
command: string,
...args
...args: (number | string)[]
): Promise<RedisRawReply> {
const msg = createRequest(command, ...args);
await writer.write(encoder.encode(msg));
Expand All @@ -64,6 +65,7 @@ export async function readReply(reader: BufReader): Promise<RedisRawReply> {
case ErrorReplyCode:
tryParseErrorReply(await readLine(reader));
}
throw new Error("Invalid state");
}

export async function readLine(reader: BufReader): Promise<string> {
Expand All @@ -81,6 +83,7 @@ export async function readLine(reader: BufReader): Promise<string> {
}
buf[loc++] = d;
}
throw new Error("Invalid state");
}

export async function readStatusReply(reader: BufReader): Promise<string> {
Expand All @@ -100,7 +103,7 @@ export async function readIntegerReply(reader: BufReader): Promise<number> {
tryParseErrorReply(line);
}

export async function readBulkReply(reader: BufReader): Promise<string> {
export async function readBulkReply(reader: BufReader): Promise<BulkResult> {
const line = await readLine(reader);
if (line[0] !== "$") {
tryParseErrorReply(line);
Expand All @@ -109,7 +112,7 @@ export async function readBulkReply(reader: BufReader): Promise<string> {
const size = parseInt(sizeStr);
if (size < 0) {
// nil bulk reply
return;
return undefined;
}
const dest = new Uint8Array(size + 2);
await reader.readFull(dest);
Expand All @@ -119,7 +122,7 @@ export async function readBulkReply(reader: BufReader): Promise<string> {
export async function readArrayReply(reader: BufReader): Promise<any[]> {
const line = await readLine(reader);
const argCount = parseInt(line.substr(1, line.length - 3));
const result = [];
const result: any[] = [];
for (let i = 0; i < argCount; i++) {
const res = await reader.peek(1);
if (res === Deno.EOF) {
Expand All @@ -143,7 +146,7 @@ export async function readArrayReply(reader: BufReader): Promise<any[]> {
return result;
}

function tryParseErrorReply(line: string) {
function tryParseErrorReply(line: string): never {
const code = line[0];
if (code === "-") {
throw new ErrorReplyError(line);
Expand Down
2 changes: 1 addition & 1 deletion modules-lock.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"https://deno.land/std": {
"version": "@v0.33.0",
"version": "@v0.34.0",
"modules": [
"/util/async.ts",
"/testing/asserts.ts",
Expand Down
2 changes: 1 addition & 1 deletion modules.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"https://deno.land/std": {
"version": "@v0.33.0",
"version": "@v0.34.0",
"modules": [
"/util/async.ts",
"/testing/asserts.ts",
Expand Down
21 changes: 17 additions & 4 deletions pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { deferred, Deferred } from "./vendor/https/deno.land/std/util/async.ts";

const encoder = new TextEncoder();
export type RedisPipeline = {
enqueue(command: string, ...args);
enqueue(command: string, ...args: (number | string)[]): void;
flush(): Promise<RedisRawReply[]>;
} & Redis;

Expand Down Expand Up @@ -54,11 +54,11 @@ export function createRedisPipeline(
}

const executor = {
enqueue(command: string, ...args) {
enqueue(command: string, ...args: (number | string)[]): void {
const msg = createRequest(command, ...args);
commands.push(msg);
},
async flush() {
async flush(): Promise<RedisRawReply[]> {
// wrap pipelined commands with MULTI/EXEC
if (opts && opts.tx) {
commands.splice(0, 0, createRequest("MULTI"));
Expand All @@ -80,6 +80,19 @@ export function createRedisPipeline(
return ["status", "OK"];
}
};
const fakeRedis = create(null, null, null, executor);
const d = dummyReadWriteCloser();
const fakeRedis = create(d, d, d, executor);
return Object.assign(fakeRedis, executor);
}

function dummyReadWriteCloser(): Deno.ReadWriteCloser {
return {
close() {},
async read(p) {
return 0;
},
async write(p) {
return 0;
}
};
}
13 changes: 10 additions & 3 deletions pipeline_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,18 @@ test("pipeline in concurrent", async () => {
"OK", //set(a)
"OK", //set(b)
"OK", //set(c)
[["status", "OK"], ["status", "OK"], ["status", "OK"]], //flush()
[
["status", "OK"],
["status", "OK"],
["status", "OK"]
], //flush()
"OK", // get(a)
"OK", // get(b)
"OK", //get(c)
[["bulk", "a"], ["bulk", "b"], ["bulk", "c"]] //flush()
[
["bulk", "a"],
["bulk", "b"],
["bulk", "c"]
] //flush()
]);
});

2 changes: 1 addition & 1 deletion pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class RedisSubscriptionImpl implements RedisSubscription {

constructor(private writer: BufWriter, private reader: BufReader) {}

async psubscribe(...patterns) {
async psubscribe(...patterns: string[]) {
await sendCommand(this.writer, this.reader, "PSUBSCRIBE", ...patterns);
for (const pat of patterns) {
this.channels[pat] = true;
Expand Down
9 changes: 4 additions & 5 deletions pubsub_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const addr = {
port: 6379
};

async function wait(duration) {
async function wait(duration: number) {
return new Promise(resolve => {
setTimeout(resolve, duration);
});
Expand All @@ -27,20 +27,19 @@ test(async function testSubscribe2() {
const redis = await connect(addr);
const pub = await connect(addr);
const sub = await redis.subscribe("subsc2");
let message: RedisPubSubMessage;
const p = (async function() {
const it = sub.receive();
message = (await it.next()).value;
return (await it.next()).value;
})();
await pub.publish("subsc2", "wayway");
await p;
const message = await p;
assertEquals(message, {
channel: "subsc2",
message: "wayway"
});
await sub.close();
const a = await redis.get("aaa");
assertEquals(a, void 0);
assertEquals(a, undefined);
pub.close();
redis.close();
});
Expand Down
Loading

0 comments on commit 58a197f

Please sign in to comment.