-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathenvironment.go
118 lines (99 loc) · 2.89 KB
/
environment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Tideland Go Cells - Environment
//
// Copyright (C) 2010-2017 Frank Mueller / Tideland / Oldenburg / Germany
//
// All rights reserved. Use of this source code is governed
// by the new BSD license.
package cells
//--------------------
// IMPORTS
//--------------------
import (
"runtime"
"github.com/tideland/golib/identifier"
"github.com/tideland/golib/logger"
)
//--------------------
// ENVIRONMENT
//--------------------
// environment is this package's implementation of the Environment
// interface. It pairs an identifier with the registry that its methods
// delegate cell and subscription handling to.
type environment struct {
// id is the identifier returned by ID; it is set in NewEnvironment.
id string
// cells is the registry all cell-related methods forward to.
cells *registry
}
// NewEnvironment creates a new environment. When idParts are given the
// environment ID is built from them via identifier.Identifier; without
// any parts a freshly generated UUID string is used instead. A finalizer
// calling Stop is registered on the returned environment, and its start
// is logged.
func NewEnvironment(idParts ...interface{}) Environment {
	var envID string
	if len(idParts) > 0 {
		envID = identifier.Identifier(idParts...)
	} else {
		envID = identifier.NewUUID().String()
	}
	e := new(environment)
	e.id = envID
	e.cells = newRegistry()
	// Have the runtime call Stop if the environment becomes unreachable
	// without having been stopped explicitly.
	runtime.SetFinalizer(e, (*environment).Stop)
	logger.Infof("cells environment %q started", e.id)
	return e
}
// ID implements the Environment interface. It returns the identifier
// stored in the environment's id field.
func (env *environment) ID() string {
return env.id
}
// StartCell implements the Environment interface. It hands the
// environment itself, the cell id, and the behavior to the registry's
// startCell and returns that call's error unchanged.
func (env *environment) StartCell(id string, behavior Behavior) error {
return env.cells.startCell(env, id, behavior)
}
// StopCell implements the Environment interface. It forwards id to the
// registry's stopCell and returns that call's error unchanged.
func (env *environment) StopCell(id string) error {
return env.cells.stopCell(id)
}
// HasCell implements the Environment interface. It reports whether the
// registry lookup for id succeeds, i.e. completes without an error.
func (env *environment) HasCell(id string) bool {
	if _, lookupErr := env.cells.cell(id); lookupErr != nil {
		return false
	}
	return true
}
// Subscribe implements the Environment interface. It passes emitterID
// and subscriberIDs straight to the registry's subscribe and returns
// that call's error unchanged.
func (env *environment) Subscribe(emitterID string, subscriberIDs ...string) error {
return env.cells.subscribe(emitterID, subscriberIDs...)
}
// Subscribers implements the Environment interface. It returns the
// result of the registry's subscribers call for id unchanged —
// presumably the IDs of the cells subscribed to that cell; confirm
// against the registry implementation.
func (env *environment) Subscribers(id string) ([]string, error) {
return env.cells.subscribers(id)
}
// Unsubscribe implements the Environment interface. It passes emitterID
// and subscriberIDs straight to the registry's unsubscribe and returns
// that call's error unchanged.
func (env *environment) Unsubscribe(emitterID string, subscriberIDs ...string) error {
return env.cells.unsubscribe(emitterID, subscriberIDs...)
}
// Emit implements the Environment interface. It looks up the cell
// registered under id and passes event to its ProcessEvent. If the
// lookup fails, that error is returned and the event is not delivered.
func (env *environment) Emit(id string, event Event) error {
	target, lookupErr := env.cells.cell(id)
	if lookupErr == nil {
		return target.ProcessEvent(event)
	}
	return lookupErr
}
// EmitNew implements the Environment interface. It builds an event from
// topic and payload via NewEvent and emits it to the cell id through
// Emit. An error from NewEvent is returned without emitting anything.
func (env *environment) EmitNew(id, topic string, payload interface{}) error {
	evt, buildErr := NewEvent(topic, payload)
	if buildErr == nil {
		return env.Emit(id, evt)
	}
	return buildErr
}
// Stop implements the Environment interface. It clears the finalizer
// registered for the environment, stops the registry, and logs the
// termination. If stopping the registry fails, that error is returned
// and nothing is logged.
func (env *environment) Stop() error {
	// The finalizer is cleared first, before the registry is stopped.
	runtime.SetFinalizer(env, nil)
	stopErr := env.cells.stop()
	if stopErr == nil {
		logger.Infof("cells environment %q terminated", env.id)
		return nil
	}
	return stopErr
}
// createQueue is a factory for the configured type of queues.
func (env *environment) createQueue() Queue {
// Currently only one queue type is supported, so the receiver is not
// consulted and the result of newInMemoryQueue is always returned.
return newInMemoryQueue()
}
// EOF