Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polishing cluster #146

Merged
merged 2 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions cluster/activation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package cluster

import (
fmt "fmt"
"math"
"math/rand"
)

// ActivationConfig...
type ActivationConfig struct {
id string
region string
selectMember SelectMemberFunc
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
selectMember: SelectRandomMember,
}
}

// WithSelectMemberFunc set's the fuction that will be invoked during
// the activation process.
// It will select the member where the actor will be activated/spawned on.
func (config ActivationConfig) WithSelectMemberFunc(fun SelectMemberFunc) ActivationConfig {
config.selectMember = fun
return config
}

// WithID set's the id of the actor that will be activated on the cluster.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor should be spawned.
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// SelectMemberFunc will be invoked during the activation process.
// Given the ActivationDetails the actor will be spawned on the returned member.
type SelectMemberFunc func(ActivationDetails) *Member

// ActivationDetails holds detailed information about an activation.
type ActivationDetails struct {
// Region where the actor should be activated on
Region string
// A slice of members that are pre-filtered by the kind of the actor
// that need to be activated
Members []*Member
// The kind of the actor
Kind string
}

// SelectRandomMember selects a random member of the cluster.
func SelectRandomMember(details ActivationDetails) *Member {
return details.Members[rand.Intn(len(details.Members))]
}
35 changes: 0 additions & 35 deletions cluster/activator.go

This file was deleted.

22 changes: 12 additions & 10 deletions cluster/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
type (
activate struct {
kind string
id string
region string
config ActivationConfig
}
getMembers struct{}
getKinds struct{}
Expand Down Expand Up @@ -60,7 +59,7 @@ func (a *Agent) Receive(c *actor.Context) {
case *Activation:
a.handleActivation(msg)
case activate:
pid := a.activate(msg.kind, msg.id, msg.region)
pid := a.activate(msg.kind, msg.config)
c.Respond(pid)
case deactivate:
a.bcast(&Deactivation{PID: msg.pid})
Expand Down Expand Up @@ -117,27 +116,30 @@ func (a *Agent) handleActivationRequest(msg *ActivationRequest) *ActivationRespo
return resp
}

func (a *Agent) activate(kind, id, region string) *actor.PID {
func (a *Agent) activate(kind string, config ActivationConfig) *actor.PID {
members := a.members.FilterByKind(kind)
if len(members) == 0 {
slog.Warn("could not find any members with kind", "kind", kind)
return nil
}
owner := a.cluster.config.activationStrategy.ActivateOnMember(ActivationDetails{
if config.selectMember == nil {
config.selectMember = SelectRandomMember
}
memberPID := config.selectMember(ActivationDetails{
Members: members,
Region: region,
Region: config.region,
Kind: kind,
})
if owner == nil {
if memberPID == nil {
slog.Warn("activator did not found a member to activate on")
return nil
}
req := &ActivationRequest{Kind: kind, ID: id}
activatorPID := actor.NewPID(owner.Host, "cluster/"+owner.ID)
req := &ActivationRequest{Kind: kind, ID: config.id}
activatorPID := actor.NewPID(memberPID.Host, "cluster/"+memberPID.ID)

var activationResp *ActivationResponse
// Local activation
if owner.Host == a.cluster.engine.Address() {
if memberPID.Host == a.cluster.engine.Address() {
activationResp = a.handleActivationRequest(req)
} else {
// Remote activation
Expand Down
91 changes: 26 additions & 65 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,24 +22,22 @@ type Producer func(c *Cluster) actor.Producer

// Config holds the cluster configuration
type Config struct {
listenAddr string
id string
region string
activationStrategy ActivationStrategy
engine *actor.Engine
provider Producer
requestTimeout time.Duration
listenAddr string
id string
region string
engine *actor.Engine
provider Producer
requestTimeout time.Duration
}

// NewConfig returns a Config that is initialized with default values.
func NewConfig() Config {
return Config{
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
activationStrategy: NewDefaultActivationStrategy(),
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
listenAddr: getRandomListenAddr(),
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
provider: NewSelfManagedProvider(NewSelfManagedConfig()),
requestTimeout: defaultRequestTimeout,
}
}

Expand Down Expand Up @@ -71,14 +69,6 @@ func (config Config) WithEngine(e *actor.Engine) Config {
return config
}

// TODO: Still not convinced about the name "ActivationStrategy".
// TODO: Document this more.
// WithActivationStrategy
func (config Config) WithActivationStrategy(s ActivationStrategy) Config {
config.activationStrategy = s
return config
}

// WithListenAddr set's the listen address of the underlying remote.
//
// Defaults to a random port number.
Expand Down Expand Up @@ -160,42 +150,15 @@ func (c *Cluster) Spawn(p actor.Producer, id string, opts ...actor.OptFunc) *act
return pid
}

type ActivationConfig struct {
id string
region string
}

// NewActivationConfig returns a new default config.
func NewActivationConfig() ActivationConfig {
return ActivationConfig{
id: fmt.Sprintf("%d", rand.Intn(math.MaxInt)),
region: "default",
}
}

// WithID set's the id of the actor that will be activated on the cluster.
// Activate actives the registered kind in the cluster based on the given config.
// The actor does not need to be registered locally on the member if at least one
// member has that kind registered.
//
// Defaults to a random identifier.
func (config ActivationConfig) WithID(id string) ActivationConfig {
config.id = id
return config
}

// WithRegion set's the region on where this actor (potentially) will be spawned
//
// Defaults to a "default".
func (config ActivationConfig) WithRegion(region string) ActivationConfig {
config.region = region
return config
}

// Activate actives the given actor kind with an optional id. If there is no id
// given, the engine will create an unique id automatically.
// playerPID := cluster.Activate("player", cluster.NewActivationConfig())
func (c *Cluster) Activate(kind string, config ActivationConfig) *actor.PID {
msg := activate{
kind: kind,
id: config.id,
region: config.region,
config: config,
}
resp, err := c.engine.Request(c.agentPID, msg, c.config.requestTimeout).Result()
if err != nil {
Expand All @@ -215,23 +178,21 @@ func (c *Cluster) Deactivate(pid *actor.PID) {
c.engine.Send(c.agentPID, deactivate{pid: pid})
}

// RegisterKind registers a new actor/receiver kind that can be spawned from any node
// on the cluster.
// NOTE: Kinds can only be registered if the cluster is not running.
func (c *Cluster) RegisterKind(name string, producer actor.Producer, config *KindConfig) {
// RegisterKind registers a new actor that can be activated from any member
// in the cluster.
//
// cluster.Register("player", NewPlayer, NewKindConfig())
//
// NOTE: Kinds can only be registered before the cluster is started.
func (c *Cluster) RegisterKind(kind string, producer actor.Producer, config KindConfig) {
if c.isStarted {
slog.Warn("trying to register new kind on a running cluster")
slog.Warn("failed to register kind", "reason", "cluster already started", "kind", kind)
return
}
if config == nil {
config = &KindConfig{}
}
kind := newKind(name, producer, *config)
c.kinds = append(c.kinds, kind)
c.kinds = append(c.kinds, newKind(kind, producer, config))
}

// HasLocalKind returns true if this members of the cluster has the kind
// locally registered.
// HasKindLocal returns true whether the node of the cluster has the kind locally registered.
func (c *Cluster) HasKindLocal(name string) bool {
for _, kind := range c.kinds {
if kind.name == name {
Expand Down
Loading