diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index d85072319533..46e5412a3824 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -68,6 +68,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Split split httpmon out of x-pack/filebeat/input/internal/httplog. {pull}36385[36385]
- Beats publishing pipeline does not propagate the close signal to its clients any more. It's responsibility of the user to close the pipeline client. {issue}38197[38197] {pull}38556[38556]
- Debug log entries from the acker (`stateful ack ...` or `stateless ack ...`) removed. {pull}39672[39672]
+- Rename x-pack/filebeat websocket input to streaming. {issue}40264[40264] {pull}40421[40421]
==== Bugfixes
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index d30d3c3b9e3a..860f98f2356a 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -91,10 +91,10 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-salesforce>>
* <<{beatname_lc}-input-stdin>>
+* <<{beatname_lc}-input-streaming>>
* <<{beatname_lc}-input-syslog>>
* <<{beatname_lc}-input-tcp>>
* <<{beatname_lc}-input-udp>>
-* <<{beatname_lc}-input-websocket>>
include::multiline.asciidoc[]
@@ -148,6 +148,8 @@ include::../../x-pack/filebeat/docs/inputs/input-salesforce.asciidoc[]
include::inputs/input-stdin.asciidoc[]
+include::../../x-pack/filebeat/docs/inputs/input-streaming.asciidoc[]
+
include::inputs/input-syslog.asciidoc[]
include::inputs/input-tcp.asciidoc[]
@@ -155,5 +157,3 @@ include::inputs/input-tcp.asciidoc[]
include::inputs/input-udp.asciidoc[]
include::inputs/input-unix.asciidoc[]
-
-include::../../x-pack/filebeat/docs/inputs/input-websocket.asciidoc[]
diff --git a/x-pack/agentbeat/agentbeat.spec.yml b/x-pack/agentbeat/agentbeat.spec.yml
index 8f153a02ebae..d1f86dfe05e2 100644
--- a/x-pack/agentbeat/agentbeat.spec.yml
+++ b/x-pack/agentbeat/agentbeat.spec.yml
@@ -248,8 +248,10 @@ inputs:
platforms: *platforms
outputs: *outputs
command: *filebeat_command
- - name: websocket
- description: "Websocket"
+ - name: streaming
+ aliases:
+ - websocket
+ description: "Streaming"
platforms: *platforms
outputs: *outputs
command: *filebeat_command
diff --git a/x-pack/filebeat/docs/inputs/input-websocket.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc
similarity index 83%
rename from x-pack/filebeat/docs/inputs/input-websocket.asciidoc
rename to x-pack/filebeat/docs/inputs/input-streaming.asciidoc
index eed0a25d4b55..3ec257f852d2 100644
--- a/x-pack/filebeat/docs/inputs/input-websocket.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc
@@ -1,18 +1,18 @@
[role="xpack"]
-:type: websocket
+:type: streaming
:mito_version: v1.8.0
:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version}
[id="{beatname_lc}-input-{type}"]
-=== Websocket Input
+=== Streaming Input
experimental[]
++++
-Websocket
+Streaming
++++
-The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint.
+The `streaming` input reads messages from a streaming data source, for example a websocket server. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. Currently only websocket server or API endpoints are supported.
This input supports:
@@ -21,7 +21,7 @@ This input supports:
** Bearer
** Custom
-NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.
+NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.
==== Execution
@@ -38,7 +38,7 @@ On start the `state` will be something like this:
...
}
----
-The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.
+The `streaming` input websocket handler creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below.
If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program's logic.
@@ -59,7 +59,7 @@ After completion of a program's execution it should return a single object with
----
<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements.
-The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object.
+The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the streaming data source. The `events` field is the array of events to be published to the output. Each event must be a JSON object.
<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array.
@@ -70,7 +70,7 @@ Example configuration:
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
-- type: websocket
+- type: streaming
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).decode_json().as(inner_body,{
@@ -83,7 +83,7 @@ filebeat.inputs:
==== Debug state logging
The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation.
-This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <> configuration parameters for settings to exclude sensitive fields from DEBUG logs.
+This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <> configuration parameters for settings to exclude sensitive fields from DEBUG logs.
==== Authentication
The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
@@ -93,7 +93,7 @@ Example configurations with authentication:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
auth.basic_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----
@@ -101,7 +101,7 @@ filebeat.inputs:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
auth.bearer_token: "dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----
@@ -109,7 +109,7 @@ filebeat.inputs:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
auth.custom:
header: "x-api-key"
value: "dXNlcjpwYXNzd29yZA=="
@@ -119,25 +119,25 @@ filebeat.inputs:
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
auth.custom:
header: "Auth"
value: "Bearer dXNlcjpwYXNzd29yZA=="
url: wss://localhost:443/_stream
----
-[[input-state-websocket]]
+[[input-state-streaming]]
==== Input state
-The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
+The `streaming` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects.
The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts.
==== Configuration options
-The `websocket` input supports the following configuration options plus the
+The `streaming` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.
-[[program-websocket]]
+[[program-streaming]]
[float]
==== `program`
@@ -153,11 +153,11 @@ program: |
})
----
-[[input-url-program-websocket]]
+[[input-url-program-streaming]]
[float]
==== `url_program`
-If present, this CEL program is executed before the websocket connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the websocket connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.
+If present, this CEL program is executed before the streaming connection is established using the `state` object, including any stored cursor value. It must evaluate to a valid URL. The returned URL is used to make the streaming connection for processing. The program may use cursor values or other state defined values to customize the URL at runtime.
["source","yaml",subs="attributes"]
----
@@ -177,13 +177,13 @@ program: |
})
----
-[[state-websocket]]
+[[state-streaming]]
[float]
==== `state`
`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts.
-[[cursor-websocket]]
+[[cursor-streaming]]
[float]
==== `state.cursor`
@@ -193,7 +193,7 @@ The cursor is an object available as `state.cursor` where arbitrary values may b
----
filebeat.inputs:
# Read and process simple websocket messages from a local websocket server
-- type: websocket
+- type: streaming
url: ws://localhost:443/v1/stream
program: |
bytes(state.response).as(body, {
@@ -207,7 +207,7 @@ filebeat.inputs:
})
----
-[[regexp-websocket]]
+[[regexp-streaming]]
[float]
==== `regexp`
@@ -216,14 +216,14 @@ A set of named regular expressions that may be used during a CEL program's execu
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
# Define two regular expressions, 'products' and 'solutions' for use during CEL program execution.
regexp:
products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)'
solutions: '(?i)(Search|Observability|Security)'
----
-[[websocket-state-redact]]
+[[streaming-state-redact]]
[float]
==== `redact`
@@ -233,7 +233,7 @@ In the case of no-required redaction an empty `redact.fields` configuration shou
["source","yaml",subs="attributes"]
----
-- type: websocket
+- type: streaming
redact:
fields: ~
----
@@ -243,7 +243,7 @@ As an example, if a user-constructed Basic Authentication request is used in a C
["source","yaml",subs="attributes"]
----
filebeat.inputs:
-- type: websocket
+- type: streaming
url: ws://localhost:443/_stream
state:
user: user@domain.tld
@@ -290,11 +290,11 @@ observe the activity of the input.
==== Developer tools
-A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.
+A stand-alone CEL environment that implements the majority of the streaming input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain.
[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
-NOTE: The `websocket` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.
+NOTE: The `streaming` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository.
-:type!:
\ No newline at end of file
+:type!:
diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go
index 6c7708d7c0f4..731f8979766e 100644
--- a/x-pack/filebeat/input/default-inputs/inputs_other.go
+++ b/x-pack/filebeat/input/default-inputs/inputs_other.go
@@ -25,7 +25,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow"
"github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit"
"github.com/elastic/beats/v7/x-pack/filebeat/input/salesforce"
- "github.com/elastic/beats/v7/x-pack/filebeat/input/websocket"
+ "github.com/elastic/beats/v7/x-pack/filebeat/input/streaming"
"github.com/elastic/elastic-agent-libs/logp"
)
@@ -44,7 +44,8 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2
awscloudwatch.Plugin(),
lumberjack.Plugin(),
salesforce.Plugin(log, store),
- websocket.Plugin(log, store),
+ streaming.Plugin(log, store),
+ streaming.PluginWebsocketAlias(log, store),
netflow.Plugin(log),
benchmark.Plugin(),
}
diff --git a/x-pack/filebeat/input/websocket/cel.go b/x-pack/filebeat/input/streaming/cel.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/cel.go
rename to x-pack/filebeat/input/streaming/cel.go
index 0938da053535..b26b004d92cb 100644
--- a/x-pack/filebeat/input/websocket/cel.go
+++ b/x-pack/filebeat/input/streaming/cel.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"compress/gzip"
diff --git a/x-pack/filebeat/input/websocket/config.go b/x-pack/filebeat/input/streaming/config.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/config.go
rename to x-pack/filebeat/input/streaming/config.go
index 490a407790d7..be4ff28180e9 100644
--- a/x-pack/filebeat/input/websocket/config.go
+++ b/x-pack/filebeat/input/streaming/config.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"context"
diff --git a/x-pack/filebeat/input/websocket/config_test.go b/x-pack/filebeat/input/streaming/config_test.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/config_test.go
rename to x-pack/filebeat/input/streaming/config_test.go
index c1aaac973283..c74fba4589e9 100644
--- a/x-pack/filebeat/input/websocket/config_test.go
+++ b/x-pack/filebeat/input/streaming/config_test.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"fmt"
diff --git a/x-pack/filebeat/input/websocket/input.go b/x-pack/filebeat/input/streaming/input.go
similarity index 96%
rename from x-pack/filebeat/input/websocket/input.go
rename to x-pack/filebeat/input/streaming/input.go
index 8c9f2d629900..01106c945780 100644
--- a/x-pack/filebeat/input/websocket/input.go
+++ b/x-pack/filebeat/input/streaming/input.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"context"
@@ -38,7 +38,7 @@ type StreamFollower interface {
}
const (
- inputName string = "websocket"
+ inputName string = "streaming"
root string = "state"
)
@@ -47,8 +47,19 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
Name: inputName,
Stability: feature.Experimental,
Deprecated: false,
+ Info: "Streaming Input",
+ Doc: "Collect data from streaming data sources",
+ Manager: NewInputManager(log, store),
+ }
+}
+
+func PluginWebsocketAlias(log *logp.Logger, store inputcursor.StateStore) v2.Plugin {
+ return v2.Plugin{
+ Name: "websocket",
+ Stability: feature.Experimental,
+ Deprecated: false,
Info: "Websocket Input",
- Doc: "Collect data from websocket api endpoints",
+ Doc: "Collect data from websocket data sources",
Manager: NewInputManager(log, store),
}
}
diff --git a/x-pack/filebeat/input/websocket/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/input_manager.go
rename to x-pack/filebeat/input/streaming/input_manager.go
index 7611b6c341a8..f20b867755b2 100644
--- a/x-pack/filebeat/input/websocket/input_manager.go
+++ b/x-pack/filebeat/input/streaming/input_manager.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"github.com/elastic/go-concert/unison"
diff --git a/x-pack/filebeat/input/websocket/input_test.go b/x-pack/filebeat/input/streaming/input_test.go
similarity index 98%
rename from x-pack/filebeat/input/websocket/input_test.go
rename to x-pack/filebeat/input/streaming/input_test.go
index 4f3d9317b9f5..69a65c8207f1 100644
--- a/x-pack/filebeat/input/websocket/input_test.go
+++ b/x-pack/filebeat/input/streaming/input_test.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"context"
@@ -472,8 +472,8 @@ func TestURLEval(t *testing.T) {
}
name := input{}.Name()
- if name != "websocket" {
- t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
+ if name != "streaming" {
+ t.Errorf(`unexpected input name: got:%q want:"streaming"`, name)
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -524,8 +524,8 @@ func TestInput(t *testing.T) {
}
name := input{}.Name()
- if name != "websocket" {
- t.Errorf(`unexpected input name: got:%q want:"websocket"`, name)
+ if name != "streaming" {
+ t.Errorf(`unexpected input name: got:%q want:"streaming"`, name)
}
src := &source{conf}
err = input{}.Test(src, v2.TestContext{})
diff --git a/x-pack/filebeat/input/websocket/metrics.go b/x-pack/filebeat/input/streaming/metrics.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/metrics.go
rename to x-pack/filebeat/input/streaming/metrics.go
index 34e6a9620f93..6f94b5e888b3 100644
--- a/x-pack/filebeat/input/websocket/metrics.go
+++ b/x-pack/filebeat/input/streaming/metrics.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"github.com/rcrowley/go-metrics"
diff --git a/x-pack/filebeat/input/websocket/redact.go b/x-pack/filebeat/input/streaming/redact.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/redact.go
rename to x-pack/filebeat/input/streaming/redact.go
index 86583f0691c1..c6124e885a3a 100644
--- a/x-pack/filebeat/input/websocket/redact.go
+++ b/x-pack/filebeat/input/streaming/redact.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"strings"
diff --git a/x-pack/filebeat/input/websocket/redact_test.go b/x-pack/filebeat/input/streaming/redact_test.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/redact_test.go
rename to x-pack/filebeat/input/streaming/redact_test.go
index c66db60d97b0..1b6bff097d3d 100644
--- a/x-pack/filebeat/input/websocket/redact_test.go
+++ b/x-pack/filebeat/input/streaming/redact_test.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"fmt"
diff --git a/x-pack/filebeat/input/websocket/websocket.go b/x-pack/filebeat/input/streaming/websocket.go
similarity index 99%
rename from x-pack/filebeat/input/websocket/websocket.go
rename to x-pack/filebeat/input/streaming/websocket.go
index c512f2016ea0..409d1ee369b1 100644
--- a/x-pack/filebeat/input/websocket/websocket.go
+++ b/x-pack/filebeat/input/streaming/websocket.go
@@ -2,7 +2,7 @@
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
-package websocket
+package streaming
import (
"bytes"