"""
Read from journald log
Set the frequency to add logs to the table using
"""
import datetime
import itertools
import json
import select
from uuid import UUID

from systemd import journal
from tzlocal import get_localzone

# Local timezone, used to make journald's naive timestamps timezone-aware.
local_tz = get_localzone()

valid_fields = ['priority', 'message', 'errno', 'syslog_facility', 'syslog_identifier', 'syslog_pid', '_pid', '_uid',
                '_gid', 'unit', '_comm', '_exe', '_cmdline', '_cap_effective', '_audit_session', '_audit_loginuid',
                '_systemd_cgroup', '_systemd_session', '_systemd_unit', '_systemd_user_unit', '_systemd_owner_uid',
                '_systemd_slice', '_selinux_context', '_boot_id', '_machine_id', '_hostname', '_transport',
                '_kernel_device', '_kernel_subsystem', '_udev_sysname', '_udev_devnode', '_udev_devlink', 'code_file',
                'code_line', 'code_function', '__realtime_timestamp', '_source_realtime_timestamp']


def chunker(iterable, chunksize):
    """
    Yield elements from the iterable in lists of ``chunksize``. The last
    chunk may be smaller (if the length of the collection is not divisible
    by ``chunksize``).

    >>> list(chunker(range(10), 3))
    [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
    """
    i = iter(iterable)
    while True:
        chunk = list(itertools.islice(i, int(chunksize)))
        if not chunk:
            break
        yield chunk


def convert_value(value):
    if isinstance(value, UUID):
        return str(value)
    elif isinstance(value, datetime.datetime):
        # systemd returns naive timestamps; attach the local timezone so the
        # serialized value is unambiguous.
        return local_tz.localize(value).isoformat()
    elif isinstance(value, datetime.timedelta):
        return value.total_seconds()
    return str(value)


def transform_entry(entry):
    t = {}
    extra = {}
    for k, v in entry.items():
        if k.lower() in valid_fields:
            t[k.lower()] = convert_value(v)
        elif not k.startswith('__'):
            extra[k.lower()] = convert_value(v)
    if extra:
        t['extra'] = json.dumps(extra)
    return {
        'insertId': entry['__CURSOR'],
        'json': t
    }
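
# Illustrative shape of transform_entry() output (values invented for this
# example; real entries carry whatever fields journald recorded):
#
#   {'insertId': 's=abc...;i=42;b=...;m=...;t=...;x=...',
#    'json': {'message': 'Started Session 1 of user root.',
#             'priority': '6',
#             '_hostname': 'myhost',
#             'extra': '{"_stream_id": "abc123"}'}}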


class JournalReader(object):
    def __init__(self, writer, log, args):
        self.writer = writer
        self.log = log
        self.total_shipped = 0
        self.CURSOR_FILE = args.cursor
        self.COUNT_THRESHOLD = args.count
        self.SECOND_THRESHOLD = args.timeout
        j = journal.Reader()
        try:
            with open(self.CURSOR_FILE, 'r') as cfile:
                self.cursor = cfile.read()
            j.seek_cursor(self.cursor)
            self.log.info('Loaded cursor from file')
        except IOError:
            # No saved cursor - start from the earliest available data.
            self.log.info('No cursor file; starting from the beginning of the journal')
            j.seek_head()
            j.get_previous()
        p = select.poll()
        p.register(j, j.get_events())
        self.journal = j
        self.poll = p
        self.bucket = []
        self.last_ship = None

    def save_cursor(self):
        with open(self.CURSOR_FILE, 'w') as cfile:
            cfile.write(self.cursor)

    @property
    def seconds_since_last_ship(self):
        # Before the first ship, report a value past the timeout so the
        # first batch is not delayed.
        if not self.last_ship:
            return self.SECOND_THRESHOLD + 10
        return (datetime.datetime.now() - self.last_ship).total_seconds()

    def check_bucket(self):
        """
        Ship the buffered log entries if either threshold has been reached.
        """
        if not self.bucket:
            self.log.debug('Nothing in bucket')
            return False
        if len(self.bucket) >= self.COUNT_THRESHOLD:
            self.log.debug('Bucket is full')
            self.ship_logs()
            return True
        if self.seconds_since_last_ship >= self.SECOND_THRESHOLD:
            self.log.debug('Timeout passed')
            self.ship_logs()
            return True
        self.log.debug('Not ready to ship')
        return False

    def run(self):
        while self.poll.poll():
            if self.journal.process() != journal.APPEND:
                continue
            # Drain all entries currently available, then decide whether
            # the batch is ready to ship.
            for entry in self.journal:
                self.bucket.append(transform_entry(entry))
                self.cursor = entry['__CURSOR']
            self.check_bucket()

    def ship_logs(self):
        """
        Ship the buffered logs to BigQuery.
        """
        count = len(self.bucket)
        for chunk in chunker(self.bucket, self.COUNT_THRESHOLD):
            self.writer.put(chunk)
        self.last_ship = datetime.datetime.now()
        self.save_cursor()
        self.log.debug('SHIPPED count={}'.format(count))
        self.total_shipped += count
        # Emit a periodic info-level running total, then reset the counter.
        if self.total_shipped > 500:
            self.log.info('SHIPPED count={}'.format(self.total_shipped))
            self.total_shipped = 0
        self.bucket = []
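

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: wires up a
    # JournalReader with argparse and a stand-in writer. The real writer is
    # assumed to expose a put(rows) method that ships a list of row dicts
    # (e.g. to a BigQuery table); StdoutWriter below is a hypothetical
    # stand-in that just prints each row. Defaults are illustrative only.
    import argparse
    import logging

    class StdoutWriter(object):
        def put(self, rows):
            for row in rows:
                print(json.dumps(row))

    parser = argparse.ArgumentParser(description='Ship journald entries in batches')
    parser.add_argument('--cursor', default='journal.cursor',
                        help='file used to persist the journald cursor')
    parser.add_argument('--count', type=int, default=100,
                        help='ship once this many entries are buffered')
    parser.add_argument('--timeout', type=int, default=60,
                        help='ship after this many seconds even if the bucket is not full')
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG)
    JournalReader(StdoutWriter(), logging.getLogger('journal_reader'), args).run()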