From a5b1aa858a702784b3d31bc2999a1557c874030f Mon Sep 17 00:00:00 2001 From: Zhongyang Xia Date: Thu, 6 Jan 2022 16:00:26 -0500 Subject: [PATCH 1/2] Set concurrency to nproc*2 in SQS broker if < 1 --- v1/brokers/sqs/sqs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/v1/brokers/sqs/sqs.go b/v1/brokers/sqs/sqs.go index f811e16e5..69d2557eb 100644 --- a/v1/brokers/sqs/sqs.go +++ b/v1/brokers/sqs/sqs.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "runtime" "strings" "sync" "time" @@ -59,6 +60,9 @@ func New(cnf *config.Config) iface.Broker { // StartConsuming enters a loop and waits for incoming messages func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) { + if concurrency < 1 { + concurrency = runtime.NumCPU() * 2 + } b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor) qURL := b.getQueueURL(taskProcessor) //save it so that it can be used later when attempting to delete task From 1b0d65821f3e9f74d7a93432bb550c7f17e02739 Mon Sep 17 00:00:00 2001 From: Zhongyang Xia Date: Fri, 11 Feb 2022 15:56:28 -0500 Subject: [PATCH 2/2] Default concurrency to use GOMAXPROCS in SQS broker --- v1/brokers/sqs/sqs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v1/brokers/sqs/sqs.go b/v1/brokers/sqs/sqs.go index 69d2557eb..66979eb4a 100644 --- a/v1/brokers/sqs/sqs.go +++ b/v1/brokers/sqs/sqs.go @@ -61,7 +61,7 @@ func New(cnf *config.Config) iface.Broker { // StartConsuming enters a loop and waits for incoming messages func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcessor iface.TaskProcessor) (bool, error) { if concurrency < 1 { - concurrency = runtime.NumCPU() * 2 + concurrency = runtime.GOMAXPROCS(0) } b.Broker.StartConsuming(consumerTag, concurrency, taskProcessor) qURL := b.getQueueURL(taskProcessor)