Skip to content

Commit

Permalink
Remove use of AWS mocks (prepping for v2)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jan 30, 2024
1 parent b16f01a commit 3a7dfc1
Show file tree
Hide file tree
Showing 20 changed files with 65 additions and 50 deletions.
5 changes: 2 additions & 3 deletions internal/impl/aws/cache_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"github.com/cenkalti/backoff/v4"

Expand Down Expand Up @@ -127,7 +126,7 @@ func newDynamodbCacheFromConfig(conf *service.ParsedConfig) (*dynamodbCache, err
//------------------------------------------------------------------------------

type dynamodbCache struct {
client dynamodbiface.DynamoDBAPI
client dynamoDBAPI

table *string
hashKey string
Expand All @@ -140,7 +139,7 @@ type dynamodbCache struct {
}

func newDynamodbCache(
client dynamodbiface.DynamoDBAPI,
client dynamoDBAPI,
table, hashKey, dataKey string,
consistentRead bool,
ttlKey *string, ttl *time.Duration,
Expand Down
5 changes: 2 additions & 3 deletions internal/impl/aws/cache_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/cenkalti/backoff/v4"

"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
Expand Down Expand Up @@ -94,15 +93,15 @@ func newS3CacheFromConfig(conf *service.ParsedConfig) (*s3Cache, error) {
//------------------------------------------------------------------------------

type s3Cache struct {
s3 s3iface.S3API
s3 *s3.S3

bucket string
contentType string

boffPool sync.Pool
}

func newS3Cache(bucket, contentType string, backOff *backoff.ExponentialBackOff, s3 s3iface.S3API) *s3Cache {
func newS3Cache(bucket, contentType string, backOff *backoff.ExponentialBackOff, s3 *s3.S3) *s3Cache {
return &s3Cache{
s3: s3,

Expand Down
3 changes: 1 addition & 2 deletions internal/impl/aws/input_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/cenkalti/backoff/v4"
"github.com/gofrs/uuid"

Expand Down Expand Up @@ -181,7 +180,7 @@ type kinesisReader struct {

boffPool sync.Pool

svc kinesisiface.KinesisAPI
svc *kinesis.Kinesis
checkpointer *awsKinesisCheckpointer

streamShards map[string][]string
Expand Down
3 changes: 1 addition & 2 deletions internal/impl/aws/input_kinesis_checkpointer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"

"github.com/benthosdev/benthos/v4/public/service"
)
Expand Down Expand Up @@ -57,7 +56,7 @@ type awsKinesisCheckpointer struct {
clientID string
leaseDuration time.Duration
commitPeriod time.Duration
svc dynamodbiface.DynamoDBAPI
svc *dynamodb.DynamoDB
}

// newAWSKinesisCheckpointer creates a new DynamoDB checkpointer from an AWS
Expand Down
10 changes: 8 additions & 2 deletions internal/impl/aws/input_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/cenkalti/backoff/v4"

"github.com/benthosdev/benthos/v4/internal/component"
Expand Down Expand Up @@ -130,11 +129,18 @@ func init() {

//------------------------------------------------------------------------------

type sqsAPI interface {
ReceiveMessageWithContext(aws.Context, *sqs.ReceiveMessageInput, ...request.Option) (*sqs.ReceiveMessageOutput, error)
DeleteMessageBatchWithContext(aws.Context, *sqs.DeleteMessageBatchInput, ...request.Option) (*sqs.DeleteMessageBatchOutput, error)
ChangeMessageVisibilityBatchWithContext(aws.Context, *sqs.ChangeMessageVisibilityBatchInput, ...request.Option) (*sqs.ChangeMessageVisibilityBatchOutput, error)
SendMessageBatchWithContext(aws.Context, *sqs.SendMessageBatchInput, ...request.Option) (*sqs.SendMessageBatchOutput, error)
}

type awsSQSReader struct {
conf sqsiConfig

session *session.Session
sqs sqsiface.SQSAPI
sqs sqsAPI

messagesChan chan *sqs.Message
ackMessagesChan chan sqsMessageHandle
Expand Down
3 changes: 1 addition & 2 deletions internal/impl/aws/input_sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/stretchr/testify/require"
)

type mockSqsInput struct {
sqsiface.SQSAPI
sqsAPI

mtx chan struct{}
queueTimeout int
Expand Down
9 changes: 6 additions & 3 deletions internal/impl/aws/metrics_cloudwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"

"github.com/benthosdev/benthos/v4/internal/component/metrics"
"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
Expand Down Expand Up @@ -278,8 +277,12 @@ func (c *cloudWatchGaugeVec) With(labelValues ...string) metrics.StatGauge {

//------------------------------------------------------------------------------

type cloudWatchAPI interface {
PutMetricDataWithContext(aws.Context, *cloudwatch.PutMetricDataInput, ...request.Option) (*cloudwatch.PutMetricDataOutput, error)
}

type cwMetrics struct {
client cloudwatchiface.CloudWatchAPI
client cloudWatchAPI

datumses map[string]*cloudWatchDatum
datumLock *sync.Mutex
Expand Down Expand Up @@ -485,7 +488,7 @@ func (c *cwMetrics) flush() error {
}
throttled = false

if _, err := c.client.PutMetricData(&input); err != nil {
if _, err := c.client.PutMetricDataWithContext(context.Background(), &input); err != nil {
if request.IsErrorThrottle(err) {
throttled = true
c.log.Warn("Metrics request was throttled. Either increase flush period or reduce number of services sending metrics.")
Expand Down
8 changes: 4 additions & 4 deletions internal/impl/aws/metrics_cloudwatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
"github.com/stretchr/testify/assert"
)

type mockCloudWatchClient struct {
cloudwatchiface.CloudWatchAPI
errs []error

inputs []cloudwatch.PutMetricDataInput
}

func cwmMock(svc cloudwatchiface.CloudWatchAPI) *cwMetrics {
func cwmMock(svc cloudWatchAPI) *cwMetrics {
return &cwMetrics{
config: cwmConfig{Namespace: "Benthos", FlushPeriod: 100 * time.Millisecond},
datumses: map[string]*cloudWatchDatum{},
Expand All @@ -29,7 +29,7 @@ func cwmMock(svc cloudwatchiface.CloudWatchAPI) *cwMetrics {
}
}

func (m *mockCloudWatchClient) PutMetricData(input *cloudwatch.PutMetricDataInput) (*cloudwatch.PutMetricDataOutput, error) {
func (m *mockCloudWatchClient) PutMetricDataWithContext(ctx aws.Context, input *cloudwatch.PutMetricDataInput, opts ...request.Option) (*cloudwatch.PutMetricDataOutput, error) {
m.inputs = append(m.inputs, *input)
if len(m.errs) > 0 {
err := m.errs[0]
Expand Down
13 changes: 11 additions & 2 deletions internal/impl/aws/output_dynamodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

"github.com/Jeffail/gabs/v2"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/cenkalti/backoff/v4"
"github.com/google/go-cmp/cmp"

Expand Down Expand Up @@ -171,8 +171,17 @@ func init() {
}
}

type dynamoDBAPI interface {
PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
BatchWriteItem(input *dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
BatchExecuteStatementWithContext(aws.Context, *dynamodb.BatchExecuteStatementInput, ...request.Option) (*dynamodb.BatchExecuteStatementOutput, error)
DescribeTable(*dynamodb.DescribeTableInput) (*dynamodb.DescribeTableOutput, error)
GetItem(*dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error)
DeleteItem(*dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error)
}

type dynamoDBWriter struct {
client dynamodbiface.DynamoDBAPI
client dynamoDBAPI
conf ddboConfig
log *service.Logger

Expand Down
3 changes: 1 addition & 2 deletions internal/impl/aws/output_dynamodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/public/service"
)

type mockDynamoDB struct {
dynamodbiface.DynamoDBAPI
dynamoDBAPI
fn func(*dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
batchFn func(*dynamodb.BatchWriteItemInput) (*dynamodb.BatchWriteItemOutput, error)
}
Expand Down
7 changes: 5 additions & 2 deletions internal/impl/aws/output_kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/cenkalti/backoff/v4"

"github.com/benthosdev/benthos/v4/internal/component"
Expand Down Expand Up @@ -111,9 +110,13 @@ const (
mebibyte = 1048576
)

type kinesisAPI interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

type kinesisWriter struct {
conf koConfig
kinesis kinesisiface.KinesisAPI
kinesis kinesisAPI
log *service.Logger
}

Expand Down
8 changes: 6 additions & 2 deletions internal/impl/aws/output_kinesis_firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface"
"github.com/cenkalti/backoff/v4"

"github.com/benthosdev/benthos/v4/internal/component"
Expand Down Expand Up @@ -91,8 +90,13 @@ func init() {
}
}

type firehoseAPI interface {
DescribeDeliveryStream(*firehose.DescribeDeliveryStreamInput) (*firehose.DescribeDeliveryStreamOutput, error)
PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error)
}

type kinesisFirehoseWriter struct {
firehose firehoseiface.FirehoseAPI
firehose firehoseAPI

conf kfoConfig
log *service.Logger
Expand Down
3 changes: 1 addition & 2 deletions internal/impl/aws/output_kinesis_firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/firehose"
"github.com/aws/aws-sdk-go/service/firehose/firehoseiface"
"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/public/service"
)

type mockKinesisFirehose struct {
firehoseiface.FirehoseAPI
firehoseAPI
fn func(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error)
}

Expand Down
2 changes: 0 additions & 2 deletions internal/impl/aws/output_kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/public/service"
)

type mockKinesis struct {
kinesisiface.KinesisAPI
fn func(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

Expand Down
5 changes: 2 additions & 3 deletions internal/impl/aws/output_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/cenkalti/backoff/v4"

"github.com/benthosdev/benthos/v4/internal/component"
Expand Down Expand Up @@ -136,7 +135,7 @@ func init() {

type sqsWriter struct {
conf sqsoConfig
sqs sqsiface.SQSAPI
sqs sqsAPI

closer sync.Once
closeChan chan struct{}
Expand Down Expand Up @@ -296,7 +295,7 @@ func (a *sqsWriter) WriteBatch(ctx context.Context, batch service.MessageBatch)
wait := backOff.NextBackOff()

var batchResult *sqs.SendMessageBatchOutput
if batchResult, err = a.sqs.SendMessageBatch(input); err != nil {
if batchResult, err = a.sqs.SendMessageBatchWithContext(ctx, input); err != nil {
a.log.Warnf("SQS error: %v\n", err)
// bail if a message is too large or all retry attempts expired
if wait == backoff.Stop {
Expand Down
6 changes: 3 additions & 3 deletions internal/impl/aws/output_sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -111,11 +111,11 @@ func TestSQSHeaderCheck(t *testing.T) {
}

type mockSqs struct {
sqsiface.SQSAPI
sqsAPI
fn func(*sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error)
}

func (m *mockSqs) SendMessageBatch(input *sqs.SendMessageBatchInput) (*sqs.SendMessageBatchOutput, error) {
func (m *mockSqs) SendMessageBatchWithContext(ctx aws.Context, input *sqs.SendMessageBatchInput, opts ...request.Option) (*sqs.SendMessageBatchOutput, error) {
return m.fn(input)
}

Expand Down
5 changes: 2 additions & 3 deletions internal/impl/aws/processor_dynamodb_partiql.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/mitchellh/mapstructure"

"github.com/benthosdev/benthos/v4/internal/impl/aws/config"
Expand Down Expand Up @@ -81,7 +80,7 @@ pipeline:

type dynamoDBPartiQL struct {
logger *service.Logger
client dynamodbiface.DynamoDBAPI
client dynamoDBAPI

query string
dynQuery *service.InterpolatedString
Expand All @@ -90,7 +89,7 @@ type dynamoDBPartiQL struct {

func newDynamoDBPartiQL(
logger *service.Logger,
client dynamodbiface.DynamoDBAPI,
client dynamoDBAPI,
query string,
dynQuery *service.InterpolatedString,
args *bloblang.Executor,
Expand Down
Loading

0 comments on commit 3a7dfc1

Please sign in to comment.