PWX-36514: Integration test to validate node pdb #1600
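Summary (from the diff below): this PR adds integration-test helpers for the per-node PodDisruptionBudgets used by smart/parallel upgrades — ValidateNodePDB checks that every storage node gets a px-<nodeName> PDB with minAvailable=1, ValidateNodesSelectedForUpgrade counts node PDBs with minAvailable=0 against quorum, and ValidatePodDisruptionBudget gains a branch for PX 3.1.2+ with operator 24.2.0+.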

Merged · 5 commits · Jul 24, 2024
Changes from 3 commits
147 changes: 139 additions & 8 deletions pkg/util/test/util.go
@@ -217,6 +217,7 @@ var (
opVer23_10_2, _ = version.NewVersion("23.10.2-")
OpVer23_10_3, _ = version.NewVersion("23.10.3-")
opVer24_1_0, _ = version.NewVersion("24.1.0-")
opVer24_2_0, _ = version.NewVersion("24.2.0-")

minOpVersionForKubeSchedConfig, _ = version.NewVersion("1.10.2-")
minimumCcmGoVersionCO, _ = version.NewVersion("1.2.3")
@@ -227,9 +228,10 @@ var (
// OCP Dynamic Plugin is only supported in starting with OCP 4.12+ which is k8s v1.25.0+
minK8sVersionForDynamicPlugin, _ = version.NewVersion("1.25.0")

pxVer2_13, _ = version.NewVersion("2.13")
pxVer3_0, _ = version.NewVersion("3.0")
pxVer3_1, _ = version.NewVersion("3.1")
pxVer2_13, _ = version.NewVersion("2.13")
pxVer3_0, _ = version.NewVersion("3.0")
pxVer3_1, _ = version.NewVersion("3.1")
pxVer3_1_2, _ = version.NewVersion("3.1.2")

// minimumPxVersionCCMJAVA minimum PX version to install ccm-java
minimumPxVersionCCMJAVA, _ = version.NewVersion("2.8")
@@ -5337,7 +5339,7 @@ func ValidateTelemetryV1Enabled(pxImageList map[string]string, cluster *corev1.S

// ValidatePodDisruptionBudget validates the value of minavailable and number of disruptions for px-storage poddisruptionbudget
func ValidatePodDisruptionBudget(cluster *corev1.StorageCluster, timeout, interval time.Duration) error {
logrus.Info("Validate px-storage poddisruptionbudget minAvailable and allowed disruptions")
logrus.Info("Validate portworx storage poddisruptionbudget")

kbVer, err := GetK8SVersion()
if err != nil {
@@ -5348,10 +5350,49 @@ func ValidatePodDisruptionBudget(cluster *corev1.StorageCluster, timeout, interv
if err != nil {
return err
}
pxVersion := GetPortworxVersion(cluster)

// PodDisruptionBudget is supported for k8s version greater than or equal to 1.21 and operator version greater than or equal to 1.5.0
// Changing opVersion to 23.10.0 for PTX-23350 | TODO: add better logic with PTX-23407
if k8sVersion.GreaterThanOrEqual(minSupportedK8sVersionForPdb) && opVersion.GreaterThanOrEqual(opVer23_10) {

// Smart and parallel upgrades are supported from px version 3.1.2 and operator version 24.2.0
if k8sVersion.GreaterThanOrEqual(minSupportedK8sVersionForPdb) && opVersion.GreaterThanOrEqual(opVer24_2_0) && pxVersion.GreaterThanOrEqual(pxVer3_1_2) {
t := func() (interface{}, bool, error) {
nodes, err := operatorops.Instance().ListStorageNodes(cluster.Namespace)
Review comment (Contributor): are these going to be PX StorageNodes (kubectl get sn) or kubernetes nodes (kubectl get nodes)? What will be the output of this?

if err != nil {
return nil, true, fmt.Errorf("failed to get storage nodes, Err: %v", err)
}
availableNodes := 0
for _, node := range nodes.Items {
if *node.Status.NodeAttributes.Storage {
if node.Status.Phase == "Online" {
availableNodes++
} else {
logrus.Infof("Node %s is in state [%s], PDB might be incorrect", node.Name, node.Status.Phase)
}
}
}
pdbs, err := policyops.Instance().ListPodDisruptionBudget(cluster.Namespace)
if err != nil {
return nil, true, fmt.Errorf("failed to list all poddisruptionbudgets, Err: %v", err)
}
actualNodePDBCount := 0
for _, pdb := range pdbs.Items {
if strings.HasPrefix(pdb.Name, "px-") && pdb.Name != "px-kvdb" {
actualNodePDBCount++
}
}
if actualNodePDBCount == availableNodes {
return nil, false, nil
}
return nil, true, fmt.Errorf("incorrect node PDB count. Expected node PDB count [%d], Actual node PDB count [%d]", availableNodes, actualNodePDBCount)

}
if _, err := task.DoRetryWithTimeout(t, timeout, interval); err != nil {
return err
}
return nil
} else if k8sVersion.GreaterThanOrEqual(minSupportedK8sVersionForPdb) && opVersion.GreaterThanOrEqual(opVer23_10) {
// This is only for non async DR setup
t := func() (interface{}, bool, error) {

@@ -5361,16 +5402,16 @@ func ValidatePodDisruptionBudget(cluster *corev1.StorageCluster, timeout, interv
}

nodeslen := 0
availablenodes := 0
availableNodes := 0
for _, node := range nodes.Items {
if *node.Status.NodeAttributes.Storage {
nodeslen++
if node.Status.Phase == "Online" {
availablenodes++
availableNodes++
}
}
}
nodesUnavailable := nodeslen - availablenodes
nodesUnavailable := nodeslen - availableNodes
// Skip PDB validation for px-storage if the number of storage nodes is less than or equal to 2
if nodeslen <= 2 {
logrus.Infof("Storage PDB does not exist for storage nodes less than or equal to 2, skipping PDB validation")
@@ -5948,3 +5989,93 @@ func RestoreEtcHosts(t *testing.T) {
assert.Equal(t, bb.Len(), n, "short write")
fd.Close()
}

func ValidateNodePDB(cluster *corev1.StorageCluster, timeout, interval time.Duration) error {
t := func() (interface{}, bool, error) {
nodes, err := coreops.Instance().GetNodes()
if err != nil {
return nil, true, fmt.Errorf("failed to get k8s nodes, Err: %v", err)
}
nodesPDBMap := make(map[string]bool)

pdbs, err := policyops.Instance().ListPodDisruptionBudget(cluster.Namespace)
if err != nil {
return nil, true, fmt.Errorf("failed to get px-storage poddisruptionbudget, Err: %v", err)
}

for _, pdb := range pdbs.Items {
if strings.HasPrefix(pdb.Name, "px-") && pdb.Name != "px-kvdb" {
nodesPDBMap[pdb.Name] = true
if pdb.Spec.MinAvailable.IntValue() != 1 {
return nil, true, fmt.Errorf("incorrect PDB minAvailable value for node %s. Expected PDB [%d], Actual PDB [%d]", strings.TrimPrefix(pdb.Name, "px-"), 1, pdb.Spec.MinAvailable.IntValue())
}
}
}
// create map of storage nodes as well
storagenodes, err := operatorops.Instance().ListStorageNodes(cluster.Namespace)
if err != nil {
return nil, true, fmt.Errorf("failed to get storage nodes, Err: %v", err)
}
storageNodesMap := make(map[string]bool)
for _, node := range storagenodes.Items {
if *node.Status.NodeAttributes.Storage {
storageNodesMap[node.Name] = true
}
}

for _, node := range nodes.Items {
if coreops.Instance().IsNodeMaster(node) {
continue
}
if _, ok := nodesPDBMap["px-"+node.Name]; !ok {
// return error only if the k8s node has a storage node in it
if _, ok := storageNodesMap[node.Name]; ok {
return nil, true, fmt.Errorf("PDB for node %s is missing", node.Name)
}
}
}
return nil, false, nil
}
if _, err := task.DoRetryWithTimeout(t, timeout, interval); err != nil {
return err
}
return nil
}

func ValidateNodesSelectedForUpgrade(cluster *corev1.StorageCluster, minAvailable int, timeout, interval time.Duration) error {
t := func() (interface{}, bool, error) {
nodes, err := operatorops.Instance().ListStorageNodes(cluster.Namespace)
if err != nil {
return nil, true, fmt.Errorf("failed to get storage nodes, Err: %v", err)
}
totalStorageNodes := 0
for _, node := range nodes.Items {
if *node.Status.NodeAttributes.Storage {
totalStorageNodes++
}
}
if minAvailable == -1 {
// Setting minAvailable to quorum value
minAvailable = (totalStorageNodes / 2) + 1
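// e.g. with 5 storage nodes: (5/2)+1 = 3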
}

pdbs, err := policyops.Instance().ListPodDisruptionBudget(cluster.Namespace)
if err != nil {
return nil, true, fmt.Errorf("failed to get px-storage poddisruptionbudget, Err: %v", err)
}
nodesReadyForUpgrade := 0
for _, pdb := range pdbs.Items {
if strings.HasPrefix(pdb.Name, "px-") && pdb.Spec.MinAvailable.IntValue() == 0 {
nodesReadyForUpgrade++
}
}
if nodesReadyForUpgrade <= (totalStorageNodes - minAvailable) {
return nil, false, nil
}
return nil, true, fmt.Errorf("nodes available for upgrade [%d] are more than expected [%d]", nodesReadyForUpgrade, totalStorageNodes-minAvailable)
}
if _, err := task.DoRetryWithTimeout(t, timeout, interval); err != nil {
return err
}
return nil
}
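A minimal sketch of how an integration test might drive the new helpers; the import path, package alias, deploy helper, and retry values here are assumptions for illustration, not part of this diff:

package integrationtest

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	testutil "github.com/libopenstorage/operator/pkg/util/test"
)

func TestNodePDBValidation(t *testing.T) {
	// deployStorageClusterForTest is a hypothetical helper standing in for the
	// suite's existing StorageCluster deployment utilities.
	cluster := deployStorageClusterForTest(t)

	// Each storage node should end up with its own px-<nodeName> PDB with minAvailable=1.
	require.NoError(t, testutil.ValidateNodePDB(cluster, 15*time.Minute, 30*time.Second))

	// Passing minAvailable=-1 makes the helper fall back to quorum: (storageNodes/2)+1.
	require.NoError(t, testutil.ValidateNodesSelectedForUpgrade(cluster, -1, 15*time.Minute, 30*time.Second))
}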
52 changes: 19 additions & 33 deletions test/integration_test/bluegreen_test.go
@@ -6,7 +6,7 @@ package integrationtest
import (
"bytes"
"fmt"
"io"

"os"
"sort"
"strconv"
@@ -131,13 +131,13 @@ var bgTestCases = []types.TestCase{

logrus.Infof("Attempt license expand on Trial via node %s", pl.Items[0].Spec.NodeName)
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&pl.Items[0],
err = ci_utils.RunInPortworxPod(&pl.Items[0],
nil, &stdout, &stderr,
"/bin/sh", "-c", "/opt/pwx/bin/pxctl license trial; exec /opt/pwx/bin/pxctl license expand --start")
require.Contains(t, stdout.String(), " not supported for Trial licenses")

logrus.Infof("Installing license via node %s", pl.Items[0].Spec.NodeName)
err = runInPortworxPod(&pl.Items[0],
err = ci_utils.RunInPortworxPod(&pl.Items[0],
bytes.NewReader([]byte(crippledTestLicense)), &stdout, &stderr,
"/bin/sh", "-c", "base64 -d | /opt/pwx/bin/pxctl license add /dev/stdin")
require.Equal(t, "", stderr.String())
@@ -146,7 +146,7 @@ var bgTestCases = []types.TestCase{

logrus.Infof("Renstalling license via node %s", pl.Items[2].Spec.NodeName)
stdout.Reset()
err = runInPortworxPod(&pl.Items[2],
err = ci_utils.RunInPortworxPod(&pl.Items[2],
bytes.NewReader([]byte(crippledTestLicense)), &stdout, &stderr,
"/bin/sh", "-c", "base64 -d | /opt/pwx/bin/pxctl license add /dev/stdin")
require.Equal(t, "", stderr.String())
@@ -157,7 +157,7 @@ var bgTestCases = []types.TestCase{
for _, p := range pl.Items {
stdout.Reset()
stderr.Reset()
err = runInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "license", "list")
err = ci_utils.RunInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "license", "list")
require.Equal(t, "", stderr.String(),
"unexpected STDERR on node %s", p.Spec.NodeName)
require.Contains(t, stdout.String(), "PX-Enterprise Torpedo_TEST_license",
@@ -262,7 +262,7 @@ var bgTestCases = []types.TestCase{

logrus.Infof("Extending license via node %s", pl.Items[0].Spec.NodeName)
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
"/opt/pwx/bin/pxctl", "license", "expand", "--start")
require.NoError(t, err)
require.Contains(t, stdout.String(), "Successfully initiated license extension")
@@ -271,7 +271,7 @@ var bgTestCases = []types.TestCase{
for _, p := range pl.Items {
stdout.Reset()
stderr.Reset()
err = runInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
err = ci_utils.RunInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
assert.Empty(t, stderr.String())
assert.Contains(t, stdout.String(), "NOTICE: License extension expires in ",
"unexpected STDOUT @%s", p.Spec.NodeName)
@@ -304,19 +304,19 @@ var bgTestCases = []types.TestCase{
tmpVolName := "testVol" + tmpSuffix

logrus.Infof("Attempt volume creation on %s", lastPOD.Spec.NodeName)
err = runInPortworxPod(&lastPOD, nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&lastPOD, nil, &stdout, &stderr,
"/opt/pwx/bin/pxctl", "volume", "create", "--repl", "1", "--size", "3", tmpVolName)
require.Contains(t, stdout.String(), "Volume successfully created")
require.NoError(t, err)

logrus.Infof("Attempt volume snapshot on %s", lastPOD.Spec.NodeName)
err = runInPortworxPod(&lastPOD, nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&lastPOD, nil, &stdout, &stderr,
"/opt/pwx/bin/pxctl", "volume", "snapshot", "create", "--name", "snap"+tmpSuffix, tmpVolName)
require.Contains(t, stdout.String(), "Volume snap successful")
require.NoError(t, err)

logrus.Infof("Cleaning up volume / snapshot on %s", lastPOD.Spec.NodeName)
err = runInPortworxPod(&lastPOD, nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&lastPOD, nil, &stdout, &stderr,
"/bin/sh", "-c", "/opt/pwx/bin/pxctl v delete --force snap"+tmpSuffix+
"; /opt/pwx/bin/pxctl v delete --force "+tmpVolName)
require.Contains(t, stdout.String(), "Volume snap"+tmpSuffix+" successfully deleted")
@@ -401,7 +401,7 @@ var bgTestCases = []types.TestCase{
for _, p := range pl.Items {
stdout.Reset()
stderr.Reset()
err = runInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
err = ci_utils.RunInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
assert.Empty(t, stderr.String())
assert.Contains(t, stdout.String(), "NOTICE: License extension expires in ",
"unexpected STDOUT @%s", p.Spec.NodeName)
@@ -424,7 +424,7 @@ var bgTestCases = []types.TestCase{

logrus.Infof("Attempt license reinstall")
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&pl.Items[0], bytes.NewReader([]byte(crippledTestLicense)), &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&pl.Items[0], bytes.NewReader([]byte(crippledTestLicense)), &stdout, &stderr,
"/bin/sh", "-c", "base64 -d | /opt/pwx/bin/pxctl license add /dev/stdin")
require.Equal(t, "", stderr.String())
require.Contains(t, strings.ToLower(stdout.String()),
@@ -446,7 +446,7 @@ var bgTestCases = []types.TestCase{

logrus.Infof("End license extension while cluster over-allocated")
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
"/opt/pwx/bin/pxctl", "license", "expand", "--end")
assert.Equal(t, "", stderr.String())
assert.Contains(t, stdout.String(), "Successfully turned off license extension")
@@ -456,7 +456,7 @@ var bgTestCases = []types.TestCase{
for _, p := range pl.Items {
stdout.Reset()
stderr.Reset()
err = runInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "license", "ls")
err = ci_utils.RunInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "license", "ls")
assert.Equal(t, "", stderr.String(),
"did no expect errors @%s", p.Spec.NodeName)
assert.Contains(t, stdout.String(), "ERROR: too many nodes in the cluster",
@@ -585,7 +585,7 @@ var bgTestCases = []types.TestCase{

// get NodeID for the wiped node
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
err = ci_utils.RunInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
"/bin/sh", "-c", "/opt/pwx/bin/pxctl status | grep "+lastNode+" | head -1 | awk '{print $2}'")
lastNodeID := strings.Trim(stdout.String(), "\r\n\t ")
require.NoError(t, err)
@@ -595,7 +595,7 @@ var bgTestCases = []types.TestCase{
_, err = task.DoRetryWithTimeout(
func() (interface{}, bool, error) {
var stdout, stderr bytes.Buffer
runInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
ci_utils.RunInPortworxPod(&pl.Items[0], nil, &stdout, &stderr,
"/opt/pwx/bin/pxctl", "cluster", "delete", lastNodeID)
if strings.Contains(stdout.String(), " successfully deleted.") {
logrus.Debugf("Node %s successfully decomissioned", lastNode)
@@ -613,7 +613,7 @@ var bgTestCases = []types.TestCase{
logrus.Infof("Checking PX status on all nodes")
for _, p := range pl.Items[:len(pl.Items)-1] {
var stdout, stderr bytes.Buffer
err = runInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
err = ci_utils.RunInPortworxPod(&p, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
require.NoError(t, err, "unexpected error @%s", p.Spec.NodeName)
require.Contains(t, stdout.String(), "License: PX-Enterprise Torpedo_TEST_license (expires in ",
"unexpected content @%s", p.Spec.NodeName)
@@ -624,27 +624,13 @@ var bgTestCases = []types.TestCase{
},
}

func runInPortworxPod(pod *v1.Pod, in io.Reader, out, err io.Writer, command ...string) error {
if pod == nil || len(command) <= 0 {
return os.ErrInvalid
}

if logrus.IsLevelEnabled(logrus.DebugLevel) {
logrus.Debugf("run on %s via %s: `%s`", pod.Spec.NodeName, pod.Name, strings.Join(command, " "))
}

return coreops.Instance().RunCommandInPodEx(&coreops.RunCommandInPodExRequest{
command, pod.Name, "portworx", pod.Namespace, false, in, out, err,
})
}
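The removed helper presumably moved to the shared ci_utils package (test/integration_test/utils); assuming the body is unchanged apart from being exported, the relocated version would look roughly like:

func RunInPortworxPod(pod *v1.Pod, in io.Reader, out, err io.Writer, command ...string) error {
	if pod == nil || len(command) == 0 {
		return os.ErrInvalid
	}
	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		logrus.Debugf("run on %s via %s: `%s`", pod.Spec.NodeName, pod.Name, strings.Join(command, " "))
	}
	// Run the command inside the "portworx" container of the given pod.
	return coreops.Instance().RunCommandInPodEx(&coreops.RunCommandInPodExRequest{
		command, pod.Name, "portworx", pod.Namespace, false, in, out, err,
	})
}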

func wipeNodeRunningPod(pod *v1.Pod) error {
if pod == nil {
return os.ErrInvalid
}
logrus.Debugf("Wiping PX on node %s using POD %s", pod.Spec.NodeName, pod.Name)
var stdout, stderr bytes.Buffer
err := runInPortworxPod(pod, nil, &stdout, &stderr,
err := ci_utils.RunInPortworxPod(pod, nil, &stdout, &stderr,
"nsenter", "--mount=/host_proc/1/ns/mnt", "--", "/bin/sh", "-c", "pxctl sv nw --all")
if err != nil {
return fmt.Errorf("node-wipe failed: %s (%s)", err,
@@ -677,7 +663,7 @@ func taskWaitPxctlStatus(t *testing.T, nodeName, podName, expectedOutput string)

// run `pxctl status` -- compare output
var stdout, stderr bytes.Buffer
runInPortworxPod(monitoredPod, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
ci_utils.RunInPortworxPod(monitoredPod, nil, &stdout, &stderr, "/opt/pwx/bin/pxctl", "status")
s := strings.Trim(stdout.String(), "\r\n ")
if strings.Contains(s, expectedOutput) {
logrus.Infof("'pxctl status' @%s got expected %q", nodeName, expectedOutput)