This repository has been archived by the owner on Feb 12, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsistent_hashing.go
105 lines (95 loc) · 3.11 KB
/
consistent_hashing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
"bytes"
"crypto/md5"
"encoding/binary"
"sort"
"strconv"
"strings"
)
// hashRingEntry is a single point on the hash ring. It records the ring
// position together with the host and instance it belongs to and the index
// of the destination that owns it.
type hashRingEntry struct {
	Position         uint16 // location of the entry on the ring
	Hostname         string // server part of the destination address
	Instance         string // destination instance name; may be empty
	DestinationIndex int    // index of the owning destination
}

// hashRing is the collection of ring entries. It satisfies sort.Interface,
// ordering entries by Position first, then Hostname, then Instance.
type hashRing []hashRingEntry

// Len reports how many entries are on the ring.
func (r hashRing) Len() int { return len(r) }

// Swap exchanges the entries at indices i and j.
func (r hashRing) Swap(i, j int) {
	r[j], r[i] = r[i], r[j]
}

// Less reports whether entry i sorts before entry j. Fields are compared in
// priority order; a later field only matters when all earlier ones are equal.
func (r hashRing) Less(i, j int) bool {
	a, b := &r[i], &r[j]
	if a.Position != b.Position {
		return a.Position < b.Position
	}
	if a.Hostname != b.Hostname {
		return a.Hostname < b.Hostname
	}
	return a.Instance < b.Instance
}
// ConsistentHasher maps keys to destinations by placing replicaCount
// entries per destination on a sorted hash ring.
type ConsistentHasher struct {
// Ring holds every ring entry; AddDestination re-sorts it after each
// insertion.
Ring hashRing
// destinations lists the destinations in the order they were added; ring
// entries refer to them through DestinationIndex.
destinations []*Destination
// replicaCount is the number of ring entries created for each destination.
replicaCount int
}
// computeRingPosition maps key to its position on the hash ring: the first
// two bytes of the key's MD5 digest interpreted as a big-endian uint16.
func computeRingPosition(key []byte) uint16 {
	digest := md5.Sum(key)
	// Decode the leading bytes directly. Unlike binary.Read this cannot
	// fail, so there is no error result to ignore.
	return binary.BigEndian.Uint16(digest[:2])
}
// NewConsistentHasher builds a ConsistentHasher over destinations using 100
// ring entries per destination.
func NewConsistentHasher(destinations []*Destination) ConsistentHasher {
	const defaultReplicaCount = 100
	return NewConsistentHasherReplicaCount(destinations, defaultReplicaCount)
}
// NewConsistentHasherReplicaCount builds a ConsistentHasher that places
// replicaCount entries on the ring for each destination, adding the
// destinations in the order given.
func NewConsistentHasherReplicaCount(destinations []*Destination, replicaCount int) ConsistentHasher {
	hasher := ConsistentHasher{replicaCount: replicaCount}
	for i := range destinations {
		hasher.AddDestination(destinations[i])
	}
	return hasher
}
// AddDestination registers d with the hasher and places replicaCount entries
// for it on the ring, re-sorting the ring afterwards.
//
// The position of replica i is computed from the key "<node>:<i>". The
// <node> part is the Python string representation of the tuple
// (server, instance) used by the original Carbon code, e.g. "('host', 'a')",
// or "('host', None)" when d.Instance is empty. The server component is
// d.Addr up to the first ':', i.e. it excludes the port.
func (h *ConsistentHasher) AddDestination(d *Destination) {
	newDestinationIndex := len(h.destinations)
	h.destinations = append(h.destinations, d)

	// The host and the key prefix are the same for every replica, so they
	// are built once here rather than on each loop iteration.
	host := strings.Split(d.Addr, ":")[0]

	var keyBuf bytes.Buffer
	keyBuf.WriteString("('")
	keyBuf.WriteString(host)
	keyBuf.WriteString("', ")
	if d.Instance != "" {
		keyBuf.WriteString("'")
		keyBuf.WriteString(d.Instance)
		keyBuf.WriteString("'")
	} else {
		keyBuf.WriteString("None")
	}
	keyBuf.WriteString("):")
	prefixLen := keyBuf.Len()

	newRingEntries := make(hashRing, h.replicaCount)
	for i := range newRingEntries {
		// Drop the previous replica number and append the current one.
		keyBuf.Truncate(prefixLen)
		keyBuf.WriteString(strconv.Itoa(i))

		newRingEntries[i] = hashRingEntry{
			Position:         computeRingPosition(keyBuf.Bytes()),
			Hostname:         host,
			Instance:         d.Instance,
			DestinationIndex: newDestinationIndex,
		}
	}

	h.Ring = append(h.Ring, newRingEntries...)
	sort.Sort(h.Ring)
}
// GetDestinationIndex returns the index of the destination corresponding
// to the provided key. It returns -1 when the ring has no entries (for
// example when no destination has been added yet).
func (h *ConsistentHasher) GetDestinationIndex(key []byte) int {
	ringLen := len(h.Ring)
	if ringLen == 0 {
		// Nothing to choose from. Without this guard the wrap-around below
		// would panic with an integer divide by zero.
		return -1
	}

	position := computeRingPosition(key)
	// Find the index where we would insert a server entry with the same
	// position field as the position for the specified key.
	// This is equivalent to bisect_left in the Python implementation.
	index := sort.Search(ringLen, func(i int) bool { return h.Ring[i].Position >= position })
	// A key that sorts after the last entry wraps around to the first one.
	if index == ringLen {
		index = 0
	}
	return h.Ring[index].DestinationIndex
}