diff --git a/cluster/activation.go b/cluster/activation.go new file mode 100644 index 0000000..faf3b3f --- /dev/null +++ b/cluster/activation.go @@ -0,0 +1,67 @@ +package cluster + +import ( + fmt "fmt" + "math" + "math/rand" +) + +// ActivationConfig... +type ActivationConfig struct { + id string + region string + selectMember SelectMemberFunc +} + +// NewActivationConfig returns a new default config. +func NewActivationConfig() ActivationConfig { + return ActivationConfig{ + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + selectMember: SelectRandomMember, + } +} + +// WithSelectMemberFunc set's the fuction that will be invoked during +// the activation process. +// It will select the member where the actor will be activated/spawned on. +func (config ActivationConfig) WithSelectMemberFunc(fun SelectMemberFunc) ActivationConfig { + config.selectMember = fun + return config +} + +// WithID set's the id of the actor that will be activated on the cluster. +// +// Defaults to a random identifier. +func (config ActivationConfig) WithID(id string) ActivationConfig { + config.id = id + return config +} + +// WithRegion set's the region on where this actor should be spawned. +// +// Defaults to a "default". +func (config ActivationConfig) WithRegion(region string) ActivationConfig { + config.region = region + return config +} + +// SelectMemberFunc will be invoked during the activation process. +// Given the ActivationDetails the actor will be spawned on the returned member. +type SelectMemberFunc func(ActivationDetails) *Member + +// ActivationDetails holds detailed information about an activation. +type ActivationDetails struct { + // Region where the actor should be activated on + Region string + // A slice of members that are pre-filtered by the kind of the actor + // that need to be activated + Members []*Member + // The kind of the actor + Kind string +} + +// SelectRandomMember selects a random member of the cluster. +func SelectRandomMember(details ActivationDetails) *Member { + return details.Members[rand.Intn(len(details.Members))] +} diff --git a/cluster/activator.go b/cluster/activator.go deleted file mode 100644 index 0725afe..0000000 --- a/cluster/activator.go +++ /dev/null @@ -1,35 +0,0 @@ -package cluster - -import ( - "math/rand" -) - -// ActivationStrategy is an interface that abstracts the logic on what member of the -// cluster the actor will be activated. Members passed into this function are guaranteed -// to have the given kind locally registered. If no member could be selected nil should be -// returned. -type ActivationStrategy interface { - ActivateOnMember(ActivationDetails) *Member -} - -// ActivationDetails holds detailed information about an activation. -type ActivationDetails struct { - // Region where the actor should be activated on - Region string - // A slice of filtered members by the kind that needs to be activated - // Members are guaranteed to never by empty. - Members []*Member - // The kind that needs to be activated - Kind string -} - -type defaultActivationStrategy struct{} - -// NewDefaultActivationStrategy selects a random member in the cluster. -func NewDefaultActivationStrategy() defaultActivationStrategy { - return defaultActivationStrategy{} -} - -func (defaultActivationStrategy) ActivateOnMember(details ActivationDetails) *Member { - return details.Members[rand.Intn(len(details.Members))] -} diff --git a/cluster/agent.go b/cluster/agent.go index 3d38226..455fb3e 100644 --- a/cluster/agent.go +++ b/cluster/agent.go @@ -11,8 +11,7 @@ import ( type ( activate struct { kind string - id string - region string + config ActivationConfig } getMembers struct{} getKinds struct{} @@ -60,7 +59,7 @@ func (a *Agent) Receive(c *actor.Context) { case *Activation: a.handleActivation(msg) case activate: - pid := a.activate(msg.kind, msg.id, msg.region) + pid := a.activate(msg.kind, msg.config) c.Respond(pid) case deactivate: a.bcast(&Deactivation{PID: msg.pid}) @@ -117,27 +116,30 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo return resp } -func (a *Agent) activate(kind, id, region string) *actor.PID { +func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID { members := a.members.FilterByKind(kind) if len(members) == 0 { slog.Warn("could not find any members with kind", "kind", kind) return nil } - owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{ + if config.selectMember == nil { + config.selectMember = SelectRandomMember + } + memberPID := config.selectMember(ActivationDetails{ Members: members, - Region: region, + Region: config.region, Kind: kind, }) - if owner == nil { + if memberPID == nil { slog.Warn("activator did not found a member to activate on") return nil } - req := &ActivationRequest{Kind: kind, ID: id} - activatorPID := actor.NewPID(owner.Host, "cluster/"+owner.ID) + req := &ActivationRequest{Kind: kind, ID: config.id} + activatorPID := actor.NewPID(memberPID.Host, "cluster/"+memberPID.ID) var activationResp *ActivationResponse // Local activation - if owner.Host == a.cluster.engine.Address() { + if memberPID.Host == a.cluster.engine.Address() { activationResp = a.handleActivationRequest(req) } else { // Remote activation diff --git a/cluster/cluster.go b/cluster/cluster.go index dca35ce..d3cc91d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -22,24 +22,22 @@ type Producer func(c *Cluster) actor.Producer // Config holds the cluster configuration type Config struct { - listenAddr string - id string - region string - activationStrategy ActivationStrategy - engine *actor.Engine - provider Producer - requestTimeout time.Duration + listenAddr string + id string + region string + engine *actor.Engine + provider Producer + requestTimeout time.Duration } // NewConfig returns a Config that is initialized with default values. func NewConfig() Config { return Config{ - listenAddr: getRandomListenAddr(), - id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), - region: "default", - activationStrategy: NewDefaultActivationStrategy(), - provider: NewSelfManagedProvider(NewSelfManagedConfig()), - requestTimeout: defaultRequestTimeout, + listenAddr: getRandomListenAddr(), + id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), + region: "default", + provider: NewSelfManagedProvider(NewSelfManagedConfig()), + requestTimeout: defaultRequestTimeout, } } @@ -71,14 +69,6 @@ func (config Config) WithEngine(e *actor.Engine) Config { return config } -// TODO: Still not convinced about the name "ActivationStrategy". -// TODO: Document this more. -// WithActivationStrategy -func (config Config) WithActivationStrategy(s ActivationStrategy) Config { - config.activationStrategy = s - return config -} - // WithListenAddr set's the listen address of the underlying remote. // // Defaults to a random port number. @@ -160,42 +150,15 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act return pid } -type ActivationConfig struct { - id string - region string -} - -// NewActivationConfig returns a new default config. -func NewActivationConfig() ActivationConfig { - return ActivationConfig{ - id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)), - region: "default", - } -} - -// WithID set's the id of the actor that will be activated on the cluster. +// Activate actives the registered kind in the cluster based on the given config. +// The actor does not need to be registered locally on the member if at least one +// member has that kind registered. // -// Defaults to a random identifier. -func (config ActivationConfig) WithID(id string) ActivationConfig { - config.id = id - return config -} - -// WithRegion set's the region on where this actor (potentially) will be spawned -// -// Defaults to a "default". -func (config ActivationConfig) WithRegion(region string) ActivationConfig { - config.region = region - return config -} - -// Activate actives the given actor kind with an optional id. If there is no id -// given, the engine will create an unique id automatically. +// playerPID := cluster.Activate("player", cluster.NewActivationConfig()) func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID { msg := activate{ kind: kind, - id: config.id, - region: config.region, + config: config, } resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result() if err != nil { @@ -215,23 +178,21 @@ func (c *Cluster) Deactivate(pid *actor.PID) { c.engine.Send(c.agentPID, deactivate{pid: pid}) } -// RegisterKind registers a new actor/receiver kind that can be spawned from any node -// on the cluster. -// NOTE: Kinds can only be registered if the cluster is not running. -func (c *Cluster) RegisterKind(name string, producer actor.Producer, config *KindConfig) { +// RegisterKind registers a new actor that can be activated from any member +// in the cluster. +// +// cluster.Register("player", NewPlayer, NewKindConfig()) +// +// NOTE: Kinds can only be registered before the cluster is started. +func (c *Cluster) RegisterKind(kind string, producer actor.Producer, config KindConfig) { if c.isStarted { - slog.Warn("trying to register new kind on a running cluster") + slog.Warn("failed to register kind", "reason", "cluster already started", "kind", kind) return } - if config == nil { - config = &KindConfig{} - } - kind := newKind(name, producer, *config) - c.kinds = append(c.kinds, kind) + c.kinds = append(c.kinds, newKind(kind, producer, config)) } -// HasLocalKind returns true if this members of the cluster has the kind -// locally registered. +// HasKindLocal returns true whether the node of the cluster has the kind locally registered. func (c *Cluster) HasKindLocal(name string) bool { for _, kind := range c.kinds { if kind.name == name { diff --git a/cluster/cluster_test.go b/cluster/cluster_test.go index 9927fe3..600e607 100644 --- a/cluster/cluster_test.go +++ b/cluster/cluster_test.go @@ -1,15 +1,18 @@ package cluster import ( + "context" "fmt" "log" "math/rand" "sync" "testing" + "time" "github.com/anthdm/hollywood/actor" "github.com/anthdm/hollywood/remote" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type Player struct{} @@ -28,11 +31,56 @@ func NewInventory() actor.Receiver { func (i Inventory) Receive(c *actor.Context) {} -func TestFooBarBaz(t *testing.T) { - config := NewConfig() - cluster, err := New(config) - assert.Nil(t, err) - _ = cluster +func TestClusterSelectMemberFunc(t *testing.T) { + c1, err := New(NewConfig().WithID("A")) + require.Nil(t, err) + c2, err := New(NewConfig().WithID("B")) + require.Nil(t, err) + c3, err := New(NewConfig().WithID("C")) + require.Nil(t, err) + + c1.RegisterKind("player", NewPlayer, NewKindConfig()) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) + c3.RegisterKind("player", NewPlayer, NewKindConfig()) + + c1.Start() + c2.Start() + c3.Start() + + selectMember := func(details ActivationDetails) *Member { + for _, member := range details.Members { + if member.ID == "C" { + return member + } + } + return nil + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + eventPID := c1.Engine().SpawnFunc(func(c *actor.Context) { + switch msg := c.Message().(type) { + case ActivationEvent: + // test that we spawned on member C + require.Equal(t, c3.Address(), msg.PID.Address) + cancel() + case MemberJoinEvent: + if msg.Member.ID == "C" { + // Wait till member C is online before activating + // Activate the actor from member A + // Which should spawn the actor on member C + config := NewActivationConfig().WithSelectMemberFunc(selectMember) + c1.Activate("cancel_receiver", config) + } + } + }, "event") + c1.Engine().Subscribe(eventPID) + defer c1.Engine().Unsubscribe(eventPID) + + <-ctx.Done() + require.Equal(t, context.DeadlineExceeded, ctx.Err()) + c1.Stop().Wait() + c2.Stop().Wait() + c3.Stop().Wait() } func TestClusterShouldWorkWithDefaultValues(t *testing.T) { @@ -45,8 +93,8 @@ func TestClusterShouldWorkWithDefaultValues(t *testing.T) { func TestRegisterKind(t *testing.T) { c := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") - c.RegisterKind("player", NewPlayer, nil) - c.RegisterKind("inventory", NewInventory, nil) + c.RegisterKind("player", NewPlayer, NewKindConfig()) + c.RegisterKind("inventory", NewInventory, NewKindConfig()) assert.True(t, c.HasKindLocal("player")) assert.True(t, c.HasKindLocal("inventory")) } @@ -94,7 +142,7 @@ func TestClusterSpawn(t *testing.T) { func TestMemberJoin(t *testing.T) { c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west") c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) wg := sync.WaitGroup{} wg.Add(1) @@ -127,7 +175,7 @@ func TestActivate(t *testing.T) { c1 = makeCluster(t, addr, "A", "eu-west") c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") ) - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") wg := sync.WaitGroup{} @@ -163,7 +211,7 @@ func TestDeactivate(t *testing.T) { addr := getRandomLocalhostAddr() c1 := makeCluster(t, addr, "A", "eu-west") c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) expectedPID := actor.NewPID(c2.engine.Address(), "player/1") wg := sync.WaitGroup{} @@ -212,7 +260,7 @@ func TestMemberLeave(t *testing.T) { assert.Nil(t, err) c1 := makeCluster(t, c1Addr, "A", "eu-west") - c2.RegisterKind("player", NewPlayer, nil) + c2.RegisterKind("player", NewPlayer, NewKindConfig()) c1.Start() wg := sync.WaitGroup{} diff --git a/cluster/kind.go b/cluster/kind.go index 7a58809..a39af31 100644 --- a/cluster/kind.go +++ b/cluster/kind.go @@ -2,10 +2,15 @@ package cluster import "github.com/anthdm/hollywood/actor" -// KindConfig holds the Kind configuration +// KindConfig holds configuration for a registered kind. type KindConfig struct{} -// A kind is a type of actor that can be activate on a node. +// NewKindConfig returns a default kind configuration. +func NewKindConfig() KindConfig { + return KindConfig{} +} + +// A kind is a type of actor that can be activated from any member of the cluster. type kind struct { config KindConfig name string diff --git a/examples/cluster/member_1/main.go b/examples/cluster/member_1/main.go index 2174f35..4628d7e 100644 --- a/examples/cluster/member_1/main.go +++ b/examples/cluster/member_1/main.go @@ -20,7 +20,7 @@ func main() { if err != nil { log.Fatal(err) } - c.RegisterKind("playerSession", shared.NewPlayer, nil) + c.RegisterKind("playerSession", shared.NewPlayer, cluster.NewKindConfig()) eventPID := c.Engine().SpawnFunc(func(ctx *actor.Context) { switch msg := ctx.Message().(type) { diff --git a/examples/cluster/member_2/main.go b/examples/cluster/member_2/main.go index 5c839a8..c3a4b08 100644 --- a/examples/cluster/member_2/main.go +++ b/examples/cluster/member_2/main.go @@ -30,7 +30,7 @@ func main() { }, "event") c.Engine().Subscribe(eventPID) - c.RegisterKind("playerSession", shared.NewPlayer, nil) + c.RegisterKind("playerSession", shared.NewPlayer, cluster.NewKindConfig()) c.Start() select {} }