-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimeline.py
232 lines (175 loc) · 8.58 KB
/
timeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# -*- coding: utf-8 -*-
"""
Timeline class combines partial lists of ordered events into longest possible unambiguous sequences.
"""
import collections
import itertools
import logging_for_recursion as logging
class Timeline(object):
    """Combine partial, ordered lists of events into merged timelines.

    Each event is mapped to the set of events recorded as coming directly
    after it. Merged timelines are produced by walking those mappings from
    every event that has no predecessor.

    Events must be hashable. ``None`` is reserved to mean "no starting
    event" and therefore cannot be used as an event.
    """

    def __init__(self, partial_timelines=None):
        """Initialize Timeline instance with provided partial timelines.

        Partial timelines can be added later using the add_partial_timeline method.

        :type partial_timelines: [[unicode]]
        :arg partial_timelines: list of ordered lists of strings that are events
        """
        # Map events to sets of subsequent events
        self._subsequent_events = collections.defaultdict(set)

        # Merge partial timelines
        for timeline in partial_timelines or []:
            self.add_partial_timeline(timeline)

    def add_partial_timeline(self, partial_timeline):
        """Add events from provided timeline to internal data.

        Events are processed in order; if a contradiction is found, the
        events and orderings processed before it remain recorded.

        :raise: ValueError if the timeline contradicts an ordering that is already recorded
        :type partial_timeline: [unicode]
        :arg partial_timeline: ordered list of strings that are events
        """
        previous_event = None
        for index, event in enumerate(partial_timeline or []):
            # Record each adjacent pair exactly once.
            if index > 0:
                self._add_subsequent_event(previous_event, event)
            # Register the event even if nothing follows it, so that an
            # isolated or final event is still a known event.
            self._subsequent_events.setdefault(event, set())
            previous_event = event

    def get_merged_timelines(self):
        """Merge partial timelines to generate longest possible unambiguous sequences of events.

        Multiple timelines will be returned if a single absolute ordering of events cannot
        be determined based on the partial timelines. For example, if one timeline specifies
        that event 'x' occurred after 'w' and before 'z', and another timeline specifies that
        event 'y' occurred after 'w' and before 'z', the absolute ordering of 'x' and 'y'
        cannot be determined, and multiple timelines will be returned.

        When no events have been recorded, the result is a single empty timeline (``[[]]``).

        :rtype: [[unicode]]
        :return: list of ordered lists of events
        """
        return self._get_merged_timelines__recurse()

    # private methods

    def _add_subsequent_event(self, event, subsequent_event):
        """Add specified subsequent_event to self._subsequent_events for event.

        :raise: ValueError if specified subsequent_event is already recorded as coming before
            event, or if event and subsequent_event are the same event
        :type event: <unicode>
        :arg event: name of current event
        :type subsequent_event: <unicode>
        :arg subsequent_event: name of event succeeding current event
        """
        # An event following itself is the smallest possible contradiction;
        # storing it would create a cycle in the ordering.
        is_contradiction = (
            event == subsequent_event
            or event in self._get_all_subsequent_events(subsequent_event))
        if is_contradiction:
            raise ValueError(
                "Contradiction detected: event '{0}' comes before and after event '{1}'".format(
                    event, subsequent_event))
        self._subsequent_events[event].add(subsequent_event)

    def _dedupe_timelines(self, timelines):
        """Filter out timelines that are exactly contained by other timelines.

        Containment is decided by comparing sets of events. When several
        timelines have identical event sets, exactly one of them is kept.

        :rtype: [[unicode]]
        :return: subset of timelines with unique event sequences
        :type timelines: [[unicode]]
        :arg timelines: list of ordered lists of events
        """
        if len(timelines) == 1:
            return [timelines[0]]
        logging.debug("Combining overlapping timelines {0}".format(timelines))

        # Since we trust the order of the events within each timeline, we can find
        # timelines that are included in another timeline by comparing sets of events.
        timeline_events = [set(events) for events in timelines]

        # Build list of booleans indicating if each timeline is unique or not.
        unique_timeline_selectors = [True] * len(timelines)
        for i, j in itertools.combinations(range(len(timelines)), 2):
            if timeline_events[i].issubset(timeline_events[j]):
                unique_timeline_selectors[i] = False
            elif timeline_events[j].issubset(timeline_events[i]):
                # "elif" so that two timelines with equal event sets do not
                # eliminate each other; only the earlier one is dropped.
                unique_timeline_selectors[j] = False
        return list(itertools.compress(timelines, unique_timeline_selectors))

    def _get_all_subsequent_events(self, event):
        """Find every event recorded as coming, directly or indirectly, after event.

        :rtype: set(unicode)
        :return: set of all events reachable from event; empty if there are none
        :type event: unicode
        :arg event: event for which all subsequent events should be found
        """
        subsequent_events = set()
        # Use .get() so that lookups never create entries in the defaultdict.
        pending = list(self._subsequent_events.get(event) or ())
        while pending:
            current = pending.pop()
            if current in subsequent_events:
                # Already expanded via another path.
                continue
            subsequent_events.add(current)
            pending.extend(self._subsequent_events.get(current) or ())
        return subsequent_events

    def _find_first_events(self):
        """Find events with no preceding events.

        :rtype: set(unicode)
        :return: set of events that have no preceding events
        """
        all_events = set(self._subsequent_events)
        all_subsequent_events = set().union(*self._subsequent_events.values())
        return all_events.difference(all_subsequent_events)

    def _get_next_events(self, from_event=None):
        """Find all events that succeed specified event.

        :rtype: set(unicode)
        :return: set of events that directly succeed specified event, or the set of
            events with no preceding events when from_event is None
        :type from_event: unicode
        :arg from_event: event for which subsequent events should be found
        """
        # Compare against None explicitly: falsy events such as '' or 0 are
        # legitimate events and must not be mistaken for "no starting event".
        if from_event is None:
            return self._find_first_events()
        return self._subsequent_events.get(from_event) or set()

    def _get_merged_timelines__recurse(self, from_event=None):
        """Internal recursive method for generating ordered timelines starting from specified event.

        The returned timelines do not include from_event itself. When nothing
        follows from_event, a single empty timeline (``[[]]``) is returned so
        that the caller can still prepend its own event.

        :rtype: [[unicode]]
        :return: list of ordered lists of events
        :type from_event: unicode
        :arg from_event: optional name of starting event; None to begin
        """
        next_events = self._get_next_events(from_event=from_event)
        if not next_events:
            return [[]]

        # Generate timelines starting at each possible next event
        merged_timelines = []
        for next_event in next_events:
            logging.debug("from_event={0}".format(from_event))
            logging.debug("next_event={0}".format(next_event))
            # NOTE(review): presumably tracks nesting depth for log output;
            # confirm against logging_for_recursion.
            logging.increment_recursion_depth()
            try:
                sub_timelines = self._get_merged_timelines__recurse(from_event=next_event)
                # Discard sub_timelines that are represented by other sub_timelines.
                # In practice, I think that duplicate timelines will match the end of another
                # timeline, but look for any overlap just in case.
                for timeline in self._dedupe_timelines(sub_timelines):
                    merged_timelines.append([next_event] + timeline)
            finally:
                # Restore the depth even if the recursive call raises.
                logging.increment_recursion_depth(-1)
        logging.debug("Returning timelines {0}".format(merged_timelines))
        return merged_timelines
