From 878efb62a54a488c3dd87787852881eb5635fdae Mon Sep 17 00:00:00 2001 From: Chris Berkhout Date: Mon, 3 Mar 2025 17:10:14 +0100 Subject: [PATCH] Use propagated context when getting a token. --- x-pack/filebeat/input/o365audit/auth/auth.go | 6 +++--- x-pack/filebeat/input/o365audit/input.go | 11 ++++++----- x-pack/filebeat/input/o365audit/poll/poll.go | 2 +- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/x-pack/filebeat/input/o365audit/auth/auth.go b/x-pack/filebeat/input/o365audit/auth/auth.go index 96ba13dc6189..14dabee2b755 100644 --- a/x-pack/filebeat/input/o365audit/auth/auth.go +++ b/x-pack/filebeat/input/o365audit/auth/auth.go @@ -15,7 +15,7 @@ import ( // allows to obtain tokens. type TokenProvider interface { // Token returns a valid OAuth token, or an error. - Token() (string, error) + Token(ctx context.Context) (string, error) } // credentialTokenProvider extends azidentity.ClientSecretCredential with the @@ -23,10 +23,10 @@ type TokenProvider interface { type credentialTokenProvider azidentity.ClientSecretCredential // Token returns an oauth token that can be used for bearer authorization. -func (provider *credentialTokenProvider) Token() (string, error) { +func (provider *credentialTokenProvider) Token(ctx context.Context) (string, error) { inner := (*azidentity.ClientSecretCredential)(provider) tk, err := inner.GetToken( - context.TODO(), policy.TokenRequestOptions{Scopes: []string{"https://manage.office.com/.default"}}, + ctx, policy.TokenRequestOptions{Scopes: []string{"https://manage.office.com/.default"}}, ) if err != nil { return "", err diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index b01ba98ac907..b97011b3a067 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -99,7 +99,7 @@ func (inp *o365input) Test(src cursor.Source, ctx v2.TestContext) error { return err } - if _, err := auth.Token(); err != nil { + if _, err := auth.Token(ctxtool.FromCanceller(ctx.Cancelation)); err != nil { return fmt.Errorf("unable to acquire authentication token for tenant:%s: %w", tenantID, err) } @@ -135,21 +135,22 @@ func (inp *o365input) Run( } func (inp *o365input) runOnce( - ctx v2.Context, + v2ctx v2.Context, src cursor.Source, cursor cursor.Cursor, publisher cursor.Publisher, ) error { stream := src.(*stream) tenantID, contentType := stream.tenantID, stream.contentType - log := ctx.Logger.With("tenantID", tenantID, "contentType", contentType) + log := v2ctx.Logger.With("tenantID", tenantID, "contentType", contentType) + ctx := ctxtool.FromCanceller(v2ctx.Cancelation) tokenProvider, err := inp.config.NewTokenProvider(stream.tenantID) if err != nil { return err } - if _, err := tokenProvider.Token(); err != nil { + if _, err := tokenProvider.Token(ctx); err != nil { return fmt.Errorf("unable to acquire authentication token for tenant:%s: %w", stream.tenantID, err) } @@ -162,7 +163,7 @@ func (inp *o365input) runOnce( poll.WithTokenProvider(tokenProvider), poll.WithMinRequestInterval(delay), poll.WithLogger(log), - poll.WithContext(ctxtool.FromCanceller(ctx.Cancelation)), + poll.WithContext(ctx), poll.WithRequestDecorator( autorest.WithUserAgent(useragent.UserAgent("Filebeat-"+pluginName, version.GetDefaultVersion(), version.Commit(), version.BuildTime().String())), autorest.WithQueryParameters(mapstr.M{ diff --git a/x-pack/filebeat/input/o365audit/poll/poll.go b/x-pack/filebeat/input/o365audit/poll/poll.go index 09f4727a2037..1aed0cd6bbfb 100644 --- a/x-pack/filebeat/input/o365audit/poll/poll.go +++ b/x-pack/filebeat/input/o365audit/poll/poll.go @@ -94,7 +94,7 @@ func (r *Poller) fetchWithDelay(item Transaction, minDelay time.Duration) error append([]autorest.PrepareDecorator{}, item.RequestDecorators()...), r.decorators...) if r.tp != nil { - token, err := r.tp.Token() + token, err := r.tp.Token(r.ctx) if err != nil { return fmt.Errorf("failed getting a token: %w", err) }