-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathe2e_test.go
104 lines (98 loc) · 2.7 KB
/
e2e_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"fmt"
"io"
"net/http"
"quackpipe/config"
"quackpipe/merge"
"quackpipe/model"
"quackpipe/router"
"strings"
"testing"
"time"
)
func TestE2E(t *testing.T) {
config.Config = &config.Configuration{
QuackPipe: config.QuackPipeConfiguration{
Enabled: true,
Root: "_testdata",
MergeTimeoutS: 10,
Secret: "XXXXXX",
},
DBPath: "_testdata",
}
config.AppFlags = &model.CommandLineFlags{
Host: toPtr("localhost"),
Port: toPtr("8123"),
Stdin: toPtr(false),
Alias: toPtr(true),
Format: toPtr(""),
Params: toPtr(""),
DBPath: toPtr("_testdata"),
Config: toPtr(""),
}
go runServer()
time.Sleep(1 * time.Second)
resp, err := http.Post("http://localhost:8123/quackdb/create", "application/x-yaml",
strings.NewReader(`create_table: test
fields:
timestamp_ns: Int64
fingerprint: Int64
str: String
value: Float64
engine: Merge
order_by:
- timestamp_ns
timestamp:
field: timestamp_ns
precision: ns
partition_by: ""
`))
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
t.Fatalf("[%d]: %s", resp.StatusCode, string(body))
}
fmt.Println(string(body))
resp, err = http.Post("http://localhost:8123/quackdb/test/insert", "application/x-ndjson",
strings.NewReader(
`{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}
{"timestamp_ns": 1668326823000000000, "fingerprint": 1234567890, "str": "hello", "value": 123.456}`,
),
)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()
body, err = io.ReadAll(resp.Body)
if err != nil {
t.Fatal(err)
}
if resp.StatusCode != 200 {
t.Fatalf("[%d]: %s", resp.StatusCode, string(body))
}
fmt.Println(string(body))
return
}
func toPtr[X any](val X) *X {
return &val
}
func runServer() {
if config.Config.QuackPipe.Enabled {
merge.Init()
}
r := router.NewRouter(config.AppFlags)
fmt.Printf("QuackPipe API Running: %s:%s\n", *config.AppFlags.Host, *config.AppFlags.Port)
if err := http.ListenAndServe(*config.AppFlags.Host+":"+*config.AppFlags.Port, r); err != nil {
panic(err)
}
}