Skip to content

Commit

Permalink
Merge pull request #2 from getsentry/fpacifici/internal_topicctl_dele…
Browse files Browse the repository at this point in the history
…tion

Same as segmentio#198
This is a PR in our fork so we can merge it in master here and start working on that while the PR on upstream gets reviewed.
  • Loading branch information
fpacifici authored Jun 14, 2024
2 parents aedd232 + 6972a4c commit ad79c96
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 6 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,8 @@ The `apply` subcommand can make changes, but under the following conditions:
8. Before applying, the tool checks the cluster ID against the expected value in the
cluster config. This can help prevent errors around applying in the wrong cluster when multiple
clusters are accessed through the same address, e.g `localhost:2181`.
9. If the `destructive` CLI argument is passed, `apply` deletes the settings that are
set on the broker but not set in configuration.

The `reset-offsets` command can also make changes in the cluster and should be used carefully.

Expand Down
8 changes: 8 additions & 0 deletions cmd/topicctl/subcmd/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type applyCmdConfig struct {
retentionDropStepDurationStr string
skipConfirm bool
ignoreFewerPartitionsError bool
destructive bool
sleepLoopDuration time.Duration
failFast bool

Expand Down Expand Up @@ -107,6 +108,12 @@ func init() {
false,
"Don't return error when topic's config specifies fewer partitions than it currently has",
)
applyCmd.Flags().BoolVar(
&applyConfig.destructive,
"destructive",
false,
"Deletes topic settings from the broker if the settings are present on the broker but not in the config",
)
applyCmd.Flags().DurationVar(
&applyConfig.sleepLoopDuration,
"sleep-loop-duration",
Expand Down Expand Up @@ -259,6 +266,7 @@ func applyTopic(
RetentionDropStepDuration: applyConfig.retentionDropStepDuration,
SkipConfirm: applyConfig.skipConfirm,
IgnoreFewerPartitionsError: applyConfig.ignoreFewerPartitionsError,
Destructive: applyConfig.destructive,
SleepLoopDuration: applyConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
1 change: 1 addition & 0 deletions cmd/topicctl/subcmd/rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ func rebalanceApplyTopic(
AutoContinueRebalance: true, // to continue without prompts
RetentionDropStepDuration: retentionDropStepDuration, // not needed for rebalance
SkipConfirm: true, // to enforce action: rebalance
Destructive: false, // Irrelevant here
SleepLoopDuration: rebalanceConfig.sleepLoopDuration,
TopicConfig: topicConfig,
}
Expand Down
27 changes: 21 additions & 6 deletions pkg/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type TopicApplierConfig struct {
RetentionDropStepDuration time.Duration
SkipConfirm bool
IgnoreFewerPartitionsError bool
Destructive bool
SleepLoopDuration time.Duration
TopicConfig config.TopicConfig
}
Expand Down Expand Up @@ -392,6 +393,8 @@ func (t *TopicApplier) updateSettings(
return err
}

configEntries := []kafka.ConfigEntry{}

if len(diffKeys) > 0 {
diffsTable, err := FormatSettingsDiff(topicSettings, topicInfo.Config, diffKeys)
if err != nil {
Expand All @@ -416,6 +419,23 @@ func (t *TopicApplier) updateSettings(
)
}

configEntries, err = topicSettings.ToConfigEntries(diffKeys)
if err != nil {
return err
}
}

if len(missingKeys) > 0 && t.config.Destructive {
log.Infof(
"Found %d key(s) set in cluster but missing from config to be deleted:\n%s",
len(missingKeys),
FormatMissingKeys(topicInfo.Config, missingKeys),
)

configEntries = append(configEntries, topicSettings.ToEmptyConfigEntries(missingKeys)...)
}

if len(configEntries) > 0 {
if t.config.DryRun {
log.Infof("Skipping update because dryRun is set to true")
return nil
Expand All @@ -430,11 +450,6 @@ func (t *TopicApplier) updateSettings(
}
log.Infof("OK, updating")

configEntries, err := topicSettings.ToConfigEntries(diffKeys)
if err != nil {
return err
}

_, err = t.adminClient.UpdateTopicConfig(
ctx,
t.topicName,
Expand All @@ -446,7 +461,7 @@ func (t *TopicApplier) updateSettings(
}
}

if len(missingKeys) > 0 {
if len(missingKeys) > 0 && !t.config.Destructive {
log.Warnf(
"Found %d key(s) set in cluster but missing from config:\n%s\nThese will be left as-is.",
len(missingKeys),
Expand Down
21 changes: 21 additions & 0 deletions pkg/apply/apply_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ func TestApplyBasicUpdates(t *testing.T) {
applier.topicConfig.Spec.ReplicationFactor = 3
err = applier.Apply(ctx)
require.NotNil(t, err)
applier.topicConfig.Spec.ReplicationFactor = 2

// Settings are not deleted if Destructive is false. They are
// if it is true
delete(applier.topicConfig.Spec.Settings, "cleanup.policy")
err = applier.Apply(ctx)
require.NoError(t, err)
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
require.NoError(t, err)

assert.Equal(t, "delete", topicInfo.Config["cleanup.policy"])

applier.config.Destructive = true
err = applier.Apply(ctx)
require.NoError(t, err)
topicInfo, err = applier.adminClient.GetTopic(ctx, topicName, true)
require.NoError(t, err)

_, present := topicInfo.Config["cleanup.policy"]
assert.False(t, present)

}

func TestApplyPlacementUpdates(t *testing.T) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/config/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,22 @@ func (t TopicSettings) ToConfigEntries(keys []string) ([]kafka.ConfigEntry, erro
return entries, nil
}

// Produces a slice of kafka-go config entries with empty value. Thus used
// for deletion of the setting.
func (t TopicSettings) ToEmptyConfigEntries(keys []string) []kafka.ConfigEntry {
entries := []kafka.ConfigEntry{}

if keys != nil {
for _, key := range keys {
entries = append(
entries,
kafka.ConfigEntry{ConfigName: key, ConfigValue: ""},
)
}
}
return entries
}

// HasKey returns whether the current settings instance contains the argument key.
func (t TopicSettings) HasKey(key string) bool {
_, ok := t[key]
Expand Down

0 comments on commit ad79c96

Please sign in to comment.