Skip to content

Commit

Permalink
[GUI] enable to add/remove voter by node id
Browse files Browse the repository at this point in the history
  • Loading branch information
chia7712 committed Feb 11, 2025
1 parent 4d25bee commit 68ced59
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 49 deletions.
2 changes: 1 addition & 1 deletion common/src/main/java/org/astraea/common/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ default CompletionStage<Map<Integer, Set<String>>> brokerFolders() {

// ---------------------------------[write]---------------------------------//

CompletionStage<Void> addVoter(int nodeId, String directoryId, RaftEndpoint endpoint);
CompletionStage<Void> addVoter(int nodeId, String directoryId, List<RaftEndpoint> endpoints);

CompletionStage<Void> removeVoter(int nodeId, String directoryId);

Expand Down
47 changes: 43 additions & 4 deletions common/src/main/java/org/astraea/common/admin/AdminImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,44 @@ public CompletionStage<List<Broker>> brokers() {
@Override
public CompletionStage<List<Controller>> controllers() {
return to(controllerAdmin.describeCluster().nodes())
.thenApply(
nodes -> nodes.stream().map(n -> new Controller(n.id(), n.host(), n.port())).toList());
.thenCompose(
nodes ->
to(controllerAdmin
.describeConfigs(
nodes.stream()
.map(
n ->
new ConfigResource(
ConfigResource.Type.BROKER, String.valueOf(n.id())))
.toList())
.all())
.thenApply(
configs ->
nodes.stream()
.map(
n ->
new Controller(
n.id(),
n.host(),
n.port(),
new org.astraea.common.admin.Config(
configs
.getOrDefault(
new ConfigResource(
ConfigResource.Type.BROKER,
String.valueOf(n.id())),
new Config(List.of()))
.entries()
.stream()
.filter(
entry ->
entry.value() != null
&& !entry.value().isBlank())
.collect(
Collectors.toMap(
ConfigEntry::name,
ConfigEntry::value)))))
.toList()));
}

private CompletionStage<Map.Entry<String, List<Broker>>> clusterIdAndBrokers() {
Expand Down Expand Up @@ -856,13 +892,16 @@ public CompletionStage<QuorumInfo> quorumInfo() {
}

@Override
public CompletionStage<Void> addVoter(int nodeId, String directoryId, RaftEndpoint endpoint) {
public CompletionStage<Void> addVoter(
int nodeId, String directoryId, List<RaftEndpoint> endpoints) {
return to(
kafkaAdmin
.addRaftVoter(
nodeId,
Uuid.fromString(directoryId),
Set.of(new RaftVoterEndpoint(endpoint.name(), endpoint.host(), endpoint.port())))
endpoints.stream()
.map(e -> new RaftVoterEndpoint(e.name(), e.host(), e.port()))
.collect(Collectors.toSet()))
.all());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
*/
package org.astraea.common.admin;

public record Controller(int id, String host, int port) {}
public record Controller(int id, String host, int port, Config config) {}
158 changes: 115 additions & 43 deletions gui/src/main/java/org/astraea/gui/tab/QuorumNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@
package org.astraea.gui.tab;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javafx.geometry.Side;
import javafx.scene.Node;
import org.apache.kafka.common.network.ListenerName;
import org.astraea.common.FutureUtils;
import org.astraea.common.MapUtils;
import org.astraea.common.admin.Controller;
Expand All @@ -36,40 +43,42 @@
import org.astraea.gui.text.TextInput;

public class QuorumNode {
private static final Pattern URI_PARSE_REGEXP =
Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");

private static Node addVoterNode(Context context) {
var id = "node id";
var directoryId = "directory id";
var name = "endpoint name";
var host = "endpoint host";
var port = "endpoint port";
var idKey = "node id";
var multiInput =
List.of(
TextInput.required(id, EditableText.singleLine().disallowEmpty().build()),
TextInput.required(directoryId, EditableText.singleLine().disallowEmpty().build()),
TextInput.required(name, EditableText.singleLine().disallowEmpty().build()),
TextInput.required(host, EditableText.singleLine().disallowEmpty().build()),
TextInput.required(port, EditableText.singleLine().onlyNumber().build()));
List.of(TextInput.required(idKey, EditableText.singleLine().disallowEmpty().build()));
var firstPart =
FirstPart.builder()
.textInputs(multiInput)
.clickName("add voter")
.tableRefresher(
(argument, logger) ->
context
.admin()
.addVoter(
Integer.parseInt(argument.nonEmptyTexts().get(id)),
argument.nonEmptyTexts().get(directoryId),
new RaftEndpoint(
argument.nonEmptyTexts().get(name),
argument.nonEmptyTexts().get(host),
Integer.parseInt(argument.nonEmptyTexts().get(port))))
FutureUtils.combine(
context.admin().quorumInfo(),
context.admin().controllers(),
(quorumInfo, controllers) -> {
var id = Integer.parseInt(argument.nonEmptyTexts().get(idKey));
var directoryId =
quorumInfo.observers().stream()
.filter(e -> e.replicaId() == id)
.findFirst()
.get()
.replicaDirectoryId();
var controller =
controllers.stream().filter(e -> e.id() == id).findFirst().get();
var endpoints = raftEndpoints(controller.config().raw());
return context
.admin()
.addVoter(id, directoryId.toString(), endpoints);
})
.thenApply(
ignored -> {
logger.log(
"succeed to add "
+ argument.nonEmptyTexts().get(id)
+ argument.nonEmptyTexts().get(idKey)
+ " to the voters");
return List.of();
}))
Expand All @@ -78,12 +87,10 @@ private static Node addVoterNode(Context context) {
}

private static Node removeVoterNode(Context context) {
var id = "node id";
var directoryId = "directory id";
var idKey = "node id";
var multiInput =
List.of(
TextInput.required(id, EditableText.singleLine().disallowEmpty().build()),
TextInput.required(directoryId, EditableText.singleLine().disallowEmpty().build()));
List.of(TextInput.required(idKey, EditableText.singleLine().disallowEmpty().build()));

var firstPart =
FirstPart.builder()
.textInputs(multiInput)
Expand All @@ -92,14 +99,30 @@ private static Node removeVoterNode(Context context) {
(argument, logger) ->
context
.admin()
.removeVoter(
Integer.parseInt(argument.nonEmptyTexts().get(id)),
argument.nonEmptyTexts().get(directoryId))
.quorumInfo()
.thenCompose(
quorumInfo ->
context
.admin()
.removeVoter(
Integer.parseInt(argument.nonEmptyTexts().get(idKey)),
Stream.concat(
quorumInfo.voters().stream(),
quorumInfo.observers().stream())
.filter(
e ->
e.replicaId()
== Integer.parseInt(
argument.nonEmptyTexts().get(idKey)))
.findFirst()
.get()
.replicaDirectoryId()
.toString()))
.thenApply(
ignored -> {
logger.log(
"succeed to remove "
+ argument.nonEmptyTexts().get(id)
+ argument.nonEmptyTexts().get(idKey)
+ " from the voters");
return List.of();
}))
Expand All @@ -110,20 +133,20 @@ private static Node removeVoterNode(Context context) {
private static List<Map<String, Object>> basicResult(
QuorumInfo quorumInfo, List<Controller> controllers) {

Function<Integer, String> endpoint =
Function<Integer, String> endpointString =
id -> {
var r =
quorumInfo.endpoints().getOrDefault(id, List.of()).stream()
.map(Object::toString)
.collect(Collectors.joining(","));
var r = quorumInfo.endpoints().getOrDefault(id, List.of());
if (r.isEmpty()) {
r =
controllers.stream()
.filter(c -> c.id() == id)
.map(Controller::toString)
.collect(Collectors.joining(","));
.findFirst()
.map(c -> raftEndpoints(c.config().raw()))
.orElse(List.of());
}
return r;
return r.stream()
.map(e -> e.name() + "@" + e.host() + ":" + e.port())
.collect(Collectors.joining(","));
};

var result = new ArrayList<Map<String, Object>>();
Expand All @@ -147,9 +170,7 @@ private static List<Map<String, Object>> basicResult(
"last caught-up timestamp",
rs.lastCaughtUpTimestamp().orElse(-1),
"endpoints",
quorumInfo.endpoints().getOrDefault(rs.replicaId(), List.of()).stream()
.map(Record::toString)
.collect(Collectors.joining(","))))
endpointString.apply(rs.replicaId())))
.toList());

result.addAll(
Expand All @@ -160,7 +181,7 @@ private static List<Map<String, Object>> basicResult(
"role",
controllers.stream().anyMatch(b -> b.id() == rs.replicaId())
? "observer"
: "observer (broker)",
: "broker",
"leader",
quorumInfo.leaderId() == rs.replicaId(),
"node id",
Expand All @@ -174,7 +195,7 @@ private static List<Map<String, Object>> basicResult(
"last caught-up timestamp",
rs.lastCaughtUpTimestamp().orElse(-1),
"endpoints",
endpoint.apply(rs.replicaId())))
endpointString.apply(rs.replicaId())))
.toList());
return result;
}
Expand Down Expand Up @@ -205,4 +226,55 @@ public static Node of(Context context) {
removeVoterNode(context)))
.node();
}

