diff --git a/build.gradle.kts b/build.gradle.kts index e034295a..fa0522be 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -127,7 +127,7 @@ val hamcrestVersion = "2.2" val jacksonVersion = "2.17.2" // This Jackson is used in the tests. val jupiterVersion = "5.11.0" val servletVersion = "4.0.1" -val testcontainersVersion = "1.20.1" +val testcontainersVersion = "1.20.2" val awaitilityVersion = "4.2.1" val log4jVersion = "2.20.0" diff --git a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java index 03371b1c..02042670 100644 --- a/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java +++ b/src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java @@ -510,8 +510,9 @@ public static void validatePKModeAgainstPKFields(final Config config) { validateKafkaPKFields(pkFieldsConfigValue, pkFields); break; case "record_key": + // If empty then all fields from the key struct apply, otherwise the desired fields case "record_value": - validatePKFieldsRequired(pkFieldsConfigValue, pkFields); + // If empty then all fields from the value struct apply, otherwise the desired fields break; default: pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode); @@ -535,12 +536,4 @@ private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue, ); } } - - private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List pkFields) { - if (pkFields == null || pkFields.isEmpty()) { - pkFieldsConfigValue.addErrorMessage( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'." - ); - } - } } diff --git a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java index 3af67b05..9ee2caf8 100644 --- a/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java +++ b/src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java @@ -127,40 +127,6 @@ public void shouldValidatePKModeKafkaWithInvalidPKFields() { ))); } - @Test - public void shouldValidatePKModeRecordKeyWithNoPKFields() { - final Map props = new HashMap<>(); - props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); - props.put(JdbcSinkConfig.PK_MODE, "record_key"); - - final Config config = new JdbcSinkConnector().validate(props); - - assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); - assertTrue(config.configValues().stream() - .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) - .flatMap(cv -> cv.errorMessages().stream()) - .anyMatch(msg -> msg.contains( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" - ))); - } - - @Test - public void shouldValidatePKModeRecordValueWithNoPKFields() { - final Map props = new HashMap<>(); - props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost"); - props.put(JdbcSinkConfig.PK_MODE, "record_value"); - - final Config config = new JdbcSinkConnector().validate(props); - - assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0)); - assertTrue(config.configValues().stream() - .filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS)) - .flatMap(cv -> cv.errorMessages().stream()) - .anyMatch(msg -> msg.contains( - "Primary key fields must be set when pkMode is 'record_key' or 'record_value'" - ))); - } - @Test public void shouldValidateValidPKModeAndPKFields() { final Map props = new HashMap<>(); diff --git a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java index 31eff9b1..54be2646 100644 --- a/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java +++ b/src/test/java/io/aiven/connect/jdbc/source/JdbcSourceTaskUpdateTest.java @@ -301,13 +301,15 @@ public void testTimestampWithDelay() throws Exception { final Long currentTime = new Date().getTime(); // Validate that we are seeing 2,3 but not 4,5 as they are getting delayed to the next round - // Using "toString" and not UTC because Derby's current_timestamp is always local time + // using "toString" and not UTC because Derby's current_timestamp is always local time // (i.e. doesn't honor Calendar settings) db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime).toString(), "id", 2); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 1L).toString(), "id", 3); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 500L).toString(), "id", 4); db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 501L).toString(), "id", 5); + // avoid flaky test where only 1 record gets received + Thread.sleep(1); verifyPoll(2, "id", Arrays.asList(2, 3), true, false, false, TOPIC_PREFIX + SINGLE_TABLE_NAME); // make sure we get the rest