Skip to content

Commit 2d73331

Browse files
authored
feat: add big key tool (OpenAtomFoundation#2195)
* feat: add big key tool Signed-off-by: sjcsjc123 <1401189096@qq.com> * modify heap to maxHeap Signed-off-by: sjcsjc123 <1401189096@qq.com> * modify comment Signed-off-by: sjcsjc123 <1401189096@qq.com> * add ci test Signed-off-by: sjcsjc123 <1401189096@qq.com> * add compress and decompress Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> * fix ci Signed-off-by: sjcsjc123 <1401189096@qq.com> --------- Signed-off-by: sjcsjc123 <1401189096@qq.com>
1 parent 0d3a26f commit 2d73331

File tree

13 files changed

+1261
-3
lines changed

13 files changed

+1261
-3
lines changed

.github/workflows/pika.yml

+9-3
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,9 @@ jobs:
8484
- name: Run Go E2E Tests
8585
working-directory: ${{ github.workspace }}/build
8686
run: |
87-
cd ../tests/integration/
87+
cd ../tools/pika_keys_analysis/
88+
go test -v ./...
89+
cd ../../tests/integration/
8890
chmod +x integrate_test.sh
8991
sh integrate_test.sh
9092
@@ -152,7 +154,9 @@ jobs:
152154
- name: Run Go E2E Tests
153155
working-directory: ${{ github.workspace }}/build
154156
run: |
155-
cd ../tests/integration/
157+
cd ../tools/pika_keys_analysis/
158+
go test -v ./...
159+
cd ../../tests/integration/
156160
chmod +x integrate_test.sh
157161
sh integrate_test.sh
158162
@@ -210,7 +214,9 @@ jobs:
210214
- name: Run Go E2E Tests
211215
working-directory: ${{ github.workspace }}/build
212216
run: |
213-
cd ../tests/integration/
217+
cd ../tools/pika_keys_analysis/
218+
go test -v ./...
219+
cd ../../tests/integration/
214220
chmod +x integrate_test.sh
215221
sh integrate_test.sh
216222

tools/pika_keys_analysis/app.go

+138
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
package pika_keys_analysis
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"strings"
7+
"time"
8+
9+
"github.com/desertbit/grumble"
10+
"github.com/fatih/color"
11+
)
12+
13+
var App = grumble.New(&grumble.Config{
14+
Name: "pika_keys_analysis",
15+
Description: "A tool for analyzing keys in Pika",
16+
HistoryFile: "/tmp/.pika_keys_analysis_history",
17+
Prompt: "pika_keys_analysis > ",
18+
HistoryLimit: 100,
19+
ErrorColor: color.New(color.FgRed, color.Bold, color.Faint),
20+
HelpHeadlineColor: color.New(color.FgGreen),
21+
HelpHeadlineUnderline: false,
22+
HelpSubCommands: true,
23+
PromptColor: color.New(color.FgBlue, color.Bold),
24+
Flags: func(f *grumble.Flags) {},
25+
})
26+
27+
func init() {
28+
App.OnInit(func(a *grumble.App, fm grumble.FlagMap) error {
29+
return nil
30+
})
31+
App.SetPrintASCIILogo(func(a *grumble.App) {
32+
fmt.Println(strings.Join([]string{`
33+
............. .... ..... ..... .....
34+
################# #### ##### ##### #######
35+
#### ##### #### ##### ##### #########
36+
#### ##### #### ##### ##### #### #####
37+
#### ##### #### ##### ##### #### #####
38+
################ #### ##### ##### #### #####
39+
#### #### ##### ##### #################
40+
#### #### ##### ###### ##### #####
41+
#### #### ##### ###### ##### #####
42+
`}, "\r\n"))
43+
})
44+
register(App)
45+
}
46+
47+
func register(app *grumble.App) {
48+
app.AddCommand(&grumble.Command{
49+
Name: "bigKey",
50+
Help: "list the big keys",
51+
LongHelp: "list the big keys",
52+
Run: func(c *grumble.Context) error {
53+
listBigKeys, err := PikaInstance.ListBigKeysByScan(context.Background())
54+
if err != nil {
55+
return err
56+
}
57+
start := time.Now()
58+
for keyType, data := range listBigKeys {
59+
fmt.Printf("Type: %s, Head: %d\n", keyType, Head)
60+
if len(data.GetTopN(Head)) == 0 {
61+
fmt.Println("No big key found")
62+
}
63+
for _, v := range data.GetTopN(Head) {
64+
fmt.Printf("Key : %s, Size: %d, From: %s\n", v.Key, v.UsedSize, v.Client)
65+
}
66+
}
67+
end := time.Now()
68+
if PrintKeyNum {
69+
fmt.Println("Total Key Number:", PikaInstance.GetTotalKeyNumber())
70+
}
71+
fmt.Println("Cost Time:", end.Sub(start))
72+
return nil
73+
},
74+
})
75+
76+
app.AddCommand(&grumble.Command{
77+
Name: "apply",
78+
Help: "Apply the settings to Pika",
79+
LongHelp: "Apply the settings to Pika",
80+
Args: func(a *grumble.Args) {
81+
a.String("filename", "The configuration file")
82+
},
83+
Run: func(c *grumble.Context) error {
84+
filename := c.Args.String("filename")
85+
return Init(filename)
86+
},
87+
})
88+
89+
app.AddCommand(&grumble.Command{
90+
Name: "compress",
91+
Help: "Compress the big keys",
92+
LongHelp: "Compress the big keys and store them to pika",
93+
Args: func(a *grumble.Args) {
94+
a.String("key", "The key to compress")
95+
},
96+
Run: func(c *grumble.Context) error {
97+
key := c.Args.String("key")
98+
return PikaInstance.CompressKey(context.Background(), key)
99+
},
100+
})
101+
102+
app.AddCommand(&grumble.Command{
103+
Name: "decompress",
104+
Help: "Decompress the big keys",
105+
LongHelp: "Decompress the big keys and store them to pika",
106+
Args: func(a *grumble.Args) {
107+
a.String("key", "The key to decompress")
108+
},
109+
Flags: func(f *grumble.Flags) {
110+
f.Bool("s", "save", false, "Save the decompressed value to pika")
111+
},
112+
Run: func(c *grumble.Context) error {
113+
key := c.Args.String("key")
114+
save := c.Flags.Bool("save")
115+
decompressKey, err := PikaInstance.DecompressKey(context.Background(), key, save)
116+
if err != nil {
117+
return err
118+
}
119+
fmt.Printf("Key: %s, Decompress: %s\n", key, decompressKey)
120+
return nil
121+
},
122+
})
123+
124+
app.AddCommand(&grumble.Command{
125+
Name: "recover",
126+
Help: "Recover the big keys",
127+
LongHelp: "Recover the big keys and store them to pika",
128+
Args: func(a *grumble.Args) {
129+
a.String("key", "The key to recover")
130+
a.String("newKey", "The new key to store the recovered value")
131+
},
132+
Run: func(c *grumble.Context) error {
133+
key := c.Args.String("key")
134+
newKey := c.Args.String("newKey")
135+
return PikaInstance.RecoverKey(context.Background(), key, newKey)
136+
},
137+
})
138+
}
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# What is this?
2+
This is a tool to analyze the keys of a pika cluster.
3+
# How to use?
4+
## 1. Install
5+
```shell
6+
go build -o pika_keys_analysis main.go
7+
```
8+
## 2. Start
9+
```shell
10+
./pika_keys_analysis config.yaml
11+
```
12+
## 3. List big keys
13+
```shell
14+
bigKey
15+
```
16+
## 4. Apply Config
17+
```shell
18+
apply config.yaml
19+
```
20+
## 5. Compress Key
21+
```shell
22+
compress <key>
23+
```
24+
## 6. Decompress Key
25+
- not save to pika
26+
```shell
27+
decompress <key>
28+
```
29+
- save to pika
30+
```shell
31+
decompress -s <key>
32+
```
33+
## 7. Recover Key
34+
```shell
35+
recover <from> <to>
36+
```
37+
# Notice
38+
39+
When using compression and decompression functions, errors in operation may cause duplicate compression or decompression, and the files used for recovery may be overwritten. If they are overwritten, the decompress command can be used to reach a state where decompression cannot continue, and then continue to compress to use the recover command normally
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
pika:
2+
- addr: 127.0.0.1:9221
3+
db: 0
4+
password: ""
5+
6+
- addr: 127.0.0.1:9221
7+
db: 1
8+
password: ""
9+
10+
scan-size: 1000 # scan size per time
11+
concurrency: 1000 # goroutine num
12+
head: 30 # show top head keys
13+
type:
14+
- string
15+
- hash
16+
- list
17+
- set
18+
- zset
19+
memory: 2000 # Memory limit, unit: MB
20+
print: true # Print key number or not, will use keys command
21+
save: ./save/ # Save dir path, will save key value pairs when compress

