Skip to content

Commit

Permalink
test(): rollup&tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
kevin-zhangzh committed Sep 16, 2022
1 parent 748689c commit 95adde4
Show file tree
Hide file tree
Showing 8 changed files with 218 additions and 14 deletions.
33 changes: 29 additions & 4 deletions example/rollup-bolt.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,45 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/rollup"
"github.com/everFinance/turing/store/schema"
"time"
)

func main() {
tags := []types.Tag{
{Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"},
{Name: "App", Value: "turing-test"},
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
suggestLastArTxId := ""
arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arWalletKeyPath := "./key.json"
arWalletKeyPath := "./k9s.json"
rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, schema.Config{})
rol.Run(5*time.Second, 999)
rol.Run(2*time.Minute, 999)
feedData(rol.AddTx())
}

func feedData(ch chan<- []byte) {
ticker := time.NewTicker(30 * time.Second)
var cnt int64
for {
select {
case <-ticker.C:
tx := &ts.Tx{
Name: fmt.Sprintf("test-%v", cnt),
Timestamp: time.Now().UnixMilli(),
}
data, err := json.Marshal(tx)
if err != nil {
panic(err)
}
cnt += 1
ch <- data
}
}
}
51 changes: 51 additions & 0 deletions example/rollup-s3.go
Original file line number Diff line number Diff line change
@@ -1 +1,52 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/rollup"
"github.com/everFinance/turing/store/schema"
"time"
)

func main() {
tags := []types.Tag{
{Name: "App", Value: "turing-s3-test"},
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
suggestLastArTxId := ""
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arWalletKeyPath := "./k9s.json"
cfg := schema.Config{
UseS3: true,
AccKey: "",
SecretKey: "MOPfuebKVsSTZK7XGq/",
BktPrefix: "turing",
Region: "ap-northeast-1",
}
rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, cfg)
rol.Run(2*time.Minute, 999)
feedDataS3(rol.AddTx())
}

func feedDataS3(ch chan<- []byte) {
ticker := time.NewTicker(30 * time.Second)
var cnt int64
for {
select {
case <-ticker.C:
tx := &ts.Tx{
Name: fmt.Sprintf("test-S3-%v", cnt),
Timestamp: time.Now().UnixMilli(),
}
data, err := json.Marshal(tx)
if err != nil {
panic(err)
}
cnt += 1
ch <- data
}
}
}
6 changes: 6 additions & 0 deletions example/schema/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package schema

type Tx struct {
Name string `json:"name"`
Timestamp int64 `json:"timestamp"`
}
18 changes: 13 additions & 5 deletions example/tracker-bolt.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,33 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/store/schema"

"github.com/everFinance/turing/tracker"
)

