Skip to content

Commit

Permalink
support configuration for consistent hash. (#192)
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao authored Sep 11, 2024
1 parent 9dd7f8e commit 5ccb4ea
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 13 deletions.
3 changes: 2 additions & 1 deletion cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ func NewPulseServer(eventBroadcaster *event.EventBroadcaster) EventServer {
case config.SharedSubscriptionType:
statusDispatcher = dispatcher.NewNoopDispatcher(dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
case config.BroadcastSubscriptionType:
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory), dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource)
statusDispatcher = dispatcher.NewHashDispatcher(env().Config.MessageBroker.ClientID, dao.NewInstanceDao(&env().Database.SessionFactory),
dao.NewConsumerDao(&env().Database.SessionFactory), env().Clients.CloudEventsSource, env().Config.PulseServer.ConsistentHashConfig)
default:
glog.Fatalf("Unsupported subscription type: %s", env().Config.PulseServer.SubscriptionType)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewApplicationConfig() *ApplicationConfig {
GRPCServer: NewGRPCServerConfig(),
Metrics: NewMetricsConfig(),
HealthCheck: NewHealthCheckConfig(),
PulseServer: MewPulseServerConfig(),
PulseServer: NewPulseServerConfig(),
Database: NewDatabaseConfig(),
MessageBroker: NewMessageBrokerConfig(),
OCM: NewOCMConfig(),
Expand Down
50 changes: 43 additions & 7 deletions pkg/config/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,36 @@ const (

// PulseServerConfig contains the configuration for the maestro pulse server.
type PulseServerConfig struct {
PulseInterval int64 `json:"pulse_interval"`
SubscriptionType string `json:"subscription_type"`
PulseInterval int64 `json:"pulse_interval"`
SubscriptionType string `json:"subscription_type"`
ConsistentHashConfig *ConsistentHashConfig `json:"consistent_hash_config"`
}

// ConsistentHashConfig contains the configuration for the consistent hashing algorithm.
type ConsistentHashConfig struct {
PartitionCount int `json:"partition_count"`
ReplicationFactor int `json:"replication_factor"`
Load float64 `json:"load"`
}

// NewPulseServerConfig creates a new PulseServerConfig with default 15 second pulse interval.
func MewPulseServerConfig() *PulseServerConfig {
func NewPulseServerConfig() *PulseServerConfig {
return &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "shared",
PulseInterval: 15,
SubscriptionType: "shared",
ConsistentHashConfig: NewConsistentHashConfig(),
}
}

// NewConsistentHashConfig creates a new ConsistentHashConfig with default values.
// - PartitionCount: 7
// - ReplicationFactor: 20
// - Load: 1.25
func NewConsistentHashConfig() *ConsistentHashConfig {
return &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
}
}

Expand All @@ -31,11 +52,26 @@ func MewPulseServerConfig() *PulseServerConfig {
// - "subscription-type" specifies the subscription type for resource status updates from message broker, either "shared" or "broadcast".
// "shared" subscription type uses MQTT feature to ensure only one Maestro instance receives resource status messages.
// "broadcast" subscription type will make all Maestro instances to receive resource status messages and hash the message to determine which instance should process it.
// If subscription type is "broadcast", ConsistentHashConfig settings can be configured for the hashing algorithm.
func (c *PulseServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness (default: 10 seconds)")
fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance) (default: \"shared\")")
fs.Int64Var(&c.PulseInterval, "pulse-interval", c.PulseInterval, "Sets the pulse interval for maestro instances (seconds) to indicate liveness")
fs.StringVar(&c.SubscriptionType, "subscription-type", c.SubscriptionType, "Sets the subscription type for resource status updates from message broker, Options: \"shared\" (only one instance receives resource status message, MQTT feature ensures exclusivity) or \"broadcast\" (all instances receive messages, hashed to determine processing instance)")
c.ConsistentHashConfig.AddFlags(fs)
}

func (c *PulseServerConfig) ReadFiles() error {
c.ConsistentHashConfig.ReadFiles()
return nil
}

// AddFlags configures the ConsistentHashConfig with command line flags. Only take effect when subscription type is "broadcast".
// It allows users to customize the partition count, replication factor, and load for the consistent hashing algorithm.
func (c *ConsistentHashConfig) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&c.PartitionCount, "consistent-hash-partition-count", c.PartitionCount, "Sets the partition count for consistent hashing algorithm, select a big PartitionCount for more consumers. only take effect when subscription type is \"broadcast\"")
fs.IntVar(&c.ReplicationFactor, "consistent-hash-replication-factor", c.ReplicationFactor, "Sets the replication factor for maestro instances to be replicated on consistent hash ring. only take effect when subscription type is \"broadcast\"")
fs.Float64Var(&c.Load, "consistent-hash-load", c.Load, "Sets the load for consistent hashing algorithm, only take effect when subscription type is \"broadcast\"")
}

