Skip to content

Commit

Permalink
Merge branch 'master' into PWX-36514
Browse files Browse the repository at this point in the history
  • Loading branch information
svijaykumar-px authored Jul 24, 2024
2 parents 94f4928 + b38c1e7 commit 51a888f
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 72 deletions.
99 changes: 64 additions & 35 deletions drivers/storage/portworx/component/portworx_basic.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package component

import (
"bytes"
"context"
"fmt"
"os"
Expand All @@ -19,7 +20,6 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/apis/core"
"sigs.k8s.io/controller-runtime/pkg/client"

pxutil "github.com/libopenstorage/operator/drivers/storage/portworx/util"
Expand All @@ -46,6 +46,7 @@ const (

var (
defaultPxSaTokenExpirationSeconds = int64(12 * 60 * 60)
rootCaCrtPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
)

type portworxBasic struct {
Expand Down Expand Up @@ -557,23 +558,23 @@ func (c *portworxBasic) createAndMaintainPxSaTokenSecret(cluster *corev1.Storage
return err
}
}
needRefresh, err := isTokenRefreshRequired(secret)
caCrtUpdated, err := updateCaCrtIfNeeded(secret)
if err != nil {
return err
}
if needRefresh {
if err := c.refreshTokenSecret(secret, cluster, ownerRef); err != nil {
return fmt.Errorf("failed to refresh the token secret for px container: %w", err)
tokenRefreshed, err := refreshTokenIfNeeded(secret, cluster)
if err != nil {
return err
}
if caCrtUpdated || tokenRefreshed {
if err := k8sutil.CreateOrUpdateSecret(c.k8sClient, secret, ownerRef); err != nil {
return err
}
}
return nil
}

func (c *portworxBasic) createTokenSecret(cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference) (*v1.Secret, error) {
rootCaCrt, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/ca.crt")
if err != nil && !os.IsNotExist(err) {
return nil, fmt.Errorf("error reading k8s cluster certificate located inside the pod at /var/run/secrets/kubernetes.io/serviceaccount/ca.crt: %w", err)
}
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pxutil.PortworxServiceAccountTokenSecretName,
Expand All @@ -582,8 +583,7 @@ func (c *portworxBasic) createTokenSecret(cluster *corev1.StorageCluster, ownerR
},
Type: v1.SecretTypeOpaque,
Data: map[string][]byte{
core.ServiceAccountRootCAKey: rootCaCrt,
core.ServiceAccountNamespaceKey: []byte(cluster.Namespace),
v1.ServiceAccountNamespaceKey: []byte(cluster.Namespace),
},
}
if err := k8sutil.CreateOrUpdateSecret(c.k8sClient, secret, ownerRef); err != nil {
Expand All @@ -592,36 +592,30 @@ func (c *portworxBasic) createTokenSecret(cluster *corev1.StorageCluster, ownerR
return secret, nil
}

func (c *portworxBasic) refreshTokenSecret(secret *v1.Secret, cluster *corev1.StorageCluster, ownerRef *metav1.OwnerReference) error {
expirationSeconds, err := getPxSaTokenExpirationSeconds(cluster)
if err != nil {
return err
}
secret.Data[PxSaTokenRefreshTimeKey] = []byte(time.Now().UTC().Add(time.Duration(expirationSeconds/2) * time.Second).Format(time.RFC3339))
newToken, err := generatePxSaToken(cluster, expirationSeconds)
if err != nil {
return err
func updateCaCrtIfNeeded(secret *v1.Secret) (bool, error) {
rootCaCrt, err := os.ReadFile(rootCaCrtPath)
if err != nil && !os.IsNotExist(err) {
return false, fmt.Errorf("error reading k8s cluster certificate located inside the pod at %s: %w", rootCaCrtPath, err)
}
secret.Data[core.ServiceAccountTokenKey] = newToken
err = k8sutil.CreateOrUpdateSecret(c.k8sClient, secret, ownerRef)
if err != nil {
return err
if len(secret.Data) == 0 || !bytes.Equal(secret.Data[v1.ServiceAccountRootCAKey], rootCaCrt) {
secret.Data[v1.ServiceAccountRootCAKey] = rootCaCrt
return true, nil
}
return nil
return false, nil
}

func generatePxSaToken(cluster *corev1.StorageCluster, expirationSeconds int64) ([]byte, error) {
tokenRequest := &authv1.TokenRequest{
Spec: authv1.TokenRequestSpec{
Audiences: []string{"px"},
ExpirationSeconds: &expirationSeconds,
},
}
tokenResp, err := coreops.Instance().CreateToken(pxutil.PortworxServiceAccountName(cluster), cluster.Namespace, tokenRequest)
func refreshTokenIfNeeded(secret *v1.Secret, cluster *corev1.StorageCluster) (bool, error) {
needRefreshToken, err := isTokenRefreshRequired(secret)
if err != nil {
return nil, fmt.Errorf("error creating token from k8s: %w", err)
return false, err
}
if needRefreshToken {
if err := refreshToken(secret, cluster); err != nil {
return false, fmt.Errorf("failed to refresh the token secret for px container: %w", err)
}
return true, nil
}
return []byte(tokenResp.Status.Token), nil
return false, nil
}

func isTokenRefreshRequired(secret *v1.Secret) (bool, error) {
Expand All @@ -638,6 +632,36 @@ func isTokenRefreshRequired(secret *v1.Secret) (bool, error) {
return false, nil
}

func refreshToken(secret *v1.Secret, cluster *corev1.StorageCluster) error {
expirationSeconds, err := getPxSaTokenExpirationSeconds(cluster)
if err != nil {
return err
}
newToken, err := generatePxSaToken(cluster, expirationSeconds)
if err != nil {
return err
}
secret.Data[v1.ServiceAccountTokenKey] = []byte(newToken.Status.Token)
// ServiceAccount token expiration time we defined might get overwritten by the maxExpirationSeconds defined by the k8s token RESTful server,
// so our token refresh machanism has to honor this server limit.
// https://github.com/kubernetes/kubernetes/blob/79fee524e65ddc7c1448d5d2554c6f91233cf98d/pkg/registry/core/serviceaccount/storage/token.go#L208
secret.Data[PxSaTokenRefreshTimeKey] = []byte(time.Now().UTC().Add(time.Duration(*newToken.Spec.ExpirationSeconds/2) * time.Second).Format(time.RFC3339))
return nil
}

func generatePxSaToken(cluster *corev1.StorageCluster, expirationSeconds int64) (*authv1.TokenRequest, error) {
tokenRequest := &authv1.TokenRequest{
Spec: authv1.TokenRequestSpec{
ExpirationSeconds: &expirationSeconds,
},
}
tokenResp, err := coreops.Instance().CreateToken(pxutil.PortworxServiceAccountName(cluster), cluster.Namespace, tokenRequest)
if err != nil {
return nil, fmt.Errorf("error creating token from k8s: %w", err)
}
return tokenResp, nil
}

func (c *portworxBasic) createPortworxKVDBService(
cluster *corev1.StorageCluster,
ownerRef *metav1.OwnerReference,
Expand Down Expand Up @@ -726,6 +750,11 @@ func getPxSaTokenExpirationSeconds(cluster *corev1.StorageCluster) (int64, error
return defaultPxSaTokenExpirationSeconds, nil
}

// Set the path of k8s cluster root certificate for the purpose of testing
func SetRootCertPath(path string) {
rootCaCrtPath = path
}

// RegisterPortworxBasicComponent registers the Portworx Basic component
func RegisterPortworxBasicComponent() {
Register(PortworxBasicComponentName, &portworxBasic{})
Expand Down
65 changes: 36 additions & 29 deletions drivers/storage/portworx/component/telemetry.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package component

import (
"context"
cryptoTls "crypto/tls"
"crypto/x509"
"encoding/asn1"
Expand Down Expand Up @@ -842,13 +843,12 @@ func (t *telemetry) createDeploymentTelemetryRegistration(

// have a valid cluster UUID? lets validate the Telemetry SSL cert
if cuuid := cluster.Status.ClusterUID; cuuid != "" {
if certBytes, err := t.getTelemetrySSLCert(deployment.Namespace); err != nil {
logrus.WithError(err).Errorf("failed to get telemetry SSL cert")
} else if err2 := t.validateTelemetrySSLCert(certBytes, cuuid); err2 == errInvalidTelemetryCert {
sec, err := t.validateTelemetrySSLCert(deployment.Namespace, cuuid)
if err == errInvalidTelemetryCert {
logrus.Warn("refreshing telemetry SSL cert")
t.refreshTelemetrySSLCert(deployment)
} else if err2 != nil {
logrus.WithError(err2).Errorf("failed to validate telemetry SSL cert")
t.refreshTelemetrySSLCert(sec)
} else if err != nil {
logrus.WithError(err).Errorf("failed to validate telemetry SSL cert")
}
}

Expand Down Expand Up @@ -1023,33 +1023,35 @@ func (t *telemetry) createDeploymentTelemetryCollectorV2(
return nil
}

// getTelemetrySSLCert returns the telemetry SSL cert
func (t *telemetry) getTelemetrySSLCert(namespace string) ([]byte, error) {
// validateTelemetrySSLCert validates the telemetry SSL certificate.
// - note: cert's Psaudonym needs to match the cluster UUID
func (t *telemetry) validateTelemetrySSLCert(namespace, cuuid string) (*v1.Secret, error) {
if namespace == "" || cuuid == "" {
return nil, fmt.Errorf("invalid namespace or cluster UUID")
}

var sec v1.Secret
logrus.Debugf("Inspecting secret %s/%s for SSL cert", namespace, pxutil.TelemetryCertName)
err := k8sutil.GetSecret(t.k8sClient, pxutil.TelemetryCertName, namespace, &sec)
if err != nil {
return nil, err
}
return sec.Data["cert"], nil
}

// validateTelemetrySSLCert validates the telemetry SSL certificate.
// - note: cert's Psaudonym needs to match the cluster UUID
func (t *telemetry) validateTelemetrySSLCert(certBytes []byte, cuuid string) error {
if len(certBytes) <= 0 || cuuid == "" {
return nil
certBytes := sec.Data["cert"]
if len(certBytes) <= 0 {
logrus.Warnf("SSL cert not found in secret %s/%s", namespace, pxutil.TelemetryCertName)
return &sec, nil
}

block, _ := pem.Decode(certBytes)
if block == nil {
return fmt.Errorf("failed to decode SSL certificate")
return &sec, fmt.Errorf("failed to decode SSL certificate")
}
cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return err
return &sec, err
} else if cert == nil {
return fmt.Errorf("failed to parse SSL certificate")
return &sec, fmt.Errorf("failed to parse SSL certificate")
}

// find Pseudonym in Subject names
Expand All @@ -1059,38 +1061,43 @@ func (t *telemetry) validateTelemetrySSLCert(certBytes []byte, cuuid string) err
var ok bool
// quick sanity check!
if pseudonym, ok = v.Value.(string); !ok {
return fmt.Errorf("SSL cert Pseudonym is not a string")
return &sec, fmt.Errorf("SSL cert Pseudonym is not a string")
}
break
}
}
if pseudonym == "" {
logrus.Errorf("SSL cert Pseudonym not found")
return errInvalidTelemetryCert
return &sec, errInvalidTelemetryCert
}

if pseudonym != cuuid {
logrus.Errorf("SSL cert Pseudonym %s does not match cluster UUID %s",
pseudonym, cuuid)
return errInvalidTelemetryCert
return &sec, errInvalidTelemetryCert
}
logrus.Debugf("SSL cert Pseudonym %s matches cluster UUID", pseudonym)
return nil
logrus.Tracef("SSL cert Pseudonym %s matches cluster UUID", pseudonym)
return &sec, nil
}

// refreshTelemetrySSLCert deletes the telemetry SSL cert secret and telemetry-registration PODs
func (t *telemetry) refreshTelemetrySSLCert(dep *appsv1.Deployment) {
if dep == nil {
func (t *telemetry) refreshTelemetrySSLCert(sec *v1.Secret) {
if sec == nil {
return
} else if sec.Name != pxutil.TelemetryCertName {
logrus.Errorf("invalid secret name %s/%s (expected %s)", sec.Namespace, sec.Name, pxutil.TelemetryCertName)
return
}
logrus.Warnf("refreshTelemetrySSLCert - deleting telemetry SSL cert secret %s/%s", dep.Namespace, pxutil.TelemetryCertName)
err := k8sutil.DeleteSecret(t.k8sClient, pxutil.TelemetryCertName, dep.Namespace, dep.OwnerReferences...)

logrus.Warnf("refreshTelemetrySSLCert - deleting telemetry SSL cert secret %s/%s", sec.Namespace, sec.Name)
err := t.k8sClient.Delete(context.TODO(), sec)
if err != nil {
logrus.WithError(err).Warnf("failed to delete secret %s/%s", dep.Namespace, pxutil.TelemetryCertName)
logrus.WithError(err).Errorf("failed to delete secret %s/%s", sec.Namespace, sec.Name)
return
}

logrus.Warnf("refreshTelemetrySSLCert - deleting POD labeled role=%s", DeploymentNameTelemetryRegistration)
err = k8sutil.DeletePodsByLabel(t.k8sClient, map[string]string{"role": DeploymentNameTelemetryRegistration}, dep.Namespace)
err = k8sutil.DeletePodsByLabel(t.k8sClient, map[string]string{"role": DeploymentNameTelemetryRegistration}, sec.Namespace)
if err != nil {
logrus.WithError(err).Warnf("failed to delete px-telemetry-registration POD")
}
Expand Down
Loading

0 comments on commit 51a888f

Please sign in to comment.