Disable http client connection reuse to prevent memory leak (#842)
The operator pod is suffering from memory leaks. After some analysis I think I have
narrowed it down to HTTP client connections being kept open for reuse but never
actually reused, because a new client is created in every reconcile run.
This PR disables connection keep-alive/reuse and (at least in my experiments)
prevents the memory leak.
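
For illustration, a minimal sketch of the transport settings this change relies on (shown standalone here, not the operator code verbatim; the function name is made up):

```go
// Minimal sketch: an HTTP client whose transport does not park connections for
// reuse. With the default transport, a client created fresh in every reconcile
// run leaves its idle connections (and their buffers) behind, which is what
// appears to leak here.
package main

import (
	"crypto/tls"
	"net/http"
)

func newNonPoolingClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
			// Close each connection after its request instead of keeping it alive.
			DisableKeepAlives: true,
			// Keep the idle pool effectively empty as an extra safeguard.
			MaxIdleConns: 1,
		},
	}
}

func main() {
	_ = newNonPoolingClient() // construct only; real code would issue requests with it
}
```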

Fixes #700

- [x] Commits are signed per the DCO using --signoff
- [-] Unit tests added for the new/changed functionality and all unit tests are successful
- [-] Customer-visible features documented
- [x] No linter warnings (`make lint`)

If CRDs are changed:
- [-] CRD YAMLs updated (`make manifests`) and also copied into the helm chart
- [-] Changes to CRDs documented

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license.
For more information on the Developer Certificate of Origin and on signing off your commits, please check
[here](https://github.com/opensearch-project/OpenSearch/blob/main/CONTRIBUTING.md#developer-certificate-of-origin).

Signed-off-by: Sebastian Woehrl <sebastian.woehrl@maibornwolff.de>
(cherry picked from commit 56c9c8f)
swoehrl-mw committed Jun 18, 2024
1 parent 9a17148 commit d58e8eb
Showing 4 changed files with 37 additions and 41 deletions.
1 change: 0 additions & 1 deletion opensearch-operator/api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

@@ -4766,21 +4766,25 @@ spec:
resource requirements.
properties:
claims:
description: "Claims lists the names of resources,
defined in spec.resourceClaims, that are used by
this container. \n This is an alpha field and requires
enabling the DynamicResourceAllocation feature gate.
\n This field is immutable. It can only be set for
containers."
description: |-
Claims lists the names of resources, defined in spec.resourceClaims,
that are used by this container.
This is an alpha field and requires enabling the
DynamicResourceAllocation feature gate.
This field is immutable. It can only be set for containers.
items:
description: ResourceClaim references one entry
in PodSpec.ResourceClaims.
properties:
name:
- description: Name must match the name of one
- entry in pod.spec.resourceClaims of the Pod
- where this field is used. It makes that resource
- available inside a container.
+ description: |-
+ Name must match the name of one entry in pod.spec.resourceClaims of
+ the Pod where this field is used. It makes that resource available
+ inside a container.
type: string
required:
- name
@@ -4796,8 +4800,9 @@
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
- description: 'Limits describes the maximum amount
- of compute resources allowed. More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
+ description: |-
+ Limits describes the maximum amount of compute resources allowed.
+ More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
requests:
additionalProperties:
@@ -4806,12 +4811,11 @@
- type: string
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
- description: 'Requests describes the minimum amount
- of compute resources required. If Requests is omitted
- for a container, it defaults to Limits if that is
- explicitly specified, otherwise to an implementation-defined
- value. Requests cannot exceed Limits. More info:
- https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/'
+ description: |-
+ Requests describes the minimum amount of compute resources required.
+ If Requests is omitted for a container, it defaults to Limits if that is explicitly specified,
+ otherwise to an implementation-defined value. Requests cannot exceed Limits.
+ More info: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
type: object
type: object
type: object
4 changes: 4 additions & 0 deletions opensearch-operator/opensearch-gateway/services/os_client.go
@@ -73,6 +73,10 @@ func NewOsClusterClient(clusterUrl string, username string, password string, opt
}
return &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
+ // These options are needed as otherwise connections would be kept and leak memory
+ // Connection reuse is not really possible due to each reconcile run being independent
+ DisableKeepAlives: true,
+ MaxIdleConns: 1,
}
}(),
Addresses: []string{clusterUrl},
33 changes: 11 additions & 22 deletions opensearch-operator/pkg/reconcilers/scaler.go
@@ -10,6 +10,7 @@ import (
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/builders"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/helpers"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/k8s"
"github.com/Opster/opensearch-k8s-operator/opensearch-operator/pkg/reconcilers/util"
"github.com/cisco-open/operator-tools/pkg/reconciler"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/record"
@@ -25,6 +26,7 @@ type ScalerReconciler struct {
recorder record.EventRecorder
reconcilerContext *ReconcilerContext
instance *opsterv1.OpenSearchCluster
+ ReconcilerOptions
}

func NewScalerReconciler(
@@ -33,14 +35,17 @@ func NewScalerReconciler(
recorder record.EventRecorder,
reconcilerContext *ReconcilerContext,
instance *opsterv1.OpenSearchCluster,
- opts ...reconciler.ResourceReconcilerOption,
+ opts ...ReconcilerOption,
) *ScalerReconciler {
+ options := ReconcilerOptions{}
+ options.apply(opts...)
return &ScalerReconciler{
- client: k8s.NewK8sClient(client, ctx, append(opts, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler")))...),
+ client: k8s.NewK8sClient(client, ctx, reconciler.WithLog(log.FromContext(ctx).WithValues("reconciler", "scaler"))),
ctx: ctx,
recorder: recorder,
reconcilerContext: reconcilerContext,
instance: instance,
+ ReconcilerOptions: options,
}
}

@@ -187,11 +192,7 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu
if !smartDecrease {
return false, err
}
- username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
- if err != nil {
- return true, err
- }
- clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+ clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "WARN", "failed to remove node exclude", "Group-%s . failed to remove node exclude %s", nodePoolGroupName, lastReplicaNodeName)
@@ -209,13 +210,9 @@ func (r *ScalerReconciler) decreaseOneNode(currentStatus opsterv1.ComponentStatu

func (r *ScalerReconciler) excludeNode(currentStatus opsterv1.ComponentStatus, currentSts appsv1.StatefulSet, nodePoolGroupName string) error {
lg := log.FromContext(r.ctx)
- username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
annotations := map[string]string{"cluster-name": r.instance.GetName()}
- if err != nil {
- return err
- }

- clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+ clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client for scaling")
@@ -272,12 +269,8 @@ func (r *ScalerReconciler) drainNode(currentStatus opsterv1.ComponentStatus, cur
lg := log.FromContext(r.ctx)
annotations := map[string]string{"cluster-name": r.instance.GetName()}
lastReplicaNodeName := helpers.ReplicaHostName(currentSts, *currentSts.Spec.Replicas-1)
- username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
- if err != nil {
- return err
- }

- clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+ clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
return err
}
@@ -328,12 +321,8 @@ func (r *ScalerReconciler) removeStatefulSet(sts appsv1.StatefulSet) (*ctrl.Resu
}

// Gracefully remove nodes
- username, password, err := helpers.UsernameAndPassword(r.client, r.instance)
- if err != nil {
- return nil, err
- }
annotations := map[string]string{"cluster-name": r.instance.GetName()}
- clusterClient, err := services.NewOsClusterClient(builders.URLForCluster(r.instance), username, password)
+ clusterClient, err := util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)
if err != nil {
lg.Error(err, "failed to create os client")
r.recorder.AnnotatedEventf(r.instance, annotations, "Warning", "Scaler", "Failed to create os client")
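
The new `ReconcilerOption`/`ReconcilerOptions` plumbing above, together with the repeated `util.CreateClientForCluster(r.client, r.ctx, r.instance, r.osClientTransport)` calls, suggests the custom transport is threaded into each reconciler via a functional-options pattern. A rough sketch of that pattern is below; only `ReconcilerOption`, `ReconcilerOptions`, `apply`, and the `osClientTransport` field are visible in the diff itself, so the option constructor name and the field type are assumptions.

```go
// Rough sketch of a functional-options pattern matching the names visible in this
// diff (ReconcilerOptions, apply, osClientTransport). WithOSClientTransport and the
// http.RoundTripper field type are assumptions made for illustration.
package reconcilers

import "net/http"

// ReconcilerOptions holds optional knobs shared by the reconcilers.
type ReconcilerOptions struct {
	osClientTransport http.RoundTripper // custom transport for the OpenSearch HTTP client
}

// ReconcilerOption mutates a ReconcilerOptions value.
type ReconcilerOption func(*ReconcilerOptions)

// WithOSClientTransport injects a custom transport, e.g. to disable connection
// reuse as this change does, or to stub HTTP calls in tests.
func WithOSClientTransport(t http.RoundTripper) ReconcilerOption {
	return func(o *ReconcilerOptions) { o.osClientTransport = t }
}

// apply folds a list of options into the receiver.
func (o *ReconcilerOptions) apply(opts ...ReconcilerOption) {
	for _, opt := range opts {
		opt(o)
	}
}
```

Presumably `util.CreateClientForCluster` wraps the `helpers.UsernameAndPassword` lookup and `services.NewOsClusterClient` call that each method previously did inline, passing the transport through; its implementation is not part of this excerpt.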
