diff --git a/cmd/maestro/servecmd/cmd.go b/cmd/maestro/servecmd/cmd.go
index 8f393b36..2fc5073f 100755
--- a/cmd/maestro/servecmd/cmd.go
+++ b/cmd/maestro/servecmd/cmd.go
@@ -13,7 +13,6 @@ import (
 	"github.com/openshift-online/maestro/cmd/maestro/server"
 	"github.com/openshift-online/maestro/pkg/config"
 	"github.com/openshift-online/maestro/pkg/controllers"
-	"github.com/openshift-online/maestro/pkg/dao"
 	"github.com/openshift-online/maestro/pkg/db"
 	"github.com/openshift-online/maestro/pkg/dispatcher"
 	"github.com/openshift-online/maestro/pkg/event"
@@ -40,8 +39,6 @@ func runServer(cmd *cobra.Command, args []string) {
 		klog.Fatalf("Unable to initialize environment: %s", err.Error())
 	}
 
-	healthcheckServer := server.NewHealthCheckServer()
-
 	// Create event broadcaster to broadcast resource status update events to subscribers
 	eventBroadcaster := event.NewEventBroadcaster()
 
@@ -60,16 +57,13 @@ func runServer(cmd *cobra.Command, args []string) {
 		subscriptionType := environments.Environment().Config.EventServer.SubscriptionType
 		switch config.SubscriptionType(subscriptionType) {
 		case config.SharedSubscriptionType:
-			statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource)
+			statusDispatcher = dispatcher.NewNoopDispatcher(environments.Environment().Database.SessionFactory, environments.Environment().Clients.CloudEventsSource)
 		case config.BroadcastSubscriptionType:
-			statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, dao.NewInstanceDao(&environments.Environment().Database.SessionFactory),
-				dao.NewConsumerDao(&environments.Environment().Database.SessionFactory), environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
+			statusDispatcher = dispatcher.NewHashDispatcher(environments.Environment().Config.MessageBroker.ClientID, environments.Environment().Database.SessionFactory,
+				environments.Environment().Clients.CloudEventsSource, environments.Environment().Config.EventServer.ConsistentHashConfig)
 		default:
 			klog.Errorf("Unsupported subscription type: %s", subscriptionType)
 		}
-
-		// Set the status dispatcher for the healthcheck server
-		healthcheckServer.SetStatusDispatcher(statusDispatcher)
 		eventServer = server.NewMessageQueueEventServer(eventBroadcaster, statusDispatcher)
 		eventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(environments.Environment().Database.SessionFactory))
 	}
@@ -77,6 +71,7 @@ func runServer(cmd *cobra.Command, args []string) {
 	// Create the servers
 	apiserver := server.NewAPIServer(eventBroadcaster)
 	metricsServer := server.NewMetricsServer()
+	healthcheckServer := server.NewHealthCheckServer()
 	controllersServer := server.NewControllersServer(eventServer, eventFilter)
 
 	ctx, cancel := context.WithCancel(context.Background())
diff --git a/cmd/maestro/server/healthcheck_server.go b/cmd/maestro/server/healthcheck_server.go
index 22f01875..a418cba6 100755
--- a/cmd/maestro/server/healthcheck_server.go
+++ b/cmd/maestro/server/healthcheck_server.go
@@ -11,7 +11,6 @@ import (
 	"github.com/openshift-online/maestro/pkg/api"
 	"github.com/openshift-online/maestro/pkg/dao"
 	"github.com/openshift-online/maestro/pkg/db"
-	"github.com/openshift-online/maestro/pkg/dispatcher"
 	"gorm.io/gorm"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/klog/v2"
 )
@@ -19,7 +18,6 @@ type HealthCheckServer struct {
 	httpServer        *http.Server
-	statusDispatcher  dispatcher.Dispatcher
 	lockFactory       db.LockFactory
 	instanceDao       dao.InstanceDao
 	instanceID        string
@@ -49,10 +47,6 @@ func NewHealthCheckServer() *HealthCheckServer {
 	return server
 }
 
-func (s *HealthCheckServer) SetStatusDispatcher(dispatcher dispatcher.Dispatcher) {
-	s.statusDispatcher = dispatcher
-}
-
 func (s *HealthCheckServer) Start(ctx context.Context) {
 	klog.Infof("Starting HealthCheck server")
 
@@ -153,34 +147,22 @@ func (s *HealthCheckServer) checkInstances(ctx context.Context) {
 	inactiveInstanceIDs := []string{}
 	for _, instance := range instances {
 		// Instances pulsing within the last three check intervals are considered as active.
-		if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second) * s.heartbeatInterval))) {
-			if s.brokerType == "mqtt" {
-				if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
-					klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())
-				}
-			}
-			// mark the instance as active after it is added to the status dispatcher
+		if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && !instance.Ready {
 			activeInstanceIDs = append(activeInstanceIDs, instance.ID)
-		} else {
-			if s.brokerType == "mqtt" {
-				if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
-					klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())
-				}
-			}
-			// mark the instance as inactive after it is removed from the status dispatcher
+		} else if instance.LastHeartbeat.Before(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && instance.Ready {
 			inactiveInstanceIDs = append(inactiveInstanceIDs, instance.ID)
 		}
 	}
 
 	if len(activeInstanceIDs) > 0 {
-		// batch mark active instances
+		// batch mark active instances, this will trigger the status dispatcher to call the onInstanceUp handler.
 		if err := s.instanceDao.MarkReadyByIDs(ctx, activeInstanceIDs); err != nil {
 			klog.Errorf("Unable to mark active maestro instances (%s): %s", activeInstanceIDs, err.Error())
 		}
 	}
 
 	if len(inactiveInstanceIDs) > 0 {
-		// batch mark inactive instances
+		// batch mark inactive instances, this will trigger the status dispatcher to call the onInstanceDown handler.
 		if err := s.instanceDao.MarkUnreadyByIDs(ctx, inactiveInstanceIDs); err != nil {
 			klog.Errorf("Unable to mark inactive maestro instances (%s): %s", inactiveInstanceIDs, err.Error())
 		}
diff --git a/pkg/dao/instance.go b/pkg/dao/instance.go
index 8be26a3c..13c754e5 100644
--- a/pkg/dao/instance.go
+++ b/pkg/dao/instance.go
@@ -2,6 +2,8 @@ package dao
 
 import (
 	"context"
+	"fmt"
+	"strings"
 	"time"
 
 	"gorm.io/gorm/clause"
@@ -67,7 +69,10 @@ func (d *sqlInstanceDao) MarkReadyByIDs(ctx context.Context, ids []string) error
 		db.MarkForRollback(ctx, err)
 		return err
 	}
-	return nil
+
+	// call pg_notify to notify the server_instances channel
+	notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("ready:%s", strings.Join(ids, ",")))
+	return g2.Exec(notify).Error
 }
 
 func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) error {
@@ -76,7 +81,9 @@ func (d *sqlInstanceDao) MarkUnreadyByIDs(ctx context.Context, ids []string) err
 		db.MarkForRollback(ctx, err)
 		return err
 	}
-	return nil
+	// call pg_notify to notify the server_instances channel
+	notify := fmt.Sprintf("select pg_notify('%s', '%s')", "server_instances", fmt.Sprintf("unready:%s", strings.Join(ids, ",")))
+	return g2.Exec(notify).Error
 }
 
 func (d *sqlInstanceDao) Delete(ctx context.Context, id string) error {
diff --git a/pkg/dispatcher/dispatcher.go b/pkg/dispatcher/dispatcher.go
index 5d539d99..99387aa7 100644
--- a/pkg/dispatcher/dispatcher.go
+++ b/pkg/dispatcher/dispatcher.go
@@ -13,8 +13,4 @@ type Dispatcher interface {
 	Start(ctx context.Context)
 	// Dispatch determines if the current Maestro instance should process the resource status update based on the consumer ID.
 	Dispatch(consumerName string) bool
-	// OnInstanceUp is called when a new maestro instance is up.
-	OnInstanceUp(instanceID string) error
-	// OnInstanceDown is called when a maestro instance is inactive.
-	OnInstanceDown(instanceID string) error
 }
diff --git a/pkg/dispatcher/hash_dispatcher.go b/pkg/dispatcher/hash_dispatcher.go
index 6a94c13b..9426a3b4 100644
--- a/pkg/dispatcher/hash_dispatcher.go
+++ b/pkg/dispatcher/hash_dispatcher.go
@@ -3,6 +3,7 @@ package dispatcher
 import (
 	"context"
 	"fmt"
+	"strings"
 	"sync"
 	"time"
 
@@ -13,9 +14,11 @@ import (
 	"github.com/openshift-online/maestro/pkg/client/cloudevents"
 	"github.com/openshift-online/maestro/pkg/config"
 	"github.com/openshift-online/maestro/pkg/dao"
+	"github.com/openshift-online/maestro/pkg/db"
 	"github.com/openshift-online/maestro/pkg/logger"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
 )
 
 var _ Dispatcher = &HashDispatcher{}
@@ -24,23 +27,25 @@ var _ Dispatcher = &HashDispatcher{}
 // Only the maestro instance that is mapped to a consumer will process the resource status update from that consumer.
 // Need to trigger status resync for the consumer when an instance is up or down.
 type HashDispatcher struct {
-	instanceID   string
-	instanceDao  dao.InstanceDao
-	consumerDao  dao.ConsumerDao
-	sourceClient cloudevents.SourceClient
-	consumerSet  mapset.Set[string]
-	workQueue    workqueue.RateLimitingInterface
-	consistent   *consistent.Consistent
+	instanceID     string
+	sessionFactory db.SessionFactory
+	instanceDao    dao.InstanceDao
+	consumerDao    dao.ConsumerDao
+	sourceClient   cloudevents.SourceClient
+	consumerSet    mapset.Set[string]
+	workQueue      workqueue.RateLimitingInterface
+	consistent     *consistent.Consistent
 }
 
-func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
+func NewHashDispatcher(instanceID string, sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
 	return &HashDispatcher{
-		instanceID:   instanceID,
-		instanceDao:  instanceDao,
-		consumerDao:  consumerDao,
-		sourceClient: sourceClient,
-		consumerSet:  mapset.NewSet[string](),
-		workQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
+		instanceID:     instanceID,
+		sessionFactory: sessionFactory,
+		instanceDao:    dao.NewInstanceDao(&sessionFactory),
+		consumerDao:    dao.NewConsumerDao(&sessionFactory),
+		sourceClient:   sourceClient,
+		consumerSet:    mapset.NewSet[string](),
+		workQueue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
 		consistent: consistent.New(nil, consistent.Config{
 			PartitionCount:    consistentHashingConfig.PartitionCount,
 			ReplicationFactor: consistentHashingConfig.ReplicationFactor,
@@ -58,6 +63,10 @@ func (d *HashDispatcher) Start(ctx context.Context) {
 	// start a goroutine to periodically check the instances and consumers.
 	go wait.UntilWithContext(ctx, d.check, 5*time.Second)
 
+	// listen for server_instance update
+	klog.Infof("HashDispatcher listening for server_instances updates")
+	go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)
+
 	// start a goroutine to resync current consumers for this source when the client is reconnected
 	go d.resyncOnReconnect(ctx)
 
@@ -66,6 +75,28 @@ func (d *HashDispatcher) Start(ctx context.Context) {
 	d.workQueue.ShutDown()
 }
 
+func (d *HashDispatcher) onInstanceUpdate(ids string) {
+	states := strings.Split(ids, ":")
+	if len(states) != 2 {
+		klog.Infof("watched server instances updated with invalid ids: %s", ids)
+		return
+	}
+	idList := strings.Split(states[1], ",")
+	if states[0] == "ready" {
+		for _, id := range idList {
+			if err := d.onInstanceUp(id); err != nil {
+				klog.Errorf("failed to call onInstanceUp for instance %s: %s", id, err)
+			}
+		}
+	} else {
+		for _, id := range idList {
+			if err := d.onInstanceDown(id); err != nil {
+				klog.Errorf("failed to call onInstanceDown for instance %s: %s", id, err)
+			}
+		}
+	}
+}
+
 // resyncOnReconnect listens for the client reconnected signal and resyncs current consumers for this source.
 func (d *HashDispatcher) resyncOnReconnect(ctx context.Context) {
 	log := logger.NewOCMLogger(ctx)
@@ -90,8 +121,8 @@ func (d *HashDispatcher) Dispatch(consumerName string) bool {
 	return d.consumerSet.Contains(consumerName)
 }
 
-// OnInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
-func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
+// onInstanceUp adds the new instance to the hashing ring and updates the consumer set for the current instance.
+func (d *HashDispatcher) onInstanceUp(instanceID string) error {
 	members := d.consistent.GetMembers()
 	for _, member := range members {
 		if member.String() == instanceID {
@@ -110,8 +141,8 @@ func (d *HashDispatcher) OnInstanceUp(instanceID string) error {
 	return d.updateConsumerSet()
 }
 
-// OnInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
-func (d *HashDispatcher) OnInstanceDown(instanceID string) error {
+// onInstanceDown removes the instance from the hashing ring and updates the consumer set for the current instance.
+func (d *HashDispatcher) onInstanceDown(instanceID string) error {
 	members := d.consistent.GetMembers()
 	deletedMember := true
 	for _, member := range members {
diff --git a/pkg/dispatcher/noop_dispatcher.go b/pkg/dispatcher/noop_dispatcher.go
index f1dd0f66..13bb58c9 100644
--- a/pkg/dispatcher/noop_dispatcher.go
+++ b/pkg/dispatcher/noop_dispatcher.go
@@ -3,10 +3,13 @@ package dispatcher
 import (
 	"context"
 	"fmt"
+	"strings"
 
 	"github.com/openshift-online/maestro/pkg/client/cloudevents"
 	"github.com/openshift-online/maestro/pkg/dao"
+	"github.com/openshift-online/maestro/pkg/db"
 	"github.com/openshift-online/maestro/pkg/logger"
+	"k8s.io/klog/v2"
 )
 
 var _ Dispatcher = &NoopDispatcher{}
@@ -15,22 +18,32 @@ var _ Dispatcher = &NoopDispatcher{}
 // to the current maestro instance. This is the default implementation when shared subscription is enabled.
 // Need to trigger status resync from all consumers when an instance is down.
 type NoopDispatcher struct {
-	consumerDao  dao.ConsumerDao
-	sourceClient cloudevents.SourceClient
+	sessionFactory db.SessionFactory
+	consumerDao    dao.ConsumerDao
+	sourceClient   cloudevents.SourceClient
 }
 
 // NewNoopDispatcher creates a new NoopDispatcher instance.
-func NewNoopDispatcher(consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *NoopDispatcher {
+func NewNoopDispatcher(sessionFactory db.SessionFactory, sourceClient cloudevents.SourceClient) *NoopDispatcher {
 	return &NoopDispatcher{
-		consumerDao:  consumerDao,
-		sourceClient: sourceClient,
+		sessionFactory: sessionFactory,
+		consumerDao:    dao.NewConsumerDao(&sessionFactory),
+		sourceClient:   sourceClient,
 	}
 }
 
 // Start is a no-op implementation.
 func (d *NoopDispatcher) Start(ctx context.Context) {
 	// handle client reconnected signal and resync status from consumers for this source
-	d.resyncOnReconnect(ctx)
+	go d.resyncOnReconnect(ctx)
+
+	// listen for server_instance update
+	klog.Infof("NoopDispatcher listening for server_instances updates")
+	go d.sessionFactory.NewListener(ctx, "server_instances", d.onInstanceUpdate)
+
+	// wait until context is canceled
+	<-ctx.Done()
+
 }
 
 // resyncOnReconnect listens for client reconnected signal and resyncs all consumers for this source.
@@ -61,18 +74,28 @@ func (d *NoopDispatcher) resyncOnReconnect(ctx context.Context) {
 	}
 }
 
+func (d *NoopDispatcher) onInstanceUpdate(ids string) {
+	states := strings.Split(ids, ":")
+	if len(states) != 2 {
+		klog.Infof("watched server instances updated with invalid ids: %s", ids)
+		return
+	}
+	idList := strings.Split(states[1], ",")
+	if states[0] == "unready" && len(idList) > 0 {
+		// only call onInstanceDown once to reduce the number of status resync requests
+		if err := d.onInstanceDown(); err != nil {
+			klog.Errorf("failed to call onInstanceDown: %s", err)
+		}
+	}
+}
+
 // Dispatch always returns true, indicating that the current maestro instance should process the resource status update.
 func (d *NoopDispatcher) Dispatch(consumerID string) bool {
 	return true
 }
 
-// OnInstanceUp is a no-op implementation.
-func (d *NoopDispatcher) OnInstanceUp(instanceID string) error {
-	return nil
-}
-
-// OnInstanceDown triggers status resync from all consumers.
-func (d *NoopDispatcher) OnInstanceDown(instanceID string) error {
+// onInstanceDown triggers a status resync from all consumers when an instance goes down.
+func (d *NoopDispatcher) onInstanceDown() error {
 	// send resync request to each consumer
 	// TODO: optimize this to only resync resource status for necessary consumers
 	consumerIDs := []string{}
diff --git a/test/helper.go b/test/helper.go
index 4af5f48e..ea060f97 100755
--- a/test/helper.go
+++ b/test/helper.go
@@ -24,7 +24,6 @@ import (
 	workv1 "open-cluster-management.io/api/work/v1"
 
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
-	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
 	grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -81,6 +80,7 @@ type Helper struct {
 	APIServer         server.Server
 	MetricsServer     server.Server
 	HealthCheckServer *server.HealthCheckServer
+	StatusDispatcher  dispatcher.Dispatcher
 	EventServer       server.EventServer
 	EventFilter       controllers.EventFilter
 	ControllerManager *server.ControllersServer
@@ -143,15 +143,13 @@ func NewHelper(t *testing.T) *Helper {
 		helper.Env().Config.GRPCServer.DisableTLS = true
 
 		if helper.Broker != "grpc" {
-			statusDispatcher := dispatcher.NewHashDispatcher(
+			helper.StatusDispatcher = dispatcher.NewHashDispatcher(
 				helper.Env().Config.MessageBroker.ClientID,
-				dao.NewInstanceDao(&helper.Env().Database.SessionFactory),
-				dao.NewConsumerDao(&helper.Env().Database.SessionFactory),
+				helper.Env().Database.SessionFactory,
 				helper.Env().Clients.CloudEventsSource,
 				helper.Env().Config.EventServer.ConsistentHashConfig,
 			)
-			helper.HealthCheckServer.SetStatusDispatcher(statusDispatcher)
-			helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, statusDispatcher)
+			helper.EventServer = server.NewMessageQueueEventServer(helper.EventBroadcaster, helper.StatusDispatcher)
 			helper.EventFilter = controllers.NewLockBasedEventFilter(db.NewAdvisoryLockFactory(helper.Env().Database.SessionFactory))
 		} else {
 			helper.EventServer = server.NewGRPCBroker(helper.EventBroadcaster)
@@ -298,7 +296,7 @@ func (helper *Helper) StartWorkAgent(ctx context.Context, clusterName string, bu
 		brokerConfig = mqttOptions
 	} else {
 		// initilize the grpc options
-		grpcOptions := grpc.NewGRPCOptions()
+		grpcOptions := grpcoptions.NewGRPCOptions()
 		grpcOptions.URL = fmt.Sprintf("%s:%s", helper.Env().Config.HTTPServer.Hostname, helper.Env().Config.GRPCServer.BrokerBindPort)
 		brokerConfig = grpcOptions
 	}
diff --git a/test/integration/healthcheck_test.go b/test/integration/healthcheck_test.go
new file mode 100644
index 00000000..508bf768
--- /dev/null
+++ b/test/integration/healthcheck_test.go
@@ -0,0 +1,70 @@
+package integration
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
+	"github.com/openshift-online/maestro/pkg/api"
+	"github.com/openshift-online/maestro/pkg/dao"
+	"github.com/openshift-online/maestro/test"
+)
+
+func TestHealthCheckServer(t *testing.T) {
+	h, _ := test.RegisterIntegration(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer func() {
+		cancel()
+	}()
+
+	instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory)
+	// insert two existing instances, one is ready and the other is not
+	_, err := instanceDao.Create(ctx, &api.ServerInstance{
+		Meta: api.Meta{
+			ID: "instance1",
+		},
+		// last heartbeat is 3 seconds ago
+		LastHeartbeat: time.Now().Add(-3 * time.Second),
+		Ready:         true,
+	})
+	Expect(err).NotTo(HaveOccurred())
+
+	_, err = instanceDao.Create(ctx, &api.ServerInstance{
+		Meta: api.Meta{
+			ID: "instance2",
+		},
+		// last heartbeat is 3 seconds ago
+		LastHeartbeat: time.Now().Add(-3 * time.Second),
+		Ready:         false,
+	})
+	Expect(err).NotTo(HaveOccurred())
+
+	instanceID := &h.Env().Config.MessageBroker.ClientID
+	Eventually(func() error {
+		instances, err := instanceDao.All(ctx)
+		if err != nil {
+			return err
+		}
+
+		if len(instances) != 3 {
+			return fmt.Errorf("expected 3 instances, got %d", len(instances))
+		}
+
+		readyInstanceIDs, err := instanceDao.FindReadyIDs(ctx)
+		if err != nil {
+			return err
+		}
+
+		if len(readyInstanceIDs) != 1 {
+			return fmt.Errorf("expected 1 ready instance, got %d", len(readyInstanceIDs))
+		}
+
+		if readyInstanceIDs[0] != *instanceID {
+			return fmt.Errorf("expected instance %s to be ready, got %s", *instanceID, readyInstanceIDs[0])
+		}
+
+		return nil
+	}, 10*time.Second, 1*time.Second).Should(Succeed())
+}
diff --git a/test/integration/status_dispatcher_test.go b/test/integration/status_dispatcher_test.go
new file mode 100644
index 00000000..9055491a
--- /dev/null
+++ b/test/integration/status_dispatcher_test.go
@@ -0,0 +1,79 @@
+package integration
+
+import (
+	"context"
+	"os"
+	"testing"
+	"time"
+
+	. "github.com/onsi/gomega"
"github.com/onsi/gomega" + "github.com/openshift-online/maestro/pkg/api" + "github.com/openshift-online/maestro/pkg/dao" + "github.com/openshift-online/maestro/test" + prommodel "github.com/prometheus/client_model/go" +) + +func TestStatusDispatcher(t *testing.T) { + broker := os.Getenv("BROKER") + if broker == "grpc" { + t.Skip("StatusDispatcher is not supported with gRPC broker") + } + + h, _ := test.RegisterIntegration(t) + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + }() + + // create 2 consumers + consumer1 := "xyzzy" + consumer2 := "thud" + _ = h.CreateConsumer(consumer1) + _ = h.CreateConsumer(consumer2) + + // should dispatch to all consumers for current instance + Eventually(func() bool { + return h.StatusDispatcher.Dispatch(consumer1) && + h.StatusDispatcher.Dispatch(consumer2) + }, 6*time.Second, 1*time.Second).Should(BeTrue()) + + // insert a new instance and healthcheck server will mark it as ready and then add it to the hash ring + instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory) + _, err := instanceDao.Create(ctx, &api.ServerInstance{ + Meta: api.Meta{ + ID: "instance1", + }, + LastHeartbeat: time.Now(), + Ready: false, + }) + Expect(err).NotTo(HaveOccurred()) + + // should dispatch consumer based on the new hash ring + Eventually(func() bool { + return h.StatusDispatcher.Dispatch(consumer1) && + !h.StatusDispatcher.Dispatch(consumer2) + }, 5*time.Second, 1*time.Second).Should(BeTrue()) + + // finally should dispatch to all consumers for current instance + // as instance1 will be unready and removed from the hash ring + Eventually(func() bool { + return h.StatusDispatcher.Dispatch(consumer1) && + h.StatusDispatcher.Dispatch(consumer2) + }, 6*time.Second, 1*time.Second).Should(BeTrue()) + + // check metrics for status resync + time.Sleep(1 * time.Second) + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("cluster"), Value: strPtr(consumer1)}, + {Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")}, + } + checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0) + labels = []*prommodel.LabelPair{ + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("cluster"), Value: strPtr(consumer2)}, + {Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")}, + } + checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0) +} diff --git a/test/integration/status_hash_test.go b/test/integration/status_hash_test.go deleted file mode 100644 index ec6d4d1b..00000000 --- a/test/integration/status_hash_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package integration - -import ( - "context" - "fmt" - "testing" - "time" - - . 
"github.com/onsi/gomega" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/rand" - workv1 "open-cluster-management.io/api/work/v1" - - "github.com/openshift-online/maestro/pkg/api" - "github.com/openshift-online/maestro/pkg/dao" - "github.com/openshift-online/maestro/test" -) - -func TestEventServer(t *testing.T) { - h, _ := test.RegisterIntegration(t) - ctx, cancel := context.WithCancel(context.Background()) - defer func() { - cancel() - }() - - instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory) - // insert one existing instances - _, err := instanceDao.Create(ctx, &api.ServerInstance{ - Meta: api.Meta{ - ID: "instance1", - }, - LastHeartbeat: time.Now(), - }) - Expect(err).NotTo(HaveOccurred()) - - instanceID := &h.Env().Config.MessageBroker.ClientID - Eventually(func() error { - instances, err := instanceDao.All(ctx) - if err != nil { - return err - } - - if len(instances) != 2 { - return fmt.Errorf("expected 1 instance, got %d", len(instances)) - } - - var instance *api.ServerInstance - for _, i := range instances { - if i.ID == *instanceID { - instance = i - } - } - - if instance.LastHeartbeat.IsZero() { - return fmt.Errorf("expected instance.LastHeartbeat to be non-zero") - } - - if !instance.Ready { - return fmt.Errorf("expected instance.Ready to be true") - } - - if instance.ID != *instanceID { - return fmt.Errorf("expected instance.ID to be %s, got %s", *instanceID, instance.ID) - } - - return nil - }, 10*time.Second, 1*time.Second).Should(Succeed()) - - // the cluster1 name cannot be changed, because consistent hash makes it allocate to different instance. - // the case here we want to the new consumer allocate to new instance(cluster1) which is a fake instance. - // after 3*heartbeatInterval (3s), it will relocate to maestro instance. - clusterName := "cluster1" - consumer := h.CreateConsumer(clusterName) - - // insert a new instance with the same name to consumer name - // to make sure the consumer is hashed to the new instance firstly. - // after the new instance is stale after 3*heartbeatInterval (3s), the current - // instance will take over the consumer and resync the resource status. 
-	_, err = instanceDao.Create(ctx, &api.ServerInstance{
-		Meta: api.Meta{
-			ID: clusterName,
-		},
-		LastHeartbeat: time.Now(),
-		Ready:         true,
-	})
-	Expect(err).NotTo(HaveOccurred())
-
-	deployName := fmt.Sprintf("nginx-%s", rand.String(5))
-	res := h.CreateResource(consumer.Name, deployName, 1)
-	h.StartControllerManager(ctx)
-	h.StartWorkAgent(ctx, consumer.Name, false)
-	clientHolder := h.WorkAgentHolder
-	agentWorkClient := clientHolder.ManifestWorks(consumer.Name)
-	resourceService := h.Env().Services.Resources()
-
-	var work *workv1.ManifestWork
-	Eventually(func() error {
-		// ensure the work can be get by work client
-		work, err = agentWorkClient.Get(ctx, res.ID, metav1.GetOptions{})
-		if err != nil {
-			return err
-		}
-
-		return nil
-	}, 3*time.Second, 1*time.Second).Should(Succeed())
-
-	Expect(work).NotTo(BeNil())
-	Expect(work.Spec.Workload).NotTo(BeNil())
-	Expect(len(work.Spec.Workload.Manifests)).To(Equal(1))
-
-	newWorkStatus := workv1.ManifestWorkStatus{
-		ResourceStatus: workv1.ManifestResourceStatus{
-			Manifests: []workv1.ManifestCondition{
-				{
-					Conditions: []metav1.Condition{
-						{
-							Type:   "Applied",
-							Status: metav1.ConditionTrue,
-						},
-					},
-				},
-			},
-		},
-	}
-
-	// update the work status to make sure the resource status is updated
-	Expect(updateWorkStatus(ctx, agentWorkClient, work, newWorkStatus)).To(Succeed())
-
-	// after the instance ("cluster") is stale, the current instance ("maestro") will take over the consumer
-	// and resync status, then the resource status will be updated finally
-	Eventually(func() error {
-		newRes, err := resourceService.Get(ctx, res.ID)
-		if err != nil {
-			return err
-		}
-		if newRes.Status == nil || len(newRes.Status) == 0 {
-			return fmt.Errorf("resource status is empty")
-		}
-		return nil
-	}, 10*time.Second, 1*time.Second).Should(Succeed())
-
-	newRes, err := resourceService.Get(ctx, res.ID)
-	Expect(err).NotTo(HaveOccurred(), "Error getting resource: %v", err)
-	Expect(newRes.Version).To(Equal(res.Version))
-}
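
Reviewer note (not part of the patch): this change removes the healthcheck server's direct OnInstanceUp/OnInstanceDown calls. Instead, MarkReadyByIDs/MarkUnreadyByIDs publish payloads of the form "ready:id1,id2" or "unready:id1,id2" on the Postgres "server_instances" channel via pg_notify, and each dispatcher subscribes through SessionFactory.NewListener and reacts in onInstanceUpdate. The sketch below is a minimal, hypothetical standalone consumer of that payload format written with github.com/lib/pq; it is not the project's SessionFactory implementation, and connStr plus the onUp/onDown callbacks are placeholders.

package main

import (
	"strings"
	"time"

	"github.com/lib/pq"
	"k8s.io/klog/v2"
)

// listenServerInstances subscribes to the "server_instances" channel and decodes the
// "ready:" / "unready:" prefixed, comma-separated instance IDs that MarkReadyByIDs and
// MarkUnreadyByIDs publish with pg_notify.
func listenServerInstances(connStr string, onUp, onDown func(id string)) error {
	listener := pq.NewListener(connStr, 10*time.Second, time.Minute, nil)
	if err := listener.Listen("server_instances"); err != nil {
		return err
	}
	for n := range listener.Notify {
		if n == nil {
			// a nil notification is delivered after a reconnect; events may have been missed in between
			continue
		}
		parts := strings.SplitN(n.Extra, ":", 2)
		if len(parts) != 2 {
			klog.Infof("ignoring invalid server_instances payload: %q", n.Extra)
			continue
		}
		for _, id := range strings.Split(parts[1], ",") {
			if parts[0] == "ready" {
				onUp(id) // e.g. add the instance to the consistent hash ring
			} else {
				onDown(id) // e.g. remove it and resync affected consumers
			}
		}
	}
	return nil
}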