
Feature 936 Standalone connection pool

superxan edited this page Sep 14, 2021 · 20 revisions


Requirement

Motivation

As data processing middleware, eKuiper must exchange data with other entities. In eKuiper, Sources and Sinks handle data input and output. Both need to take care of data transport and authentication with those entities. For example, eKuiper may need to get raw data from an MQTT broker and, after processing, send the results to an HTTP server. In this case, the Source handles the MQTT connection and authentication, while the Sink handles the HTTP connection and authentication.

However, in some cases the Source and Sink are of the same type, so eKuiper only needs to set up one connection that both can reuse. Here are two examples:

  • MQTT sources and MQTT sinks across all rules may want to share the same MQTT connection
  • There is currently no way for the EdgeX sink to connect to a secure Redis; reusing the connection established by the source could solve this

Survey

In the existing implementation, Sources and Sinks set up and hold their connections by themselves. We need to introduce a connection pool from which Sources and Sinks can obtain connections. Here is an example of such a pool: https://github.com/fatih/pool

A Source needs to implement these methods:

    type Source interface {
        // Should be a sync function in the normal case. The container will run it in a goroutine
        Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
        // Called during initialization. Configures the source with the data source (e.g. the topic for MQTT)
        // and the properties read from the yaml file
        Configure(datasource string, props map[string]interface{}) error
        Closable
    }

A Sink needs to implement these methods:

    type Sink interface {
        // Should be a sync function in the normal case. The container will run it in a goroutine
        Open(ctx StreamContext) error
        // Called during initialization. Configures the sink with the properties from the rule action definition
        Configure(props map[string]interface{}) error
        // Called each time a row of data is transferred to this sink
        Collect(ctx StreamContext, data interface{}) error
        Closable
    }

Design

Currently we support different connection types, such as EdgeX and MQTT. For each connection type, users can configure multiple configuration keys. When users set up a stream, they choose the connection type and configuration key. With that, eKuiper knows how to set up the connection:

  • Which kind of client to set up, MQTT or EdgeX
  • Which endpoint to connect to
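The two decisions above can be pictured as a lookup keyed by connection type and configuration key. This sketch is illustrative only: `ClientConfig`, `resolve`, and the endpoint values are hypothetical placeholders, not eKuiper's actual configuration format.

```go
package main

import "fmt"

// ClientConfig is a hypothetical description of what a
// (connection type, configuration key) pair resolves to.
type ClientConfig struct {
	ClientKind string // which kind of client to set up, e.g. "mqtt" or "edgex"
	Endpoint   string // which endpoint to connect to
}

// configs maps connection type -> configuration key -> endpoint.
// In eKuiper these would come from configuration files; values are placeholders.
var configs = map[string]map[string]string{
	"mqtt": {
		"default": "tcp://127.0.0.1:1883",
		"remote":  "tcp://broker.example.com:1883",
	},
	"edgex": {
		"default": "redis://127.0.0.1:6379",
	},
}

// resolve answers both questions for the connection type and
// configuration key chosen by the user for a stream.
func resolve(connectionType, configKey string) (ClientConfig, error) {
	keys, ok := configs[connectionType]
	if !ok {
		return ClientConfig{}, fmt.Errorf("unknown connection type %q", connectionType)
	}
	endpoint, ok := keys[configKey]
	if !ok {
		return ClientConfig{}, fmt.Errorf("unknown config key %q for %q", configKey, connectionType)
	}
	return ClientConfig{ClientKind: connectionType, Endpoint: endpoint}, nil
}

func main() {
	cfg, err := resolve("mqtt", "remote")
	fmt.Println(cfg, err) // {mqtt tcp://broker.example.com:1883} <nil>
}
```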

To share connections, we need to introduce a connection pool to manage them. Sources and Sinks get the target connection from the pool and return it when they are done with it. The connection pool must have these capabilities:

  • A builder function that builds different client types according to the connection type and configuration key
  • Get the unique connection identified by the connection type and configuration key
  • Close a connection once no Source or Sink holds a reference to it
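A minimal Go sketch of these three capabilities, assuming a reference-counted design: builders are registered per connection type, connections are keyed by (connection type, configuration key), and the client is closed when the last user releases it. `Pool`, `Builder`, `Register`, and the `closer` callback are hypothetical names; the `GetConnection`/`Close` method names mirror the APIs proposed in this section.

```go
package main

import (
	"fmt"
	"sync"
)

// Builder creates a client for one configuration key (assumed signature).
type Builder func(configKey string) (interface{}, error)

// connKey uniquely identifies a shared connection.
type connKey struct {
	connectionType string
	configKey      string
}

// entry is a shared client plus the number of sources/sinks using it.
type entry struct {
	client interface{}
	refs   int
}

// Pool is a hypothetical reference-counted connection pool.
type Pool struct {
	mu       sync.Mutex
	builders map[string]Builder
	conns    map[connKey]*entry
	closer   func(client interface{}) error // releases the underlying client
}

func NewPool(closer func(client interface{}) error) *Pool {
	return &Pool{builders: make(map[string]Builder), conns: make(map[connKey]*entry), closer: closer}
}

// Register installs the builder for one connection type, e.g. "mqtt".
func (p *Pool) Register(connectionType string, b Builder) {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.builders[connectionType] = b
}

// GetConnection returns the shared client for the pair, building it on first use.
func (p *Pool) GetConnection(connectionType, configKey string) (interface{}, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	k := connKey{connectionType, configKey}
	if e, ok := p.conns[k]; ok {
		e.refs++
		return e.client, nil
	}
	b, ok := p.builders[connectionType]
	if !ok {
		return nil, fmt.Errorf("no builder registered for connection type %q", connectionType)
	}
	// Building under the lock guarantees at most one client per key.
	client, err := b(configKey)
	if err != nil {
		return nil, err
	}
	p.conns[k] = &entry{client: client, refs: 1}
	return client, nil
}

// Close drops one reference and closes the client when none remain.
func (p *Pool) Close(connectionType, configKey string) error {
	k := connKey{connectionType, configKey}
	p.mu.Lock()
	e, ok := p.conns[k]
	if !ok {
		p.mu.Unlock()
		return fmt.Errorf("no open connection for %s/%s", connectionType, configKey)
	}
	e.refs--
	if e.refs > 0 {
		p.mu.Unlock()
		return nil
	}
	delete(p.conns, k)
	p.mu.Unlock()
	// Close outside the lock so a slow shutdown does not block other callers.
	return p.closer(e.client)
}

// fakeClient stands in for a real MQTT/EdgeX client.
type fakeClient struct{ key string }

func main() {
	closed := 0
	p := NewPool(func(interface{}) error { closed++; return nil })
	p.Register("mqtt", func(k string) (interface{}, error) { return &fakeClient{key: k}, nil })
	a, _ := p.GetConnection("mqtt", "default")
	b, _ := p.GetConnection("mqtt", "default")
	fmt.Println(a == b) // true: one client shared by two users
	_ = p.Close("mqtt", "default")
	_ = p.Close("mqtt", "default")
	fmt.Println(closed) // 1: closed only after the last release
}
```

Building the client while holding the lock keeps the sketch simple and guarantees a single client per key, at the cost of serializing slow connection setups; a production pool might build outside the lock and resolve races afterwards.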

Based on the user configuration, each Sink/Source implementation decides whether to use a shared connection. If it does, it gets the connection from the connection pool; otherwise, it initializes the connection by itself. The connection pool should provide the following APIs:

    // Called whenever a source/sink needs the client
    GetConnection(connectionType string, configKey string) (interface{}, error)

    // Called when the source/sink exits
    Close(connectionType string, configKey string) error
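The shared-versus-private decision described above might look like this on the sink side. This is a hedged sketch: the `ConnectionPool` interface just restates the two APIs, while `mqttSink`, the `connectionSelector` property, and `countingPool` are hypothetical, and `Open`/`Close` omit the `StreamContext` parameter for brevity.

```go
package main

import "fmt"

// ConnectionPool restates the two proposed pool APIs.
type ConnectionPool interface {
	GetConnection(connectionType string, configKey string) (interface{}, error)
	Close(connectionType string, configKey string) error
}

// countingPool is a stand-in pool that only counts calls.
type countingPool struct{ gets, closes int }

func (c *countingPool) GetConnection(t, k string) (interface{}, error) {
	c.gets++
	return "shared-" + t + "/" + k, nil
}

func (c *countingPool) Close(t, k string) error {
	c.closes++
	return nil
}

var _ ConnectionPool = (*countingPool)(nil)

// mqttSink sketches how a sink could honour the user's choice.
type mqttSink struct {
	pool      ConnectionPool
	configKey string // empty means "do not share"
	client    interface{}
}

// Configure reads the hypothetical "connectionSelector" property.
func (s *mqttSink) Configure(props map[string]interface{}) error {
	if k, ok := props["connectionSelector"].(string); ok {
		s.configKey = k
	}
	return nil
}

func (s *mqttSink) Open() error {
	if s.configKey != "" {
		// Shared: borrow the client from the pool.
		c, err := s.pool.GetConnection("mqtt", s.configKey)
		if err != nil {
			return err
		}
		s.client = c
		return nil
	}
	// Not shared: the sink initializes its own client (placeholder value).
	s.client = "private-mqtt"
	return nil
}

func (s *mqttSink) Close() error {
	defer func() { s.client = nil }()
	if s.configKey != "" {
		// Return the shared connection; the pool decides when to really close it.
		return s.pool.Close("mqtt", s.configKey)
	}
	// A private client would be disconnected here.
	return nil
}

func main() {
	pool := &countingPool{}
	shared := &mqttSink{pool: pool}
	_ = shared.Configure(map[string]interface{}{"connectionSelector": "default"})
	_ = shared.Open()
	fmt.Println(shared.client) // shared-mqtt/default
	private := &mqttSink{pool: pool}
	_ = private.Configure(map[string]interface{}{})
	_ = private.Open()
	fmt.Println(private.client) // private-mqtt
	_ = shared.Close()
	_ = private.Close()
	fmt.Println(pool.gets, pool.closes) // 1 1
}
```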

Management

Configuration model

Recommended configuration file layout

   etc/sources   
   etc/sinks