
making requested changes
svijaykumar-px committed Apr 5, 2024
1 parent 9a52030 commit 9ce2a4a
Showing 3 changed files with 46 additions and 70 deletions.
39 changes: 30 additions & 9 deletions drivers/storage/portworx/component/disruption_budget.go
@@ -1,10 +1,13 @@
package component

import (
"context"
"fmt"
"math"
"strconv"

"github.com/hashicorp/go-version"
"github.com/libopenstorage/openstorage/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
policyv1 "k8s.io/api/policy/v1"
@@ -77,12 +80,27 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
if err != nil {
return err
}
if pxutil.ClusterSupportsParallelUpgrade(cluster, c.sdkConn, c.k8sClient) {
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil {

// Get list of portworx storage nodes
nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient)
if err != nil {
return err
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return fmt.Errorf("failed to enumerate nodes: %v", err)
}

if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) {
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
return err
}
} else {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
return err
}
}
@@ -113,7 +131,7 @@ func (c *disruptionBudget) MarkDeleted() {}
func (c *disruptionBudget) createPortworxPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
sdkConn *grpc.ClientConn,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) error {
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
@@ -123,7 +141,7 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(

var minAvailable int

storageNodesCount, err := pxutil.CountStorageNodes(cluster, sdkConn, c.k8sClient)
storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
if err != nil {
c.closeSdkConn()
return err
@@ -170,15 +188,15 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
sdkConn *grpc.ClientConn,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) error {
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, sdkConn, c.k8sClient)
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse)
if err != nil {
return err
}
for _, node := range nodesNeedingPDB {
minAvailable := intstr.FromInt(1)
PDBName := "px-storage-" + node
PDBName := "px-" + node
pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: PDBName,
@@ -196,6 +214,10 @@ func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
},
}
err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
if err != nil {
logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
break
}
}
return err

@@ -209,7 +231,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
return nil
}

clusterSize := kvdbClusterSize(cluster)
minAvailable := intstr.FromInt(clusterSize - 1)
pdb := &policyv1.PodDisruptionBudget{
14 changes: 9 additions & 5 deletions drivers/storage/portworx/components_test.go
@@ -13471,12 +13471,14 @@ func TestNodePodDisruptionBudget(t *testing.T) {
testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test")
defer testutil.RestoreEtcHosts(t)

// PDB should not be created for non quorum members
// PDB should not be created for non quorum members, nodes not part of current cluster and nodes in decommissioned state
expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
Nodes: []*osdapi.StorageNode{
{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{NonQuorumMember: true, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{NonQuorumMember: false, Status: osdapi.Status_STATUS_DECOMMISSION},
},
}
mockNodeServer.EXPECT().
@@ -13499,6 +13501,7 @@ func TestNodePodDisruptionBudget(t *testing.T) {
fakeK8sNodes := &v1.NodeList{Items: []v1.Node{
{ObjectMeta: metav1.ObjectMeta{Name: "node1"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node2"}},
{ObjectMeta: metav1.ObjectMeta{Name: "node3"}},
},
}

@@ -13535,14 +13538,15 @@ func TestNodePodDisruptionBudget(t *testing.T) {
require.Len(t, pdbList.Items, 3)

storagePDB := &policyv1.PodDisruptionBudget{}
err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node1", cluster.Namespace)
err = testutil.Get(k8sClient, storagePDB, "px-node1", cluster.Namespace)
require.NoError(t, err)
require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue())
require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey])

err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace)
fmt.Println(err)
require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found", component.StoragePodDisruptionBudgetName))
err = testutil.Get(k8sClient, storagePDB, "px-node3", cluster.Namespace)
require.True(t, errors.IsNotFound(err))
err = testutil.Get(k8sClient, storagePDB, "px-node4", cluster.Namespace)
require.True(t, errors.IsNotFound(err))

// Testcase : PDB per node should not be created for any node even if 1 node is lesser than 3.1.2
// Use cluster wide PDB in this case
63 changes: 7 additions & 56 deletions drivers/storage/portworx/util/util.go
@@ -1388,6 +1388,7 @@ func CountStorageNodes(
cluster *corev1.StorageCluster,
sdkConn *grpc.ClientConn,
k8sClient client.Client,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) (int, error) {
nodeClient := api.NewOpenStorageNodeClient(sdkConn)
ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient)
@@ -1401,14 +1402,6 @@
clusterDomains, err := clusterDomainClient.Enumerate(ctx, &api.SdkClusterDomainsEnumerateRequest{})
isDRSetup = err == nil && len(clusterDomains.ClusterDomainNames) > 1

nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return -1, fmt.Errorf("failed to enumerate nodes: %v", err)
}

// Use the quorum member flag from the node enumerate response if all the nodes are upgraded to the
// newer version. The Enumerate response could be coming from any node and we want to make sure that
// we are not talking to an old node when enumerating.
@@ -1911,37 +1904,19 @@ func IsK3sClusterExt(ext string) bool {
}

// Get list of storagenodes that are a part of the current cluster that need a node PDB
func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) ([]string, error) {
// Get the list of storage nodes
nodeClient := api.NewOpenStorageNodeClient(sdkConn)
ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient)
if err != nil {
return nil, err
}

nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return nil, fmt.Errorf("failed to enumerate nodes: %v", err)
}
func NodesNeedingPDB(k8sClient client.Client, nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) ([]string, error) {

// Get the list of k8s nodes that are part of the current cluster
k8sNodesStoragePodCouldRun := make(map[string]bool)
k8sNodeList := &v1.NodeList{}
err = k8sClient.List(context.TODO(), k8sNodeList)
err := k8sClient.List(context.TODO(), k8sNodeList)
if err != nil {
return nil, err
}

// Get list of kubernetes nodes that are a part of the current cluster
for _, node := range k8sNodeList.Items {
shouldRun, shouldContinueRunning, err := k8sutil.CheckPredicatesForStoragePod(&node, cluster, nil)
if err != nil {
return nil, err
}
if shouldRun || shouldContinueRunning {
k8sNodesStoragePodCouldRun[node.Name] = true
}
k8sNodesStoragePodCouldRun[node.Name] = true
}

// Create a list of nodes that are part of quorum to create node PDB for them
@@ -1953,18 +1928,6 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k
continue
}

if node.SchedulerNodeName == "" {
k8sNode, err := coreops.Instance().SearchNodeByAddresses(
[]string{node.DataIp, node.MgmtIp, node.Hostname},
)
if err != nil {
// In Metro-DR setup, this could be expected.
logrus.Infof("Unable to find kubernetes node name for nodeID %v: %v", node.Id, err)
continue
}
node.SchedulerNodeName = k8sNode.Name
}

if _, ok := k8sNodesStoragePodCouldRun[node.SchedulerNodeName]; ok {
nodesNeedingPDB = append(nodesNeedingPDB, node.SchedulerNodeName)
}
@@ -1973,19 +1936,7 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k
return nodesNeedingPDB, nil
}

func ClusterSupportsParallelUpgrade(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) bool {
nodeClient := api.NewOpenStorageNodeClient(sdkConn)
ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient)
if err != nil {
return false
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return false
}
func ClusterSupportsParallelUpgrade(nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse) bool {

for _, node := range nodeEnumerateResponse.Nodes {
if node.Status == api.Status_STATUS_DECOMMISSION {
