Feature 936 Standalone connection pool

Jianxiang Ran edited this page Sep 16, 2021 · 20 revisions

Motivation

As data-processing middleware, eKuiper must exchange data with other components. In eKuiper, a Source represents data input and a Sink represents data output. In some cases, a Source and a Sink need to share the same connection:

  • MQTT sources and MQTT sinks across all rules may want to use the same MQTT connection.
  • The EdgeX Sink currently has no way to connect to a secured Redis. Reusing the connection held by the Source could provide one.

Survey

In the existing implementation, each Source and Sink sets up and holds its client instance by itself. We can introduce a client-manager entity to maintain the different client instances. Since the client instances are thread-safe, they can be used by different Sources and Sinks at the same time.

Design

client-manager is a key-value store for client instances. The key is the combination of connection type and connection selector; the value is the client instance.

  • connection type: the Source/Sink type, such as EdgeX or MQTT
  • connection selector: the user configures the Source or Sink address by choosing among different endpointURL values

The client-manager must have these capabilities:

  • a builder function that builds different client types according to the connection type and configuration key
  • return the unique connection for a given connection type and connection key
  • close a connection once no Sink or Source holds a reference to it

client-manager should provide the following APIs:

	// called whenever a client is needed
	GetConnection(connectionType string, configKey string) (interface{}, error)

	// called when the source/sink exits
	ReleaseConnection(connectionType string, configKey string) error
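The three capabilities and the two APIs above can be sketched as a small reference-counted registry. This is only an illustration under stated assumptions, not the eKuiper implementation: the `Manager` type, the `Register` method, the `Builder` signature, the `type.key` map key, and the `fakeClient` stand-in are all hypothetical names introduced here. The sketch also assumes that a client which needs cleanup implements `io.Closer`.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// Builder creates a client for one config key (hypothetical signature).
type Builder func(configKey string) (interface{}, error)

// entry pairs a shared client with the number of Sources/Sinks using it.
type entry struct {
	client interface{}
	refs   int
}

// Manager is a hypothetical client-manager keyed by "type.configKey".
type Manager struct {
	mu       sync.Mutex
	builders map[string]Builder
	entries  map[string]*entry
}

func NewManager() *Manager {
	return &Manager{builders: map[string]Builder{}, entries: map[string]*entry{}}
}

// Register installs the builder used for one connection type.
func (m *Manager) Register(connectionType string, b Builder) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.builders[connectionType] = b
}

// GetConnection returns the shared client, building it on first use.
func (m *Manager) GetConnection(connectionType string, configKey string) (interface{}, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := connectionType + "." + configKey
	if e, ok := m.entries[key]; ok {
		e.refs++
		return e.client, nil
	}
	build, ok := m.builders[connectionType]
	if !ok {
		return nil, fmt.Errorf("unknown connection type %q", connectionType)
	}
	client, err := build(configKey)
	if err != nil {
		return nil, err
	}
	m.entries[key] = &entry{client: client, refs: 1}
	return client, nil
}

// ReleaseConnection drops one reference and closes the client on the last one.
func (m *Manager) ReleaseConnection(connectionType string, configKey string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	key := connectionType + "." + configKey
	e, ok := m.entries[key]
	if !ok {
		return fmt.Errorf("no connection for %q", key)
	}
	e.refs--
	if e.refs > 0 {
		return nil
	}
	delete(m.entries, key)
	if c, ok := e.client.(io.Closer); ok {
		return c.Close()
	}
	return nil
}

// fakeClient stands in for a real MQTT or EdgeX client.
type fakeClient struct {
	name   string
	closed bool
}

func (c *fakeClient) Close() error { c.closed = true; return nil }

func main() {
	m := NewManager()
	m.Register("mqtt", func(configKey string) (interface{}, error) {
		return &fakeClient{name: configKey}, nil
	})
	src, _ := m.GetConnection("mqtt", "conn2")
	sink, _ := m.GetConnection("mqtt", "conn2")
	fmt.Println(src == sink) // true: both callers share one instance
	_ = m.ReleaseConnection("mqtt", "conn2")
	fmt.Println(src.(*fakeClient).closed) // false: one reference remains
	_ = m.ReleaseConnection("mqtt", "conn2")
	fmt.Println(src.(*fakeClient).closed) // true: last reference released
}
```

A single mutex guards both maps, which keeps the sketch simple; it also means a slow builder blocks other callers while it connects, so a real implementation might build outside the lock.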

Use Example

In this example, both the Source and the Sink use MQTT. The following steps are needed:

  1. In the connection.yaml file, write the MQTT connection configurations from which Sources and Sinks can choose.

  2. In the Source's mqtt_source.yaml, create a custom configuration item demo-config that selects the connection configuration mqtt.conn2 predefined in step 1. Then create a stream and specify CONF_KEY="demo-config". This lets the Source use mqtt.conn2 to connect.

   CREATE STREAM demo (
  	USERID BIGINT,
  	FIRST_NAME STRING,
  	LAST_NAME STRING,
  	NICKNAMES ARRAY(STRING),
  	Gender BOOLEAN
   ) WITH (DATASOURCE="test/", FORMAT="JSON", TYPE="mqtt", CONF_KEY="demo-config");
  3. Create the sink actions and specify connection-selector=mqtt.conn2. This lets the Sink use mqtt.conn2 to connect.
  "mqtt": {
        "connection-selector": "mqtt.conn2",
        "server": "ssl://xyz-ats.iot.us-east-1.amazonaws.com:8883",
        "topic": "devices/result",
        "qos": 1,
        "clientId": "demo_001",
        "certificationPath": "keys/d3807d9fa5-certificate.pem",
        "privateKeyPath": "keys/d3807d9fa5-private.pem.key", 
        "insecureSkipVerify": false,
        "retained": false
      }
  4. Write the MQTT Source/Sink code.
    When a Source or Sink has the config connection-selector=mqtt.conn2, it needs to get the connection from client-manager; otherwise, it initializes the connection by itself.
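Open(config) {
  if config["connection-selector"] != "" {
      // shared connection: "mqtt.conn2" is connection type "mqtt" plus config key "conn2"
      mqtt_client = GetConnection("mqtt", "conn2")
  } else {
      // no selector configured: the Source/Sink owns its connection
      mqtt_client = GetConnByMyself()
  }
}

Close(config) {
  if config["connection-selector"] != "" {
      // only drop the reference; client-manager closes the connection after the last release
      ReleaseConnection("mqtt", "conn2")
  } else {
      DropConnByMyself()
  }
}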

Management