From 04ab441dcacb6edb31818550dae0a3aa5e2d62ba Mon Sep 17 00:00:00 2001 From: pgallik Date: Thu, 30 Nov 2023 11:25:06 +0100 Subject: [PATCH] feat: add integrationdb ksql scripts --- NOTE(review): this patch does three things. (1) It rewrites the WITH line of the GEOLOCATION flatten stream; the removed and added lines read identically here, so this is presumably a whitespace-only change - confirm. (2) It adds the sink connector MunicipalityIntegrationDbConnector, which reads topic municipality.snapshot.oslo.flatten.integrationdb and writes to table Integration.Municipalities with insert.mode UPSERT, pk.mode record_key, pk.fields NisCode, and auto.create / auto.evolve both false; the *** values tagged clear value are presumably redacted placeholders to be filled in at deploy time - confirm. (3) It adds the stream MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN_INTEGRATIONDB, which selects from MUNICIPALITY_SNAPSHOT_OSLO_STREAM into that same topic, partitions by the last path segment of the MESSAGEKEY URL cast to INTEGER (the REDUCE lambda returns only the current element, so the final SPLIT element wins), derives one boolean per official / facility language and one name per language via FILTER, parses IDENTIFICATOR->VERSIEID into VersionTimestamp, and sets IsRemoved to TRUE when IDENTIFICATOR->ID is null. To verify before merging: NisCode is the only select alias that is not double-quoted, and both stream statements combine OR REPLACE with IF NOT EXISTS - confirm that each is intended. END NOTE. ...CIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql | 2 +- ...GRATIONDB_MUNICIPALITY.OSLO_CONNECTOR.ksql | 28 +++++++++++++++++ ...CIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql | 30 +++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) create mode 100644 .ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY.OSLO_CONNECTOR.ksql create mode 100644 .ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql diff --git a/.ksql/GEOLOCATION_0/GEOLOCATION_01_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql b/.ksql/GEOLOCATION_0/GEOLOCATION_01_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql index 5f6b42d7..d7a75af2 100644 --- a/.ksql/GEOLOCATION_0/GEOLOCATION_01_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql +++ b/.ksql/GEOLOCATION_0/GEOLOCATION_01_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql @@ -1,5 +1,5 @@ CREATE OR REPLACE STREAM IF NOT EXISTS MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN_GEOLOCATION -WITH (KAFKA_TOPIC='municipality.snapshot.oslo.flatten.geolocation', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') +WITH (KAFKA_TOPIC='municipality.snapshot.oslo.flatten.geolocation', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') AS SELECT CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INTEGER) msgkey, IDENTIFICATOR->ID IDENTIFICATOR_ID, diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY.OSLO_CONNECTOR.ksql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY.OSLO_CONNECTOR.ksql new file mode 100644 index 00000000..0fb1beb2 --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY.OSLO_CONNECTOR.ksql @@ -0,0 +1,28 @@ +CREATE SINK CONNECTOR `MunicipalityIntegrationDbConnector` with ( + "topics"= 'municipality.snapshot.oslo.flatten.integrationdb', + "input.data.format"= 'JSON_SR', + "input.key.format"= 'JSON_SR', + 
"delete.enabled"= false, + "connector.class"= 'PostgresSink', + "name"= 'MunicipalityIntegrationDbConnector', + "kafka.auth.mode"= 'KAFKA_API_KEY', + "kafka.api.key"= '***', --clear value + "kafka.api.secret"= '***', --clear value + "connection.host"= '***', --clear value + "connection.port"= '5432', + "connection.user"= '***', --clear value + "connection.password"= '***', --clear value + "db.name"= 'postgres', + "ssl.mode"= 'require', + "insert.mode"= 'UPSERT', + "table.name.format"= 'Integration.Municipalities', + "table.types"= 'TABLE', + "db.timezone"= 'UTC', + "pk.mode"= 'record_key', + "pk.fields"= 'NisCode', + "auto.create"= false, + "auto.evolve"= false, + "quote.sql.identifiers"= 'ALWAYS', + "batch.sizes"= 3000, + "tasks.max"= 1 + ); \ No newline at end of file diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql new file mode 100644 index 00000000..374f74a0 --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN.ksql @@ -0,0 +1,30 @@ +CREATE OR REPLACE STREAM IF NOT EXISTS MUNICIPALITY_SNAPSHOT_OSLO_STREAM_FLATTEN_INTEGRATIONDB +WITH (KAFKA_TOPIC='municipality.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') +AS SELECT + CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INTEGER) NisCode, + + GEMEENTESTATUS as "Status", + + CASE WHEN FILTER(OFFICIELETALEN, (TAAL) => (TAAL = 'nl'))[1] is not null THEN TRUE ELSE FALSE END as "OfficialLanguageDutch", + CASE WHEN FILTER(OFFICIELETALEN, (TAAL) => (TAAL = 'fr'))[1] is not null THEN TRUE ELSE FALSE END as "OfficialLanguageFrench", + CASE WHEN FILTER(OFFICIELETALEN, (TAAL) => (TAAL = 'de'))[1] is not null THEN TRUE ELSE FALSE END as "OfficialLanguageGerman", + CASE WHEN FILTER(OFFICIELETALEN, (TAAL) => (TAAL = 'en'))[1] is not null THEN TRUE ELSE FALSE END as 
"OfficialLanguageEnglish", + + CASE WHEN FILTER(FACILITEITENTALEN, (TAAL) => (TAAL = 'nl'))[1] is not null THEN TRUE ELSE FALSE END as "FacilityLanguageDutch", + CASE WHEN FILTER(FACILITEITENTALEN, (TAAL) => (TAAL = 'fr'))[1] is not null THEN TRUE ELSE FALSE END as "FacilityLanguageFrench", + CASE WHEN FILTER(FACILITEITENTALEN, (TAAL) => (TAAL = 'de'))[1] is not null THEN TRUE ELSE FALSE END as "FacilityLanguageGerman", + CASE WHEN FILTER(FACILITEITENTALEN, (TAAL) => (TAAL = 'en'))[1] is not null THEN TRUE ELSE FALSE END as "FacilityLanguageEnglish", + + FILTER(GEMEENTENAMEN, (X) => (X->TAAL = 'nl'))[1]->SPELLING as "NameDutch", + FILTER(GEMEENTENAMEN, (X) => (X->TAAL = 'fr'))[1]->SPELLING as "NameFrench", + FILTER(GEMEENTENAMEN, (X) => (X->TAAL = 'de'))[1]->SPELLING as "NameGerman", + FILTER(GEMEENTENAMEN, (X) => (X->TAAL = 'en'))[1]->SPELLING as "NameEnglish", + + IDENTIFICATOR->ID as "PuriId", + IDENTIFICATOR->NAAMRUIMTE as "Namespace", + IDENTIFICATOR->VERSIEID as "VersionString", + PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp", + CASE WHEN IDENTIFICATOR->ID is null THEN TRUE ELSE FALSE END as "IsRemoved" + +FROM MUNICIPALITY_SNAPSHOT_OSLO_STREAM +PARTITION BY CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INTEGER);