Skip to content

Commit

Permalink
Polishing cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm committed Jan 12, 2024
1 parent 4c07bc4 commit 506e877
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 75 deletions.
28 changes: 10 additions & 18 deletions cluster/activator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,24 @@ import (
"math/rand"
)

// ActivationStrategy is an interface that abstracts the logic on what member of the
// cluster the actor will be activated. Members passed into this function are guaranteed
// to have the given kind locally registered. If no member could be selected nil should be
// returned.
type ActivationStrategy interface {
ActivateOnMember(ActivationDetails) *Member
}
// ActivateOnMemberFunc will be invoked by the member that called cluster.Activate.
// Given the ActivationDetails the actor will be spawned on the returned member.
//
// Not that if no member could be selected nil should be returned.
type ActivateOnMemberFunc func(ActivationDetails) *Member

// ActivationDetails holds detailed information about an activation.
type ActivationDetails struct {
// Region where the actor should be activated on
Region string
// A slice of filtered members by the kind that needs to be activated
// Members are guaranteed to never by empty.
// A slice of members that are pre-filtered by the kind of the actor
// that need to be activated
Members []*Member
// The kind that needs to be activated
// The kind of the actor
Kind string
}

type defaultActivationStrategy struct{}

// NewDefaultActivationStrategy selects a random member in the cluster.
func NewDefaultActivationStrategy() defaultActivationStrategy {
return defaultActivationStrategy{}
}