def main(argv=None):
    """Command-line driver for merging arbitrary timeline data and displaying merged timelines.

    Reads a JSON list of ordered event sequences from the file named on the
    command line, merges them with Timeline, and prints one merged timeline
    per line. Errors from opening or parsing the file propagate to the caller.

    :type argv: [str]
    :arg argv: command-line arguments; None lets argparse read them from sys.argv
    """
    # Imported here because they are only needed when running as a script.
    import argparse
    import json
    import logging as pylogging

    parser = argparse.ArgumentParser(
        description='Combine partial timelines into longest possible sequences of events',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('infile',
                        help='input filename containing JSON list of ordered event sequences')
    group = parser.add_mutually_exclusive_group()
    group.add_argument('-v', '--verbose', action='store_true', help='info-level output')
    group.add_argument('-V', '--very-verbose', action='store_true', help='debug-level output')
    args = parser.parse_args(argv)

    # set logging to INFO or DEBUG based on command line arg
    log_level = pylogging.ERROR
    if args.verbose:
        log_level = pylogging.INFO
    elif args.very_verbose:
        log_level = pylogging.DEBUG
    pylogging.basicConfig(level=log_level)

    # read data from input file
    with open(args.infile) as infile:
        partial_timelines = json.load(infile)
    if partial_timelines:
        pylogging.info('Input timelines: {0}'.format(partial_timelines))

    # Merge partial timelines
    # NOTE(review): the original nesting of this section was lost; it is
    # assumed to run even for empty input, which Timeline accepts.
    timeline = Timeline(partial_timelines=partial_timelines)
    merged_timelines = timeline.get_merged_timelines()
    print('\nMerged timelines:')
    for merged_timeline in merged_timelines:
        print(merged_timeline)


if __name__ == '__main__':
    main()