-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathpub.coffee
119 lines (106 loc) · 3.83 KB
/
pub.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
{DB_KEY} = AnyDb # Spoofing a Mongo collection name to hack around DDP
debug = (->)
if Meteor.settings.public?.log?.pub
debug = console.log.bind(console, 'pub')
# flatten a deep object into fields separated with '.'
obj2Fields = (obj={}) ->
dest = {}
for key, value of obj
if U.isPlainObject(value)
deeperFields = obj2Fields(value)
for k,v of deeperFields
dest["#{key}.#{k}"] = v
else
dest[key] = R.clone(value)
return dest
salter = -> Random.hexString(10)
# publish with the subscriptionId and the position
createOrderedObserver = (pub, subId) ->
addedBefore: (id, fields={}, before) ->
U.set([DB_KEY, subId], "#{salter()}.#{before}", fields)
pub.added(DB_KEY, id, obj2Fields(fields))
movedBefore: (id, before) ->
fields = {}
U.set([DB_KEY, subId], "#{salter()}.#{before}", fields)
pub.changed(DB_KEY, id, obj2Fields(fields))
changed: (id, fields) ->
pub.changed(DB_KEY, id, fields)
removed: (id) ->
pub.removed(DB_KEY, id)
# pubs[name][serialize(query)][subId] = refresh
AnyDb.pubs = {}
AnyDb.refresh = (name, queryCond) ->
if AnyDb.pubs[name]
queries = Object.keys(AnyDb.pubs[name])
.map(U.deserialize)
.filter(queryCond)
.map(U.serialize)
debug 'refresh', name
queries.map (query) ->
# defer these updates so they dont block methods or subscriptions
U.mapObj AnyDb.pubs[name][query], (subId, sub) -> Meteor.defer -> sub.refresh()
AnyDb.transform = (name, queryCond, xform) ->
if AnyDb.pubs[name]
queries = Object.keys(AnyDb.pubs[name])
.map(U.deserialize)
.filter(queryCond)
.map(U.serialize)
debug 'transform', name
queries.map (query) ->
# defer these transforms so they dont block methods or subscriptions
U.mapObj AnyDb.pubs[name][query], (subId, sub) -> Meteor.defer -> sub.transform(xform)
AnyDb.publish = (name, fetcher) ->
Meteor.publish name, (query) ->
# unblock this publication so others can be processed while waiting
# for HTTP requests so they arent fetched synchronously in order.
# Thanks again Arunoda!
this.unblock()
# subscribe undefined comes through as null and this is annoying when you
# want to refresh a publication matching undefined
if query is null then query = undefined
pub = this
subId = pub._subscriptionId
sub =
subId: subId
docs: []
name: name
query: query
# fetch documents
sub.fetch = -> fetcher.call(pub, query)
# observer which sends DDP messages through merge-box through
# the publication along with subId and position information.
sub.observer = createOrderedObserver(pub, subId)
# fetch document again, diff, and publish
sub.refresh = ->
lap = U.stopwatch()
debug('refreshing', name, subId)
newDocs = sub.fetch()
DiffSequence.diffQueryChanges(true, sub.docs, newDocs, sub.observer)
sub.docs = newDocs
debug('refreshed', name, subId, lap(), 's')
# transform data, rather than refresh if we know for sure what the change
# will be.
sub.transform = (xform)->
lap = U.stopwatch()
debug('transforming', name, subId)
newDocs = xform(R.clone(sub.data))
DiffSequence.diffQueryChanges(true, sub.docs, newDocs, sub.observer)
sub.docs = newDocs
debug('transformed', name, subId, lap(), 's')
do ->
lap = U.stopwatch()
debug('start', name, subId)
sub.docs = sub.fetch()
sub.docs.map (doc) ->
id = doc._id
fields = R.clone(doc)
delete fields._id
sub.observer.addedBefore(id, fields, null)
pub.ready()
debug('ready', name, subId, lap(), 's')
# register and unregister publication
key = U.serialize(query)
U.set [name, key, subId], sub, AnyDb.pubs
pub.onStop ->
debug('stop', name, subId)
U.unset [name, key, subId], AnyDb.pubs