diff --git a/example/rollup-bolt.go b/example/rollup-bolt.go index ab0fd5e..157491c 100644 --- a/example/rollup-bolt.go +++ b/example/rollup-bolt.go @@ -1,7 +1,10 @@ package main import ( + "encoding/json" + "fmt" "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" "github.com/everFinance/turing/rollup" "github.com/everFinance/turing/store/schema" "time" @@ -9,12 +12,34 @@ import ( func main() { tags := []types.Tag{ - {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + {Name: "App", Value: "turing-test"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, } suggestLastArTxId := "" - arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" arNode := "https://arweave.net" - arWalletKeyPath := "./key.json" + arWalletKeyPath := "./k9s.json" rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, schema.Config{}) - rol.Run(5*time.Second, 999) + rol.Run(2*time.Minute, 999) + feedData(rol.AddTx()) +} + +func feedData(ch chan<- []byte) { + ticker := time.NewTicker(30 * time.Second) + var cnt int64 + for { + select { + case <-ticker.C: + tx := &ts.Tx{ + Name: fmt.Sprintf("test-%v", cnt), + Timestamp: time.Now().UnixMilli(), + } + data, err := json.Marshal(tx) + if err != nil { + panic(err) + } + cnt += 1 + ch <- data + } + } } diff --git a/example/rollup-s3.go b/example/rollup-s3.go index 06ab7d0..32d7862 100644 --- a/example/rollup-s3.go +++ b/example/rollup-s3.go @@ -1 +1,52 @@ package main + +import ( + "encoding/json" + "fmt" + "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" + "github.com/everFinance/turing/rollup" + "github.com/everFinance/turing/store/schema" + "time" +) + +func main() { + tags := []types.Tag{ + {Name: "App", Value: "turing-s3-test"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, + } + suggestLastArTxId := "" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" + arNode := "https://arweave.net" + arWalletKeyPath := "./k9s.json" + cfg := schema.Config{ + UseS3: true, + AccKey: "", + SecretKey: "MOPfuebKVsSTZK7XGq/", + BktPrefix: "turing", + Region: "ap-northeast-1", + } + rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, cfg) + rol.Run(2*time.Minute, 999) + feedDataS3(rol.AddTx()) +} + +func feedDataS3(ch chan<- []byte) { + ticker := time.NewTicker(30 * time.Second) + var cnt int64 + for { + select { + case <-ticker.C: + tx := &ts.Tx{ + Name: fmt.Sprintf("test-S3-%v", cnt), + Timestamp: time.Now().UnixMilli(), + } + data, err := json.Marshal(tx) + if err != nil { + panic(err) + } + cnt += 1 + ch <- data + } + } +} diff --git a/example/schema/types.go b/example/schema/types.go new file mode 100644 index 0000000..674ccb8 --- /dev/null +++ b/example/schema/types.go @@ -0,0 +1,6 @@ +package schema + +type Tx struct { + Name string `json:"name"` + Timestamp int64 `json:"timestamp"` +} diff --git a/example/tracker-bolt.go b/example/tracker-bolt.go index 1f90fe9..f7bab59 100644 --- a/example/tracker-bolt.go +++ b/example/tracker-bolt.go @@ -1,25 +1,33 @@ package main import ( + "encoding/json" "fmt" "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" "github.com/everFinance/turing/store/schema" + "github.com/everFinance/turing/tracker" ) func main() { tags := []types.Tag{ - {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, } - arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" arNode := "https://arweave.net" arseed := "" - cursor := uint64(7) + cursor := uint64(0) dbCfg := schema.Config{} tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg) tr.Run(cursor) for { - tx := <-tr.SubscribeTx() - fmt.Println(tx.CursorId) + comTx := <-tr.SubscribeTx() + tx := &ts.Tx{} + err := json.Unmarshal(comTx.Data, tx) + if err != nil { + panic(err) + } + fmt.Println(tx) } } diff --git a/example/tracker-s3.go b/example/tracker-s3.go index b0dfa96..7c81f25 100644 --- a/example/tracker-s3.go +++ b/example/tracker-s3.go @@ -17,8 +17,8 @@ func main() { cursor := uint64(40) dbCfg := schema.Config{ UseS3: true, - AccKey: "AKIATZSGGOHI72GMNSO7", - SecretKey: "MOPfueG+mRNHQHoz9GdTq6/CwyybKVsSTZK7XGq/", + AccKey: "", + SecretKey: "MOPfueG+//", BktPrefix: "turing", Region: "ap-northeast-1", } diff --git a/store/kv.go b/store/kv.go index bdccb60..40ff995 100644 --- a/store/kv.go +++ b/store/kv.go @@ -5,10 +5,9 @@ import ( "encoding/json" "errors" "github.com/everFinance/goar/utils" + types "github.com/everFinance/turing/common" "github.com/everFinance/turing/store/rawdb" "github.com/everFinance/turing/store/schema" - - types "github.com/everFinance/turing/common" ) // init log mode @@ -139,7 +138,9 @@ func (kv *Store) PutPoolTokenTxId(txId string) error { func (kv *Store) BatchDelPoolTokenTxId(txs types.Transactions) (err error) { for _, tx := range txs { err = kv.KVDb.Delete(schema.PoolTxIndex, tx.TxId) - return + if err != nil { + return + } } return nil } diff --git a/store/rawdb/database_test.go b/store/rawdb/database_test.go new file mode 100644 index 0000000..8c43845 --- /dev/null +++ b/store/rawdb/database_test.go @@ -0,0 +1,107 @@ +package rawdb + +// func TestBoltDB(t *testing.T) { +// dataPath := "./tmp/seed.db" +// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema +// keyNum := 100 +// // prepare key&val to test +// keys := make([]string, keyNum) +// values := make([][]byte, keyNum) +// for i := 0; i < keyNum; i++ { +// key := fmt.Sprintf("key%d", i) +// keys[i] = key +// val := fmt.Sprintf("v%d", i) +// values[i] = []byte(val) +// } +// assert.Equal(t, keyNum, len(keys)) +// // create a bolt db +// boltDb, err := NewBoltDB() +// assert.NoError(t, err) +// +// // test Put & Get +// for i := 0; i < keyNum; i++ { +// err = boltDb.Put(bktName, keys[i], values[i]) +// assert.NoError(t, err) +// } +// +// for i := 0; i < keyNum; i++ { +// val, err := boltDb.Get(bktName, keys[i]) +// assert.NoError(t, err) +// assert.Equal(t, values[i], val) +// } +// +// // test GetAllKey from a bucket +// allKeys, err := boltDb.GetAllKey(bktName) +// // GetAllKey return order may different from keys +// sort.Strings(allKeys) +// sort.Strings(keys) +// assert.NoError(t, err) +// assert.Equal(t, keys, allKeys) +// +// // test Delete +// for i := 0; i < keyNum; i++ { +// err = boltDb.Delete(bktName, keys[i]) +// assert.NoError(t, err) +// } +// for i := 0; i < keyNum; i++ { +// _, err = boltDb.Get(bktName, keys[i]) +// assert.Equal(t, err, schema.ErrNotExist) +// } +// } + +// func TestS3DB(t *testing.T) { +// +// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema +// keyNum := 10 +// // prepare key&val to test +// keys := make([]string, keyNum) +// values := make([][]byte, keyNum) +// for i := 0; i < keyNum; i++ { +// key := fmt.Sprintf("key%d", i) +// keys[i] = key +// val := fmt.Sprintf("v%d", i) +// values[i] = []byte(val) +// } +// assert.Equal(t, keyNum, len(keys)) +// // info that s3 needed +// accKey := "AKIATZSGGOHIV4QTYNH5" // your aws IAM access key +// secretKey := "uw3gKyHIZlaBx8vnCA/BSdNdH+Fi2j4ACoPJawOy" // your aws IAM secret key +// prefix := "arseed" // create empty bucket +// Region := "ap-northeast-1" +// // create S3DB +// s, err := NewS3DB(accKey, secretKey, Region, prefix) +// // if the bucket exist try a complex prefix, because the bucket name is unique in a specific aws region +// assert.NoError(t, err) +// +// // test Put & Get +// for i := 0; i < keyNum; i++ { +// err = s.Put(bktName, keys[i], values[i]) +// assert.NoError(t, err) +// } +// +// for i := 0; i < keyNum; i++ { +// val, err := s.Get(bktName, keys[i]) +// assert.NoError(t, err) +// assert.Equal(t, values[i], val) +// } +// +// // test GetAllKey from a bucket +// allKeys, err := s.GetAllKey(bktName) +// assert.NoError(t, err) +// // GetAllKey return order may different from keys +// sort.Strings(allKeys) +// sort.Strings(keys) +// if len(allKeys) == len(keys) { // maybe s3 bucket not empty before test +// assert.Equal(t, keys, allKeys) +// } +// +// // test Delete +// for i := 0; i < keyNum; i++ { +// err = s.Delete(bktName, keys[i]) +// assert.NoError(t, err) +// } +// for i := 0; i < keyNum; i++ { +// _, err = s.Get(bktName, keys[i]) +// assert.Equal(t, err, schema.ErrNotExist) +// } +// } diff --git a/store/rawdb/s3.go b/store/rawdb/s3.go index d7e84f3..08a03f7 100644 --- a/store/rawdb/s3.go +++ b/store/rawdb/s3.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/everFinance/turing/store/schema" "strings" + "time" ) type S3DB struct { @@ -54,7 +55,12 @@ func (s *S3DB) Put(bucket, key string, value []byte) (err error) { Key: aws.String(key), Body: bytes.NewReader(value), } + var retry uint64 _, err = s.uploader.Upload(uploadInfo) + for err != nil && retry < 5 { + time.Sleep(time.Duration(retry) * time.Second) + _, err = s.uploader.Upload(uploadInfo) + } return }