diff --git a/drivers/storage/portworx/component/disruption_budget.go b/drivers/storage/portworx/component/disruption_budget.go
index a61803e52..c861c353a 100644
--- a/drivers/storage/portworx/component/disruption_budget.go
+++ b/drivers/storage/portworx/component/disruption_budget.go
@@ -1,16 +1,19 @@
 package component

 import (
+	"context"
 	"fmt"
 	"math"
 	"strconv"

 	"github.com/hashicorp/go-version"
+	"github.com/libopenstorage/openstorage/api"
 	"github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
 	policyv1 "k8s.io/api/policy/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/client-go/tools/record"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -77,9 +80,46 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
 	if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil {
 		return err
 	}
-	if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
+	// Create node PDBs only if parallel upgrade is supported
+	var err error
+	c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
+	if err != nil {
 		return err
 	}
+
+	// Get the list of portworx storage nodes
+	nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
+	ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient, false)
+	if err != nil {
+		return err
+	}
+	nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
+		ctx,
+		&api.SdkNodeEnumerateWithFiltersRequest{},
+	)
+	if err != nil {
+		return fmt.Errorf("failed to enumerate nodes: %v", err)
+	}
+
+	if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) {
+		// Get the list of k8s nodes that are part of the current cluster
+		k8sNodeList := &v1.NodeList{}
+		err = c.k8sClient.List(context.TODO(), k8sNodeList)
+		if err != nil {
+			return err
+		}
+		if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
+			return err
+		}
+		if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
+			return err
+		}
+	} else {
+		if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
@@ -106,6 +146,7 @@ func (c *disruptionBudget) MarkDeleted() {}
 func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	cluster *corev1.StorageCluster,
 	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
 ) error {
 	userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
 	if err != nil {
@@ -114,12 +155,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	}

 	var minAvailable int
-	c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
-	if err != nil {
-		return err
-	}
-	storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
+	storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
 	if err != nil {
 		c.closeSdkConn()
 		return err
 	}
@@ -178,6 +215,73 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
 	return err
 }

+// createPortworxNodePodDisruptionBudget creates a PDB named "px-<node>" for every storage node that needs one
+func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
+	cluster *corev1.StorageCluster,
+	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
+	k8sNodeList *v1.NodeList,
+) error {
+	nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
+	if err != nil {
+		return err
+	}
+	errors := []error{}
+	for _, node := range nodesNeedingPDB {
+		minAvailable := intstr.FromInt(1)
+		PDBName := "px-" + node
+		pdb := &policyv1.PodDisruptionBudget{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            PDBName,
+				Namespace:       cluster.Namespace,
+				OwnerReferences: []metav1.OwnerReference{*ownerRef},
+			},
+			Spec: policyv1.PodDisruptionBudgetSpec{
+				MinAvailable: &minAvailable,
+				Selector: &metav1.LabelSelector{
+					MatchLabels: map[string]string{
+						constants.LabelKeyClusterName:      cluster.Name,
+						constants.OperatorLabelNodeNameKey: node,
+					},
+				},
+			},
+		}
+		err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
+		if err != nil {
+			logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
+			errors = append(errors, err)
+		}
+	}
+	return utilerrors.NewAggregate(errors)
+}
+
+// Delete the node pod disruption budget when the kubernetes node is no longer part of the cluster or portworx does not run on it
+func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget(
+	cluster *corev1.StorageCluster,
+	ownerRef *metav1.OwnerReference,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
+	k8sNodeList *v1.NodeList,
+) error {
+	nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
+	if err != nil {
+		return err
+	}
+	errors := []error{}
+
+	for _, node := range nodesToDeletePDB {
+		PDBName := "px-" + node
+		err = k8sutil.DeletePodDisruptionBudget(
+			c.k8sClient, PDBName,
+			cluster.Namespace, *ownerRef,
+		)
+		if err != nil {
+			logrus.Warnf("Failed to delete PDB for node %s: %v", node, err)
+			errors = append(errors, err)
+		}
+	}
+	return utilerrors.NewAggregate(errors)
+}
+
 func (c *disruptionBudget) createKVDBPodDisruptionBudget(
 	cluster *corev1.StorageCluster,
 	ownerRef *metav1.OwnerReference,
@@ -186,7 +290,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
 	if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
 		return nil
 	}
-
 	clusterSize := kvdbClusterSize(cluster)
 	minAvailable := intstr.FromInt(clusterSize - 1)
 	pdb := &policyv1.PodDisruptionBudget{
diff --git a/drivers/storage/portworx/components_test.go b/drivers/storage/portworx/components_test.go
index 1265e772f..f663275a3 100644
--- a/drivers/storage/portworx/components_test.go
+++ b/drivers/storage/portworx/components_test.go
@@ -12605,7 +12605,7 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {

 	expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
 		Nodes: []*osdapi.StorageNode{
-			{SchedulerNodeName: "node1"},
+			{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 		},
 	}

@@ -12694,8 +12694,8 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {

 	// TestCase: Do not create storage PDB if total nodes with storage is less than 3
 	expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
-		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
-		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
+		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 		{Pools: []*osdapi.StoragePool{}},
 		{},
 	}
@@ -12727,11 +12727,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {
 	// Also, ignore the annotation if the value is an invalid integer
 	cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "invalid"
 	expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
-		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
-		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
+		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 		{Pools: []*osdapi.StoragePool{}},
 		{},
-		{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
+		{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 	}

 	err = driver.PreInstall(cluster)
@@ -12754,11 +12754,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {
 	// Also, ignore the annotation if the value is an invalid integer
 	cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "still_invalid"
 	expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
-		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
-		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
-		{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
-		{Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4"},
-		{Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5"},
+		{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+		{Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 		{Pools: []*osdapi.StoragePool{}},
 		{},
 	}
@@ -13392,7 +13392,7 @@ func TestPodDisruptionBudgetWithErrors(t *testing.T) {
 	mockNodeServer.EXPECT().
 		EnumerateWithFilters(gomock.Any(), gomock.Any()).
 		Return(nil, fmt.Errorf("NodeEnumerate error")).
-		Times(1)
+		AnyTimes()

 	cluster.Spec.Security.Enabled = false
 	err = driver.PreInstall(cluster)
@@ -13427,9 +13427,9 @@ func TestDisablePodDisruptionBudgets(t *testing.T) {

 	expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
 		Nodes: []*osdapi.StorageNode{
-			{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
-			{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
-			{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
+			{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+			{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+			{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
 		},
 	}
 	mockNodeServer.EXPECT().
@@ -13521,6 +13521,230 @@ func TestDisablePodDisruptionBudgets(t *testing.T) {
 	require.True(t, errors.IsNotFound(err))
 }

+func TestCreateNodePodDisruptionBudget(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+
+	mockNodeServer := mock.NewMockOpenStorageNodeServer(mockCtrl)
+	sdkServerIP := "127.0.0.1"
+	sdkServerPort := 21883
+	mockSdk := mock.NewSdkServer(mock.SdkServers{
+		Node: mockNodeServer,
+	})
+	err := mockSdk.StartOnAddress(sdkServerIP, strconv.Itoa(sdkServerPort))
+	require.NoError(t, err)
+	defer mockSdk.Stop()
+
+	testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test")
+	defer testutil.RestoreEtcHosts(t)
+
+	// PDBs should not be created for non-quorum members, nodes that are not part of the current cluster, and decommissioned nodes
+	expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
+		Nodes: []*osdapi.StorageNode{
+			{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{NonQuorumMember: true, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{NonQuorumMember: false, Status: osdapi.Status_STATUS_DECOMMISSION},
+		},
+	}
+	mockNodeServer.EXPECT().
+		EnumerateWithFilters(gomock.Any(), gomock.Any()).
+		Return(expectedNodeEnumerateResp, nil).
+		AnyTimes()
+
+	cluster := &corev1.StorageCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "px-cluster",
+			Namespace: "kube-test",
+		},
+		Spec: corev1.StorageClusterSpec{
+			Image: "portworx/oci-monitor:3.1.2",
+		},
+		Status: corev1.StorageClusterStatus{
+			Phase: string(corev1.ClusterStateRunning),
+		},
+	}
+	fakeK8sNodes := &v1.NodeList{Items: []v1.Node{
+		{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node3"}},
+	},
+	}
+
+	k8sClient := testutil.FakeK8sClient(&v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pxutil.PortworxServiceName,
+			Namespace: cluster.Namespace,
+		},
+		Spec: v1.ServiceSpec{
+			ClusterIP: sdkServerIP,
+			Ports: []v1.ServicePort{
+				{
+					Name: pxutil.PortworxSDKPortName,
+					Port: int32(sdkServerPort),
+				},
+			},
+		},
+	}, fakeK8sNodes)
+
+	coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset()))
+	component.DeregisterAllComponents()
+	component.RegisterDisruptionBudgetComponent()
+
+	driver := portworx{}
+	err = driver.Init(k8sClient, runtime.NewScheme(), record.NewFakeRecorder(0))
+	require.NoError(t, err)
+	err = driver.PreInstall(cluster)
+	require.NoError(t, err)
+
+	// PDBs are created for 2 storage nodes and kvdb
+	pdbList := &policyv1.PodDisruptionBudgetList{}
+	err = testutil.List(k8sClient, pdbList)
+	require.NoError(t, err)
+	require.Len(t, pdbList.Items, 3)
+
+	storagePDB := &policyv1.PodDisruptionBudget{}
+	err = testutil.Get(k8sClient, storagePDB, "px-node1", cluster.Namespace)
+	require.NoError(t, err)
+	require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue())
+	require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey])
+
+	err = testutil.Get(k8sClient, storagePDB, "px-node3", cluster.Namespace)
+	require.True(t, errors.IsNotFound(err))
+	err = testutil.Get(k8sClient, storagePDB, "px-node4", cluster.Namespace)
+	require.True(t, errors.IsNotFound(err))
+
+	// Testcase: per-node PDBs should not be created for any node if even one node is below 3.1.2
+	// Use the cluster-wide PDB in this case
+	expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
+		{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+		{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+		{SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
+	}
+	err = driver.PreInstall(cluster)
+	require.NoError(t, err)
+
+	storagePDB = &policyv1.PodDisruptionBudget{}
+	err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName, cluster.Namespace)
+	require.NoError(t, err)
+	require.Equal(t, 2, storagePDB.Spec.MinAvailable.IntValue())
+}
+
+func TestDeleteNodePodDisruptionBudget(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	defer mockCtrl.Finish()
+
+	mockNodeServer := mock.NewMockOpenStorageNodeServer(mockCtrl)
+	sdkServerIP := "127.0.0.1"
+	sdkServerPort := 21883
+	mockSdk := mock.NewSdkServer(mock.SdkServers{
+		Node: mockNodeServer,
+	})
+	err := mockSdk.StartOnAddress(sdkServerIP, strconv.Itoa(sdkServerPort))
+	require.NoError(t, err)
+	defer mockSdk.Stop()
+
+	testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test")
+	defer testutil.RestoreEtcHosts(t)
+
+	expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
+		Nodes: []*osdapi.StorageNode{
+			{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{NonQuorumMember: true, SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
+			{NonQuorumMember: false, Status: osdapi.Status_STATUS_DECOMMISSION},
+		},
+	}
+	mockNodeServer.EXPECT().
+		EnumerateWithFilters(gomock.Any(), gomock.Any()).
+		Return(expectedNodeEnumerateResp, nil).
+		AnyTimes()
+
+	cluster := &corev1.StorageCluster{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "px-cluster",
+			Namespace: "kube-test",
+		},
+		Spec: corev1.StorageClusterSpec{
+			Image: "portworx/oci-monitor:3.1.2",
+		},
+		Status: corev1.StorageClusterStatus{
+			Phase: string(corev1.ClusterStateRunning),
+		},
+	}
+	fakeK8sNodes := &v1.NodeList{Items: []v1.Node{
+		{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node3"}},
+		{ObjectMeta: metav1.ObjectMeta{Name: "node4"}},
+	},
+	}
+
+	k8sClient := testutil.FakeK8sClient(&v1.Service{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      pxutil.PortworxServiceName,
+			Namespace: cluster.Namespace,
+		},
+		Spec: v1.ServiceSpec{
+			ClusterIP: sdkServerIP,
+			Ports: []v1.ServicePort{
+				{
+					Name: pxutil.PortworxSDKPortName,
+					Port: int32(sdkServerPort),
+				},
+			},
+		},
+	}, fakeK8sNodes)
+
+	coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset()))
+	component.DeregisterAllComponents()
+	component.RegisterDisruptionBudgetComponent()
+
+	driver := portworx{}
+	err = driver.Init(k8sClient, runtime.NewScheme(), record.NewFakeRecorder(0))
+	require.NoError(t, err)
+	err = driver.PreInstall(cluster)
+	require.NoError(t, err)
+
+	pdbList := &policyv1.PodDisruptionBudgetList{}
+	err = testutil.List(k8sClient, pdbList)
+	require.NoError(t, err)
+	require.Len(t, pdbList.Items, 4)
+
+	// Testcase: Removing kubernetes node2 from the cluster should delete its node PDB
+	err = testutil.Delete(k8sClient, &fakeK8sNodes.Items[1])
+	require.NoError(t, err)
+
+	err = driver.PreInstall(cluster)
+	require.NoError(t, err)
+
+	pdbList = &policyv1.PodDisruptionBudgetList{}
+	err = testutil.List(k8sClient, pdbList)
+	require.NoError(t, err)
+	require.Len(t, pdbList.Items, 3)
+
+	err = testutil.Get(k8sClient, &policyv1.PodDisruptionBudget{}, "px-node2", cluster.Namespace)
+	require.True(t, errors.IsNotFound(err))
+
+	// Testcase: Making node3 a non-quorum member should delete its node PDB
+	expectedNodeEnumerateResp.Nodes[2].NonQuorumMember = true
+
+	err = driver.PreInstall(cluster)
+	require.NoError(t, err)
+
+	pdbList = &policyv1.PodDisruptionBudgetList{}
+	err = testutil.List(k8sClient, pdbList)
+	require.NoError(t, err)
+	require.Len(t, pdbList.Items, 2)
+
+	err = testutil.Get(k8sClient, &policyv1.PodDisruptionBudget{}, "px-node3", cluster.Namespace)
+	require.True(t, errors.IsNotFound(err))
+}
+
 func TestSCC(t *testing.T) {
 	coreops.SetInstance(coreops.New(fakek8sclient.NewSimpleClientset()))
 	reregisterComponents()
diff --git a/drivers/storage/portworx/util/util.go b/drivers/storage/portworx/util/util.go
index 7d4849808..23dab49bb 100644
--- a/drivers/storage/portworx/util/util.go
+++ b/drivers/storage/portworx/util/util.go
@@ -335,6 +335,9 @@ var (
 	// ConfigMapNameRegex regex of configMap.
 	ConfigMapNameRegex = regexp.MustCompile("[^a-zA-Z0-9]+")
+
+	// ParallelUpgradePDBVersion is the portworx version from which parallel upgrades are supported
+	ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2")
 )

 func getStrippedClusterName(cluster *corev1.StorageCluster) string {
@@ -1396,6 +1399,7 @@ func CountStorageNodes(
 	cluster *corev1.StorageCluster,
 	sdkConn *grpc.ClientConn,
 	k8sClient client.Client,
+	nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
 ) (int, error) {
 	nodeClient := api.NewOpenStorageNodeClient(sdkConn)
 	ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient, false)
@@ -1409,14 +1413,6 @@ func CountStorageNodes(
 	clusterDomains, err := clusterDomainClient.Enumerate(ctx, &api.SdkClusterDomainsEnumerateRequest{})
 	isDRSetup = err == nil && len(clusterDomains.ClusterDomainNames) > 1

-	nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
-		ctx,
-		&api.SdkNodeEnumerateWithFiltersRequest{},
-	)
-	if err != nil {
-		return -1, fmt.Errorf("failed to enumerate nodes: %v", err)
-	}
-
 	// Use the quorum member flag from the node enumerate response if all the nodes are upgraded to the
 	// newer version. The Enumerate response could be coming from any node and we want to make sure that
 	// we are not talking to an old node when enumerating.
@@ -1948,3 +1944,78 @@ func ShouldUseClusterDomain(node *api.StorageNode) (bool, error) {
 	}
 	return true, nil
 }
+
+// NodesNeedingPDB returns the list of storage nodes, part of the current cluster, that need a node PDB
+func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, k8sNodeList *v1.NodeList) ([]string, error) {
+	// Get the list of kubernetes nodes that are part of the current cluster
+	k8sNodesStoragePodCouldRun := make(map[string]bool)
+	for _, node := range k8sNodeList.Items {
+		k8sNodesStoragePodCouldRun[node.Name] = true
+	}
+
+	// Create a list of nodes that are part of quorum to create node PDBs for them
+	nodesNeedingPDB := make([]string, 0)
+	for _, node := range nodeEnumerateResponse.Nodes {
+		// Do not add the node if it is not part of quorum or is decommissioned
+		if node.Status == api.Status_STATUS_DECOMMISSION || node.NonQuorumMember {
+			logrus.Debugf("Node %s is not a quorum member or is decommissioned, skipping", node.Id)
+			continue
+		}
+
+		if _, ok := k8sNodesStoragePodCouldRun[node.SchedulerNodeName]; ok {
+			nodesNeedingPDB = append(nodesNeedingPDB, node.SchedulerNodeName)
+		}
+	}
+
+	return nodesNeedingPDB, nil
+}
+
+// NodesToDeletePDB returns the list of nodes that may have an existing node PDB but are no longer part of the k8s cluster or are not portworx storage nodes
+func NodesToDeletePDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse, k8sNodeList *v1.NodeList) ([]string, error) {
+	// nodeCounts is used to find the elements that are uncommon between the list of k8s nodes in the cluster
+	// and the list of portworx storage nodes. It is used to find the nodes whose PDB needs to be deleted
+	nodeCounts := make(map[string]int)

+	// Increase the count of each k8s node
+	for _, node := range k8sNodeList.Items {
+		nodeCounts[node.Name]++
+	}
+
+	// Get the list of storage nodes that are part of quorum and increment the count of each node
+	nodesToDeletePDB := make([]string, 0)
+	for _, node := range nodeEnumerateResponse.Nodes {
+		if node.Status == api.Status_STATUS_DECOMMISSION || node.NonQuorumMember {
+			continue
+		}
+		nodeCounts[node.SchedulerNodeName]++
+	}
+
+	// Nodes with count 1 might have a node PDB but should not, so delete the PDB for such nodes
+	for node, count := range nodeCounts {
+		if count == 1 {
+			nodesToDeletePDB = append(nodesToDeletePDB, node)
+		}
+	}
+	return nodesToDeletePDB, nil
+}
+
+// ClusterSupportsParallelUpgrade returns true only if every non-decommissioned node runs a portworx version that supports parallel upgrades
+func ClusterSupportsParallelUpgrade(nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) bool {
+	for _, node := range nodeEnumerateResponse.Nodes {
+		if node.Status == api.Status_STATUS_DECOMMISSION {
+			continue
+		}
+		v := node.NodeLabels[NodeLabelPortworxVersion]
+		nodeVersion, err := version.NewVersion(v)
+		if err != nil {
+			logrus.Warnf("Failed to parse node version %s for node %s: %v", v, node.Id, err)
+			return false
+		}
+		if nodeVersion.LessThan(ParallelUpgradePDBVersion) {
+			return false
+		}
+	}
+	return true
+}
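For context, a minimal sketch (not part of the patch) of how the version gate behaves: per-node PDBs are used only when every non-decommissioned node reports a portworx version of at least 3.1.2 via its node label; otherwise the operator falls back to the single cluster-wide PDB. The import path for the operator's util package is an assumption here; the api types and ClusterSupportsParallelUpgrade come from the change above.

package main

import (
	"fmt"

	"github.com/libopenstorage/openstorage/api"
	// Assumed import path for the operator's portworx util package (aliased pxutil as in the tests).
	pxutil "github.com/libopenstorage/operator/drivers/storage/portworx/util"
)

func main() {
	// Every non-decommissioned node is on 3.1.2, so per-node PDBs apply; the decommissioned node is ignored.
	allUpgraded := &api.SdkNodeEnumerateWithFiltersResponse{
		Nodes: []*api.StorageNode{
			{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
			{Status: api.Status_STATUS_DECOMMISSION},
		},
	}
	fmt.Println(pxutil.ClusterSupportsParallelUpgrade(allUpgraded)) // true

	// One node is still on 3.1.1, so the cluster keeps the single cluster-wide PDB.
	mixed := &api.SdkNodeEnumerateWithFiltersResponse{
		Nodes: []*api.StorageNode{
			{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
			{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
		},
	}
	fmt.Println(pxutil.ClusterSupportsParallelUpgrade(mixed)) // false
}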