Checking node version instead of cluster
svijaykumar-px committed Apr 3, 2024
1 parent e196d8f commit 45c0ae7
Showing 3 changed files with 87 additions and 44 deletions.
33 changes: 12 additions & 21 deletions drivers/storage/portworx/component/disruption_budget.go
@@ -32,11 +32,6 @@ const (
DefaultKVDBClusterSize = 3
)

var (
// ParallelUpgradePDBVersion is the portworx version from which parallel upgrades are supported
ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2")
)

type disruptionBudget struct {
k8sClient client.Client
sdkConn *grpc.ClientConn
@@ -77,13 +72,17 @@ func (c *disruptionBudget) Reconcile(cluster *corev1.StorageCluster) error {
return err
}
// Create node PDB only if parallel upgrade is supported
clusterPXver := pxutil.GetPortworxVersion(cluster)
if clusterPXver.GreaterThanOrEqual(ParallelUpgradePDBVersion) {
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef); err != nil {
var err error
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}
if pxutil.ClusterSupportsParallelUpgrade(cluster, c.sdkConn, c.k8sClient) {
if err := c.createPortworxNodePodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil {
return err
}
} else {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef); err != nil {
if err := c.createPortworxPodDisruptionBudget(cluster, ownerRef, c.sdkConn); err != nil {
return err
}
}
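The substantive change in Reconcile: the old code gated node PDBs on the cluster-level Portworx version (GetPortworxVersion against ParallelUpgradePDBVersion), while the new code asks ClusterSupportsParallelUpgrade (added in util.go below), which inspects every node individually; Reconcile also opens the SDK connection itself now, so the gate and both PDB helpers share it. A self-contained sketch of the gate's decision rule — illustrative only, not the operator's code; it assumes nothing beyond hashicorp/go-version, which the operator already uses:

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

// Assumed minimum, mirroring ParallelUpgradePDBVersion in util.go.
var parallelUpgradePDBVersion = version.Must(version.NewVersion("3.1.2"))

// supportsParallelUpgrade mirrors the gate's rule: every node must be at
// or above the minimum, and anything unparsable fails safe.
func supportsParallelUpgrade(nodeVersions []string) bool {
	for _, v := range nodeVersions {
		nv, err := version.NewVersion(v)
		if err != nil {
			return false // unparsable -> fall back to the cluster-wide PDB
		}
		if nv.LessThan(parallelUpgradePDBVersion) {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(supportsParallelUpgrade([]string{"3.1.2", "3.2.0"})) // true
	fmt.Println(supportsParallelUpgrade([]string{"3.1.2", "3.1.1"})) // false: one node lags
}
```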
@@ -114,6 +113,7 @@ func (c *disruptionBudget) MarkDeleted() {}
func (c *disruptionBudget) createPortworxPodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
sdkConn *grpc.ClientConn,
) error {
userProvidedMinValue, err := pxutil.MinAvailableForStoragePDB(cluster)
if err != nil {
@@ -122,12 +122,8 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
}

var minAvailable int
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}

storageNodesCount, err := pxutil.CountStorageNodes(cluster, c.sdkConn, c.k8sClient)
storageNodesCount, err := pxutil.CountStorageNodes(cluster, sdkConn, c.k8sClient)
if err != nil {
c.closeSdkConn()
return err
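This helper no longer dials its own SDK connection; it just counts storage nodes over the shared one and sizes the single cluster-wide PDB. The derivation of minAvailable is below the fold, but the test further down asserts 2 for a three-node cluster, consistent with a quorum-style formula — the one sketched here is an assumption, not the operator's actual MinAvailableForStoragePDB logic:

```go
package main

import "fmt"

// defaultMinAvailable is an assumed, quorum-style formula for the
// cluster-wide PDB; the operator's real value comes from
// MinAvailableForStoragePDB / CountStorageNodes and may differ.
func defaultMinAvailable(storageNodes int) int {
	return storageNodes/2 + 1
}

func main() {
	fmt.Println(defaultMinAvailable(3)) // 2 -- matches the test expectation below
	fmt.Println(defaultMinAvailable(5)) // 3
}
```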
@@ -174,14 +170,9 @@ func (c *disruptionBudget) createPortworxPodDisruptionBudget(
func (c *disruptionBudget) createPortworxNodePodDisruptionBudget(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
sdkConn *grpc.ClientConn,
) error {

var err error
c.sdkConn, err = pxutil.GetPortworxConn(c.sdkConn, c.k8sClient, cluster.Namespace)
if err != nil {
return err
}
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, c.sdkConn, c.k8sClient)
nodesNeedingPDB, err := pxutil.NodesNeedingPDB(cluster, sdkConn, c.k8sClient)
if err != nil {
return err
}
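With the connection passed in, this helper only computes which quorum members need a PDB (NodesNeedingPDB) and creates one per node. The assertions in components_test.go below imply the object's shape: name StoragePodDisruptionBudgetName + "-&lt;node&gt;", minAvailable 1, selector pinned to that node's pod. A hedged sketch against k8s.io/api/policy/v1 — the literal name prefix and label key are assumed stand-ins for component.StoragePodDisruptionBudgetName and constants.OperatorLabelNodeNameKey:

```go
package main

import (
	"fmt"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// nodePDB sketches the per-node object implied by the test assertions:
// one PDB per node, minAvailable 1 (a single Portworx pod runs per node),
// selector pinned to that node's pod.
func nodePDB(namespace, nodeName string) *policyv1.PodDisruptionBudget {
	minAvailable := intstr.FromInt(1)
	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "px-storage-" + nodeName, // assumed prefix
			Namespace: namespace,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			MinAvailable: &minAvailable,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"operator.libopenstorage.org/nodename": nodeName, // assumed key
				},
			},
		},
	}
}

func main() {
	fmt.Println(nodePDB("kube-test", "node1").Name) // px-storage-node1
}
```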
64 changes: 41 additions & 23 deletions drivers/storage/portworx/components_test.go
@@ -12519,7 +12519,7 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {

expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
Nodes: []*osdapi.StorageNode{
{SchedulerNodeName: "node1"},
{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
},
}

@@ -12607,8 +12607,8 @@

// TestCase: Do not create storage PDB if total nodes with storage is less than 3
expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{}},
{},
}
@@ -12636,11 +12636,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {
// Also, ignore the annotation if the value is an invalid integer
cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "invalid"
expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{}},
{},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
}

err = driver.PreInstall(cluster)
@@ -12659,11 +12659,11 @@ func TestPodDisruptionBudgetEnabled(t *testing.T) {
// Also, ignore the annotation if the value is an invalid integer
cluster.Annotations[pxutil.AnnotationStoragePodDisruptionBudget] = "still_invalid"
expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
{Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4"},
{Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5"},
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 4}}, SchedulerNodeName: "node4", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 5}}, SchedulerNodeName: "node5", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{}},
{},
}
@@ -13282,7 +13282,7 @@ func TestPodDisruptionBudgetWithErrors(t *testing.T) {
mockNodeServer.EXPECT().
EnumerateWithFilters(gomock.Any(), gomock.Any()).
Return(nil, fmt.Errorf("NodeEnumerate error")).
Times(1)
AnyTimes()
cluster.Spec.Security.Enabled = false

err = driver.PreInstall(cluster)
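The error-path expectation loosens from Times(1) to AnyTimes() because enumeration is no longer a single call per reconcile: ClusterSupportsParallelUpgrade enumerates nodes before CountStorageNodes or NodesNeedingPDB do. A hand-rolled counting fake (not the operator's gomock mocks) makes the point:

```go
package main

import (
	"errors"
	"fmt"
)

// fakeNodeServer is a stand-in for the test's mocked node server,
// illustrating why a fixed call count would now over-constrain.
type fakeNodeServer struct{ calls int }

func (f *fakeNodeServer) EnumerateWithFilters() error {
	f.calls++
	return errors.New("NodeEnumerate error")
}

func main() {
	f := &fakeNodeServer{}
	_ = f.EnumerateWithFilters() // the version gate enumerates to read node labels...
	_ = f.EnumerateWithFilters() // ...and the PDB helper enumerates again
	fmt.Println(f.calls)         // 2 -- a Times(1) expectation would fail here
}
```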
@@ -13317,9 +13317,9 @@ func TestDisablePodDisruptionBudgets(t *testing.T) {

expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
Nodes: []*osdapi.StorageNode{
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1"},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2"},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3"},
{Pools: []*osdapi.StoragePool{{ID: 1}}, SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 2}}, SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
{Pools: []*osdapi.StoragePool{{ID: 3}}, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
},
}
mockNodeServer.EXPECT().
@@ -13427,11 +13427,13 @@ func TestNodePodDisruptionBudget(t *testing.T) {

testutil.SetupEtcHosts(t, sdkServerIP, pxutil.PortworxServiceName+".kube-test")
defer testutil.RestoreEtcHosts(t)

// PDB should not be created for non quorum members
expectedNodeEnumerateResp := &osdapi.SdkNodeEnumerateWithFiltersResponse{
Nodes: []*osdapi.StorageNode{
{SchedulerNodeName: "node1"},
{SchedulerNodeName: "node2"},
{NonQuorumMember: true, SchedulerNodeName: "node3"},
{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{NonQuorumMember: true, SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
},
}
mockNodeServer.EXPECT().
@@ -13483,21 +13485,37 @@
err = driver.PreInstall(cluster)
require.NoError(t, err)

// PDB is created for 2 storage nodes and kvdb
pdbList := &policyv1.PodDisruptionBudgetList{}
err = testutil.List(k8sClient, pdbList)
require.NoError(t, err)
require.Len(t, pdbList.Items, 3)

storagePDB := &policyv1.PodDisruptionBudget{}
err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node1", cluster.Namespace)
require.NoError(t, err)
require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue())
require.Equal(t, "node1", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey])

err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node2", cluster.Namespace)
require.NoError(t, err)
require.Equal(t, 1, storagePDB.Spec.MinAvailable.IntValue())
require.Equal(t, "node2", storagePDB.Spec.Selector.MatchLabels[constants.OperatorLabelNodeNameKey])

err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName+"-node3", cluster.Namespace)
fmt.Println(err)
require.Error(t, err, fmt.Errorf("poddisruptionbudgets.policy %s-node3 not found", component.StoragePodDisruptionBudgetName))

// Testcase: per-node PDBs should not be created when even one node is below 3.1.2;
// use the cluster-wide PDB in this case
expectedNodeEnumerateResp.Nodes = []*osdapi.StorageNode{
{SchedulerNodeName: "node1", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{SchedulerNodeName: "node2", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.2"}},
{SchedulerNodeName: "node3", NodeLabels: map[string]string{pxutil.NodeLabelPortworxVersion: "3.1.1"}},
}
err = driver.PreInstall(cluster)
require.NoError(t, err)

storagePDB = &policyv1.PodDisruptionBudget{}
err = testutil.Get(k8sClient, storagePDB, component.StoragePodDisruptionBudgetName, cluster.Namespace)
require.NoError(t, err)
require.Equal(t, 2, storagePDB.Spec.MinAvailable.IntValue())

}

func TestSCC(t *testing.T) {
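Read together, the two halves of TestNodePodDisruptionBudget pin down the contract: with all nodes at 3.1.2, each quorum member gets its own PDB with minAvailable 1 (the third item in pdbList is the KVDB PDB, and non-quorum node3 gets none), while a single 3.1.1 node collapses everything back to one cluster-wide PDB with minAvailable 2. Restated as data — a sketch, with "px-storage" an assumed stand-in for component.StoragePodDisruptionBudgetName:

```go
package main

import "fmt"

// regime captures the PDB layout each half of the test asserts.
type regime struct {
	desc         string
	pdbs         []string
	minAvailable int
}

func main() {
	for _, r := range []regime{
		{"all nodes at 3.1.2", []string{"px-storage-node1", "px-storage-node2"}, 1},
		{"one node at 3.1.1", []string{"px-storage"}, 2},
	} {
		fmt.Printf("%-20s -> %v, minAvailable=%d\n", r.desc, r.pdbs, r.minAvailable)
	}
}
```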
34 changes: 34 additions & 0 deletions drivers/storage/portworx/util/util.go
@@ -333,6 +333,9 @@ var (

// ConfigMapNameRegex regex of configMap.
ConfigMapNameRegex = regexp.MustCompile("[^a-zA-Z0-9]+")

// ParallelUpgradePDBVersion is the portworx version from which parallel upgrades are supported
ParallelUpgradePDBVersion, _ = version.NewVersion("3.1.2")
)

func getStrippedClusterName(cluster *corev1.StorageCluster) string {
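The parse error on the constant is deliberately discarded; "3.1.2" can never fail to parse, so this is safe, though go-version also offers a fail-fast form. A sketch, separate from the committed declaration:

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

// Same constant declared with version.Must, which panics at init if the
// literal were ever edited into something unparsable, rather than leaving
// a nil *Version behind.
var parallelUpgradePDBVersion = version.Must(version.NewVersion("3.1.2"))

func main() {
	fmt.Println(parallelUpgradePDBVersion) // 3.1.2
}
```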
@@ -1965,3 +1968,34 @@ func NodesNeedingPDB(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k

return nodesNeedingPDB, nil
}

// ClusterSupportsParallelUpgrade reports whether every non-decommissioned node runs a Portworx version that supports parallel upgrades (at or above ParallelUpgradePDBVersion).
func ClusterSupportsParallelUpgrade(cluster *corev1.StorageCluster, sdkConn *grpc.ClientConn, k8sClient client.Client) bool {
nodeClient := api.NewOpenStorageNodeClient(sdkConn)
ctx, err := SetupContextWithToken(context.Background(), cluster, k8sClient)
if err != nil {
return false
}
nodeEnumerateResponse, err := nodeClient.EnumerateWithFilters(
ctx,
&api.SdkNodeEnumerateWithFiltersRequest{},
)
if err != nil {
return false
}

for _, node := range nodeEnumerateResponse.Nodes {
if node.Status == api.Status_STATUS_DECOMMISSION {
continue
}
v := node.NodeLabels[NodeLabelPortworxVersion]
nodeVersion, err := version.NewVersion(v)
if err != nil {
logrus.Warnf("Failed to parse node version %s for node %s: %v", v, node.Id, err)
return false
}
if nodeVersion.LessThan(ParallelUpgradePDBVersion) {
return false
}
}
return true
}
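Two properties of the gate follow directly from the code: any failure (connection, token, enumerate, parse) returns false, failing safe onto the cluster-wide PDB path; and hashicorp/go-version orders pre-release strings below the release, so a node reporting an RC build of 3.1.2 is still treated as too old. A self-contained check of the second point:

```go
package main

import (
	"fmt"

	version "github.com/hashicorp/go-version"
)

func main() {
	min := version.Must(version.NewVersion("3.1.2"))
	rc := version.Must(version.NewVersion("3.1.2-rc1"))
	// Pre-releases sort below the final release in go-version, so an RC
	// node would still be routed to the cluster-wide PDB path.
	fmt.Println(rc.LessThan(min)) // true
}
```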
