feat: implement memberjoin action release 0.9 (#8384)
kubeJocker authored Dec 4, 2024
1 parent 7d10f3e commit f75aa8f
Showing 2 changed files with 219 additions and 48 deletions.
266 changes: 218 additions & 48 deletions controllers/apps/transformer_component_workload.go
@@ -24,6 +24,7 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/spf13/viper"
"golang.org/x/exp/maps"
@@ -41,6 +42,7 @@ import (
"github.com/apecloud/kubeblocks/pkg/controller/configuration"
"github.com/apecloud/kubeblocks/pkg/controller/factory"
"github.com/apecloud/kubeblocks/pkg/controller/graph"
"github.com/apecloud/kubeblocks/pkg/controller/instanceset"
"github.com/apecloud/kubeblocks/pkg/controller/model"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
@@ -59,6 +61,7 @@ type componentWorkloadOps struct {
cluster *appsv1alpha1.Cluster
synthesizeComp *component.SynthesizedComponent
dag *graph.DAG
component *appsv1alpha1.Component

// runningITS is a snapshot of the InstanceSet that is already running
runningITS *workloads.InstanceSet
@@ -81,6 +84,7 @@ func (t *componentWorkloadTransformer) Transform(ctx graph.TransformContext, dag
cluster := transCtx.Cluster
compDef := transCtx.CompDef
synthesizeComp := transCtx.SynthesizeComponent
component := transCtx.Component
reqCtx := intctrlutil.RequestCtx{
Ctx: transCtx.Context,
Log: transCtx.Logger,
@@ -115,7 +119,7 @@ func (t *componentWorkloadTransformer) Transform(ctx graph.TransformContext, dag
if protoITS == nil {
graphCli.Delete(dag, runningITS)
} else {
err = t.handleUpdate(reqCtx, graphCli, dag, cluster, synthesizeComp, runningITS, protoITS)
err = t.handleUpdate(reqCtx, graphCli, dag, cluster, synthesizeComp, runningITS, protoITS, component)
}
}
return err
@@ -167,11 +171,10 @@ func (t *componentWorkloadTransformer) stopWorkload(protoITS *workloads.Instance
}
}

func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCtx, cli model.GraphClient, dag *graph.DAG,
cluster *appsv1alpha1.Cluster, synthesizeComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet) error {
func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCtx, cli model.GraphClient, dag *graph.DAG, cluster *appsv1alpha1.Cluster, synthesizeComp *component.SynthesizedComponent, runningITS, protoITS *workloads.InstanceSet, component *appsv1alpha1.Component) error {
if !isCompStopped(synthesizeComp) {
// postpone the update of the workload until the component is back to running.
if err := t.handleWorkloadUpdate(reqCtx, dag, cluster, synthesizeComp, runningITS, protoITS); err != nil {
if err := t.handleWorkloadUpdate(reqCtx, dag, cluster, synthesizeComp, runningITS, protoITS, component); err != nil {
return err
}
}
@@ -188,9 +191,8 @@ func (t *componentWorkloadTransformer) handleUpdate(reqCtx intctrlutil.RequestCt
return nil
}

func (t *componentWorkloadTransformer) handleWorkloadUpdate(reqCtx intctrlutil.RequestCtx, dag *graph.DAG,
cluster *appsv1alpha1.Cluster, synthesizeComp *component.SynthesizedComponent, obj, its *workloads.InstanceSet) error {
cwo, err := newComponentWorkloadOps(reqCtx, t.Client, cluster, synthesizeComp, obj, its, dag)
func (t *componentWorkloadTransformer) handleWorkloadUpdate(reqCtx intctrlutil.RequestCtx, dag *graph.DAG, cluster *appsv1alpha1.Cluster, synthesizeComp *component.SynthesizedComponent, obj, its *workloads.InstanceSet, component *appsv1alpha1.Component) error {
cwo, err := newComponentWorkloadOps(reqCtx, t.Client, cluster, synthesizeComp, obj, its, dag, component)
if err != nil {
return err
}
@@ -412,6 +414,10 @@ func (r *componentWorkloadOps) expandVolume() error {
// horizontalScale handles workload horizontal scale
func (r *componentWorkloadOps) horizontalScale() error {
its := r.runningITS
// handle memberjoin lifecycle action
if err := r.checkAndDoMemberJoin(); err != nil {
return err
}
doScaleOut, doScaleIn := r.horizontalScaling()
if !doScaleOut && !doScaleIn {
if err := r.postScaleIn(); err != nil {
@@ -522,6 +528,9 @@ func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
if *itsObj.Spec.Replicas == 0 {
return nil
}

r.annotateInstanceSetForMemberJoin()

graphCli := model.NewGraphClient(r.cli)
graphCli.Noop(r.dag, r.protoITS)
d, err := newDataClone(r.reqCtx, r.cli, r.cluster, r.synthesizeComp, itsObj, r.protoITS, backupKey)
@@ -568,12 +577,81 @@ func getHealthyLorryClient(pods []*corev1.Pod) (lorry.Client, error) {
return nil, fmt.Errorf("no health lorry client found")
}

func (r *componentWorkloadOps) annotateInstanceSetForMemberJoin() {
if r.synthesizeComp.LifecycleActions.MemberJoin == nil {
return
}

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)

for podName := range r.desiredCompPodNameSet {
if r.runningItsPodNameSet.Has(podName) {
continue
}
if podsToMemberjoin.Has(podName) {
continue
}
podsToMemberjoin.Insert(podName)
}

if podsToMemberjoin.Len() > 0 {
r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] = strings.Join(sets.List(podsToMemberjoin), ",")
}
}

func getPodsToMemberJoinFromAnno(instanceSet *workloads.InstanceSet) sets.Set[string] {
podsToMemberjoin := sets.New[string]()
if instanceSet == nil {
return podsToMemberjoin
}

if instanceSet.Annotations == nil {
return podsToMemberjoin
}

if memberJoinStatus := instanceSet.Annotations[constant.MemberJoinStatusAnnotationKey]; memberJoinStatus != "" {
podsToMemberjoin.Insert(strings.Split(memberJoinStatus, ",")...)
}

return podsToMemberjoin
}
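
A minimal, self-contained sketch of the annotation round-trip these helpers implement, assuming the comma-separated value format used above; the package layout and pod names are illustrative:

package main

import (
	"fmt"
	"strings"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// pods still pending memberJoin are serialized as a sorted, comma-separated annotation value
	pending := sets.New[string]("demo-mysql-2", "demo-mysql-1")
	value := strings.Join(sets.List(pending), ",") // "demo-mysql-1,demo-mysql-2" (sets.List sorts)

	// reading the annotation back tolerates a missing or empty value
	parsed := sets.New[string]()
	if value != "" {
		parsed.Insert(strings.Split(value, ",")...)
	}
	fmt.Println(parsed.Len(), parsed.Has("demo-mysql-2")) // 2 true
}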

func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
labels := constant.GetComponentWellKnownLabels(r.synthesizeComp.ClusterName, r.synthesizeComp.Name)
pods, err := component.ListPodOwnedByComponent(r.reqCtx.Ctx, r.cli, r.synthesizeComp.Namespace, labels, inDataContext4C())
if err != nil {
return err
}

// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
podsToMemberLeave := make([]*corev1.Pod, 0)

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)
for _, pod := range pods {
// if the pod does not exist in the generated pod names, it is a member that needs to leave
if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
continue
}
podsToMemberLeave = append(podsToMemberLeave, pod)
}

var leaveErrors []error
for _, pod := range podsToMemberLeave {
if podsToMemberjoin.Has(pod.Name) {
leaveErrors = append(leaveErrors, fmt.Errorf("pod %s is in memberjoin process", pod.Name))
continue
}
if err := r.leaveMemberForPod(pod, pods); err != nil {
leaveErrors = append(leaveErrors, err)
}
}
if len(leaveErrors) > 0 {
return newRequeueError(time.Second, fmt.Sprintf("%v", leaveErrors))
}
return nil
}
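
For reference, a stripped-down sketch of the scale-in selection above: pods outside the desired set are asked to leave, except those still listed in the memberjoin annotation, which produce a requeue instead. All names are illustrative:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	desired := sets.New[string]("demo-mysql-0", "demo-mysql-1") // pods that should remain
	joining := sets.New[string]("demo-mysql-3")                 // pods still mid-join, per the annotation
	running := []string{"demo-mysql-0", "demo-mysql-1", "demo-mysql-2", "demo-mysql-3"}

	for _, name := range running {
		switch {
		case desired.Has(name):
			// stays a member, nothing to do
		case joining.Has(name):
			fmt.Printf("requeue: pod %s is in memberjoin process\n", name)
		default:
			fmt.Printf("leaveMember(%s)\n", name) // would go through the memberLeave path
		}
	}
}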

func (r *componentWorkloadOps) leaveMemberForPod(pod *corev1.Pod, pods []*corev1.Pod) error {
tryToSwitchover := func(lorryCli lorry.Client, pod *corev1.Pod) error {
if pod == nil || len(pod.Labels) == 0 {
return nil
@@ -613,51 +691,148 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
return err
}

// TODO: Move memberLeave to the ITS controller. Instead of performing a switchover, we can directly scale down the non-leader nodes. This is because the pod ordinal is not guaranteed to be continuous.
podsToMemberLeave := make([]*corev1.Pod, 0)
for _, pod := range pods {
// if the pod does not exist in the generated pod names, it is a member that needs to leave
if _, ok := r.desiredCompPodNameSet[pod.Name]; ok {
continue
// try the pod to leave first
lorryCli, err := lorry.NewClient(*pod)
if err != nil {
// try another pod
lorryCli, err = getHealthyLorryClient(pods)
if err != nil {
return err
}
podsToMemberLeave = append(podsToMemberLeave, pod)
}
for _, pod := range podsToMemberLeave {
// try the pod to leave first
lorryCli, err1 := lorry.NewClient(*pod)
if err1 != nil {
// try another pod
lorryCli, err1 = getHealthyLorryClient(pods)
if err1 != nil {
if err == nil {
err = err1
}
continue
}

if intctrlutil.IsNil(lorryCli) {
// no lorry in the pod
return nil
}

// switchover if the leaving pod is leader
if switchoverErr := tryToSwitchover(lorryCli, pod); switchoverErr != nil {
return switchoverErr
}

if err = lorryCli.LeaveMember(r.reqCtx.Ctx, pod.Name); err != nil {
if err != lorry.NotImplemented {
return err
}
}
return nil
}

func (r *componentWorkloadOps) checkAndDoMemberJoin() error {
// just wait for memberjoin anno to be updated
if r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] != "" {
return nil
}

podsToMemberjoin := getPodsToMemberJoinFromAnno(r.runningITS)
if len(podsToMemberjoin) == 0 {
return nil
}

if r.synthesizeComp.LifecycleActions == nil || r.synthesizeComp.LifecycleActions.MemberJoin == nil {
podsToMemberjoin.Clear()
}
err := r.doMemberJoin(podsToMemberjoin)
if err != nil {
return err
}

if podsToMemberjoin.Len() == 0 {
// Anno will be merged later, so it should be deleted from both protoITS and runningITS
delete(r.protoITS.Annotations, constant.MemberJoinStatusAnnotationKey)
delete(r.runningITS.Annotations, constant.MemberJoinStatusAnnotationKey)
} else {
r.protoITS.Annotations[constant.MemberJoinStatusAnnotationKey] = strings.Join(sets.List(podsToMemberjoin), ",")
}
return nil
}
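
A simplified model of the clean-up above, assuming the later merge keeps annotation keys present on either object, which is why the key must be dropped from both protoITS and runningITS once the set is empty:

package main

import "fmt"

func main() {
	const key = "workloads.kubeblocks.io/memberjoin-in-processing"
	running := map[string]string{key: "demo-mysql-2"}
	proto := map[string]string{} // clearing only the proto copy is not enough

	merged := map[string]string{}
	for k, v := range running {
		merged[k] = v
	}
	for k, v := range proto {
		merged[k] = v
	}
	fmt.Println(merged[key]) // the stale value survives the merge

	delete(running, key) // removing it from both sides makes the deletion stick
	delete(proto, key)
}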

func (r *componentWorkloadOps) precondition(name string, action *appsv1alpha1.Action) error {
if action == nil || action.PreCondition == nil {
return nil
}

switch *action.PreCondition {
case appsv1alpha1.ImmediatelyPreConditionType:
return nil
case appsv1alpha1.ComponentReadyPreConditionType:
if r.component == nil || r.component.Status.Phase != appsv1alpha1.RunningClusterCompPhase {
return fmt.Errorf("component is nil when checking RuntimeReady preCondition in %s action", name)
}
case appsv1alpha1.RuntimeReadyPreConditionType:
if r.runningITS == nil || !instanceset.IsInstancesReady(r.runningITS) {
return fmt.Errorf("runtime is nil when checking RuntimeReady preCondition in %s action", name)
}
case appsv1alpha1.ClusterReadyPreConditionType:
if r.cluster == nil || r.cluster.Status.Phase != appsv1alpha1.RunningClusterPhase {
return fmt.Errorf("cluster is nil when checking RuntimeReady preCondition in %s action", name)
}
default:
return fmt.Errorf("unknown precondition type %s", *action.PreCondition)
}

return nil
}
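
A minimal sketch of how an action opts into one of these gates through its PreCondition field; only the field and constants handled above are used, the import path follows the repository layout, and the handler body is elided:

package main

import (
	appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
)

func main() {
	// other handled values: ImmediatelyPreConditionType, RuntimeReadyPreConditionType, ClusterReadyPreConditionType
	pre := appsv1alpha1.ComponentReadyPreConditionType
	joinAction := &appsv1alpha1.Action{
		PreCondition: &pre, // checked by precondition() before the action is executed
		// exec/handler details omitted
	}
	_ = joinAction // wired in as synthesizeComp.LifecycleActions.MemberJoin.CustomHandler
}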

if intctrlutil.IsNil(lorryCli) {
// no lorry in the pod
func (r *componentWorkloadOps) doMemberJoin(podSet sets.Set[string]) error {
if len(podSet) == 0 {
return nil
}

if r.synthesizeComp.LifecycleActions == nil || r.synthesizeComp.LifecycleActions.MemberJoin == nil {
return nil
}

if err := r.precondition(constant.MemberJoinAction, r.synthesizeComp.LifecycleActions.MemberJoin.CustomHandler); err != nil {
return err
}

labels := constant.GetComponentWellKnownLabels(r.synthesizeComp.ClusterName, r.synthesizeComp.Name)
runningPods, err := component.ListPodOwnedByComponent(r.reqCtx.Ctx, r.cli, r.synthesizeComp.Namespace, labels, inDataContext4C())
if err != nil {
return fmt.Errorf("failed to list pods: %w", err)
}

var joinErrors []error
for _, pod := range runningPods {
if !podSet.Has(pod.Name) {
continue
}

// switchover if the leaving pod is leader
if switchoverErr := tryToSwitchover(lorryCli, pod); switchoverErr != nil {
return switchoverErr
if err := r.joinMemberForPod(pod, podSet); err != nil {
joinErrors = append(joinErrors, fmt.Errorf("pod %s: %w", pod.Name, err))
} else {
podSet.Delete(pod.Name)
}
}

if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, pod.Name); err2 != nil {
// For the purpose of upgrade compatibility, if the version of Lorry is 0.7 and
// the version of KB is upgraded to 0.8 or newer, lorry client will return an NotImplemented error,
// in this case, here just ignore it.
if err2 == lorry.NotImplemented {
r.reqCtx.Log.Info("lorry leave member api is not implemented")
} else if err == nil {
err = err2
}
if len(joinErrors) > 0 {
return newRequeueError(time.Second, fmt.Sprintf("%v", joinErrors))
}
return nil
}
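
A stripped-down sketch of the aggregate-then-requeue pattern used above: every pending pod is attempted, successes are removed from the tracked set, and failures are reported together so the remainder is retried on the next reconcile. The join function here is a stand-in for joinMemberForPod:

package main

import (
	"errors"
	"fmt"
	"time"
)

func main() {
	pending := map[string]bool{"demo-mysql-1": true, "demo-mysql-2": true}
	join := func(name string) error {
		if name == "demo-mysql-2" {
			return errors.New("lorry not ready")
		}
		return nil
	}

	var joinErrors []error
	for name := range pending {
		if err := join(name); err != nil {
			joinErrors = append(joinErrors, fmt.Errorf("pod %s: %w", name, err))
			continue
		}
		delete(pending, name) // joined successfully: drop it from the tracked set
	}
	if len(joinErrors) > 0 {
		fmt.Printf("requeue after %s: %v\n", time.Second, joinErrors)
	}
}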

func (r *componentWorkloadOps) joinMemberForPod(pod *corev1.Pod, podSet sets.Set[string]) error {
lorryCli, err := lorry.NewClient(*pod)
if err != nil {
return fmt.Errorf("failed to create lorry client: %w", err)
}

if intctrlutil.IsNil(lorryCli) {
r.reqCtx.Log.Info("skipping pod with nil lorry client", "pod", pod.Name)
return nil
}

if err = lorryCli.JoinMember(r.reqCtx.Ctx); err != nil {
if err == lorry.NotImplemented {
r.reqCtx.Log.Info("lorry join member API not implemented", "pod", pod.Name)
return nil
}
return fmt.Errorf("join member failed: %w", err)
}
return err // TODO: use requeue-after

return nil
}

func (r *componentWorkloadOps) deletePVCs4ScaleIn(itsObj *workloads.InstanceSet) error {
@@ -943,13 +1118,7 @@ func buildInstanceSetPlacementAnnotation(comp *appsv1alpha1.Component, its *work
}
}

func newComponentWorkloadOps(reqCtx intctrlutil.RequestCtx,
cli client.Client,
cluster *appsv1alpha1.Cluster,
synthesizeComp *component.SynthesizedComponent,
runningITS *workloads.InstanceSet,
protoITS *workloads.InstanceSet,
dag *graph.DAG) (*componentWorkloadOps, error) {
func newComponentWorkloadOps(reqCtx intctrlutil.RequestCtx, cli client.Client, cluster *appsv1alpha1.Cluster, synthesizeComp *component.SynthesizedComponent, runningITS *workloads.InstanceSet, protoITS *workloads.InstanceSet, dag *graph.DAG, component *appsv1alpha1.Component) (*componentWorkloadOps, error) {
compPodNames, err := generatePodNames(synthesizeComp)
if err != nil {
return nil, err
Expand All @@ -970,5 +1139,6 @@ func newComponentWorkloadOps(reqCtx intctrlutil.RequestCtx,
runningItsPodNames: itsPodNames,
desiredCompPodNameSet: sets.New(compPodNames...),
runningItsPodNameSet: sets.New(itsPodNames...),
component: component,
}, nil
}
1 change: 1 addition & 0 deletions pkg/constant/annotations.go
@@ -30,6 +30,7 @@ const (

// NodeSelectorOnceAnnotationKey adds nodeSelector in podSpec for one pod exactly once
NodeSelectorOnceAnnotationKey = "workloads.kubeblocks.io/node-selector-once"
MemberJoinStatusAnnotationKey = "workloads.kubeblocks.io/memberjoin-in-processing"
)

// GetKBGenerationAnnotation returns the annotation for kubeblocks generation.