[NOJIRA] chore: merge from upstream (#2)
* chore(deps): bump testcontainersVersion from 1.20.1 to 1.20.2

Bumps `testcontainersVersion` from 1.20.1 to 1.20.2.

Updates `org.testcontainers:junit-jupiter` from 1.20.1 to 1.20.2
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](testcontainers/testcontainers-java@1.20.1...1.20.2)

Updates `org.testcontainers:kafka` from 1.20.1 to 1.20.2
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](testcontainers/testcontainers-java@1.20.1...1.20.2)

Updates `org.testcontainers:testcontainers` from 1.20.1 to 1.20.2
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](testcontainers/testcontainers-java@1.20.1...1.20.2)

Updates `org.testcontainers:postgresql` from 1.20.1 to 1.20.2
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](testcontainers/testcontainers-java@1.20.1...1.20.2)

Updates `org.testcontainers:oracle-free` from 1.20.1 to 1.20.2
- [Release notes](https://github.com/testcontainers/testcontainers-java/releases)
- [Changelog](https://github.com/testcontainers/testcontainers-java/blob/main/CHANGELOG.md)
- [Commits](testcontainers/testcontainers-java@1.20.1...1.20.2)

---
updated-dependencies:
- dependency-name: org.testcontainers:junit-jupiter
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.testcontainers:kafka
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.testcontainers:testcontainers
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.testcontainers:postgresql
  dependency-type: direct:production
  update-type: version-update:semver-patch
- dependency-name: org.testcontainers:oracle-free
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix: re-allow empty `pk.fields` for some `pk.mode`

Recently added validations (see PR Aiven-Open#335) prevent
schema evolution use cases such as the following.

Documentation:
```
If 'pk.mode' is 'record_key' and 'pk.fields' is empty,
then all fields from the key struct will be used.
```

Configuration:
```
"pk.mode": "record_key",
"auto.create": "true",
"auto.evolve": "true",
```

Validation rule:
```
Primary key fields must be set when pkMode is 'record_key'.
```

The purpose of this PR is to re-enable this feature
for both `record_key` and `record_value` modes.
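
For illustration only (not part of this commit), a minimal sketch of the documented behaviour: with `pk.mode=record_key` and an empty `pk.fields`, every field of the key struct becomes a primary-key column. The `resolvePkFields` helper and the field names are hypothetical; only the Kafka Connect `connect-api` types are real.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Hypothetical helper mirroring the documented behaviour: an empty pk.fields
// means "use every field of the record key struct" in record_key mode.
final class PkFieldsExample {

    static List<String> resolvePkFields(final List<String> configuredPkFields, final Schema keySchema) {
        if (configuredPkFields == null || configuredPkFields.isEmpty()) {
            // Empty pk.fields: all fields from the key struct become PK columns.
            return keySchema.fields().stream().map(Field::name).collect(Collectors.toList());
        }
        // Non-empty pk.fields: only the configured subset is used.
        return configuredPkFields;
    }

    public static void main(final String[] args) {
        final Schema keySchema = SchemaBuilder.struct()
                .field("tenant_id", Schema.STRING_SCHEMA)
                .field("order_id", Schema.INT64_SCHEMA)
                .build();
        // Prints [tenant_id, order_id]: both key fields are used as the primary key.
        System.out.println(resolvePkFields(List.of(), keySchema));
    }
}
```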

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Anatoly Popov <anatolii.popov@aiven.io>
Co-authored-by: Julien Clarysse <julien.clarysse@aiven.io>
Co-authored-by: Jonas Keeling <jonas.keeling@aiven.io>
5 people authored Feb 25, 2025
1 parent 2fae56b commit f3a6dcd
Showing 4 changed files with 6 additions and 45 deletions.
2 changes: 1 addition & 1 deletion build.gradle.kts
@@ -127,7 +127,7 @@ val hamcrestVersion = "2.2"
val jacksonVersion = "2.17.2" // This Jackson is used in the tests.
val jupiterVersion = "5.11.0"
val servletVersion = "4.0.1"
val testcontainersVersion = "1.20.1"
val testcontainersVersion = "1.20.2"
val awaitilityVersion = "4.2.1"
val log4jVersion = "2.20.0"

11 changes: 2 additions & 9 deletions src/main/java/io/aiven/connect/jdbc/sink/JdbcSinkConfig.java
@@ -510,8 +510,9 @@ public static void validatePKModeAgainstPKFields(final Config config) {
validateKafkaPKFields(pkFieldsConfigValue, pkFields);
break;
case "record_key":
// If empty then all fields from the key struct apply, otherwise the desired fields
case "record_value":
validatePKFieldsRequired(pkFieldsConfigValue, pkFields);
// If empty then all fields from the value struct apply, otherwise the desired fields
break;
default:
pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
Expand All @@ -535,12 +536,4 @@ private static void validateKafkaPKFields(final ConfigValue pkFieldsConfigValue,
);
}
}

private static void validatePKFieldsRequired(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
if (pkFields == null || pkFields.isEmpty()) {
pkFieldsConfigValue.addErrorMessage(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'."
);
}
}
}
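
For readers skimming the hunks above, a simplified sketch (not verbatim connector source) of how the validation switch reads after this change; the `kafka`-mode helper body is an assumption and only illustrates that strict checking remains for that mode.

```java
import java.util.List;

import org.apache.kafka.common.config.ConfigValue;

// Simplified sketch: 'record_key' falls through to 'record_value', and neither
// requires pk.fields any more, so an empty list again means "use all struct fields".
final class PkModeValidationSketch {

    static void validatePkModeAgainstPkFields(final String pkMode,
                                              final ConfigValue pkFieldsConfigValue,
                                              final List<String> pkFields) {
        switch (pkMode) {
            case "kafka":
                validateKafkaPkFields(pkFieldsConfigValue, pkFields);
                break;
            case "record_key":
                // If empty then all fields from the key struct apply, otherwise the desired fields.
            case "record_value":
                // If empty then all fields from the value struct apply, otherwise the desired fields.
                break;
            default:
                pkFieldsConfigValue.addErrorMessage("Invalid pkMode value: " + pkMode);
        }
    }

    private static void validateKafkaPkFields(final ConfigValue pkFieldsConfigValue, final List<String> pkFields) {
        // Stand-in for the existing 'kafka' mode check, which stays strict after this change.
        if (pkFields == null || pkFields.size() != 3) {
            pkFieldsConfigValue.addErrorMessage("Primary key fields must name exactly three columns when pkMode is 'kafka'.");
        }
    }
}
```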
34 changes: 0 additions & 34 deletions src/test/java/io/aiven/connect/jdbc/sink/JdbcSinkConfigTest.java
@@ -127,40 +127,6 @@ public void shouldValidatePKModeKafkaWithInvalidPKFields() {
)));
}

@Test
public void shouldValidatePKModeRecordKeyWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_key");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidatePKModeRecordValueWithNoPKFields() {
final Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL_CONFIG, "jdbc://localhost");
props.put(JdbcSinkConfig.PK_MODE, "record_value");

final Config config = new JdbcSinkConnector().validate(props);

assertTrue(config.configValues().stream().anyMatch(cv -> cv.errorMessages().size() > 0));
assertTrue(config.configValues().stream()
.filter(cv -> cv.name().equals(JdbcSinkConfig.PK_FIELDS))
.flatMap(cv -> cv.errorMessages().stream())
.anyMatch(msg -> msg.contains(
"Primary key fields must be set when pkMode is 'record_key' or 'record_value'"
)));
}

@Test
public void shouldValidateValidPKModeAndPKFields() {
final Map<String, String> props = new HashMap<>();
@@ -301,13 +301,15 @@ public void testTimestampWithDelay() throws Exception {
final Long currentTime = new Date().getTime();

// Validate that we are seeing 2,3 but not 4,5 as they are getting delayed to the next round
// Using "toString" and not UTC because Derby's current_timestamp is always local time
// using "toString" and not UTC because Derby's current_timestamp is always local time
// (i.e. doesn't honor Calendar settings)
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime).toString(), "id", 2);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 1L).toString(), "id", 3);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 500L).toString(), "id", 4);
db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 501L).toString(), "id", 5);

// avoid flaky test where only 1 record gets received
Thread.sleep(1);
verifyPoll(2, "id", Arrays.asList(2, 3), true, false, false, TOPIC_PREFIX + SINGLE_TABLE_NAME);

// make sure we get the rest
