diff --git a/.buildkite/bk.integration.pipeline.yml b/.buildkite/bk.integration.pipeline.yml index 189c7af7c0b..fa293b7c4ca 100644 --- a/.buildkite/bk.integration.pipeline.yml +++ b/.buildkite/bk.integration.pipeline.yml @@ -28,7 +28,7 @@ steps: steps: - label: "Win2022:sudo:{{matrix}}" command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' .buildkite/scripts/integration-tests.ps1 {{matrix}} true artifact_paths: - build/** @@ -37,6 +37,9 @@ steps: provider: "gcp" machineType: "n1-standard-8" image: "family/platform-ingest-elastic-agent-windows-2022" + retry: + automatic: + limit: 1 matrix: - default - fleet @@ -49,7 +52,7 @@ steps: - label: "Win2022:non-sudo:{{matrix}}" command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' .buildkite/scripts/integration-tests.ps1 {{matrix}} false artifact_paths: - build/** @@ -58,12 +61,15 @@ steps: provider: "gcp" machineType: "n1-standard-8" image: "family/platform-ingest-elastic-agent-windows-2022" + retry: + automatic: + limit: 1 matrix: - default - label: "Win2025:sudo:{{matrix}}" command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' .buildkite/scripts/integration-tests.ps1 {{matrix}} true artifact_paths: - build/** @@ -84,7 +90,7 @@ steps: - label: "Win2025:non-sudo:{{matrix}}" command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-windows' .buildkite/scripts/integration-tests.ps1 {{matrix}} false artifact_paths: - build/** @@ -104,11 +110,14 @@ steps: - label: "x86_64:non-sudo: {{matrix}}" # only packaging-ubuntu-x86-64 artifact dependency is required command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-ubuntu-x86-64' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-ubuntu-x86-64' .buildkite/scripts/steps/integration_tests_tf.sh {{matrix}} false artifact_paths: - build/** - build/diagnostics/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n1-standard-8" @@ -119,11 +128,14 @@ steps: - label: "x86_64:sudo: {{matrix}}" # due to deb group present in matrix tar.gz and deb packages artifacts are required command: | - buildkite-agent artifact download build/distributions/** . --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . .buildkite/scripts/steps/integration_tests_tf.sh {{matrix}} true artifact_paths: - build/** - build/diagnostics/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n1-standard-8" @@ -145,9 +157,8 @@ steps: - container - label: "arm:sudo: {{matrix}}" - skip: true command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-ubuntu-arm64' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . 
--step 'packaging-ubuntu-arm64' .buildkite/scripts/steps/integration_tests_tf.sh {{matrix}} true artifact_paths: - build/** @@ -156,30 +167,37 @@ steps: provider: "aws" imagePrefix: "platform-ingest-beats-ubuntu-2404-aarch64" instanceType: "m6g.2xlarge" + retry: + automatic: + limit: 1 matrix: - default - upgrade - upgrade-flavor - standalone-upgrade - fleet - - fleet-endpoint-security - - fleet-airgapped - - fleet-airgapped-privileged - - fleet-privileged - - fleet-upgrade-to-pr-build - - install-uninstall - - fqdn - - deb - - container + # ARM tests are enabled for only selected groups in order to save resources + # - fleet-endpoint-security + # - fleet-airgapped + # - fleet-airgapped-privileged + # - fleet-privileged + # - fleet-upgrade-to-pr-build + # - install-uninstall + # - fqdn + # - deb + # - container - label: "arm:non-sudo: {{matrix}}" skip: true command: | - buildkite-agent artifact download build/distributions/** . --step 'packaging-ubuntu-arm64' --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . --step 'packaging-ubuntu-arm64' .buildkite/scripts/steps/integration_tests_tf.sh {{matrix}} false artifact_paths: - build/** - build/diagnostics/** + retry: + automatic: + limit: 1 agents: provider: "aws" imagePrefix: "platform-ingest-beats-ubuntu-2404-aarch64" @@ -194,11 +212,14 @@ steps: steps: - label: "x86_64:sudo:rpm" command: | - buildkite-agent artifact download build/distributions/** . --build ${BUILDKITE_TRIGGERED_FROM_BUILD_ID} + buildkite-agent artifact download build/distributions/** . .buildkite/scripts/steps/integration_tests_tf.sh rpm true artifact_paths: - build/** - build/diagnostics/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n1-standard-8" diff --git a/.buildkite/integration.pipeline.yml b/.buildkite/integration.pipeline.yml index 45434e62fc8..3a71673bfc3 100644 --- a/.buildkite/integration.pipeline.yml +++ b/.buildkite/integration.pipeline.yml @@ -16,10 +16,26 @@ steps: command: ".buildkite/scripts/steps/integration-package.sh" artifact_paths: - build/distributions/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n2-standard-8" + - label: "Packaging: Ubuntu x86_64 FIPS" + key: "packaging-ubuntu-x86-64-fips" + env: + PACKAGES: "tar.gz" + PLATFORMS: "linux/amd64" + FIPS: "true" + command: ".buildkite/scripts/steps/integration-package.sh" + artifact_paths: + - build/distributions/** + agents: + provider: "gcp" + machineType: "n2-standard-4" + - label: "Packaging: Ubuntu arm64" key: "packaging-ubuntu-arm64" env: @@ -28,10 +44,26 @@ steps: command: ".buildkite/scripts/steps/integration-package.sh" artifact_paths: - build/distributions/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n2-standard-8" + - label: "Packaging: Ubuntu arm64 FIPS" + key: "packaging-ubuntu-arm64-fips" + env: + PACKAGES: "tar.gz" + PLATFORMS: "linux/arm64" + FIPS: "true" + command: ".buildkite/scripts/steps/integration-package.sh" + artifact_paths: + - build/distributions/** + agents: + provider: "gcp" + machineType: "n2-standard-4" + - label: "Packaging: Windows" key: "packaging-windows" env: @@ -40,6 +72,9 @@ steps: command: ".buildkite/scripts/steps/integration-package.sh" artifact_paths: - build/distributions/** + retry: + automatic: + limit: 1 agents: provider: "gcp" machineType: "n2-standard-8" @@ -52,6 +87,9 @@ steps: command: ".buildkite/scripts/steps/integration-package.sh" artifact_paths: - build/distributions/** + retry: + automatic: + limit: 1 agents: 
provider: "gcp" machineType: "n2-standard-8" @@ -82,6 +120,9 @@ steps: agents: provider: "gcp" machineType: "n2-standard-8" + retry: + automatic: + limit: 1 notify: - github_commit_status: context: "buildkite/elastic-agent-extended-testing - Serverless integration test" @@ -103,6 +144,9 @@ steps: agents: provider: "gcp" machineType: "n2-standard-8" + retry: + automatic: + limit: 1 notify: - github_commit_status: context: "buildkite/elastic-agent-extended-testing - Extended runtime leak tests" @@ -110,10 +154,7 @@ steps: - label: "Triggering Integration tests" depends_on: - int-packaging - trigger: "elastic-agent-extended-testing-bk" - build: - commit: "${BUILDKITE_COMMIT}" - branch: "${BUILDKITE_BRANCH}" + command: "buildkite-agent pipeline upload .buildkite/bk.integration.pipeline.yml" - label: "Serverless Beats Tests" # To speedup the build process only packaging-ubuntu-x86-64 artifact dependency is required @@ -122,6 +163,9 @@ steps: key: "serverless-beats-integration-tests" concurrency_group: elastic-agent-extended-testing/beats-integration concurrency: 8 + retry: + automatic: + limit: 1 env: TEST_INTEG_AUTH_GCP_DATACENTER: "us-central1-a" command: | @@ -140,6 +184,9 @@ steps: K8S_VERSION: "v1.31.0" KIND_VERSION: "v0.24.0" command: ".buildkite/scripts/steps/k8s-extended-tests.sh" + retry: + automatic: + limit: 1 artifact_paths: - "build/k8s-logs*/*" - "build/k8s-logs*/**/*" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 443175d17e1..69db79fc954 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -29,6 +29,8 @@ steps: provider: "gcp" image: "family/core-ubuntu-2204" retry: + automatic: + limit: 1 manual: allowed: true @@ -46,6 +48,8 @@ steps: diskSizeGb: 200 instanceType: "m6g.xlarge" retry: + automatic: + limit: 1 manual: allowed: true @@ -66,6 +70,8 @@ steps: disk_size: 200 disk_type: "pd-ssd" retry: + automatic: + limit: 1 manual: allowed: true @@ -86,6 +92,8 @@ steps: disk_size: 200 disk_type: "pd-ssd" retry: + automatic: + limit: 1 manual: allowed: true @@ -103,6 +111,8 @@ steps: provider: orka imagePrefix: generic-base-15-arm-002 retry: + automatic: + limit: 1 manual: allowed: true @@ -119,6 +129,8 @@ steps: provider: orka imagePrefix: generic-13-ventura-x64 retry: + automatic: + limit: 1 manual: allowed: true @@ -142,6 +154,8 @@ steps: machine_type: "n2-standard-8" disk_type: "pd-ssd" retry: + automatic: + limit: 1 manual: allowed: true - label: "Unit tests - Windows 11" @@ -161,6 +175,8 @@ steps: machine_type: "n2-standard-8" disk_type: "pd-ssd" retry: + automatic: + limit: 1 manual: allowed: true @@ -245,8 +261,9 @@ steps: - label: "Trigger k8s sync" branches: main plugins: - - monebag/monorepo-diff#v2.5.9: + - monorepo-diff#v1.2.0: diff: "git diff --name-only HEAD~1" + interpolation: false watch: - path: - deploy/kubernetes/* @@ -267,8 +284,9 @@ steps: build.env("GITHUB_PR_TRIGGER_COMMENT") =~ /.*extended.*/ plugins: - - monorepo-diff#v1.0.1: + - monorepo-diff#v1.2.0: diff: "git diff --name-only origin/${GITHUB_PR_TARGET_BRANCH}...HEAD" + interpolation: false watch: - path: - internal/ @@ -294,14 +312,12 @@ steps: - .buildkite/hooks/ config: - trigger: "elastic-agent-extended-testing" - build: - commit: "${BUILDKITE_COMMIT}" - branch: "${BUILDKITE_BRANCH}" - env: - - BUILDKITE_PULL_REQUEST=${BUILDKITE_PULL_REQUEST} - - BUILDKITE_PULL_REQUEST_BASE_BRANCH=${BUILDKITE_PULL_REQUEST_BASE_BRANCH} - - GITHUB_PR_LABELS=${GITHUB_PR_LABELS} + label: ":pipeline: Upload extended testing Pipeline" + command: "buildkite-agent pipeline upload 
.buildkite/integration.pipeline.yml" + env: + - BUILDKITE_PULL_REQUEST=${BUILDKITE_PULL_REQUEST} + - BUILDKITE_PULL_REQUEST_BASE_BRANCH=${BUILDKITE_PULL_REQUEST_BASE_BRANCH} + - GITHUB_PR_LABELS=${GITHUB_PR_LABELS} # Trigger for branches - label: "Triggering Extended tests for branches" diff --git a/.golangci.yml b/.golangci.yml index f76beb64fcc..50a2b2b7f88 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -46,7 +46,7 @@ linters: - asciicheck # simple linter to check that your code does not contain non-ASCII identifiers - bodyclose # checks whether HTTP response body is closed successfully - durationcheck # check for two durations multiplied together - - exportloopref # checks for pointers to enclosing loop variables + - copyloopvar # checks for unnecessary loop variable copies - goimports # Goimports does everything that gofmt does. Additionally it checks unused imports - gosec # inspects source code for security problems - importas # enforces consistent import aliases diff --git a/.package-version b/.package-version index e977f5eae6f..47da986f86f 100644 --- a/.package-version +++ b/.package-version @@ -1 +1 @@ -9.1.0 \ No newline at end of file +9.1.0 diff --git a/Makefile b/Makefile index e45941ae274..b5d3cbba6f2 100644 --- a/Makefile +++ b/Makefile @@ -3,20 +3,13 @@ COVERAGE_DIR=$(BUILD_DIR)/coverage BEATS?=elastic-agent PROJECTS= $(BEATS) PYTHON_ENV?=$(BUILD_DIR)/python-env -MAGE_PRESENT := $(shell mage --version 2> /dev/null | grep $(MAGE_VERSION)) -MAGE_IMPORT_PATH ?= github.com/magefile/mage -export MAGE_IMPORT_PATH ## mage : Sets mage .PHONY: mage mage: -ifndef MAGE_PRESENT - @echo Installing mage. - @go install ${MAGE_IMPORT_PATH} + @echo Installing mage + @go install github.com/magefile/mage @-mage -clean -else - @echo Mage already installed. -endif ## help : Show this help. help: Makefile diff --git a/changelog/fragments/1739437836-Improve-kubernetes_secrets-provider-secret-logging.yaml b/changelog/fragments/1739437836-Improve-kubernetes_secrets-provider-secret-logging.yaml new file mode 100644 index 00000000000..5a364cbcac9 --- /dev/null +++ b/changelog/fragments/1739437836-Improve-kubernetes_secrets-provider-secret-logging.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Improve kubernetes_secrets provider secret logging + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. 
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/6841 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6187 diff --git a/changelog/fragments/1740399399-fleet-backoff.yaml b/changelog/fragments/1740399399-fleet-backoff.yaml new file mode 100644 index 00000000000..ed88d561637 --- /dev/null +++ b/changelog/fragments/1740399399-fleet-backoff.yaml @@ -0,0 +1,34 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Make enroll command backoff more conservative + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +description: | + The plain enroll command now has an initial delay of 5s and a maximum of 10 minutes. It also has a jitter. + Delayed enroll now uses the same backoff behaviour as other requests to Fleet Server. + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6761 diff --git a/changelog/fragments/1740485771-ast-null-checks.yaml b/changelog/fragments/1740485771-ast-null-checks.yaml new file mode 100644 index 00000000000..4c9d752a437 --- /dev/null +++ b/changelog/fragments/1740485771-ast-null-checks.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. 
+# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: Add missing null checks to AST methods + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +#pr: https://github.com/owner/repo/1234 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +issue: https://github.com/elastic/elastic-agent/issues/6999 diff --git a/changelog/fragments/1740492672-set-replicas-for-gateway-collector.yaml b/changelog/fragments/1740492672-set-replicas-for-gateway-collector.yaml new file mode 100644 index 00000000000..b90d805fd4a --- /dev/null +++ b/changelog/fragments/1740492672-set-replicas-for-gateway-collector.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: set replicas for gateway collector + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7011 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1740521494-diagnostics-include-all-metadata-that-is-sent-to-Fleet-by-default.yaml b/changelog/fragments/1740521494-diagnostics-include-all-metadata-that-is-sent-to-Fleet-by-default.yaml new file mode 100644 index 00000000000..f59a3708871 --- /dev/null +++ b/changelog/fragments/1740521494-diagnostics-include-all-metadata-that-is-sent-to-Fleet-by-default.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. +summary: Include all metadata that is sent to Fleet in the agent-info.yaml file in diagnostics by default. + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7029 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1740674973-add-ApiKey-prefix-to-MOTel-host-configurations.yaml b/changelog/fragments/1740674973-add-ApiKey-prefix-to-MOTel-host-configurations.yaml new file mode 100644 index 00000000000..7a8ef82f9af --- /dev/null +++ b/changelog/fragments/1740674973-add-ApiKey-prefix-to-MOTel-host-configurations.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: enhancement + +# Change summary; a 80ish characters long description of the change. 
+summary: add ApiKey prefix to MOTel host configurations + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: "elastic-agent" + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7063 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1740712347-support-ipv6-enroll-url.yaml b/changelog/fragments/1740712347-support-ipv6-enroll-url.yaml new file mode 100644 index 00000000000..89d00ba0a51 --- /dev/null +++ b/changelog/fragments/1740712347-support-ipv6-enroll-url.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: support ipv6 hosts in enroll url + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7036 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. 
+#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1740712540-support-grpc-ipv6-host.yaml b/changelog/fragments/1740712540-support-grpc-ipv6-host.yaml new file mode 100644 index 00000000000..b3bc5a8029a --- /dev/null +++ b/changelog/fragments/1740712540-support-grpc-ipv6-host.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: support ipv6 host in grpc config + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. +#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7035 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/changelog/fragments/1740713597-support-agent-monitoring-ipv6.yaml b/changelog/fragments/1740713597-support-agent-monitoring-ipv6.yaml new file mode 100644 index 00000000000..034b52320ed --- /dev/null +++ b/changelog/fragments/1740713597-support-agent-monitoring-ipv6.yaml @@ -0,0 +1,32 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: bug-fix + +# Change summary; a 80ish characters long description of the change. +summary: support ipv6 host in agent monitoring http config + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. 
+#description: + +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: elastic-agent + +# PR URL; optional; the PR number that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +pr: https://github.com/elastic/elastic-agent/pull/7073 + +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +#issue: https://github.com/owner/repo/1234 diff --git a/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml b/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml index e00aa891755..268269936f3 100644 --- a/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml +++ b/deploy/helm/edot-collector/kube-stack/managed_otlp/values.yaml @@ -526,6 +526,7 @@ collectors: # forwarding telemetry to an Elasticsearch endpoint. gateway: suffix: gateway + replicas: 2 autoscaler: minReplicas: 2 # Start with at least 2 replicas for better availability. maxReplicas: 5 # Allow more scale-out if needed. diff --git a/deploy/helm/edot-collector/kube-stack/values.yaml b/deploy/helm/edot-collector/kube-stack/values.yaml index 0a11f0e8a9e..e0afdfcfd77 100644 --- a/deploy/helm/edot-collector/kube-stack/values.yaml +++ b/deploy/helm/edot-collector/kube-stack/values.yaml @@ -559,7 +559,6 @@ collectors: routing: default_pipelines: [metrics/otel] error_mode: ignore - match_once: true table: - context: metric statement: route() where instrumentation_scope.name == "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kubeletstatsreceiver" or IsMatch(instrumentation_scope.name, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/*") diff --git a/dev-tools/mage/build.go b/dev-tools/mage/build.go index 190efa543ea..92da55273b1 100644 --- a/dev-tools/mage/build.go +++ b/dev-tools/mage/build.go @@ -11,6 +11,7 @@ import ( "log" "os" "path/filepath" + "regexp" "strings" "github.com/josephspurrier/goversioninfo" @@ -34,6 +35,39 @@ type BuildArgs struct { WinMetadata bool // Add resource metadata to Windows binaries (like add the version number to the .exe properties). } +// buildTagRE is a regexp to match strings like "-tags=abcd" +// but does not match "-tags= " +var buildTagRE = regexp.MustCompile(`-tags=([\S]+)?`) + +// ParseBuildTags returns the ExtraFlags param where all flags that are go build tags are joined by a comma. 
+// +// For example if given -someflag=val1 -tags=buildtag1 -tags=buildtag2 +// It will return -someflag=val1 -tags=buildtag1,buildtag2 +func (b BuildArgs) ParseBuildTags() []string { + flags := make([]string, 0) + if len(b.ExtraFlags) == 0 { + return flags + } + + buildTags := make([]string, 0) + for _, flag := range b.ExtraFlags { + if buildTagRE.MatchString(flag) { + arr := buildTagRE.FindStringSubmatch(flag) + if len(arr) != 2 || arr[1] == "" { + log.Printf("Unexpected format found for buildargs.ExtraFlags, ignoring value %q", flag) + continue + } + buildTags = append(buildTags, arr[1]) + } else { + flags = append(flags, flag) + } + } + if len(buildTags) > 0 { + flags = append(flags, "-tags="+strings.Join(buildTags, ",")) + } + return flags +} + // DefaultBuildArgs returns the default BuildArgs for use in builds. func DefaultBuildArgs() BuildArgs { args := BuildArgs{ @@ -53,6 +87,11 @@ func DefaultBuildArgs() BuildArgs { args.ExtraFlags = append(args.ExtraFlags, "-buildmode", "pie") } + if FIPSBuild { + args.ExtraFlags = append(args.ExtraFlags, "-tags=requirefips") + args.CGO = true + } + if DevBuild { // Disable optimizations (-N) and inlining (-l) for debugging. args.ExtraFlags = append(args.ExtraFlags, `-gcflags=all=-N -l`) @@ -151,6 +190,12 @@ func Build(params BuildArgs) error { if params.CGO { cgoEnabled = "1" } + + if FIPSBuild { + cgoEnabled = "1" + env["GOEXPERIMENT"] = "systemcrypto" + } + env["CGO_ENABLED"] = cgoEnabled // Spec @@ -159,7 +204,7 @@ func Build(params BuildArgs) error { "-o", filepath.Join(params.OutputDir, binaryName), } - args = append(args, params.ExtraFlags...) + args = append(args, params.ParseBuildTags()...) // ldflags ldflags := params.LDFlags diff --git a/dev-tools/mage/build_test.go b/dev-tools/mage/build_test.go new file mode 100644 index 00000000000..e30b5edbc58 --- /dev/null +++ b/dev-tools/mage/build_test.go @@ -0,0 +1,59 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. 
+ +package mage + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_BuildArgs_ParseBuildTags(t *testing.T) { + tests := []struct { + name string + input []string + expect []string + }{{ + name: "no flags", + input: nil, + expect: []string{}, + }, { + name: "multiple flags with no tags", + input: []string{"-a", "-b", "-key=value"}, + expect: []string{"-a", "-b", "-key=value"}, + }, { + name: "one build tag", + input: []string{"-tags=example"}, + expect: []string{"-tags=example"}, + }, { + name: "multiple build tags", + input: []string{"-tags=example", "-tags=test"}, + expect: []string{"-tags=example,test"}, + }, { + name: "joined build tags", + input: []string{"-tags=example,test"}, + expect: []string{"-tags=example,test"}, + }, { + name: "multiple build tags with other flags", + input: []string{"-tags=example", "-tags=test", "-key=value", "-a"}, + expect: []string{"-key=value", "-a", "-tags=example,test"}, + }, { + name: "incorrectly formatted tag", + input: []string{"-tags= example"}, + expect: []string{}, + }, { + name: "incorrectly formatted tag with valid tag", + input: []string{"-tags= example", "-tags=test"}, + expect: []string{"-tags=test"}, + }} + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + args := BuildArgs{ExtraFlags: tc.input} + flags := args.ParseBuildTags() + assert.EqualValues(t, tc.expect, flags) + }) + } +} diff --git a/dev-tools/mage/crossbuild.go b/dev-tools/mage/crossbuild.go index 30750602118..a1492902f98 100644 --- a/dev-tools/mage/crossbuild.go +++ b/dev-tools/mage/crossbuild.go @@ -249,6 +249,10 @@ func CrossBuildImage(platform string) (string, error) { return "", err } + if FIPSBuild { + tagSuffix += "-fips" + } + return BeatsCrossBuildImage + ":" + goVersion + "-" + tagSuffix, nil } @@ -332,6 +336,7 @@ func (b GolangCrossBuilder) Build() error { "--env", fmt.Sprintf("SNAPSHOT=%v", Snapshot), "--env", fmt.Sprintf("DEV=%v", DevBuild), "--env", fmt.Sprintf("EXTERNAL=%v", ExternalBuild), + "--env", fmt.Sprintf("FIPS=%v", FIPSBuild), "-v", repoInfo.RootDir+":"+mountPoint, "-w", workDir, image, diff --git a/dev-tools/mage/dockerbuilder.go b/dev-tools/mage/dockerbuilder.go index 0e0bd2078ca..cb539145645 100644 --- a/dev-tools/mage/dockerbuilder.go +++ b/dev-tools/mage/dockerbuilder.go @@ -11,6 +11,7 @@ import ( "fmt" "io" "io/fs" + "maps" "os" "os/exec" "path/filepath" @@ -53,13 +54,13 @@ func (b *dockerBuilder) Build() error { return fmt.Errorf("failed to prepare build: %w", err) } - tag, err := b.dockerBuild() + tag, additionalTags, err := b.dockerBuild() tries := 3 for err != nil && tries != 0 { fmt.Println(">> Building docker images again (after 10 s)") // This sleep is to avoid hitting the docker build issues when resources are not available. 
time.Sleep(time.Second * 10) - tag, err = b.dockerBuild() + tag, additionalTags, err = b.dockerBuild() tries-- } if err != nil { @@ -70,6 +71,16 @@ return fmt.Errorf("failed to save docker as artifact: %w", err) } + + // additional tags are saved as separate artifacts, with the tag substituted into the artifact name + for _, tag := range additionalTags { + if err := b.dockerSave(tag, map[string]interface{}{ + // effectively override the name used from b.ImageName() to the tag + "Name": strings.ReplaceAll(tag, ":", "-"), + }); err != nil { + return fmt.Errorf("failed to save docker with tag %s as artifact: %w", tag, err) + } + } + + return nil } @@ -173,22 +184,43 @@ func (b *dockerBuilder) expandDockerfile(templatesDir string, data map[string]in return nil } -func (b *dockerBuilder) dockerBuild() (string, error) { - tag := fmt.Sprintf("%s:%s", b.imageName, b.Version) +// dockerBuild runs "docker build -t t1 -t t2 ... buildDir" +// returns the main tag and any additional tags specified via the extra_tags property +// the extra tags are not prefixed with the repository from b.ExtraVars["repository"], so they are not pushed to that registry +// returns an error if the command fails +func (b *dockerBuilder) dockerBuild() (string, []string, error) { + mainTag := fmt.Sprintf("%s:%s", b.imageName, b.Version) // For Independent Agent releases, replace the "+" with a "." since the "+" character // currently isn't allowed in a tag in Docker // E.g., 8.13.0+build202402191057 -> 8.13.0.build202402191057 - tag = strings.Replace(tag, "+", ".", 1) + mainTag = strings.Replace(mainTag, "+", ".", 1) if b.Snapshot { - tag = tag + "-SNAPSHOT" + mainTag = mainTag + "-SNAPSHOT" + } + if b.FIPS { + mainTag = mainTag + "-fips" } if repository := b.ExtraVars["repository"]; repository != "" { - tag = fmt.Sprintf("%s/%s", repository, tag) + mainTag = fmt.Sprintf("%s/%s", repository, mainTag) + } + + args := []string{ + "build", + "-t", mainTag, } - return tag, sh.Run("docker", "build", "-t", tag, b.buildDir) + extraTags := []string{} + for _, tag := range b.ExtraTags { + extraTags = append(extraTags, fmt.Sprintf("%s:%s", b.imageName, tag)) + } + for _, t := range extraTags { + args = append(args, "-t", t) + } + args = append(args, b.buildDir) + + return mainTag, extraTags, sh.Run("docker", args...) 
} -func (b *dockerBuilder) dockerSave(tag string) error { +func (b *dockerBuilder) dockerSave(tag string, templateExtraArgs ...map[string]interface{}) error { if _, err := os.Stat(distributionsDir); os.IsNotExist(err) { err := os.MkdirAll(distributionsDir, 0750) if err != nil { @@ -198,9 +230,13 @@ func (b *dockerBuilder) dockerSave(tag string) error { // Save the container as artifact outputFile := b.OutputFile if outputFile == "" { - outputTar, err := b.Expand(defaultBinaryName+".docker.tar.gz", map[string]interface{}{ + args := map[string]interface{}{ "Name": b.imageName, - }) + } + for _, extraArgs := range templateExtraArgs { + maps.Copy(args, extraArgs) + } + outputTar, err := b.Expand(defaultBinaryName+".docker.tar.gz", args) if err != nil { return err } diff --git a/dev-tools/mage/gotest.go b/dev-tools/mage/gotest.go index 11294caa333..768e2465d77 100644 --- a/dev-tools/mage/gotest.go +++ b/dev-tools/mage/gotest.go @@ -187,9 +187,9 @@ func GoTestBuild(ctx context.Context, params GoTestArgs) error { args := []string{"test", "-c", "-o", params.OutputFile} if len(params.Tags) > 0 { - params := strings.Join(params.Tags, " ") + params := strings.Join(params.Tags, ",") if params != "" { - args = append(args, "-tags", params) + args = append(args, "-tags="+params) } } diff --git a/dev-tools/mage/pkg.go b/dev-tools/mage/pkg.go index a3917ddac1d..6ec09f2e598 100644 --- a/dev-tools/mage/pkg.go +++ b/dev-tools/mage/pkg.go @@ -80,6 +80,7 @@ func Package() error { spec.OS = target.GOOS() spec.Arch = packageArch spec.Snapshot = Snapshot + spec.FIPS = FIPSBuild spec.evalContext = map[string]interface{}{ "GOOS": target.GOOS(), "GOARCH": target.GOARCH(), @@ -148,11 +149,11 @@ type packageBuilder struct { } func (b packageBuilder) Build() error { - fmt.Printf(">> package: Building %v type=%v for platform=%v\n", b.Spec.Name, b.Type, b.Platform.Name) + fmt.Printf(">> package: Building %v type=%v for platform=%v fips=%v\n", b.Spec.Name, b.Type, b.Platform.Name, b.Spec.FIPS) log.Printf("Package spec: %+v", b.Spec) if err := b.Type.Build(b.Spec); err != nil { - return fmt.Errorf("failed building %v type=%v for platform=%v : %w", - b.Spec.Name, b.Type, b.Platform.Name, err) + return fmt.Errorf("failed building %v type=%v for platform=%v fips=%v : %w", + b.Spec.Name, b.Type, b.Platform.Name, b.Spec.FIPS, err) } return nil } @@ -247,6 +248,10 @@ func TestPackages(options ...TestPackagesOption) error { args = append(args, "-root-owner") } + if FIPSBuild { + args = append(args, "-fips") + } + args = append(args, "-files", MustExpand("{{.PWD}}/build/distributions/*")) if out, err := goTest(args...); err != nil { diff --git a/dev-tools/mage/pkg_test.go b/dev-tools/mage/pkg_test.go index 9d1b40928b3..dbbc74b6fcd 100644 --- a/dev-tools/mage/pkg_test.go +++ b/dev-tools/mage/pkg_test.go @@ -20,6 +20,9 @@ func testPackageSpec() PackageSpec { Snapshot: true, OS: "windows", Arch: "x86_64", + ExtraTags: []string{ + "git-{{ substring commit 0 12 }}", + }, Files: map[string]PackageFile{ "brewbeat.yml": PackageFile{ Source: "./testdata/config.yml", @@ -66,6 +69,10 @@ func testPackage(t testing.TB, pack func(PackageSpec) error) { readmePath := filepath.ToSlash(filepath.Clean(readme.Source)) assert.True(t, strings.HasPrefix(readmePath, packageStagingDir)) + commit := spec.ExtraTags[0] + expected := "git-" + commitHash[:12] + assert.Equal(t, expected, commit) + if err := pack(spec); err != nil { t.Fatal(err) } diff --git a/dev-tools/mage/pkgtypes.go b/dev-tools/mage/pkgtypes.go index 5c6b32c0083..e0b977c0a4c 100644 --- 
a/dev-tools/mage/pkgtypes.go +++ b/dev-tools/mage/pkgtypes.go @@ -14,6 +14,7 @@ import ( "io" "io/fs" "log" + "math" "os" "path/filepath" "reflect" @@ -39,13 +40,13 @@ const ( packageStagingDir = "build/package" // defaultBinaryName specifies the output file for zip and tar.gz. - defaultBinaryName = "{{.Name}}{{if .Qualifier}}-{{.Qualifier}}{{end}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}{{if .OS}}-{{.OS}}{{end}}{{if .Arch}}-{{.Arch}}{{end}}" + defaultBinaryName = "{{.Name}}{{if .Qualifier}}-{{.Qualifier}}{{end}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}{{if .OS}}-{{.OS}}{{end}}{{if .Arch}}-{{.Arch}}{{end}}{{if .FIPS}}-fips{{end}}" // defaultRootDir is the default name of the root directory contained inside of zip and // tar.gz packages. // NOTE: This uses .BeatName instead of .Name because we wanted the internal // directory to not include "-oss". - defaultRootDir = "{{.BeatName}}{{if .Qualifier}}-{{.Qualifier}}{{end}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}{{if .OS}}-{{.OS}}{{end}}{{if .Arch}}-{{.Arch}}{{end}}" + defaultRootDir = "{{.BeatName}}{{if .Qualifier}}-{{.Qualifier}}{{end}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}{{if .OS}}-{{.OS}}{{end}}{{if .Arch}}-{{.Arch}}{{end}}{{if .FIPS}}-fips{{end}}" componentConfigMode os.FileMode = 0600 @@ -92,6 +93,7 @@ type PackageSpec struct { Arch string `yaml:"arch,omitempty"` Vendor string `yaml:"vendor,omitempty"` Snapshot bool `yaml:"snapshot"` + FIPS bool `yaml:"fips"` Version string `yaml:"version,omitempty"` License string `yaml:"license,omitempty"` URL string `yaml:"url,omitempty"` @@ -104,6 +106,7 @@ Qualifier string `yaml:"qualifier,omitempty"` // Optional OutputFile string `yaml:"output_file,omitempty"` // Optional ExtraVars map[string]string `yaml:"extra_vars,omitempty"` // Optional + ExtraTags []string `yaml:"extra_tags,omitempty"` // Optional evalContext map[string]interface{} packageDir string @@ -384,6 +387,12 @@ func (s PackageSpec) Evaluate(args ...map[string]interface{}) PackageSpec { s.evalContext[k] = mustExpand(v) } + if s.ExtraTags != nil { + for i, tag := range s.ExtraTags { + s.ExtraTags[i] = mustExpand(tag) + } + } + s.Name = mustExpand(s.Name) s.ServiceName = mustExpand(s.ServiceName) s.OS = mustExpand(s.OS) @@ -732,7 +741,7 @@ func runFPM(spec PackageSpec, packageType PackageType) error { } defer os.Remove(inputTar) - outputFile, err := spec.Expand("{{.Name}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}-{{.Arch}}") + outputFile, err := spec.Expand("{{.Name}}-{{.Version}}{{if .Snapshot}}-SNAPSHOT{{end}}-{{.Arch}}{{if .FIPS}}-fips{{end}}") if err != nil { return err } @@ -962,7 +971,7 @@ func addFileToTar(ar *tar.Writer, baseDir string, pkgFile PackageFile) error { } if mg.Verbose() { - log.Println("Adding", os.FileMode(header.Mode), header.Name) + log.Println("Adding", os.FileMode(mustConvertToUint32(header.Mode)), header.Name) } if err := ar.WriteHeader(header); err != nil { return err @@ -1030,7 +1039,7 @@ func addSymlinkToTar(tmpdir string, ar *tar.Writer, baseDir string, pkgFile Pack header.Typeflag = tar.TypeSymlink if mg.Verbose() { - log.Println("Adding", os.FileMode(header.Mode), header.Name) + log.Println("Adding", os.FileMode(mustConvertToUint32(header.Mode)), header.Name) } if err := ar.WriteHeader(header); err != nil { return err @@ -1052,3 +1061,10 @@ func PackageDocker(spec PackageSpec) error { } return b.Build() } + +func mustConvertToUint32(i int64) uint32 { + if i > math.MaxUint32 { + panic(fmt.Sprintf("%d is bigger than math.MaxUint32", i)) + } + return uint32(i) 
// #nosec +} diff --git a/dev-tools/mage/settings.go b/dev-tools/mage/settings.go index 3318e361968..ab09281e015 100644 --- a/dev-tools/mage/settings.go +++ b/dev-tools/mage/settings.go @@ -88,6 +88,7 @@ var ( Snapshot bool DevBuild bool ExternalBuild bool + FIPSBuild bool versionQualified bool versionQualifier string @@ -112,6 +113,7 @@ var ( "title": func(s string) string { return cases.Title(language.English, cases.NoLower).String(s) }, "tolower": strings.ToLower, "contains": strings.Contains, + "substring": Substring, agentPackageVersionMappedFunc: AgentPackageVersion, agentManifestGeneratorMappedFunc: PackageManifest, snapshotSuffix: SnapshotSuffix, @@ -153,6 +155,11 @@ func initGlobals() { panic(fmt.Errorf("failed to parse EXTERNAL env value: %w", err)) } + FIPSBuild, err = strconv.ParseBool(EnvOr("FIPS", "false")) + if err != nil { + panic(fmt.Errorf("failed to parse FIPS env value: %w", err)) + } + versionQualifier, versionQualified = os.LookupEnv("VERSION_QUALIFIER") agentPackageVersion = EnvOr(agentPackageVersionEnvVar, "") @@ -210,6 +217,7 @@ func varMap(args ...map[string]interface{}) map[string]interface{} { "Snapshot": Snapshot, "DEV": DevBuild, "EXTERNAL": ExternalBuild, + "FIPS": FIPSBuild, "Qualifier": versionQualifier, "CI": CI, } @@ -378,6 +386,17 @@ func SnapshotSuffix() string { return GenerateSnapshotSuffix(Snapshot) } +func Substring(s string, start, length int) string { + if start < 0 || start >= len(s) { + return "" + } + end := start + length + if end > len(s) { + end = len(s) + } + return s[start:end] +} + func GenerateSnapshotSuffix(snapshot bool) string { if !snapshot { return "" diff --git a/dev-tools/packaging/package_test.go b/dev-tools/packaging/package_test.go index 41104997df1..59801d8cd4f 100644 --- a/dev-tools/packaging/package_test.go +++ b/dev-tools/packaging/package_test.go @@ -14,6 +14,8 @@ import ( "bytes" "compress/gzip" "crypto/sha512" + "debug/buildinfo" + "debug/elf" "encoding/hex" "encoding/json" "errors" @@ -33,7 +35,6 @@ import ( "github.com/cavaliergopher/rpm" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gopkg.in/yaml.v3" "github.com/elastic/elastic-agent/dev-tools/mage" v1 "github.com/elastic/elastic-agent/pkg/api/v1" @@ -70,6 +71,7 @@ var ( monitorsd = flag.Bool("monitors.d", false, "check monitors.d folder contents") rootOwner = flag.Bool("root-owner", false, "expect root to own package files") rootUserContainer = flag.Bool("root-user-container", false, "expect root in container user") + fips = flag.Bool("fips", false, "check agent binary for FIPS compliance") ) func TestRPM(t *testing.T) { @@ -91,7 +93,15 @@ func TestTar(t *testing.T) { // Regexp matches *-arch.tar.gz, but not *-arch.docker.tar.gz tars := getFiles(t, regexp.MustCompile(`-\w+\.tar\.gz$`)) for _, tar := range tars { - checkTar(t, tar) + checkTar(t, tar, false) + } +} + +func TestFIPSTar(t *testing.T) { + // Regexp matches *-arch-fips.tar.gz, but not *-arch.docker.tar.gz + tars := getFiles(t, regexp.MustCompile(`-\w+-fips\.tar\.gz$`)) + for _, tar := range tars { + checkTar(t, tar, *fips) } } @@ -154,7 +164,7 @@ func checkDeb(t *testing.T, file string, buf *bytes.Buffer) { checkSystemdUnitPermissions(t, p) } -func checkTar(t *testing.T, file string) { +func checkTar(t *testing.T, file string, fipsCheck bool) { p, err := readTar(file) if err != nil { t.Error(err) @@ -176,6 +186,9 @@ func checkTar(t *testing.T, file string) { require.NoError(t, err, "error extracting tar archive") containingDir := strings.TrimSuffix(path.Base(file), 
".tar.gz") checkManifestFileContents(t, filepath.Join(tempExtractionPath, containingDir)) + if fipsCheck { + checkFIPS(t, filepath.Join(tempExtractionPath, containingDir)) + } }) checkSha512PackageHash(t, file) @@ -208,20 +221,7 @@ func checkZip(t *testing.T, file string) { func checkManifestFileContents(t *testing.T, extractedPackageDir string) { t.Log("Checking file manifest.yaml") - manifestReadCloser, err := os.Open(filepath.Join(extractedPackageDir, v1.ManifestFileName)) - if err != nil { - t.Errorf("opening manifest %s : %v", v1.ManifestFileName, err) - } - defer func(closer io.ReadCloser) { - err := closer.Close() - assert.NoError(t, err, "error closing manifest file") - }(manifestReadCloser) - - var m v1.PackageManifest - err = yaml.NewDecoder(manifestReadCloser).Decode(&m) - if err != nil { - t.Errorf("unmarshaling package manifest: %v", err) - } + m := parseManifest(t, extractedPackageDir) assert.Equal(t, v1.ManifestKind, m.Kind, "manifest specifies wrong kind") assert.Equal(t, v1.VERSION, m.Version, "manifest specifies wrong api version") @@ -243,6 +243,23 @@ func checkManifestFileContents(t *testing.T, extractedPackageDir string) { } } +func parseManifest(t *testing.T, dir string) v1.PackageManifest { + manifestReadCloser, err := os.Open(filepath.Join(dir, v1.ManifestFileName)) + if err != nil { + t.Errorf("opening manifest %s : %v", v1.ManifestFileName, err) + } + defer func(closer io.ReadCloser) { + err := closer.Close() + assert.NoError(t, err, "error closing manifest file") + }(manifestReadCloser) + + m, err := v1.ParseManifest(manifestReadCloser) + if err != nil { + t.Errorf("unmarshaling package manifest: %v", err) + } + return *m +} + const ( npcapLicense = `Dependency : Npcap \(https://nmap.org/npcap/\)` libpcapLicense = `Dependency : Libpcap \(http://www.tcpdump.org/\)` @@ -584,6 +601,55 @@ func checkDockerUser(t *testing.T, p *packageFile, info *dockerInfo, expectRoot }) } +func checkFIPS(t *testing.T, extractedPackageDir string) { + t.Logf("Checking agent binary in %q for FIPS compliance", extractedPackageDir) + m := parseManifest(t, extractedPackageDir) + versionedHome := m.Package.VersionedHome + require.DirExistsf(t, filepath.Join(extractedPackageDir, versionedHome), " versiondedHome directory %q not found in %q", versionedHome, extractedPackageDir) + binaryPath := filepath.Join(extractedPackageDir, versionedHome, "elastic-agent") // TODO eventually we will need to support .exe as well + require.FileExistsf(t, binaryPath, "Unable to find elastic-agent executable in versioned home in %q", extractedPackageDir) + + info, err := buildinfo.ReadFile(binaryPath) + require.NoError(t, err) + + foundTags := false + foundExperiment := false + for _, setting := range info.Settings { + switch setting.Key { + case "-tags": + foundTags = true + require.Contains(t, setting.Value, "requirefips") + continue + case "GOEXPERIMENT": + foundExperiment = true + require.Contains(t, setting.Value, "systemcrypto") + continue + } + } + + require.True(t, foundTags, "Did not find -tags within binary version information") + require.True(t, foundExperiment, "Did not find GOEXPERIMENT within binary version information") + + // TODO only elf is supported at the moment, in the future we will need to use macho (darwin) and pe (windows) + f, err := elf.Open(binaryPath) + require.NoError(t, err, "unable to open ELF file") + + symbols, err := f.Symbols() + if err != nil { + t.Logf("no symbols present in %q: %v", binaryPath, err) + return + } + + hasOpenSSL := false + for _, symbol := range symbols { + 
if strings.Contains(symbol.Name, "OpenSSL_version") { + hasOpenSSL = true + break + } + } + require.True(t, hasOpenSSL, "unable to find OpenSSL_version symbol") +} + // ensureNoBuildIDLinks checks for regressions related to // https://github.com/elastic/beats/issues/12956. func ensureNoBuildIDLinks(t *testing.T, p *packageFile) { diff --git a/dev-tools/packaging/packages.yml b/dev-tools/packaging/packages.yml index abfe28527f0..a8f10c256c6 100644 --- a/dev-tools/packaging/packages.yml +++ b/dev-tools/packaging/packages.yml @@ -288,6 +288,8 @@ shared: # service build is based on previous cloud variant - &agent_docker_service_spec docker_variant: 'service' + extra_tags: + - 'git-{{ substring commit 0 12 }}' files: 'data/service/connectors-{{ beat_version }}{{if .Snapshot}}-SNAPSHOT{{end}}.zip': source: '{{.AgentDropPath}}/archives/{{.GOOS}}-{{.AgentArchName}}.tar.gz/connectors-{{ beat_version }}{{if .Snapshot}}-SNAPSHOT{{end}}.zip' diff --git a/internal/pkg/agent/application/coordinator/coordinator.go b/internal/pkg/agent/application/coordinator/coordinator.go index 30c7396b25e..410f22f1e47 100644 --- a/internal/pkg/agent/application/coordinator/coordinator.go +++ b/internal/pkg/agent/application/coordinator/coordinator.go @@ -791,20 +791,21 @@ func (c *Coordinator) DiagnosticHooks() diagnostics.Hooks { Description: "current state of the agent information of the running Elastic Agent", ContentType: "application/yaml", Hook: func(_ context.Context) []byte { + meta, err := c.agentInfo.ECSMetadata(c.logger) + if err != nil { + c.logger.Errorw("Error getting ECS metadata", "error.message", err) + } + output := struct { - AgentID string `yaml:"agent_id"` - Headers map[string]string `yaml:"headers"` - LogLevel string `yaml:"log_level"` - Snapshot bool `yaml:"snapshot"` - Version string `yaml:"version"` - Unprivileged bool `yaml:"unprivileged"` + Headers map[string]string `yaml:"headers"` + LogLevel string `yaml:"log_level"` + RawLogLevel string `yaml:"log_level_raw"` + Metadata *info.ECSMeta `yaml:"metadata"` }{ - AgentID: c.agentInfo.AgentID(), - Headers: c.agentInfo.Headers(), - LogLevel: c.agentInfo.LogLevel(), - Snapshot: c.agentInfo.Snapshot(), - Version: c.agentInfo.Version(), - Unprivileged: c.agentInfo.Unprivileged(), + Headers: c.agentInfo.Headers(), + LogLevel: c.agentInfo.LogLevel(), + RawLogLevel: c.agentInfo.RawLogLevel(), + Metadata: meta, } o, err := yaml.Marshal(output) if err != nil { diff --git a/internal/pkg/agent/application/coordinator/diagnostics_test.go b/internal/pkg/agent/application/coordinator/diagnostics_test.go index 95adc0408e3..b29d1884549 100644 --- a/internal/pkg/agent/application/coordinator/diagnostics_test.go +++ b/internal/pkg/agent/application/coordinator/diagnostics_test.go @@ -18,6 +18,7 @@ import ( "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" + "github.com/elastic/elastic-agent/internal/pkg/agent/application/info" "github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details" "github.com/elastic/elastic-agent/internal/pkg/agent/configuration" "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" @@ -27,6 +28,7 @@ import ( "github.com/elastic/elastic-agent/pkg/component" "github.com/elastic/elastic-agent/pkg/component/runtime" agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client" + "github.com/elastic/elastic-agent/pkg/core/logger" "github.com/elastic/elastic-agent/pkg/utils/broadcaster" ) @@ -144,21 +146,61 @@ func TestDiagnosticAgentInfo(t 
*testing.T) { "header1": "value1", "header2": "value2", }, - logLevel: "trace", - snapshot: true, - version: "8.14.0", - unprivileged: true, + logLevel: "trace", + meta: &info.ECSMeta{ + Elastic: &info.ElasticECSMeta{ + Agent: &info.AgentECSMeta{ + BuildOriginal: "8.14.0-SNAPSHOT", + ID: "agent-id", + LogLevel: "trace", + Snapshot: true, + Version: "8.14.0", + Unprivileged: true, + Upgradeable: true, + }, + }, + Host: &info.HostECSMeta{ + Arch: "arm64", + Hostname: "Test-Macbook-Pro.local", + }, + OS: &info.SystemECSMeta{ + Name: "macos", + Platform: "darwin", + }, + }, }} expected := ` -agent_id: agent-id headers: header1: value1 header2: value2 log_level: trace -snapshot: true -version: 8.14.0 -unprivileged: true +log_level_raw: trace +metadata: + elastic: + agent: + buildoriginal: "8.14.0-SNAPSHOT" + complete: false + id: agent-id + loglevel: trace + snapshot: true + unprivileged: true + upgradeable: true + version: 8.14.0 + host: + arch: arm64 + hostname: Test-Macbook-Pro.local + name: "" + id: "" + ip: [] + mac: [] + os: + family: "" + kernel: "" + platform: darwin + version: "" + name: macos + fullname: "" ` hook, ok := diagnosticHooksMap(coord)["agent-info"] @@ -606,6 +648,7 @@ type fakeAgentInfo struct { version string unprivileged bool isStandalone bool + meta *info.ECSMeta } func (a fakeAgentInfo) AgentID() string { @@ -640,5 +683,9 @@ func (a fakeAgentInfo) IsStandalone() bool { return a.isStandalone } +func (a fakeAgentInfo) ECSMetadata(l *logger.Logger) (*info.ECSMeta, error) { + return a.meta, nil +} + func (a fakeAgentInfo) ReloadID(ctx context.Context) error { panic("implement me") } func (a fakeAgentInfo) SetLogLevel(ctx context.Context, level string) error { panic("implement me") } diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go index a8980a3e08e..719d305f614 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go @@ -37,21 +37,24 @@ const ( fleetStateStarting = "starting" ) +// Default backoff settings for connecting to Fleet +var defaultFleetBackoffSettings = backoffSettings{ + Init: 60 * time.Second, + Max: 10 * time.Minute, +} + // Default Configuration for the Fleet Gateway. 
var defaultGatewaySettings = &fleetGatewaySettings{ Duration: 1 * time.Second, // time between successful calls Jitter: 500 * time.Millisecond, // used as a jitter for duration ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit - Backoff: backoffSettings{ // time after a failed call - Init: 60 * time.Second, - Max: 10 * time.Minute, - }, + Backoff: &defaultFleetBackoffSettings, } type fleetGatewaySettings struct { - Duration time.Duration `config:"checkin_frequency"` - Jitter time.Duration `config:"jitter"` - Backoff backoffSettings `config:"backoff"` + Duration time.Duration `config:"checkin_frequency"` + Jitter time.Duration `config:"jitter"` + Backoff *backoffSettings `config:"backoff"` ErrConsecutiveUnauthDuration time.Duration } @@ -136,11 +139,18 @@ func (f *FleetGateway) Actions() <-chan []fleetapi.Action { } func (f *FleetGateway) Run(ctx context.Context) error { - backoff := backoff.NewEqualJitterBackoff( - ctx.Done(), - f.settings.Backoff.Init, - f.settings.Backoff.Max, - ) + var requestBackoff backoff.Backoff + if f.settings.Backoff == nil { + requestBackoff = RequestBackoff(ctx.Done()) + } else { + // this is only used in tests + requestBackoff = backoff.NewEqualJitterBackoff( + ctx.Done(), + f.settings.Backoff.Init, + f.settings.Backoff.Max, + ) + } + f.log.Info("Fleet gateway started") for { select { @@ -154,7 +164,7 @@ func (f *FleetGateway) Run(ctx context.Context) error { // Execute the checkin call and for any errors returned by the fleet-server API // the function will retry to communicate with fleet-server with an exponential delay and some // jitter to help better distribute the load from a fleet of agents. - resp, err := f.doExecute(ctx, backoff) + resp, err := f.doExecute(ctx, requestBackoff) if err != nil { continue } @@ -428,3 +438,11 @@ func agentStateToString(state agentclient.State) string { // Unknown states map to degraded. 
return fleetStateDegraded } + +func RequestBackoff(done <-chan struct{}) backoff.Backoff { + return backoff.NewEqualJitterBackoff( + done, + defaultFleetBackoffSettings.Init, + defaultFleetBackoffSettings.Max, + ) +} diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go index c5221134007..217ac1ca457 100644 --- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go +++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go @@ -124,7 +124,7 @@ func TestFleetGateway(t *testing.T) { agentInfo := &testAgentInfo{} settings := &fleetGatewaySettings{ Duration: 5 * time.Second, - Backoff: backoffSettings{Init: 1 * time.Second, Max: 5 * time.Second}, + Backoff: &backoffSettings{Init: 1 * time.Second, Max: 5 * time.Second}, } t.Run("send no event and receive no action", withGateway(agentInfo, settings, func( @@ -274,7 +274,7 @@ func TestFleetGateway(t *testing.T) { log, &fleetGatewaySettings{ Duration: d, - Backoff: backoffSettings{Init: 1 * time.Second, Max: 30 * time.Second}, + Backoff: &backoffSettings{Init: 1 * time.Second, Max: 30 * time.Second}, }, agentInfo, client, @@ -383,7 +383,7 @@ func TestRetriesOnFailures(t *testing.T) { agentInfo := &testAgentInfo{} settings := &fleetGatewaySettings{ Duration: 5 * time.Second, - Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second}, + Backoff: &backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second}, } t.Run("When the gateway fails to communicate with the checkin API we will retry", @@ -434,7 +434,7 @@ func TestRetriesOnFailures(t *testing.T) { t.Run("The retry loop is interruptible", withGateway(agentInfo, &fleetGatewaySettings{ Duration: 0 * time.Second, - Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute}, + Backoff: &backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute}, }, func( t *testing.T, gateway coordinator.FleetGateway, @@ -587,7 +587,7 @@ func TestFleetGatewaySchedulerSwitch(t *testing.T) { agentInfo := &testAgentInfo{} settings := &fleetGatewaySettings{ Duration: 1 * time.Second, - Backoff: backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond}, + Backoff: &backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond}, } tempSet := *defaultGatewaySettings diff --git a/internal/pkg/agent/application/info/agent_info.go b/internal/pkg/agent/application/info/agent_info.go index 56371dbf121..42f65df573a 100644 --- a/internal/pkg/agent/application/info/agent_info.go +++ b/internal/pkg/agent/application/info/agent_info.go @@ -43,6 +43,9 @@ type Agent interface { // IsStandalone returns true is the agent is running in standalone mode, i.e, without fleet IsStandalone() bool + + // ECSMetadata returns the ECS metadata that is attached as part of every Fleet checkin. + ECSMetadata(*logger.Logger) (*ECSMeta, error) } // AgentInfo is a collection of information about agent. 
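Aside: the Fleet check-in retry above and the delayed-enroll retry below now share an equal-jitter backoff. A minimal, self-contained sketch of that strategy, assuming only the standard library (the helper name equalJitterWait is hypothetical; the real implementation lives in the agent's internal backoff package):

package main

import (
	"fmt"
	"math/rand/v2"
	"time"
)

// equalJitterWait keeps half of the current interval fixed and randomizes the
// other half, so a fleet of agents does not retry in lockstep.
func equalJitterWait(interval time.Duration) time.Duration {
	half := interval / 2
	return half + rand.N(half)
}

func main() {
	// Walk the schedule with the gateway defaults: Init 60s, Max 10m.
	initial, maxWait := 60*time.Second, 10*time.Minute
	for interval := initial; ; interval *= 2 {
		if interval > maxWait {
			interval = maxWait
		}
		fmt.Println(equalJitterWait(interval))
		if interval == maxWait {
			break
		}
	}
}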
diff --git a/internal/pkg/agent/application/monitoring/processes_cloud_test.go b/internal/pkg/agent/application/monitoring/processes_cloud_test.go index f4d6bc904e3..7f83c755ba1 100644 --- a/internal/pkg/agent/application/monitoring/processes_cloud_test.go +++ b/internal/pkg/agent/application/monitoring/processes_cloud_test.go @@ -90,7 +90,6 @@ func TestExpectedCloudProcessID(t *testing.T) { } for _, tc := range testcases { - tc := tc // make a copy to avoid implicit memory aliasing t.Run(tc.name, func(t *testing.T) { assert.Equal(t, tc.id, expectedCloudProcessID(&tc.component)) }) @@ -143,7 +142,6 @@ func TestMatchesCloudProcessID(t *testing.T) { } for _, tc := range testcases { - tc := tc // make a copy to avoid implicit memory aliasing t.Run(tc.name, func(t *testing.T) { assert.Equal(t, tc.matches, matchesCloudProcessID(&tc.component, tc.processID)) }) diff --git a/internal/pkg/agent/application/monitoring/v1_monitor.go b/internal/pkg/agent/application/monitoring/v1_monitor.go index 55e9a4dc3f9..d919febfa24 100644 --- a/internal/pkg/agent/application/monitoring/v1_monitor.go +++ b/internal/pkg/agent/application/monitoring/v1_monitor.go @@ -8,6 +8,7 @@ import ( "crypto/sha256" "fmt" "math" + "net" "net/url" "os" "path/filepath" @@ -35,12 +36,11 @@ const ( // args: pipeline name, application name agentMbEndpointFileFormatWin = `npipe:///elastic-agent` - // agentMbEndpointHTTP is used with cloud and exposes metrics on http endpoint - agentMbEndpointHTTP = "http://%s:%d" - httpPlusPrefix = "http+" - httpPrefix = "http" - fileSchemePrefix = "file" - unixSchemePrefix = "unix" + + httpPlusPrefix = "http+" + httpPrefix = "http" + fileSchemePrefix = "file" + unixSchemePrefix = "unix" defaultOutputName = "default" outputsKey = "outputs" @@ -1127,7 +1127,7 @@ func HttpPlusAgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg. // AgentMonitoringEndpoint provides an agent monitoring endpoint path. func AgentMonitoringEndpoint(operatingSystem string, cfg *monitoringCfg.MonitoringConfig) string { if cfg != nil && cfg.Enabled { - return fmt.Sprintf(agentMbEndpointHTTP, cfg.HTTP.Host, cfg.HTTP.Port) + return "http://" + net.JoinHostPort(cfg.HTTP.Host, strconv.Itoa(cfg.HTTP.Port)) } if operatingSystem == windowsOS { diff --git a/internal/pkg/agent/cmd/enroll.go b/internal/pkg/agent/cmd/enroll.go index f7e7af52046..73f976b6048 100644 --- a/internal/pkg/agent/cmd/enroll.go +++ b/internal/pkg/agent/cmd/enroll.go @@ -559,6 +559,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command) error { &options, pathConfigFile, store, + nil, ) if err != nil { return err diff --git a/internal/pkg/agent/cmd/enroll_cmd.go b/internal/pkg/agent/cmd/enroll_cmd.go index 19426c06d6b..1765742311f 100644 --- a/internal/pkg/agent/cmd/enroll_cmd.go +++ b/internal/pkg/agent/cmd/enroll_cmd.go @@ -11,8 +11,10 @@ import ( "fmt" "io" "math/rand/v2" + "net" "os" "os/exec" + "strconv" "strings" "time" @@ -54,8 +56,8 @@ const ( defaultFleetServerPort = 8220 defaultFleetServerInternalHost = "localhost" defaultFleetServerInternalPort = 8221 - enrollBackoffInit = time.Second - enrollBackoffMax = 10 * time.Second + enrollBackoffInit = time.Second * 5 + enrollBackoffMax = time.Minute * 10 ) var ( @@ -69,13 +71,14 @@ type saver interface { // enrollCmd is an enroll subcommand that interacts between the Kibana API and the Agent. 
type enrollCmd struct { - log *logger.Logger - options *enrollCmdOption - client fleetclient.Sender - configStore saver - remoteConfig remote.Config - agentProc *process.Info - configPath string + log *logger.Logger + options *enrollCmdOption + client fleetclient.Sender + configStore saver + remoteConfig remote.Config + agentProc *process.Info + configPath string + backoffFactory func(done <-chan struct{}) backoff.Backoff // For testability daemonReloadFunc func(context.Context) error @@ -178,13 +181,20 @@ func newEnrollCmd( options *enrollCmdOption, configPath string, store saver, + backoffFactory func(done <-chan struct{}) backoff.Backoff, ) (*enrollCmd, error) { + if backoffFactory == nil { + backoffFactory = func(done <-chan struct{}) backoff.Backoff { + return backoff.NewEqualJitterBackoff(done, enrollBackoffInit, enrollBackoffMax) + } + } return &enrollCmd{ log: log, options: options, configStore: store, configPath: configPath, daemonReloadFunc: daemonReload, + backoffFactory: backoffFactory, }, nil } @@ -433,7 +443,7 @@ func (c *enrollCmd) prepareFleetTLS() error { if c.options.FleetServer.Host == "" { c.options.FleetServer.Host = defaultFleetServerInternalHost } - c.options.URL = fmt.Sprintf("http://%s:%d", host, port) + c.options.URL = "http://" + net.JoinHostPort(host, strconv.Itoa(int(port))) c.options.Insecure = true return nil } @@ -453,7 +463,7 @@ func (c *enrollCmd) prepareFleetTLS() error { } c.options.FleetServer.Cert = string(pair.Crt) c.options.FleetServer.CertKey = string(pair.Key) - c.options.URL = fmt.Sprintf("https://%s:%d", hostname, port) + c.options.URL = "https://" + net.JoinHostPort(hostname, strconv.Itoa(int(port))) c.options.CAs = []string{string(ca.Crt())} } // running with custom Cert and CertKey; URL is required to be set @@ -465,7 +475,7 @@ func (c *enrollCmd) prepareFleetTLS() error { if c.options.FleetServer.InternalPort != defaultFleetServerInternalPort { c.log.Warnf("Internal endpoint configured to: %d. 
Changing this value is not supported.", c.options.FleetServer.InternalPort) } - c.options.InternalURL = fmt.Sprintf("%s:%d", defaultFleetServerInternalHost, c.options.FleetServer.InternalPort) + c.options.InternalURL = net.JoinHostPort(defaultFleetServerInternalHost, strconv.Itoa(int(c.options.FleetServer.InternalPort))) } return nil @@ -531,7 +541,7 @@ func (c *enrollCmd) enrollWithBackoff(ctx context.Context, persistentConfig map[ signal := make(chan struct{}) defer close(signal) - backExp := backoff.NewExpBackoff(signal, enrollBackoffInit, enrollBackoffMax) + backExp := c.backoffFactory(signal) for { retry := false diff --git a/internal/pkg/agent/cmd/enroll_cmd_test.go b/internal/pkg/agent/cmd/enroll_cmd_test.go index 6131c1de959..1a6d94ead00 100644 --- a/internal/pkg/agent/cmd/enroll_cmd_test.go +++ b/internal/pkg/agent/cmd/enroll_cmd_test.go @@ -110,6 +110,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) @@ -181,6 +182,7 @@ func TestEnroll(t *testing.T) { &enrollOptions, "", store, + nil, ) require.NoError(t, err, "could not create enroll command") @@ -254,6 +256,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) @@ -316,6 +319,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) @@ -380,6 +384,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) @@ -424,6 +429,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) @@ -492,6 +498,7 @@ func TestEnroll(t *testing.T) { }, "", store, + nil, ) require.NoError(t, err) diff --git a/internal/pkg/agent/cmd/run.go b/internal/pkg/agent/cmd/run.go index 82ae7bf04ff..f91f2a37790 100644 --- a/internal/pkg/agent/cmd/run.go +++ b/internal/pkg/agent/cmd/run.go @@ -16,6 +16,8 @@ import ( "syscall" "time" + fleetgateway "github.com/elastic/elastic-agent/internal/pkg/agent/application/gateway/fleet" + "go.elastic.co/apm/v2" apmtransport "go.elastic.co/apm/v2/transport" "gopkg.in/yaml.v2" @@ -545,6 +547,7 @@ func tryDelayEnroll(ctx context.Context, logger *logger.Logger, cfg *configurati &options, paths.ConfigFile(), store, + fleetgateway.RequestBackoff, // for delayed enroll, we want to use the same backoff settings as fleet checkins ) if err != nil { return nil, err diff --git a/internal/pkg/agent/configuration/grpc.go b/internal/pkg/agent/configuration/grpc.go index 896b4ab3eb9..d7220672079 100644 --- a/internal/pkg/agent/configuration/grpc.go +++ b/internal/pkg/agent/configuration/grpc.go @@ -5,7 +5,7 @@ package configuration import ( - "fmt" + "net" "os" "strconv" @@ -71,7 +71,7 @@ func OverrideDefaultContainerGRPCPort(cfg *GRPCConfig) { // String returns the composed listen address for the GRPC. 
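// Unlike fmt.Sprintf("%s:%d", ...), net.JoinHostPort brackets IPv6 literals,
// which is what makes the composed address dialable:
//
//	fmt.Sprintf("%s:%d", "::1", 6789)           // "::1:6789" (too many colons)
//	net.JoinHostPort("::1", strconv.Itoa(6789)) // "[::1]:6789"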
func (cfg *GRPCConfig) String() string { - return fmt.Sprintf("%s:%d", cfg.Address, cfg.Port) + return net.JoinHostPort(cfg.Address, strconv.Itoa(int(cfg.Port))) } // IsLocal returns true if port value is less than 0 diff --git a/internal/pkg/agent/configuration/grpc_test.go b/internal/pkg/agent/configuration/grpc_test.go index 65d02ad4644..27043e277e3 100644 --- a/internal/pkg/agent/configuration/grpc_test.go +++ b/internal/pkg/agent/configuration/grpc_test.go @@ -42,3 +42,40 @@ func TestOverrideDefaultGRPCPort(t *testing.T) { }) } } + +func TestGRPCAddr(t *testing.T) { + testcases := []struct { + name string + addr string + port uint16 + expected string + }{{ + name: "ipv4", + addr: "127.0.0.1", + expected: "127.0.0.1:0", + }, { + name: "ipv4+port", + addr: "127.0.0.1", + port: 1, + expected: "127.0.0.1:1", + }, { + name: "ipv6", + addr: "::1", + expected: "[::1]:0", + }, { + name: "ipv6+port", + addr: "::1", + port: 1, + expected: "[::1]:1", + }} + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + cfg := GRPCConfig{ + Address: tc.addr, + Port: tc.port, + } + assert.Equal(t, tc.expected, cfg.String()) + }) + } +} diff --git a/internal/pkg/agent/transpiler/ast.go b/internal/pkg/agent/transpiler/ast.go index bacf1be208c..bb9a2e03866 100644 --- a/internal/pkg/agent/transpiler/ast.go +++ b/internal/pkg/agent/transpiler/ast.go @@ -105,6 +105,9 @@ func NewDictWithProcessors(nodes []Node, processors Processors) *Dict { // Find takes a string which is a key and try to find the elements in the associated K/V. func (d *Dict) Find(key string) (Node, bool) { for _, i := range d.value { + if i == nil { + continue + } if i.(*Key).name == key { return i, true } @@ -119,9 +122,12 @@ func (d *Dict) Insert(node Node) { func (d *Dict) String() string { var sb strings.Builder - for i := 0; i < len(d.value); i++ { + for i, node := range d.value { + if node == nil { + continue + } sb.WriteString("{") - sb.WriteString(d.value[i].String()) + sb.WriteString(node.String()) sb.WriteString("}") if i < len(d.value)-1 { sb.WriteString(",") @@ -166,6 +172,9 @@ func (d *Dict) ShallowClone() Node { func (d *Dict) Hash() []byte { h := sha256.New() for _, v := range d.value { + if v == nil { + continue + } h.Write(v.Hash()) } return h.Sum(nil) @@ -174,6 +183,9 @@ func (d *Dict) Hash() []byte { // Hash64With recursively computes the given hash for the Node and its children func (d *Dict) Hash64With(h *xxhash.Digest) error { for _, v := range d.value { + if v == nil { + continue + } if err := v.Hash64With(h); err != nil { return err } @@ -184,6 +196,9 @@ func (d *Dict) Hash64With(h *xxhash.Digest) error { // Vars returns a list of all variables referenced in the dictionary. 
func (d *Dict) Vars(vars []string, defaultProvider string) []string { for _, v := range d.value { + if v == nil { + continue + } k := v.(*Key) vars = k.Vars(vars, defaultProvider) } @@ -194,6 +209,9 @@ func (d *Dict) Vars(vars []string, defaultProvider string) []string { func (d *Dict) Apply(vars *Vars) (Node, error) { nodes := make([]Node, 0, len(d.value)) for _, v := range d.value { + if v == nil { + continue + } k := v.(*Key) n, err := k.Apply(vars) if err != nil { @@ -222,6 +240,9 @@ func (d *Dict) Processors() Processors { return d.processors } for _, v := range d.value { + if v == nil { + continue + } if p := v.Processors(); p != nil { return p } @@ -387,8 +408,11 @@ func NewListWithProcessors(nodes []Node, processors Processors) *List { func (l *List) String() string { var sb strings.Builder sb.WriteString("[") - for i := 0; i < len(l.value); i++ { - sb.WriteString(l.value[i].String()) + for i, v := range l.value { + if v == nil { + continue + } + sb.WriteString(v.String()) if i < len(l.value)-1 { sb.WriteString(",") } @@ -401,6 +425,9 @@ func (l *List) String() string { func (l *List) Hash() []byte { h := sha256.New() for _, v := range l.value { + if v == nil { + continue + } h.Write(v.Hash()) } @@ -410,6 +437,9 @@ func (l *List) Hash() []byte { // Hash64With recursively computes the given hash for the Node and its children func (l *List) Hash64With(h *xxhash.Digest) error { for _, v := range l.value { + if v == nil { + continue + } if err := v.Hash64With(h); err != nil { return err } @@ -465,6 +495,9 @@ func (l *List) ShallowClone() Node { // Vars returns a list of all variables referenced in the list. func (l *List) Vars(vars []string, defaultProvider string) []string { for _, v := range l.value { + if v == nil { + continue + } vars = v.Vars(vars, defaultProvider) } return vars @@ -474,6 +507,9 @@ func (l *List) Vars(vars []string, defaultProvider string) []string { func (l *List) Apply(vars *Vars) (Node, error) { nodes := make([]Node, 0, len(l.value)) for _, v := range l.value { + if v == nil { + continue + } n, err := v.Apply(vars) if err != nil { return nil, err @@ -492,6 +528,9 @@ func (l *List) Processors() Processors { return l.processors } for _, v := range l.value { + if v == nil { + continue + } if p := v.Processors(); p != nil { return p } diff --git a/internal/pkg/agent/transpiler/ast_test.go b/internal/pkg/agent/transpiler/ast_test.go index 3ea626c8624..624df1f6232 100644 --- a/internal/pkg/agent/transpiler/ast_test.go +++ b/internal/pkg/agent/transpiler/ast_test.go @@ -9,6 +9,8 @@ import ( "reflect" "testing" + "github.com/cespare/xxhash/v2" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent/internal/pkg/eql" @@ -1206,6 +1208,49 @@ func TestCondition(t *testing.T) { assert.Nil(t, input2.condition) } +// check that all the methods handle nil values correctly +func TestNullValues(t *testing.T) { + cfgMap := map[string]any{ + "inputs": map[string]any{ + "dict": map[string]any{ + "key": nil, + }, + "list": []any{nil}, + }, + } + ast, err := NewAST(cfgMap) + require.NoError(t, err) + inputs, ok := Lookup(ast, "inputs") + require.True(t, ok) + + assert.NotEmpty(t, inputs.String()) + + node, ok := inputs.Find("dict") + assert.True(t, ok) + assert.NotNil(t, node) + + assert.NotNil(t, inputs.Value()) + + assert.NotNil(t, inputs.Clone()) + + assert.NotNil(t, inputs.ShallowClone()) + + assert.NotEmpty(t, inputs.Hash()) + + h := xxhash.New() + err = inputs.Hash64With(h) + assert.NoError(t, err) + assert.NotEmpty(t, h.Sum64()) + + assert.Empty(t, 
inputs.Vars([]string{}, "default")) + newNode, err := inputs.Apply(nil) + assert.NoError(t, err) + assert.NotNil(t, newNode) + assert.Empty(t, inputs.Processors()) +} + func mustMakeVars(mapping map[string]interface{}) *Vars { v, err := NewVars("", mapping, nil, "") if err != nil { diff --git a/internal/pkg/composable/context.go b/internal/pkg/composable/context.go index f358e0bfe32..0eadeb9f5fb 100644 --- a/internal/pkg/composable/context.go +++ b/internal/pkg/composable/context.go @@ -17,7 +17,7 @@ import ( type ContextProviderBuilder func(log *logger.Logger, config *config.Config, managed bool) (corecomp.ContextProvider, error) // MustAddContextProvider adds a new ContextProviderBuilder and panics if AddContextProvider returns an error. -func (r *providerRegistry) MustAddContextProvider(name string, builder ContextProviderBuilder) { +func (r *ProviderRegistry) MustAddContextProvider(name string, builder ContextProviderBuilder) { err := r.AddContextProvider(name, builder) if err != nil { panic(err) @@ -27,7 +27,7 @@ func (r *providerRegistry) MustAddContextProvider(name string, builder ContextPr // AddContextProvider adds a new ContextProviderBuilder // //nolint:dupl,goimports,nolintlint // false positive -func (r *providerRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error { +func (r *ProviderRegistry) AddContextProvider(name string, builder ContextProviderBuilder) error { r.lock.Lock() defer r.lock.Unlock() @@ -55,7 +55,7 @@ func (r *providerRegistry) AddContextProvid } // GetContextProvider returns the context provider with the given name, nil if it doesn't exist -func (r *providerRegistry) GetContextProvider(name string) (ContextProviderBuilder, bool) { +func (r *ProviderRegistry) GetContextProvider(name string) (ContextProviderBuilder, bool) { r.lock.RLock() defer r.lock.RUnlock() diff --git a/internal/pkg/composable/controller.go b/internal/pkg/composable/controller.go index 453a3d47fb7..61402d7f855 100644 --- a/internal/pkg/composable/controller.go +++ b/internal/pkg/composable/controller.go @@ -80,8 +80,13 @@ type controller struct { dynamicProviderStates map[string]*dynamicProviderState } -// New creates a new controller. +// New creates a new controller with the global set of providers. func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) { + return NewWithProviders(log, c, managed, Providers) +} + +// NewWithProviders creates a new controller with the given set of providers.
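+// A registry can also be assembled ad hoc, e.g. (provider name and builder
+// hypothetical):
+//
+//	reg := composable.NewProviderRegistry()
+//	reg.MustAddContextProvider("my_provider", builder)
+//	ctrl, err := composable.NewWithProviders(log, cfg, false, reg)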
+func NewWithProviders(log *logger.Logger, c *config.Config, managed bool, providers *ProviderRegistry) (Controller, error) { l := log.Named("composable") var providersCfg Config @@ -110,7 +115,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) // build all the context providers contextProviders := map[string]contextProvider{} - for name, builder := range Providers.contextProviders { + for name, builder := range providers.contextProviders { pCfg, ok := providersCfg.Providers[name] if (ok && !pCfg.Enabled()) || (!ok && !providersInitialDefault) { // explicitly disabled; skipping @@ -124,7 +129,7 @@ func New(log *logger.Logger, c *config.Config, managed bool) (Controller, error) // build all the dynamic providers dynamicProviders := map[string]dynamicProvider{} - for name, builder := range Providers.dynamicProviders { + for name, builder := range providers.dynamicProviders { pCfg, ok := providersCfg.Providers[name] if (ok && !pCfg.Enabled()) || (!ok && !providersInitialDefault) { // explicitly disabled; skipping @@ -187,45 +192,33 @@ func (c *controller) Run(ctx context.Context) error { wg.Wait() }() - // synchronize the fetch providers through a channel - var fetchProvidersLock sync.RWMutex - var fetchProviders mapstr.M - fetchCh := make(chan fetchProvider) - go func() { - for { - select { - case <-localCtx.Done(): - return - case msg := <-fetchCh: - fetchProvidersLock.Lock() - if msg.fetchProvider == nil { - _ = fetchProviders.Delete(msg.name) - } else { - _, _ = fetchProviders.Put(msg.name, msg.fetchProvider) - } - fetchProvidersLock.Unlock() - } - } - }() - - // send initial vars state - fetchProvidersLock.RLock() + // send initial vars state (empty fetch providers initially) + fetchProviders := mapstr.M{} err := c.sendVars(ctx, nil, fetchProviders) if err != nil { - fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context return err } - fetchProvidersLock.RUnlock() // performs debounce of notifies; accumulates them into 100 millisecond chunks var observedResult chan []*transpiler.Vars + fetchCh := make(chan fetchProvider) for { DEBOUNCE: for { select { case <-ctx.Done(): return ctx.Err() + case msg := <-fetchCh: + if msg.fetchProvider == nil { + _ = fetchProviders.Delete(msg.name) + } else { + _, _ = fetchProviders.Put(msg.name, msg.fetchProvider) + } + t.Reset(100 * time.Millisecond) + c.logger.Debugf("Fetch providers state changed for composable inputs; debounce started") + drainChan(stateChangedChan) // state change trigger (no need for signal to be handled) + break DEBOUNCE case observed := <-c.observedCh: // observedResult holds the channel to send the latest observed results on // if nothing is changed then nil will be sent over the channel if the set of running @@ -235,7 +228,7 @@ func (c *controller) Run(ctx context.Context) error { if changed { t.Reset(100 * time.Millisecond) c.logger.Debugf("Observed state changed for composable inputs; debounce started") - drainChan(stateChangedChan) + drainChan(stateChangedChan) // state change trigger (no need for signal to be handled) break DEBOUNCE } else { // nothing changed send nil to alert the caller @@ -261,15 +254,12 @@ func (c *controller) Run(ctx context.Context) error { } // send the vars to the watcher or the observer caller - fetchProvidersLock.RLock() err := c.sendVars(ctx, observedResult, fetchProviders) observedResult = nil if err != nil { - fetchProvidersLock.RUnlock() // only error is context cancel, no need to add error message context return err 
} - fetchProvidersLock.RUnlock() } } @@ -550,6 +540,10 @@ func (c *controller) startDynamicProvider(ctx context.Context, wg *sync.WaitGrou } func (c *controller) generateVars(fetchContextProviders mapstr.M, defaultProvider string) []*transpiler.Vars { + // copy fetch providers map so they cannot change in the context + // of the currently processed variables + fetchContextProviders = fetchContextProviders.Clone() + // build the vars list of mappings vars := make([]*transpiler.Vars, 1) mapping, _ := transpiler.NewAST(map[string]any{}) diff --git a/internal/pkg/composable/controller_test.go b/internal/pkg/composable/controller_test.go index 5bedf879154..75df28069d4 100644 --- a/internal/pkg/composable/controller_test.go +++ b/internal/pkg/composable/controller_test.go @@ -8,6 +8,7 @@ import ( "context" "errors" "fmt" + "strings" "testing" "time" @@ -17,6 +18,7 @@ import ( "github.com/elastic/elastic-agent/internal/pkg/agent/transpiler" "github.com/elastic/elastic-agent/internal/pkg/composable" "github.com/elastic/elastic-agent/internal/pkg/config" + corecomp "github.com/elastic/elastic-agent/internal/pkg/core/composable" "github.com/elastic/elastic-agent/pkg/core/logger" _ "github.com/elastic/elastic-agent/internal/pkg/composable/providers/env" @@ -142,6 +144,81 @@ func TestController(t *testing.T) { assert.Len(t, vars3map, 0) // should be empty after empty Observe } +func TestControllerWithFetchProvider(t *testing.T) { + providers := composable.NewProviderRegistry() + providers.MustAddContextProvider("custom_fetch", func(_ *logger.Logger, _ *config.Config, _ bool) (corecomp.ContextProvider, error) { + // add a delay to ensure that even if it takes time to start the provider that it still gets placed + // as a fetch provider + <-time.After(1 * time.Second) + return &customFetchProvider{}, nil + }) + + cfg := config.New() + log, err := logger.New("", false) + require.NoError(t, err) + c, err := composable.NewWithProviders(log, cfg, false, providers) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + observed := false + setErr := make(chan error, 1) + go func() { + defer cancel() + for { + select { + case <-ctx.Done(): + return + case vars := <-c.Watch(): + if !observed { + vars, err = c.Observe(ctx, []string{"custom_fetch.vars.key1"}) + if err != nil { + setErr <- err + return + } + observed = true + } + if len(vars) > 0 { + node, err := vars[0].Replace("${custom_fetch.vars.key1}") + if err == nil { + // replace occurred so the fetch provider is now present + strNode, ok := node.(*transpiler.StrVal) + if !ok { + setErr <- fmt.Errorf("expected *transpiler.StrVal") + return + } + strVal, ok := strNode.Value().(string) + if !ok { + setErr <- fmt.Errorf("expected string") + return + } + if strVal != "vars.key1" { + setErr <- fmt.Errorf("expected replaced value error: %s != vars.key1", strVal) + return + } + // replacement worked + setErr <- nil + return + } + } + } + } + }() + + errCh := make(chan error) + go func() { + errCh <- c.Run(ctx) + }() + err = <-errCh + if errors.Is(err, context.Canceled) { + err = nil + } + require.NoError(t, err) + err = <-setErr + assert.NoError(t, err) +} + func TestProvidersDefaultDisabled(t *testing.T) { tests := []struct { name string @@ -425,3 +502,21 @@ func TestDefaultProvider(t *testing.T) { assert.Equal(t, "custom", c.DefaultProvider()) }) } + +type customFetchProvider struct{} + +func (c *customFetchProvider) Run(ctx context.Context, comm corecomp.ContextProviderComm) error { + 
<-ctx.Done() + return ctx.Err() +} + +func (c *customFetchProvider) Fetch(key string) (string, bool) { + tokens := strings.SplitN(key, ".", 2) + if len(tokens) != 2 || tokens[0] != "custom_fetch" { + return "", false + } + return tokens[1], true +} + +// compile-time check that customFetchProvider registers as a fetch provider +var _ corecomp.FetchContextProvider = (*customFetchProvider)(nil) diff --git a/internal/pkg/composable/dynamic.go b/internal/pkg/composable/dynamic.go index 74071dfa5dc..e54f27fdbcb 100644 --- a/internal/pkg/composable/dynamic.go +++ b/internal/pkg/composable/dynamic.go @@ -37,7 +37,7 @@ type DynamicProvider interface { type DynamicProviderBuilder func(log *logger.Logger, config *config.Config, managed bool) (DynamicProvider, error) // MustAddDynamicProvider adds a new DynamicProviderBuilder and panics if AddDynamicProvider returns an error. -func (r *providerRegistry) MustAddDynamicProvider(name string, builder DynamicProviderBuilder) { +func (r *ProviderRegistry) MustAddDynamicProvider(name string, builder DynamicProviderBuilder) { err := r.AddDynamicProvider(name, builder) if err != nil { panic(err) @@ -47,7 +47,7 @@ func (r *providerRegistry) MustAddDynamicPr // AddDynamicProvider adds a new DynamicProviderBuilder // //nolint:dupl,goimports,nolintlint // false positive -func (r *providerRegistry) AddDynamicProvider(providerName string, builder DynamicProviderBuilder) error { +func (r *ProviderRegistry) AddDynamicProvider(providerName string, builder DynamicProviderBuilder) error { r.lock.Lock() defer r.lock.Unlock() @@ -72,7 +72,7 @@ func (r *providerRegistry) AddDynamicProvid } // GetDynamicProvider returns the dynamic provider with the given name, nil if it doesn't exist -func (r *providerRegistry) GetDynamicProvider(name string) (DynamicProviderBuilder, bool) { +func (r *ProviderRegistry) GetDynamicProvider(name string) (DynamicProviderBuilder, bool) { r.lock.RLock() defer r.lock.RUnlock() diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go index 2db9d65813e..cf7a63f48ea 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets.go @@ -136,7 +136,7 @@ func (p *contextProviderK8SSecrets) Fetch(key string) (string, bool) { return "", false } if len(tokens) != 4 { - p.logger.Warn("Invalid secret key format: ", key, ". Secrets should be of the format kubernetes_secrets.namespace.secret_name.value") + p.logger.Warnf(`Invalid secret key format: %q. Secrets should be of the format kubernetes_secrets.namespace.secret_name.value`, key) return "", false } @@ -151,7 +151,9 @@ func (p *contextProviderK8SSecrets) Fetch(key string) (string, bool) { if p.config.DisableCache { // cache disabled - fetch secret from the API - return p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) + apiSecretValue, apiSecretResourceVersion, ok := p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) + p.logger.Debugf(`Fetch: %q fetched.
Resource Version of secret: %q`, key, apiSecretResourceVersion) + return apiSecretValue, ok } // cache enabled @@ -162,7 +164,7 @@ func (p *contextProviderK8SSecrets) Fetch(key string) (string, bool) { } // cache miss - fetch secret from the API - apiSecretValue, apiExists := p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) + apiSecretValue, apiSecretResourceVersion, apiExists := p.fetchFromAPI(ctx, secretName, secretNamespace, secretKey) now := time.Now() sd = secret{ name: secretName, @@ -175,11 +177,13 @@ func (p *contextProviderK8SSecrets) Fetch(key string) (string, bool) { p.store.AddConditionally(key, sd, true, func(existing secret, exists bool) bool { if !exists { // no existing secret in the cache thus add it + p.logger.Infof(`Fetch: %q inserted. Resource Version of secret: %q`, key, apiSecretResourceVersion) return true } if existing.value != apiSecretValue && !existing.apiFetchTime.After(now) { // there is an existing secret in the cache but its value has changed since the last time // it was fetched from the API thus update it + p.logger.Infof(`Fetch: %q updated. Resource Version of secret: %q`, key, apiSecretResourceVersion) return true } // there is an existing secret in the cache, and it points already to the latest value @@ -199,10 +203,13 @@ func (p *contextProviderK8SSecrets) refreshCache(ctx context.Context, comm corec case <-ctx.Done(): return case <-timer.C: + p.logger.Info("Cache: refresh started") hasUpdate := p.updateSecrets(ctx) if hasUpdate { - p.logger.Info("Secrets cache was updated, the agent will be notified.") + p.logger.Info("Cache: refresh ended with updates, agent will be notified") comm.Signal() + } else { + p.logger.Info("Cache: refresh ended without updates") } timer.Reset(p.config.RefreshInterval) } @@ -220,11 +227,12 @@ func (p *contextProviderK8SSecrets) updateSecrets(ctx context.Context) bool { sd, exists := p.store.Get(key, false) if !exists { // this item has expired thus mark that the cache has updates and continue + p.logger.Infof(`Cache: %q expired`, key) hasUpdates = true continue } - apiSecretValue, apiExists := p.fetchFromAPI(ctx, sd.name, sd.namespace, sd.key) + apiSecretValue, apiResourceVersion, apiExists := p.fetchFromAPI(ctx, sd.name, sd.namespace, sd.key) now := time.Now() sd = secret{ name: sd.name, @@ -247,6 +255,7 @@ func (p *contextProviderK8SSecrets) updateSecrets(ctx context.Context) bool { // the secret value has changed and the above fetchFromAPI is more recent thus // add it and mark that the cache has updates hasUpdates = true + p.logger.Infof(`Cache: %q updated. 
Resource Version of secret: %q`, key, apiResourceVersion) return true } // the secret value has not changed @@ -258,7 +267,7 @@ func (p *contextProviderK8SSecrets) updateSecrets(ctx context.Context) bool { } // fetchFromAPI fetches the secret value from the API -func (p *contextProviderK8SSecrets) fetchFromAPI(ctx context.Context, secretName string, secretNamespace string, secretKey string) (string, bool) { +func (p *contextProviderK8SSecrets) fetchFromAPI(ctx context.Context, secretName string, secretNamespace string, secretKey string) (string, string, bool) { ctx, cancel := context.WithTimeout(ctx, p.config.RequestTimeout) defer cancel() @@ -266,7 +275,8 @@ func (p *contextProviderK8SSecrets) fetchFromAPI(ctx context.Context, secretName if p.client == nil { // k8s client is nil most probably due to an error at p.Run p.clientMtx.RUnlock() - return "", false + p.logger.Warnf(`Could not retrieve secret %q at namespace %q because k8s client is nil`, secretName, secretNamespace) + return "", "", false } c := p.client p.clientMtx.RUnlock() @@ -274,14 +284,15 @@ func (p *contextProviderK8SSecrets) fetchFromAPI(ctx context.Context, secretName si := c.CoreV1().Secrets(secretNamespace) secret, err := si.Get(ctx, secretName, metav1.GetOptions{}) if err != nil { - p.logger.Warn("Could not retrieve secret ", secretName, " at namespace ", secretNamespace, ": ", err.Error()) - return "", false + p.logger.Warnf(`Could not retrieve secret %q at namespace %q: %s`, secretName, secretNamespace, err.Error()) + return "", "", false } - if _, ok := secret.Data[secretKey]; !ok { - p.logger.Warn("Could not retrieve value of key ", secretKey, " for secret ", secretName, " at namespace ", secretNamespace) - return "", false + secretData, ok := secret.Data[secretKey] + if !ok { + p.logger.Warnf(`Could not retrieve value of key %q for secret %q at namespace %q because it does not exist`, secretKey, secretName, secretNamespace) + return "", "", false } - return string(secret.Data[secretKey]), true + return string(secretData), secret.GetResourceVersion(), true } diff --git a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go index 262714d425f..7307fd33822 100644 --- a/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go +++ b/internal/pkg/composable/providers/kubernetessecrets/kubernetes_secrets_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -690,7 +692,7 @@ func Test_Run(t *testing.T) { providerCfg: Config{ RefreshInterval: 100 * time.Millisecond, RequestTimeout: 100 * time.Millisecond, - TTLDelete: 2 * time.Second, + TTLDelete: 10 * time.Second, DisableCache: false, }, expectedSignal: true, @@ -712,7 +714,7 @@ func Test_Run(t *testing.T) { providerCfg: Config{ RefreshInterval: 100 * time.Millisecond, RequestTimeout: 100 * time.Millisecond, - TTLDelete: 2 * time.Second, + TTLDelete: 10 * time.Second, DisableCache: false, }, expectedSignal: false, @@ -738,7 +740,7 @@ func Test_Run(t *testing.T) { DisableCache: false, }, expectedSignal: true, - waitForSignal: time.Second, + waitForSignal: 2 * time.Second, k8sClient: k8sfake.NewClientset( testDataBuilder.buildK8SSecret("secret_value"), ), @@ -754,7 +756,7 @@ func Test_Run(t *testing.T) { providerCfg: Config{ RefreshInterval: 100 * time.Millisecond, RequestTimeout: 100 * time.Millisecond, - 
TTLDelete: 2 * time.Second, + TTLDelete: 10 * time.Second, DisableCache: false, }, expectedSignal: false, @@ -775,7 +777,7 @@ func Test_Run(t *testing.T) { providerCfg: Config{ RefreshInterval: 100 * time.Millisecond, RequestTimeout: 100 * time.Millisecond, - TTLDelete: 2 * time.Second, + TTLDelete: 10 * time.Second, DisableCache: false, }, k8sClient: nil, @@ -844,13 +846,11 @@ func Test_Run(t *testing.T) { receivedSignal = true case <-time.After(tc.waitForSignal): } + list := p.store.List() cancel() - wg.Wait() require.Equal(t, tc.expectedSignal, receivedSignal) - - list := p.store.List() require.Equal(t, len(tc.postCacheState), len(list)) cacheMap := make(map[string]secret) @@ -860,12 +860,12 @@ func Test_Run(t *testing.T) { for k, v := range tc.postCacheState { inCache, exists := cacheMap[k] require.True(t, exists) - require.Equal(t, v.s.key, inCache.key) - require.Equal(t, v.s.name, inCache.name) - require.Equal(t, v.s.namespace, inCache.namespace) - require.Equal(t, v.s.key, inCache.key) - require.Equal(t, v.s.value, inCache.value) - require.Equal(t, v.s.apiExists, inCache.apiExists) + assert.Equal(t, v.s.key, inCache.key) + assert.Equal(t, v.s.name, inCache.name) + assert.Equal(t, v.s.namespace, inCache.namespace) + assert.Equal(t, v.s.key, inCache.key) + assert.Equal(t, v.s.value, inCache.value) + assert.Equal(t, v.s.apiExists, inCache.apiExists) } }) } @@ -935,6 +935,69 @@ func Test_Config(t *testing.T) { } } +func Test_FetchFromAPI(t *testing.T) { + for _, tc := range []struct { + name string + k8sClient k8sclient.Interface + secretName string + secretNamespace string + secretKey string + expectedValue string + expectedResourceVersion string + expectedOK bool + }{ + { + name: "k8s client is nil", + k8sClient: nil, + }, + { + name: "secret not found", + k8sClient: k8sfake.NewClientset(), + secretName: "secret_name", + secretNamespace: "secret_namespace", + secretKey: "secret_key", + }, + { + name: "key in secret not found", + k8sClient: k8sfake.NewClientset( + buildK8SSecret("secret_namespace", "secret_name", "secret_key", "secret_value"), + ), + secretName: "secret_name", + secretNamespace: "secret_namespace", + secretKey: "secret_key_not_found", + }, + { + name: "key in secret found", + k8sClient: k8sfake.NewClientset( + buildK8SSecretWithResourceVersion("secret_namespace", "secret_name", "secret_key", "secret_value", "100000"), + ), + secretName: "secret_name", + secretNamespace: "secret_namespace", + secretKey: "secret_key", + expectedValue: "secret_value", + expectedResourceVersion: "100000", + expectedOK: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + provider := &contextProviderK8SSecrets{ + logger: logp.NewLogger("test_k8s_secrets"), + config: defaultConfig(), + client: tc.k8sClient, + clientMtx: sync.RWMutex{}, + } + + val, resourceVersion, ok := provider.fetchFromAPI(ctx, tc.secretName, tc.secretNamespace, tc.secretKey) + assert.Equal(t, tc.expectedValue, val) + assert.Equal(t, tc.expectedOK, ok) + assert.Equal(t, tc.expectedResourceVersion, resourceVersion) + }) + } +} + type secretTestDataBuilder struct { namespace string name string @@ -976,14 +1039,19 @@ func buildCacheEntryKey(e *cacheEntry) string { } func buildK8SSecret(namespace string, name string, key string, value string) *v1.Secret { + return buildK8SSecretWithResourceVersion(namespace, name, key, value, "1") +} + +func buildK8SSecretWithResourceVersion(namespace string, name string, key string, value string, resourceVersion 
string) *v1.Secret { return &v1.Secret{ TypeMeta: metav1.TypeMeta{ Kind: "Secret", APIVersion: "apps/v1beta1", }, ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: namespace, + Name: name, + Namespace: namespace, + ResourceVersion: resourceVersion, }, Data: map[string][]byte{ key: []byte(value), diff --git a/internal/pkg/composable/registry.go b/internal/pkg/composable/registry.go index 4d01b9884b5..7854ac8ded4 100644 --- a/internal/pkg/composable/registry.go +++ b/internal/pkg/composable/registry.go @@ -10,8 +10,8 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -// providerRegistry is a registry of providers -type providerRegistry struct { +// ProviderRegistry is a registry of providers +type ProviderRegistry struct { contextProviders map[string]ContextProviderBuilder dynamicProviders map[string]DynamicProviderBuilder @@ -19,9 +19,14 @@ type providerRegistry struct { lock sync.RWMutex } -// Providers holds all known providers, they must be added to it to enable them for use -var Providers = &providerRegistry{ - contextProviders: make(map[string]ContextProviderBuilder), - dynamicProviders: make(map[string]DynamicProviderBuilder), - logger: logp.NewLogger("dynamic"), +// NewProviderRegistry creates a new provider registry. +func NewProviderRegistry() *ProviderRegistry { + return &ProviderRegistry{ + contextProviders: make(map[string]ContextProviderBuilder), + dynamicProviders: make(map[string]DynamicProviderBuilder), + logger: logp.NewLogger("composable"), + } } + +// Providers holds all known providers, they must be added to it to enable them for use +var Providers = NewProviderRegistry() diff --git a/internal/pkg/diagnostics/diagnostics.go b/internal/pkg/diagnostics/diagnostics.go index f4b0d70ed90..0c568302a9f 100644 --- a/internal/pkg/diagnostics/diagnostics.go +++ b/internal/pkg/diagnostics/diagnostics.go @@ -327,19 +327,23 @@ func writeRedacted(errOut, resultWriter io.Writer, fullFilePath string, fileResu // Should we support json too? 
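// YAML payloads that decode to a map get Redact applied below; scalar YAML documents (e.g. a bare string) are written through unchanged.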
if fileResult.ContentType == "application/yaml" { - unmarshalled := map[string]interface{}{} + var unmarshalled any err := yaml.Unmarshal(fileResult.Content, &unmarshalled) if err != nil { // Best effort, output a warning but still include the file fmt.Fprintf(errOut, "[WARNING] Could not redact %s due to unmarshalling error: %s\n", fullFilePath, err) } else { - unmarshalled = Redact(unmarshalled, errOut) - redacted, err := yaml.Marshal(unmarshalled) - if err != nil { - // Best effort, output a warning but still include the file - fmt.Fprintf(errOut, "[WARNING] Could not redact %s due to marshalling error: %s\n", fullFilePath, err) - } else { - out = &redacted + switch t := unmarshalled.(type) { // could be a plain string, we only redact if this is a proper map + case map[string]any: + t = Redact(t, errOut) + redacted, err := yaml.Marshal(t) + if err != nil { + // Best effort, output a warning but still include the file + fmt.Fprintf(errOut, "[WARNING] Could not redact %s due to marshalling error: %s\n", fullFilePath, err) + } else { + out = &redacted + } + } } } } diff --git a/internal/pkg/diagnostics/diagnostics_test.go b/internal/pkg/diagnostics/diagnostics_test.go index 87252cbf3e5..c00fa728090 100644 --- a/internal/pkg/diagnostics/diagnostics_test.go +++ b/internal/pkg/diagnostics/diagnostics_test.go @@ -117,6 +117,19 @@ i4EFZLWrFRsAAAARYWxleGtAZ3JlbWluLm5lc3QBAg== require.NotContains(t, outWriter.String(), privKey) } +func TestRedactPlainString(t *testing.T) { + errOut := strings.Builder{} + outWriter := strings.Builder{} + inputString := "Just a string" + res := client.DiagnosticFileResult{Content: []byte(inputString), ContentType: "application/yaml"} + + err := writeRedacted(&errOut, &outWriter, "test/path", res) + require.NoError(t, err) + + require.Empty(t, errOut.String()) + require.Equal(t, inputString, outWriter.String()) +} + func TestRedactComplexKeys(t *testing.T) { // taken directly from the yaml spec: https://yaml.org/spec/1.1/#c-mapping-key // This test mostly serves to document that part of the YAML library doesn't work properly diff --git a/internal/pkg/eql/eql_test.go b/internal/pkg/eql/eql_test.go index c8425224415..ad37aa9c595 100644 --- a/internal/pkg/eql/eql_test.go +++ b/internal/pkg/eql/eql_test.go @@ -380,7 +380,6 @@ func TestEql(t *testing.T) { } for _, test := range testcases { - test := test var title string if test.err { title = fmt.Sprintf("%s failed parsing", test.expression) diff --git a/internal/pkg/otel/configtranslate/otelconfig.go b/internal/pkg/otel/configtranslate/otelconfig.go index 6c83e1e6836..ed9398029bd 100644 --- a/internal/pkg/otel/configtranslate/otelconfig.go +++ b/internal/pkg/otel/configtranslate/otelconfig.go @@ -77,7 +77,6 @@ func getSupportedComponents(model *component.Model) []*component.Component { var supportedComponents []*component.Component for _, comp := range model.Components { - comp := comp if IsComponentOtelSupported(&comp) { supportedComponents = append(supportedComponents, &comp) } @@ -111,12 +110,13 @@ func getExporterID(exporterType otelcomponent.Type, outputName string) otelcompo // getCollectorConfigForComponent returns the Otel collector config required to run the given component. // This function returns a full, valid configuration that can then be merged with configurations for other components.
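// Abridged, the merged result has the shape (component ids and keys illustrative):
//
//	receivers: {filebeatreceiver/...: {filebeat: {inputs: [...]}, queue: {...}}}
//	exporters: {elasticsearch/...: {...}}
//	service:   {pipelines: {...}}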
func getCollectorConfigForComponent(comp *component.Component, info info.Agent) (*confmap.Conf, error) { - outputQueueConfig := getOutputQueueConfig(comp) - receiversConfig, err := getReceiversConfigForComponent(comp, info, outputQueueConfig) + + exportersConfig, outputQueueConfig, err := getExportersConfigForComponent(comp) if err != nil { return nil, err } - exportersConfig, err := getExportersConfigForComponent(comp) + receiversConfig, err := getReceiversConfigForComponent(comp, info, outputQueueConfig) + if err != nil { return nil, err } @@ -187,33 +187,35 @@ func getReceiversConfigForComponent(comp *component.Component, info info.Agent, } // add the output queue config if present if outputQueueConfig != nil { - receiverConfig["output"] = outputQueueConfig + receiverConfig["queue"] = outputQueueConfig } return map[string]any{ receiverId.String(): receiverConfig, }, nil } -// getReceiversConfigForComponent returns the exporters configuration for a component. Usually this will be a single +// getExportersConfigForComponent returns the exporters configuration and queue settings for a component. Usually this will be a single // exporter, but in principle it could be more. -func getExportersConfigForComponent(comp *component.Component) (map[string]any, error) { +func getExportersConfigForComponent(comp *component.Component) (exporterCfg map[string]any, queueCfg map[string]any, err error) { exportersConfig := map[string]any{} exporterType, err := getExporterTypeForComponent(comp) if err != nil { - return nil, err + return nil, nil, err } + var queueSettings map[string]any for _, unit := range comp.Units { if unit.Type == client.UnitTypeOutput { - unitExportersConfig, expErr := unitToExporterConfig(unit, exporterType, comp.InputType) - if expErr != nil { - return nil, expErr + var unitExportersConfig map[string]any + unitExportersConfig, queueSettings, err = unitToExporterConfig(unit, exporterType, comp.InputType) + if err != nil { + return nil, nil, err } for k, v := range unitExportersConfig { exportersConfig[k] = v } } } - return exportersConfig, nil + return exportersConfig, queueSettings, nil } // getBeatNameForComponent returns the beat binary name that would be used to run this component. @@ -260,14 +262,14 @@ func getExporterTypeForComponent(comp *component.Component) (otelcomponent.Type, } } -// unitToExporterConfig translates a component.Unit to an otel exporter configuration.
-func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (map[string]any, error) { +// unitToExporterConfig translates a component.Unit to return an otel exporter configuration and output queue settings +func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, inputType string) (exportersCfg map[string]any, queueSettings map[string]any, err error) { if unit.Type == client.UnitTypeInput { - return nil, fmt.Errorf("unit type is an input, expected output: %v", unit) + return nil, nil, fmt.Errorf("unit type is an input, expected output: %v", unit) } configTranslationFunc, ok := configTranslationFuncForExporter[exporterType] if !ok { - return nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) + return nil, nil, fmt.Errorf("no config translation function for exporter type: %s", exporterType) } // we'd like to use the same exporter for all outputs with the same name, so we parse out the name for the unit id // these will be deduplicated by the configuration merging process at the end @@ -278,18 +280,30 @@ func unitToExporterConfig(unit component.Unit, exporterType otelcomponent.Type, unitConfigMap := unit.Config.GetSource().AsMap() // this is what beats do in libbeat/management/generate.go outputCfgC, err := config.NewConfigFrom(unitConfigMap) if err != nil { - return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } + // Config translation function can mutate queue settings defined under output config exporterConfig, err := configTranslationFunc(outputCfgC) if err != nil { - return nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + return nil, nil, fmt.Errorf("error translating config for output: %s, unit: %s, error: %w", outputName, unit.ID, err) } - exportersCfg := map[string]any{ + exportersCfg = map[string]any{ exporterId.String(): exporterConfig, } - return exportersCfg, nil + // If output config contains queue settings defined by user/preset field, it should be promoted to the receiver section + if ok := outputCfgC.HasField("queue"); ok { + err := outputCfgC.Unpack(&queueSettings) + if err != nil { + return nil, nil, fmt.Errorf("error unpacking queue settings for output: %s, unit: %s, error: %w", outputName, unit.ID, err) + } + if queue, ok := queueSettings["queue"].(map[string]any); ok { + queueSettings = queue + } + } + + return exportersCfg, queueSettings, nil } // getInputsForUnit returns the beat inputs for a unit. These can directly be plugged into a beats receiver config. @@ -340,7 +354,6 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { } // we want to use dynamic indexing esConfig["logs_dynamic_index"] = map[string]any{"enabled": true} - esConfig["metrics_dynamic_index"] = map[string]any{"enabled": true} // we also want to use dynamic log ids esConfig["logs_dynamic_id"] = map[string]any{"enabled": true} @@ -349,30 +362,3 @@ func translateEsOutputToExporter(cfg *config.C) (map[string]any, error) { esConfig["mapping"] = map[string]any{"mode": "bodymap"} return esConfig, nil } - -// This is copied from https://github.com/elastic/beats/blob/main/libbeat/otelbeat/beatconverter/beatconverter.go -// getOutputQueueConfig gets the queue settings for the output unit in the component. 
We need to move these settings -// to the receiver configuration. -func getOutputQueueConfig(comp *component.Component) map[string]any { - // find the output unit config - var unitConfigMap map[string]any - for _, unit := range comp.Units { - if unit.Type == client.UnitTypeOutput { - unitConfigMap = unit.Config.GetSource().AsMap() - } - } - if unitConfigMap == nil { - return nil - } - - queueConfig, ok := unitConfigMap["queue"] - if !ok { - return nil - } - queueConfigMap, ok := queueConfig.(map[string]any) - if !ok { - return nil - } - - return queueConfigMap -} diff --git a/internal/pkg/otel/configtranslate/otelconfig_test.go b/internal/pkg/otel/configtranslate/otelconfig_test.go index 8883ae58fcb..533450ca188 100644 --- a/internal/pkg/otel/configtranslate/otelconfig_test.go +++ b/internal/pkg/otel/configtranslate/otelconfig_test.go @@ -164,10 +164,12 @@ func TestGetOtelConfig(t *testing.T) { }, } esOutputConfig := map[string]any{ - "type": "elasticsearch", - "hosts": []any{"localhost:9200"}, - "username": "elastic", - "password": "password", + "type": "elasticsearch", + "hosts": []any{"localhost:9200"}, + "username": "elastic", + "password": "password", + "preset": "balanced", + "queue.mem.events": 3200, } defaultProcessors := func(streamId, dataset string) []any { return []any{ @@ -300,14 +302,11 @@ func TestGetOtelConfig(t *testing.T) { "logs_dynamic_id": map[string]any{ "enabled": true, }, - "num_workers": 0, + "num_workers": 1, "api_key": "", "logs_index": "filebeat-9.0.0", "timeout": 90 * time.Second, "idle_conn_timeout": 3 * time.Second, - "metrics_dynamic_index": map[string]any{ - "enabled": true, - }, }, }, "receivers": map[string]any{ @@ -346,6 +345,15 @@ func TestGetOtelConfig(t *testing.T) { "path": map[string]any{ "data": filepath.Join(paths.Run(), "filestream-default"), }, + "queue": map[string]any{ + "mem": map[string]any{ + "events": uint64(3200), + "flush": map[string]any{ + "min_events": uint64(1600), + "timeout": "10s", + }, + }, + }, }, }, "service": map[string]any{ diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml b/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml index f3402f29ff3..716b3615dc9 100644 --- a/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml +++ b/internal/pkg/otel/samples/darwin/managed_otlp/logs_metrics_traces.yml @@ -81,7 +81,7 @@ exporters: otlp/ingest: endpoint: ${env:ELASTIC_OTLP_ENDPOINT} headers: - Authorization: ${env:ELASTIC_API_KEY} + Authorization: ApiKey ${env:ELASTIC_API_KEY} service: extensions: [file_storage] diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml index b4be582a6a9..f5782e2063a 100644 --- a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml +++ b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs.yml @@ -56,7 +56,7 @@ exporters: otlp/ingest: endpoint: ${env:ELASTIC_OTLP_ENDPOINT} headers: - Authorization: ${env:ELASTIC_API_KEY} + Authorization: ApiKey ${env:ELASTIC_API_KEY} service: extensions: [file_storage] diff --git a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml index 3911102e5b2..72bd20d5b72 100644 --- a/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml +++ b/internal/pkg/otel/samples/darwin/managed_otlp/platformlogs_hostmetrics.yml @@ -76,7 +76,7 @@ exporters: otlp/ingest: endpoint: 
diff --git a/internal/pkg/release/version.go b/internal/pkg/release/version.go
index ba7b01ac657..6b6d44e3686 100644
--- a/internal/pkg/release/version.go
+++ b/internal/pkg/release/version.go
@@ -20,6 +20,9 @@ const (
 // snapshot is a flag marking build as a snapshot.
 var snapshot = ""
 
+// fips is a flag marking a FIPS-compliant build.
+var fips = "false"
+
 // complete is an environment variable marking the image as complete.
 var complete = "ELASTIC_AGENT_COMPLETE"
 
@@ -77,12 +80,19 @@ func Complete() bool {
 	return ok && isComplete == "true"
 }
 
+// FIPS reports whether this is a FIPS-compliant build.
+func FIPS() bool {
+	f, err := strconv.ParseBool(fips)
+	return err == nil && f
+}
+
 // VersionInfo is structure used by `version --yaml`.
 type VersionInfo struct {
 	Version   string    `yaml:"version"`
 	Commit    string    `yaml:"commit"`
 	BuildTime time.Time `yaml:"build_time"`
 	Snapshot  bool      `yaml:"snapshot"`
+	FIPS      bool      `yaml:"fips"`
 }
 
 // Info returns current version information.
@@ -92,6 +102,7 @@ func Info() VersionInfo {
 		Commit:    Commit(),
 		BuildTime: BuildTime(),
 		Snapshot:  Snapshot(),
+		FIPS:      FIPS(),
 	}
 }
 
@@ -105,8 +116,12 @@ func (v VersionInfo) String() string {
 	}
 	sb.WriteString(" (build: ")
 	sb.WriteString(v.Commit)
+	if v.FIPS {
+		sb.WriteString(" fips: true")
+	}
 	sb.WriteString(" at ")
 	sb.WriteString(v.BuildTime.Format("2006-01-02 15:04:05 -0700 MST"))
 	sb.WriteString(")")
+
 	return sb.String()
 }
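The fips variable stays a string so it can be stamped at link time; the magefile change below wires that up through buildVars. A minimal, self-contained sketch of the mechanism with a hypothetical main package (the real -X path targets the release package):

```go
// main.go — build with:
//   go build -ldflags "-X main.fips=true" .
package main

import (
	"fmt"
	"strconv"
)

// fips defaults to "false" and can be overridden at link time via -ldflags -X.
var fips = "false"

// FIPS mirrors the release package: only an explicit, parseable "true" counts.
func FIPS() bool {
	f, err := strconv.ParseBool(fips)
	return err == nil && f
}

func main() {
	fmt.Println("fips build:", FIPS())
}
```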
diff --git a/internal/pkg/release/version_test.go b/internal/pkg/release/version_test.go
index 34ca8f45e7b..915550954bc 100644
--- a/internal/pkg/release/version_test.go
+++ b/internal/pkg/release/version_test.go
@@ -72,3 +72,25 @@ func TestVersion(t *testing.T) {
 		assert.Equal(t, expectedVersion, actualVersion)
 	})
 }
+
+func Test_VersionInfo_WithFIPS(t *testing.T) {
+	info := Info()
+	info.FIPS = false
+	assert.NotContains(t, info.String(), "fips:", "found fips indicator")
+	info.FIPS = true
+	assert.Contains(t, info.String(), "fips: true", "did not find fips indicator")
+}
+
+func TestFIPS(t *testing.T) {
+	oldFips := fips
+	t.Cleanup(func() {
+		fips = oldFips
+	})
+
+	fips = ""
+	assert.False(t, FIPS(), "expected FIPS indicator to be false")
+	fips = "false"
+	assert.False(t, FIPS(), "expected FIPS indicator to be false")
+	fips = "true"
+	assert.True(t, FIPS(), "expected FIPS indicator to be true")
+}
diff --git a/magefile.go b/magefile.go
index c4980ab37a1..41797887afd 100644
--- a/magefile.go
+++ b/magefile.go
@@ -83,6 +83,7 @@ const (
 	metaDir           = "_meta"
 	snapshotEnv       = "SNAPSHOT"
 	devEnv            = "DEV"
+	fipsEnv           = "FIPS"
 	externalArtifacts = "EXTERNAL"
 	platformsEnv      = "PLATFORMS"
 	packagesEnv       = "PACKAGES"
@@ -789,6 +790,9 @@ func (Cloud) Image(ctx context.Context) {
 	variant := os.Getenv(dockerVariants)
 	defer os.Setenv(dockerVariants, variant)
 
+	fips := os.Getenv(fipsEnv)
+	defer os.Setenv(fipsEnv, fips)
+
 	os.Setenv(platformsEnv, "linux/amd64")
 	os.Setenv(packagesEnv, "docker")
 	os.Setenv(devEnv, "true")
@@ -803,6 +807,13 @@ func (Cloud) Image(ctx context.Context) {
 		devtools.Snapshot = true
 	}
 
+	fipsVal, err := strconv.ParseBool(fips)
+	if err != nil {
+		fipsVal = false
+	}
+	os.Setenv(fipsEnv, strconv.FormatBool(fipsVal))
+	devtools.FIPSBuild = fipsVal
+
 	devtools.DevBuild = true
 	devtools.Platforms = devtools.Platforms.Filter("linux/amd64")
 	devtools.SelectedPackageTypes = []devtools.PackageType{devtools.Docker}
@@ -1759,6 +1770,12 @@ func buildVars() map[string]string {
 	isSnapshot, _ := os.LookupEnv(snapshotEnv)
 	vars["github.com/elastic/elastic-agent/internal/pkg/release.snapshot"] = isSnapshot
 
+	if fipsFlag, fipsFound := os.LookupEnv(fipsEnv); fipsFound {
+		if fips, err := strconv.ParseBool(fipsFlag); err == nil && fips {
+			vars["github.com/elastic/elastic-agent/internal/pkg/release.fips"] = "true"
+		}
+	}
+
 	if isDevFlag, devFound := os.LookupEnv(devEnv); devFound {
 		if isDev, err := strconv.ParseBool(isDevFlag); err == nil && isDev {
 			vars["github.com/elastic/elastic-agent/internal/pkg/release.allowEmptyPgp"] = "true"
diff --git a/pkg/component/runtime/conn_info_server_test.go b/pkg/component/runtime/conn_info_server_test.go
index 9978f301e8a..ab70c7598c6 100644
--- a/pkg/component/runtime/conn_info_server_test.go
+++ b/pkg/component/runtime/conn_info_server_test.go
@@ -24,6 +24,7 @@ import (
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
 	"github.com/elastic/elastic-agent/internal/pkg/testutils"
 	"github.com/elastic/elastic-agent/pkg/ipc"
+	"github.com/elastic/elastic-agent/pkg/utils"
 )
 
 type mockCommunicator struct {
@@ -81,11 +82,11 @@ func getAddress(dir string, isLocal bool) string {
 		if runtime.GOOS == "windows" {
 			u.Scheme = "npipe"
-			return u.JoinPath("/", testSock).String()
+			return utils.SocketURLWithFallback(testSock, "/")
 		}
 		u.Scheme = "unix"
-		return u.JoinPath(dir, testSock).String()
+		return utils.SocketURLWithFallback(testSock, dir)
 	}
 	return fmt.Sprintf("127.0.0.1:%d", testPort)
 }
diff --git a/pkg/component/runtime/manager.go b/pkg/component/runtime/manager.go
index 5ee8b5b4681..623d0e58df1 100644
--- a/pkg/component/runtime/manager.go
+++ b/pkg/component/runtime/manager.go
@@ -14,6 +14,7 @@ import (
 	"net"
 	"net/url"
 	"path"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -988,9 +989,9 @@ func (m *Manager) getListenAddr() string {
 	if m.isLocal {
 		return m.listenAddr
 	}
-	addr := strings.SplitN(m.listenAddr, ":", 2)
-	if len(addr) == 2 && addr[1] == "0" {
-		return fmt.Sprintf("%s:%d", addr[0], m.listenPort)
+	host, port, err := net.SplitHostPort(m.listenAddr)
+	if err == nil && port == "0" {
+		return net.JoinHostPort(host, strconv.Itoa(m.listenPort))
 	}
 	return m.listenAddr
 }
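The getListenAddr rewrite above is more than cosmetic: a naive SplitN on ":" mishandles IPv6 listen addresses, whose host part itself contains colons, while net.SplitHostPort and net.JoinHostPort understand the bracketed form. A quick illustration:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

func main() {
	addr := "[::1]:0"

	// Naive split breaks on the first colon: wrong for IPv6 hosts.
	parts := strings.SplitN(addr, ":", 2)
	fmt.Println(parts[0], "|", parts[1]) // "[" | ":1]:0"

	// net.SplitHostPort handles bracketed IPv6 literals correctly.
	host, port, err := net.SplitHostPort(addr)
	fmt.Println(host, port, err) // ::1 0 <nil>

	// JoinHostPort re-brackets as needed when substituting the real port.
	fmt.Println(net.JoinHostPort(host, "6789")) // [::1]:6789
}
```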
diff --git a/pkg/component/runtime/runtime_comm_test.go b/pkg/component/runtime/runtime_comm_test.go
index 4aa1e0eada4..b8c72910b6c 100644
--- a/pkg/component/runtime/runtime_comm_test.go
+++ b/pkg/component/runtime/runtime_comm_test.go
@@ -15,7 +15,9 @@ import (
 	"github.com/elastic/elastic-agent-client/v7/pkg/client"
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
 
+	"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
 	"github.com/elastic/elastic-agent/internal/pkg/core/authority"
+	"github.com/elastic/elastic-agent/pkg/core/logger"
 )
 
 type agentInfoMock struct {
@@ -50,6 +52,7 @@ func (a agentInfoMock) LogLevel() string { pa
 func (a agentInfoMock) RawLogLevel() string { panic("implement me") }
 func (a agentInfoMock) ReloadID(ctx context.Context) error { panic("implement me") }
 func (a agentInfoMock) SetLogLevel(ctx context.Context, level string) error { panic("implement me") }
+func (a agentInfoMock) ECSMetadata(l *logger.Logger) (*info.ECSMeta, error) { panic("implement me") }
 
 func TestCheckinExpected(t *testing.T) {
 	ca, err := authority.NewCA()
diff --git a/pkg/testing/ess/create_deployment_csp_configuration.yaml b/pkg/testing/ess/create_deployment_csp_configuration.yaml
index c0a12f8d67a..f7fb525a46d 100644
--- a/pkg/testing/ess/create_deployment_csp_configuration.yaml
+++ b/pkg/testing/ess/create_deployment_csp_configuration.yaml
@@ -13,3 +13,7 @@ aws:
   elasticsearch_conf_id: "aws.es.datahot.i3"
   elasticsearch_deployment_template_id: "aws-storage-optimized"
   kibana_instance_configuration_id: "aws.kibana.c5d"
+docker:
+  integration_server_image: "docker.elastic.co/cloud-release/elastic-agent-cloud:9.1.0-6afab230-SNAPSHOT"
+  elasticsearch_image: "docker.elastic.co/cloud-release/elasticsearch-cloud-ess:9.1.0-6afab230-SNAPSHOT"
+  kibana_image: "docker.elastic.co/cloud-release/kibana-cloud:9.1.0-6afab230-SNAPSHOT"
\ No newline at end of file
"docker_image": "{{ .docker.elasticsearch_image }}", + {{ end }} "enabled_built_in_plugins": [] }, "deployment_template": { @@ -82,6 +88,9 @@ ], "kibana": { "version": "{{ .request.Version }}", + {{ if .docker.kibana_image }} + "docker_image": "{{ .docker.kibana_image }}", + {{ end }} "user_settings_json": { "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"], "xpack.fleet.internal.registry.kibanaVersionCheckEnabled": false, diff --git a/pkg/testing/ess/deployment.go b/pkg/testing/ess/deployment.go index 6e84cd66709..3dc2474bdf3 100644 --- a/pkg/testing/ess/deployment.go +++ b/pkg/testing/ess/deployment.go @@ -384,5 +384,10 @@ func loadCspValues(csp string) (map[string]any, error) { return nil, fmt.Errorf("csp %s not supported", csp) } + // check for docker overrides + if dockerValues, dockerValuesFound := cspValues["docker"]; dockerValuesFound { + values["docker"] = dockerValues + } + return values, nil } diff --git a/pkg/testing/ess/deployment_test.go b/pkg/testing/ess/deployment_test.go index d5043199418..1cfcd795b8d 100644 --- a/pkg/testing/ess/deployment_test.go +++ b/pkg/testing/ess/deployment_test.go @@ -7,6 +7,7 @@ package ess import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -44,3 +45,168 @@ func TestOverallStatus(t *testing.T) { }) } } + +func Test_generateCreateDeploymentRequestBody(t *testing.T) { + type args struct { + req CreateDeploymentRequest + cspValues []byte + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "Deployment request with docker overrides", + args: args{ + req: CreateDeploymentRequest{ + Name: "testd", + Region: "test-someregion", + Version: "1.2.3", + Tags: nil, + }, + cspValues: []byte(` + test: + integrations_server_conf_id: "gcp.integrationsserver.n2.68x32x45" + elasticsearch_conf_id: "gcp.es.datahot.n2.68x10x45" + elasticsearch_deployment_template_id: "gcp-storage-optimized" + kibana_instance_configuration_id: "gcp.kibana.n2.68x32x45" + docker: + integration_server_image: "docker.elastic.co/cloud-release/elastic-agent-cloud:1.2.3-foo-SNAPSHOT" + elasticsearch_image: "docker.elastic.co/cloud-release/elasticsearch-cloud-ess:1.2.3-foo-SNAPSHOT" + kibana_image: "docker.elastic.co/cloud-release/kibana-cloud:1.2.3-foo-SNAPSHOT" +`), + }, + want: ` + { + "resources": { + "integrations_server": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "test-someregion", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "gcp.integrationsserver.n2.68x32x45", + "zone_count": 1, + "size": { + "resource": "memory", + "value": 1024 + } + } + ], + "integrations_server": { + "version": "1.2.3", + "docker_image": "docker.elastic.co/cloud-release/elastic-agent-cloud:1.2.3-foo-SNAPSHOT" + + } + }, + "ref_id": "main-integrations_server" + } + ], + "elasticsearch": [ + { + "region": "test-someregion", + "settings": { + "dedicated_masters_threshold": 6 + }, + "plan": { + "cluster_topology": [ + { + "zone_count": 1, + "elasticsearch": { + "node_attributes": { + "data": "hot" + } + }, + "instance_configuration_id": "gcp.es.datahot.n2.68x10x45", + "node_roles": [ + "master", + "ingest", + "transform", + "data_hot", + "remote_cluster_client", + "data_content" + ], + "id": "hot_content", + "size": { + "resource": "memory", + "value": 8192 + } + } + ], + "elasticsearch": { + "version": "1.2.3", + "docker_image": "docker.elastic.co/cloud-release/elasticsearch-cloud-ess:1.2.3-foo-SNAPSHOT", + "enabled_built_in_plugins": [] + }, + 
"deployment_template": { + "id": "gcp-storage-optimized" + } + }, + "ref_id": "main-elasticsearch" + } + ], + "enterprise_search": [], + "kibana": [ + { + "elasticsearch_cluster_ref_id": "main-elasticsearch", + "region": "test-someregion", + "plan": { + "cluster_topology": [ + { + "instance_configuration_id": "gcp.kibana.n2.68x32x45", + "zone_count": 1, + "size": { + "resource": "memory", + "value": 1024 + } + } + ], + "kibana": { + "version": "1.2.3", + "docker_image": "docker.elastic.co/cloud-release/kibana-cloud:1.2.3-foo-SNAPSHOT", + "user_settings_json": { + "xpack.fleet.enableExperimental": ["agentTamperProtectionEnabled"], + "xpack.fleet.internal.registry.kibanaVersionCheckEnabled": false, + "server.restrictInternalApis": false + } + } + }, + "ref_id": "main-kibana" + } + ] + }, + "settings": { + "autoscaling_enabled": false + }, + "name": "testd", + "metadata": { + "system_owned": false, + "tags": null + } + }`, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + backupCsp := cloudProviderSpecificValues + t.Cleanup(func() { + cloudProviderSpecificValues = backupCsp + }) + if tt.args.cspValues != nil { + cloudProviderSpecificValues = tt.args.cspValues + } + got, err := generateCreateDeploymentRequestBody(tt.args.req) + if (err != nil) != tt.wantErr { + t.Errorf("generateCreateDeploymentRequestBody() error = %v, wantErr %v", err, tt.wantErr) + return + } + actualJSON := string(got) + t.Logf("JSON rendered: %s", actualJSON) + assert.JSONEq(t, tt.want, actualJSON) + }) + } +} diff --git a/pkg/testing/tools/fleettools/fleet.go b/pkg/testing/tools/fleettools/fleet.go index 1c774bdb4b5..20acc6c9f8b 100644 --- a/pkg/testing/tools/fleettools/fleet.go +++ b/pkg/testing/tools/fleettools/fleet.go @@ -6,10 +6,13 @@ package fleettools import ( "context" + "encoding/json" "errors" "fmt" + "io" + "net/http" + "net/url" "os" - "strings" "github.com/gofrs/uuid/v5" @@ -22,36 +25,70 @@ type EnrollParams struct { PolicyID string `json:"policy_id"` } +func extractError(result []byte) error { + var kibanaResult struct { + Message string + Attributes struct { + Objects []struct { + ID string + Error struct { + Message string + } + } + } + } + if err := json.Unmarshal(result, &kibanaResult); err != nil { + return fmt.Errorf("error extracting JSON for error response: %w", err) + } + var errs []error + if kibanaResult.Message != "" { + for _, err := range kibanaResult.Attributes.Objects { + errs = append(errs, fmt.Errorf("id: %s, message: %s", err.ID, err.Error.Message)) + } + if len(errs) == 0 { + return fmt.Errorf("%s", kibanaResult.Message) + } + return fmt.Errorf("%s: %w", kibanaResult.Message, errors.Join(errs...)) + + } + return nil +} + // GetAgentByPolicyIDAndHostnameFromList get an agent by the local_metadata.host.name property, reading from the agents list func GetAgentByPolicyIDAndHostnameFromList(ctx context.Context, client *kibana.Client, policyID, hostname string) (*kibana.AgentExisting, error) { - listAgentsResp, err := client.ListAgents(ctx, kibana.ListAgentsRequest{}) + params := url.Values{} + params.Add("kuery", fmt.Sprintf(`local_metadata.host.name:"%s" and policy_id:"%s" and active:true`, hostname, policyID)) + + resp, err := client.Connection.SendWithContext(ctx, http.MethodGet, "/api/fleet/agents", params, nil, nil) if err != nil { - return nil, err + return nil, fmt.Errorf("error calling list agents API: %w", err) } + defer resp.Body.Close() - onPolicy := make([]string, 0, len(listAgentsResp.Items)) - matching := 
diff --git a/pkg/testing/tools/fleettools/fleet.go b/pkg/testing/tools/fleettools/fleet.go
index 1c774bdb4b5..20acc6c9f8b 100644
--- a/pkg/testing/tools/fleettools/fleet.go
+++ b/pkg/testing/tools/fleettools/fleet.go
@@ -6,10 +6,13 @@ package fleettools
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
+	"net/http"
+	"net/url"
 	"os"
-	"strings"
 
 	"github.com/gofrs/uuid/v5"
 
@@ -22,36 +25,70 @@ type EnrollParams struct {
 	PolicyID string `json:"policy_id"`
 }
 
+func extractError(result []byte) error {
+	var kibanaResult struct {
+		Message    string
+		Attributes struct {
+			Objects []struct {
+				ID    string
+				Error struct {
+					Message string
+				}
+			}
+		}
+	}
+	if err := json.Unmarshal(result, &kibanaResult); err != nil {
+		return fmt.Errorf("error extracting JSON for error response: %w", err)
+	}
+	var errs []error
+	if kibanaResult.Message != "" {
+		for _, err := range kibanaResult.Attributes.Objects {
+			errs = append(errs, fmt.Errorf("id: %s, message: %s", err.ID, err.Error.Message))
+		}
+		if len(errs) == 0 {
+			return fmt.Errorf("%s", kibanaResult.Message)
+		}
+		return fmt.Errorf("%s: %w", kibanaResult.Message, errors.Join(errs...))
+
+	}
+	return nil
+}
+
 // GetAgentByPolicyIDAndHostnameFromList get an agent by the local_metadata.host.name property, reading from the agents list
 func GetAgentByPolicyIDAndHostnameFromList(ctx context.Context, client *kibana.Client, policyID, hostname string) (*kibana.AgentExisting, error) {
-	listAgentsResp, err := client.ListAgents(ctx, kibana.ListAgentsRequest{})
+	params := url.Values{}
+	params.Add("kuery", fmt.Sprintf(`local_metadata.host.name:"%s" and policy_id:"%s" and active:true`, hostname, policyID))
+
+	resp, err := client.Connection.SendWithContext(ctx, http.MethodGet, "/api/fleet/agents", params, nil, nil)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("error calling list agents API: %w", err)
 	}
+	defer resp.Body.Close()
 
-	onPolicy := make([]string, 0, len(listAgentsResp.Items))
-	matching := make([]*kibana.AgentExisting, 0, 1)
-	for i, item := range listAgentsResp.Items {
-		agentHostname := item.LocalMetadata.Host.Hostname
-		agentPolicyID := item.PolicyID
-		if agentPolicyID == policyID {
-			onPolicy = append(onPolicy, agentHostname)
-			if strings.EqualFold(agentHostname, hostname) {
-				matching = append(matching, &listAgentsResp.Items[i])
-			}
-		}
+	b, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("reading response body: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, extractError(b)
+	}
+	var r kibana.ListAgentsResponse
+	err = json.Unmarshal(b, &r)
+	if err != nil {
+		return nil, fmt.Errorf("unmarshalling response json: %w", err)
 	}
 
-	if len(matching) == 0 {
-		return nil, fmt.Errorf("unable to find agent with hostname [%s] for policy [%s]. Found: %v",
-			hostname, policyID, onPolicy)
+	if len(r.Items) == 0 {
+		return nil, fmt.Errorf("unable to find agent with hostname [%s] for policy [%s]",
+			hostname, policyID)
 	}
 
-	if len(matching) > 1 {
-		return nil, fmt.Errorf("found %d agents with hostname [%s]; expected to find only one", len(matching), hostname)
+	if len(r.Items) > 1 {
+		return nil, fmt.Errorf("found %d agents with hostname [%s] for policy [%s]; expected to find only one, response:\n%s", len(r.Items), hostname, policyID, b)
 	}
 
-	return matching[0], nil
+	return &r.Items[0], nil
 }
 
 func GetAgentIDByHostname(ctx context.Context, client *kibana.Client, policyID, hostname string) (string, error) {
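Filtering server-side with a kuery replaces the old client-side scan over every enrolled agent. A minimal sketch of just the query construction, standard library only (the real code sends it through client.Connection.SendWithContext):

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	hostname, policyID := "my-host", "policy-1"
	params := url.Values{}
	// Fleet's agents API accepts a kuery; quoting the values keeps
	// hostnames containing dashes or dots from being tokenized.
	params.Add("kuery", fmt.Sprintf(
		`local_metadata.host.name:"%s" and policy_id:"%s" and active:true`,
		hostname, policyID))
	fmt.Println("/api/fleet/agents?" + params.Encode())
}
```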
@@ -60,12 +84,14 @@ resource "ec_deployment" "integration-testing" {
 
   elasticsearch = {
     autoscale = false
-
     hot = {
       autoscaling = {}
      size        = "8g"
       zone_count  = 1
     }
+    config = {
+      docker_image = local.elasticsearch_docker_image
+    }
   }
   kibana = {
     size       = "1g"
@@ -76,6 +102,7 @@
       "xpack.fleet.internal.registry.kibanaVersionCheckEnabled" = false
       "server.restrictInternalApis"                             = false
     })
+    docker_image = local.kibana_docker_image
   }
 }
 
@@ -84,6 +111,9 @@
     size       = "1g"
     zone_count = 1
   }
+  config = {
+    docker_image = local.integration_server_docker_image
+  }
 
   tags = {
diff --git a/testing/fleetservertest/server.go b/testing/fleetservertest/server.go
index 38dd50337e6..3bbe744a333 100644
--- a/testing/fleetservertest/server.go
+++ b/testing/fleetservertest/server.go
@@ -110,7 +110,6 @@ func NewRouter(handlers *Handlers) *mux.Router {
 	router := mux.NewRouter().StrictSlash(true)
 	for _, route := range handlers.Routes() {
-		route := route // needed because it's been captured in the closure
 		router.
 			Methods(route.Method).
 			Path(route.Pattern).
@@ -514,7 +513,7 @@ func (s *statusResponseWriter) Header() http.Header {
 
 func (s *statusResponseWriter) Write(bs []byte) (int, error) {
 	n, err := s.w.Write(bs)
-	s.byteCount.Add(uint64(n))
+	s.byteCount.Add(uint64(n)) //nolint:gosec // output of Write is guaranteed to be non-negative
 	return n, err
 }
diff --git a/testing/integration/log_level_test.go b/testing/integration/log_level_test.go
index ac77e9c77a0..71a40419bbe 100644
--- a/testing/integration/log_level_test.go
+++ b/testing/integration/log_level_test.go
@@ -13,6 +13,7 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/http/httputil"
 	"strings"
 	"testing"
 	"text/template"
@@ -245,19 +246,27 @@ func updateAgentLogLevel(ctx context.Context, t *testing.T, kibanaClient *kibana
 	}
 
 	err = updateLogLevelTemplate.Execute(buf, templateData)
+	if err != nil {
+		return fmt.Errorf("rendering updateLogLevelTemplate: %w", err)
+	}
+
 	t.Logf("Updating agent-specific log level to %q", logLevel)
-	_, err = kibanaClient.SendWithContext(ctx, http.MethodPost, "/api/fleet/agents/"+agentID+"/actions", nil, nil, buf)
+	resp, err := kibanaClient.SendWithContext(ctx, http.MethodPost, "/api/fleet/agents/"+agentID+"/actions", nil, nil, buf)
 	if err != nil {
 		return fmt.Errorf("error executing fleet request: %w", err)
 	}
 
-	// The log below is a bit spammy but it can be useful for debugging
-	//respDump, err := httputil.DumpResponse(fleetResp, true)
-	//if err != nil {
-	//	t.Logf("Error dumping Fleet response to updating agent-specific log level: %v", err)
-	//} else {
-	//	t.Logf("Fleet response to updating agent-specific log level:\n----- BEGIN RESPONSE DUMP -----\n%s\n----- END RESPONSE DUMP -----\n", string(respDump))
-	//}
+	if resp.StatusCode != http.StatusOK {
+		t.Logf("error updating agent-specific log level to %q", logLevel)
+		// The log below is a bit spammy but it can be useful for debugging
+		respDump, err := httputil.DumpResponse(resp, true)
+		if err != nil {
+			t.Logf("Error dumping Fleet response to updating agent-specific log level: %v", err)
+		} else {
+			t.Logf("Fleet response to updating agent-specific log level:\n----- BEGIN RESPONSE DUMP -----\n%s\n----- END RESPONSE DUMP -----\n", string(respDump))
+		}
+		return fmt.Errorf("error updating agent-specific log level to %q: fleet response status code: %d", logLevel, resp.StatusCode)
+	}
 
 	return nil
 }
@@ -293,19 +302,23 @@ func updatePolicyLogLevel(ctx context.Context, t *testing.T, kibanaClient *kiban
return fmt.Errorf("error rendering policy update template: %w", err) } - _, err = kibanaClient.SendWithContext(ctx, http.MethodPut, "/api/fleet/agent_policies/"+policy.ID, nil, nil, buf) + resp, err := kibanaClient.SendWithContext(ctx, http.MethodPut, "/api/fleet/agent_policies/"+policy.ID, nil, nil, buf) if err != nil { return fmt.Errorf("error executing fleet request: %w", err) } - // The log below is a bit spammy but it can be useful for debugging - //respDump, err := httputil.DumpResponse(fleetResp, true) - //if err != nil { - // t.Logf("Error dumping Fleet response to updating policy log level: %v", err) - //} else { - // t.Logf("Fleet response to updating policy log level:\n----- BEGIN RESPONSE DUMP -----\n%s\n----- END RESPONSE DUMP -----\n", string(respDump)) - //} + if resp.StatusCode != http.StatusOK { + t.Logf("error updating policy log level to %q", newPolicyLogLevel) + // The log below is a bit spammy but it can be useful for debugging + respDump, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Logf("Error dumping Fleet response to updating policy log level: %v", err) + } else { + t.Logf("Fleet response to updating policy log level:\n----- BEGIN RESPONSE DUMP -----\n%s\n----- END RESPONSE DUMP -----\n", string(respDump)) + } + return fmt.Errorf("error updating policy log level to %q: fleet response status code: %d", newPolicyLogLevel, resp.StatusCode) + } return nil } diff --git a/testing/integration/otel_helm_test.go b/testing/integration/otel_helm_test.go index bfe5f4183c5..590097eccd8 100644 --- a/testing/integration/otel_helm_test.go +++ b/testing/integration/otel_helm_test.go @@ -109,9 +109,9 @@ func TestOtelKubeStackHelm(t *testing.T) { // (1 EDOT collector pod per node) // - A Cluster wide Deployment to collect K8s metrics and // events (1 EDOT collector pod per cluster) - // - One Gateway pod to collect, aggregate and forward + // - Two Gateway replicas to collect, aggregate and forward // telemetry. - k8sStepCheckRunningPods("app.kubernetes.io/managed-by=opentelemetry-operator", 3, "otc-container"), + k8sStepCheckRunningPods("app.kubernetes.io/managed-by=opentelemetry-operator", 4, "otc-container"), }, }, } diff --git a/testing/integration/testdata/.upgrade-test-agent-versions.yml b/testing/integration/testdata/.upgrade-test-agent-versions.yml index 550aa561b49..96cba951c3e 100644 --- a/testing/integration/testdata/.upgrade-test-agent-versions.yml +++ b/testing/integration/testdata/.upgrade-test-agent-versions.yml @@ -5,8 +5,8 @@ # upgrade integration tests. testVersions: + - 9.0.0-SNAPSHOT - 8.19.0-SNAPSHOT - - 8.18.0-SNAPSHOT - 8.17.2 - 8.16.4 - 7.17.28-SNAPSHOT diff --git a/testing/mocks/internal_/pkg/agent/application/info/agent_mock.go b/testing/mocks/internal_/pkg/agent/application/info/agent_mock.go index c61b0bab787..32d403a7d5c 100644 --- a/testing/mocks/internal_/pkg/agent/application/info/agent_mock.go +++ b/testing/mocks/internal_/pkg/agent/application/info/agent_mock.go @@ -2,13 +2,16 @@ // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. -// Code generated by mockery v2.51.1. DO NOT EDIT. +// Code generated by mockery v2.53.0. DO NOT EDIT. 
 
 package info
 
 import (
 	context "context"
 
+	logp "github.com/elastic/elastic-agent-libs/logp"
+	info "github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
+
 	mock "github.com/stretchr/testify/mock"
 )
 
@@ -70,6 +73,64 @@ func (_c *Agent_AgentID_Call) RunAndReturn(run func() string) *Agent_AgentID_Cal
 	return _c
 }
 
+// ECSMetadata provides a mock function with given fields: _a0
+func (_m *Agent) ECSMetadata(_a0 *logp.Logger) (*info.ECSMeta, error) {
+	ret := _m.Called(_a0)
+
+	if len(ret) == 0 {
+		panic("no return value specified for ECSMetadata")
+	}
+
+	var r0 *info.ECSMeta
+	var r1 error
+	if rf, ok := ret.Get(0).(func(*logp.Logger) (*info.ECSMeta, error)); ok {
+		return rf(_a0)
+	}
+	if rf, ok := ret.Get(0).(func(*logp.Logger) *info.ECSMeta); ok {
+		r0 = rf(_a0)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*info.ECSMeta)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(*logp.Logger) error); ok {
+		r1 = rf(_a0)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
+// Agent_ECSMetadata_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ECSMetadata'
+type Agent_ECSMetadata_Call struct {
+	*mock.Call
+}
+
+// ECSMetadata is a helper method to define mock.On call
+//   - _a0 *logp.Logger
+func (_e *Agent_Expecter) ECSMetadata(_a0 interface{}) *Agent_ECSMetadata_Call {
+	return &Agent_ECSMetadata_Call{Call: _e.mock.On("ECSMetadata", _a0)}
+}
+
+func (_c *Agent_ECSMetadata_Call) Run(run func(_a0 *logp.Logger)) *Agent_ECSMetadata_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(*logp.Logger))
+	})
+	return _c
+}
+
+func (_c *Agent_ECSMetadata_Call) Return(_a0 *info.ECSMeta, _a1 error) *Agent_ECSMetadata_Call {
+	_c.Call.Return(_a0, _a1)
+	return _c
+}
+
+func (_c *Agent_ECSMetadata_Call) RunAndReturn(run func(*logp.Logger) (*info.ECSMeta, error)) *Agent_ECSMetadata_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Headers provides a mock function with no fields
 func (_m *Agent) Headers() map[string]string {
 	ret := _m.Called()
diff --git a/testing/upgradetest/versions.go b/testing/upgradetest/versions.go
index ff13ac18ebc..9be4854028f 100644
--- a/testing/upgradetest/versions.go
+++ b/testing/upgradetest/versions.go
@@ -188,6 +188,7 @@ func findRequiredVersions(sortedParsedVersions []*version.ParsedSemVer, reqs Ver
 	currentMajor := parsedUpgradeToVersion.Major()
 	currentMinor := parsedUpgradeToVersion.Minor()
 
+	skipCurrentMajor := false
 	currentMajorsToFind := reqs.CurrentMajors
 	previousMajorsToFind := reqs.PreviousMajors
 	previousMinorsToFind := reqs.PreviousMinors
@@ -214,7 +215,7 @@ func findRequiredVersions(sortedParsedVersions []*version.ParsedSemVer, reqs Ver
 			currentMajorsToFind-- // counts as the current major as well
 
 		// current majors
-		case currentMajorsToFind > 0 && version.Major() == currentMajor:
+		case currentMajorsToFind > 0 && version.Major() == currentMajor && !skipCurrentMajor:
 			upgradableVersions = append(upgradableVersions, version.String())
 			currentMajorsToFind--
 
@@ -222,7 +223,10 @@ func findRequiredVersions(sortedParsedVersions []*version.ParsedSemVer, reqs Ver
 		case previousMajorsToFind > 0 && version.Major() < currentMajor:
 			upgradableVersions = append(upgradableVersions, version.String())
 			currentMajor = version.Major()
+			currentMinor = version.Minor()
 			previousMajorsToFind--
+			previousMinorsToFind-- // count as prev minor as well
+			skipCurrentMajor = true
 
 		// since the list is sorted we can stop here
 		default:
@@ -252,7 +256,13 @@ func PreviousMinor() (*version.ParsedSemVer, error) {
 		// will only contain minors
 from the previous major (vX-1). Further, since the
 		// version list is sorted in descending order (newer versions first), we can return the
 		// first item from the list as it will be the newest minor of the previous major.
-		return versions[0], nil
+		for _, v := range versions {
+			if v.Less(*current) {
+				return v, nil
+			}
+		}
+
+		return nil, ErrNoPreviousMinor
 	}
 
 	for _, v := range versions {
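The PreviousMinor change guards against returning the current version itself when it happens to head the sorted list: skip anything not strictly older, and fail with ErrNoPreviousMinor when nothing qualifies. A minimal sketch of that selection logic, with a simplified version type standing in for version.ParsedSemVer:

```go
package main

import "fmt"

type semver struct{ major, minor, patch int }

func (a semver) Less(b semver) bool {
	if a.major != b.major {
		return a.major < b.major
	}
	if a.minor != b.minor {
		return a.minor < b.minor
	}
	return a.patch < b.patch
}

// firstOlder returns the newest version strictly older than current,
// given a slice sorted newest-first, and false if none exists.
func firstOlder(versions []semver, current semver) (semver, bool) {
	for _, v := range versions {
		if v.Less(current) {
			return v, true
		}
	}
	return semver{}, false
}

func main() {
	versions := []semver{{9, 0, 0}, {8, 19, 0}, {8, 18, 0}} // descending order
	v, ok := firstOlder(versions, semver{9, 0, 0})
	fmt.Println(v, ok) // {8 19 0} true: versions[0] would have been wrong
}
```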