forked from logpy/logpy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcore.py
304 lines (227 loc) · 7.21 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
import itertools as it
from functools import partial
from .util import (dicthash, interleave, take, multihash, unique, evalt)
from toolz import groupby, map
from unification import reify, unify, var # noqa
#########
# Goals #
#########
def fail(s):
return iter(())
def success(s):
return iter((s, ))
def eq(u, v):
""" Goal such that u == v
See also:
unify
"""
def goal_eq(s):
result = unify(u, v, s)
if result is not False:
yield result
return goal_eq
################################
# Logical combination of goals #
################################
def lall(*goals):
""" Logical all with goal reordering to avoid EarlyGoalErrors
See also:
EarlyGoalError
earlyorder
>>> from kanren import lall, membero
>>> x = var('x')
>>> run(0, x, lall(membero(x, (1,2,3)), membero(x, (2,3,4))))
(2, 3)
"""
return (lallgreedy, ) + tuple(earlyorder(*goals))
def lallgreedy(*goals):
""" Logical all that greedily evaluates each goals in the order provided.
Note that this may raise EarlyGoalError when the ordering of the
goals is incorrect. It is faster than lall, but should be used
with care.
>>> from kanren import eq, run, membero
>>> x, y = var('x'), var('y')
>>> run(0, x, lallgreedy((eq, y, set([1]))), (membero, x, y))
(1,)
>>> run(0, x, lallgreedy((membero, x, y), (eq, y, {1}))) # doctest: +SKIP
Traceback (most recent call last):
...
kanren.core.EarlyGoalError
"""
if not goals:
return success
if len(goals) == 1:
return goals[0]
def allgoal(s):
g = goaleval(reify(goals[0], s))
return unique(
interleave(goaleval(reify(
(lallgreedy, ) + tuple(goals[1:]), ss))(ss) for ss in g(s)),
key=dicthash)
return allgoal
def lallfirst(*goals):
""" Logical all - Run goals one at a time
>>> from kanren import membero
>>> x = var('x')
>>> g = lallfirst(membero(x, (1,2,3)), membero(x, (2,3,4)))
>>> tuple(g({}))
({~x: 2}, {~x: 3})
>>> tuple(lallfirst()({}))
({},)
"""
if not goals:
return success
if len(goals) == 1:
return goals[0]
def allgoal(s):
for i, g in enumerate(goals):
try:
goal = goaleval(reify(g, s))
except EarlyGoalError:
continue
other_goals = tuple(goals[:i] + goals[i + 1:])
return unique(
interleave(
goaleval(reify((lallfirst, ) + other_goals, ss))(ss)
for ss in goal(s)),
key=dicthash)
else:
raise EarlyGoalError()
return allgoal
def lany(*goals):
""" Logical any
>>> from kanren import lany, membero
>>> x = var('x')
>>> g = lany(membero(x, (1,2,3)), membero(x, (2,3,4)))
>>> tuple(g({}))
({~x: 1}, {~x: 2}, {~x: 3}, {~x: 4})
"""
if len(goals) == 1:
return goals[0]
return lanyseq(goals)
def earlysafe(goal):
""" Call goal be evaluated without raising an EarlyGoalError """
try:
goaleval(goal)
return True
except EarlyGoalError:
return False
def earlyorder(*goals):
""" Reorder goals to avoid EarlyGoalErrors
All goals are evaluated. Those that raise EarlyGoalErrors are placed at
the end in a lall
See also:
EarlyGoalError
"""
if not goals:
return ()
groups = groupby(earlysafe, goals)
good = groups.get(True, [])
bad = groups.get(False, [])
if not good:
raise EarlyGoalError()
elif not bad:
return tuple(good)
else:
return tuple(good) + ((lall, ) + tuple(bad), )
def conde(*goalseqs):
""" Logical cond
Goal constructor to provides logical AND and OR
conde((A, B, C), (D, E)) means (A and B and C) or (D and E)
Equivalent to the (A, B, C); (D, E) syntax in Prolog.
See Also:
lall - logical all
lany - logical any
"""
return (lany, ) + tuple((lall, ) + tuple(gs) for gs in goalseqs)
def lanyseq(goals):
""" Logical any with possibly infinite number of goals
"""
def anygoal(s):
anygoal.goals, local_goals = it.tee(anygoal.goals)
def f(goals):
for goal in goals:
try:
yield goaleval(reify(goal, s))(s)
except EarlyGoalError:
pass
return unique(
interleave(
f(local_goals),
pass_exceptions=[EarlyGoalError]),
key=dicthash)
anygoal.goals = goals
return anygoal
def condeseq(goalseqs):
"""
Like conde but supports generic (possibly infinite) iterator of goals
"""
return (lanyseq, ((lall, ) + tuple(gs) for gs in goalseqs))
def everyg(predicate, coll):
"""
Asserts that predicate applies to all elements of coll.
"""
return (lall, ) + tuple((predicate, x) for x in coll)
########################
# User level execution #
########################
def run(n, x, *goals):
""" Run a logic program. Obtain n solutions to satisfy goals.
n - number of desired solutions. See ``take``
0 for all
None for a lazy sequence
x - Output variable
goals - a sequence of goals. All must be true
>>> from kanren import run, var, eq
>>> x = var()
>>> run(1, x, eq(x, 1))
(1,)
"""
results = map(partial(reify, x), goaleval(lall(*goals))({}))
return take(n, unique(results, key=multihash))
###################
# Goal Evaluation #
###################
class EarlyGoalError(Exception):
""" A Goal has been constructed prematurely
Consider the following case
>>> from kanren import run, eq, membero, var
>>> x, coll = var(), var()
>>> run(0, x, (membero, x, coll), (eq, coll, (1, 2, 3))) # doctest: +SKIP
The first goal, membero, iterates over an infinite sequence of all possible
collections. This is unproductive. Rather than proceed, membero raises an
EarlyGoalError, stating that this goal has been called early.
The goal constructor lall Logical-All-Early will reorder such goals to
the end so that the call becomes
>>> run(0, x, (eq, coll, (1, 2, 3)), (membero, x, coll)) # doctest: +SKIP
In this case coll is first unified to ``(1, 2, 3)`` then x iterates over
all elements of coll, 1, then 2, then 3.
See Also:
lall
earlyorder
"""
def find_fixed_point(f, arg):
"""
Repeatedly calls f until a fixed point is reached.
This may not terminate, but should if you apply some eventually-idempotent
simplification operation like evalt.
"""
last, cur = object(), arg
while last != cur:
last = cur
cur = f(cur)
return cur
def goaleval(goal):
""" Expand and then evaluate a goal
Idempotent
See also:
goalexpand
"""
if callable(goal): # goal is already a function like eq(x, 1)
return goal
if isinstance(goal, tuple): # goal is not yet evaluated like (eq, x, 1)
try:
return find_fixed_point(evalt, goal)
except Exception as e:
raise EarlyGoalError(e)
raise TypeError("Expected either function or tuple")