From 506e87787a77e87718226bdfadcca8cd80983c86 Mon Sep 17 00:00:00 2001 From: anthdm Date: Fri, 12 Jan 2024 22:09:45 +0100 Subject: [PATCH] Polishing cluster --- cluster/activator.go | 28 +++++++----------- cluster/agent.go | 12 ++++---- cluster/cluster.go | 64 ++++++++++++++++++----------------------- cluster/cluster_test.go | 23 ++++++++------- cluster/kind.go | 22 ++++++++++++-- 5 files changed, 74 insertions(+), 75 deletions(-) diff --git a/cluster/activator.go b/cluster/activator.go index 0725afe..ed7e33d 100644 --- a/cluster/activator.go +++ b/cluster/activator.go @@ -4,32 +4,24 @@ import ( "math/rand" ) -// ActivationStrategy is an interface that abstracts the logic on what member of the -// cluster the actor will be activated. Members passed into this function are guaranteed -// to have the given kind locally registered. If no member could be selected nil should be -// returned. -type ActivationStrategy interface { - ActivateOnMember(ActivationDetails) *Member -} +// ActivateOnMemberFunc will be invoked by the member that called cluster.Activate. +// Given the ActivationDetails the actor will be spawned on the returned member. +// +// Not that if no member could be selected nil should be returned. +type ActivateOnMemberFunc func(ActivationDetails) *Member // ActivationDetails holds detailed information about an activation. type ActivationDetails struct { // Region where the actor should be activated on Region string - // A slice of filtered members by the kind that needs to be activated - // Members are guaranteed to never by empty. + // A slice of members that are pre-filtered by the kind of the actor + // that need to be activated Members []*Member - // The kind that needs to be activated + // The kind of the actor Kind string } -type defaultActivationStrategy struct{} - -// NewDefaultActivationStrategy selects a random member in the cluster. -func NewDefaultActivationStrategy() defaultActivationStrategy { - return defaultActivationStrategy{} -} - -func (defaultActivationStrategy) ActivateOnMember(details ActivationDetails) *Member { +// ActivateOnRandomMember returns a random member of the cluster. +func ActivateOnRandomMember(details ActivationDetails) *Member { return details.Members[rand.Intn(len(details.Members))] } diff --git a/cluster/agent.go b/cluster/agent.go index 3d38226..04a5fa4 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -10,9 +10,7 @@ import ( type ( activate struct { - kind string - id string - region string + details ActivationDetails } getMembers struct{} getKinds struct{} @@ -60,7 +58,7 @@ func (a *Agent) Receive(c *actor.Context) { case *Activation: a.handleActivation(msg) case activate: - pid := a.activate(msg.kind, msg.id, msg.region) + pid := a.activate(msg.details) c.Respond(pid) case deactivate: a.bcast(&Deactivation{PID: msg.pid}) @@ -117,10 +115,10 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo return resp } -func (a *Agent) activate(kind, id, region string) *actor.PID { - members := a.members.FilterByKind(kind) +func (a *Agent) activate(details ActivationDetails) *actor.PID { + members := a.members.FilterByKind(details.Kind) if len(members) == 0 { - slog.Warn("could not find any members with kind", "kind", kind) + slog.Warn("could not find any members with kind", "kind", details.Kind) return nil } owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{ diff --git a/cluster/cluster.go b/cluster/cluster.go index dca35ce..cdbfb2b 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -22,24 +22,22 @@ type Producer func(c *Cluster) actor.Producer // Config holds the cluster configuration type Config struct { - listenAddr string - id string - region string - activationStrategy ActivationStrategy - engine *actor.Engine - provider Producer - requestTimeout time.Duration + listenAddr string + id string + region string + engine *actor.Engine + provider Producer + requestTimeout time.Duration } // NewConfig returns a Config that is initialized with default values. func NewConfig() Config { return Config{ - listenAddr: getRandomListenAddr(), - id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), - region: "default", - activationStrategy: NewDefaultActivationStrategy(), - provider: NewSelfManagedProvider(NewSelfManagedConfig()), - requestTimeout: defaultRequestTimeout, + listenAddr: getRandomListenAddr(), + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + provider: NewSelfManagedProvider(NewSelfManagedConfig()), + requestTimeout: defaultRequestTimeout, } } @@ -71,14 +69,6 @@ func (config Config) WithEngine(e *actor.Engine) Config { return config } -// TODO: Still not convinced about the name "ActivationStrategy". -// TODO: Document this more. -// WithActivationStrategy -func (config Config) WithActivationStrategy(s ActivationStrategy) Config { - config.activationStrategy = s - return config -} - // WithListenAddr set's the listen address of the underlying remote. // // Defaults to a random port number. @@ -160,6 +150,7 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act return pid } +// ActivationConfig... type ActivationConfig struct { id string region string @@ -181,7 +172,7 @@ func (config ActivationConfig) WithID(id string) ActivationConfig { return config } -// WithRegion set's the region on where this actor (potentially) will be spawned +// WithRegion set's the region on where this actor should be spawned. // // Defaults to a "default". func (config ActivationConfig) WithRegion(region string) ActivationConfig { @@ -189,8 +180,11 @@ func (config ActivationConfig) WithRegion(region string) ActivationConfig { return config } -// Activate actives the given actor kind with an optional id. If there is no id -// given, the engine will create an unique id automatically. +// Activate actives the registered kind in the cluster based on the given config. +// The actor does not need to be registered locally on the member if at least one +// member has that kind registered. +// +// playerPID := cluster.Activate("player", cluster.NewActivationConfig()) func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID { msg := activate{ kind: kind, @@ -215,23 +209,21 @@ func (c *Cluster) Deactivate(pid *actor.PID) { c.engine.Send(c.agentPID, deactivate{pid: pid}) } -// RegisterKind registers a new actor/receiver kind that can be spawned from any node -// on the cluster. -// NOTE: Kinds can only be registered if the cluster is not running. -func (c *Cluster) RegisterKind(name string, producer actor.Producer, config *KindConfig) { +// RegisterKind registers a new actor that can be activated from any member +// in the cluster. +// +// cluster.Register("player", NewPlayer, NewKindConfig()) +// +// NOTE: Kinds can only be registered before the cluster is started. +func (c *Cluster) RegisterKind(kind string, producer actor.Producer, config KindConfig) { if c.isStarted { - slog.Warn("trying to register new kind on a running cluster") + slog.Warn("failed to register kind", "reason", "cluster already started", "kind", kind) return } - if config == nil { - config = &KindConfig{} - } - kind := newKind(name, producer, *config) - c.kinds = append(c.kinds, kind) + c.kinds = append(c.kinds, newKind(kind, producer, config)) } -// HasLocalKind returns true if this members of the cluster has the kind -// locally registered. +// HasKindLocal returns true whether the node of the cluster has the kind locally registered. func (c *Cluster) HasKindLocal(name string) bool { for _, kind := range c.kinds { if kind.name == name { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 9927fe3..91309c2 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -10,6 +10,7 @@ import ( "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/remote" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type Player struct{} @@ -28,11 +29,11 @@ func NewInventory() actor.Receiver { func (i Inventory) Receive(c *actor.Context) {} -func TestFooBarBaz(t *testing.T) { - config := NewConfig() - cluster, err := New(config) - assert.Nil(t, err) - _ = cluster +func TestClusterActivationOnMemberFunc(t *testing.T) { + c, err := New(NewConfig()) + require.Nil(t, err) + + c.RegisterKind("player", NewPlayer, NewKindConfig()) } func TestClusterShouldWorkWithDefaultValues(t *testing.T) { @@ -45,8 +46,8 @@ func TestClusterShouldWorkWithDefaultValues(t *testing.T) { func TestRegisterKind(t *testing.T) { c := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") - c.RegisterKind("player", NewPlayer, nil) - c.RegisterKind("inventory", NewInventory, nil) + c.RegisterKind("player", NewPlayer, NewKindConfig()) + c.RegisterKind("inventory", NewInventory, NewKindConfig()) assert.True(t, c.HasKindLocal("player")) assert.True(t, c.HasKindLocal("inventory")) } @@ -94,7 +95,7 @@ func TestClusterSpawn(t *testing.T) { func TestMemberJoin(t *testing.T) { c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) wg := sync.WaitGroup{} wg.Add(1) @@ -127,7 +128,7 @@ func TestActivate(t *testing.T) { c1 = makeCluster(t, addr, "A", "eu-west") c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") ) - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") wg := sync.WaitGroup{} @@ -163,7 +164,7 @@ func TestDeactivate(t *testing.T) { addr := getRandomLocalhostAddr() c1 := makeCluster(t, addr, "A", "eu-west") c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") wg := sync.WaitGroup{} @@ -212,7 +213,7 @@ func TestMemberLeave(t *testing.T) { assert.Nil(t, err) c1 := makeCluster(t, c1Addr, "A", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) c1.Start() wg := sync.WaitGroup{} diff --git a/cluster/kind.go b/cluster/kind.go index 7a58809..87b4e07 100644 --- a/cluster/kind.go +++ b/cluster/kind.go @@ -2,10 +2,26 @@ package cluster import "github.com/anthdm/hollywood/actor" -// KindConfig holds the Kind configuration -type KindConfig struct{} +// KindConfig holds configuration for a registered kind. +type KindConfig struct { + activateOnMember ActivateOnMemberFunc +} + +// NewKindConfig returns a default kind configuration. +func NewKindConfig() KindConfig { + return KindConfig{ + activateOnMember: ActivateOnRandomMember, + } +} + +// WithActivateOnMemberFunc set the function that will be used to select the member +// where this kind will be spawned/activated on. +func (config KindConfig) WithActivateOnMemberFunc(fun ActivateOnMemberFunc) KindConfig { + config.activateOnMember = fun + return config +} -// A kind is a type of actor that can be activate on a node. +// A kind is a type of actor that can be activated from any member of the cluster. type kind struct { config KindConfig name string