static List<RaftEndpoint> raftEndpoints(Map<String, String> props) {
var listeners =
listenerToRaftEndpoint(props.getOrDefault("listeners", "")).stream()
.collect(Collectors.toMap(RaftEndpoint::name, e -> e));
var advertisedListeners =
listenerToRaftEndpoint(props.getOrDefault("advertised.listeners", "")).stream()
.collect(Collectors.toMap(RaftEndpoint::name, e -> e));
if (!props.containsKey("controller.listener.names")) {
throw new RuntimeException("controller.listener.names was not found");
}
return Arrays.stream(props.get("controller.listener.names").split(","))
.map(name -> ListenerName.normalised(name).value())
.map(
name ->
Objects.requireNonNull(
advertisedListeners.getOrDefault(name, listeners.get(name)),
"Cannot find information about controller listener name: " + name))
.toList();
}

private static List<RaftEndpoint> listenerToRaftEndpoint(String input) {
return parseCsvList(input.trim()).stream()
.map(
entry -> {
Matcher matcher = URI_PARSE_REGEXP.matcher(entry);
if (!matcher.matches()) {
throw new RuntimeException("Unable to parse " + entry + " to a broker endpoint");
}
ListenerName listenerName = ListenerName.normalised(matcher.group(1));
String host = matcher.group(2);
if (host.isEmpty()) {
// By Kafka convention, an empty host string indicates binding to the wildcard
// address, and is stored as null.
host = null;
}
String portString = matcher.group(3);
return new RaftEndpoint(listenerName.value(), host, Integer.parseInt(portString));
})
.toList();
}

private static List<String> parseCsvList(String csvList) {
if (csvList == null || csvList.isEmpty()) {
return Collections.emptyList();
} else {
return Stream.of(csvList.split("\\s*,\\s*"))
.filter(v -> !v.isEmpty())
.collect(Collectors.toList());
}
}
}

0 comments on commit 68ced59

Please sign in to comment.