Skip to content

Commit

Permalink
Pass namespaces to Fleet Server managed documents (#3535)
Browse files Browse the repository at this point in the history
  • Loading branch information
nchaulet authored May 16, 2024
1 parent da02802 commit 53ebe67
Show file tree
Hide file tree
Showing 12 changed files with 499 additions and 193 deletions.
27 changes: 27 additions & 0 deletions changelog/fragments/1715862782-support-namespaces.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: support-namespaces

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: Add suport for namespaces, Fleet server will now add the Namespaces property to created .fleet-* documennts.

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "fleet-server"
# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/fleet-server/pull/3535
25 changes: 14 additions & 11 deletions internal/pkg/api/handleAck.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,23 +163,25 @@ func (ack *AckT) validateRequest(zlog zerolog.Logger, w http.ResponseWriter, r *
return &req, nil
}

func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr model.ActionResult) {
func eventToActionResult(agentID, aType string, namespaces []string, ev AckRequest_Events_Item) (acr model.ActionResult) {
switch aType {
case string(REQUESTDIAGNOSTICS):
event, _ := ev.AsDiagnosticsEvent()
p, _ := json.Marshal(event.Data)
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
Data: p,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
case string(INPUTACTION):
event, _ := ev.AsInputEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Namespaces: namespaces,
ActionInputType: event.ActionInputType,
StartedAt: event.StartedAt.Format(time.RFC3339Nano),
CompletedAt: event.CompletedAt.Format(time.RFC3339Nano),
Expand All @@ -191,10 +193,11 @@ func eventToActionResult(agentID, aType string, ev AckRequest_Events_Item) (acr
default: // UPGRADE action acks are also handled by handelUpgrade (deprecated func)
event, _ := ev.AsGenericEvent()
return model.ActionResult{
ActionID: event.ActionId,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
ActionID: event.ActionId,
Namespaces: namespaces,
AgentID: agentID,
Error: fromPtr(event.Error),
Timestamp: event.Timestamp.Format(time.RFC3339Nano),
}
}
}
Expand Down Expand Up @@ -358,7 +361,7 @@ func (ack *AckT) handleActionResult(ctx context.Context, zlog zerolog.Logger, ag
defer span.End()

// Convert ack event to action result document
acr := eventToActionResult(agent.Id, action.Type, ev)
acr := eventToActionResult(agent.Id, action.Type, action.Namespaces, ev)

// Save action result document
if err := dl.CreateActionResult(ctx, ack.bulk, acr); err != nil {
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/api/handleAck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestMakeUpdatePolicyBody(t *testing.T) {
func TestEventToActionResult(t *testing.T) {
agentID := "6e9b6655-8cfe-4eb6-9b2f-c10aefae7517"
t.Run("generic", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z"
Expand All @@ -72,7 +72,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Empty(t, r.Error)
})
t.Run("with error", func(t *testing.T) {
r := eventToActionResult(agentID, "UPGRADE", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "UPGRADE", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -84,7 +84,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("request diagnostics", func(t *testing.T) {
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "REQUEST_DIAGNOSTICS", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand All @@ -98,7 +98,7 @@ func TestEventToActionResult(t *testing.T) {
assert.Equal(t, "error message", r.Error)
})
t.Run("input action", func(t *testing.T) {
r := eventToActionResult(agentID, "INPUT_ACTION", AckRequest_Events_Item{json.RawMessage(`{
r := eventToActionResult(agentID, "INPUT_ACTION", []string{}, AckRequest_Events_Item{json.RawMessage(`{
"action_id": "test-action-id",
"message": "action message",
"timestamp": "2022-02-23T18:26:08.506128Z",
Expand Down
6 changes: 4 additions & 2 deletions internal/pkg/api/handleEnroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (et *EnrollerT) processRequest(zlog zerolog.Logger, w http.ResponseWriter,

cntEnroll.bodyIn.Add(readCounter.Count())

return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, ver)
return et._enroll(r.Context(), rb, zlog, req, enrollAPI.PolicyID, enrollAPI.Namespaces, ver)
}

// retrieveStaticTokenEnrollmentToken fetches the enrollment key record from the config static tokens.
Expand Down Expand Up @@ -190,7 +190,8 @@ func (et *EnrollerT) _enroll(
rb *rollback.Rollback,
zlog zerolog.Logger,
req *EnrollRequest,
policyID,
policyID string,
namespaces []string,
ver string,
) (*EnrollResponse, error) {
var agent model.Agent
Expand Down Expand Up @@ -272,6 +273,7 @@ func (et *EnrollerT) _enroll(
agentData := model.Agent{
Active: true,
PolicyID: policyID,
Namespaces: namespaces,
Type: string(req.Type),
EnrolledAt: now.UTC().Format(time.RFC3339),
LocalMetadata: localMeta,
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/api/handleEnroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestEnroll(t *testing.T) {
}, nil)
bulker.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
"", nil)
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", "8.9.0")
resp, _ := et._enroll(ctx, rb, zlog, req, "1234", []string{}, "8.9.0")

if resp.Action != "created" {
t.Fatal("enroll failed")
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/api/handleUpload.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func (ut *UploadT) handleUploadBegin(_ zerolog.Logger, w http.ResponseWriter, r
return err
}

_, err = ut.authAgent(r, &agentID, ut.bulker, ut.cache)
agent, err := ut.authAgent(r, &agentID, ut.bulker, ut.cache)
if err != nil {
return err
}

// validate payload, enrich with additional fields, and write metadata doc to ES
info, err := ut.uploader.Begin(r.Context(), payload)
info, err := ut.uploader.Begin(r.Context(), agent.Namespaces, payload)
if err != nil {
return err
}
Expand Down
49 changes: 26 additions & 23 deletions internal/pkg/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ type Hash struct {
}

type MetaDoc struct {
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
ActionID string `json:"action_id"`
AgentID string `json:"agent_id"`
Source string `json:"src"`
File FileData `json:"file"`
UploadID string `json:"upload_id"`
Start time.Time `json:"upload_start"`
Namespaces []string `json:"namespaces"`
}

// custom unmarshaller to make unix-epoch values work
Expand All @@ -71,26 +72,28 @@ func (m *MetaDoc) UnmarshalJSON(b []byte) error {
}

type ChunkInfo struct {
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Pos int // Ordered chunk position in file
Last bool // Is this the final chunk in the file
SHA2 string
Size int
BID string // base id, matches metadata doc's _id
Index string
ID string // chunk _id
Namespaces []string
}

type Info struct {
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
ID string // upload operation identifier. Used to identify the upload process
DocID string // document ID of the uploaded file and chunks
Source string // which integration is performing the upload
AgentID string
ActionID string
Namespaces []string
ChunkSize int64
Total int64
Count int
Start time.Time
Status Status
}

// convenience functions for computing current "Status" based on the fields
Expand Down
24 changes: 14 additions & 10 deletions internal/pkg/file/uploader/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func New(chunkClient *elasticsearch.Client, bulker bulk.Bulk, cache cache.Cache,
}

// Start an upload operation
func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
func (u *Uploader) Begin(ctx context.Context, namespaces []string, data JSDict) (file.Info, error) {
vSpan, _ := apm.StartSpan(ctx, "validateFileInfo", "validate")
if data == nil {
vSpan.End()
Expand Down Expand Up @@ -92,15 +92,16 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
docID := fmt.Sprintf("%s.%s", actionID, agentID)

info := file.Info{
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
ID: id,
DocID: docID,
AgentID: agentID,
ActionID: actionID,
Namespaces: namespaces,
ChunkSize: file.MaxChunkSize,
Source: source,
Total: size,
Status: file.StatusAwaiting,
Start: time.Now(),
}
chunkCount := info.Total / info.ChunkSize
if info.Total%info.ChunkSize > 0 {
Expand All @@ -127,6 +128,9 @@ func (u *Uploader) Begin(ctx context.Context, data JSDict) (file.Info, error) {
if err := data.Put(info.Start.UnixMilli(), "@timestamp"); err != nil {
return file.Info{}, err
}
if err := data.Put(info.Namespaces, "namespaces"); err != nil {
return file.Info{}, err
}

/*
Write to storage
Expand Down
12 changes: 6 additions & 6 deletions internal/pkg/file/uploader/upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestUploadBeginReturnsCorrectInfo(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

assert.Equal(t, int64(size), info.Total)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestUploadBeginWritesDocumentFromInputs(t *testing.T) {
c, err := cache.New(config.Cache{NumCounters: 100, MaxCost: 100000})
require.NoError(t, err)
u := New(nil, fakeBulk, c, int64(size), time.Hour)
_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

payload, ok := fakeBulk.Calls[0].Arguments[3].([]byte)
Expand Down Expand Up @@ -172,7 +172,7 @@ func TestUploadBeginCalculatesCorrectChunkCount(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)
assert.Equal(t, tc.ExpectedCount, info.Count)
})
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestUploadBeginMaxFileSize(t *testing.T) {
data := makeUploadRequestDict(map[string]interface{}{
"file.size": tc.FileSize,
})
_, err := u.Begin(context.Background(), data)
_, err := u.Begin(context.Background(), []string{}, data)
if tc.ShouldError {
assert.ErrorIs(t, err, ErrFileSizeTooLarge)
} else {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestUploadRejectsMissingRequiredFields(t *testing.T) {
}
}

_, err = u.Begin(context.Background(), data)
_, err = u.Begin(context.Background(), []string{}, data)
assert.Errorf(t, err, "%s is a required field and should error if not provided", field)
})

Expand Down Expand Up @@ -343,7 +343,7 @@ func TestChunkMarksFinal(t *testing.T) {
"file.size": tc.FileSize,
})

info, err := u.Begin(context.Background(), data)
info, err := u.Begin(context.Background(), []string{}, data)
assert.NoError(t, err)

// for anything larger than 1-chunk, check for off-by-ones
Expand Down
21 changes: 18 additions & 3 deletions internal/pkg/model/schema.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 53ebe67

Please sign in to comment.