Skip to content

Commit

Permalink
feat: proxy and custom SSL certificates support (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
jansimonb authored Feb 27, 2025
1 parent 924a355 commit 8aa0156
Show file tree
Hide file tree
Showing 9 changed files with 235 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 2.4.0 [unreleased]

### Features

1. [#141](https://github.com/InfluxCommunity/influxdb3-go/pull/141): Add proxy and custom SSL root certificate support.

## 2.3.0 [2025-02-20]

### Features
Expand Down
12 changes: 9 additions & 3 deletions examples/Basic/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ func main() {
url := os.Getenv("INFLUX_URL")
token := os.Getenv("INFLUX_TOKEN")
database := os.Getenv("INFLUX_DATABASE")
// (optional) Custom SSL root certificates file path
sslRootsFilePath := os.Getenv("INFLUX_SSL_ROOTS_FILE_PATH")
// (optional) Proxy URL
proxyURL := os.Getenv("INFLUX_PROXY_URL")

// Instantiate a client using your credentials.
client, err := influxdb3.New(influxdb3.ClientConfig{
Host: url,
Token: token,
Database: database,
Host: url,
Token: token,
Database: database,
SSLRootsFilePath: sslRootsFilePath,
Proxy: proxyURL,
})
if err != nil {
panic(err)
Expand Down
70 changes: 68 additions & 2 deletions influxdb3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ package influxdb3

import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"log/slog"
"mime"
"net/http"
"net/url"
"os"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -94,9 +98,48 @@ func New(config ClientConfig) (*Client, error) {
}
c.authorization = fmt.Sprintf("%s %s", authScheme, c.config.Token)

// Prepare SSL certificate pool (if host URL is secure)
var certPool *x509.CertPool
hostPortURL, safe := ReplaceURLProtocolWithPort(c.config.Host)
if safe == nil || *safe {
// Use the system certificate pool
certPool, err = x509.SystemCertPool()
if err != nil {
return nil, fmt.Errorf("x509: %w", err)
}

// Set additional SSL root certificates (if configured)
if config.SSLRootsFilePath != "" {
certs, err := os.ReadFile(config.SSLRootsFilePath)
if err != nil {
return nil, fmt.Errorf("error reading %s: %w", config.SSLRootsFilePath, err)
}
ok := certPool.AppendCertsFromPEM(certs)
if !ok {
slog.Warn("No valid certificates found in " + config.SSLRootsFilePath)
}
}
}

// Prepare proxy (if configured)
var proxyURL *url.URL
if config.Proxy != "" {
proxyURL, err = url.Parse(config.Proxy)
if err != nil {
return nil, fmt.Errorf("parsing proxy URL: %w", err)
}
}

// Prepare HTTP client
if c.config.HTTPClient == nil {
c.config.HTTPClient = http.DefaultClient
var copied = *http.DefaultClient
c.config.HTTPClient = &copied
}
if certPool != nil {
setHTTPClientCertPool(c.config.HTTPClient, certPool)
}
if proxyURL != nil {
setHTTPClientProxy(c.config.HTTPClient, proxyURL)
}

// Use default write option if not set
Expand All @@ -106,14 +149,37 @@ func New(config ClientConfig) (*Client, error) {
}

// Init FlightSQL client
err = c.initializeQueryClient()
err = c.initializeQueryClient(hostPortURL, certPool, proxyURL)
if err != nil {
return nil, fmt.Errorf("flight client: %w", err)
}

return c, nil
}

func ensureTransportSet(httpClient *http.Client) {
if httpClient.Transport == nil {
httpClient.Transport = http.DefaultTransport.(*http.Transport).Clone()
}
}

func setHTTPClientProxy(httpClient *http.Client, proxyURL *url.URL) {
ensureTransportSet(httpClient)
if transport, ok := httpClient.Transport.(*http.Transport); ok {
transport.Proxy = http.ProxyURL(proxyURL)
}
}

func setHTTPClientCertPool(httpClient *http.Client, certPool *x509.CertPool) {
ensureTransportSet(httpClient)
if transport, ok := httpClient.Transport.(*http.Transport); ok {
transport.TLSClientConfig = &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS12,
}
}
}

// NewFromConnectionString creates new Client from the specified connection string.
// Parameters:
// - connectionString: connection string in URL format.
Expand Down
74 changes: 74 additions & 0 deletions influxdb3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"path/filepath"
"strings"
"testing"

Expand Down Expand Up @@ -88,6 +89,79 @@ func TestNew(t *testing.T) {
assert.EqualValues(t, DefaultWriteOptions, *c.config.WriteOptions)
}

func TestNewWithCertificates(t *testing.T) {
// Valid certificates.
certFilePath := filepath.Join("testdata", "valid_certs.pem")
c, err := New(ClientConfig{
Host: "https://localhost:8086",
Token: "my-token",
SSLRootsFilePath: certFilePath,
})
require.NoError(t, err)
assert.NotNil(t, c)
assert.Equal(t, certFilePath, c.config.SSLRootsFilePath)

// Invalid certificates.
certFilePath = filepath.Join("testdata", "invalid_certs.pem")
c, err = New(ClientConfig{
Host: "https://localhost:8086",
Token: "my-token",
SSLRootsFilePath: certFilePath,
})
require.NoError(t, err)
assert.NotNil(t, c)
assert.Equal(t, certFilePath, c.config.SSLRootsFilePath)

// Missing certificates file.
certFilePath = filepath.Join("testdata", "non-existing-file")
c, err = New(ClientConfig{
Host: "https://localhost:8086",
Token: "my-token",
SSLRootsFilePath: certFilePath,
})
assert.Nil(t, c)
require.Error(t, err)
assert.ErrorContains(t, err, "error reading testdata/non-existing-file")
}

func TestNewWithProxy(t *testing.T) {
defer func() {
// Cleanup: unset proxy.
os.Unsetenv("HTTPS_PROXY")
}()

// Invalid proxy url.
c, err := New(ClientConfig{
Host: "http://localhost:8086",
Token: "my-token",
Proxy: "http://proxy:invalid-port",
})
assert.Nil(t, c)
require.Error(t, err)
assert.ErrorContains(t, err, "parsing proxy URL")

// Valid proxy url.
c, err = New(ClientConfig{
Host: "http://localhost:8086",
Token: "my-token",
Proxy: "http://proxy:8888",
})
require.NoError(t, err)
assert.NotNil(t, c)
assert.Equal(t, "http://proxy:8888", c.config.Proxy)

// Valid proxy url with HTTPS_PROXY env already set.
t.Setenv("HTTPS_PROXY", "http://another-proxy:8888")
c, err = New(ClientConfig{
Host: "http://localhost:8086",
Token: "my-token",
Proxy: "http://proxy:8888",
})
require.NoError(t, err)
assert.NotNil(t, c)
assert.Equal(t, "http://proxy:8888", c.config.Proxy)
}

func TestURLs(t *testing.T) {
urls := []struct {
HostURL string
Expand Down
6 changes: 6 additions & 0 deletions influxdb3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ type ClientConfig struct {

// Default HTTP headers to be included in requests
Headers http.Header

// SSL root certificates file path
SSLRootsFilePath string

// Proxy URL
Proxy string
}

// validate validates the config.
Expand Down
8 changes: 5 additions & 3 deletions influxdb3/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (

func ExampleNew() {
client, err := New(ClientConfig{
Host: "https://us-east-1-1.aws.cloud2.influxdata.com",
Token: "my-token",
Database: "my-database",
Host: "https://us-east-1-1.aws.cloud2.influxdata.com",
Token: "my-token",
Database: "my-database",
SSLRootsFilePath: "/path/to/certificates.pem",
Proxy: "http://localhost:8888",
})
if err != nil {
log.Fatal(err)
Expand Down
35 changes: 25 additions & 10 deletions influxdb3/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"encoding/json"
"errors"
"fmt"
"log/slog"
"net/url"
"os"
"strings"

"github.com/apache/arrow/go/v15/arrow/flight"
Expand All @@ -39,17 +42,11 @@ import (
"google.golang.org/grpc/metadata"
)

func (c *Client) initializeQueryClient() error {
url, safe := ReplaceURLProtocolWithPort(c.config.Host)

func (c *Client) initializeQueryClient(hostPortURL string, certPool *x509.CertPool, proxyURL *url.URL) error {
var transport grpc.DialOption

if safe == nil || *safe {
pool, err := x509.SystemCertPool()
if err != nil {
return fmt.Errorf("x509: %w", err)
}
transport = grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(pool, ""))
if certPool != nil {
transport = grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(certPool, ""))
} else {
transport = grpc.WithTransportCredentials(insecure.NewCredentials())
}
Expand All @@ -58,7 +55,25 @@ func (c *Client) initializeQueryClient() error {
transport,
}

client, err := flight.NewClientWithMiddleware(url, nil, nil, opts...)
if proxyURL != nil {
// Configure the grpc-go client to use a proxy by setting the HTTPS_PROXY environment variable.
// This approach is generally safer than implementing a custom Dialer because it leverages built-in
// proxy handling, reducing the risk of introducing vulnerabilities or misconfigurations.
// More info: https://github.com/grpc/grpc-go/blob/master/Documentation/proxy.md
prevHTTPSProxy := os.Getenv("HTTPS_PROXY")
if prevHTTPSProxy != "" && prevHTTPSProxy != proxyURL.String() {
slog.Warn(
fmt.Sprintf("Environment variable HTTPS_PROXY is already set, "+
"it's value will be overridden with: %s", proxyURL.String()),
)
}
err := os.Setenv("HTTPS_PROXY", proxyURL.String())
if err != nil {
return fmt.Errorf("setenv HTTPS_PROXY: %w", err)
}
}

client, err := flight.NewClientWithMiddleware(hostPortURL, nil, nil, opts...)
if err != nil {
return fmt.Errorf("flight: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions influxdb3/testdata/invalid_certs.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-----BEGIN CERTIFICATE-----
invalid cert for testing
-----END CERTIFICATE-----
41 changes: 41 additions & 0 deletions influxdb3/testdata/valid_certs.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-----BEGIN CERTIFICATE-----
MIIDOTCCAiGgAwIBAgIUds3ngsz1wADSjtgHjJeAHEXrdtEwDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTAyMjYwODU2MTZaFw0zNTAy
MjQwODU2MTZaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQCEaJeyvvMZZ4bKdjERQor15UqejebevWJYp99Qp7qW
pggQs1h/i79mSZZ+Tsx8e3FrHQY5dAWCURXHan/Eda9mLTupzwCvHs0bYdTyo2en
CV6vszCyVqKJKWrsauwjNQ53f5XxJivvN5p19JzNYhE6SDzK3adNMhAVdeFm9Nl5
Tt8/Kdc9Dxi4i57xmW1u0HnBx6dCuM/umwoqPWhDc4meiAual0/QRhxBvNp5JaBt
dinjPlFZHTSdM9Bb1iDYxXiFngprEPa5T0I50afEcSxCbdQ9pB9GrDcQlZCusKAL
3dGzktrZVrvpWGVpuE17ZbdXgV3jIivscxL3lJPrRI+lAgMBAAGjITAfMB0GA1Ud
DgQWBBTFaNfvzmp2+fu+twUYwxcDmsxNxjANBgkqhkiG9w0BAQsFAAOCAQEAGQtb
aDgMY4Zp0of5baIv4gV5TxBxOOZAlbBiRq5oKvNoR/k9+JtIaebPvq8pAl3mx1xq
Ie9OefoVSoiVFUvPH6/7NrQPaK4wTBrW7jjJtsbi4aqgfmrXvhBvomtKKiBHtX+/
Sw6Z45OTCrFKp3c/KQLIFFEO2bkzXuP0bpYn+sWNUVxkjJwHTHuFe2DWE8xip5CJ
nk/SLK4DJMzQMF1KKvJpODnXZ38nHiqwPVf1IgeJpger+9e7H942Nx24QTn44Ugv
eo7pOm9mEf6N8ffjnsG1kXi3NTAGOgG08x9Od9g4Ko4AANpdNtW8fJUdDSmkJy71
ekdLtYTJomjJi4RKAw==
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
MIIDOTCCAiGgAwIBAgIUJTjRsy12uoFYBDIe0finxvca0dowDQYJKoZIhvcNAQEL
BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM
GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yNTAyMjYwODU4NDRaFw0zNTAy
MjQwODU4NDRaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw
HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB
AQUAA4IBDwAwggEKAoIBAQDPozDfuIx0LcBteLmfwGSHX46no1ZvJXmexmeJeNr2
q7wmjMxqQXuGnZgqV2ihNrN4nlqxHucOYbcSL5XpBWrhpZnyMLb0xGELz9O5q0X6
V7WxDt+6kK2H+5BltqCEwGgPn/ou+KQaEU0w+CXeMMKi+y9zHnZ2AlNPDrELLTes
ZVgQ41ayh33XqINqb0Bct18IsSq3jOQM9S3HCOKwt5/YwfuByK/nUA8nsKqbiMsn
zsPC1MLmUKk7XSX1NSVJ6yr6GxbBRc/pn+/IL3EPefx8MhWlGBiQb+vNHkw88tnb
l/9/7G3KYvmkqPmtk2MctQu27MUd8C9Tn30XkKxOxgeDAgMBAAGjITAfMB0GA1Ud
DgQWBBR3j19033GyaEUbCWQkLFyEf2NzgzANBgkqhkiG9w0BAQsFAAOCAQEAEc9m
spxIYLSDTWuchJDW3mSjGDIdQhpW2MKuqYrrd27sXKgwSmdk86U6kgffVApXRBWZ
3Hbua4xekJU+oqJ7sIhe5ZUkiGb/l/E2aXTL+AVGKfYM6uH1+FJQS5CBCCnBcnNW
ts7W0eDxruiCDvMdnMReI7mjbRQbQBfzujjKxljxwqgiCMgFrdeU5D1Exv7m+LjF
OqijDPJeeWDcYf7mwAUJ1sYJgOKC57fARSQH0JwgclxFJImIJji7ZuYgPPi9xRIm
tMZJ+Tum8KDfBY3GFdK5AguOw6Nd6ZsGEscq4Y7BvUUDFP6RsTTgOgpRrqKD3oMF
1tKPLtx124L6qXdmgw==
-----END CERTIFICATE-----

0 comments on commit 8aa0156

Please sign in to comment.