-
Notifications
You must be signed in to change notification settings - Fork 420
Feature 936 Standalone connection pool
- Status: Design
- Author: Jianxiang Ran
- Co-author: Rocky Jin
- Discussion: https://github.com/lf-edge/ekuiper/issues/936
As data process middleware, eKuiper must exchange data with other entities. In eKuiper, we have Source
and Sink
to do the data input and output work. Both Source
and Sink
need think about data transport and authentication with other entities, for example eKuiper needs get raw data from mqtt broker, after processing, it needs send out the processed data to a HTTP Server. In this case, Source
needs do the mqtt connection and authentication work, as for the Sink
, it needs do the http connection and authentication work.
However, in some cases both Source
and Sink
are the same type, which means eKuiper can only set up a conneciton which Source
and Sink
can reuse. Here are two examples:
- mqtt source and mqtt sink across all rules may want to use the same mqtt connection
- There is no way for EdgeX sink to connect to secure redis. This could be a way to reuse the connection with source
At existing implement, Source and Sink set up and hold the connection by themself. We need introduce a connection pool from which Source and Sink can get connections. Here is an example: https://github.com/fatih/pool
Source need implements these methods
type Source interface {
//Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext, consumer chan<- SourceTuple, errCh chan<- error)
//Called during initialization. Configure the source with the data source(e.g. topic for mqtt) and the properties
//read from the yaml
Configure(datasource string, props map[string]interface{}) error
Closable
}
Sink need implements these methods
type Sink interface {
//Should be sync function for normal case. The container will run it in go func
Open(ctx StreamContext) error
//Called during initialization. Configure the sink with the properties from rule action definition
Configure(props map[string]interface{}) error
//Called when each row of data has transferred to this sink
Collect(ctx StreamContext, data interface{}) error
Closable
}
Now we support different connection types, like EdgeX、MQTT. Under each connection types, user can configure multiple configure keys. When user set up a stream, he chooses the connection type and configure key. After that, eKuiper knows how to set up the connection:
- Set up which kind client, MQTT or EdgeX
- Connect to which endpoint
In order to share the connection, we need introduce connection pool to manage the connections. Source and Sink just get the target connection from pools, when finish using, return it. The connection pool must have these capabilities:
- builder function to build different client types according to the connection type and configure key
- get the unique connection by the connection type and configure key
- when all Sink and Source have no reference to a specific connection, should close it
According to the user configuration, the Sink/Soure implement should decide whether use the share connection. When use share connection, need get connection from Connection pool; otherwise, just init the connection by itself. Connection pool should provide the following APIs
// called anytime when need get the client
GetConnection(connectionType string, configKey strgin) (interface{}, error)
// called when this source/sink exit
Close(connectionType string, configKey strgin) error
recommand configuration file layout
etc/sources
etc/sinks