diff --git a/client/dist/esm/streams.js b/client/dist/esm/streams.js index 075f566..543b951 100644 --- a/client/dist/esm/streams.js +++ b/client/dist/esm/streams.js @@ -50,7 +50,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/dist/script/streams.js b/client/dist/script/streams.js index d4c2c70..52c3730 100644 --- a/client/dist/script/streams.js +++ b/client/dist/script/streams.js @@ -56,7 +56,17 @@ function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/dist/src/streams.ts b/client/dist/src/streams.ts index 777d832..5a4d289 100644 --- a/client/dist/src/streams.ts +++ b/client/dist/src/streams.ts @@ -54,7 +54,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/src/streams.ts b/client/src/streams.ts index 777d832..5a4d289 100644 --- a/client/src/streams.ts +++ b/client/src/streams.ts @@ -54,7 +54,17 @@ export function decodeContent() { transform(value, controller) { // Base64 decode the `content` field and send it as a chunk. if (value.content?.length) { - controller.enqueue(atob(value.content)); + // The `atob` function does not work properly if the decoded content contains any byte + // values over 0x7f, because "binary" in JS means that each byte gets represented as a + // UTF-16 code unit, which happens to be <= 0xff. I wish I was making this up: + // https://developer.mozilla.org/en-US/docs/Glossary/Base64#the_unicode_problem + const binary = atob(value.content); + const bytes = new Uint8Array(binary.length); + for (let i = 0; i < bytes.length; i++) { + bytes[i] = binary.charCodeAt(i); + } + const text = new TextDecoder().decode(bytes); + controller.enqueue(text); } }, }); diff --git a/client/test/journal_client_test.snap b/client/test/journal_client_test.snap index 902589f..da8e8d5 100644 --- a/client/test/journal_client_test.snap +++ b/client/test/journal_client_test.snap @@ -1,4 +1,57 @@ { + 'JournalClient.list collection selector test': [ + 'acmeCo/greetings/00ffffffffffffff/pivot=00', + ], + 'JournalClient.list exclusion selector test': [ + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + ], + 'JournalClient.list name selector test': [ + 'acmeCo/greetings/00ffffffffffffff/pivot=00', + ], + 'JournalClient.list prefix selector test': [ + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/logs/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Farabic-source-hello-world/pivot=00', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00', + ], + 'JournalClient.read Arabic content test': [ + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #1', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #2', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #3', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #4', + ts: '[MASKED string]', + }, + { + _meta: { + uuid: '[MASKED string]', + }, + message: 'مرحبا #5', + ts: '[MASKED string]', + }, + ], 'JournalClient.read content test': [ { _meta: { @@ -40,7 +93,7 @@ { status: 'OK', fragment: { - journal: 'acmeCo/greetings/pivot=00', + journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00', compressionCodec: 'GZIP', }, }, @@ -51,7 +104,7 @@ { status: 'OK', fragment: { - journal: 'acmeCo/greetings/pivot=00', + journal: 'acmeCo/greetings/00ffffffffffffff/pivot=00', compressionCodec: 'GZIP', }, }, diff --git a/client/test/journal_client_test.ts b/client/test/journal_client_test.ts index 2b63fb8..c655b1a 100644 --- a/client/test/journal_client_test.ts +++ b/client/test/journal_client_test.ts @@ -11,41 +11,33 @@ import { JournalClient, parseJournalDocuments } from "../src/journal_client.ts"; import { JournalSelector } from "../src/selector.ts"; import { readStreamToEnd } from "../src/streams.ts"; -Deno.test("JournalClient.list collection selector test", async () => { +snapshotTest("JournalClient.list collection selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const name = new JournalSelector().collection("acmeCo/greetings"); const journals = (await client.list(name)).unwrap(); - assertEquals(1, journals.length); - assertEquals("acmeCo/greetings/pivot=00", journals[0].name); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list name selector test", async () => { +snapshotTest("JournalClient.list name selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const name = new JournalSelector().name("acmeCo/greetings/pivot=00"); + const name = new JournalSelector().name("acmeCo/greetings/00ffffffffffffff/pivot=00"); const journals = (await client.list(name)).unwrap(); - assertEquals(1, journals.length); - assertEquals("acmeCo/greetings/pivot=00", journals[0].name); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list prefix selector test", async () => { +snapshotTest("JournalClient.list prefix selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const expectedJournals = [ - "ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - "ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - ]; - const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/"); const journals = (await client.list(prefixSelector)).unwrap(); - assertEquals(2, journals.length); - assertEquals(expectedJournals, journals.map((j) => j.name).sort()); + assertSnapshot(journals.map((j)=>j.name)); }); -Deno.test("JournalClient.list exclusion selector test", async () => { +snapshotTest("JournalClient.list exclusion selector test", async ({assertSnapshot}) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const prefixSelector = JournalSelector.prefix("ops.us-central1.v1/"); @@ -53,11 +45,7 @@ Deno.test("JournalClient.list exclusion selector test", async () => { const journals = (await client.list(prefixSelector, excludedSelector)) .unwrap(); - assertEquals(1, journals.length); - assertEquals( - "ops.us-central1.v1/logs/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00", - journals[0].name, - ); + assertSnapshot(journals.map((j)=>j.name)); }); Deno.test("JournalClient.list wrong signing key", async () => { @@ -101,7 +89,7 @@ Deno.test("JournalClient.list unauthorized prefix", async () => { snapshotTest("JournalClient.read test", async ({ assertSnapshot }) => { const client = new JournalClient(BASE_URL, await makeJwt({})); - const req = { journal: "acmeCo/greetings/pivot=00", endOffset: "1024" }; + const req = { journal: "acmeCo/greetings/00ffffffffffffff/pivot=00", endOffset: "1024" }; const stream = (await client.read(req)).map_err(console.error).unwrap(); const results: Array = await readStreamToEnd( stream, @@ -127,7 +115,25 @@ snapshotTest("JournalClient.read content test", async ({ assertSnapshot }) => { const client = new JournalClient(BASE_URL, await makeJwt({})); const req = { - journal: "acmeCo/greetings/pivot=00", + journal: "acmeCo/greetings/00ffffffffffffff/pivot=00", + offset: "10", + endOffset: "1024", + }; + const stream = (await client.read(req)).unwrap(); + const docStream = parseJournalDocuments(stream!); + const results = await readStreamToEnd(docStream); + + let filtered_results = results.filter(r=>!(r._meta as any).ack).slice(0,5) + + const masks = ["/*/_meta/uuid", "/*/ts"]; + assertSnapshot(filtered_results, masks); +}); + +snapshotTest("JournalClient.read Arabic content test", async ({ assertSnapshot }) => { + const client = new JournalClient(BASE_URL, await makeJwt({})); + + const req = { + journal: "acmeCo/arabic-greetings/00ffffffffffffff/pivot=00", offset: "10", endOffset: "1024", }; diff --git a/client/test/shard_client_test.snap b/client/test/shard_client_test.snap index d63a541..9ed6067 100644 --- a/client/test/shard_client_test.snap +++ b/client/test/shard_client_test.snap @@ -1,4 +1,70 @@ { + 'ShardClient.list task selector test': [ + { + spec: { + id: 'capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000', + sources: [], + recoveryLogPrefix: 'recovery', + hintPrefix: '/estuary/flow/hints', + hintBackups: 2, + maxTxnDuration: '1s', + minTxnDuration: '0s', + disable: false, + hotStandbys: 0, + labels: { + labels: [ + { + name: 'app.gazette.dev/managed-by', + value: 'estuary.dev/flow', + }, + { + name: 'estuary.dev/build', + value: '0000000000000000', + }, + { + name: 'estuary.dev/key-begin', + value: '00000000', + }, + { + name: 'estuary.dev/key-end', + value: 'ffffffff', + }, + { + name: 'estuary.dev/log-level', + value: 'info', + }, + { + name: 'estuary.dev/rclock-begin', + value: '00000000', + }, + { + name: 'estuary.dev/rclock-end', + value: 'ffffffff', + }, + { + name: 'estuary.dev/task-name', + value: 'acmeCo/source-hello-world', + }, + { + name: 'estuary.dev/task-type', + value: 'capture', + }, + ], + }, + disableWaitForAck: false, + ringBufferSize: 65536, + readChannelSize: 4096, + }, + status: [ + { + code: 'FAILED', + errors: [ + 'runTransactions: txnStartCommit: store.StartCommit: failed to write atomic RocksDB commit\n\nCaused by:\n IO error: No such file or directory: While open a file for appending: /home/travis/code/data-plane-gateway/test/tmp/capture_acmeCo_source-hello-world_00ffffffffffffff_00000000-00000000-1644699521/000008.log: No such file or directory', + ], + }, + ], + }, + ], 'ShardClient.stat test': { status: 'OK', readThrough: { @@ -6,8 +72,8 @@ 'acmeCo/source-hello-world/eof': 'who cares', }, publishAt: { - 'acmeCo/greetings/pivot=00': '[MASKED string]', - 'ops.us-central1.v1/stats/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]', + 'acmeCo/greetings/00ffffffffffffff/pivot=00': '[MASKED string]', + 'ops.us-central1.v1/stats/00ffffffffffffff/kind=capture/name=acmeCo%2Fsource-hello-world/pivot=00': '[MASKED string]', }, }, } \ No newline at end of file diff --git a/client/test/shard_client_test.ts b/client/test/shard_client_test.ts index efc1a9c..4407e52 100644 --- a/client/test/shard_client_test.ts +++ b/client/test/shard_client_test.ts @@ -16,7 +16,7 @@ Deno.test("ShardClient.list task selector test", async () => { assertEquals(1, shards.length); assertEquals( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", shards[0].spec.id, ); assertEquals("PRIMARY", shards[0].status[0].code); @@ -25,7 +25,7 @@ Deno.test("ShardClient.list task selector test", async () => { Deno.test("ShardClient.list bare id selector test", async () => { const client = new ShardClient(BASE_URL, await makeJwt({})); const idSelector = new ShardSelector().id( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", ); const error = (await client.list(idSelector)).unwrap_err(); @@ -39,13 +39,13 @@ Deno.test("ShardClient.list compound id selector test", async () => { .task("acmeCo/yet-another-task") .task("acmeCo/source-hello-world") .task("acmeCo/verifies-label-sorting") - .id("capture/acmeCo/source-hello-world/00000000-00000000"); + .id("capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000"); const shards = (await client.list(idSelector)).unwrap(); assertEquals(1, shards.length); assertEquals( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", shards[0].spec.id, ); }); @@ -54,7 +54,7 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => { const client = new ShardClient(BASE_URL, await makeJwt({prefixes: ["capture/acmeCo/"]})); const stats = (await client.stat( - "capture/acmeCo/source-hello-world/00000000-00000000", + "capture/acmeCo/source-hello-world/00ffffffffffffff/00000000-00000000", {}, )).unwrap(); @@ -74,8 +74,8 @@ snapshotTest("ShardClient.stat test", async ({ assertSnapshot }) => { const masks = [ "/readThrough/acmeCo\/source-hello-world\/txn", - "/publishAt/acmeCo\/greetings\/pivot=00", - "/publishAt/ops.us-central1.v1\/stats\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00", + "/publishAt/acmeCo\/greetings\/00ffffffffffffff\/pivot=00", + "/publishAt/ops.us-central1.v1\/stats\/00ffffffffffffff\/kind=capture\/name=acmeCo%2Fsource-hello-world\/pivot=00", ]; assertSnapshot(pluck(stats), masks); }); diff --git a/test.sh b/test.sh index 425b4dc..a8bde62 100755 --- a/test.sh +++ b/test.sh @@ -61,8 +61,9 @@ export BROKER_ADDRESS=unix://localhost${TESTDIR}/gazette.sock export CONSUMER_ADDRESS=unix://localhost${TESTDIR}/consumer.sock export GATEWAY_PORT=28318 -export BUILD_ID=test-build-id +export BUILD_ID='0000000000000000' export CATALOG_SOURCE="test/acmeCo/source-hello-world.flow.yaml" +export CATALOG_SOURCE_ARABIC="test/acmeCo/arabic-source-hello-world.flow.yaml" # This is needed in order for docker run commands to work on ARM macs. export DOCKER_DEFAULT_PLATFORM="linux/amd64" @@ -117,6 +118,12 @@ ${FLOW_BIN} api build \ --build-id=${BUILD_ID} \ --source=${CATALOG_SOURCE} || bail "Build failed." +# Build the catalog for some minor multi language testing +${FLOW_BIN} api build \ + --build-db=${TESTDIR}/builds/${BUILD_ID} \ + --build-id=${BUILD_ID} \ + --source=${CATALOG_SOURCE_ARABIC} || bail "Build failed." + log "Build finished" # Activate the catalog. diff --git a/test/acmeCo/arabic-source-hello-world.flow.yaml b/test/acmeCo/arabic-source-hello-world.flow.yaml new file mode 100644 index 0000000..a203385 --- /dev/null +++ b/test/acmeCo/arabic-source-hello-world.flow.yaml @@ -0,0 +1,15 @@ +collections: + acmeCo/arabic-greetings: + schema: greetings.schema.yaml + key: [/ts] +captures: + acmeCo/arabic-source-hello-world: + endpoint: + connector: + image: ghcr.io/estuary/source-hello-world:dev + config: source-hello-world.config.yaml + bindings: + - resource: + name: greetings + prefix: 'مرحبا #{}' + target: acmeCo/arabic-greetings diff --git a/test/acmeCo/source-hello-world.flow.yaml b/test/acmeCo/source-hello-world.flow.yaml index 64598f7..2fb6515 100644 --- a/test/acmeCo/source-hello-world.flow.yaml +++ b/test/acmeCo/source-hello-world.flow.yaml @@ -1,8 +1,3 @@ -storageMappings: - "": - stores: - - provider: GCS - bucket: example collections: acmeCo/greetings: schema: greetings.schema.yaml