func (defaultActivationStrategy) ActivateOnMember(details ActivationDetails) *Member {
// ActivateOnRandomMember returns a random member of the cluster.
func ActivateOnRandomMember(details ActivationDetails) *Member {
return details.Members[rand.Intn(len(details.Members))]
}
12 changes: 5 additions & 7 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import (

type (
activate struct {
kind string
id string
region string
details ActivationDetails
}
getMembers struct{}
getKinds struct{}
Expand Down Expand Up @@ -60,7 +58,7 @@ func (a *Agent) Receive(c *actor.Context) {
case *Activation:
a.handleActivation(msg)
case activate:
pid := a.activate(msg.kind, msg.id, msg.region)
pid := a.activate(msg.details)
c.Respond(pid)
case deactivate:
a.bcast(&Deactivation{PID: msg.pid})
Expand Down Expand Up @@ -117,10 +115,10 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
return resp
}

func (a *Agent) activate(kind, id, region string) *actor.PID {
members := a.members.FilterByKind(kind)
func (a *Agent) activate(details ActivationDetails) *actor.PID {
members := a.members.FilterByKind(details.Kind)
if len(members) == 0 {
slog.Warn("could not find any members with kind", "kind", kind)
slog.Warn("could not find any members with kind", "kind", details.Kind)
return nil
}
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{

Check failure on line 124 in cluster/agent.go

View workflow job for this annotation

GitHub Actions / build

a.cluster.config.activationStrategy undefined (type Config has no field or method activationStrategy)

Check failure on line 124 in cluster/agent.go

View workflow job for this annotation

GitHub Actions / build

a.cluster.config.activationStrategy undefined (type Config has no field or method activationStrategy)
Expand Down
64 changes: 28 additions & 36 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,22 @@ type Producer func(c *Cluster) actor.Producer

// Config holds the cluster configuration
type Config struct {
listenAddr string
id string
region string
activationStrategy ActivationStrategy
engine *actor.Engine
provider Producer
requestTimeout time.Duration
listenAddr string
id string
region string
engine *actor.Engine
provider Producer
requestTimeout time.Duration
}

// NewConfig returns a Config that is initialized with default values.
func NewConfig() Config {
return Config{
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
activationStrategy: NewDefaultActivationStrategy(),
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
}
}

Expand Down Expand Up @@ -71,14 +69,6 @@ func (config Config) WithEngine(e *actor.Engine) Config {
return config
}

// TODO: Still not convinced about the name "ActivationStrategy".
// TODO: Document this more.
// WithActivationStrategy
func (config Config) WithActivationStrategy(s ActivationStrategy) Config {
config.activationStrategy = s
return config
}

// WithListenAddr set's the listen address of the underlying remote.
//
// Defaults to a random port number.
Expand Down Expand Up @@ -160,6 +150,7 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act
return pid
}

// ActivationConfig...
type ActivationConfig struct {
id string
region string
Expand All @@ -181,16 +172,19 @@ func (config ActivationConfig) WithID(id string) ActivationConfig {
return config
}

// WithRegion set's the region on where this actor (potentially) will be spawned
// WithRegion set's the region on where this actor should be spawned.
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// Activate actives the given actor kind with an optional id. If there is no id
// given, the engine will create an unique id automatically.
// Activate actives the registered kind in the cluster based on the given config.
// The actor does not need to be registered locally on the member if at least one
// member has that kind registered.
//
// playerPID := cluster.Activate("player", cluster.NewActivationConfig())
func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID {
msg := activate{
kind: kind,

Check failure on line 190 in cluster/cluster.go

View workflow job for this annotation

GitHub Actions / build

unknown field kind in struct literal of type activate

Check failure on line 190 in cluster/cluster.go

View workflow job for this annotation

GitHub Actions / build

unknown field kind in struct literal of type activate
Expand All @@ -215,23 +209,21 @@ func (c *Cluster) Deactivate(pid *actor.PID) {
c.engine.Send(c.agentPID, deactivate{pid: pid})
}

// RegisterKind registers a new actor/receiver kind that can be spawned from any node
// on the cluster.
// NOTE: Kinds can only be registered if the cluster is not running.
func (c *Cluster) RegisterKind(name string, producer actor.Producer, config *KindConfig) {
// RegisterKind registers a new actor that can be activated from any member
// in the cluster.
//
// cluster.Register("player", NewPlayer, NewKindConfig())
//
// NOTE: Kinds can only be registered before the cluster is started.
func (c *Cluster) RegisterKind(kind string, producer actor.Producer, config KindConfig) {
if c.isStarted {
slog.Warn("trying to register new kind on a running cluster")
slog.Warn("failed to register kind", "reason", "cluster already started", "kind", kind)
return
}
if config == nil {
config = &KindConfig{}
}
kind := newKind(name, producer, *config)
c.kinds = append(c.kinds, kind)
c.kinds = append(c.kinds, newKind(kind, producer, config))
}

// HasLocalKind returns true if this members of the cluster has the kind
// locally registered.
// HasKindLocal returns true whether the node of the cluster has the kind locally registered.
func (c *Cluster) HasKindLocal(name string) bool {
for _, kind := range c.kinds {
if kind.name == name {
Expand Down
23 changes: 12 additions & 11 deletions cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/anthdm/hollywood/actor"
"github.com/anthdm/hollywood/remote"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type Player struct{}
Expand All @@ -28,11 +29,11 @@ func NewInventory() actor.Receiver {

func (i Inventory) Receive(c *actor.Context) {}

func TestFooBarBaz(t *testing.T) {
config := NewConfig()
cluster, err := New(config)
assert.Nil(t, err)
_ = cluster
func TestClusterActivationOnMemberFunc(t *testing.T) {
c, err := New(NewConfig())
require.Nil(t, err)

c.RegisterKind("player", NewPlayer, NewKindConfig())
}

func TestClusterShouldWorkWithDefaultValues(t *testing.T) {
Expand All @@ -45,8 +46,8 @@ func TestClusterShouldWorkWithDefaultValues(t *testing.T) {

func TestRegisterKind(t *testing.T) {
c := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west")
c.RegisterKind("player", NewPlayer, nil)
c.RegisterKind("inventory", NewInventory, nil)
c.RegisterKind("player", NewPlayer, NewKindConfig())
c.RegisterKind("inventory", NewInventory, NewKindConfig())
assert.True(t, c.HasKindLocal("player"))
assert.True(t, c.HasKindLocal("inventory"))
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestClusterSpawn(t *testing.T) {
func TestMemberJoin(t *testing.T) {
c1 := makeCluster(t, getRandomLocalhostAddr(), "A", "eu-west")
c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west")
c2.RegisterKind("player", NewPlayer, nil)
c2.RegisterKind("player", NewPlayer, NewKindConfig())

wg := sync.WaitGroup{}
wg.Add(1)
Expand Down Expand Up @@ -127,7 +128,7 @@ func TestActivate(t *testing.T) {
c1 = makeCluster(t, addr, "A", "eu-west")
c2 = makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west")
)
c2.RegisterKind("player", NewPlayer, nil)
c2.RegisterKind("player", NewPlayer, NewKindConfig())

expectedPID := actor.NewPID(c2.engine.Address(), "player/1")
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -163,7 +164,7 @@ func TestDeactivate(t *testing.T) {
addr := getRandomLocalhostAddr()
c1 := makeCluster(t, addr, "A", "eu-west")
c2 := makeCluster(t, getRandomLocalhostAddr(), "B", "eu-west")
c2.RegisterKind("player", NewPlayer, nil)
c2.RegisterKind("player", NewPlayer, NewKindConfig())

expectedPID := actor.NewPID(c2.engine.Address(), "player/1")
wg := sync.WaitGroup{}
Expand Down Expand Up @@ -212,7 +213,7 @@ func TestMemberLeave(t *testing.T) {
assert.Nil(t, err)

c1 := makeCluster(t, c1Addr, "A", "eu-west")
c2.RegisterKind("player", NewPlayer, nil)
c2.RegisterKind("player", NewPlayer, NewKindConfig())
c1.Start()

wg := sync.WaitGroup{}
Expand Down
22 changes: 19 additions & 3 deletions cluster/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,26 @@ package cluster

import "github.com/anthdm/hollywood/actor"

// KindConfig holds the Kind configuration
type KindConfig struct{}
// KindConfig holds configuration for a registered kind.
type KindConfig struct {
activateOnMember ActivateOnMemberFunc
}

// NewKindConfig returns a default kind configuration.
func NewKindConfig() KindConfig {
return KindConfig{
activateOnMember: ActivateOnRandomMember,
}
}

// WithActivateOnMemberFunc set the function that will be used to select the member
// where this kind will be spawned/activated on.
func (config KindConfig) WithActivateOnMemberFunc(fun ActivateOnMemberFunc) KindConfig {
config.activateOnMember = fun
return config
}

// A kind is a type of actor that can be activate on a node.
// A kind is a type of actor that can be activated from any member of the cluster.
type kind struct {
config KindConfig
name string
Expand Down

0 comments on commit 506e877

Please sign in to comment.