Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/materialize cratedb #2452

Merged
merged 24 commits into from
Feb 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
498661a
First draft of CrateDB connector
surister Jan 30, 2025
36b432b
Set JSON and MULTIPLE type to OBJECT
surister Jan 31, 2025
f2012b5
Set DATE and TIMESTAMPZ types as TIMESTAMP WITH TIME ZONE
surister Jan 31, 2025
8f53480
Set type for sql.ARRAY from JSON to ARRAY(TEXT)
surister Jan 31, 2025
cc4b07e
Set default schema as "doc"
surister Jan 31, 2025
0763f63
Remove space from test tables names, e.g. 'Multiple Types' -> 'Multip…
surister Jan 31, 2025
8251bc8
Remove 'COMMENT ON' statements from template definition. (Will this c…
surister Jan 31, 2025
0ab1c67
Early return to avoid db.PingContext killing the connection with 'bad…
surister Jan 31, 2025
1c196c9
Set sql.Integer to INTEGER
surister Feb 6, 2025
01e3f9d
Set UUID dtype to TEXT
surister Feb 6, 2025
42ac1dc
set sql.STRING_INTEGER to DECIMAL
surister Feb 6, 2025
4f5f672
set sql.STRING_INTEGER and sql.STRING_NUMBER to NUMERIC(20, 0)
surister Feb 12, 2025
bfb7dff
use `jsonConverter` for sql.ARRAY
surister Feb 12, 2025
4b45780
set `JSON` to `OBJECT` in sql.Dialect MigratableTypes
surister Feb 12, 2025
fe285d4
set time dtype to `TEXT`
surister Feb 12, 2025
b686325
set uuid dtype as `TEXT`
surister Feb 12, 2025
de1cc3b
Remove the `TEMPORARY` from crateLoadTable definition
surister Feb 12, 2025
f02a3f5
Remove `DO BEGIN` and `IF NOT FOUND` statements from `updateFence`
surister Feb 12, 2025
6b8fe69
Directly use b.input for createLoadTable
surister Feb 24, 2025
080cc23
Rename pgDialect to crateDialect
surister Feb 26, 2025
6e06838
Create temp tables from b.target instead of loadTableColumns
surister Feb 26, 2025
1011e32
Disable NOT NULL column creation and ALTER COLUMN DROP NOT NULL
surister Feb 26, 2025
336f7cc
Disable CreateSchema
surister Feb 26, 2025
ea9afaf
Add materialize-cratedb to CI build matrix
surister Feb 27, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ jobs:
- materialize-webhook
- materialize-sqlserver
- materialize-slack
- materialize-cratedb
include:
- connector: source-criteo
connector_type: capture
Expand Down
2 changes: 2 additions & 0 deletions materialize-cratedb/.snapshots/TestConfigURI-Basic
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
postgres://will:secret1234@example.com:5432/somedb
config valid
2 changes: 2 additions & 0 deletions materialize-cratedb/.snapshots/TestConfigURI-IncorrectSSL
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
postgres://will:secret1234@example.com:5432/somedb?sslmode=whoops-this-isnt-right
invalid 'sslmode' configuration: unknown setting "whoops-this-isnt-right"
2 changes: 2 additions & 0 deletions materialize-cratedb/.snapshots/TestConfigURI-RequireSSL
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
postgres://will:secret1234@example.com:5432/somedb?sslmode=verify-full
config valid
274 changes: 274 additions & 0 deletions materialize-cratedb/.snapshots/TestSQLGeneration
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
--- Begin key_value createTargetTable ---

CREATE TABLE IF NOT EXISTS key_value (
key1 BIGINT NOT NULL,
key2 BOOLEAN NOT NULL,
"key!binary" TEXT NOT NULL,
"array" JSON,
"binary" TEXT,
"boolean" BOOLEAN,
flow_published_at TIMESTAMPTZ NOT NULL,
"integer" BIGINT,
"integerGt64Bit" NUMERIC,
"integerWithUserDDL" DECIMAL(20),
multiple JSON,
number DOUBLE PRECISION,
"numberCastToString" TEXT,
object JSON,
string TEXT,
"stringInteger" NUMERIC,
"stringInteger39Chars" NUMERIC,
"stringInteger66Chars" NUMERIC,
"stringNumber" DECIMAL,
flow_document JSON NOT NULL,

PRIMARY KEY (key1, key2, "key!binary")
);

COMMENT ON TABLE key_value IS 'Generated for materialization test/sqlite of collection key/value';
COMMENT ON COLUMN key_value.key1 IS 'auto-generated projection of JSON at: /key1 with inferred types: [integer]';
COMMENT ON COLUMN key_value.key2 IS 'auto-generated projection of JSON at: /key2 with inferred types: [boolean]';
COMMENT ON COLUMN key_value."key!binary" IS 'auto-generated projection of JSON at: /key!binary with inferred types: [string]';
COMMENT ON COLUMN key_value."array" IS 'auto-generated projection of JSON at: /array with inferred types: [array]';
COMMENT ON COLUMN key_value."binary" IS 'auto-generated projection of JSON at: /binary with inferred types: [string]';
COMMENT ON COLUMN key_value."boolean" IS 'auto-generated projection of JSON at: /boolean with inferred types: [boolean]';
COMMENT ON COLUMN key_value.flow_published_at IS 'Flow Publication Time
Flow publication date-time of this document
auto-generated projection of JSON at: /_meta/uuid with inferred types: [string]';
COMMENT ON COLUMN key_value."integer" IS 'auto-generated projection of JSON at: /integer with inferred types: [integer]';
COMMENT ON COLUMN key_value."integerGt64Bit" IS 'auto-generated projection of JSON at: /integerGt64Bit with inferred types: [integer]';
COMMENT ON COLUMN key_value."integerWithUserDDL" IS 'auto-generated projection of JSON at: /integerWithUserDDL with inferred types: [integer]';
COMMENT ON COLUMN key_value.multiple IS 'auto-generated projection of JSON at: /multiple with inferred types: [boolean integer object]';
COMMENT ON COLUMN key_value.number IS 'auto-generated projection of JSON at: /number with inferred types: [number]';
COMMENT ON COLUMN key_value."numberCastToString" IS 'auto-generated projection of JSON at: /numberCastToString with inferred types: [number string]';
COMMENT ON COLUMN key_value.object IS 'auto-generated projection of JSON at: /object with inferred types: [object]';
COMMENT ON COLUMN key_value.string IS 'auto-generated projection of JSON at: /string with inferred types: [string]';
COMMENT ON COLUMN key_value."stringInteger" IS 'auto-generated projection of JSON at: /stringInteger with inferred types: [integer string]';
COMMENT ON COLUMN key_value."stringInteger39Chars" IS 'auto-generated projection of JSON at: /stringInteger39Chars with inferred types: [integer string]';
COMMENT ON COLUMN key_value."stringInteger66Chars" IS 'auto-generated projection of JSON at: /stringInteger66Chars with inferred types: [integer string]';
COMMENT ON COLUMN key_value."stringNumber" IS 'auto-generated projection of JSON at: /stringNumber with inferred types: [number string]';
COMMENT ON COLUMN key_value.flow_document IS 'auto-generated projection of JSON at: with inferred types: [object]';
--- End key_value createTargetTable ---

--- Begin delta_updates createTargetTable ---

CREATE TABLE IF NOT EXISTS delta_updates (
"theKey" TEXT NOT NULL,
"aValue" BIGINT,
flow_published_at TIMESTAMPTZ NOT NULL
);

COMMENT ON TABLE delta_updates IS 'Generated for materialization test/sqlite of collection delta/updates';
COMMENT ON COLUMN delta_updates."theKey" IS 'auto-generated projection of JSON at: /theKey with inferred types: [string]';
COMMENT ON COLUMN delta_updates."aValue" IS 'A super-awesome value.
auto-generated projection of JSON at: /aValue with inferred types: [integer]';
COMMENT ON COLUMN delta_updates.flow_published_at IS 'Flow Publication Time
Flow publication date-time of this document
auto-generated projection of JSON at: /_meta/uuid with inferred types: [string]';
--- End delta_updates createTargetTable ---

--- Begin key_value createLoadTable ---

CREATE TEMPORARY TABLE flow_temp_table_0 (
key1 BIGINT NOT NULL,
key2 BOOLEAN NOT NULL,
"key!binary" TEXT NOT NULL
) ON COMMIT DELETE ROWS;
--- End key_value createLoadTable ---

--- Begin delta_updates createLoadTable ---

CREATE TEMPORARY TABLE flow_temp_table_1 (
"theKey" TEXT NOT NULL
) ON COMMIT DELETE ROWS;
--- End delta_updates createLoadTable ---

--- Begin key_value loadInsert ---

INSERT INTO flow_temp_table_0 (key1, key2, "key!binary")
VALUES ($1, $2, $3);
--- End key_value loadInsert ---

--- Begin delta_updates loadInsert ---

INSERT INTO flow_temp_table_1 ("theKey")
VALUES ($1);
--- End delta_updates loadInsert ---

--- Begin key_value loadQuery ---

SELECT 0, r.flow_document
FROM flow_temp_table_0 AS l
JOIN key_value AS r
ON l.key1 = r.key1
AND l.key2 = r.key2
AND l."key!binary" = r."key!binary"

--- End key_value loadQuery ---

--- Begin delta_updates loadQuery ---

SELECT * FROM (SELECT -1, CAST(NULL AS JSON) LIMIT 0) as nodoc

--- End delta_updates loadQuery ---

--- Begin key_value storeInsert ---

INSERT INTO key_value (
key1,
key2,
"key!binary",
"array",
"binary",
"boolean",
flow_published_at,
"integer",
"integerGt64Bit",
"integerWithUserDDL",
multiple,
number,
"numberCastToString",
object,
string,
"stringInteger",
"stringInteger39Chars",
"stringInteger66Chars",
"stringNumber",
flow_document
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20);
--- End key_value storeInsert ---

--- Begin delta_updates storeInsert ---

INSERT INTO delta_updates (
"theKey",
"aValue",
flow_published_at
) VALUES ($1, $2, $3);
--- End delta_updates storeInsert ---

--- Begin key_value storeUpdate ---

UPDATE key_value SET
"array" = $4,
"binary" = $5,
"boolean" = $6,
flow_published_at = $7,
"integer" = $8,
"integerGt64Bit" = $9,
"integerWithUserDDL" = $10,
multiple = $11,
number = $12,
"numberCastToString" = $13,
object = $14,
string = $15,
"stringInteger" = $16,
"stringInteger39Chars" = $17,
"stringInteger66Chars" = $18,
"stringNumber" = $19,
flow_document = $20
WHERE key1 = $1
AND key2 = $2
AND "key!binary" = $3;
--- End key_value storeUpdate ---

--- Begin delta_updates storeUpdate ---

UPDATE delta_updates SET
"aValue" = $2,
flow_published_at = $3
WHERE "theKey" = $1;
--- End delta_updates storeUpdate ---

--- Begin key_value deleteQuery ---

DELETE FROM key_value WHERE
key1 = $1 AND key2 = $2 AND "key!binary" = $3;
--- End key_value deleteQuery ---

--- Begin delta_updates deleteQuery ---

DELETE FROM delta_updates WHERE
"theKey" = $1;
--- End delta_updates deleteQuery ---

--- Begin alter table add columns and drop not nulls ---

ALTER TABLE key_value
ADD COLUMN first_new_column STRING,
ADD COLUMN second_new_column BOOL,
ALTER COLUMN first_required_column DROP NOT NULL,
ALTER COLUMN second_required_column DROP NOT NULL;
--- End alter table add columns and drop not nulls ---

--- Begin alter table add columns ---

ALTER TABLE key_value
ADD COLUMN first_new_column STRING,
ADD COLUMN second_new_column BOOL;
--- End alter table add columns ---

--- Begin alter table drop not nulls ---

ALTER TABLE key_value
ALTER COLUMN first_required_column DROP NOT NULL,
ALTER COLUMN second_required_column DROP NOT NULL;
--- End alter table drop not nulls ---

--- Begin Fence Install ---

with
-- Increment the fence value of _any_ checkpoint which overlaps our key range.
update_covered as (
update path."to".checkpoints
set fence = fence + 1
where materialization = 'some/Materialization'
and key_end >= 1122867
and key_begin <= 4293844428
returning *
),
-- Read the checkpoint with the narrowest [key_begin, key_end] which fully overlaps our range.
best_match as (
select materialization, key_begin, key_end, fence, checkpoint from update_covered
where materialization = 'some/Materialization'
and key_begin <= 1122867
and key_end >= 4293844428
order by key_end - key_begin asc
limit 1
),
-- Install a new checkpoint if best_match is not an exact match.
install_new as (
insert into path."to".checkpoints (materialization, key_begin, key_end, fence, checkpoint)
-- Case: best_match is a non-empty covering span but not an exact match
select 'some/Materialization', 1122867, 4293844428, fence, checkpoint
from best_match where key_begin != 1122867 or key_end != 4293844428
union all
-- Case: best_match is empty
select 'some/Materialization', 1122867, 4293844428, 123, 'AAECAwQFBgcICQ=='
where (select count(*) from best_match) = 0
returning *
)
select fence, decode(checkpoint, 'base64') from install_new
union all
select fence, decode(checkpoint, 'base64') from best_match
limit 1
;
--- End Fence Install ---

--- Begin Fence Update ---
DO $$
BEGIN
UPDATE path."to".checkpoints
SET checkpoint = 'AAECAwQFBgcICQ=='
WHERE materialization = 'some/Materialization'
AND key_begin = 1122867
AND key_end = 4293844428
AND fence = 123;

IF NOT FOUND THEN
RAISE 'This instance was fenced off by another';
END IF;
END $$;
--- End Fence Update ---


Loading
Loading