Skip to content

Commit

Permalink
feat: add integrationdb ksql scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
pgallik committed Nov 30, 2023
1 parent 85536e6 commit ad54735
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 0 deletions.
28 changes: 28 additions & 0 deletions .ksql/INTEGRATIONDB/INTEGRATIONDB_BUILDINGUNIT_OSLO_CONNECTOR.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
CREATE SINK CONNECTOR `BuildingUnitIntegrationDbConnector` with (
"topics"= 'buildingunit.snapshot.oslo.flatten.integrationdb',
"input.data.format"= 'JSON_SR',
"input.key.format"= 'JSON_SR',
"delete.enabled"= false,
"connector.class"= 'PostgresSink',
"name"= 'BuildingUnitIntegrationDbConnector',
"kafka.auth.mode"= 'KAFKA_API_KEY',
"kafka.api.key"= 'RIQXAHXON7TRUNPU', --clear value
"kafka.api.secret"= 'B6s7UW1hbk0kUmjQFi2yEjemFiGHzw/vn8RAfX3YO/fhlcKZFysruig1Gp6M6Yi2', --clear value
"connection.host"= 'vbr-integrationdb-test.postgres.database.azure.com', --clear value
"connection.port"= '5432',
"connection.user"= 'basisregisters', --clear value
"connection.password"= 'Rj7Hnudk02zie4eA2N2jkSeIAEflbQe7e6DwN7Dhe9gDAFR7akvfvLVx2KOXo5Pkjd3BFiALyUkcbjoxI29R1zGT3dWhdJDhu0A', --clear value
"db.name"= 'postgres',
"ssl.mode"= 'require',
"insert.mode"= 'UPSERT',
"table.name.format"= 'Integration.BuildingUnits',
"table.types"= 'TABLE',
"db.timezone"= 'UTC',
"pk.mode"= 'record_key',
"pk.fields"= 'PersistentLocalId',
"auto.create"= false,
"auto.evolve"= false,
"quote.sql.identifiers"= 'ALWAYS',
"batch.sizes"= 3000,
"tasks.max"= 1
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE OR REPLACE STREAM IF NOT EXISTS BUILDINGUNIT_SNAPSHOT_OSLO_STREAM_FLATTEN_INTEGRATIONDB
WITH (KAFKA_TOPIC='buildingunit.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') AS
SELECT
CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) as INT) as PersistentLocalId,

CAST(GEBOUW->OBJECTID as INT) as "BuildingPersistentLocalId",
GEBOUWEENHEIDSTATUS as "Status",
FUNCTIE as "Function",
GEBOUWEENHEIDPOSITIE->POSITIEGEOMETRIEMETHODE as "GeometryMethod",
GEBOUWEENHEIDPOSITIE->GEOMETRIE->GML as "GeometryGml",
ARRAY_JOIN(TRANSFORM(adressen, (x) => (x->objectId)), ', ') AS "Addresses",
AFWIJKINGVASTGESTELD as "HasDeviation",

IDENTIFICATOR->ID as "PuriId",
IDENTIFICATOR->NAAMRUIMTE as "Namespace",
IDENTIFICATOR->VERSIEID as "VersionString",
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp",
CASE WHEN IDENTIFICATOR->ID is null THEN TRUE ELSE FALSE END as "IsRemoved"

FROM BUILDINGUNIT_SNAPSHOT_OSLO_STREAM
PARTITION BY CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) as INT);
28 changes: 28 additions & 0 deletions .ksql/INTEGRATIONDB/INTEGRATIONDB_BUILDING_OSLO_CONNECTOR.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
CREATE SINK CONNECTOR `BuildingIntegrationDbConnector` with (
"topics"= 'building.snapshot.oslo.flatten.integrationdb',
"input.data.format"= 'JSON_SR',
"input.key.format"= 'JSON_SR',
"delete.enabled"= false,
"connector.class"= 'PostgresSink',
"name"= 'BuildingIntegrationDbConnector',
"kafka.auth.mode"= 'KAFKA_API_KEY',
"kafka.api.key"= 'RIQXAHXON7TRUNPU', --clear value
"kafka.api.secret"= 'B6s7UW1hbk0kUmjQFi2yEjemFiGHzw/vn8RAfX3YO/fhlcKZFysruig1Gp6M6Yi2', --clear value
"connection.host"= 'vbr-integrationdb-test.postgres.database.azure.com', --clear value
"connection.port"= '5432',
"connection.user"= 'basisregisters', --clear value
"connection.password"= 'Rj7Hnudk02zie4eA2N2jkSeIAEflbQe7e6DwN7Dhe9gDAFR7akvfvLVx2KOXo5Pkjd3BFiALyUkcbjoxI29R1zGT3dWhdJDhu0A', --clear value
"db.name"= 'postgres',
"ssl.mode"= 'require',
"insert.mode"= 'UPSERT',
"table.name.format"= 'Integration.Buildings',
"table.types"= 'TABLE',
"db.timezone"= 'UTC',
"pk.mode"= 'record_key',
"pk.fields"= 'PersistentLocalId',
"auto.create"= false,
"auto.evolve"= false,
"quote.sql.identifiers"= 'ALWAYS',
"batch.sizes"= 3000,
"tasks.max"= 1
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE OR REPLACE STREAM IF NOT EXISTS BUILDING_SNAPSHOT_OSLO_STREAM_FLATTEN_INTEGRATIONDB
WITH (KAFKA_TOPIC='building.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') AS
SELECT
CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) as INT) as PersistentLocalId,
GEBOUWSTATUS as "Status",
GEBOUWPOLYGOON->GEOMETRIEMETHODE as "GeometryMethod",
GEBOUWPOLYGOON->GEOMETRIE->GML as "GeometryGml",

IDENTIFICATOR->ID as "PuriId",
IDENTIFICATOR->NAAMRUIMTE as "Namespace",
IDENTIFICATOR->VERSIEID as "VersionString",
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp",
CASE WHEN IDENTIFICATOR->ID is null THEN TRUE ELSE FALSE END as "IsRemoved"

FROM BUILDING_SNAPSHOT_OSLO_STREAM
PARTITION BY CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) as INT);

0 comments on commit ad54735

Please sign in to comment.