-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraft.go
125 lines (118 loc) · 2.6 KB
/
raft.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package hraft
import (
"github.com/hlccd/hraft/protocol"
"log"
"net"
"sync"
"time"
)
type Raft struct {
id int64 //该结点id号,纳秒时间戳
vote int64 //上次投票支持的节点的id
master net.Conn //核心集群中的master结点连接
cores []*core //该结点集群的其他结点地址
lastKeep int64 //最后一次接收心跳的时间戳
inVote bool //在选举中?
point int
read chan string
write chan string
sync.RWMutex
}
func NewRaft(point int, clusterAddr string) (raft *Raft) {
raft = &Raft{
id: time.Now().UnixNano(),
vote: 0,
master: nil,
cores: make([]*core, 0, 0),
lastKeep: time.Now().UnixNano(),
point: point,
read: make(chan string, 1024),
write: make(chan string, 1024),
}
power := protocol.Master
if len(clusterAddr) != 0 {
if !raft.accessCluster(clusterAddr, getAddr(point)) {
return nil
}
power = protocol.Endpoint
} else {
go raft.clusterKeep()
}
go raft.listen(point)
go raft.msg()
raft.cores = append(raft.cores, newCore(raft.id, getAddr(point), power))
return raft
}
// 开启端口监听信息
func (raft *Raft) listen(point int) {
l, err := net.Listen("tcp", getAddr(point))
log.Println("正在监听端口:", point)
if err != nil {
log.Println("err:", err)
}
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Println("err:", err)
}
msg, err := protocol.GetMessage(conn)
if err == nil && msg != nil {
// 分流处理信息
go raft.shunt(msg, conn)
} else {
conn.Close()
}
}
}
func (raft *Raft) IsMaster() bool {
if raft == nil {
return false
}
return raft.master == nil
}
func (raft *Raft) Get() (s string) {
if raft == nil {
return ""
}
select {
case <-time.After(time.Second):
return ""
case s = <-raft.read:
log.Println("接收到信息:", s)
return s
}
}
func (raft *Raft) Put(s string) bool {
if raft == nil {
return false
}
ch := make(chan bool)
go func() {
raft.write <- s
ch <- true
}()
select {
case <-time.After(time.Second):
return false
case <-ch:
return true
}
}
// 将收到的信息推送到整个集群
func (raft *Raft) msg() {
var s string
for {
s = <-raft.write
if raft.IsMaster() {
log.Println("正在向集群推送信息: ", s)
for _, c := range raft.cores {
protocol.SendMessage(c.GetConn(), protocol.Master, protocol.Endpoint, protocol.MSG, s)
}
go raft.received(s)
} else {
log.Println("正在向主节点推送信息: ", s, raft.master.RemoteAddr().String())
protocol.SendMessage(raft.master, protocol.Endpoint, protocol.Master, protocol.MSG, s)
}
}
}