Skip to content

Commit

Permalink
[GUI] tweak quota
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Feb 10, 2025
1 parent f2e2125 commit 7a88265
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 77 deletions.
6 changes: 3 additions & 3 deletions common/src/main/java/org/astraea/common/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
/**
* remove the connection quotas for given ip addresses
*
* @param ips to delete connection quotas
* @param ips to delete connection quotas. Empty means you want to remove default quota
*/
CompletionStage<Void> unsetConnectionQuotas(Set<String> ips);

Expand All @@ -246,7 +246,7 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
/**
* remove the producer rate quotas for given client ids
*
* @param clientIds to delete producer rate quotas
* @param clientIds to delete producer rate quotas. Empty means you want to remove default quota
*/
CompletionStage<Void> unsetProducerQuotas(Set<String> clientIds);

Expand All @@ -263,7 +263,7 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {
/**
* remove the consumer rate quotas for given client ids
*
* @param clientIds to delete consumer rate quotas
* @param clientIds to delete consumer rate quotas. Empty means you want to remove default quota
*/
CompletionStage<Void> unsetConsumerQuotas(Set<String> clientIds);

Expand Down
36 changes: 36 additions & 0 deletions common/src/main/java/org/astraea/common/admin/AdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,18 @@ public CompletionStage<Void> setConnectionQuota(int rate) {

@Override
public CompletionStage<Void> unsetConnectionQuotas(Set<String> ips) {
if (ips.isEmpty()) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.IP)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.IP_CONNECTION_RATE_CONFIG, null)))))
.all());
}
return to(
kafkaAdmin
.alterClientQuotas(
Expand Down Expand Up @@ -843,6 +855,18 @@ public CompletionStage<Void> setConsumerQuota(DataRate rate) {

@Override
public CompletionStage<Void> unsetConsumerQuotas(Set<String> clientIds) {
if (clientIds.isEmpty()) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.CONSUMER_BYTE_RATE_CONFIG, null)))))
.all());
}
return to(
kafkaAdmin
.alterClientQuotas(
Expand Down Expand Up @@ -894,6 +918,18 @@ public CompletionStage<Void> setProducerQuota(DataRate rate) {

@Override
public CompletionStage<Void> unsetProducerQuotas(Set<String> clientIds) {
if (clientIds.isEmpty()) {
return to(
kafkaAdmin
.alterClientQuotas(
List.of(
new ClientQuotaAlteration(
new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)),
List.of(
new ClientQuotaAlteration.Op(
QuotaConfigs.PRODUCER_BYTE_RATE_CONFIG, null)))))
.all());
}
return to(
kafkaAdmin
.alterClientQuotas(
Expand Down
161 changes: 87 additions & 74 deletions gui/src/main/java/org/astraea/gui/tab/QuotaNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javafx.geometry.Side;
Expand All @@ -45,36 +45,35 @@ private static Node connectionNode(Context context) {
var rateKey = "connections/second";
var multiInput =
List.of(
TextInput.required(ipLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(ipLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(rateKey, EditableText.singleLine().onlyNumber().build()));
var firstPart =
FirstPart.builder()
.textInputs(multiInput)
.clickName("ALTER")
.tableRefresher(
(argument, logger) ->
Optional.ofNullable(argument.nonEmptyTexts().get(rateKey))
.map(
rate ->
context
.admin()
.setConnectionQuotas(
Map.of(
argument.nonEmptyTexts().get(ipLabelKey),
Integer.parseInt(rate))))
.orElseGet(
() ->
context
.admin()
.unsetConnectionQuotas(
Set.of(argument.nonEmptyTexts().get(ipLabelKey))))
.thenApply(
ignored -> {
logger.log(
"succeed to alter rate for "
+ argument.nonEmptyTexts().get(ipLabelKey));
return List.of();
}))
(argument, logger) -> {
var ip = argument.nonEmptyTexts().get(ipLabelKey);
var rate = argument.nonEmptyTexts().get(rateKey);
final CompletionStage<Void> result;
if (ip == null && rate == null)
result = context.admin().unsetConnectionQuotas(Set.of());
else if (ip != null && rate != null)
result =
context.admin().setConnectionQuotas(Map.of(ip, Integer.parseInt(rate)));
else if (ip != null) result = context.admin().unsetConnectionQuotas(Set.of(ip));
else result = context.admin().setConnectionQuota(Integer.parseInt(rate));
return result.thenApply(
ignored -> {
if (ip == null && rate == null)
logger.log("succeed to remove default quota");
else if (ip != null && rate != null)
logger.log("succeed to alter " + rate + " for " + ip);
else if (ip != null) logger.log("succeed to remove quota from " + ip);
else logger.log("succeed to alter " + rate + " for default quota");
return List.of();
});
})
.build();
return PaneBuilder.of().firstPart(firstPart).build();
}
Expand All @@ -84,36 +83,41 @@ private static Node producerNode(Context context) {
var byteRateKey = "MB/second";
var multiInput =
List.of(
TextInput.required(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(byteRateKey, EditableText.singleLine().onlyNumber().build()));
var firstPart =
FirstPart.builder()
.textInputs(multiInput)
.clickName("ALTER")
.tableRefresher(
(argument, logger) ->
Optional.ofNullable(argument.nonEmptyTexts().get(byteRateKey))
.map(
rate ->
context
.admin()
.setProducerQuotas(
Map.of(
argument.nonEmptyTexts().get(clientIdLabelKey),
DataRate.MB.of(Long.parseLong(rate)))))
.orElseGet(
() ->
context
.admin()
.unsetProducerQuotas(
Set.of(argument.nonEmptyTexts().get(clientIdLabelKey))))
.thenApply(
ignored -> {
logger.log(
"succeed to alter rate for "
+ argument.nonEmptyTexts().get(clientIdLabelKey));
return List.of();
}))
(argument, logger) -> {
var clientId = argument.nonEmptyTexts().get(clientIdLabelKey);
var rate = argument.nonEmptyTexts().get(byteRateKey);
final CompletionStage<Void> result;
if (clientId == null && rate == null)
result = context.admin().unsetProducerQuotas(Set.of());
else if (clientId != null && rate != null)
result =
context
.admin()
.setProducerQuotas(
Map.of(clientId, DataRate.MB.of(Long.parseLong(rate))));
else if (clientId != null)
result = context.admin().unsetProducerQuotas(Set.of(clientId));
else
result = context.admin().setProducerQuota(DataRate.MB.of(Long.parseLong(rate)));
return result.thenApply(
ignored -> {
if (clientId == null && rate == null)
logger.log("succeed to remove default quota");
else if (clientId != null && rate != null)
logger.log("succeed to alter " + rate + " for " + clientId);
else if (clientId != null)
logger.log("succeed to remove quota from " + clientId);
else logger.log("succeed to alter " + rate + " for default quota");
return List.of();
});
})
.build();
return PaneBuilder.of().firstPart(firstPart).build();
}
Expand All @@ -123,45 +127,54 @@ private static Node consumerNode(Context context) {
var byteRateKey = "MB/second";
var multiInput =
List.of(
TextInput.required(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()),
TextInput.of(byteRateKey, EditableText.singleLine().onlyNumber().build()));
var firstPart =
FirstPart.builder()
.textInputs(multiInput)
.clickName("ALTER")
.tableRefresher(
(argument, logger) ->
Optional.ofNullable(argument.nonEmptyTexts().get(byteRateKey))
.map(
rate ->
context
.admin()
.setConsumerQuotas(
Map.of(
argument.nonEmptyTexts().get(clientIdLabelKey),
DataRate.MB.of(Long.parseLong(rate)))))
.orElseGet(
() ->
context
.admin()
.unsetConsumerQuotas(
Set.of(argument.nonEmptyTexts().get(clientIdLabelKey))))
.thenApply(
ignored -> {
logger.log(
"succeed to alter rate for "
+ argument.nonEmptyTexts().get(clientIdLabelKey));
return List.of();
}))
(argument, logger) -> {
var clientId = argument.nonEmptyTexts().get(clientIdLabelKey);
var rate = argument.nonEmptyTexts().get(byteRateKey);
final CompletionStage<Void> result;
if (clientId == null && rate == null)
result = context.admin().unsetConsumerQuotas(Set.of());
else if (clientId != null && rate != null)
result =
context
.admin()
.setConsumerQuotas(
Map.of(clientId, DataRate.MB.of(Long.parseLong(rate))));
else if (clientId != null)
result = context.admin().unsetConsumerQuotas(Set.of(clientId));
else
result = context.admin().setConsumerQuota(DataRate.MB.of(Long.parseLong(rate)));
return result.thenApply(
ignored -> {
if (clientId == null && rate == null)
logger.log("succeed to remove default quota");
else if (clientId != null && rate != null)
logger.log("succeed to alter " + rate + " for " + clientId);
else if (clientId != null)
logger.log("succeed to remove quota from " + clientId);
else logger.log("succeed to alter " + rate + " for default quota");
return List.of();
});
})
.build();
return PaneBuilder.of().firstPart(firstPart).build();
}

static LinkedHashMap<String, Object> basicResult(Quota quota) {
return MapUtils.of(
"entity",
quota.targetKey(),
quota.targetValue(),
"name",
quota.targetValue() == null ? "<default>" : quota.targetValue(),
"limit",
quota.limitKey(),
"value",
quota.limitKey().equals(QuotaConfigs.PRODUCER_BYTE_RATE_CONFIG)
|| quota.limitKey().equals(QuotaConfigs.CONSUMER_BYTE_RATE_CONFIG)
? DataSize.Byte.of((long) quota.limitValue())
Expand Down

0 comments on commit 7a88265

Please sign in to comment.