diff --git a/test/helper.go b/test/helper.go index 17537654..ea060f97 100755 --- a/test/helper.go +++ b/test/helper.go @@ -24,7 +24,6 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -81,6 +80,7 @@ type Helper struct { APIServer server.Server MetricsServer server.Server HealthCheckServer *server.HealthCheckServer + StatusDispatcher dispatcher.Dispatcher EventServer server.EventServer EventFilter controllers.EventFilter ControllerManager *server.ControllersServer @@ -143,13 +143,13 @@ func NewHelper(t *testing.T) *Helper { helper.Env().Config.GRPCServer.DisableTLS = true if helper.Broker != "grpc" { - statusDispatcher := dispatcher.NewHashDispatcher( + helper.StatusDispatcher = dispatcher.NewHashDispatcher( helper.Env().Config.MessageBroker.ClientID, helper.Env().Database.SessionFactory, helper.Env().Clients.CloudEventsSource, helper.Env().Config.EventServer.ConsistentHashConfig, ) - helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher) + helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher) helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory)) } else { helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster) @@ -296,7 +296,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu brokerConfig = mqttOptions } else { // initilize the grpc options - grpcOptions := grpc.NewGRPCOptions() + grpcOptions := grpcoptions.NewGRPCOptions() grpcOptions.URL = fmt.Sprintf("%s:%s", 
helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort) brokerConfig = grpcOptions } diff --git a/test/integration/stash_dispatcher_test.go b/test/integration/stash_dispatcher_test.go new file mode 100644 index 00000000..46a45c6a --- /dev/null +++ b/test/integration/stash_dispatcher_test.go @@ -0,0 +1,104 @@ +package integration + +import ( + "context" + "os" + "testing" + "time" + + . "github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/test" + prommodel "github.com/prometheus/client_model/go" +) + +func TestStatusDispatcher(t *testing.T) { + broker := os.Getenv("BROKER") + if broker == "grpc" { + t.Skip("StatusDispatcher is not supported with gRPC broker") + } + + h, _ := test.RegisterIntegration(t) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + }() + + // two instances besides the current instance + instance1 := "instance1" + instance2 := "instance2" + + // create 3 consumers + consumer1 := "plugh" + consumer2 := "xyzzy" + consumer3 := "thud" + _ = h.CreateConsumer(consumer1) + _ = h.CreateConsumer(consumer2) + _ = h.CreateConsumer(consumer3) + + // should dispatch to all consumers for current instance + Eventually(func() bool { + return h.StatusDispatcher.Dispatch(consumer1) && + h.StatusDispatcher.Dispatch(consumer2) && + h.StatusDispatcher.Dispatch(consumer3) + }, 6*time.Second, 1*time.Second).Should(BeTrue()) + + // insert a new instance + instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory) + _, err := instanceDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ + ID: instance1, + }, + LastHeartbeat: time.Now(), + Ready: false, + }) + Expect(err).NotTo(HaveOccurred()) + + // should dispatch consumer based on the instance status + Eventually(func() bool { + return !h.StatusDispatcher.Dispatch(consumer1) && + h.StatusDispatcher.Dispatch(consumer2) && + 
!h.StatusDispatcher.Dispatch(consumer3) + }, 5*time.Second, 1*time.Second).Should(BeTrue()) + + // create another instance, which will be marked as unready after the next checkInstances + _, err = instanceDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ + ID: instance2, + }, + // last heartbeat is 3 seconds ago + LastHeartbeat: time.Now().Add(-3 * time.Second), + Ready: true, + }) + Expect(err).NotTo(HaveOccurred()) + + // dispatch ownership should stay unchanged: instance2 is expected to be marked unready before taking over any consumers + Eventually(func() bool { + return !h.StatusDispatcher.Dispatch(consumer1) && + h.StatusDispatcher.Dispatch(consumer2) && + !h.StatusDispatcher.Dispatch(consumer3) + }, 6*time.Second, 1*time.Second).Should(BeTrue()) + + // check metrics for status resync + time.Sleep(3 * time.Second) + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("cluster"), Value: strPtr(consumer1)}, + {Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")}, + } + checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0) + labels = []*prommodel.LabelPair{ + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("cluster"), Value: strPtr(consumer2)}, + {Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")}, + } + checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0) + labels = []*prommodel.LabelPair{ + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("cluster"), Value: strPtr(consumer3)}, + {Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")}, + } + checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0) +}