-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #40 from metrico/feature/prequet-file
Feature/prequet file
- Loading branch information
Showing
36 changed files
with
2,044 additions
and
40 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
package main | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
"io" | ||
"net/http" | ||
"strings" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
) | ||
|
||
const CLIENTS = 1 | ||
|
||
var requestDuration = promauto.NewHistogram(prometheus.HistogramOpts{ | ||
Name: "insert_request_duration_seconds", | ||
Help: "Duration of HTTP requests in seconds", | ||
ConstLabels: prometheus.Labels{ | ||
"job": "quackdb_benchmark", | ||
"clients": fmt.Sprintf("%d", CLIENTS), | ||
"mbps": "30", | ||
}, | ||
Buckets: []float64{0.1, 0.5, 1, 5, 10, 20, 30}, | ||
}) | ||
|
||
var totalRequests = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "total_insert_requests", | ||
Help: "Duration of HTTP requests in seconds", | ||
ConstLabels: prometheus.Labels{ | ||
"job": "quackdb_benchmark", | ||
"clients": fmt.Sprintf("%d", CLIENTS), | ||
"mbps": "30", | ||
}, | ||
}) | ||
|
||
var totalBytes = promauto.NewCounter(prometheus.CounterOpts{ | ||
Name: "total_insert_bytes", | ||
Help: "Duration of HTTP requests in seconds", | ||
ConstLabels: prometheus.Labels{ | ||
"job": "quackdb_benchmark", | ||
"clients": fmt.Sprintf("%d", CLIENTS), | ||
"mbps": "30", | ||
}, | ||
}) | ||
|
||
func main() { | ||
go func() { | ||
http.Handle("/metrics", promhttp.Handler()) | ||
if err := http.ListenAndServe(":9090", nil); err != nil { | ||
panic(err) | ||
} | ||
}() | ||
time.Sleep(time.Minute) | ||
runBenchmark(30, CLIENTS, time.Minute*5) | ||
} | ||
|
||
func runBenchmark(mbps int, clients int, timeout time.Duration) { | ||
resp, err := http.Post("http://localhost:8333/quackdb/create", "application/x-yaml", | ||
strings.NewReader(`create_table: test | ||
fields: | ||
timestamp_ns: Int64 | ||
fingerprint: Int64 | ||
str: String | ||
value: Float64 | ||
engine: Merge | ||
order_by: | ||
- timestamp_ns | ||
timestamp: | ||
field: timestamp_ns | ||
precision: ns | ||
partition_by: "" | ||
`)) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer resp.Body.Close() | ||
body, err := io.ReadAll(resp.Body) | ||
if resp.StatusCode != 200 { | ||
panic(fmt.Errorf("[%d]: %s", resp.StatusCode, string(body))) | ||
} | ||
|
||
bPClient := mbps * 1024 * 1024 / clients | ||
wg := &sync.WaitGroup{} | ||
var working int32 = 1 | ||
t := time.NewTicker(time.Second) | ||
for i := 0; i < clients; i++ { | ||
wg.Add(1) | ||
go func(i int) { | ||
defer wg.Done() | ||
var _wg sync.WaitGroup | ||
for range t.C { | ||
if atomic.LoadInt32(&working) != 1 { | ||
return | ||
} | ||
for i := 0; i < clients; i++ { | ||
_wg.Add(1) | ||
go func() { | ||
defer _wg.Done() | ||
bodyBuilder := strings.Builder{} | ||
for bodyBuilder.Len() < bPClient { | ||
s := fmt.Sprintf( | ||
"{\"timestamp_ns\": %d, \"fingerprint\": 1234567890, \"str\": \"hello %[1]d\", \"value\": 123.456}\n", | ||
time.Now().UnixNano()) | ||
bodyBuilder.WriteString(s) | ||
} | ||
body = []byte(bodyBuilder.String()) | ||
start := time.Now() | ||
res, err := http.Post("http://localhost:8333/quackdb/test/insert", "application/x-ndjson", | ||
bytes.NewReader(body), | ||
) | ||
if err != nil { | ||
panic(err) | ||
} | ||
defer res.Body.Close() | ||
requestDuration.Observe(time.Since(start).Seconds()) | ||
totalRequests.Inc() | ||
totalBytes.Add(float64(len(body))) | ||
}() | ||
} | ||
_wg.Wait() | ||
} | ||
}(i) | ||
} | ||
time.Sleep(timeout) | ||
atomic.StoreInt32(&working, 0) | ||
wg.Wait() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
package config | ||
|
||
import "quackpipe/model" | ||
|
||
var AppFlags *model.CommandLineFlags |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
quack_pipe: | ||
root: /tmp/data | ||
merge_timeout_s: 10 | ||
secret: XXXXXX |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package config | ||
|
||
import ( | ||
"fmt" | ||
"github.com/spf13/viper" | ||
"strings" | ||
) | ||
|
||
type QuackPipeConfiguration struct { | ||
Enabled bool `json:"enabled" mapstructure:"enabled" default:"false"` | ||
Root string `json:"root" mapstructure:"root" default:""` | ||
MergeTimeoutS int `json:"merge_timeout_s" mapstructure:"merge_timeout_s" default:"60"` | ||
Secret string `json:"secret" mapstructure:"secret" default:""` | ||
} | ||
|
||
type Configuration struct { | ||
QuackPipe QuackPipeConfiguration `json:"quack_pipe" mapstructure:"quack_pipe" default:""` | ||
} | ||
|
||
var Config *Configuration | ||
|
||
func InitConfig(file string) { | ||
|
||
viper.SetEnvPrefix("") | ||
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) | ||
viper.AutomaticEnv() | ||
// If a file is provided, use it as the config file | ||
if file != "" { | ||
viper.SetConfigFile(file) | ||
err := viper.ReadInConfig() | ||
if err != nil { | ||
panic(fmt.Errorf("error reading config file: %s", err)) | ||
} | ||
fmt.Println("Using config file:", viper.ConfigFileUsed()) | ||
} else { | ||
fmt.Println("Using environment variables for configuration") | ||
} | ||
|
||
Config = &Configuration{} | ||
err := viper.Unmarshal(Config) | ||
if err != nil { | ||
panic(fmt.Errorf("unable to decode into struct: %s", err)) | ||
} | ||
fmt.Printf("Loaded configuration: %+v\n", Config) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package config | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
) | ||
|
||
func TestInitConfig(t *testing.T) { | ||
InitConfig("config_test.yaml") | ||
fmt.Println(Config) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,104 @@ | ||
package main | ||
|
||
import ( | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"quackpipe/config" | ||
"quackpipe/merge" | ||
"quackpipe/model" | ||
"quackpipe/router" | ||
"strings" | ||
"testing" | ||
"time" | ||
) | ||
|
||
func TestE2E(t *testing.T) { | ||
config.Config = &config.Configuration{ | ||
QuackPipe: config.QuackPipeConfiguration{ | ||
Enabled: true, | ||
Root: "_testdata", | ||
MergeTimeoutS: 10, | ||
Secret: "XXXXXX", | ||
}, | ||
DBPath: "_testdata", | ||
} | ||
config.AppFlags = &model.CommandLineFlags{ | ||
Host: toPtr("localhost"), | ||
Port: toPtr("8123"), | ||
Stdin: toPtr(false), | ||
Alias: toPtr(true), | ||
Format: toPtr(""), | ||
Params: toPtr(""), | ||
DBPath: toPtr("_testdata"), | ||
Config: toPtr(""), | ||
} | ||
go runServer() | ||
time.Sleep(1 * time.Second) | ||
resp, err := http.Post("http://localhost:8123/quackdb/create", "application/x-yaml", | ||
strings.NewReader(`create_table: test | ||
fields: | ||
timestamp_ns: Int64 | ||
fingerprint: Int64 | ||
str: String | ||
value: Float64 | ||
engine: Merge | ||
order_by: | ||
- timestamp_ns | ||
timestamp: | ||
field: timestamp_ns | ||
precision: ns | ||
partition_by: "" | ||
`)) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer resp.Body.Close() | ||
body, err := io.ReadAll(resp.Body) | ||
if resp.StatusCode != 200 { | ||
t.Fatalf("[%d]: %s", resp.StatusCode, string(body)) | ||
} | ||
fmt.Println(string(body)) | ||
|
||
resp, err = http.Post("http://localhost:8123/quackdb/test/insert", "application/x-ndjson", | ||
strings.NewReader( | ||
`{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456} | ||
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}`, | ||
), | ||
) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer resp.Body.Close() | ||
body, err = io.ReadAll(resp.Body) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if resp.StatusCode != 200 { | ||
t.Fatalf("[%d]: %s", resp.StatusCode, string(body)) | ||
} | ||
fmt.Println(string(body)) | ||
|
||
return | ||
} | ||
|
||
func toPtr[X any](val X) *X { | ||
return &val | ||
} | ||
|
||
func runServer() { | ||
if config.Config.QuackPipe.Enabled { | ||
merge.Init() | ||
} | ||
r := router.NewRouter(config.AppFlags) | ||
fmt.Printf("QuackPipe API Running: %s:%s\n", *config.AppFlags.Host, *config.AppFlags.Port) | ||
if err := http.ListenAndServe(*config.AppFlags.Host+":"+*config.AppFlags.Port, r); err != nil { | ||
panic(err) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,26 +1,71 @@ | ||
module quackpipe | ||
|
||
go 1.21 | ||
go 1.23 | ||
|
||
toolchain go1.23.2 | ||
|
||
require ( | ||
github.com/apache/arrow/go/v18 v18.0.0-20240829005432-58415d1fac50 | ||
github.com/go-faster/jx v1.1.0 | ||
github.com/google/uuid v1.6.0 | ||
github.com/gorilla/mux v1.8.1 | ||
github.com/marcboeker/go-duckdb v1.8.3 | ||
github.com/prometheus/client_golang v1.20.4 | ||
github.com/spf13/viper v1.18.1 | ||
github.com/stretchr/testify v1.9.0 | ||
github.com/tidwall/btree v1.7.0 | ||
golang.org/x/sync v0.8.0 | ||
gopkg.in/yaml.v3 v3.0.1 | ||
) | ||
|
||
require ( | ||
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect | ||
github.com/andybalholm/brotli v1.1.1 // indirect | ||
github.com/apache/arrow-go/v18 v18.0.0 // indirect | ||
github.com/apache/thrift v0.21.0 // indirect | ||
github.com/beorn7/perks v1.0.1 // indirect | ||
github.com/cespare/xxhash/v2 v2.3.0 // indirect | ||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect | ||
github.com/fsnotify/fsnotify v1.7.0 // indirect | ||
github.com/go-faster/errors v0.6.1 // indirect | ||
github.com/goccy/go-json v0.10.3 // indirect | ||
github.com/golang/snappy v0.0.4 // indirect | ||
github.com/google/flatbuffers v24.3.25+incompatible // indirect | ||
github.com/hashicorp/hcl v1.0.0 // indirect | ||
github.com/klauspost/asmfmt v1.3.2 // indirect | ||
github.com/klauspost/compress v1.17.11 // indirect | ||
github.com/klauspost/cpuid/v2 v2.2.8 // indirect | ||
github.com/magiconair/properties v1.8.7 // indirect | ||
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect | ||
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect | ||
github.com/mitchellh/mapstructure v1.5.0 // indirect | ||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect | ||
github.com/pelletier/go-toml/v2 v2.2.2 // indirect | ||
github.com/pierrec/lz4/v4 v4.1.21 // indirect | ||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect | ||
github.com/prometheus/client_model v0.6.1 // indirect | ||
github.com/prometheus/common v0.55.0 // indirect | ||
github.com/prometheus/procfs v0.15.1 // indirect | ||
github.com/sagikazarmark/locafero v0.4.0 // indirect | ||
github.com/sagikazarmark/slog-shim v0.1.0 // indirect | ||
github.com/segmentio/asm v1.2.0 // indirect | ||
github.com/sourcegraph/conc v0.3.0 // indirect | ||
github.com/spf13/afero v1.11.0 // indirect | ||
github.com/spf13/cast v1.6.0 // indirect | ||
github.com/spf13/pflag v1.0.5 // indirect | ||
github.com/subosito/gotenv v1.6.0 // indirect | ||
github.com/zeebo/xxh3 v1.0.2 // indirect | ||
go.uber.org/atomic v1.9.0 // indirect | ||
go.uber.org/multierr v1.9.0 // indirect | ||
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect | ||
golang.org/x/mod v0.21.0 // indirect | ||
golang.org/x/sync v0.8.0 // indirect | ||
golang.org/x/net v0.30.0 // indirect | ||
golang.org/x/sys v0.26.0 // indirect | ||
golang.org/x/text v0.19.0 // indirect | ||
golang.org/x/tools v0.26.0 // indirect | ||
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect | ||
golang.org/x/xerrors v0.0.0-20240716161551-93cc26a95ae9 // indirect | ||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240903143218-8af14fe29dc1 // indirect | ||
google.golang.org/grpc v1.67.1 // indirect | ||
google.golang.org/protobuf v1.35.1 // indirect | ||
gopkg.in/ini.v1 v1.67.0 // indirect | ||
) |
Oops, something went wrong.