Skip to content

Commit

Permalink
add status dispatcher test.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Jan 14, 2025
1 parent 67aa5b0 commit e346d62
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 4 deletions.
8 changes: 4 additions & 4 deletions test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
workv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
Expand Down Expand Up @@ -81,6 +80,7 @@ type Helper struct {
APIServer server.Server
MetricsServer server.Server
HealthCheckServer *server.HealthCheckServer
StatusDispatcher dispatcher.Dispatcher
EventServer server.EventServer
EventFilter controllers.EventFilter
ControllerManager *server.ControllersServer
Expand Down Expand Up @@ -143,13 +143,13 @@ func NewHelper(t *testing.T) *Helper {
helper.Env().Config.GRPCServer.DisableTLS = true

if helper.Broker != "grpc" {
statusDispatcher := dispatcher.NewHashDispatcher(
helper.StatusDispatcher = dispatcher.NewHashDispatcher(
helper.Env().Config.MessageBroker.ClientID,
helper.Env().Database.SessionFactory,
helper.Env().Clients.CloudEventsSource,
helper.Env().Config.EventServer.ConsistentHashConfig,
)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher)
helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory))
} else {
helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster)
Expand Down Expand Up @@ -296,7 +296,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu
brokerConfig = mqttOptions
} else {
// initilize the grpc options
grpcOptions := grpc.NewGRPCOptions()
grpcOptions := grpcoptions.NewGRPCOptions()
grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort)
brokerConfig = grpcOptions
}
Expand Down
104 changes: 104 additions & 0 deletions test/integration/stash_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package integration

import (
"context"
"os"
"testing"
"time"

. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/test"
prommodel "github.com/prometheus/client_model/go"
)

func TestStatusDispatcher(t *testing.T) {
broker := os.Getenv("BROKER")
if broker == "grpc" {
t.Skip("StatusDispatcher is not supported with gRPC broker")
}

h, _ := test.RegisterIntegration(t)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
}()

// two instances besides the current instance
instance1 := "instance1"
instance2 := "instance2"

// create 3 consumers
consumer1 := "plugh"
consumer2 := "xyzzy"
consumer3 := "thud"
_ = h.CreateConsumer(consumer1)
_ = h.CreateConsumer(consumer2)
_ = h.CreateConsumer(consumer3)

// should dispatch to all consumers for current instance
Eventually(func() bool {
return h.StatusDispatcher.Dispatch(consumer1) &&
h.StatusDispatcher.Dispatch(consumer2) &&
h.StatusDispatcher.Dispatch(consumer3)
}, 6*time.Second, 1*time.Second).Should(BeTrue())

// insert a new instance
instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory)
_, err := instanceDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{
ID: instance1,
},
LastHeartbeat: time.Now(),
Ready: false,
})
Expect(err).NotTo(HaveOccurred())

// should dispatch consumer based on the instance status
Eventually(func() bool {
return !h.StatusDispatcher.Dispatch(consumer1) &&
h.StatusDispatcher.Dispatch(consumer2) &&
!h.StatusDispatcher.Dispatch(consumer3)
}, 5*time.Second, 1*time.Second).Should(BeTrue())

// create another instance, but will be marked as unready after next checkInstances
_, err = instanceDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{
ID: instance2,
},
// last heartbeat is 3 seconds ago
LastHeartbeat: time.Now().Add(-3 * time.Second),
Ready: true,
})
Expect(err).NotTo(HaveOccurred())

// finally should dispatch to all consumers for current instance
Eventually(func() bool {
return !h.StatusDispatcher.Dispatch(consumer1) &&
h.StatusDispatcher.Dispatch(consumer2) &&
!h.StatusDispatcher.Dispatch(consumer3)
}, 6*time.Second, 1*time.Second).Should(BeTrue())

// check metrics for status resync
time.Sleep(3 * time.Second)
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(consumer1)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0)
labels = []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(consumer2)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0)
labels = []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(consumer3)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0)
}

0 comments on commit e346d62

Please sign in to comment.