Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HTTP client reading non-ascii journal content #47

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion client/dist/esm/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,17 @@ export function decodeContent() {
transform(value, controller) {
// Base64 decode the `content` field and send it as a chunk.
if (value.content?.length) {
controller.enqueue(atob(value.content));
// The `atob` function does not work properly if the decoded content contains any byte
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know we are not running in Node, but in the UI we have stopped using `atob` and `btoa` and replaced them with `Buffer` and `.toString('base64')`.

https://dev.to/2ezpz2plzme/btoa-replacement-in-nodejs-3k6g

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind trying this out locally and running the tests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind - polyfilling Node stuff for that would be a pain.

// values over 0x7f, because "binary" in JS means that each byte gets represented as a
// UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up:
// https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem
const binary = atob(value.content);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < bytes.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
const text = new TextDecoder().decode(bytes);
controller.enqueue(text);
}
},
});
Expand Down
12 changes: 11 additions & 1 deletion client/dist/script/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,17 @@ function decodeContent() {
transform(value, controller) {
// Base64 decode the `content` field and send it as a chunk.
if (value.content?.length) {
controller.enqueue(atob(value.content));
// The `atob` function does not work properly if the decoded content contains any byte
// values over 0x7f, because "binary" in JS means that each byte gets represented as a
// UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up:
// https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem
const binary = atob(value.content);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < bytes.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
const text = new TextDecoder().decode(bytes);
controller.enqueue(text);
}
},
});
Expand Down
12 changes: 11 additions & 1 deletion client/dist/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ export function decodeContent() {
transform(value, controller) {
// Base64 decode the `content` field and send it as a chunk.
if (value.content?.length) {
controller.enqueue(atob(value.content));
// The `atob` function does not work properly if the decoded content contains any byte
// values over 0x7f, because "binary" in JS means that each byte gets represented as a
// UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up:
// https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem
const binary = atob(value.content);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < bytes.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
const text = new TextDecoder().decode(bytes);
controller.enqueue(text);
}
},
});
Expand Down
12 changes: 11 additions & 1 deletion client/src/streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,17 @@ export function decodeContent() {
transform(value, controller) {
// Base64 decode the `content` field and send it as a chunk.
if (value.content?.length) {
controller.enqueue(atob(value.content));
// The `atob` function does not work properly if the decoded content contains any byte
// values over 0x7f, because "binary" in JS means that each byte gets represented as a
// UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up:
// https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem
const binary = atob(value.content);
const bytes = new Uint8Array(binary.length);
for (let i = 0; i < bytes.length; i++) {
bytes[i] = binary.charCodeAt(i);
}
const text = new TextDecoder().decode(bytes);
controller.enqueue(text);
}
},
});
Expand Down
57 changes: 55 additions & 2 deletions client/test/journal_client_test.snap
Original file line number Diff line number Diff line change
@@ -1,4 +1,57 @@
{
'JournalClient.list collection selector test': [
'acmeCo/greetings/00ffffffffffffff/pivot=00',
],
'JournalClient.list exclusion selector test': [
'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00',
'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00',
],
'JournalClient.list name selector test': [
'acmeCo/greetings/00ffffffffffffff/pivot=00',
],
'JournalClient.list prefix selector test': [
'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00',
'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00',
'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00',
'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00',
],
'JournalClient.read Arabic content test': [
{
_meta: {
uuid: '[MASKED string]',
},
message: 'مرحبا #1',
ts: '[MASKED string]',
},
{
_meta: {
uuid: '[MASKED string]',
},
message: 'مرحبا #2',
ts: '[MASKED string]',
},
{
_meta: {
uuid: '[MASKED string]',
},
message: 'مرحبا #3',
ts: '[MASKED string]',
},
{
_meta: {
uuid: '[MASKED string]',
},
message: 'مرحبا #4',
ts: '[MASKED string]',
},
{
_meta: {
uuid: '[MASKED string]',
},
message: 'مرحبا #5',
ts: '[MASKED string]',
},
],
'JournalClient.read content test': [
{
_meta: {
Expand Down Expand Up @@ -40,7 +93,7 @@
{
status: 'OK',
fragment: {
journal: 'acmeCo/greetings/pivot=00',
journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00',
compressionCodec: 'GZIP',
},
},
Expand All @@ -51,7 +104,7 @@
{
status: 'OK',
fragment: {
journal: 'acmeCo/greetings/pivot=00',
journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00',
compressionCodec: 'GZIP',
},
},
Expand Down
52 changes: 29 additions & 23 deletions client/test/journal_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,53 +11,41 @@ import { JournalClient, parseJournalDocuments } from "../src/journal_client.ts";
import { JournalSelector } from "../src/selector.ts";
import { readStreamToEnd } from "../src/streams.ts";

Deno.test("JournalClient.list collection selector test", async () => {
snapshotTest("JournalClient.list collection selector test", async ({assertSnapshot}) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const name = new JournalSelector().collection("acmeCo/greetings");
const journals = (await client.list(name)).unwrap();

assertEquals(1, journals.length);
assertEquals("acmeCo/greetings/pivot=00", journals[0].name);
assertSnapshot(journals.map((j)=>j.name));
});

Deno.test("JournalClient.list name selector test", async () => {
snapshotTest("JournalClient.list name selector test", async ({assertSnapshot}) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const name = new JournalSelector().name("acmeCo/greetings/pivot=00");
const name = new JournalSelector().name("acmeCo/greetings/00ffffffffffffff/pivot=00");
const journals = (await client.list(name)).unwrap();

assertEquals(1, journals.length);
assertEquals("acmeCo/greetings/pivot=00", journals[0].name);
assertSnapshot(journals.map((j)=>j.name));
});

Deno.test("JournalClient.list prefix selector test", async () => {
snapshotTest("JournalClient.list prefix selector test", async ({assertSnapshot}) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));
const expectedJournals = [
"ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00",
"ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00",
];

const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/");
const journals = (await client.list(prefixSelector)).unwrap();

assertEquals(2, journals.length);
assertEquals(expectedJournals, journals.map((j) => j.name).sort());
assertSnapshot(journals.map((j)=>j.name));
});

Deno.test("JournalClient.list exclusion selector test", async () => {
snapshotTest("JournalClient.list exclusion selector test", async ({assertSnapshot}) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/");
const excludedSelector = new JournalSelector().collection("ops.us-central1.v1/stats");
const journals = (await client.list(prefixSelector, excludedSelector))
.unwrap();

assertEquals(1, journals.length);
assertEquals(
"ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00",
journals[0].name,
);
assertSnapshot(journals.map((j)=>j.name));
});

Deno.test("JournalClient.list wrong signing key", async () => {
Expand Down Expand Up @@ -101,7 +89,7 @@ Deno.test("JournalClient.list unauthorized prefix", async () => {
snapshotTest("JournalClient.read test", async ({ assertSnapshot }) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const req = { journal: "acmeCo/greetings/pivot=00", endOffset: "1024" };
const req = { journal: "acmeCo/greetings/00ffffffffffffff/pivot=00", endOffset: "1024" };
const stream = (await client.read(req)).map_err(console.error).unwrap();
const results: Array<broker.ProtocolReadResponse> = await readStreamToEnd(
stream,
Expand All @@ -127,7 +115,25 @@ snapshotTest("JournalClient.read content test", async ({ assertSnapshot }) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const req = {
journal: "acmeCo/greetings/pivot=00",
journal: "acmeCo/greetings/00ffffffffffffff/pivot=00",
offset: "10",
endOffset: "1024",
};
const stream = (await client.read(req)).unwrap();
const docStream = parseJournalDocuments(stream!);
const results = await readStreamToEnd(docStream);

let filtered_results = results.filter(r=>!(r._meta as any).ack).slice(0,5)

const masks = ["/*/_meta/uuid", "/*/ts"];
assertSnapshot(filtered_results, masks);
});

snapshotTest("JournalClient.read Arabic content test", async ({ assertSnapshot }) => {
const client = new JournalClient(BASE_URL, await makeJwt({}));

const req = {
journal: "acmeCo/arabic-greetings/00ffffffffffffff/pivot=00",
offset: "10",
endOffset: "1024",
};
Expand Down
70 changes: 68 additions & 2 deletions client/test/shard_client_test.snap
Original file line number Diff line number Diff line change
@@ -1,13 +1,79 @@
{
'ShardClient.list task selector test': [
{
spec: {
id: 'capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000',
sources: [],
recoveryLogPrefix: 'recovery',
hintPrefix: '/estuary/flow/hints',
hintBackups: 2,
maxTxnDuration: '1s',
minTxnDuration: '0s',
disable: false,
hotStandbys: 0,
labels: {
labels: [
{
name: 'app.gazette.dev/managed-by',
value: 'estuary.dev/flow',
},
{
name: 'estuary.dev/build',
value: '0000000000000000',
},
{
name: 'estuary.dev/key-begin',
value: '00000000',
},
{
name: 'estuary.dev/key-end',
value: 'ffffffff',
},
{
name: 'estuary.dev/log-level',
value: 'info',
},
{
name: 'estuary.dev/rclock-begin',
value: '00000000',
},
{
name: 'estuary.dev/rclock-end',
value: 'ffffffff',
},
{
name: 'estuary.dev/task-name',
value: 'acmeCo/source-hello-world',
},
{
name: 'estuary.dev/task-type',
value: 'capture',
},
],
},
disableWaitForAck: false,
ringBufferSize: 65536,
readChannelSize: 4096,
},
status: [
{
code: 'FAILED',
errors: [
'runTransactions: txnStartCommit: store.StartCommit: failed to write atomic RocksDB commit\n\nCaused by:\n IO error: No such file or directory: While open a file for appending: /home/travis/code/data-plane-gateway/test/tmp/capture_acmeCo_source-hello-world_00ffffffffffffff_00000000-00000000-1644699521/000008.log: No such file or directory',
],
},
],
},
],
'ShardClient.stat test': {
status: 'OK',
readThrough: {
'acmeCo/source-hello-world/txn': '[MASKED string]',
'acmeCo/source-hello-world/eof': 'who cares',
},
publishAt: {
'acmeCo/greetings/pivot=00': '[MASKED string]',
'ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]',
'acmeCo/greetings/00ffffffffffffff/pivot=00': '[MASKED string]',
'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]',
},
},
}
14 changes: 7 additions & 7 deletions client/test/shard_client_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Deno.test("ShardClient.list task selector test", async () => {

assertEquals(1, shards.length);
assertEquals(
"capture/acmeCo/source-hello-world/00000000-00000000",
"capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000",
shards[0].spec.id,
);
assertEquals("PRIMARY", shards[0].status[0].code);
Expand All @@ -25,7 +25,7 @@ Deno.test("ShardClient.list task selector test", async () => {
Deno.test("ShardClient.list bare id selector test", async () => {
const client = new ShardClient(BASE_URL, await makeJwt({}));
const idSelector = new ShardSelector().id(
"capture/acmeCo/source-hello-world/00000000-00000000",
"capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000",
);

const error = (await client.list(idSelector)).unwrap_err();
Expand All @@ -39,13 +39,13 @@ Deno.test("ShardClient.list compound id selector test", async () => {
.task("acmeCo/yet-another-task")
.task("acmeCo/source-hello-world")
.task("acmeCo/verifies-label-sorting")
.id("capture/acmeCo/source-hello-world/00000000-00000000");
.id("capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000");

const shards = (await client.list(idSelector)).unwrap();

assertEquals(1, shards.length);
assertEquals(
"capture/acmeCo/source-hello-world/00000000-00000000",
"capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000",
shards[0].spec.id,
);
});
Expand All @@ -54,7 +54,7 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => {
const client = new ShardClient(BASE_URL, await makeJwt({prefixes: ["capture/acmeCo/"]}));

const stats = (await client.stat(
"capture/acmeCo/source-hello-world/00000000-00000000",
"capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000",
{},
)).unwrap();

Expand All @@ -74,8 +74,8 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => {

const masks = [
"/readThrough/acmeCo\/source-hello-world\/txn",
"/publishAt/acmeCo\/greetings\/pivot=00",
"/publishAt/ops.us-central1.v1\/stats\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00",
"/publishAt/acmeCo\/greetings\/00ffffffffffffff\/pivot=00",
"/publishAt/ops.us-central1.v1\/stats\/00ffffffffffffff\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00",
];
assertSnapshot(pluck(stats), masks);
});
Loading
Loading