Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PWX-36475 : Creating node PDB for parallel upgrades #1508

Merged
merged 10 commits into from
May 27, 2024
119 changes: 112 additions & 7 deletions drivers/storage/portworx/component/disruption_budget.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package component

import (
"context"
"fmt"
"math"
"strconv"

"github.com/hashicorp/go-version"
"github.com/libopenstorage/openstorage/api"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -19,6 +23,7 @@ import (
"github.com/libopenstorage/operator/pkg/constants"
"github.com/libopenstorage/operator/pkg/util"
k8sutil "github.com/libopenstorage/operator/pkg/util/k8s"
v1 "k8s.io/api/core/v1"
)

const (
Expand Down Expand Up @@ -71,9 +76,46 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
if err := c.createKVDBPodDisruptionBudget(cluster, ownerRef); err != nil {
return err
}
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
// Create node PDB only if parallel upgrade is supported
var err error
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

// Get list of portworx storage nodes
nodeClient := api.NewOpenStorageNodeClient(c.sdkConn)
ctx, err := pxutil.SetupContextWithToken(context.Background(), cluster, c.k8sClient)
if err != nil {
return err
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return fmt.Errorf("failed to enumerate nodes: %v", err)
}

if pxutil.ClusterSupportsParallelUpgrade(nodeEnumerateResponse) {
// Get the list of k8s nodes that are part of the current cluster
k8sNodeList := &v1.NodeList{}
err = c.k8sClient.List(context.TODO(), k8sNodeList)
if err != nil {
return err
}
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
if err := c.deletePortworxNodePodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse, k8sNodeList); err != nil {
return err
}
} else {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, nodeEnumerateResponse); err != nil {
return err
}
}

return nil
}

Expand All @@ -100,6 +142,7 @@ func (c *disruptionBudget) MarkDeleted() {}
func (c *disruptionBudget) createPortworxPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
) error {
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
Expand All @@ -108,12 +151,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
}

var minAvailable int
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient, nodeEnumerateResponse)
if err != nil {
c.closeSdkConn()
return err
Expand Down Expand Up @@ -157,6 +196,73 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
return err
}

func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}
for _, node := range nodesNeedingPDB {
minAvailable := intstr.FromInt(1)
PDBName := "px-" + node
Copy link
Contributor

@piyush-nimbalkar piyush-nimbalkar Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: internal variables should have lower case.

*pdbName

pdb := &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: PDBName,
Namespace: cluster.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
constants.LabelKeyClusterName: cluster.Name,
constants.OperatorLabelNodeNameKey: node,
},
},
},
}
err = k8sutil.CreateOrUpdatePodDisruptionBudget(c.k8sClient, pdb, ownerRef)
if err != nil {
logrus.Warnf("Failed to create PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)

}

// Delete node pod disruption budget when kubertetes is not part of cluster or portworx does not run on it
func (c *disruptionBudget) deletePortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
nodeEnumerateResponse *api.SdkNodeEnumerateWithFiltersResponse,
k8sNodeList *v1.NodeList,
) error {
nodesToDeletePDB, err := pxutil.NodesToDeletePDB(c.k8sClient, nodeEnumerateResponse, k8sNodeList)
if err != nil {
return err
}
errors := []error{}

for _, node := range nodesToDeletePDB {
PDBName := "px-" + node
err = k8sutil.DeletePodDisruptionBudget(
c.k8sClient, PDBName,
cluster.Namespace, *ownerRef,
)
if err != nil {
logrus.Warnf("Failed to delete PDB for node %s: %v", node, err)
errors = append(errors, err)
}
}
return utilerrors.NewAggregate(errors)
}

func (c *disruptionBudget) createKVDBPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
Expand All @@ -165,7 +271,6 @@ func (c *disruptionBudget) createKVDBPodDisruptionBudget(
if cluster.Spec.Kvdb != nil && !cluster.Spec.Kvdb.Internal {
return nil
}

clusterSize := kvdbClusterSize(cluster)
minAvailable := intstr.FromInt(clusterSize - 1)
pdb := &policyv1.PodDisruptionBudget{
Expand Down
Loading
Loading