func (c *ConsistentHashConfig) ReadFiles() error {
return nil
}
83 changes: 83 additions & 0 deletions pkg/config/pulse_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package config

import (
"reflect"
"testing"

"github.com/spf13/pflag"
)

func TestPulseServerConfig(t *testing.T) {
cases := []struct {
name string
input map[string]string
want *PulseServerConfig
}{
{
name: "default subscription type",
input: map[string]string{},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "shared",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
},
},
},
{
name: "broadcast subscription type",
input: map[string]string{
"subscription-type": "broadcast",
},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "broadcast",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 7,
ReplicationFactor: 20,
Load: 1.25,
},
},
},
{
name: "custom consistent hash config",
input: map[string]string{
"subscription-type": "broadcast",
"consistent-hash-partition-count": "10",
"consistent-hash-replication-factor": "30",
"consistent-hash-load": "1.5",
},
want: &PulseServerConfig{
PulseInterval: 15,
SubscriptionType: "broadcast",
ConsistentHashConfig: &ConsistentHashConfig{
PartitionCount: 10,
ReplicationFactor: 30,
Load: 1.5,
},
},
},
}

config := NewPulseServerConfig()
pflag.NewFlagSet("test", pflag.ContinueOnError)
fs := pflag.CommandLine
config.AddFlags(fs)
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
// set flags
for key, value := range tc.input {
fs.Set(key, value)
}
if !reflect.DeepEqual(config, tc.want) {
t.Errorf("NewPulseServerConfig() = %v; want %v", config, tc.want)
}
// clear flags
fs.VisitAll(func(f *pflag.Flag) {
fs.Lookup(f.Name).Changed = false
})
})
}
}
9 changes: 5 additions & 4 deletions pkg/dispatcher/hash_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
mapset "github.com/deckarep/golang-set/v2"
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/config"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/pkg/logger"
"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -32,7 +33,7 @@ type HashDispatcher struct {
consistent *consistent.Consistent
}

func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient) *HashDispatcher {
func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerDao dao.ConsumerDao, sourceClient cloudevents.SourceClient, consistentHashingConfig *config.ConsistentHashConfig) *HashDispatcher {
return &HashDispatcher{
instanceID: instanceID,
instanceDao: instanceDao,
Expand All @@ -41,9 +42,9 @@ func NewHashDispatcher(instanceID string, instanceDao dao.InstanceDao, consumerD
consumerSet: mapset.NewSet[string](),
workQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "hash-dispatcher"),
consistent: consistent.New(nil, consistent.Config{
PartitionCount: 7, // consumer IDs are distributed among partitions, select a big PartitionCount for more consumers.
ReplicationFactor: 20, // the numbers for maestro instances to be replicated on consistent hash ring.
Load: 1.25, // Load is used to calculate average load, 1.25 is reasonable for most cases.
PartitionCount: consistentHashingConfig.PartitionCount,
ReplicationFactor: consistentHashingConfig.ReplicationFactor,
Load: consistentHashingConfig.Load,
Hasher: hasher{},
}),
}
Expand Down

0 comments on commit 5ccb4ea

Please sign in to comment.