Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove direct dependency on Azure/go-autorest/autorest/adal #42959

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Fix documentation and implementation of raw message handling in Filebeat http_endpoint by removing it. {pull}41498[41498]
- Fix flaky test in filebeat Okta entity analytics provider. {issue}42059[42059] {pull}42123[42123]
- Fix IIS module logging errors in case application pool PDH counter is not found. {pull}42274[42274]
- Removed direct dependency on Azure/go-autorest/autorest/adal, which is deprecated. {issue}41463[41463] {pull}42959[42959]

==== Added

Expand Down
402 changes: 201 additions & 201 deletions NOTICE.txt

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0
github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Azure/go-autorest/autorest/adal v0.9.24
github.com/aerospike/aerospike-client-go/v7 v7.7.1
github.com/apache/arrow/go/v17 v17.0.0
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.22
Expand Down Expand Up @@ -248,6 +247,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.10.0 // indirect
github.com/Azure/go-amqp v1.3.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.24 // indirect
github.com/Azure/go-autorest/autorest/to v0.4.0 // indirect
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
Expand Down
35 changes: 14 additions & 21 deletions x-pack/filebeat/input/o365audit/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,31 @@
package auth

import (
"fmt"
"context"

"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

// TokenProvider is the interface that wraps an authentication mechanism and
// allows to obtain tokens.
type TokenProvider interface {
// Token returns a valid OAuth token, or an error.
Token() (string, error)

// Renew must be called to re-authenticate against the oauth2 endpoint if
// when the API returns an Authentication error.
Renew() error
Token(ctx context.Context) (string, error)
}

// servicePrincipalToken extends adal.ServicePrincipalToken with the
// credentialTokenProvider extends azidentity.ClientSecretCredential with the
// the TokenProvider interface.
type servicePrincipalToken adal.ServicePrincipalToken
type credentialTokenProvider azidentity.ClientSecretCredential

// Token returns an oauth token that can be used for bearer authorization.
func (provider *servicePrincipalToken) Token() (string, error) {
inner := (*adal.ServicePrincipalToken)(provider)
if err := inner.EnsureFresh(); err != nil {
return "", fmt.Errorf("refreshing spt token: %w", err)
func (provider *credentialTokenProvider) Token(ctx context.Context) (string, error) {
inner := (*azidentity.ClientSecretCredential)(provider)
tk, err := inner.GetToken(
ctx, policy.TokenRequestOptions{Scopes: []string{"https://manage.office.com/.default"}},
)
if err != nil {
return "", err
}
token := inner.Token()
return token.OAuthToken(), nil
}

// Renew re-authenticates with the oauth2 endpoint to get a new Service Principal Token.
func (provider *servicePrincipalToken) Renew() error {
inner := (*adal.ServicePrincipalToken)(provider)
return inner.Refresh()
return tk.Token, nil
}
22 changes: 5 additions & 17 deletions x-pack/filebeat/input/o365audit/auth/cert.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,25 @@ import (
"crypto/x509"
"fmt"

"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"

"github.com/elastic/elastic-agent-libs/transport/tlscommon"
)

// NewProviderFromCertificate returns a TokenProvider that uses certificate-based
// authentication.
func NewProviderFromCertificate(
endpoint, resource, applicationID, tenantID string,
conf tlscommon.CertificateConfig) (sptp TokenProvider, err error) {
func NewProviderFromCertificate(resource, applicationID, tenantID string, conf tlscommon.CertificateConfig) (sptp TokenProvider, err error) {
cert, privKey, err := loadConfigCerts(conf)
if err != nil {
return nil, fmt.Errorf("failed loading certificates: %w", err)
}
oauth, err := adal.NewOAuthConfig(endpoint, tenantID)
if err != nil {
return nil, fmt.Errorf("error generating OAuthConfig: %w", err)
}

spt, err := adal.NewServicePrincipalTokenFromCertificate(
*oauth,
applicationID,
cert,
privKey,
resource,
)
cred, err := azidentity.NewClientCertificateCredential(tenantID, applicationID, []*x509.Certificate{cert}, privKey, nil)
if err != nil {
return nil, err
}
spt.SetAutoRefresh(true)
return (*servicePrincipalToken)(spt), nil

return (*credentialTokenProvider)(cred), nil
}

func loadConfigCerts(cfg tlscommon.CertificateConfig) (cert *x509.Certificate, key *rsa.PrivateKey, err error) {
Expand Down
20 changes: 10 additions & 10 deletions x-pack/filebeat/input/o365audit/auth/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@
package auth

import (
"fmt"

"github.com/Azure/go-autorest/autorest/adal"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/cloud"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
)

// NewProviderFromClientSecret returns a token provider that uses a secret
// for authentication.
func NewProviderFromClientSecret(endpoint, resource, applicationID, tenantID, secret string) (p TokenProvider, err error) {
oauth, err := adal.NewOAuthConfig(endpoint, tenantID)
if err != nil {
return nil, fmt.Errorf("error generating OAuthConfig: %w", err)
}
spt, err := adal.NewServicePrincipalToken(*oauth, applicationID, secret, resource)
clientOpts := azcore.ClientOptions{Cloud: cloud.Configuration{ActiveDirectoryAuthorityHost: endpoint}}

cred, err := azidentity.NewClientSecretCredential(
tenantID, applicationID, secret, &azidentity.ClientSecretCredentialOptions{ClientOptions: clientOpts},
)
if err != nil {
return nil, err
}
spt.SetAutoRefresh(true)
return (*servicePrincipalToken)(spt), nil

return (*credentialTokenProvider)(cred), nil
}
1 change: 0 additions & 1 deletion x-pack/filebeat/input/o365audit/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ func (c *Config) NewTokenProvider(tenantID string) (auth.TokenProvider, error) {
)
}
return auth.NewProviderFromCertificate(
c.API.AuthenticationEndpoint,
c.API.Resource,
c.ApplicationID,
tenantID,
Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/o365audit/contentblob.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@

func (c contentBlob) handleError(response *http.Response) (actions []poll.Action) {
var msg apiError
readJSONBody(response, &msg)

Check failure on line 99 in x-pack/filebeat/input/o365audit/contentblob.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
c.env.Logger.Warnf("Got error %s: %+v", response.Status, msg)

if _, found := fatalErrors[msg.Error.Code]; found {
Expand All @@ -107,9 +107,8 @@
}

switch response.StatusCode {
case 401: // Authentication error. Renew oauth token and repeat this op.
case 401: // Authentication error. Repeat this op.
return []poll.Action{
poll.RenewToken(),
poll.Fetch(withDelay{contentBlob: c, delay: c.env.Config.PollInterval}),
}
case 404:
Expand Down
11 changes: 6 additions & 5 deletions x-pack/filebeat/input/o365audit/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
return err
}

if _, err := auth.Token(); err != nil {
if _, err := auth.Token(ctxtool.FromCanceller(ctx.Cancelation)); err != nil {
return fmt.Errorf("unable to acquire authentication token for tenant:%s: %w", tenantID, err)
}

Expand All @@ -117,7 +117,7 @@
if err == nil {
break
}
if ctx.Cancelation.Err() != err && err != context.Canceled {

Check failure on line 120 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

comparing with != will fail on wrapped errors. Use errors.Is to check for a specific error (errorlint)
msg := mapstr.M{}
msg.Put("error.message", err.Error())
msg.Put("event.kind", "pipeline_error")
Expand All @@ -125,31 +125,32 @@
Timestamp: time.Now(),
Fields: msg,
}
publisher.Publish(event, nil)

Check failure on line 128 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `publisher.Publish` is not checked (errcheck)
ctx.Logger.Errorf("Input failed: %v", err)
ctx.Logger.Infof("Restarting in %v", inp.config.API.ErrorRetryInterval)
timed.Wait(ctx.Cancelation, inp.config.API.ErrorRetryInterval)

Check failure on line 131 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `timed.Wait` is not checked (errcheck)
}
}
return nil
}

func (inp *o365input) runOnce(
ctx v2.Context,
v2ctx v2.Context,
src cursor.Source,
cursor cursor.Cursor,
publisher cursor.Publisher,
) error {
stream := src.(*stream)

Check failure on line 143 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
tenantID, contentType := stream.tenantID, stream.contentType
log := ctx.Logger.With("tenantID", tenantID, "contentType", contentType)
log := v2ctx.Logger.With("tenantID", tenantID, "contentType", contentType)
ctx := ctxtool.FromCanceller(v2ctx.Cancelation)

tokenProvider, err := inp.config.NewTokenProvider(stream.tenantID)
if err != nil {
return err
}

if _, err := tokenProvider.Token(); err != nil {
if _, err := tokenProvider.Token(ctx); err != nil {
return fmt.Errorf("unable to acquire authentication token for tenant:%s: %w", stream.tenantID, err)
}

Expand All @@ -162,7 +163,7 @@
poll.WithTokenProvider(tokenProvider),
poll.WithMinRequestInterval(delay),
poll.WithLogger(log),
poll.WithContext(ctxtool.FromCanceller(ctx.Cancelation)),
poll.WithContext(ctx),
poll.WithRequestDecorator(
autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+pluginName, version.GetDefaultVersion(), version.Commit(), version.BuildTime().String())),
autorest.WithQueryParameters(mapstr.M{
Expand Down Expand Up @@ -257,14 +258,14 @@
}
}
if env.Config.PreserveOriginalEvent {
b.PutValue("event.original", string(raw))

Check failure on line 261 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `b.PutValue` is not checked (errcheck)
}
if len(errs) > 0 {
msgs := make([]string, len(errs))
for idx, e := range errs {
msgs[idx] = e.Error()
}
b.PutValue("error.message", msgs)

Check failure on line 268 in x-pack/filebeat/input/o365audit/input.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `b.PutValue` is not checked (errcheck)
}
return b
}
3 changes: 1 addition & 2 deletions x-pack/filebeat/input/o365audit/listblobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
"encoding/json"
"errors"
"fmt"
"io/ioutil"

Check failure on line 11 in x-pack/filebeat/input/o365audit/listblobs.go

View workflow job for this annotation

GitHub Actions / lint (windows)

SA1019: "io/ioutil" has been deprecated since Go 1.19: As of Go 1.16, the same functionality is now provided by package [io] or package [os], and those implementations should be preferred in new code. See the specific function documentation for details. (staticcheck)
"net/http"
"sort"
"time"
Expand Down Expand Up @@ -177,16 +177,15 @@

func (l listBlob) handleError(response *http.Response) (actions []poll.Action) {
var msg apiError
readJSONBody(response, &msg)

Check failure on line 180 in x-pack/filebeat/input/o365audit/listblobs.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value is not checked (errcheck)
l.env.Logger.Warnf("Got error %s: %+v", response.Status, msg)
l.delay = l.env.Config.ErrorRetryInterval

switch response.StatusCode {
case 401:
// Authentication error. Renew oauth token and repeat this op.
// Authentication error. Repeat this op.
l.delay = l.env.Config.PollInterval
return []poll.Action{
poll.RenewToken(),
poll.Fetch(l),
}
case 408, 503:
Expand Down Expand Up @@ -275,7 +274,7 @@
}

func readJSONBody(response *http.Response, dest interface{}) error {
defer autorest.Respond(response,

Check failure on line 277 in x-pack/filebeat/input/o365audit/listblobs.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `autorest.Respond` is not checked (errcheck)
autorest.ByDiscardingBody(),
autorest.ByClosing())
body, err := ioutil.ReadAll(response.Body)
Expand Down
32 changes: 8 additions & 24 deletions x-pack/filebeat/input/o365audit/poll/poll.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,19 @@ func (r *Poller) fetch(item Transaction) error {

func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error {
r.log.Debugf("* Fetch %s", item)

// Delay before getting the token, so it doesn't become stale.
delay := max(item.Delay(), minDelay)
r.log.Debugf(" -- wait %s for %s", delay, item)
time.Sleep(delay)

// The order here is important. item's decorators must come first as those
// set the URL, which is required by other decorators (WithQueryParameters).
decorators := append(
append([]autorest.PrepareDecorator{}, item.RequestDecorators()...),
r.decorators...)
if r.tp != nil {
token, err := r.tp.Token()
token, err := r.tp.Token(r.ctx)
if err != nil {
return fmt.Errorf("failed getting a token: %w", err)
}
Expand All @@ -99,12 +105,8 @@ func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error
if err != nil {
return fmt.Errorf("failed preparing request: %w", err)
}
delay := max(item.Delay(), minDelay)
r.log.Debugf(" -- wait %s for %s", delay, request.URL.String())

response, err := autorest.Send(request,
autorest.DoCloseIfError(),
autorest.AfterDelay(delay))
autorest.DoCloseIfError())
if err != nil {
r.log.Warnf("-- error sending request: %v", err)
return r.fetchWithDelay(item, max(time.Minute, r.interval))
Expand Down Expand Up @@ -215,7 +217,6 @@ func (p *transactionList) pop() Transaction {
// Enqueuer is the interface provided to actions so they can act on a Poller.
type Enqueuer interface {
Enqueue(item Transaction) error
RenewToken() error
}

// Action is an operation returned by a transaction.
Expand All @@ -227,15 +228,6 @@ func (r *Poller) Enqueue(item Transaction) error {
return nil
}

// RenewToken renews the token provider's master token in the case of an
// authorization error.
func (r *Poller) RenewToken() error {
if r.tp == nil {
return errors.New("can't renew token: no token provider set")
}
return r.tp.Renew()
}

// Terminate action causes the poll loop to finish with the given error.
func Terminate(err error) Action {
return func(Enqueuer) error {
Expand All @@ -253,14 +245,6 @@ func Fetch(item Transaction) Action {
}
}

// RenewToken will renew the token provider's master token in the case of an
// authorization error.
func RenewToken() Action {
return func(q Enqueuer) error {
return q.RenewToken()
}
}

func max(a, b time.Duration) time.Duration {
if a < b {
return b
Expand Down
Loading