-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathrdoc.go
265 lines (226 loc) · 6.71 KB
/
rdoc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Package rdoc is a native go implementation of a conflict-free replicated
// JSON data structure (JSON CRDT). A JSON CRDT is a data structure that
// automatically resolves concurrent modifications such that no updates are
// lost, and such that all replicas converge towards the same state.
package rdoc
import (
"encoding/json"
"fmt"
"strings"
jpatch "github.com/evanphx/json-patch"
"github.com/gpestana/rdoc/idset"
"github.com/gpestana/rdoc/lclock"
)
// Doc is a replica of a JSON CRDT document. Alongside the operations applied
// so far, it keeps the metadata needed so that replicas of the same document
// converge over time without losing data.
type Doc struct {
	// id identifies this replica (see Init).
	id string
	// appliedIDs holds the id of every operation applied to the document.
	appliedIDs *idset.Set
	// clock mints the ids of local operations.
	clock lclock.Clock
	// operations is the log of applied operations, in application order.
	operations []Operation
	// bufferedOperations holds remote operations, keyed by operation id,
	// that are waiting for their dependencies to be applied.
	bufferedOperations map[string]Operation
}
// Operation holds a single JSON patch operation together with the metadata
// required to apply it as a mutation of the JSON CRDT document state.
type Operation struct {
	// id identifies the operation; local operations take it from the
	// replica clock, remote ones carry it in their "id" field.
	id string
	// deps lists the ids of the operations that must be applied before a
	// remote copy of this operation can be applied.
	deps []string
	// raw is the underlying decoded JSON patch operation.
	raw jpatch.Operation
}
// Init creates and returns a new, empty JSON CRDT document for the replica
// identified by id. The id is also handed to the replica clock, so the
// application *must* ensure that replica ids are unique within its network.
func Init(id string) *Doc {
	doc := new(Doc)
	doc.id = id
	doc.appliedIDs = idset.New()
	doc.clock = lclock.New([]byte(id))
	doc.operations = []Operation{}
	doc.bufferedOperations = make(map[string]Operation)
	return doc
}
// Apply applies a valid operation represented as a JSON patch
// (https://tools.ietf.org/html/rfc6902) on the document. Apply handles both
// local and remote operations:
//
//   - operations whose id the replica clock reports as coming from this
//     replica are applied immediately;
//   - any other (remote) operation is applied once all of its dependencies
//     have been applied, and is buffered until then.
//
// An operation whose id has already been applied is skipped, so applying the
// output of Operations() again (for example history echoed back by a peer)
// does not duplicate it. Whenever a remote operation is applied, the buffered
// operations are retried.
//
// Operations are processed in patch order; when an error is returned, the
// operations that preceded the failing one remain applied.
func (doc *Doc) Apply(rawPatch []byte) error {
	patch, err := jpatch.DecodePatch(rawPatch)
	if err != nil {
		return err
	}

	appliedRemoteOperations := false
	for _, opRaw := range patch {
		op, err := doc.getOperationFromPatch(opRaw)
		if err != nil {
			return err
		}

		isFromSameClock, err := doc.clock.CheckTick(op.id)
		if err != nil {
			return err
		}

		// Applying an operation twice would append it to doc.operations a
		// second time (and to everything serialized from it), so repeats are
		// dropped.
		if doc.hasApplied(op.id) {
			continue
		}

		// local operations are applied right away
		if isFromSameClock {
			doc.applyOperation(*op)
			continue
		}

		// remote operations wait until all of their dependencies are applied
		if len(doc.appliedIDs.Diff(op.deps)) != 0 {
			doc.bufferedOperations[op.id] = *op
			continue
		}
		appliedRemoteOperations = true
		delete(doc.bufferedOperations, op.id) // it may have been buffered earlier
		doc.applyOperation(*op)
	}

	// applying a remote operation may have satisfied the dependencies of
	// buffered operations, so give them another try
	if appliedRemoteOperations {
		return doc.tryBufferedOperations()
	}
	return nil
}

// hasApplied reports whether the operation with the given id has already been
// applied. Diff is used exactly as in Apply's dependency check: an empty
// result means every given id is in appliedIDs.
func (doc *Doc) hasApplied(id string) bool {
	return len(doc.appliedIDs.Diff([]string{id})) == 0
}

// applyOperation records operation as applied: its id is added to appliedIDs
// and the operation is appended to the document's operation log.
func (doc *Doc) applyOperation(operation Operation) {
	doc.appliedIDs.Add(operation.id)
	doc.operations = append(doc.operations, operation)
}

// tryBufferedOperations re-submits every buffered operation through Apply so
// that those whose dependencies are now satisfied get applied. Apply calls it
// again after each pass that applies a remote operation, so retries continue
// until a pass makes no progress. Errors are returned instead of panicking so
// that callers of Apply can handle them.
func (doc *Doc) tryBufferedOperations() error {
	if len(doc.bufferedOperations) == 0 {
		return nil
	}
	bufferedOperations, err := doc.marshalBufferedOperations()
	if err != nil {
		return fmt.Errorf("marshalling buffered operations: %w", err)
	}
	if err := doc.Apply(bufferedOperations); err != nil {
		return fmt.Errorf("applying buffered operations: %w", err)
	}
	return nil
}
// Operations returns every operation applied to the document, encoded as a
// JSON array that includes each operation's id and dependencies (the
// MarshalFullJSON encoding). The result can be sent over the wire to other
// replicas so that they converge.
func (doc Doc) Operations() ([]byte, error) {
	encoded, err := doc.MarshalFullJSON()
	return encoded, err
}
// MarshalJSON marshals a Doc into a buffer, excluding the deps field of each
// operation. The returned buffer *contains only the applied operations* that
// mutated the document state; buffered operations are not included in the
// document serialization.
//
// Each operation is encoded as an object with "id", "op", "path" and "value"
// keys, plus "from" for operations that carry one (e.g. "move" and "copy").
// Operations without a value (e.g. "remove") are encoded with "value": null.
func (doc Doc) MarshalJSON() ([]byte, error) {
	return marshalOperations(doc.operations, false)
}

