Skip to content

Commit

Permalink
fix: re-allow empty pk.fields for some pk.mode
Browse files Browse the repository at this point in the history
Recently added validations (see PR #335) are preventing the
support of schema evolution use-cases, e.g.

Documentation:
```
If 'pk.mode' is 'record_key' and 'pk.fields' is empty,
then all fields from the key struct will be used.
```

Configuration:
```
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
```

Validation rule:
```
Primary key fields must be set when pkMode is 'record_key'.
```

The purpose of this PR is to re-enable this feature
for both `record_key` and `record_value` modes.
  • Loading branch information
jclarysse committed Feb 17, 2025
1 parent 1c9f858 commit a20df4d
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 44 deletions.
11 changes: 2 additions & 9 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -510,8 +510,9 @@ public static void validatePKModeAgainstPKFields(final Config config) {
validateKafkaPKFields(pkFieldsConfigValue, pkFields);
break;
case "record_key":
// If empty then all fields from the key struct apply, otherwise the desired fields
case "record_value":
validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
// If empty then all fields from the value struct apply, otherwise the desired fields
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
Expand All @@ -535,12 +536,4 @@ private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue,
);
}
}

private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
);
}
}
}
34 changes: 0 additions & 34 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -127,40 +127,6 @@ public void shouldValidatePKModeKafkaWithInvalidPKFields() {
)));
}

@Test
public void shouldValidatePKModeRecordKeyWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_key");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidatePKModeRecordValueWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_value");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidateValidPKModeAndPKFields() {
final Map<String, String> props = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,15 @@ public void testTimestampWithDelay() throws Exception {
final Long currentTime = new Date().getTime();

// Validate that we are seeing 2,3 but not 4,5 as they are getting delayed to the next round
// Using "toString" and not UTC because Derby's current_timestamp is always local time
// using "toString" and not UTC because Derby's current_timestamp is always local time
// (i.e. doesn't honor Calendar settings)
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime).toString(), "id", 2);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 1L).toString(), "id", 3);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 500L).toString(), "id", 4);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 501L).toString(), "id", 5);

// avoid flaky test where only 1 record gets received
Thread.sleep(1);
verifyPoll(2, "id", Arrays.asList(2, 3), true, false, false, TOPIC_PREFIX + SINGLE_TABLE_NAME);

// make sure we get the rest
Expand Down

0 comments on commit a20df4d

Please sign in to comment.