From f84ced215237205e0a115fb2614efc93aafaf740 Mon Sep 17 00:00:00 2001 From: tomasmik Date: Thu, 21 Oct 2021 14:44:12 +0300 Subject: [PATCH] Add a wait timer after each transaction --- transfer/multichain_queue_test.go | 4 ++-- transfer/queue.go | 23 +++++++++++++++++++---- transfer/queue_test.go | 6 +++--- 3 files changed, 24 insertions(+), 9 deletions(-) diff --git a/transfer/multichain_queue_test.go b/transfer/multichain_queue_test.go index 565abef..750d2ad 100644 --- a/transfer/multichain_queue_test.go +++ b/transfer/multichain_queue_test.go @@ -62,8 +62,8 @@ func TestMultichainQueue(t *testing.T) { }) t.Run("chain does not exist", func(t *testing.T) { mq := NewMultichainQueue() - q1 := NewQueue(1) - q2 := NewQueue(1) + q1 := NewQueue(1, 0) + q2 := NewQueue(1, 0) mq.AddQueue(1, q1) mq.AddQueue(2, q2) go q1.Run() diff --git a/transfer/queue.go b/transfer/queue.go index afafaed..877e97d 100644 --- a/transfer/queue.go +++ b/transfer/queue.go @@ -20,6 +20,7 @@ package transfer import ( "errors" "sync" + "time" "github.com/ethereum/go-ethereum/core/types" ) @@ -30,7 +31,8 @@ type Queue struct { queue chan queueEntry stop chan struct{} - once sync.Once + debounce time.Duration + once sync.Once } // ErrQueueClosed is returned when queue is closed and transaction was not processed. @@ -46,10 +48,11 @@ type queueEntry struct { // NewQueue returns a new queue. Size for the queue can be given // so that more than 1 transaction could be queued at a time. -func NewQueue(size uint) *Queue { +func NewQueue(size uint, debounceTime time.Duration) *Queue { return &Queue{ - queue: make(chan queueEntry, size), - stop: make(chan struct{}, 0), + queue: make(chan queueEntry, size), + debounce: debounceTime, + stop: make(chan struct{}, 0), } } @@ -67,10 +70,22 @@ func (q *Queue) Run() { case entry := <-q.queue: tx, err := entry.exec() q.resp(tx, err, entry.resp) + q.waitAfterSend() } } } +func (q *Queue) waitAfterSend() { + if q.debounce == 0 { + return + } + + select { + case <-q.stop: + case <-time.After(q.debounce): + } +} + func (q *Queue) resp(tx *types.Transaction, err error, ch chan<- QueueResponse) { ch <- QueueResponse{ tx: tx, diff --git a/transfer/queue_test.go b/transfer/queue_test.go index a52d87a..8bd0946 100644 --- a/transfer/queue_test.go +++ b/transfer/queue_test.go @@ -12,7 +12,7 @@ import ( func TestQueue(t *testing.T) { t.Run("green path", func(t *testing.T) { - q := NewQueue(1) + q := NewQueue(1, 0) go q.Run() defer q.Stop() fn := func() (*types.Transaction, error) { @@ -28,7 +28,7 @@ func TestQueue(t *testing.T) { assert.Equal(t, uint64(0x1), tx.Nonce()) }) t.Run("exec produced an error", func(t *testing.T) { - q := NewQueue(1) + q := NewQueue(1, 0) go q.Run() defer q.Stop() fn := func() (*types.Transaction, error) { @@ -43,7 +43,7 @@ func TestQueue(t *testing.T) { assert.Nil(t, tx) }) t.Run("closed queue returns an error", func(t *testing.T) { - q := NewQueue(1) + q := NewQueue(1, 0) q.Stop() fn := func() (*types.Transaction, error) { return types.NewTransaction(1, common.Address{}, big.NewInt(1), 10, big.NewInt(1), []byte{}), nil