Skip to content

Commit

Permalink
feat: resource upsert (#239)
Browse files Browse the repository at this point in the history
* feat: add upsert resource in resource service

* feat: add upsert resource in resource handler

* feat: accept multiple resources in upsert resource api

* feat: add resource upload command

* feat: add error message to the upsert response

* fix: return error when failure found in resource upload command

* chore: update latest proto

* chore: update to latest proto with unary changes

* test: fix broken test cases in resource upsert

* refactor: reuse resource upsert & diff detection logic from deploy, in new upsert api

* chore: reword resource upload command examples

* feat: add successful resource names in resource upsert response

* chore: update proton commit
  • Loading branch information
arinda-arif authored Jun 25, 2024
1 parent 4dae8f4 commit 3172c53
Show file tree
Hide file tree
Showing 12 changed files with 1,639 additions and 304 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ NAME = "github.com/goto/optimus"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
OPMS_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "5499ce17808277216119aa715a105b688875666f"
PROTON_COMMIT := "0278f0402ee9d1b5f392e03c7f45160879924d78"


.PHONY: build test test-ci generate-proto unit-test-ci integration-test vet coverage clean install lint
Expand Down
1 change: 1 addition & 0 deletions client/cmd/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func NewResourceCommand() *cobra.Command {
cmd.AddCommand(NewChangeNamespaceCommand())
cmd.AddCommand(NewApplyCommand())
cmd.AddCommand(NewDeleteCommand())
cmd.AddCommand(NewUploadCommand())
cmd.AddCommand(NewPlanCommand())
return cmd
}
193 changes: 193 additions & 0 deletions client/cmd/resource/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
package resource

import (
"context"
"fmt"
"time"

"github.com/MakeNowJust/heredoc"
"github.com/goto/salt/log"
"github.com/spf13/afero"
"github.com/spf13/cobra"

"github.com/goto/optimus/client/cmd/internal/connection"
"github.com/goto/optimus/client/cmd/internal/logger"
"github.com/goto/optimus/client/local"
"github.com/goto/optimus/client/local/model"
"github.com/goto/optimus/client/local/specio"
"github.com/goto/optimus/config"
"github.com/goto/optimus/core/resource"
pb "github.com/goto/optimus/protos/gotocompany/optimus/core/v1beta1"
)

const (
uploadTimeout = time.Minute * 30
defaultBatchSize = 10
)

type uploadCommand struct {
logger log.Logger
connection connection.Connection

clientConfig *config.ClientConfig
configFilePath string

namespaceName string
resourceNames []string
batchSize int

resourceSpecReadWriter local.SpecReadWriter[*model.ResourceSpec]
}

// NewUploadCommand initializes command for uploading a single resource
func NewUploadCommand() *cobra.Command {
uploadCmd := &uploadCommand{
logger: logger.NewClientLogger(),
}

cmd := &cobra.Command{
Use: "upload",
Short: "Upload a resource to server",
Long: heredoc.Doc(`Apply local changes to destination server which includes creating/updating resources`),
Example: "optimus resource upload -R <resource-1,resource-2> -n <namespace>",
Annotations: map[string]string{
"group:core": "true",
},
RunE: uploadCmd.RunE,
PreRunE: uploadCmd.PreRunE,
}
cmd.Flags().StringVarP(&uploadCmd.configFilePath, "config", "c", uploadCmd.configFilePath, "File path for client configuration")
cmd.Flags().StringVarP(&uploadCmd.namespaceName, "namespace", "n", "", "Namespace name in which the resources resides")
cmd.Flags().StringSliceVarP(&uploadCmd.resourceNames, "resources", "R", nil, "Resource names")
cmd.Flags().IntVarP(&uploadCmd.batchSize, "batch-size", "b", defaultBatchSize, "Number of resources to upload in a batch")

cmd.MarkFlagRequired("namespace")
return cmd
}

func (u *uploadCommand) PreRunE(_ *cobra.Command, _ []string) error {
var err error
u.clientConfig, err = config.LoadClientConfig(u.configFilePath)
if err != nil {
return err
}
u.connection = connection.New(u.logger, u.clientConfig)

resourceSpecReadWriter, err := specio.NewResourceSpecReadWriter(afero.NewOsFs())
if err != nil {
return fmt.Errorf("couldn't instantiate resource spec reader")
}
u.resourceSpecReadWriter = resourceSpecReadWriter

return nil
}

func (u *uploadCommand) RunE(_ *cobra.Command, _ []string) error {
namespace, err := u.clientConfig.GetNamespaceByName(u.namespaceName)
if err != nil {
return err
}

return u.upload(namespace)
}

func (u *uploadCommand) upload(namespace *config.Namespace) error {
conn, err := u.connection.Create(u.clientConfig.Host)
if err != nil {
return err
}
defer conn.Close()

resourceClient := pb.NewResourceServiceClient(conn)

ctx, cancelFunc := context.WithTimeout(context.Background(), uploadTimeout)
defer cancelFunc()

isFailed := false
for _, ds := range namespace.Datastore {
resourceSpecs, err := u.getResourceSpecs(ds.Path)
if err != nil {
u.logger.Error(err.Error())
isFailed = true
continue
}

resourceProtos := make([]*pb.ResourceSpecification, 0)
for _, resourceSpec := range resourceSpecs {
resourceProto, err := resourceSpec.ToProto()
if err != nil {
u.logger.Error(err.Error())
isFailed = true
continue
}
resourceProtos = append(resourceProtos, resourceProto)
}

countResources := len(resourceProtos)
for i := 0; i < countResources; i += u.batchSize {
endIndex := i + u.batchSize
if countResources < endIndex {
endIndex = countResources
}

upsertRequest := &pb.UpsertResourceRequest{
ProjectName: u.clientConfig.Project.Name,
NamespaceName: namespace.Name,
DatastoreName: ds.Type,
Resources: resourceProtos[i:endIndex],
}

resp, err := resourceClient.UpsertResource(ctx, upsertRequest)
if err != nil {
u.logger.Error("Unable to upload resource of namespace %s, err: %s", u.namespaceName, err)
isFailed = true
}
for _, result := range resp.Results {
message := result.Message
if message != "" {
message = fmt.Sprintf("(%s)", message)
}
if result.Status == resource.StatusFailure.String() {
u.logger.Error("[%s] %s %s", result.Status, result.ResourceName, message)
isFailed = true
continue
}
u.logger.Info("[%s] %s %s", result.Status, result.ResourceName, message)
}
}
}
if isFailed {
return fmt.Errorf("upload resource specifications to namespace %s failed", u.namespaceName)
}
u.logger.Info("finished uploading resource specifications to server\n")
return nil
}

func (u *uploadCommand) getResourceSpecs(namespaceResourcePath string) ([]*model.ResourceSpec, error) {
allResourcesInNamespace, err := u.resourceSpecReadWriter.ReadAll(namespaceResourcePath)
if err != nil {
return nil, err
}
resourceNameToSpecMap := make(map[string]*model.ResourceSpec, len(allResourcesInNamespace))
for _, spec := range allResourcesInNamespace {
resourceNameToSpecMap[spec.Name] = spec
}

resourceSpecs := make([]*model.ResourceSpec, 0)

if len(u.resourceNames) == 0 {
for _, spec := range resourceNameToSpecMap {
resourceSpecs = append(resourceSpecs, spec)
}
return resourceSpecs, nil
}

for _, resourceName := range u.resourceNames {
resourceSpec, ok := resourceNameToSpecMap[resourceName]
if !ok {
return nil, fmt.Errorf("resource %s not found in namespace %s", resourceName, u.namespaceName)
}
resourceSpecs = append(resourceSpecs, resourceSpec)
}
return resourceSpecs, nil
}
60 changes: 60 additions & 0 deletions core/resource/handler/v1beta1/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type ResourceService interface {
Create(ctx context.Context, res *resource.Resource) error
Update(ctx context.Context, res *resource.Resource, logWriter writer.LogWriter) error
Upsert(ctx context.Context, res *resource.Resource, logWriter writer.LogWriter) error
Delete(ctx context.Context, req *resource.DeleteRequest) (*resource.DeleteResponse, error)
ChangeNamespace(ctx context.Context, datastore resource.Store, resourceFullName string, oldTenant, newTenant tenant.Tenant) error
Get(ctx context.Context, tnnt tenant.Tenant, store resource.Store, resourceName string) (*resource.Resource, error)
Expand Down Expand Up @@ -266,6 +267,65 @@ func (rh ResourceHandler) UpdateResource(ctx context.Context, req *pb.UpdateReso
return &pb.UpdateResourceResponse{}, nil
}

func (rh ResourceHandler) UpsertResource(ctx context.Context, req *pb.UpsertResourceRequest) (*pb.UpsertResourceResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
rh.l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return nil, errors.GRPCErr(err, "failed to upsert resource")
}

store, err := resource.FromStringToStore(req.GetDatastoreName())
if err != nil {
rh.l.Error("invalid datastore name [%s]: %s", req.GetDatastoreName(), err)
return nil, errors.GRPCErr(err, "invalid upsert resource request")
}

if len(req.Resources) == 0 {
return nil, errors.InvalidArgument(resource.EntityResource, "empty resource")
}

logWriter := writer.NewLogWriter(rh.l)
result := make([]*pb.ResourceStatus, 0)

var successfulResourceNames []string
for _, reqResource := range req.Resources {
resourceSpec, err := fromResourceProto(reqResource, tnnt, store)
if err != nil {
errMsg := fmt.Sprintf("error adapting resource [%s]: %s", reqResource.GetName(), err)
logWriter.Write(writer.LogLevelError, errMsg)
result = append(result, rh.newResourceStatus(reqResource.GetName(), resource.StatusFailure.String(), errMsg))
continue
}

if err = rh.service.Upsert(ctx, resourceSpec, logWriter); err != nil {
errMsg := fmt.Sprintf("error deploying resource [%s]: %s", reqResource.GetName(), err)
logWriter.Write(writer.LogLevelError, errMsg)
result = append(result, rh.newResourceStatus(reqResource.GetName(), resource.StatusFailure.String(), errMsg))
continue
}

result = append(result, rh.newResourceStatus(resourceSpec.FullName(), resourceSpec.Status().String(), ""))
raiseResourceDatastoreEventMetric(tnnt, resourceSpec.Store().String(), resourceSpec.Kind(), resourceSpec.Status().String())

if resourceSpec.Status() == resource.StatusSuccess {
successfulResourceNames = append(successfulResourceNames, resourceSpec.FullName())
}
}

return &pb.UpsertResourceResponse{
Results: result,
SuccessfulResourceNames: successfulResourceNames,
}, nil
}

func (ResourceHandler) newResourceStatus(name, status, message string) *pb.ResourceStatus {
return &pb.ResourceStatus{
ResourceName: name,
Status: status,
Message: message,
}
}

func (rh ResourceHandler) DeleteResource(ctx context.Context, req *pb.DeleteResourceRequest) (*pb.DeleteResourceResponse, error) {
tnnt, err := tenant.NewTenant(req.GetProjectName(), req.GetNamespaceName())
if err != nil {
Expand Down
Loading

0 comments on commit 3172c53

Please sign in to comment.