func main() {
tags := []types.Tag{
{Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"},
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arseed := ""
cursor := uint64(7)
cursor := uint64(0)
dbCfg := schema.Config{}
tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg)
tr.Run(cursor)
for {
tx := <-tr.SubscribeTx()
fmt.Println(tx.CursorId)
comTx := <-tr.SubscribeTx()
tx := &ts.Tx{}
err := json.Unmarshal(comTx.Data, tx)
if err != nil {
panic(err)
}
fmt.Println(tx)
}
}
4 changes: 2 additions & 2 deletions example/tracker-s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func main() {
cursor := uint64(40)
dbCfg := schema.Config{
UseS3: true,
AccKey: "AKIATZSGGOHI72GMNSO7",
SecretKey: "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",
AccKey: "",
SecretKey: "MOPfueG+//",
BktPrefix: "turing",
Region: "ap-northeast-1",
}
Expand Down
7 changes: 4 additions & 3 deletions store/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"encoding/json"
"errors"
"github.com/everFinance/goar/utils"
types "github.com/everFinance/turing/common"
"github.com/everFinance/turing/store/rawdb"
"github.com/everFinance/turing/store/schema"

types "github.com/everFinance/turing/common"
)

// init log mode
Expand Down Expand Up @@ -139,7 +138,9 @@ func (kv *Store) PutPoolTokenTxId(txId string) error {
func (kv *Store) BatchDelPoolTokenTxId(txs types.Transactions) (err error) {
for _, tx := range txs {
err = kv.KVDb.Delete(schema.PoolTxIndex, tx.TxId)
return
if err != nil {
return
}
}
return nil
}
Expand Down
107 changes: 107 additions & 0 deletions store/rawdb/database_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package rawdb

// func TestBoltDB(t *testing.T) {
// dataPath := "./tmp/seed.db"
// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema
// keyNum := 100
// // prepare key&val to test
// keys := make([]string, keyNum)
// values := make([][]byte, keyNum)
// for i := 0; i < keyNum; i++ {
// key := fmt.Sprintf("key%d", i)
// keys[i] = key
// val := fmt.Sprintf("v%d", i)
// values[i] = []byte(val)
// }
// assert.Equal(t, keyNum, len(keys))
// // create a bolt db
// boltDb, err := NewBoltDB()
// assert.NoError(t, err)
//
// // test Put & Get
// for i := 0; i < keyNum; i++ {
// err = boltDb.Put(bktName, keys[i], values[i])
// assert.NoError(t, err)
// }
//
// for i := 0; i < keyNum; i++ {
// val, err := boltDb.Get(bktName, keys[i])
// assert.NoError(t, err)
// assert.Equal(t, values[i], val)
// }
//
// // test GetAllKey from a bucket
// allKeys, err := boltDb.GetAllKey(bktName)
// // GetAllKey return order may different from keys
// sort.Strings(allKeys)
// sort.Strings(keys)
// assert.NoError(t, err)
// assert.Equal(t, keys, allKeys)
//
// // test Delete
// for i := 0; i < keyNum; i++ {
// err = boltDb.Delete(bktName, keys[i])
// assert.NoError(t, err)
// }
// for i := 0; i < keyNum; i++ {
// _, err = boltDb.Get(bktName, keys[i])
// assert.Equal(t, err, schema.ErrNotExist)
// }
// }

// func TestS3DB(t *testing.T) {
//
// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema
// keyNum := 10
// // prepare key&val to test
// keys := make([]string, keyNum)
// values := make([][]byte, keyNum)
// for i := 0; i < keyNum; i++ {
// key := fmt.Sprintf("key%d", i)
// keys[i] = key
// val := fmt.Sprintf("v%d", i)
// values[i] = []byte(val)
// }
// assert.Equal(t, keyNum, len(keys))
// // info that s3 needed
// accKey := "AKIATZSGGOHIV4QTYNH5" // your aws IAM access key
// secretKey := "uw3gKyHIZlaBx8vnCA/BSdNdH+Fi2j4ACoPJawOy" // your aws IAM secret key
// prefix := "arseed" // create empty bucket
// Region := "ap-northeast-1"
// // create S3DB
// s, err := NewS3DB(accKey, secretKey, Region, prefix)
// // if the bucket exist try a complex prefix, because the bucket name is unique in a specific aws region
// assert.NoError(t, err)
//
// // test Put & Get
// for i := 0; i < keyNum; i++ {
// err = s.Put(bktName, keys[i], values[i])
// assert.NoError(t, err)
// }
//
// for i := 0; i < keyNum; i++ {
// val, err := s.Get(bktName, keys[i])
// assert.NoError(t, err)
// assert.Equal(t, values[i], val)
// }
//
// // test GetAllKey from a bucket
// allKeys, err := s.GetAllKey(bktName)
// assert.NoError(t, err)
// // GetAllKey return order may different from keys
// sort.Strings(allKeys)
// sort.Strings(keys)
// if len(allKeys) == len(keys) { // maybe s3 bucket not empty before test
// assert.Equal(t, keys, allKeys)
// }
//
// // test Delete
// for i := 0; i < keyNum; i++ {
// err = s.Delete(bktName, keys[i])
// assert.NoError(t, err)
// }
// for i := 0; i < keyNum; i++ {
// _, err = s.Get(bktName, keys[i])
// assert.Equal(t, err, schema.ErrNotExist)
// }
// }
6 changes: 6 additions & 0 deletions store/rawdb/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/everFinance/turing/store/schema"
"strings"
"time"
)

type S3DB struct {
Expand Down Expand Up @@ -54,7 +55,12 @@ func (s *S3DB) Put(bucket, key string, value []byte) (err error) {
Key: aws.String(key),
Body: bytes.NewReader(value),
}
var retry uint64
_, err = s.uploader.Upload(uploadInfo)
for err != nil && retry < 5 {
time.Sleep(time.Duration(retry) * time.Second)
_, err = s.uploader.Upload(uploadInfo)
}
return
}

Expand Down

0 comments on commit 95adde4

Please sign in to comment.