// MarshalFullJSON marshals a Doc into a buffer, including the dependencies
// ("deps") field of each operation. As with MarshalJSON, only applied
// operations are included.
func (doc Doc) MarshalFullJSON() ([]byte, error) {
	return marshalOperations(doc.operations, true)
}

// marshalBufferedOperations encodes the buffered operations (remote operations
// still waiting for their dependencies), including their dependencies, in the
// same format as MarshalFullJSON. The order of the output is unspecified
// because it follows map iteration order.
func (doc Doc) marshalBufferedOperations() ([]byte, error) {
	buffered := make([]Operation, 0, len(doc.bufferedOperations))
	for _, op := range doc.bufferedOperations {
		buffered = append(buffered, op)
	}
	return marshalOperations(buffered, true)
}

// encodedOperation is the wire representation of an Operation; its field
// order fixes the key order of the JSON output. From is omitted when the
// operation has no "from" field. Deps is a pointer so that MarshalJSON can
// omit it entirely while MarshalFullJSON still emits an empty list as [].
type encodedOperation struct {
	ID    string      `json:"id"`
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	From  string      `json:"from,omitempty"`
	Deps  *[]string   `json:"deps,omitempty"`
	Value interface{} `json:"value"`
}

// marshalOperations encodes ops as a JSON array, including each operation's
// dependencies when withDeps is true. An empty ops slice encodes as [].
func marshalOperations(ops []Operation, withDeps bool) ([]byte, error) {
	encoded := make([]encodedOperation, 0, len(ops))
	for _, op := range ops {
		enc, err := encodeOperation(op, withDeps)
		if err != nil {
			return nil, err
		}
		encoded = append(encoded, enc)
	}
	return json.Marshal(encoded)
}

// encodeOperation converts op into its wire representation, adding its
// dependencies when withDeps is true.
func encodeOperation(op Operation, withDeps bool) (encodedOperation, error) {
	path, err := op.raw.Path()
	if err != nil {
		return encodedOperation{}, err
	}

	// "value" and "from" are optional in RFC 6902 ("remove" has neither), so
	// they are decoded only when present instead of failing on their absence.
	var value interface{}
	if raw := op.raw["value"]; raw != nil {
		if err := json.Unmarshal(*raw, &value); err != nil {
			return encodedOperation{}, err
		}
	}
	var from string
	if raw := op.raw["from"]; raw != nil {
		if err := json.Unmarshal(*raw, &from); err != nil {
			return encodedOperation{}, err
		}
	}

	enc := encodedOperation{
		ID:    op.id,
		Op:    op.raw.Kind(),
		Path:  path,
		From:  from,
		Value: value,
	}
	if withDeps {
		deps := op.deps
		if deps == nil {
			// Emit [] rather than null: a null "deps" presumably decodes to a
			// nil entry, which getOperationFromPatch treats as a missing field.
			deps = []string{}
		}
		enc.Deps = &deps
	}
	return enc, nil
}
// getOperationFromPatch wraps a decoded JSON patch operation into an
// Operation carrying the CRDT metadata (id and dependencies).
//
// An operation with neither an "id" nor a "deps" field is treated as local:
// the replica clock is ticked, the clock's string becomes the operation id,
// and the ids of all operations applied so far become its dependencies. An
// operation with both fields is treated as remote and keeps the id and
// dependencies it carries. An operation with only one of them is rejected.
func (doc *Doc) getOperationFromPatch(rawOp jpatch.Operation) (*Operation, error) {
	rawID := rawOp["id"]
	rawDeps := rawOp["deps"]
	hasID, hasDeps := rawID != nil, rawDeps != nil

	if hasID != hasDeps {
		return nil,
			fmt.Errorf("Remote operation must have an associated id and set of dependencies")
	}

	if !hasID {
		// local operation: attach metadata based on the local replica state
		doc.clock.Tick()
		return &Operation{
			id:   doc.clock.String(),
			deps: doc.appliedIDs.GetIDs(),
			raw:  rawOp,
		}, nil
	}

	// remote operation
	//
	// Decode the id as a JSON string so that escape sequences are resolved.
	// json.Marshal (used when operations are serialized) escapes characters
	// such as '"', '\\', '<', '>' and '&'. Merely stripping the surrounding
	// quotes would keep e.g. `\u0026` instead of "&", and the id would then not
	// match the same id listed in other operations' (properly decoded) deps.
	var id string
	if err := json.Unmarshal(*rawID, &id); err != nil {
		// Not a JSON string (e.g. a number): fall back to the literal JSON
		// text, exactly as earlier versions of this function did.
		id = strings.TrimPrefix(strings.TrimSuffix(string(*rawID), "\""), "\"")
	}

	deps := []string{}
	if err := json.Unmarshal(*rawDeps, &deps); err != nil {
		return nil, err
	}
	return &Operation{
		id:   id,
		deps: deps,
		raw:  rawOp,
	}, nil
}