From d7c561b5445487c8bf741010fac509cc17524cbb Mon Sep 17 00:00:00 2001 From: Travis Jenkins Date: Mon, 8 Jul 2024 14:16:12 -0400 Subject: [PATCH 1/4] Adding some tests for Arabic characters (have not run these yet) --- client/test/journal_client_test.ts | 18 +++++++++++++++++ test.sh | 7 +++++++ .../arabic-source-hello-world.flow.yaml | 20 +++++++++++++++++++ 3 files changed, 45 insertions(+) create mode 100644 test/acmeCo/arabic-source-hello-world.flow.yaml diff --git a/client/test/journal_client_test.ts b/client/test/journal_client_test.ts index 2b63fb8..e02a627 100644 --- a/client/test/journal_client_test.ts +++ b/client/test/journal_client_test.ts @@ -141,6 +141,24 @@ snapshotTest("JournalClient.read content test", async ({ assertSnapshot }) => { assertSnapshot(filtered_results, masks); }); +snapshotTest("JournalClient.read Arabic content test", async ({ assertSnapshot }) => { + const client = new JournalClient(BASE_URL, await makeJwt({})); + + const req = { + journal: "acmeCo/arabic-greetings/pivot=00", + offset: "10", + endOffset: "1024", + }; + const stream = (await client.read(req)).unwrap(); + const docStream = parseJournalDocuments(stream!); + const results = await readStreamToEnd(docStream); + + let filtered_results = results.filter(r=>!(r._meta as any).ack).slice(0,5) + + const masks = ["/*/_meta/uuid", "/*/ts"]; + assertSnapshot(filtered_results, masks); +}); + Deno.test("JournalClient.read unauthorized prefix", async () => { const client = new JournalClient(BASE_URL, await makeJwt({})); diff --git a/test.sh b/test.sh index 425b4dc..97a8392 100755 --- a/test.sh +++ b/test.sh @@ -63,6 +63,7 @@ export GATEWAY_PORT=28318 export BUILD_ID=test-build-id export CATALOG_SOURCE="test/acmeCo/source-hello-world.flow.yaml" +export CATALOG_SOURCE_ARABIC="test/acmeCo/arabic-source-hello-world.flow.yaml" # This is needed in order for docker run commands to work on ARM macs. 
export DOCKER_DEFAULT_PLATFORM="linux/amd64" @@ -117,6 +118,12 @@ ${FLOW_BIN} api build \ --build-id=${BUILD_ID} \ --source=${CATALOG_SOURCE} || bail "Build failed." +# Build the catalog for some minor multi language testing +${FLOW_BIN} api build \ + --build-db=${TESTDIR}/builds/${BUILD_ID} \ + --build-id=${BUILD_ID} \ + --source=${CATALOG_SOURCE_ARABIC} || bail "Build failed." + log "Build finished" # Activate the catalog. diff --git a/test/acmeCo/arabic-source-hello-world.flow.yaml b/test/acmeCo/arabic-source-hello-world.flow.yaml new file mode 100644 index 0000000..52cef09 --- /dev/null +++ b/test/acmeCo/arabic-source-hello-world.flow.yaml @@ -0,0 +1,20 @@ +storageMappings: + "": + stores: + - provider: GCS + bucket: example +collections: + acmeCo/arabic-greetings: + schema: greetings.schema.yaml + key: [/ts] +captures: + acmeCo/arabic-source-hello-world: + endpoint: + connector: + image: ghcr.io/estuary/source-hello-world:dev + config: source-hello-world.config.yaml + bindings: + - resource: + name: greetings + prefix: 'مرحبا #{}' + target: acmeCo/arabic-greetings From 8925e13e1a30f99cf45e57a3ab807f001803b78e Mon Sep 17 00:00:00 2001 From: Travis Jenkins Date: Mon, 8 Jul 2024 17:24:27 -0400 Subject: [PATCH 2/4] Adding tests for arabic content Switching more tests to snapshots to test a wider range of things all at once Getting some tests updated to handle the new control plane --- client/test/journal_client_test.snap | 57 ++++++++++++++- client/test/journal_client_test.ts | 36 ++++------ client/test/shard_client_test.snap | 70 ++++++++++++++++++- client/test/shard_client_test.ts | 17 ++--- test.sh | 2 +- .../arabic-source-hello-world.flow.yaml | 5 -- test/acmeCo/source-hello-world.flow.yaml | 5 -- 7 files changed, 145 insertions(+), 47 deletions(-) diff --git a/client/test/journal_client_test.snap b/client/test/journal_client_test.snap index 902589f..da8e8d5 100644 --- a/client/test/journal_client_test.snap +++ b/client/test/journal_client_test.snap @@ -1,4 
+1,57 @@ { + 'JournalClient.list collection selector test': [ + 'acmeCo/greetings/00ffffffffffffff/pivot=00', + ], + 'JournalClient.list exclusion selector test': [ + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + ], + 'JournalClient.list name selector test': [ + 'acmeCo/greetings/00ffffffffffffff/pivot=00', + ], + 'JournalClient.list prefix selector test': [ + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + ], + 'JournalClient.read Arabic content test': [ + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #1', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #2', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #3', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #4', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #5', + ts: '[MASKED string]', + }, + ], 'JournalClient.read content test': [ { _meta: { @@ -40,7 +93,7 @@ { status: 'OK', fragment: { - journal: 'acmeCo/greetings/pivot=00', + journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00', compressionCodec: 'GZIP', }, }, @@ -51,7 +104,7 @@ { status: 'OK', fragment: { - journal: 'acmeCo/greetings/pivot=00', + journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00', compressionCodec: 'GZIP', }, }, diff --git a/client/test/journal_client_test.ts b/client/test/journal_client_test.ts index 
e02a627..c655b1a 100644 --- a/client/test/journal_client_test.ts +++ b/client/test/journal_client_test.ts @@ -11,41 +11,33 @@ import { JournalClient, parseJournalDocuments } from "../src/journal_client.ts"; import { JournalSelector } from "../src/selector.ts"; import { readStreamToEnd } from "../src/streams.ts"; -Deno.test("JournalClient.list collection selector test", async () => { +snapshotTest("JournalClient.list collection selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const name = new JournalSelector().collection("acmeCo/greetings"); const journals = (await client.list(name)).unwrap(); - assertEquals(1, journals.length); - assertEquals("acmeCo/greetings/pivot=00", journals[0].name); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list name selector test", async () => { +snapshotTest("JournalClient.list name selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const name = new JournalSelector().name("acmeCo/greetings/pivot=00"); + const name = new JournalSelector().name("acmeCo/greetings/00ffffffffffffff/pivot=00"); const journals = (await client.list(name)).unwrap(); - assertEquals(1, journals.length); - assertEquals("acmeCo/greetings/pivot=00", journals[0].name); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list prefix selector test", async () => { +snapshotTest("JournalClient.list prefix selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const expectedJournals = [ - "ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - "ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - ]; - const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/"); const journals = (await client.list(prefixSelector)).unwrap(); - assertEquals(2, journals.length); - 
assertEquals(expectedJournals, journals.map((j) => j.name).sort()); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list exclusion selector test", async () => { +snapshotTest("JournalClient.list exclusion selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/"); @@ -53,11 +45,7 @@ Deno.test("JournalClient.list exclusion selector test", async () => { const journals = (await client.list(prefixSelector, excludedSelector)) .unwrap(); - assertEquals(1, journals.length); - assertEquals( - "ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - journals[0].name, - ); + assertSnapshot(journals.map((j)=>j.name)); }); Deno.test("JournalClient.list wrong signing key", async () => { @@ -101,7 +89,7 @@ Deno.test("JournalClient.list unauthorized prefix", async () => { snapshotTest("JournalClient.read test", async ({ assertSnapshot }) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const req = { journal: "acmeCo/greetings/pivot=00", endOffset: "1024" }; + const req = { journal: "acmeCo/greetings/00ffffffffffffff/pivot=00", endOffset: "1024" }; const stream = (await client.read(req)).map_err(console.error).unwrap(); const results: Array = await readStreamToEnd( stream, @@ -127,7 +115,7 @@ snapshotTest("JournalClient.read content test", async ({ assertSnapshot }) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const req = { - journal: "acmeCo/greetings/pivot=00", + journal: "acmeCo/greetings/00ffffffffffffff/pivot=00", offset: "10", endOffset: "1024", }; @@ -145,7 +133,7 @@ snapshotTest("JournalClient.read Arabic content test", async ({ assertSnapshot } const client = new JournalClient(BASE_URL, await makeJwt({})); const req = { - journal: "acmeCo/arabic-greetings/pivot=00", + journal: "acmeCo/arabic-greetings/00ffffffffffffff/pivot=00", offset: "10", endOffset: 
"1024", }; diff --git a/client/test/shard_client_test.snap b/client/test/shard_client_test.snap index d63a541..9ed6067 100644 --- a/client/test/shard_client_test.snap +++ b/client/test/shard_client_test.snap @@ -1,4 +1,70 @@ { + 'ShardClient.list task selector test': [ + { + spec: { + id: 'capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000', + sources: [], + recoveryLogPrefix: 'recovery', + hintPrefix: '/estuary/flow/hints', + hintBackups: 2, + maxTxnDuration: '1s', + minTxnDuration: '0s', + disable: false, + hotStandbys: 0, + labels: { + labels: [ + { + name: 'app.gazette.dev/managed-by', + value: 'estuary.dev/flow', + }, + { + name: 'estuary.dev/build', + value: '0000000000000000', + }, + { + name: 'estuary.dev/key-begin', + value: '00000000', + }, + { + name: 'estuary.dev/key-end', + value: 'ffffffff', + }, + { + name: 'estuary.dev/log-level', + value: 'info', + }, + { + name: 'estuary.dev/rclock-begin', + value: '00000000', + }, + { + name: 'estuary.dev/rclock-end', + value: 'ffffffff', + }, + { + name: 'estuary.dev/task-name', + value: 'acmeCo/source-hello-world', + }, + { + name: 'estuary.dev/task-type', + value: 'capture', + }, + ], + }, + disableWaitForAck: false, + ringBufferSize: 65536, + readChannelSize: 4096, + }, + status: [ + { + code: 'FAILED', + errors: [ + 'runTransactions: txnStartCommit: store.StartCommit: failed to write atomic RocksDB commit\n\nCaused by:\n IO error: No such file or directory: While open a file for appending: /home/travis/code/data-plane-gateway/test/tmp/capture_acmeCo_source-hello-world_00ffffffffffffff_00000000-00000000-1644699521/000008.log: No such file or directory', + ], + }, + ], + }, + ], 'ShardClient.stat test': { status: 'OK', readThrough: { @@ -6,8 +72,8 @@ 'acmeCo/source-hello-world/eof': 'who cares', }, publishAt: { - 'acmeCo/greetings/pivot=00': '[MASKED string]', - 'ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]', + 
'acmeCo/greetings/00ffffffffffffff/pivot=00': '[MASKED string]', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]', }, }, } \ No newline at end of file diff --git a/client/test/shard_client_test.ts b/client/test/shard_client_test.ts index efc1a9c..979744b 100644 --- a/client/test/shard_client_test.ts +++ b/client/test/shard_client_test.ts @@ -8,15 +8,16 @@ import { ShardClient } from "../src/shard_client.ts"; import { ShardSelector } from "../src/selector.ts"; -Deno.test("ShardClient.list task selector test", async () => { +snapshotTest("ShardClient.list task selector test", async ({ assertSnapshot }) => { const client = new ShardClient(BASE_URL, await makeJwt({})); const taskSelector = new ShardSelector().task("acmeCo/source-hello-world"); const shards = (await client.list(taskSelector)).unwrap(); + assertSnapshot(shards); assertEquals(1, shards.length); assertEquals( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", shards[0].spec.id, ); assertEquals("PRIMARY", shards[0].status[0].code); @@ -25,7 +26,7 @@ Deno.test("ShardClient.list task selector test", async () => { Deno.test("ShardClient.list bare id selector test", async () => { const client = new ShardClient(BASE_URL, await makeJwt({})); const idSelector = new ShardSelector().id( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", ); const error = (await client.list(idSelector)).unwrap_err(); @@ -39,13 +40,13 @@ Deno.test("ShardClient.list compound id selector test", async () => { .task("acmeCo/yet-another-task") .task("acmeCo/source-hello-world") .task("acmeCo/verifies-label-sorting") - .id("capture/acmeCo/source-hello-world/00000000-00000000"); + .id("capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000"); const shards = (await client.list(idSelector)).unwrap(); 
assertEquals(1, shards.length); assertEquals( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", shards[0].spec.id, ); }); @@ -54,7 +55,7 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => { const client = new ShardClient(BASE_URL, await makeJwt({prefixes: ["capture/acmeCo/"]})); const stats = (await client.stat( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", {}, )).unwrap(); @@ -74,8 +75,8 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => { const masks = [ "/readThrough/acmeCo\/source-hello-world\/txn", - "/publishAt/acmeCo\/greetings\/pivot=00", - "/publishAt/ops.us-central1.v1\/stats\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00", + "/publishAt/acmeCo\/greetings\/00ffffffffffffff\/pivot=00", + "/publishAt/ops.us-central1.v1\/stats\/00ffffffffffffff\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00", ]; assertSnapshot(pluck(stats), masks); }); diff --git a/test.sh b/test.sh index 97a8392..a8bde62 100755 --- a/test.sh +++ b/test.sh @@ -61,7 +61,7 @@ export BROKER_ADDRESS=unix://localhost${TESTDIR}/gazette.sock export CONSUMER_ADDRESS=unix://localhost${TESTDIR}/consumer.sock export GATEWAY_PORT=28318 -export BUILD_ID=test-build-id +export BUILD_ID='0000000000000000' export CATALOG_SOURCE="test/acmeCo/source-hello-world.flow.yaml" export CATALOG_SOURCE_ARABIC="test/acmeCo/arabic-source-hello-world.flow.yaml" diff --git a/test/acmeCo/arabic-source-hello-world.flow.yaml b/test/acmeCo/arabic-source-hello-world.flow.yaml index 52cef09..a203385 100644 --- a/test/acmeCo/arabic-source-hello-world.flow.yaml +++ b/test/acmeCo/arabic-source-hello-world.flow.yaml @@ -1,8 +1,3 @@ -storageMappings: - "": - stores: - - provider: GCS - bucket: example collections: acmeCo/arabic-greetings: schema: greetings.schema.yaml diff --git 
a/test/acmeCo/source-hello-world.flow.yaml b/test/acmeCo/source-hello-world.flow.yaml index 64598f7..2fb6515 100644 --- a/test/acmeCo/source-hello-world.flow.yaml +++ b/test/acmeCo/source-hello-world.flow.yaml @@ -1,8 +1,3 @@ -storageMappings: - "": - stores: - - provider: GCS - bucket: example collections: acmeCo/greetings: schema: greetings.schema.yaml From 41f551158a18f61752b74296759c5aebfb157eb1 Mon Sep 17 00:00:00 2001 From: Travis Jenkins Date: Tue, 23 Jul 2024 10:54:24 -0400 Subject: [PATCH 3/4] Do not need to match everything with snapshots --- client/test/shard_client_test.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/test/shard_client_test.ts b/client/test/shard_client_test.ts index 979744b..4407e52 100644 --- a/client/test/shard_client_test.ts +++ b/client/test/shard_client_test.ts @@ -8,13 +8,12 @@ import { ShardClient } from "../src/shard_client.ts"; import { ShardSelector } from "../src/selector.ts"; -snapshotTest("ShardClient.list task selector test", async ({ assertSnapshot }) => { +Deno.test("ShardClient.list task selector test", async () => { const client = new ShardClient(BASE_URL, await makeJwt({})); const taskSelector = new ShardSelector().task("acmeCo/source-hello-world"); const shards = (await client.list(taskSelector)).unwrap(); - assertSnapshot(shards); assertEquals(1, shards.length); assertEquals( "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", From ffd85301141922f19c3df899a6a21ae3fc9268d7 Mon Sep 17 00:00:00 2001 From: Phil Date: Fri, 9 Aug 2024 10:59:23 -0400 Subject: [PATCH 4/4] http-client: fix base64 decoding of journal content The grpc gateway returns journal content as a base64-encoded string. Previously, we had been using `atob` to decode this, but that function does not work if the decoded content contains any byte values larger than 0x7f, because of course we can't have nice things in JS. 
So this adds one of those looks-insane-but-actually-works workarounds to make base64 decoding work properly with arbitrary utf-8 data. --- client/dist/esm/streams.js | 12 +++++++++++- client/dist/script/streams.js | 12 +++++++++++- client/dist/src/streams.ts | 12 +++++++++++- client/src/streams.ts | 12 +++++++++++- 4 files changed, 44 insertions(+), 4 deletions(-) diff --git a/client/dist/esm/streams.js b/client/dist/esm/streams.js index 075f566..543b951 100644 --- a/client/dist/esm/streams.js +++ b/client/dist/esm/streams.js @@ -50,7 +50,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/dist/script/streams.js b/client/dist/script/streams.js index d4c2c70..52c3730 100644 --- a/client/dist/script/streams.js +++ b/client/dist/script/streams.js @@ -56,7 +56,17 @@ function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. 
I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/dist/src/streams.ts b/client/dist/src/streams.ts index 777d832..5a4d289 100644 --- a/client/dist/src/streams.ts +++ b/client/dist/src/streams.ts @@ -54,7 +54,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/src/streams.ts b/client/src/streams.ts index 777d832..5a4d289 100644 --- a/client/src/streams.ts +++ b/client/src/streams.ts @@ -54,7 +54,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. 
I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, });