From 7a8826519dbd64e3271e5e7dedbdef8fab610d1f Mon Sep 17 00:00:00 2001 From: Chia-Ping Tsai Date: Mon, 10 Feb 2025 17:29:36 +0800 Subject: [PATCH] [GUI] tweak quota --- .../java/org/astraea/common/admin/Admin.java | 6 +- .../org/astraea/common/admin/AdminImpl.java | 36 ++++ .../java/org/astraea/gui/tab/QuotaNode.java | 161 ++++++++++-------- 3 files changed, 126 insertions(+), 77 deletions(-) diff --git a/common/src/main/java/org/astraea/common/admin/Admin.java b/common/src/main/java/org/astraea/common/admin/Admin.java index 6c3c54fbe8..a6df8ce866 100644 --- a/common/src/main/java/org/astraea/common/admin/Admin.java +++ b/common/src/main/java/org/astraea/common/admin/Admin.java @@ -229,7 +229,7 @@ default CompletionStage>> brokerFolders() { /** * remove the connection quotas for given ip addresses * - * @param ips to delete connection quotas + * @param ips to delete connection quotas. Empty means you want to remove default quota */ CompletionStage unsetConnectionQuotas(Set ips); @@ -246,7 +246,7 @@ default CompletionStage>> brokerFolders() { /** * remove the producer rate quotas for given client ids * - * @param clientIds to delete producer rate quotas + * @param clientIds to delete producer rate quotas. Empty means you want to remove default quota */ CompletionStage unsetProducerQuotas(Set clientIds); @@ -263,7 +263,7 @@ default CompletionStage>> brokerFolders() { /** * remove the consumer rate quotas for given client ids * - * @param clientIds to delete consumer rate quotas + * @param clientIds to delete consumer rate quotas. Empty means you want to remove default quota */ CompletionStage unsetConsumerQuotas(Set clientIds); diff --git a/common/src/main/java/org/astraea/common/admin/AdminImpl.java b/common/src/main/java/org/astraea/common/admin/AdminImpl.java index 4d16854334..1e9949c5c0 100644 --- a/common/src/main/java/org/astraea/common/admin/AdminImpl.java +++ b/common/src/main/java/org/astraea/common/admin/AdminImpl.java @@ -793,6 +793,18 @@ public CompletionStage setConnectionQuota(int rate) { @Override public CompletionStage unsetConnectionQuotas(Set ips) { + if (ips.isEmpty()) { + return to( + kafkaAdmin + .alterClientQuotas( + List.of( + new ClientQuotaAlteration( + new ClientQuotaEntity(entity(ClientQuotaEntity.IP)), + List.of( + new ClientQuotaAlteration.Op( + QuotaConfigs.IP_CONNECTION_RATE_CONFIG, null))))) + .all()); + } return to( kafkaAdmin .alterClientQuotas( @@ -843,6 +855,18 @@ public CompletionStage setConsumerQuota(DataRate rate) { @Override public CompletionStage unsetConsumerQuotas(Set clientIds) { + if (clientIds.isEmpty()) { + return to( + kafkaAdmin + .alterClientQuotas( + List.of( + new ClientQuotaAlteration( + new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)), + List.of( + new ClientQuotaAlteration.Op( + QuotaConfigs.CONSUMER_BYTE_RATE_CONFIG, null))))) + .all()); + } return to( kafkaAdmin .alterClientQuotas( @@ -894,6 +918,18 @@ public CompletionStage setProducerQuota(DataRate rate) { @Override public CompletionStage unsetProducerQuotas(Set clientIds) { + if (clientIds.isEmpty()) { + return to( + kafkaAdmin + .alterClientQuotas( + List.of( + new ClientQuotaAlteration( + new ClientQuotaEntity(entity(ClientQuotaEntity.CLIENT_ID)), + List.of( + new ClientQuotaAlteration.Op( + QuotaConfigs.PRODUCER_BYTE_RATE_CONFIG, null))))) + .all()); + } return to( kafkaAdmin .alterClientQuotas( diff --git a/gui/src/main/java/org/astraea/gui/tab/QuotaNode.java b/gui/src/main/java/org/astraea/gui/tab/QuotaNode.java index 3f7758f8f8..6b491f66f3 100644 --- a/gui/src/main/java/org/astraea/gui/tab/QuotaNode.java +++ b/gui/src/main/java/org/astraea/gui/tab/QuotaNode.java @@ -19,8 +19,8 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import java.util.stream.Stream; import javafx.geometry.Side; @@ -45,36 +45,35 @@ private static Node connectionNode(Context context) { var rateKey = "connections/second"; var multiInput = List.of( - TextInput.required(ipLabelKey, EditableText.singleLine().disallowEmpty().build()), + TextInput.of(ipLabelKey, EditableText.singleLine().disallowEmpty().build()), TextInput.of(rateKey, EditableText.singleLine().onlyNumber().build())); var firstPart = FirstPart.builder() .textInputs(multiInput) .clickName("ALTER") .tableRefresher( - (argument, logger) -> - Optional.ofNullable(argument.nonEmptyTexts().get(rateKey)) - .map( - rate -> - context - .admin() - .setConnectionQuotas( - Map.of( - argument.nonEmptyTexts().get(ipLabelKey), - Integer.parseInt(rate)))) - .orElseGet( - () -> - context - .admin() - .unsetConnectionQuotas( - Set.of(argument.nonEmptyTexts().get(ipLabelKey)))) - .thenApply( - ignored -> { - logger.log( - "succeed to alter rate for " - + argument.nonEmptyTexts().get(ipLabelKey)); - return List.of(); - })) + (argument, logger) -> { + var ip = argument.nonEmptyTexts().get(ipLabelKey); + var rate = argument.nonEmptyTexts().get(rateKey); + final CompletionStage result; + if (ip == null && rate == null) + result = context.admin().unsetConnectionQuotas(Set.of()); + else if (ip != null && rate != null) + result = + context.admin().setConnectionQuotas(Map.of(ip, Integer.parseInt(rate))); + else if (ip != null) result = context.admin().unsetConnectionQuotas(Set.of(ip)); + else result = context.admin().setConnectionQuota(Integer.parseInt(rate)); + return result.thenApply( + ignored -> { + if (ip == null && rate == null) + logger.log("succeed to remove default quota"); + else if (ip != null && rate != null) + logger.log("succeed to alter " + rate + " for " + ip); + else if (ip != null) logger.log("succeed to remove quota from " + ip); + else logger.log("succeed to alter " + rate + " for default quota"); + return List.of(); + }); + }) .build(); return PaneBuilder.of().firstPart(firstPart).build(); } @@ -84,36 +83,41 @@ private static Node producerNode(Context context) { var byteRateKey = "MB/second"; var multiInput = List.of( - TextInput.required(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()), + TextInput.of(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()), TextInput.of(byteRateKey, EditableText.singleLine().onlyNumber().build())); var firstPart = FirstPart.builder() .textInputs(multiInput) .clickName("ALTER") .tableRefresher( - (argument, logger) -> - Optional.ofNullable(argument.nonEmptyTexts().get(byteRateKey)) - .map( - rate -> - context - .admin() - .setProducerQuotas( - Map.of( - argument.nonEmptyTexts().get(clientIdLabelKey), - DataRate.MB.of(Long.parseLong(rate))))) - .orElseGet( - () -> - context - .admin() - .unsetProducerQuotas( - Set.of(argument.nonEmptyTexts().get(clientIdLabelKey)))) - .thenApply( - ignored -> { - logger.log( - "succeed to alter rate for " - + argument.nonEmptyTexts().get(clientIdLabelKey)); - return List.of(); - })) + (argument, logger) -> { + var clientId = argument.nonEmptyTexts().get(clientIdLabelKey); + var rate = argument.nonEmptyTexts().get(byteRateKey); + final CompletionStage result; + if (clientId == null && rate == null) + result = context.admin().unsetProducerQuotas(Set.of()); + else if (clientId != null && rate != null) + result = + context + .admin() + .setProducerQuotas( + Map.of(clientId, DataRate.MB.of(Long.parseLong(rate)))); + else if (clientId != null) + result = context.admin().unsetProducerQuotas(Set.of(clientId)); + else + result = context.admin().setProducerQuota(DataRate.MB.of(Long.parseLong(rate))); + return result.thenApply( + ignored -> { + if (clientId == null && rate == null) + logger.log("succeed to remove default quota"); + else if (clientId != null && rate != null) + logger.log("succeed to alter " + rate + " for " + clientId); + else if (clientId != null) + logger.log("succeed to remove quota from " + clientId); + else logger.log("succeed to alter " + rate + " for default quota"); + return List.of(); + }); + }) .build(); return PaneBuilder.of().firstPart(firstPart).build(); } @@ -123,45 +127,54 @@ private static Node consumerNode(Context context) { var byteRateKey = "MB/second"; var multiInput = List.of( - TextInput.required(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()), + TextInput.of(clientIdLabelKey, EditableText.singleLine().disallowEmpty().build()), TextInput.of(byteRateKey, EditableText.singleLine().onlyNumber().build())); var firstPart = FirstPart.builder() .textInputs(multiInput) .clickName("ALTER") .tableRefresher( - (argument, logger) -> - Optional.ofNullable(argument.nonEmptyTexts().get(byteRateKey)) - .map( - rate -> - context - .admin() - .setConsumerQuotas( - Map.of( - argument.nonEmptyTexts().get(clientIdLabelKey), - DataRate.MB.of(Long.parseLong(rate))))) - .orElseGet( - () -> - context - .admin() - .unsetConsumerQuotas( - Set.of(argument.nonEmptyTexts().get(clientIdLabelKey)))) - .thenApply( - ignored -> { - logger.log( - "succeed to alter rate for " - + argument.nonEmptyTexts().get(clientIdLabelKey)); - return List.of(); - })) + (argument, logger) -> { + var clientId = argument.nonEmptyTexts().get(clientIdLabelKey); + var rate = argument.nonEmptyTexts().get(byteRateKey); + final CompletionStage result; + if (clientId == null && rate == null) + result = context.admin().unsetConsumerQuotas(Set.of()); + else if (clientId != null && rate != null) + result = + context + .admin() + .setConsumerQuotas( + Map.of(clientId, DataRate.MB.of(Long.parseLong(rate)))); + else if (clientId != null) + result = context.admin().unsetConsumerQuotas(Set.of(clientId)); + else + result = context.admin().setConsumerQuota(DataRate.MB.of(Long.parseLong(rate))); + return result.thenApply( + ignored -> { + if (clientId == null && rate == null) + logger.log("succeed to remove default quota"); + else if (clientId != null && rate != null) + logger.log("succeed to alter " + rate + " for " + clientId); + else if (clientId != null) + logger.log("succeed to remove quota from " + clientId); + else logger.log("succeed to alter " + rate + " for default quota"); + return List.of(); + }); + }) .build(); return PaneBuilder.of().firstPart(firstPart).build(); } static LinkedHashMap basicResult(Quota quota) { return MapUtils.of( + "entity", quota.targetKey(), - quota.targetValue(), + "name", + quota.targetValue() == null ? "" : quota.targetValue(), + "limit", quota.limitKey(), + "value", quota.limitKey().equals(QuotaConfigs.PRODUCER_BYTE_RATE_CONFIG) || quota.limitKey().equals(QuotaConfigs.CONSUMER_BYTE_RATE_CONFIG) ? DataSize.Byte.of((long) quota.limitValue())