Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC Server Implementation #2758

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified .hooks/pre-push
100755 → 100644
Empty file.
76 changes: 76 additions & 0 deletions cns/grpc/client/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package main

import (
"context"
"fmt"
"log"
"time"

pb "github.com/Azure/azure-container-networking/cns/grpc/v1alpha"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

const (
address = "127.0.0.1"
port = 8080
)

func main() {
// Target server address
target := fmt.Sprintf("%v:%d", address, port)

// Create a connection to the gRPC server
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()

// Create a new CNS client
client := pb.NewCNSClient(conn)

// Set up the context with a timeout
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

// Request to check server health
healthCheckRequest := &pb.HealthCheckRequest{}

// Make the gRPC call to HealthCheck
resp, err := client.HealthCheck(ctx, healthCheckRequest)
if err != nil {
log.Fatalf("failed to check health: %v", err)

Check failure on line 43 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

exitAfterDefer: log.Fatalf will exit, and `defer cancel()` will not run (gocritic)

Check failure on line 43 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

exitAfterDefer: log.Fatalf will exit, and `defer cancel()` will not run (gocritic)
}

log.Printf("HealthCheck response: %v", resp.Status)

Check failure on line 46 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

avoid direct access to proto field resp.Status, use resp.GetStatus() instead (protogetter)

Check failure on line 46 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

avoid direct access to proto field resp.Status, use resp.GetStatus() instead (protogetter)

// Request to get node info
nodeInfoRequest := &pb.NodeInfoRequest{
NodeID: "Node123",
}

// Make the gRPC call to GetNodeInfo
nodeInfoResp, err := client.GetNodeInfo(ctx, nodeInfoRequest)
if err != nil {
log.Fatalf("failed to get node info: %v", err)
}

log.Printf("GetNodeInfo response: NodeID=%v, Name=%v, IP=%v, IsHealthy=%v, Status=%v, Message=%v",
nodeInfoResp.NodeID, nodeInfoResp.Name, nodeInfoResp.Ip, nodeInfoResp.IsHealthy, nodeInfoResp.Status, nodeInfoResp.Message)

Check failure on line 60 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

avoid direct access to proto field nodeInfoResp.NodeID, use nodeInfoResp.GetNodeID() instead (protogetter)

// Request to set orchestrator info
// orchestratorRequest := &pb.SetOrchestratorInfoRequest{
// DncPartitionKey: "examplePartitionKey",
// NodeID: "exampleNodeID",
// OrchestratorType: "Kubernetes",
// }

// Make the gRPC call to SetOrchestratorInfo
// resp, err := client.SetOrchestratorInfo(ctx, orchestratorRequest)
// if err != nil {
// log.Fatalf("failed to set orchestrator info: %v", err)
// }

// log.Printf("SetOrchestratorInfo response: %v", resp)

Check failure on line 75 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

commentedOutCode: may want to remove commented-out code (gocritic)

Check failure on line 75 in cns/grpc/client/main.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

commentedOutCode: may want to remove commented-out code (gocritic)
}
15 changes: 13 additions & 2 deletions cns/grpc/proto/server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ syntax = "proto3";

package cns;

option go_package = "cns/grpc/cnsv1alpha";
option go_package = "cns/grpc/v1alpha";

// The Container Network Service (CNS) exposes a set of operations that allow the Delegated Network Controller (DNC) to manage
// and monitor nodes in an orchestrator's infrastructure.

// CNSService defines the gRPC service exposed by CNS to interact with DNC.
// CNS defines the gRPC service exposed by CNS to interact with DNC.
service CNS {
// Sets the orchestrator information for a node.
rpc SetOrchestratorInfo(SetOrchestratorInfoRequest) returns (SetOrchestratorInfoResponse);

// Retrieves detailed information about a specific node.
// Primarily used for health checks.
rpc GetNodeInfo(NodeInfoRequest) returns (NodeInfoResponse);

// HealthCheck is a simple method to check if the server is running.
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
}

// SetOrchestratorInfoRequest is the request message for setting the orchestrator information.
Expand All @@ -41,3 +44,11 @@ message NodeInfoResponse {
string status = 5; // The current status of the node (e.g., running, stopped).
string message = 6; // Additional information about the node's health or status.
}

// HealthCheckRequest is the request message for health check.
message HealthCheckRequest {}

// HealthCheckResponse is the response message for health check.
message HealthCheckResponse {
string status = 1; // The status message of the health check.
}
103 changes: 101 additions & 2 deletions cns/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
package grpc

import (
"context"
"fmt"
"log"
"net"
"strconv"
"sync"
"time"

pb "github.com/Azure/azure-container-networking/cns/grpc/cnsv1alpha"
"github.com/Azure/azure-container-networking/cns"
pb "github.com/Azure/azure-container-networking/cns/grpc/v1alpha"
"github.com/Azure/azure-container-networking/cns/logger"
"github.com/Azure/azure-container-networking/cns/restserver"
"github.com/Azure/azure-container-networking/cns/types"
"github.com/Azure/azure-container-networking/store"
"github.com/pkg/errors"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand All @@ -28,14 +35,17 @@
}

// CNSService defines the CNS gRPC service.
type CNS struct {

Check failure on line 38 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

exposedSyncMutex: don't embed sync.RWMutex (gocritic)

Check failure on line 38 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

exposedSyncMutex: don't embed sync.RWMutex (gocritic)
pb.UnimplementedCNSServer
sync.RWMutex
Logger *zap.Logger
State *restserver.HTTPRestService
*restserver.HTTPRestService
state *restserver.HttpRestServiceState
store store.KeyValueStore
}

// NewServer initializes a new gRPC server instance.
func NewServer(settings ServerSettings, cnsService pb.CNSServer, logger *zap.Logger) (*Server, error) {

Check failure on line 48 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

importShadow: shadow of imported from 'github.com/Azure/azure-container-networking/cns/logger' package 'logger' (gocritic)

Check failure on line 48 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

importShadow: shadow of imported from 'github.com/Azure/azure-container-networking/cns/logger' package 'logger' (gocritic)
if cnsService == nil {
ErrCNSServiceNotDefined := errors.New("CNS service is not defined")
return nil, fmt.Errorf("Failed to create new gRPC server: %w", ErrCNSServiceNotDefined)
Expand Down Expand Up @@ -72,3 +82,92 @@

return nil
}

// HealthCheck is a simple method to check if the server is running.
func (s *CNS) HealthCheck(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {

Check warning on line 87 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 87 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
s.Logger.Info("HealthCheck called")
return &pb.HealthCheckResponse{Status: "Server is running"}, nil
}

// areNCsPresent returns true if NCs are present in CNS, false if no NCs are present.
func (s *CNS) areNCsPresent() bool {
if len(s.state.ContainerStatus) == 0 && len(s.state.ContainerIDByOrchestratorContext) == 0 {
return false
}
return true
}

// SaveState persists the current state of the service.
func (s *CNS) SaveState() error {
// Skip if a store is not provided.
if s.store == nil {
s.Logger.Warn("Store not initialized.")
return nil
}

// Update time stamp.
s.state.TimeStamp = time.Now()
err := s.store.Write("ContainerNetworkService", s.state)
if err != nil {
s.Logger.Error("Failed to save state", zap.Error(err))
}

return err
}

// SetOrchestratorInfo handles setting the orchestrator information for a node.
func (s *CNS) SetOrchestratorInfo(ctx context.Context, req *pb.SetOrchestratorInfoRequest) (*pb.SetOrchestratorInfoResponse, error) {

Check warning on line 119 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 119 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
s.Logger.Info("SetOrchestratorInfo called", zap.String("nodeID", req.NodeID), zap.String("orchestratorType", req.OrchestratorType))

Check failure on line 120 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

avoid direct access to proto field req.NodeID, use req.GetNodeID() instead (protogetter)

s.Lock()

nodeID := s.state.NodeID

var returnMessage string
var returnCode types.ResponseCode

if nodeID == "" || nodeID == req.NodeID || !s.areNCsPresent() {
switch req.OrchestratorType {
case cns.ServiceFabric, cns.Kubernetes, cns.KubernetesCRD, cns.WebApps, cns.Batch, cns.DBforPostgreSQL, cns.AzureFirstParty:
s.state.OrchestratorType = req.OrchestratorType
s.state.NodeID = req.NodeID
logger.SetContextDetails(req.OrchestratorType, req.NodeID)
s.SaveState()

Check failure on line 135 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

Error return value of `s.SaveState` is not checked (errcheck)

Check failure on line 135 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

Error return value of `s.SaveState` is not checked (errcheck)
returnMessage = "Orchestrator information set successfully"
returnCode = types.Success
default:
returnMessage = fmt.Sprintf("Invalid Orchestrator type %v", req.OrchestratorType)
returnCode = types.UnsupportedOrchestratorType
}
} else {
returnMessage = fmt.Sprintf("Invalid request since this node has already been registered as %s", nodeID)
returnCode = types.InvalidRequest
}

s.Logger.Info("SetOrchestratorInfo response", zap.String("returnMessage", returnMessage), zap.Int("returnCode", int(returnCode)))
resp := &pb.SetOrchestratorInfoResponse{}

s.Unlock()
return resp, nil
}

// GetNodeInfo handles retrieving detailed information about a specific node.
func (s *CNS) GetNodeInfo(ctx context.Context, req *pb.NodeInfoRequest) (*pb.NodeInfoResponse, error) {

Check warning on line 155 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)

Check warning on line 155 in cns/grpc/server.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
s.Logger.Info("GetNodeInfo called", zap.String("nodeID", req.NodeID))

s.RLock()
defer s.RUnlock()

// Simulate getting node information.
nodeInfo := &pb.NodeInfoResponse{
NodeID: req.NodeID,
Name: "Sample",
Ip: "192.168.1.1",
IsHealthy: true,
Status: "running",
Message: "Node is healthy",
}

s.Logger.Info("GetNodeInfo response", zap.String("nodeID", nodeInfo.NodeID), zap.String("status", nodeInfo.Status))
return nodeInfo, nil
}
124 changes: 124 additions & 0 deletions cns/grpc/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package grpc

// import (
// "context"
// "fmt"
// "testing"

// "github.com/Azure/azure-container-networking/cns"
// pb "github.com/Azure/azure-container-networking/cns/grpc/cnsv1alpha"
// "github.com/Azure/azure-container-networking/cns/restserver"
// "github.com/Azure/azure-container-networking/cns/types"
// "github.com/stretchr/testify/assert"
// "go.uber.org/zap"
// )

// // MockKeyValueStore is a mock implementation of the KeyValueStore interface for testing purposes.
// type MockKeyValueStore struct {
// store map[string]interface{}
// }

// func NewMockKeyValueStore() *MockKeyValueStore {
// return &MockKeyValueStore{
// store: make(map[string]interface{}),
// }
// }

// func (m *MockKeyValueStore) Write(key string, value interface{}) error {
// m.store[key] = value
// return nil
// }

// func (m *MockKeyValueStore) Read(key string, value interface{}) error {
// if v, ok := m.store[key]; ok {
// // Type assertion to match the expected type of value
// ptr, ok := value.(*interface{})
// if !ok {
// return fmt.Errorf("value should be a pointer to interface{}")
// }
// *ptr = v
// return nil
// }
// return fmt.Errorf("key not found")
// }

// func (m *MockKeyValueStore) Delete(key string) error {
// delete(m.store, key)
// return nil
// }

// func (m *MockKeyValueStore) Exists(key string) bool {
// _, ok := m.store[key]
// return ok
// }

// func TestSetOrchestratorInfo(t* testing.T) {
// logger, _ := zap.NewDevelopment()
// state := &restserver.HttpRestServiceState{}
// // mockStore := NewMockKeyValueStore()

// cnsService := &CNS{
// Logger: logger,
// HTTPRestService: &restserver.HTTPRestService{},
// state: state,
// // store: mockStore,
// }

// tests := []struct {
// name string
// req *pb.SetOrchestratorInfoRequest
// expectedMessage string
// expectedCode types.ResponseCode
// }{
// {
// name: "ValidOrchestratorType",
// req: &pb.SetOrchestratorInfoRequest{
// DncPartitionKey: "partitionKey1",
// NodeID: "node1",
// OrchestratorType: cns.Kubernetes,
// },
// expectedMessage: "Orchestrator information set successfully",
// expectedCode: types.Success,
// },
// {
// name: "InvalidOrchestratorType",
// req: &pb.SetOrchestratorInfoRequest{
// DncPartitionKey: "partitionKey1",
// NodeID: "node1",
// OrchestratorType: "InvalidType",
// },
// expectedMessage: "Invalid Orchestrator type InvalidType",
// expectedCode: types.UnsupportedOrchestratorType,
// },
// {
// name: "NodeAlreadyRegistered",
// req: &pb.SetOrchestratorInfoRequest{
// DncPartitionKey: "partitionKey1",
// NodeID: "node2",
// OrchestratorType: cns.Kubernetes,
// },
// expectedMessage: "Invalid request since this node has already been registered as node1",
// expectedCode: types.InvalidRequest,
// },
// }

// for _, tt := range tests {
// t.Run(tt.name, func(t *testing.T) {
// // Simulate existing state for the "NodeAlreadyRegistered" test case
// if tt.name == "NodeAlreadyRegistered" {
// cnsService.state.NodeID = "node1"
// } else {
// cnsService.state.NodeID = ""
// }

// resp, err := cnsService.SetOrchestratorInfo(context.Background(), tt.req)
// assert.NoError(t, err)
// assert.NotNil(t, resp)

// // Verify the log message and return code
// assert.Equal(t, tt.req.OrchestratorType, cnsService.state.OrchestratorType)
// assert.Equal(t, tt.req.NodeID, cnsService.state.NodeID)
// })
// }
// }

Check failure on line 124 in cns/grpc/server_test.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

File is not `gofmt`-ed with `-s` (gofmt)

Check failure on line 124 in cns/grpc/server_test.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, windows-latest)

File is not `gofmt`-ed with `-s` (gofmt)
Loading
Loading