tools/pika_keys_analysis/cli/main.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"os"
6+
"pika/tools/pika_keys_analysis"
7+
8+
"github.com/desertbit/grumble"
9+
)
10+
11+
func main() {
12+
if len(os.Args) != 2 {
13+
fmt.Println("Usage: pika_keys_analysis <config file>")
14+
os.Exit(1)
15+
}
16+
err := pika_keys_analysis.Init(os.Args[1])
17+
if err != nil {
18+
fmt.Println(err)
19+
os.Exit(1)
20+
}
21+
22+
os.Args = os.Args[0:1]
23+
grumble.Main(pika_keys_analysis.App)
24+
}

tools/pika_keys_analysis/compress.go

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package pika_keys_analysis
2+
3+
import (
4+
"bytes"
5+
"compress/gzip"
6+
"io/ioutil"
7+
"os"
8+
"path/filepath"
9+
)
10+
11+
func compress(data []byte) ([]byte, error) {
12+
var compressedData bytes.Buffer
13+
writer := gzip.NewWriter(&compressedData)
14+
15+
_, err := writer.Write(data)
16+
if err != nil {
17+
return nil, err
18+
}
19+
20+
err = writer.Close()
21+
if err != nil {
22+
return nil, err
23+
}
24+
25+
return compressedData.Bytes(), nil
26+
}
27+
28+
func decompress(compressedData []byte) ([]byte, error) {
29+
reader, err := gzip.NewReader(bytes.NewReader(compressedData))
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
decompressedData, err := ioutil.ReadAll(reader)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
err = reader.Close()
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
return decompressedData, nil
45+
}
46+
47+
func isCompressed(data []byte) bool {
48+
return len(data) > 2 && data[0] == 0x1f && data[1] == 0x8b
49+
}
50+
51+
// saveLocal saves the key-value pair to local file system.
52+
func saveLocal(key []byte, value []byte) error {
53+
_, err := os.ReadDir(Save)
54+
if err != nil {
55+
if os.IsNotExist(err) {
56+
err = os.MkdirAll(Save, 0755)
57+
if err != nil {
58+
return err
59+
}
60+
} else {
61+
return err
62+
}
63+
}
64+
filename := filepath.Join(Save, string(key))
65+
file, err := os.Create(filename)
66+
if err != nil {
67+
return err
68+
}
69+
defer file.Close()
70+
_, err = file.Write(value)
71+
return err
72+
}

