-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathacl.py
228 lines (187 loc) · 8.27 KB
/
acl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
from collections import namedtuple
from copy import copy
from itertools import chain
from flask import session
import fastcache
import config
import util
MAX_PARENT_REFERENCE_RESOLUTION_ROUNDS = 10
CHANNEL_PREFIXES = '#&'
ANY = '*'
PRIVATE_MESSAGE = 'PRIVATE_MESSAGE'
# not used
Rule = namedtuple('Rule', ['verdict', 'user', 'target', 'parent'])
def is_value_rule_value(value, rule_value):
if rule_value == ANY:
return True
return value in value_multi(rule_value)
def value_multi(value):
if isinstance(value, str):
return [value]
return value
class Node:
TERMINAL_SCOPES = (
util.Scope.CHANNEL,
util.Scope.DATE,
)
# Most specific to least.
SCOPE_SPECIFICITY = (
util.Scope.CHANNEL,
util.Scope.NETWORK,
util.Scope.ROOT,
)
VERDICT_DISAMBIGUATION = (
util.Verdict.DENY,
util.Verdict.ALLOW,
)
def __init__(self, verdict, user, scope, value, parent_scope, parent_value):
self.verdict = verdict
self.user = user
self.scope = scope
self.value = value
self.parent_scope = parent_scope
self.parent_value = parent_value
self.parent = None
self.children = []
def add_child(self, child):
"""Try to add the child in the correct place in the tree.
Return True if successfully parented anywhere, False otherwise.
"""
if all([
child.parent_scope == self.scope,
# At least one of the own values is a child's parent value
# Don't check for ANY here - that is covered by AccessControl and ANY parents aren't allowed anyway
bool(set(value_multi(child.parent_value)) & set(value_multi(self.value))),
# At least one of the users is covered by this rule
bool(set(value_multi(child.user)) & set(value_multi(self.user))) or self.user == ANY,
]):
node_child = copy(child)
node_child.parent = self
self.children.append(node_child)
return True
else:
return any([node.add_child(child) for node in self.children])
def find_rule(self, user, network, channel):
if is_value_rule_value(user, self.user):
if self.scope in Node.TERMINAL_SCOPES:
# This will make more sense once we have date scopes... or
# something.
if self.scope == util.Scope.CHANNEL and \
(is_value_rule_value(channel, self.value) or \
(self.value == PRIVATE_MESSAGE and channel[0] not in CHANNEL_PREFIXES)
):
return [self]
return []
elif self.scope == util.Scope.NETWORK and is_value_rule_value(network, self.value):
if channel:
return self._ask_children(user, network, channel)
else:
return [self]
elif self.scope == util.Scope.ROOT:
return self._ask_children(user, network, channel)
return []
def _ask_children(self, user, network, channel):
result = [child.find_rule(user, network, channel) for child in self.children]
return filter(
None,
chain.from_iterable(result)
)
def __repr__(self):
return "<Node verdict={}, user={}, scope={}, value={}>".format(
self.verdict,
self.user,
self.scope,
self.value,
)
def __str__(self, tree=False, indent=0):
return "{indent}Node verdict={}, user={}, scope={}, value={}{}".format(
self.verdict,
self.user,
self.scope,
self.value,
'\n' + '{spaces}'.join([child.__str__(indent=indent + 1, tree=tree) for child in self.children]).format(spaces=' ' * indent) if tree else '',
indent=' ' * indent,
)
class AccessControl:
# TODO: Enforce scope parent types
def __init__(self, rules):
self.rules = Node(None, ANY, util.Scope.ROOT, 'root', util.Scope.ROOT, 'root')
unresolved_nodes = []
self.wildcard_nodes = set()
for rule in rules:
verdict, user, (target_scope, target_value), (parent_scope, parent_value) = rule
target = Node(verdict, user, target_scope, target_value, parent_scope, parent_value)
if target_scope == ANY or target_value == ANY or parent_scope == ANY or parent_value == ANY:
self.wildcard_nodes.add(target)
else:
unresolved_nodes.append(target)
resolution_rounds = 0
while True:
next_unresolved_nodes = []
for node in unresolved_nodes:
if not (self.rules.add_child(node)):
next_unresolved_nodes.append(node)
unresolved_nodes = next_unresolved_nodes
resolution_rounds += 1
if not unresolved_nodes:
break
if resolution_rounds > MAX_PARENT_REFERENCE_RESOLUTION_ROUNDS:
raise RuntimeError("Spent too much time resolving parent references, probably a node has a non-existent or misspelled parent", unresolved_nodes)
@property
def user_email(self):
user = session.get('user')
if not user:
return ''
return user.get('email')
def evaluate(self, network, channel):
return self._evaluate(self.user_email, network, channel)
@fastcache.clru_cache(maxsize=1024)
def _evaluate(self, user, network, channel):
# From the tree.
applicable = list(self.rules.find_rule(user, network, channel))
# Add in wildcards.
# Don't worry about any wildcard scopes right now outside of
# whether to evaluate it or not; we currently don't
# have any scopes that can attach to arbitrary other scopes.
for wildcard_node in self.wildcard_nodes:
if is_value_rule_value(user, wildcard_node.user):
if channel:
if wildcard_node.scope == util.Scope.NETWORK:
# Granting network/anything should have no effect on a
# channel decision.
pass
elif wildcard_node.scope in (util.Scope.CHANNEL, ANY):
# Check that our own value is fine.
# Then check the parent (a network) to see if it passes
# muster.
# Wildcard ALLOWs don't apply if the target is actually a private message.
if all([
(is_value_rule_value(network, wildcard_node.parent_value) or wildcard_node.parent_scope == util.Scope.ROOT),
any([
(is_value_rule_value(channel, wildcard_node.value) and (channel[0] in CHANNEL_PREFIXES or wildcard_node.verdict == util.Verdict.DENY)),
(wildcard_node.value == PRIVATE_MESSAGE and channel[0] not in CHANNEL_PREFIXES),
]),
]):
if wildcard_node.scope == ANY:
wildcard_node = copy(wildcard_node)
wildcard_node.scope = util.Scope.CHANNEL
applicable.append(wildcard_node)
elif wildcard_node.scope in (util.Scope.NETWORK, ANY):
# If this is about a network only, channel scoped rules do nothing
if is_value_rule_value(network, wildcard_node.value):
node_copy = copy(wildcard_node)
node_copy.scope = util.Scope.NETWORK
applicable.append(node_copy)
# Prefer closest scope, matching user over wildcard
applicable = sorted(
applicable,
key=lambda node: (
Node.SCOPE_SPECIFICITY.index(node.scope), # Order by nearest scope,
node.value == ANY, # specific target over wildcard,
node.user == ANY, # specific user over wildcard,
Node.VERDICT_DISAMBIGUATION.index(node.verdict), # Deny over allow
),
)
assert applicable # We need at least one...
rule = applicable[0]
return rule.verdict == util.Verdict.ALLOW