-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathreasoner.py
360 lines (271 loc) · 13.2 KB
/
reasoner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
from collections import defaultdict
from itertools import combinations
import argparse
from py4j.java_gateway import JavaGateway
class ELReasoner:
def __init__(self, ontology_file, class_name) -> None:
## Setup java gateway and load the ontology
self.gateway = JavaGateway()
# get a parser from OWL files to DL ontologies
self.parser = self.gateway.getOWLParser()
# get a formatter to print in nice DL format
self.formatter = self.gateway.getSimpleDLFormatter()
# load an ontology from a file
self.ontology = self.parser.parseFile(ontology_file)
# EL algorithm can only handle binary conjunctions
self.gateway.convertToBinaryConjunctions(self.ontology)
# get the TBox axioms
self.tbox = self.ontology.tbox()
self.axioms = self.tbox.getAxioms()
# get all concepts occurring in the ontology and print
self.all_concepts = self.ontology.getSubConcepts()
# retrieve concept names and print
self.concept_names = self.ontology.getConceptNames()
# to create EL concepts
self.el_factory = self.gateway.getELFactory()
# initialize attributes for algorithm rum
self.subsumee = self.el_factory.getConceptName(class_name)
self.first_individual = 1
self.last_individual = 1
self.initial_concepts = {}
self.blocked_individuals = set()
self.interpretation = defaultdict(set)
self.roles_successors = defaultdict(lambda: defaultdict(set))
self.top = self.contains_top(self.ontology)
self.GCIs = self.get_GCIs()
def contains_top(self, ontology):
"""Function checks whether Top concept occurs in ontology.
Returns:
bool: True if Top occurs in docstring
"""
all_concepts = ontology.getSubConcepts()
# loop over all concepts and check if concept is Top.
for concept in all_concepts:
concept_type = concept.getClass().getSimpleName()
if concept_type == "TopConcept$":
return concept
return False
def get_GCIs(self):
"""Function creates a dictionary for all GCIs and Equivalence axioms
in the ontology. The dictionary keys are the concepts on the left hand
side of the GCIs, the values are a set of all concepts that occur on
the right hand side for the given key concept.
Returns:
Dict: Dictionary with left hand side concepts as keys, and a set of
right hand side concepts as value.
"""
GCIs = defaultdict(set)
for axiom in self.axioms:
axiom_type = axiom.getClass().getSimpleName()
# if GCI, just add right hand side to the set
if axiom_type == "GeneralConceptInclusion":
GCIs[axiom.lhs()].add(axiom.rhs())
# if equivalence axiom, add both directions to dictionary
elif axiom_type == "EquivalenceAxiom":
axiom_concepts = axiom.getConcepts().toArray()
GCIs[axiom_concepts[0]].add(axiom_concepts[1])
GCIs[axiom_concepts[1]].add(axiom_concepts[0])
return GCIs
def top_rule(self, individual):
"""
Assign top to this individual.
"""
if self.top and self.el_factory.getTop() not in self.interpretation[individual]:
self.interpretation[individual].add(self.top)
return True
return False
def intersect_rule_1(self, individual, conjunction):
"""Assign the individual concepts of the conjunction
to the individual in the representation
Args:
individual (int): integer representing individual
concept : A conjunction concept
Returns:
bool: True if something actually changed.
"""
add_concepts = set()
changed = False
for conjunct in conjunction.getConjuncts():
# assign the conjuncts of this conjunction to individual (if not already present)
if conjunct not in self.interpretation[individual]:
add_concepts.add(conjunct)
changed = True
# add all conjuncts at once
self.interpretation[individual].update(add_concepts)
return changed
def intersect_rule_2(self, individual):
"""For any combination of two concepts assigned to an individual,
also assign the conjunction, but only if the conjunction occurs in the
tbox.
Args:
individual (int): Integer representing the individual
Returns:
Bool: True if a conjunction was actually added.
"""
add_concepts = set()
changed = False
# get all combinations of 2 for the concepts of this individual
individual_concepts = list(self.interpretation[individual])
all_combinations = combinations(individual_concepts, 2)
# TODO: remove these lines if we def don't need it.
# Don't need to check for combinations of individual with itself
# all_combinations = [(concept, ind) for ind in individual_concepts if ind != concept]
# create a conjunction for all combinations
for combination in all_combinations:
conjunction = self.el_factory.getConjunction(combination[0], combination[1])
# assign the conjunction to the individual if it's also in Tbox and not assigned yet
if conjunction in self.all_concepts and conjunction not in self.interpretation[individual]:
add_concepts.add(conjunction)
changed = True
# add all conjunctions at once
self.interpretation[individual].update(add_concepts)
return changed
def exists_rule_1(self, individual, concept):
"""
# E-rule 1: If d has Er.C assigned, apply E-rules 1.1 and 1.2
# E-rule 1.1: If there is an element e with initial concept C assigned, e the r-successor of d.
# E-rule 1.2: Otherwise, add a new r-successor to d, and assign to it as initial concept C.
"""
changed = False
# "If d has Er.C assigned, apply E-rules 1.1 and 1.2"
role_r = concept.role() # r of Er.C
concept_c = concept.filler() # C of Er.C
# E-rule 1.1:
# If there is an element e with initial concept C assigned, e is the r-successor of d.
if concept_c in self.initial_concepts:
element_e = self.initial_concepts[concept_c]
if role_r not in self.roles_successors[individual]:
self.roles_successors[individual][role_r] = set()
# only add the element if not already assigned
if element_e not in self.roles_successors[individual][role_r]:
self.roles_successors[individual][role_r].add(element_e)
changed = True
# E-rule 1.2:
# Otherwise, add a new r-successor to d, and assign to it as initial concept C.
else:
self.last_individual += 1
if role_r not in self.roles_successors[individual]:
self.roles_successors[individual][role_r] = set()
# "add a new r-successor to d"
self.roles_successors[individual][role_r].add(self.last_individual)
# "and assign to it as initial concept C."
self.initial_concepts[concept_c] = self.last_individual
self.interpretation[self.last_individual].add(concept_c)
changed = True
return changed
def exists_rule_2(self, individual):
"""For each role succesor of the individual, assign the existential role restriction
to this individual as well. Only if the existential concept occurs in the tbox.
Args:
individual (int): Integer representing the individual
Returns:
Bool: True if an existential role restriction was assigned
"""
add_concepts = set()
changed = False
# loop over each succesor and their assigned concepts of the individual
for role, successors in self.roles_successors[individual].items():
for successor in successors:
for concept in self.interpretation[successor]:
existential = self.el_factory.getExistentialRoleRestriction(role, concept)
# add the role restriction if not yet assigned, and it occurs in the tbox
if existential in self.all_concepts and existential not in self.interpretation[individual]:
add_concepts.add(existential)
changed = True
# assign all role restrictions at once
self.interpretation[individual].update(add_concepts)
return changed
def subsumption_rule(self, individual, concept):
"""Assign all subsumers of 'concept' to this individual as well.
Args:
individual (int): Integer representing an individual
concept : Concept from which the subsumers should be assigned.
Returns:
Bool: True if subsumers were assigned to individual.
"""
changed = False
previous_len = len(self.interpretation[individual])
# assign all subsumers
self.interpretation[individual].update(self.GCIs[concept])
current_len = len(self.interpretation[individual])
# if individual now has more concepts assigned, the interpretation was changed
if current_len > previous_len:
changed = True
return changed
def apply_rules(self, individual):
"""Apply all the rules to this individual.
Args:
individual (int): Integer representing the individual
Returns:
Bool: True if any rule resulted in a change
"""
changes = []
# top rule only applies if top in tbox:
if self.top:
changes.append(self.top_rule(individual))
# Intersect rule 2
changes.append(self.intersect_rule_2(individual))
# all the rules that depend on for concept in interpretation[individual]:
# Casting this to a list to fix a bug where the set was modified during iteration.
for concept in list(self.interpretation[individual]):
concept_type = concept.getClass().getSimpleName()
# Intersect rule 1
if concept_type == "ConceptConjunction":
changes.append(self.intersect_rule_1(individual, concept))
# Exists rule 2
if concept_type == "ExistentialRoleRestriction":
changes.append(self.exists_rule_1(individual, concept))
# Subsumption rule
changes.append(self.subsumption_rule(individual, concept))
# Exists rule 2 (Unchanged - not dependent on concepts in individual)
changes.append(self.exists_rule_2(individual))
return True in changes
def get_subsumers(self, interpretation):
"""Find and print all subsumers using the first individual from the interpretation.
Args:
interpretation (dict): Dictionary of indivdiuals and their assigned concepts
"""
# loop over all concept names in ontology to check if they are subsumers of subsumee
for concept in self.concept_names:
# if the concept was assigned to first individual, it is a subsumer
if concept in interpretation[self.first_individual]:
self.subsumers.append(concept)
# also check if top is a subsumer (if not, the ontology is not coherent)
top = self.el_factory.getTop()
if top in interpretation[self.first_individual]:
self.subsumers.append(top)
# print the subsumers
for x in self.subsumers:
print(self.formatter.format(x).strip('"'))
def run(self):
"""Run the EL algorithm for self.ontology and self.subsumee, print the subsumers found for the
subsumee.
"""
self.subsumers = []
# start interpretation with one individual with subsumee assigned
self.interpretation[self.first_individual].add(self.subsumee)
# to keep track of changes in interpretation
changed = True
# loop as long as interpretation changed
while changed:
# reset changed to check if applying rules will change interpretation
changed = False
changes = set()
# apply all rules to every (non-blocked) individual in current interpretation
for individual in list(self.interpretation.keys()):
changes.add(self.apply_rules(individual))
# if any rule made a change to interpretation
changed = True in changes
self.get_subsumers(self.interpretation)
if __name__ == "__main__":
    # Command line interface: two positional arguments, the ontology file
    # and the class name whose subsumers should be printed.
    cli = argparse.ArgumentParser(
        description="Run EL reasoner and show subsumers for a given class name."
    )
    cli.add_argument(
        "ontology_file",
        help="Path to the ontology file in OWL format.",
    )
    cli.add_argument(
        "class_name",
        help="Class name to compute subsumers for. Write as: ClassName",
    )
    cli_args = cli.parse_args()

    # Build the reasoner from the parsed arguments and run it; run() prints
    # the subsumers it finds.
    ELReasoner(cli_args.ontology_file, cli_args.class_name).run()