Skip to content

Commit

Permalink
add: opensearch output
Browse files Browse the repository at this point in the history
  • Loading branch information
sugihara1997 committed Jan 31, 2024
1 parent 53c1ee8 commit 03b3b29
Show file tree
Hide file tree
Showing 8 changed files with 405 additions and 253 deletions.
59 changes: 53 additions & 6 deletions application/export_change_streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,18 @@ import (
"cloud.google.com/go/pubsub"
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/cam-inc/mxtransporter/config"
opensearchConfig "github.com/cam-inc/mxtransporter/config/opensearch"
pconfig "github.com/cam-inc/mxtransporter/config/pubsub"
interfaceForBigquery "github.com/cam-inc/mxtransporter/interfaces/bigquery"
iff "github.com/cam-inc/mxtransporter/interfaces/file"
interfaceForKinesisStream "github.com/cam-inc/mxtransporter/interfaces/kinesis-stream"
mongoConnection "github.com/cam-inc/mxtransporter/interfaces/mongo"
interfaceForOpenSearch "github.com/cam-inc/mxtransporter/interfaces/opensearch"
interfaceForPubsub "github.com/cam-inc/mxtransporter/interfaces/pubsub"
"github.com/cam-inc/mxtransporter/pkg/client"
"github.com/cam-inc/mxtransporter/pkg/errors"
irt "github.com/cam-inc/mxtransporter/usecases/resume-token"
"github.com/opensearch-project/opensearch-go/v3/opensearchapi"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
Expand All @@ -32,6 +35,7 @@ const (
BigQuery agent = "bigquery"
CloudPubSub agent = "pubsub"
KinesisStream agent = "kinesisStream"
OpenSearch agent = "opensearch"
File agent = "file"
)

Expand All @@ -40,6 +44,7 @@ type (
newBigqueryClient(ctx context.Context, projectID string) (*bigquery.Client, error)
newPubsubClient(ctx context.Context, projectID string) (*pubsub.Client, error)
newKinesisClient(ctx context.Context) (*kinesis.Client, error)
newOpenSearchClient(ctx context.Context) (*opensearchapi.Client, error)
watch(ctx context.Context, ops *options.ChangeStreamOptions) (*mongo.ChangeStream, error)
newFileClient(ctx context.Context) (iff.Exporter, error)
setCsExporter(exporter ChangeStreamsExporterImpl)
Expand Down Expand Up @@ -82,6 +87,14 @@ func (*ChangeStreamsWatcherClientImpl) newKinesisClient(ctx context.Context) (*k
return ksClient, nil
}

func (*ChangeStreamsWatcherClientImpl) newOpenSearchClient(ctx context.Context) (*opensearchapi.Client, error) {
osClient, err := client.NewOpenSearchClient(ctx)
if err != nil {
return nil, err
}
return osClient, nil
}

func (*ChangeStreamsWatcherClientImpl) newFileClient(_ context.Context) (iff.Exporter, error) {
return iff.New(config.FileExportConfig()), nil
}
Expand Down Expand Up @@ -147,6 +160,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
bqImpl interfaceForBigquery.BigqueryImpl
psImpl interfaceForPubsub.PubsubImpl
ksImpl interfaceForKinesisStream.KinesisStreamImpl
osImpl interfaceForOpenSearch.OpenSearchImpl
fe iff.Exporter
)

Expand All @@ -158,22 +172,44 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
if err != nil {
return err
}
bqClientImpl := &interfaceForBigquery.BigqueryClientImpl{bqClient}
bqImpl = interfaceForBigquery.BigqueryImpl{bqClientImpl}
bqClientImpl := &interfaceForBigquery.BigqueryClientImpl{BqClient: bqClient}
bqImpl = interfaceForBigquery.BigqueryImpl{Bq: bqClientImpl}
case CloudPubSub:
psClient, err := c.Watcher.newPubsubClient(ctx, projectID)
if err != nil {
return err
}
psClientImpl := &interfaceForPubsub.PubsubClientImpl{psClient, c.Log}
psImpl = interfaceForPubsub.PubsubImpl{psClientImpl, c.Log, pconfig.PubSubConfig().OrderingBy}
psClientImpl := &interfaceForPubsub.PubsubClientImpl{PubsubClient: psClient, Log: c.Log}
psImpl = interfaceForPubsub.PubsubImpl{Pubsub: psClientImpl, Log: c.Log, OrderingBy: pconfig.PubSubConfig().OrderingBy}
case KinesisStream:
ksClient, err := c.Watcher.newKinesisClient(ctx)
if err != nil {
return err
}
ksClientImpl := &interfaceForKinesisStream.KinesisStreamClientImpl{ksClient}
ksImpl = interfaceForKinesisStream.KinesisStreamImpl{ksClientImpl}
ksClientImpl := &interfaceForKinesisStream.KinesisStreamClientImpl{KinesisStreamClient: ksClient}
ksImpl = interfaceForKinesisStream.KinesisStreamImpl{KinesisStream: ksClientImpl}
case OpenSearch:
osClient, err := c.Watcher.newOpenSearchClient(ctx)
if err != nil {
return err
}

osImpl = interfaceForOpenSearch.OpenSearchImpl{}

osCfg := opensearchConfig.OpenSearchConfig()
if osCfg.BulkEnabled {
bi, err := osImpl.NewBulkIndexer(ctx, osClient)
if err != nil {
return err
}
osImpl.OpenSearchBulkIndexer = bi
} else {
si, err := osImpl.NewSingleIndexer(osClient)
if err != nil {
return err
}
osImpl.OpenSearchSingleIndexer = si
}
case File:
fCli, err := c.Watcher.newFileClient(ctx)
if err != nil {
Expand All @@ -190,6 +226,7 @@ func (c *ChangeStreamsWatcherImpl) WatchChangeStreams(ctx context.Context) error
bq: bqImpl,
pubsub: psImpl,
kinesisStream: ksImpl,
opensearch: osImpl,
fileExporter: fe,
resumeToken: c.resumeTokenManager,
}
Expand All @@ -215,6 +252,7 @@ type (
exportToBigquery(ctx context.Context, cs primitive.M) error
exportToPubsub(ctx context.Context, cs primitive.M) error
exportToKinesisStream(ctx context.Context, cs primitive.M) error
exportToOpenSearch(ctx context.Context, cs primitive.M) error
exportToFile(ctx context.Context, cs primitive.M) error
saveResumeToken(ctx context.Context, rt string) error
err() error
Expand All @@ -230,6 +268,7 @@ type (
bq interfaceForBigquery.BigqueryImpl
pubsub interfaceForPubsub.PubsubImpl
kinesisStream interfaceForKinesisStream.KinesisStreamImpl
opensearch interfaceForOpenSearch.OpenSearchImpl
fileExporter iff.Exporter
resumeToken irt.ResumeToken
}
Expand Down Expand Up @@ -264,6 +303,10 @@ func (c *changeStreamsExporterClientImpl) exportToKinesisStream(ctx context.Cont
return c.kinesisStream.ExportToKinesisStream(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) exportToOpenSearch(ctx context.Context, cs primitive.M) error {
return c.opensearch.ExportToOpenSearch(ctx, cs)
}

func (c *changeStreamsExporterClientImpl) exportToFile(ctx context.Context, cs primitive.M) error {
return c.fileExporter.Export(ctx, cs)
}
Expand Down Expand Up @@ -316,6 +359,10 @@ func (c *ChangeStreamsExporterImpl) exportChangeStreams(ctx context.Context) err
if err := c.exporter.exportToKinesisStream(ctx, csMap); err != nil {
return err
}
case OpenSearch:
if err := c.exporter.exportToOpenSearch(ctx, csMap); err != nil {
return err
}
case File:
if err := c.exporter.exportToFile(ctx, csMap); err != nil {
return err
Expand Down
17 changes: 10 additions & 7 deletions config/constant/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ const (
PUBSUB_TOPIC_NAME = "PUBSUB_TOPIC_NAME"
PUBSUB_ORDERING_BY = "PUBSUB_ORDERING_BY"

OPENSEARCH_HOST = "OPENSEARCH_HOST"
OPENSEARCH_INDEX_NAME = "OPENSEARCH_INDEX_NAME"
OPENSEARCH_SYNC_ENABLED = "OPENSEARCH_SYNC_ENABLED"
OPENSEARCH_BULK_ENABLED = "OPENSEARCH_BULK_ENABLED"
OPENSEARCH_BULK_FLUSH_BYTES = "OPENSEARCH_BULK_FLUSH_BYTES"
OPENSEARCH_BULK_FLUSH_INTERVAL_SECONDS = "OPENSEARCH_BULK_FLUSH_INTERVAL_SECONDS"
OPENSEARCH_BULK_SYNC_AGGREGATION_ENABLED = "OPENSEARCH_BULK_SYNC_AGGREGATION_ENABLED"
OPENSEARCH_END_POINT = "OPENSEARCH_END_POINT"
OPENSEARCH_INDEX_NAME = "OPENSEARCH_INDEX_NAME"
OPENSEARCH_SYNC_ENABLED = "OPENSEARCH_SYNC_ENABLED"
OPENSEARCH_BULK_ENABLED = "OPENSEARCH_BULK_ENABLED"
OPENSEARCH_BULK_FLUSH_BYTES = "OPENSEARCH_BULK_FLUSH_BYTES"
OPENSEARCH_BULK_FLUSH_INTERVAL_SECONDS = "OPENSEARCH_BULK_FLUSH_INTERVAL_SECONDS"
OPENSEARCH_BULK_SYNC_AGGREGATION_ENABLED = "OPENSEARCH_BULK_SYNC_AGGREGATION_ENABLED"
OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVICE = "OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVICE"
OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVERLESS = "OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVERLESS"
OPENSEARCH_AWS_REGION = "OPENSEARCH_AWS_REGION"

MONGODB_HOST = "MONGODB_HOST"
MONGODB_DATABASE = "MONGODB_DATABASE"
Expand Down
23 changes: 14 additions & 9 deletions config/opensearch/opensearch_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
)

type OpenSearch struct {
OpenSearchConnectionUrl string
IndexName string
SyncEnabled bool
BulkEnabled bool
BulkFlushBytes int
BulkFlushIntervalSeconds int
BulkSyncAggregationEnabled bool
EndPoint string
IndexName string
SyncEnabled bool
BulkEnabled bool
BulkFlushBytes int
BulkSyncAggregationEnabled bool
UseAmazonOpenSearchService bool
UseAmazonOpenSearchServerless bool
AwsRegion string
}

func getBoolEnvWithDefault(key string, defaultVal bool) bool {
Expand Down Expand Up @@ -47,12 +49,15 @@ func getIntEnvWithDefault(key string, defaultVal int) int {

func OpenSearchConfig() OpenSearch {
var osCfg OpenSearch
osCfg.OpenSearchConnectionUrl = os.Getenv(constant.OPENSEARCH_HOST)
osCfg.EndPoint = os.Getenv(constant.OPENSEARCH_END_POINT)
osCfg.IndexName = os.Getenv(constant.OPENSEARCH_INDEX_NAME)
osCfg.SyncEnabled = getBoolEnvWithDefault(constant.OPENSEARCH_SYNC_ENABLED, false)
osCfg.BulkEnabled = getBoolEnvWithDefault(constant.OPENSEARCH_BULK_ENABLED, false)
osCfg.BulkFlushBytes = getIntEnvWithDefault(constant.OPENSEARCH_BULK_FLUSH_BYTES, 5e+6)
osCfg.BulkFlushIntervalSeconds = getIntEnvWithDefault(constant.OPENSEARCH_BULK_FLUSH_INTERVAL_SECONDS, 30)
osCfg.BulkSyncAggregationEnabled = getBoolEnvWithDefault(constant.OPENSEARCH_BULK_SYNC_AGGREGATION_ENABLED, false)
osCfg.UseAmazonOpenSearchService = getBoolEnvWithDefault(constant.OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVICE, false)
osCfg.UseAmazonOpenSearchServerless = getBoolEnvWithDefault(constant.OPENSEARCH_USE_AMAZON_OPENSEARCH_SERVERLESS, false)
osCfg.AwsRegion = os.Getenv(constant.OPENSEARCH_AWS_REGION)

return osCfg
}
42 changes: 22 additions & 20 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,49 +1,51 @@
module github.com/cam-inc/mxtransporter

go 1.17
go 1.21

require (
cloud.google.com/go/bigquery v1.18.0
cloud.google.com/go/datacatalog v1.0.0 // indirect
cloud.google.com/go/kms v1.1.0 // indirect
cloud.google.com/go/pubsub v1.12.2
cloud.google.com/go/storage v1.18.2
github.com/aws/aws-sdk-go-v2 v1.15.0
github.com/aws/aws-sdk-go-v2/config v1.15.0
github.com/aws/aws-sdk-go-v2 v1.24.1
github.com/aws/aws-sdk-go-v2/config v1.26.6
github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.1
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.3.0
github.com/opensearch-project/opensearch-go/v3 v3.0.0
github.com/spf13/cobra v1.2.1
go.mongodb.org/mongo-driver v1.5.3
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20210716203947-853a461950ff // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/text v0.3.7 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/text v0.13.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)

require (
cloud.google.com/go v0.97.0 // indirect
github.com/aws/aws-sdk-go v1.34.28 // indirect
github.com/aws/aws-sdk-go v1.48.13 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.10.0 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.6 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.0 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.16.16 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.14.11 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.2.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.5.10 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.7.3 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.10.4 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.10.10 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.13.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.11.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.16.0 // indirect
github.com/aws/smithy-go v1.11.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.18.7 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.21.7 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.26.7 // indirect
github.com/aws/smithy-go v1.19.0 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.7 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand All @@ -57,9 +59,9 @@ require (
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 // indirect
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/api v0.58.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
Loading

0 comments on commit 03b3b29

Please sign in to comment.