Skip to content

Commit

Permalink
[ADMIN] enable to configure default quota
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Feb 8, 2025
1 parent 736a1f2 commit a9e741d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 0 deletions.
9 changes: 9 additions & 0 deletions common/src/main/java/org/astraea/common/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
*/
CompletionStage<Void> setConnectionQuotas(Map<String, Integer> ipAndRate);

/** set the connection rate by default */
CompletionStage<Void> setConnectionQuota(int rate);

/**
* remove the connection quotas for given ip addresses
*
Expand All @@ -237,6 +240,9 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
*/
CompletionStage<Void> setProducerQuotas(Map<String, DataRate> clientAndRate);

/** set the producer rate by default */
CompletionStage<Void> setProducerQuota(DataRate rate);

/**
* remove the producer rate quotas for given client ids
*
Expand All @@ -251,6 +257,9 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
*/
CompletionStage<Void> setConsumerQuotas(Map<String, DataRate> clientAndRate);

/** set the consumer rate by default */
CompletionStage<Void> setConsumerQuota(DataRate rate);

/**
* remove the consumer rate quotas for given client ids
*
Expand Down
49 changes: 49 additions & 0 deletions common/src/main/java/org/astraea/common/admin/AdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -770,6 +771,26 @@ public CompletionStage<Void> setConnectionQuotas(Map<String, Integer> ipAndRate)
.all());
}

private static Map<String, String> entity(String name) {
var map = new HashMap<String, String>();
map.put(name, null);
return map;
}

@Override
public CompletionStage<Void> setConnectionQuota(int rate) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new org.apache.kafka.common.quota.ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.IP)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.IP_CONNECTION_RATE_CONFIG, (double) rate)))))
.all());
}

@Override
public CompletionStage<Void> unsetConnectionQuotas(Set<String> ips) {
return to(
Expand Down Expand Up @@ -806,6 +827,20 @@ public CompletionStage<Void> setConsumerQuotas(Map<String, DataRate> ipAndRate)
.all());
}

@Override
public CompletionStage<Void> setConsumerQuota(DataRate rate) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.CONSUMER_BYTE_RATE_CONFIG, rate.byteRate())))))
.all());
}

@Override
public CompletionStage<Void> unsetConsumerQuotas(Set<String> clientIds) {
return to(
Expand Down Expand Up @@ -843,6 +878,20 @@ public CompletionStage<Void> setProducerQuotas(Map<String, DataRate> ipAndRate)
.all());
}

@Override
public CompletionStage<Void> setProducerQuota(DataRate rate) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.PRODUCER_BYTE_RATE_CONFIG, rate.byteRate())))))
.all());
}

@Override
public CompletionStage<Void> unsetProducerQuotas(Set<String> clientIds) {
return to(
Expand Down

0 comments on commit a9e741d

Please sign in to comment.