From f3a6dcd6b27f35bcd330ec31aa1bdea6e50d03ef Mon Sep 17 00:00:00 2001 From: Adam Strickland Date: Mon, 24 Feb 2025 19:49:28 -0500 Subject: [PATCH] [NOJIRA] chore: merge from upstream (#2) * chore(deps): bump testcontainersVersion from 1.20.1 to 1.20.2 Bumps `testcontainersVersion` from 1.20.1 to 1.20.2. Updates `org.testcontainers:junit-jupiter` from 1.20.1 to 1.20.2 - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.1...1.20.2) Updates `org.testcontainers:kafka` from 1.20.1 to 1.20.2 - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.1...1.20.2) Updates `org.testcontainers:testcontainers` from 1.20.1 to 1.20.2 - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.1...1.20.2) Updates `org.testcontainers:postgresql` from 1.20.1 to 1.20.2 - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.1...1.20.2) Updates `org.testcontainers:oracle-free` from 1.20.1 to 1.20.2 - [Release notes](https://github.com/testcontainers/testcontainers-java/releases) - [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md) - [Commits](https://github.com/testcontainers/testcontainers-java/compare/1.20.1...1.20.2) --- updated-dependencies: - dependency-name: org.testcontainers:junit-jupiter dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.testcontainers:kafka dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.testcontainers:testcontainers dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.testcontainers:postgresql dependency-type: direct:production update-type: version-update:semver-patch - dependency-name: org.testcontainers:oracle-free dependency-type: direct:production update-type: version-update:semver-patch ... Signed-off-by: dependabot[bot] * fix: re-allow empty `pk.fields` for some `pk.mode` Recently added validations (see PR #335) are preventing the support of schema evolution use-cases, e.g. Documentation: ``` If 'pk.mode' is 'record_key' and 'pk.fields' is empty, then all fields from the key struct will be used. ``` Configuration: ``` "pk.mode": "record_key", "auto.create": "true", "auto.evolve": "true", ``` Validation rule: ``` Primary key fields must be set when pkMode is 'record_key'. ``` The purpose of this PR is to re-enable this feature for both `record_key` and `record_value` modes. --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Anatoly Popov Co-authored-by: Julien Clarysse Co-authored-by: Jonas Keeling --- build.gradle.kts | 2 +- .../connect/jdbc/sink/JdbcSinkConfig.java | 11 ++---- .../connect/jdbc/sink/JdbcSinkConfigTest.java | 34 ------------------- .../jdbc/source/JdbcSourceTaskUpdateTest.java | 4 ++- 4 files changed, 6 insertions(+), 45 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index e034295a..fa0522be 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -127,7 +127,7 @@ val hamcrestVersion = "2.2" val jacksonVersion = "2.17.2" // This Jackson is used in the tests. val jupiterVersion = "5.11.0" val servletVersion = "4.0.1" -val testcontainersVersion = "1.20.1" +val testcontainersVersion = "1.20.2" val awaitilityVersion = "4.2.1" val log4jVersion = "2.20.0" diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 03371b1c..02042670 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -510,8 +510,9 @@ public static void validatePKModeAgainstPKFields(final Config config) { validateKafkaPKFields(pkFieldsConfigValue, pkFields); break; case "record_key": + // If empty then all fields from the key struct apply, otherwise the desired fields case "record_value": - validatePKFieldsRequired(pkFieldsConfigValue, pkFields); + // If empty then all fields from the value struct apply, otherwise the desired fields break; default: pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode); @@ -535,12 +536,4 @@ private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, ); } } - - private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List pkFields) { - if (pkFields == null || pkFields.isEmpty()) { - pkFieldsConfigValue.addErrorMessage( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'." - ); - } - } } diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java index 3af67b05..9ee2caf8 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -127,40 +127,6 @@ public void shouldValidatePKModeKafkaWithInvalidPKFields() { ))); } - @Test - public void shouldValidatePKModeRecordKeyWithNoPKFields() { - final Map props = new HashMap<>(); - props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); - props.put(JdbcSinkConfig.PK_MODE, "record_key"); - - final Config config = new JdbcSinkConnector().validate(props); - - assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); - assertTrue(config.configValues().stream() - .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) - .flatMap(cv -> cv.errorMessages().stream()) - .anyMatch(msg -> msg.contains( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" - ))); - } - - @Test - public void shouldValidatePKModeRecordValueWithNoPKFields() { - final Map props = new HashMap<>(); - props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); - props.put(JdbcSinkConfig.PK_MODE, "record_value"); - - final Config config = new JdbcSinkConnector().validate(props); - - assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); - assertTrue(config.configValues().stream() - .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) - .flatMap(cv -> cv.errorMessages().stream()) - .anyMatch(msg -> msg.contains( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" - ))); - } - @Test public void shouldValidateValidPKModeAndPKFields() { final Map props = new HashMap<>(); diff --git a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java index 31eff9b1..54be2646 100644 --- a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java +++ b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java @@ -301,13 +301,15 @@ public void testTimestampWithDelay() throws Exception { final Long currentTime = new Date().getTime(); // Validate that we are seeing 2,3 but not 4,5 as they are getting delayed to the next round - // Using "toString" and not UTC because Derby's current_timestamp is always local time + // using "toString" and not UTC because Derby's current_timestamp is always local time // (i.e. doesn't honor Calendar settings) db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime).toString(), "id", 2); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 1L).toString(), "id", 3); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 500L).toString(), "id", 4); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 501L).toString(), "id", 5); + // avoid flaky test where only 1 record gets received + Thread.sleep(1); verifyPoll(2, "id", Arrays.asList(2, 3), true, false, false, TOPIC_PREFIX + SINGLE_TABLE_NAME); // make sure we get the rest