Feature 936 Standalone connection pool
- Status: Design
- Author: Jianxiang Ran
- Co-author: Rocky Jin
- Discussion: https://github.com/lf-edge/ekuiper/issues/936
As data processing middleware, eKuiper must exchange data with other components. In eKuiper, a Source represents the data input and a Sink represents the data output. In some cases, a Source and a Sink need to share the same connection:
- MQTT sources and MQTT sinks across all rules may want to use the same MQTT connection.
- The EdgeX Sink currently has no way to connect to a secured Redis. Sharing a connection would let it reuse the one already set up by the Source.
In the existing implementation, each Source and Sink sets up and holds its client instance by itself. We can introduce a client-manager entity to maintain the different client instances. Since the client instances are thread-safe, they can be used by different Sources and Sinks at the same time.
client-manager is a key-value store for client instances: the key is the combination of the connection type and the connection selector, and the value is the client instance.
- connection type: the Source/Sink type, such as EdgeX or MQTT
- connection selector: lets the user configure the Source or Sink address by choosing a different endpointURL
The client-manager must have these capabilities:
- a builder function that builds the right client type according to the connection type and configuration key
- return the unique connection for a given connection type and connection key
- close a connection once no Sink or Source holds a reference to it
client-manager should provide the following APIs:
// called whenever a Source or Sink needs the client
GetConnection(connectionType string, configKey string) (interface{}, error)
// called when this Source/Sink exits
ReleaseConnection(connectionType string, configKey string) error
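The capabilities and the two APIs above can be sketched as a small reference-counted registry. This is a minimal sketch rather than the eKuiper implementation: the `Manager`, `Builder`, and `entry` names, the `type.key` map key, and the close callback returned by the builder are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Builder (hypothetical) creates a client for one connection type from a
// config key and returns a callback that closes that client.
type Builder func(configKey string) (client interface{}, closeFn func(), err error)

// entry pairs a shared client with the number of Sources/Sinks using it.
type entry struct {
	client  interface{}
	closeFn func()
	refs    int
}

// Manager (hypothetical) stores clients keyed by connection type and config key.
type Manager struct {
	mu       sync.Mutex
	builders map[string]Builder // one builder per connection type
	entries  map[string]*entry
}

func NewManager(builders map[string]Builder) *Manager {
	return &Manager{builders: builders, entries: make(map[string]*entry)}
}

// GetConnection returns the existing client for (type, key) or builds one.
// The lock is held while building, so concurrent callers never build twice.
func (m *Manager) GetConnection(connectionType string, configKey string) (interface{}, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	k := connectionType + "." + configKey
	if e, ok := m.entries[k]; ok {
		e.refs++
		return e.client, nil
	}
	build, ok := m.builders[connectionType]
	if !ok {
		return nil, fmt.Errorf("unknown connection type %q", connectionType)
	}
	client, closeFn, err := build(configKey)
	if err != nil {
		return nil, err
	}
	m.entries[k] = &entry{client: client, closeFn: closeFn, refs: 1}
	return client, nil
}

// ReleaseConnection drops one reference and closes the client on the last one.
func (m *Manager) ReleaseConnection(connectionType string, configKey string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	k := connectionType + "." + configKey
	e, ok := m.entries[k]
	if !ok {
		return fmt.Errorf("connection %q is not open", k)
	}
	e.refs--
	if e.refs == 0 {
		delete(m.entries, k)
		e.closeFn()
	}
	return nil
}

func main() {
	builds, closed := 0, false
	m := NewManager(map[string]Builder{
		// Stand-in builder: a real one would create an MQTT client here.
		"mqtt": func(configKey string) (interface{}, func(), error) {
			builds++
			return "client for " + configKey, func() { closed = true }, nil
		},
	})
	m.GetConnection("mqtt", "conn2") // a Source opens
	m.GetConnection("mqtt", "conn2") // a Sink opens and shares the client
	m.ReleaseConnection("mqtt", "conn2")
	fmt.Println(builds, closed) // client built once, still open
	m.ReleaseConnection("mqtt", "conn2")
	fmt.Println(builds, closed) // last reference gone, client closed
}
```

Counting references inside the manager keeps the close decision in one place, so neither the Source nor the Sink needs to know whether the other is still running.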
In this example, Source and Sink use MQTT. The following steps are needed:
- In the connection.yaml file, write the MQTT connection configurations from which Source and Sink can choose.
- In the Source configuration file mqtt_source.yaml, create a custom configuration item demo-config that selects the connection configuration mqtt.conn2 pre-defined in step 1, then create a stream and specify CONF_KEY=demo-config. This lets the Source use mqtt.conn2 to connect.
CREATE STREAM demo (
USERID BIGINT,
FIRST_NAME STRING,
LAST_NAME STRING,
NICKNAMES ARRAY(STRING),
Gender BOOLEAN
) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="mqtt", CONF_KEY="demo-config");
- Create the sink action and specify connection-selector=mqtt.conn2. This lets the Sink use mqtt.conn2 to connect.
"mqtt": {
"connection-selector": "mqtt.conn2",
"server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
"topic": "devices/result",
"qos": 1,
"clientId": "demo_001",
"certificationPath": "keys/d3807d9fa5-certificate.pem",
"privateKeyPath": "keys/d3807d9fa5-private.pem.key",
"insecureSkipVerify": false,
"retained": false
}
- Write the MQTT Source/Sink code. When a Source or Sink has the configuration connection-selector=mqtt.conn2, it gets the connection from client-manager; otherwise, it initializes the connection by itself.
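Open(config) {
    if config.connection-selector != "" {
        // shared connection: obtained from client-manager
        mqtt_client, err = GetConnection("mqtt", "conn2")
    } else {
        // private connection: owned by this Source/Sink
        mqtt_client = GetConnByMyself()
    }
}
Close(config) {
    if config.connection-selector != "" {
        // drop this Source/Sink's reference to the shared connection
        err = ReleaseConnection("mqtt", "conn2")
    } else {
        DropConnByMyself()
    }
}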
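The Open/Close pseudocode above could look roughly like the following in Go. It is a sketch under stated assumptions: `MQTTSource`, its `Dial` and `HangUp` fields, `ConnectionManager`, `splitSelector`, and `countingManager` are hypothetical names, and splitting the selector at the first dot into type and key is inferred from `mqtt.conn2` mapping to `GetConnection("mqtt", "conn2")`.

```go
package main

import (
	"fmt"
	"strings"
)

// ConnectionManager is the client-manager API from this design.
type ConnectionManager interface {
	GetConnection(connectionType string, configKey string) (interface{}, error)
	ReleaseConnection(connectionType string, configKey string) error
}

// splitSelector turns "mqtt.conn2" into ("mqtt", "conn2"). Assumed format.
func splitSelector(selector string) (string, string, error) {
	parts := strings.SplitN(selector, ".", 2)
	if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
		return "", "", fmt.Errorf("invalid connection-selector %q", selector)
	}
	return parts[0], parts[1], nil
}

// MQTTSource is a hypothetical source. Dial and HangUp stand in for the
// "by myself" path that creates and drops a private connection.
type MQTTSource struct {
	Selector string // connection-selector value, empty when not configured
	Manager  ConnectionManager
	Dial     func() (interface{}, error)
	HangUp   func(client interface{}) error
	client   interface{}
}

// Open takes the shared client when a selector is set, else dials its own.
func (s *MQTTSource) Open() error {
	var c interface{}
	var err error
	if s.Selector == "" {
		c, err = s.Dial()
	} else {
		var typ, key string
		if typ, key, err = splitSelector(s.Selector); err == nil {
			c, err = s.Manager.GetConnection(typ, key)
		}
	}
	if err != nil {
		return err
	}
	s.client = c
	return nil
}

// Close releases the shared reference or drops the private connection.
func (s *MQTTSource) Close() error {
	if s.client == nil {
		return nil // never opened, or already closed
	}
	c := s.client
	s.client = nil
	if s.Selector == "" {
		return s.HangUp(c)
	}
	typ, key, err := splitSelector(s.Selector)
	if err != nil {
		return err
	}
	return s.Manager.ReleaseConnection(typ, key)
}

// countingManager is a stand-in manager that only counts references.
type countingManager struct{ refs map[string]int }

func (m *countingManager) GetConnection(t string, k string) (interface{}, error) {
	m.refs[t+"."+k]++
	return "shared " + t + "." + k, nil
}

func (m *countingManager) ReleaseConnection(t string, k string) error {
	m.refs[t+"."+k]--
	return nil
}

func main() {
	mgr := &countingManager{refs: map[string]int{}}
	src := &MQTTSource{Selector: "mqtt.conn2", Manager: mgr}
	if err := src.Open(); err != nil {
		fmt.Println("open failed:", err)
		return
	}
	fmt.Println(mgr.refs["mqtt.conn2"]) // one reference while the source is open
	src.Close()
	fmt.Println(mgr.refs["mqtt.conn2"]) // reference released on close
}
```

Clearing `client` in `Close` makes a second `Close` call a no-op, which avoids releasing the same shared reference twice.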