-
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcsvimporter.py
executable file
·452 lines (405 loc) · 15.1 KB
/
csvimporter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Commandline interface
to control CsvImporter class"""
import csv
import json
import locale
import logging
from datetime import datetime, timedelta
import click
from dateutil.parser import parse
from influxdb import InfluxDBClient
from pytz import timezone
class CsvImporter(object):
    """Read a .csv file and write its rows to InfluxDB.

    The constructor loads every row of the file into memory
    (``self.csv_rows``). Behaviour is configured through the ``set_*``
    methods, and :meth:`write_data` performs the actual import.
    """

    def __init__(self, csv_filename, delimiter=','):
        """Load all rows of ``csv_filename`` into ``self.csv_rows``.

        :param csv_filename: path of the .csv file; its first line is used
            as the header by ``csv.DictReader``.
        :param delimiter: field delimiter passed to ``csv.DictReader``.
        """
        logging.debug('CSV filename is set to "%s"', csv_filename)
        logging.debug('CSV delimter is set to "%s"', delimiter)
        # newline='' is required by the csv module; without it, line breaks
        # inside quoted fields are translated and the field value changes.
        with open(csv_filename, 'r', newline='') as csv_file:
            csv_dict_reader = csv.DictReader(csv_file, delimiter=delimiter)
            self.csv_rows = [row.copy() for row in csv_dict_reader]
        # Configuration; None means "not set" (see the set_* methods).
        self.cfg_server = None
        self.cfg_port = None
        self.cfg_ssl = None
        self.cfg_user = None
        self.cfg_password = None
        self.cfg_database = None
        self.cfg_measurement = None
        self.cfg_tags_columns = None
        self.cfg_timestamp_column = None
        self.cfg_timestamp_format = None
        self.cfg_timestamp_timezone = None
        self.cfg_locale = None
        self.cfg_date_filter = None
        self.cfg_column_ignorelist = None
        self.cfg_convert_int_to_float = None
        # Created lazily by write_data().
        self.influxdb_connection = None

    def set_server(self, server):
        """Sets the InfluxDB server address"""
        self.cfg_server = server
        logging.debug('InfluxDB sever address is set to "%s"', self.cfg_server)

    def set_port(self, port):
        """Sets the InfluxDB server port (str or int)"""
        self.cfg_port = port
        logging.debug('InfluxDB sever port is set to "%s"', self.cfg_port)

    def set_ssl(self, toggle):
        """Sets toggle for ssl (also used for certificate verification)"""
        self.cfg_ssl = toggle
        logging.debug('Toggle for ssl is set to "%s"', self.cfg_ssl)

    def set_user(self, user):
        """Sets the InfluxDB user for authentication"""
        self.cfg_user = user
        logging.debug('InfluxDB user is set to "%s"', self.cfg_user)

    def set_password(self, password):
        """Sets the InfluxDB password for authentication"""
        self.cfg_password = password
        # Never write the credential itself into the log output.
        logging.debug('InfluxDB password is set to "%s"', '********')

    def set_database(self, database):
        """Sets the InfluxDB database"""
        self.cfg_database = database
        logging.debug('InfluxDB database is set to "%s"', self.cfg_database)

    def set_measurement(self, measurement):
        """Sets the InfluxDB measurement"""
        self.cfg_measurement = measurement
        logging.debug('InfluxDB measurement is set to "%s"', self.cfg_measurement)

    def set_tags_columns(self, tags_columns):
        """Sets columns in csv that should be tags.

        :param tags_columns: comma separated column names; surrounding
            spaces and empty entries are ignored (same rules as
            set_column_ignorelist).
        """
        columns = [column.strip(' ') for column in tags_columns.split(',')]
        self.cfg_tags_columns = [column for column in columns if column]
        logging.debug('Tags are set to "%s"', tags_columns)

    def set_timestamp_column(self, column):
        """Sets the column to use as timestamp"""
        self.cfg_timestamp_column = column
        logging.debug('Timestamp column is set to "%s"', self.cfg_timestamp_column)

    def set_timestamp_format(self, fmt):
        """Sets the format of the timestamp column ('epoch', 'datetime' or 'raw')"""
        self.cfg_timestamp_format = fmt
        logging.debug('Timestamp format is set to "%s"', self.cfg_timestamp_format)

    def set_timestamp_timezone(self, tz):
        """Sets the timezone used for naive 'datetime' timestamps"""
        self.cfg_timestamp_timezone = tz
        logging.debug('Timestamp timezone is set to "%s"', self.cfg_timestamp_timezone)

    def set_locale(self, lc):
        """Sets the process locale for ctype, numeric and monetary values.

        The numeric locale is what convert_int_to_float's ``locale.atof``
        uses to interpret decimal/thousands separators.
        """
        self.cfg_locale = lc
        for category, label in ((locale.LC_CTYPE, 'ctype'),
                                (locale.LC_NUMERIC, 'numeric'),
                                (locale.LC_MONETARY, 'monetary')):
            locale.setlocale(category, self.cfg_locale)
            logging.debug('Locale for %s values is set to "%s"',
                          label, locale.getlocale(category))

    def set_date_filter(self, date):
        """Sets the date (YYYY-MM-DD, UTC) for rows to filter"""
        self.cfg_date_filter = date
        logging.debug('Date filter is set to "%s"', self.cfg_date_filter)

    def set_column_ignorelist(self, columns):
        """Sets the list of columns to ignore (comma separated names)"""
        names = [column.strip(' ') for column in columns.split(',')]
        self.cfg_column_ignorelist = [column for column in names if column]
        logging.debug('Column ignorelist is set to %s', self.cfg_column_ignorelist)

    def set_convert_int_to_float(self, toggle):
        """Sets toggle for integer to float conversion"""
        self.cfg_convert_int_to_float = toggle
        logging.debug('Toggle for int to float conversion is set to "%s"',
                      self.cfg_convert_int_to_float)

    def print_columns(self):
        """Returns the column names of the first row as a sorted, pretty json list.

        Returns ``"[]"`` when the file contains no data rows.
        """
        columns = list(self.csv_rows[0]) if self.csv_rows else []
        return json.dumps(sorted(columns), indent=4, sort_keys=True)

    def print_rows(self):
        """Returns all rows in pretty json format"""
        return json.dumps(self.csv_rows, indent=4, sort_keys=True)

    @staticmethod
    def match_date(epoch_timestamp, date_str='2020-01-01'):
        """Return True if the timestamp falls on the UTC day ``date_str``.

        :param epoch_timestamp: seconds since the Unix epoch (int or integer
            string), or a ``datetime``. Aware datetimes are converted to UTC;
            naive datetimes are taken to be UTC already.
        :param date_str: the day to match, formatted ``YYYY-MM-DD``.
        :returns: True if ``day 00:00 <= timestamp < next day 00:00`` (UTC).
        :raises ValueError: if either value cannot be parsed.
        """
        if isinstance(epoch_timestamp, datetime):
            offset = epoch_timestamp.utcoffset()
            utc_timestamp = epoch_timestamp.replace(tzinfo=None)
            if offset is not None:
                utc_timestamp -= offset
        else:
            # Naive UTC datetime, equivalent to datetime.utcfromtimestamp()
            # for integer seconds.
            utc_timestamp = datetime(1970, 1, 1) + timedelta(seconds=int(epoch_timestamp))
        day_start = datetime.strptime(date_str, '%Y-%m-%d')
        return day_start <= utc_timestamp < day_start + timedelta(days=1)

    @staticmethod
    def convert_int_to_float(data, tags_columns=None):
        """Convert every non-tag value of ``data`` to float, in place.

        Values are parsed with ``locale.atof`` (so the active numeric locale
        decides the separators). Empty values become None. Values that
        cannot be parsed are logged as a warning and left unchanged.

        :param data: row dict, or None.
        :param tags_columns: column names that must stay untouched.
        :returns: the same dict object (or None if ``data`` is None).
        """
        if data is None:
            return data
        for key, value in data.items():
            if tags_columns and key in tags_columns:
                continue
            if not value:
                data[key] = None
                continue
            try:
                data[key] = float(locale.atof(value))
            except (ValueError, TypeError) as exception:
                # TypeError: non-string values, e.g. the list DictReader
                # stores for surplus fields of an over-long row.
                logging.warning(exception)
        return data

    @staticmethod
    def convert_into_utc_timestamp(date_str, fmt, tz):
        """Convert a timestamp string into a UTC datetime.

        InfluxDB works internally with UTC timestamps only.

        :param date_str: the raw timestamp value from the csv row.
        :param fmt: ``'raw'`` (returned as int, no conversion), ``'epoch'``
            (seconds since the Unix epoch) or ``'datetime'`` (any notation
            understood by ``dateutil.parser.parse``).
        :param tz: timezone name applied to naive ``'datetime'`` values;
            values that carry their own UTC offset keep it.
        :returns: int for ``'raw'``, otherwise an aware UTC datetime.
        :raises ValueError: for an unsupported ``fmt`` (previously an epoch-0
            timestamp was silently returned) or an unparsable value.
        """
        if fmt == 'raw':
            return int(date_str)
        if fmt == 'epoch':
            datetime_naive = datetime(1970, 1, 1) + timedelta(seconds=int(date_str))
            datetime_tz = timezone('UTC').localize(datetime_naive)
        elif fmt == 'datetime':
            datetime_tz = parse(date_str)
            # parse() returns an aware datetime when the string contains an
            # offset (e.g. "+02:00"); pytz's localize() only accepts naive
            # datetimes, so only those are interpreted in ``tz``.
            if datetime_tz.tzinfo is None:
                datetime_tz = timezone(tz).localize(datetime_tz)
        else:
            logging.error('Time format is not supported')
            raise ValueError('Unsupported timestamp format: ' + str(fmt))
        return datetime_tz.astimezone(timezone('UTC'))

    def write_measurement(self, name, fields, tags=None, time=None):
        """Writes a single point to InfluxDB.

        ``time`` (datetime, int or str) is forwarded to ``write_points``
        unchanged. Previously a str value crashed (str has no strftime);
        datetime and int values were already forwarded as-is.

        :raises Exception: whatever ``write_points`` raises, after logging it.
        """
        point = {
            'measurement': name,
            'fields': fields,
        }
        if tags is not None:
            point['tags'] = tags
        if time is not None:
            point['time'] = time
        json_body = [point]
        try:
            logging.debug(json_body)
            self.influxdb_connection.write_points(json_body)
            # Progress indicator only when logging is at its default level.
            if logging.getLogger().getEffectiveLevel() == logging.WARNING:
                print(".", end='', flush=True)
        except Exception as exception:
            logging.error(exception)
            raise

    def write_data(self):
        """Write every csv row that passes the date filter as one InfluxDB point.

        Per row:
        1. The timestamp column (if configured) is converted.
        2. Rows outside the date filter are skipped.
        3. Ignored columns are removed.
        4. Values are optionally converted to float.
        5. Tag columns are moved from the fields into the tags.
        """
        logging.debug('Initialize InfluxDB connection')
        self.influxdb_connection = InfluxDBClient(
            self.cfg_server,
            self.cfg_port,
            self.cfg_user,
            self.cfg_password,
            self.cfg_database,
            ssl=self.cfg_ssl,
            verify_ssl=self.cfg_ssl)
        measurements_count = 0
        for row in self.csv_rows:
            utc_timestamp = None
            if self.cfg_timestamp_column is not None:
                utc_timestamp = CsvImporter.convert_into_utc_timestamp(
                    row[self.cfg_timestamp_column],
                    self.cfg_timestamp_format,
                    self.cfg_timestamp_timezone)
            if not self._matches_date_filter(row, utc_timestamp):
                continue
            fields = row.copy()
            for column in self.cfg_column_ignorelist or []:
                # pop() rather than del: a column missing from the csv must
                # not abort the import half-way through.
                fields.pop(column, None)
            if self.cfg_convert_int_to_float is True:
                fields = CsvImporter.convert_int_to_float(fields, self.cfg_tags_columns)
            tags = self._extract_tags(fields)
            self.write_measurement(
                self.cfg_measurement,
                fields,
                tags=tags,
                time=utc_timestamp)
            measurements_count += 1
        print(f"\nWrote {measurements_count} measurements to InfluxDB")

    def _matches_date_filter(self, row, utc_timestamp):
        """Return True if ``row`` passes the date filter (or no filter applies)."""
        if self.cfg_date_filter is None or self.cfg_timestamp_column is None:
            return True
        # Use the already converted datetime so 'datetime'-formatted columns
        # can be filtered too; 'raw' values (ints) still fall back to the
        # column value interpreted as epoch seconds.
        if isinstance(utc_timestamp, datetime):
            value = utc_timestamp
        else:
            value = row[self.cfg_timestamp_column]
        return CsvImporter.match_date(value, self.cfg_date_filter)

    def _extract_tags(self, fields):
        """Move the configured tag columns out of ``fields`` into a tags dict.

        Returns None when no tag columns are configured. Empty tag values
        are removed from ``fields`` and not turned into tags.
        """
        if self.cfg_tags_columns is None:
            return None
        tags = {}
        for column in self.cfg_tags_columns:
            if column not in fields:
                continue
            value = fields.pop(column)
            if column != '' and value not in ('', None):
                tags[column] = value
        return tags
@click.command()
@click.argument('csvfile',
                type=click.Path(exists=True))
@click.option(
    '--delimiter',
    default=',',
    help='Delimiter of .csv file (Default: ,)'
)
@click.option(
    '--server',
    default='localhost',
    help='Server address (Default: localhost)'
)
@click.option(
    '--port',
    default='8086',
    help='Server port (Default: 8086)'
)
@click.option(
    '--ssl',
    is_flag=True,
    default=False,
    help='Use ssl for connection to InfluxDB',
)
@click.option(
    '--user',
    help='User for authentication'
)
@click.option(
    '--password',
    help='Password for authentication'
)
@click.option(
    '--database',
    help='Database name'
)
@click.option(
    '--measurement',
    help='Measurement name (required with --write-data)'
)
@click.option(
    '--tags-columns',
    help='Columns that should be tags, e.g. col1,col2,col3'
)
@click.option(
    '--timestamp-column',
    help='Name of the column to use as timestamp; '
         'if option is not set, the current timestamp is used'
)
@click.option(
    '--timestamp-format',
    default='epoch',
    type=click.Choice(['epoch', 'datetime', 'raw']),
    help='Format of the timestamp column used to parse all timestamps '
         '(Default: epoch). '
         'epoch = epoch / unix timestamp; '
         'datetime = normal date and/or time notation; '
         'raw = raw epoch timestamp, do not convert'
)
@click.option(
    '--timestamp-timezone',
    default='UTC',
    help='Timezone of the timestamp column (Default: UTC)'
)
@click.option(
    '--locale',
    help='Locale for ctype, numeric and monetary values, e.g. de_DE.UTF-8'
)
@click.option(
    '--date-filter',
    help='Select only rows with a specific date in the timestamp column '
         'for import, e.g. 2020-01-01'
)
@click.option(
    '--column-ignorelist',
    help='Ignore a list of columns for import, e.g. col1,col2,col3'
)
@click.option(
    # An on/off switch: a plain is_flag option with default=True could
    # never be disabled. The old '--convert-int-to-float' spelling still works.
    '--convert-int-to-float/--no-convert-int-to-float',
    default=True,
    help='Convert numeric values to float (Default: enabled)'
)
@click.option(
    '--print-columns',
    is_flag=True,
    help='Print all column names in pretty json format'
)
@click.option(
    '--print-rows',
    is_flag=True,
    help='Print all rows in pretty json format'
)
@click.option(
    '--write-data',
    is_flag=True,
    help='Write data into InfluxDB'
)
@click.option(
    '--verbose',
    is_flag=True,
    help='Enable verbose logging output'
)
def cli(*args, **kwargs):
    """Commandline interface for InfluxDB / CSV Importer"""
    # The docstring above is shown by click as the command's --help text;
    # developer notes therefore live in comments.
    log_format = '%(levelname)s: %(message)s'
    if kwargs['verbose']:
        logging.basicConfig(format=log_format, level=logging.DEBUG)
    else:
        logging.basicConfig(format=log_format)

    # Fail before reading the csv / connecting instead of during the write.
    if kwargs['write_data'] and not kwargs['measurement']:
        raise click.UsageError('--measurement is required together with --write-data')

    csv_importer = CsvImporter(kwargs['csvfile'], kwargs['delimiter'])

    # Optional settings, applied in this order only when a value was given.
    setters = (
        ('server', csv_importer.set_server),
        ('port', csv_importer.set_port),
        ('ssl', csv_importer.set_ssl),
        ('user', csv_importer.set_user),
        ('password', csv_importer.set_password),
        ('database', csv_importer.set_database),
        ('measurement', csv_importer.set_measurement),
        ('tags_columns', csv_importer.set_tags_columns),
        ('timestamp_column', csv_importer.set_timestamp_column),
        ('timestamp_format', csv_importer.set_timestamp_format),
        ('timestamp_timezone', csv_importer.set_timestamp_timezone),
        ('locale', csv_importer.set_locale),
        ('date_filter', csv_importer.set_date_filter),
        ('column_ignorelist', csv_importer.set_column_ignorelist),
    )
    for option, setter in setters:
        if kwargs[option]:
            setter(kwargs[option])

    # Toggles are always applied, including when they are False.
    csv_importer.set_convert_int_to_float(kwargs['convert_int_to_float'])

    # Actions
    if kwargs['print_columns']:
        click.echo(csv_importer.print_columns())
    if kwargs['print_rows']:
        click.echo(csv_importer.print_rows())
    if kwargs['write_data']:
        csv_importer.write_data()
if __name__ == "__main__":
    # Executed as a script: hand control to the click command, which reads
    # its options and arguments from the command line itself.
    cli()