Skip to content

Commit

Permalink
chore: support to configurate the qps and burst for client config
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyelei committed Mar 13, 2024
1 parent 1d4590d commit fb26daa
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 16 deletions.
8 changes: 5 additions & 3 deletions cmd/dataprotection/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"flag"
"fmt"
"os"
"runtime"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -33,7 +34,7 @@ import (
snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
discoverycli "k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
Expand Down Expand Up @@ -71,7 +72,7 @@ const (
)

var (
scheme = runtime.NewScheme()
scheme = k8sruntime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)

Expand Down Expand Up @@ -99,6 +100,7 @@ func init() {
viper.SetDefault(constant.CfgKeyCtrlrMgrNS, "default")
viper.SetDefault(constant.KubernetesClusterDomainEnv, constant.DefaultDNSDomain)
viper.SetDefault(dptypes.CfgKeyGCFrequencySeconds, dptypes.DefaultGCFrequencySeconds)
viper.SetDefault(dptypes.CfgDataProtectionReconcileWorkers, runtime.NumCPU())
}

func main() {
Expand Down Expand Up @@ -165,7 +167,7 @@ func main() {
os.Exit(1)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand Down
2 changes: 1 addition & 1 deletion cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func main() {
os.Exit(1)
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
mgr, err := ctrl.NewManager(intctrlutil.GeKubeRestConfig(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
Expand Down
2 changes: 1 addition & 1 deletion controllers/dataprotection/backup_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (r *BackupReconciler) SetupWithManager(mgr ctrl.Manager) error {
b := ctrl.NewControllerManagedBy(mgr).
For(&dpv1alpha1.Backup{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(maxConcurDataProtectionReconKey),
MaxConcurrentReconciles: viper.GetInt(dptypes.CfgDataProtectionReconcileWorkers),
}).
Owns(&batchv1.Job{}).
Watches(&batchv1.Job{}, handler.EnqueueRequestsFromMapFunc(r.parseBackupJob))
Expand Down
10 changes: 0 additions & 10 deletions controllers/dataprotection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
package dataprotection

import (
"runtime"
"time"

corev1 "k8s.io/api/core/v1"

viper "github.com/apecloud/kubeblocks/pkg/viperx"
)

const (
trueVal = "true"
)

const (
// settings keys
maxConcurDataProtectionReconKey = "MAXCONCURRENTRECONCILES_DATAPROTECTION"

// label keys
dataProtectionBackupRepoKey = "dataprotection.kubeblocks.io/backup-repo-name"
dataProtectionWaitRepoPreparationKey = "dataprotection.kubeblocks.io/wait-repo-preparation"
Expand Down Expand Up @@ -100,7 +94,3 @@ const (
)

var reconcileInterval = time.Second

func init() {
viper.SetDefault(maxConcurDataProtectionReconKey, runtime.NumCPU()*2)
}
7 changes: 7 additions & 0 deletions controllers/workloads/replicatedstatemachine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"

workloads "github.com/apecloud/kubeblocks/apis/workloads/v1alpha1"
Expand Down Expand Up @@ -154,6 +155,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er

return ctrl.NewControllerManagedBy(mgr).
For(&workloads.ReplicatedStateMachine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Watches(&appsv1.StatefulSet{}, stsHandler).
Watches(&batchv1.Job{}, jobHandler).
Watches(&corev1.Pod{}, podHandler).
Expand All @@ -165,6 +169,9 @@ func (r *ReplicatedStateMachineReconciler) SetupWithManager(mgr ctrl.Manager) er
podHandler := handler.NewBuilder(ctx).AddFinder(stsOwnerFinder).AddFinder(rsmOwnerFinder).Build()
return ctrl.NewControllerManagedBy(mgr).
For(&workloads.ReplicatedStateMachine{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: viper.GetInt(constant.CfgKBReconcileWorkers),
}).
Owns(&appsv1.StatefulSet{}).
Owns(&batchv1.Job{}).
Watches(&corev1.Pod{}, podHandler).
Expand Down
12 changes: 12 additions & 0 deletions deploy/helm/templates/dataprotection.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ spec:
- name: ENABLE_WEBHOOKS
value: "true"
{{- end }}
{{- if .Values.dataProtection.reconcileWorkers }}
- name: DATAPROTECTION_RECONCILE_WORKERS
value: {{ .Values.dataProtection.reconcileWorkers | quote }}
{{- end }}
{{- if .Values.client.qps }}
- name: CLIENT_QPS
value: {{ .Values.client.qps | quote }}
{{- end }}
{{- if .Values.client.burst }}
- name: CLIENT_BURST
value: {{ .Values.client.burst | quote }}
{{- end }}
- name: DP_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
Expand Down
8 changes: 8 additions & 0 deletions deploy/helm/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ spec:
- name: KUBEBLOCKS_RECONCILE_WORKERS
value: {{ .Values.reconcileWorkers | quote }}
{{- end }}
{{- if .Values.client.qps }}
- name: CLIENT_QPS
value: {{ .Values.client.qps | quote }}
{{- end }}
{{- if .Values.client.burst }}
- name: CLIENT_BURST
value: {{ .Values.client.burst | quote }}
{{- end }}
{{- with .Values.nodeSelector }}
- name: CM_NODE_SELECTOR
value: {{ toJson . | quote }}
Expand Down
10 changes: 9 additions & 1 deletion deploy/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ replicaCount: 1
##
reconcileWorkers: ""

## k8s client configuration.
client:
# default is 20
qps: ""
# default is 30
burst: ""

## @param nameOverride
##
nameOverride: ""
Expand Down Expand Up @@ -311,7 +318,8 @@ dataProtection:
# if 'get/list' role of the backup CR are compromised.
encryptionKey: ""
gcFrequencySeconds: 3600

## MaxConcurrentReconciles for backup controller.
reconcileWorkers: ""
image:
# if the value of dataProtection.image.registry is not specified using `--set`, it will be set to the value of 'image.registry' by default
registry: ""
Expand Down
2 changes: 2 additions & 0 deletions pkg/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
CfgKeyDPEncryptionKey = "DP_ENCRYPTION_KEY"

CfgKBReconcileWorkers = "KUBEBLOCKS_RECONCILE_WORKERS"
CfgClientQPS = "CLIENT_QPS"
CfgClientBurst = "CLIENT_BURST"
)

const (
Expand Down
15 changes: 15 additions & 0 deletions pkg/controllerutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

Expand Down Expand Up @@ -119,3 +121,16 @@ func SetOwnerReference(owner, object metav1.Object) error {
func SetControllerReference(owner, object metav1.Object) error {
return controllerutil.SetControllerReference(owner, object, innerScheme)
}

func GeKubeRestConfig() *rest.Config {
cfg := ctrl.GetConfigOrDie()
clientQPS := viper.GetInt(constant.CfgClientQPS)
if clientQPS != 0 {
cfg.QPS = float32(clientQPS)
}
clientBurst := viper.GetInt(constant.CfgClientBurst)
if clientBurst != 0 {
cfg.Burst = clientBurst
}
return cfg
}
5 changes: 5 additions & 0 deletions pkg/dataprotection/types/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ const (
ReconfigureRefAnnotationKey = "dataprotection.kubeblocks.io/reconfigure-ref"
)

const (
// CfgDataProtectionReconcileWorkers the max reconcile workers for MaxConcurrentReconciles
CfgDataProtectionReconcileWorkers = "DATAPROTECTION_RECONCILE_WORKERS"
)

// label keys
const (
// ClusterUIDLabelKey specifies the cluster UID label key.
Expand Down
4 changes: 4 additions & 0 deletions pkg/viperx/viperx.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func GetInt32(key string) int32 {
return rCall(key, viper.GetInt32)
}

func GetFloat64(key string) float64 {
return rCall(key, viper.GetFloat64)
}

func GetString(key string) string {
return rCall(key, viper.GetString)
}
Expand Down

0 comments on commit fb26daa

Please sign in to comment.