tools/pika_keys_analysis/config.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package pika_keys_analysis
2+
3+
import (
4+
"os"
5+
6+
"gopkg.in/yaml.v3"
7+
)
8+
9+
var (
10+
PikaInstance *Pika
11+
ScanSize = 1000
12+
GoroutineNum = 100
13+
Head = 10
14+
Type = []string{"string", "hash", "list", "set", "zset"}
15+
MemoryLimit = 1024 * 1024 * 200
16+
PrintKeyNum = false
17+
Save = "./save/"
18+
)
19+
20+
type Config struct {
21+
PikaConfig []PikaConfig `yaml:"pika"`
22+
Concurrency int `yaml:"concurrency"`
23+
ScanSize int `yaml:"scan-size"`
24+
Head int `yaml:"head"`
25+
MemoryLimit int `yaml:"memory"`
26+
Type []string `yaml:"type"`
27+
PrintKeyNum bool `yaml:"print"`
28+
Save string `yaml:"save"`
29+
}
30+
31+
type PikaConfig struct {
32+
Addr string `yaml:"addr"`
33+
Password string `yaml:"password"`
34+
DB int `yaml:"db"`
35+
}
36+
37+
func Init(filename string) error {
38+
bytes, err := os.ReadFile(filename)
39+
if err != nil {
40+
return err
41+
}
42+
config := Config{}
43+
err = yaml.Unmarshal(bytes, &config)
44+
if err != nil {
45+
return err
46+
}
47+
PikaInstance = NewPika(config.PikaConfig)
48+
ScanSize = config.ScanSize
49+
GoroutineNum = config.Concurrency
50+
Head = config.Head
51+
Type = config.Type
52+
MemoryLimit = config.MemoryLimit * 1024 * 1024
53+
PrintKeyNum = config.PrintKeyNum
54+
Save = config.Save
55+
return nil
56+
}

0 commit comments

Comments
 (0)