Skip to content

Commit

Permalink
Timestream Prometheus Connector with AWS PrivateLink (#87)
Browse files Browse the repository at this point in the history
* Timestream Prometheus Connector with AWS PrivateLink (#14)

- Added new config for setting base endpoints
- Added SAM template and guide

* updating golangci-lint and fixing used version of go (#15)
  • Loading branch information
fredjoonpark authored Feb 19, 2025
1 parent 8917c42 commit df6e96a
Show file tree
Hide file tree
Showing 10 changed files with 380 additions and 185 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: 'stable'
go-version: '1.24'
cache: false
env:
GO111MODULE: on
Expand All @@ -59,5 +59,5 @@ jobs:
uses: golangci/golangci-lint-action@v6
with:
skip-cache: true
version: v1.60.1
version: v1.64.5
args: --exclude ".Log(.*)|format.Set|level.Set" --timeout=2m
2 changes: 2 additions & 0 deletions configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
defaultDatabaseConfig = &configuration{flag: "default-database", envFlag: "default_database", defaultValue: ""}
defaultTableConfig = &configuration{flag: "default-table", envFlag: "default_table", defaultValue: ""}
enableSigV4AuthConfig = &configuration{flag: "enable-sigv4-auth", envFlag: "enable_sigv4_auth", defaultValue: "true"}
queryBaseEndpointConfig = &configuration{flag: "query-base-endpoint", envFlag: "query_base_endpoint", defaultValue: ""}
writeBaseEndpointConfig = &configuration{flag: "write-base-endpoint", envFlag: "write_base_endpoint", defaultValue: ""}
listenAddrConfig = &configuration{flag: "web.listen-address", envFlag: "", defaultValue: ":9201"}
telemetryPathConfig = &configuration{flag: "web.telemetry-path", envFlag: "", defaultValue: "/metrics"}
failOnLabelConfig = &configuration{flag: "fail-on-long-label", envFlag: "fail_on_long_label", defaultValue: "false"}
Expand Down
31 changes: 25 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ type connectionConfig struct {
telemetryPath string
maxReadRetries int
maxWriteRetries int
queryBaseEndpoint string
writeBaseEndpoint string
certificate string
key string
}
Expand All @@ -122,13 +124,14 @@ func main() {
logger := cfg.createLogger()

ctx := context.Background()
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries)

awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries, cfg.queryBaseEndpoint)
if err != nil {
timestream.LogError(logger, "Failed to build AWS configuration for query", err)
os.Exit(1)
}

awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries)
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries, cfg.writeBaseEndpoint)
if err != nil {
timestream.LogError(logger, "Failed to build AWS configuration for write", err)
os.Exit(1)
Expand Down Expand Up @@ -185,12 +188,12 @@ func lambdaHandler(req events.APIGatewayProxyRequest) (events.APIGatewayProxyRes
return createErrorResponse(errors.NewParseBasicAuthHeaderError().(*errors.ParseBasicAuthHeaderError).Message())
}
}
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries)
awsQueryConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxReadRetries, cfg.queryBaseEndpoint)
if err != nil {
timestream.LogError(logger, "Failed to build AWS configuration for query", err)
os.Exit(1)
}
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries)
awsWriteConfigs, err := cfg.buildAWSConfig(ctx, cfg.maxWriteRetries, cfg.writeBaseEndpoint)
if err != nil {
timestream.LogError(logger, "Failed to build AWS configuration for write", err)
os.Exit(1)
Expand Down Expand Up @@ -394,6 +397,9 @@ func parseEnvironmentVariables() (*connectionConfig, error) {
return nil, errors.NewParseRetriesError(writeRetries, "write")
}

cfg.queryBaseEndpoint = getOrDefault(queryBaseEndpointConfig)
cfg.writeBaseEndpoint = getOrDefault(writeBaseEndpointConfig)

cfg.promlogConfig = promlog.Config{Level: &promlog.AllowedLevel{}, Format: &promlog.AllowedFormat{}}
cfg.promlogConfig.Level.Set(getOrDefault(promlogLevelConfig))
cfg.promlogConfig.Format.Set(getOrDefault(promlogFormatConfig))
Expand Down Expand Up @@ -431,6 +437,10 @@ func parseFlags() *connectionConfig {
a.Flag(certificateConfig.flag, "TLS server certificate file.").Default(certificateConfig.defaultValue).StringVar(&cfg.certificate)
a.Flag(keyConfig.flag, "TLS server private key file.").Default(keyConfig.defaultValue).StringVar(&cfg.key)
a.Flag(enableSigV4AuthConfig.flag, "Whether to enable SigV4 authentication with the API Gateway. Default to 'false'.").Default(enableSigV4AuthConfig.defaultValue).StringVar(&enableSigV4Auth)
a.Flag(queryBaseEndpointConfig.flag, "Override the default Timestream query endpoint (e.g., a VPC Endpoint).").
Default(queryBaseEndpointConfig.defaultValue).StringVar(&cfg.queryBaseEndpoint)
a.Flag(writeBaseEndpointConfig.flag, "Override the default Timestream write endpoint (e.g., a VPC Endpoint).").
Default(writeBaseEndpointConfig.defaultValue).StringVar(&cfg.writeBaseEndpoint)

flag.AddFlags(a, &cfg.promlogConfig)

Expand All @@ -439,7 +449,12 @@ func parseFlags() *connectionConfig {
os.Exit(1)
}

if err := cfg.parseBoolFromStrings(enableLogging, failOnLongMetricLabelName, failOnInvalidSample, enableSigV4Auth); err != nil {
if err := cfg.parseBoolFromStrings(
enableLogging,
failOnLongMetricLabelName,
failOnInvalidSample,
enableSigV4Auth,
); err != nil {
os.Exit(1)
}

Expand All @@ -457,7 +472,7 @@ func parseFlags() *connectionConfig {
}

// buildAWSConfig builds a aws.Config and return the pointer of the config.
func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int) (aws.Config, error) {
func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int, baseEndpoint string) (aws.Config, error) {
awsConfig, err := config.LoadDefaultConfig(ctx,
config.WithRegion(cfg.clientConfig.region),
config.WithRetryer(func() aws.Retryer {
Expand All @@ -469,6 +484,10 @@ func (cfg *connectionConfig) buildAWSConfig(ctx context.Context, maxRetries int)
if err != nil {
return aws.Config{}, fmt.Errorf("failed to build AWS config: %w", err)
}

if baseEndpoint != "" {
awsConfig.BaseEndpoint = aws.String(baseEndpoint)
}
return awsConfig, nil
}

Expand Down
8 changes: 7 additions & 1 deletion main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,16 +661,19 @@ func TestBuildAWSConfig(t *testing.T) {
name string
maxRetries int
expectedMaxAttempts int
baseEndpoint string
}{
{
name: "read config",
maxRetries: 10,
expectedMaxAttempts: 10,
baseEndpoint: "",
},
{
name: "write config",
maxRetries: 3,
expectedMaxAttempts: 3,
baseEndpoint: "https://ingest-cell1.timestream.us-west-2.amazonaws.com",
},
}

Expand All @@ -685,7 +688,7 @@ func TestBuildAWSConfig(t *testing.T) {
maxWriteRetries: test.expectedMaxAttempts,
}

actualConfig, err := input.buildAWSConfig(context.Background(), test.maxRetries)
actualConfig, err := input.buildAWSConfig(context.Background(), test.maxRetries, test.baseEndpoint)

assert.Nil(t, err)
assert.NotNil(t, actualConfig)
Expand All @@ -700,6 +703,9 @@ func TestBuildAWSConfig(t *testing.T) {
if ok {
assert.Equal(t, test.expectedMaxAttempts, standardRetryer.MaxAttempts())
}
if test.baseEndpoint != "" {
assert.Equal(t, aws.String(test.baseEndpoint), actualConfig.BaseEndpoint)
}
})
}
}
Expand Down
141 changes: 141 additions & 0 deletions privatelink/DEVELOPER_README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
# Timestream Prometheus Connector with AWS PrivateLink

## Overview

This guide explains how to set up the Prometheus Connector to ingest data to Amazon Timestream from within an isolated VPC environment using [AWS PrivateLink](https://aws.amazon.com/privatelink/).

This [serverless application](https://aws.amazon.com/serverless/) consists of the following:
- [Amazon EC2](https://aws.amazon.com/ec2/getting-started/) instance that will host the Prometheus Connector.
- [VPC Endpoints](https://docs.aws.amazon.com/whitepapers/latest/aws-privatelink/what-are-vpc-endpoints.html) for securely communicating with AWS services using PrivateLink.

This application assumes that the VPC in which the template will be deployed has no internet access and ensures that all communication stays within Amazon's internal network.

## Prerequisites

1. A VPC with at least two private subnets and route tables.
2. A Timestream database and table.
3. [Read and write cells](https://docs.aws.amazon.com/timestream/latest/developerguide/architecture.html#cells) for your Timestream account. Amazon routes requests to the write and query endpoints of the cell that your account has been mapped to for a given region.

To get your assigned cells using `awscli`:

For read endpoint:
```
aws timestream-query describe-endpoints --region <AWS_REGION>
```

For write endpoint:
```
aws timestream-write describe-endpoints --region <AWS_REGION>
```

Example output for the write endpoint:
```
{
"Endpoints": [
{
"Address": "ingest-cell1.timestream.us-west-2.amazonaws.com",
"CachePeriodInMinutes": 1440
}
]
}
```
Take note of your assigned cells (`ingest-cell1` for the above example) for both read and write endpoints.


## Deployment

From your existing VPC, you will need the following values:
- VPC ID: This is the ID of your existing VPC
- VPC CIDR : This is the CIDR range for your VPC
- Private Subnet IDs: This is where the EC2 instance and VPC endpoints will be deployed
- Private Route Table ID(s): This is how the [S3 Gateway endpoint](https://docs.aws.amazon.com/vpc/latest/privatelink/vpc-endpoints-s3.html) will resolve requests
- Query and Write cells: These are your assigned endpoint cells for Timestream


1. From the `privatelink` directory, run the following command to deploy the SAM template:

```
sam deploy --parameter-overrides "VpcId=<VPC_ID> VpcCidrIp=<VPC_CIDR_IP> PrivateSubnetIds=<PRIVATE_SUBNET_ID_1>,<PRIVATE_SUBNET_ID_2> PrivateRouteTableIds=<PRIVATE_ROUTE_TABLE_ID> TimestreamQueryCell=<QUERY_CELL> TimestreamWriteCell=<WRITE_CELL> --region <AWS_REGION>"
```

To view the full set of `sam deploy` options see the [sam deploy documentation](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-cli-command-reference-sam-deploy.html).

2. The deployment will have the following outputs upon completion:

- `InstanceId`: ID of the EC2 instance

An example of the output:

```
------------------------------------------------------------------------------
Outputs
------------------------------------------------------------------------------
Key InstanceId
Description ID of the EC2 instance
Value i-08a5d7e1700c9be5a
------------------------------------------------------------------------------
```

3. Start an AWS SSM session, replacing `INSTANCE_ID` with your EC2 instance ID from deployment. You can install the [plugin here.](https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html)

```shell
aws ssm start-session --target i-<INSTANCE_ID>
```

4. Install the Prometheus Connector.

1. Create a directory for the connector.
```
mkdir ~/connector && cd ~/connector
```
2. Download the precompiled binary from S3 for your region. [See here](https://github.com/awslabs/amazon-timestream-connector-prometheus/tags) for released versions.
```shell
curl -O https://timestreamassets-<AWS_REGION>.s3.<AWS_REGION>.amazonaws.com/timestream-prometheus-connector/timestream-prometheus-connector-linux-arm64-<VERSION>.zip
```
3. Unzip the binary.
```shell
unzip timestream-prometheus-connector-linux-arm64-<VERSION>.zip
```
4. Disable endpoint discovery by setting the `AWS_ENABLE_ENDPOINT_DISCOVERY` environment variable to `false`. This ensures requests from the connector are routed through VPC endpoints.
```
export AWS_ENABLE_ENDPOINT_DISCOVERY=false
```
5. Launch Prometheus Connector
Replace the following variables to configure your Timestream database, region, and assigned cells.
- `DEFAULT_DATABASE`: Specifies the default Timestream database for the Prometheus Connector.
- `DEFAULT_TABLE`: Specifies the default table for storing Prometheus metrics.
- `AWS_REGION`: Defines the AWS region.
- `QUERY_CELL`: Defines the query endpoint cell for Timestream.
- `INGEST_CELL`: Defines the ingestion endpoint cell for Timestream.
Run the Prometheus Connector:
```
./bootstrap \
--default-database=<DEFAULT_DATABASE> \
--default-table=<DEFAULT_TABLE> \
--region=<AWS_REGION> \
--query-base-endpoint=https://<QUERY_CELL>.timestream.<AWS_REGION>.amazonaws.com \
--write-base-endpoint=https://<INGEST_CELL>.timestream.<AWS_REGION>.amazonaws.com
```
The connector is now ready to ingest data to Timestream!
To see an example of how Prometheus can be configured, [see here](https://github.com/awslabs/amazon-timestream-connector-prometheus?tab=readme-ov-file#prometheus-configuration).
### Cleanup
Delete the CloudFormation stack. From the `privatelink` directory, run the following command:
```shell
sam delete --region <AWS_REGION>
```
10 changes: 10 additions & 0 deletions privatelink/samconfig.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version = 0.1
[default]
[default.deploy]
[default.deploy.parameters]
capabilities = "CAPABILITY_NAMED_IAM"
confirm_changeset = true
profile = "default"
region = "us-west-2"
stack_name = "PrometheusConnectorPrivateLink"
resolve_s3 = true
Loading

0 comments on commit df6e96a

Please sign in to comment.