From 4d527f21120670cdba145ce0b37794b461f7635a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Breda?= Date: Wed, 25 Jan 2023 22:41:40 +0000 Subject: [PATCH] testreplica: Use first available port for TransactionReceiver listener Avoids collisions that happen during parallel executions of tests that use TestReplica. --- cmd/bench/cmd/node.go | 13 +++++---- pkg/deploytest/deployment.go | 27 ++++++++----------- pkg/deploytest/testreplica.go | 21 +++++---------- .../transactionreceiver.go | 18 +++---------- 4 files changed, 29 insertions(+), 50 deletions(-) diff --git a/cmd/bench/cmd/node.go b/cmd/bench/cmd/node.go index f03050f66..f0faa7232 100644 --- a/cmd/bench/cmd/node.go +++ b/cmd/bench/cmd/node.go @@ -8,7 +8,7 @@ import ( "context" "encoding/csv" "fmt" - "net" + gonet "net" "os" "strconv" "time" @@ -172,10 +172,13 @@ func runNode() error { return es.Errorf("could not create node: %w", err) } - txReceiver := transactionreceiver.NewTransactionReceiver(node, "mempool", logger) - if err := txReceiver.Start(TxReceiverBasePort + ownNumericID); err != nil { - return es.Errorf("could not start transaction receiver: %w", err) + txReceiverListener, err := gonet.Listen("tcp", fmt.Sprintf(":%v", TxReceiverBasePort+ownNumericID)) + if err != nil { + return fmt.Errorf("could not create transaction receiver listener: %w", err) } + + txReceiver := transactionreceiver.NewTransactionReceiver(node, "mempool", logger) + txReceiver.Start(txReceiverListener) defer txReceiver.Stop() if err := benchApp.Start(); err != nil { @@ -230,7 +233,7 @@ func getPortStr(addressStr string) (string, error) { return "", err } - _, portStr, err := net.SplitHostPort(addrStr) + _, portStr, err := gonet.SplitHostPort(addrStr) if err != nil { return "", err } diff --git a/pkg/deploytest/deployment.go b/pkg/deploytest/deployment.go index 672ce06d0..9722fe06f 100644 --- a/pkg/deploytest/deployment.go +++ b/pkg/deploytest/deployment.go @@ -10,6 +10,7 @@ import ( "context" "crypto" "fmt" + "net" "path/filepath" "runtime" "sync" @@ -183,9 +184,17 @@ func (d *Deployment) Run(ctx context.Context) (nodeErrors []error, heapObjects i // Start the Mir nodes. nodeWg.Add(len(d.TestReplicas)) + trAddrs := make(map[t.NodeID]string, len(d.TestReplicas)) for i, testReplica := range d.TestReplicas { i, testReplica := i, testReplica + trListener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + nodeErrors[i] = err + return nodeErrors, 0, 0 + } + trAddrs[testReplica.ID] = fmt.Sprintf("127.0.0.1:%v", trListener.Addr().(*net.TCPAddr).Port) + // Start the replica in a separate goroutine. start := make(chan struct{}) go func() { @@ -193,7 +202,7 @@ func (d *Deployment) Run(ctx context.Context) (nodeErrors []error, heapObjects i <-start testReplica.Config.Logger.Log(logging.LevelDebug, "running") - nodeErrors[i] = testReplica.Run(ctx2) + nodeErrors[i] = testReplica.Run(ctx2, trListener) if err := nodeErrors[i]; err != nil { testReplica.Config.Logger.Log(logging.LevelError, "exit with error", "err", errstack.ToString(err)) } else { @@ -220,7 +229,7 @@ func (d *Deployment) Run(ctx context.Context) (nodeErrors []error, heapObjects i go func(c *dummyclient.DummyClient) { defer clientWg.Done() - c.Connect(ctx2, d.localTransactionreceiverAddrs()) + c.Connect(ctx2, trAddrs) submitDummyTransactions(ctx2, c, d.TestConfig.NumNetTXs) c.Disconnect() }(client) @@ -258,20 +267,6 @@ func (d *Deployment) EventLogFiles() map[t.NodeID]string { return logFiles } -// localTransactionreceiverAddrs computes network addresses and ports for the Transactionreceivers at all replicas and returns -// an address map. -// It is assumed that node ID strings must be parseable to decimal numbers. -// Each test replica is on the local machine - 127.0.0.1 -func (d *Deployment) localTransactionreceiverAddrs() map[t.NodeID]string { - - addrs := make(map[t.NodeID]string, len(d.TestReplicas)) - for i, tr := range d.TestReplicas { - addrs[tr.ID] = fmt.Sprintf("127.0.0.1:%d", TXListenPort+i) - } - - return addrs -} - // submitDummyTransactions submits n dummy transactions using client. // It returns when all transactions have been submitted or when ctx is done. func submitDummyTransactions(ctx context.Context, client *dummyclient.DummyClient, n int) { diff --git a/pkg/deploytest/testreplica.go b/pkg/deploytest/testreplica.go index 4c57a4f1e..e66f975ba 100644 --- a/pkg/deploytest/testreplica.go +++ b/pkg/deploytest/testreplica.go @@ -3,8 +3,8 @@ package deploytest import ( "context" "fmt" + gonet "net" "path/filepath" - "strconv" "sync" es "github.com/go-errors/errors" @@ -67,7 +67,7 @@ func (tr *TestReplica) EventLogFile() string { // The function blocks until the replica stops. // The replica stops when stopC is closed. // Run returns the error returned by the run of the underlying Mir node. -func (tr *TestReplica) Run(ctx context.Context) error { +func (tr *TestReplica) Run(ctx context.Context, txReceiverListener gonet.Listener) error { // Initialize recording of events. interceptor, err := eventlog.NewRecorder( @@ -104,17 +104,8 @@ func (tr *TestReplica) Run(ctx context.Context) error { } // Create a Transactionreceiver for transactions coming over the network. - txreceiver := transactionreceiver.NewTransactionReceiver(node, tr.FakeTXDestModule, logging.Decorate(tr.Config.Logger, "TxRec: ")) - - // TODO: do not assume that node IDs are integers. - p, err := strconv.Atoi(tr.ID.Pb()) - if err != nil { - return es.Errorf("error converting node ID %s: %w", tr.ID, err) - } - err = txreceiver.Start(TXListenPort + p) - if err != nil { - return es.Errorf("error starting transaction receiver: %w", err) - } + txReceiver := transactionreceiver.NewTransactionReceiver(node, tr.FakeTXDestModule, logging.Decorate(tr.Config.Logger, "TxRec: ")) + txReceiver.Start(txReceiverListener) // Initialize WaitGroup for the replica's transaction submission thread. var wg sync.WaitGroup @@ -149,8 +140,8 @@ func (tr *TestReplica) Run(ctx context.Context) error { tr.Config.Logger.Log(logging.LevelDebug, "Node run returned!") // Stop the transaction receiver. - txreceiver.Stop() - if err := txreceiver.ServerError(); err != nil { + txReceiver.Stop() + if err := txReceiver.ServerError(); err != nil { return es.Errorf("transaction receiver returned server error: %w", err) } diff --git a/pkg/transactionreceiver/transactionreceiver.go b/pkg/transactionreceiver/transactionreceiver.go index 3e4edf20f..d47cdd65f 100644 --- a/pkg/transactionreceiver/transactionreceiver.go +++ b/pkg/transactionreceiver/transactionreceiver.go @@ -9,7 +9,6 @@ package transactionreceiver import ( "fmt" "net" - "strconv" "sync" es "github.com/go-errors/errors" @@ -113,32 +112,23 @@ func (rr *TransactionReceiver) Listen(srv TransactionReceiver_ListenServer) erro } // Start starts the TransactionReceiver by initializing and starting the internal gRPC server, -// listening on the passed port. +// listening on the passed net.Listener. // Before ths method is called, no client connections are accepted. -func (rr *TransactionReceiver) Start(port int) error { +func (rr *TransactionReceiver) Start(listener net.Listener) { - rr.logger.Log(logging.LevelInfo, fmt.Sprintf("Listening for transaction connections on port %d", port)) + rr.logger.Log(logging.LevelInfo, fmt.Sprintf("Listening for transaction connections on %v", listener.Addr().String())) // Create a gRPC server and assign it the logic of this TransactionReceiver. rr.grpcServer = grpc.NewServer() RegisterTransactionReceiverServer(rr.grpcServer, rr) - // Start listening on the network - conn, err := net.Listen("tcp", ":"+strconv.Itoa(port)) - if err != nil { - return es.Errorf("failed to listen for connections on port %d: %w", port, err) - } - // Start the gRPC server in a separate goroutine. // When the server stops, it will write its exit error into gt.grpcServerError. rr.grpcServerWg.Add(1) go func() { - rr.grpcServerError = rr.grpcServer.Serve(conn) + rr.grpcServerError = rr.grpcServer.Serve(listener) rr.grpcServerWg.Done() }() - - // If we got all the way here, no error occurred. - return nil } // Stop stops the own gRPC server (preventing further incoming connections).