#!/usr/bin/python
# -*- coding: utf-8 -*-
import subprocess
import datetime
import dateutil.parser as parser2
import calendar
import optparse
import re
import os
import sys
from shutil import copy
import tempfile
import json
import glob
import gzip
CODE_VERSION = '1.0.0'
REGEX_NATURAL_SORT = re.compile('([0-9]+)')
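# Action codes used when scanning or writing the master Kipper file
# (see db_scan_action() and get_temp_output_file()).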
KEYDB_LIST = 1
KEYDB_EXTRACT = 2
KEYDB_REVERT = 3
KEYDB_IMPORT = 4
class MyParser(optparse.OptionParser):
"""
Provides a better class for displaying formatted help info.
From http://stackoverflow.com/questions/1857346/python-optparse-how-to-include-additional-info-in-usage-output.
"""
def format_epilog(self, formatter):
return self.epilog
def stop_err( msg ):
sys.stderr.write("%s\n" % msg)
sys.exit(1)
class Kipper(object):
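# Master Kipper file record layout (one key/value entry per line):
#   [created version id][TAB][deleted version id, or empty if still live][TAB][key][TAB][value]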
def __init__(self):
# Provide defaults
self.db_master_file_name = None
self.db_master_file_path = None
self.metadata_file_path = None
self.db_import_file_path = None
self.output_file = None # By default, printed to stdout
self.volume_id = None
self.version_id = None # Note: this is a natural number, starting from 1.
self.metadata = None
self.options = None
self.compression = ''
_nowabout = datetime.datetime.utcnow()
self.dateTime = long(_nowabout.strftime("%s"))
self.delim = "\t"
self.nl = "\n"
def __main__(self):
"""
Handles all command line options for creating kipper archives, and extracting or reverting to a version.
"""
options, args = self.get_command_line()
self.options = options
if options.code_version:
print CODE_VERSION
return CODE_VERSION
# *********************** Get Master kipper file ***********************
if not len(args):
stop_err('A Kipper database file name needs to be included as first parameter!')
self.db_master_file_name = args[0] #accepts relative path with file name
self.db_master_file_path = self.check_folder(self.db_master_file_name, "Kipper database file")
# db_master_file_path is used from now on; db_master_file_name is used just for metadata labeling.
# Adjust it to remove any relative path component.
self.db_master_file_name = os.path.basename(self.db_master_file_name)
if os.path.isdir(self.db_master_file_path):
stop_err('Error: Kipper data file "%s" is actually a folder!' % (self.db_master_file_path) )
self.metadata_file_path = self.db_master_file_path + '.md'
# Returns path but makes sure its folder is real. Must come before get_metadata()
self.output_file = self.check_folder(options.db_output_file_path)
# ************************* Get Metadata ******************************
if options.initialize:
if options.compression:
self.compression = options.compression
self.set_metadata(type=options.initialize, compression=self.compression)
self.get_metadata(options)
self.check_date_input(options)
if options.version_id or (options.extract and options.version_index):
if options.version_index:
vol_ver = self.version_lookup(options.version_index)
else:
# Note version_id info overrides any date input above.
vol_ver = self.get_version(options.version_id)
if not vol_ver:
stop_err("Error: Given version number or name does not exist in this database")
(volume, version) = vol_ver
self.volume_id = volume['id']
self.version_id = version['id']
self.dateTime = float(version['created'])
else:
# Use latest version by default
if not self.version_id and len(self.metadata['volumes'][-1]['versions']) > 0:
self.volume_id = self.metadata['volumes'][-1]['id']
self.version_id = self.metadata['volumes'][-1]['versions'][-1]['id']
# ************************** Action triggers **************************
if options.volume == True:
# Add a new volume to the metadata
self.metadata_create_volume()
self.write_metadata(self.metadata)
if options.db_import_file_path != None:
# Any time an import file is specified, this is the only action:
self.try_import_file(options)
return
if options.metadata == True:
# Writes metadata to disk or stdout
self.write_metadata2(self.metadata)
return
if options.extract == True:
# Defaults to pulling latest version
if not (self.version_id):
stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to extract.')
if self.output_file and os.path.isdir(self.output_file):
# A general output file name for the data store as a whole
output_name = self.metadata['file_name']
if output_name == '':
# Get output file name from version's original import file_name
output_name = self.metadata['volumes'][self.volume_id-1]['versions'][self.version_id-1]['file_name']
# But remove the .gz suffix if it is there (refactor later).
if output_name[-3:] == '.gz':
output_name = output_name[0:-3]
self.output_file = os.path.join(self.output_file, output_name)
self.db_scan_action(KEYDB_EXTRACT)
return
if options.revert == True:
if not (options.version_id or options.dateTime or options.unixTime):
stop_err('Error: Please supply a version id (-n [number]) or date (-d [date]) to revert to.')
# Send database back to given revision
if self.output_file and self.output_file == os.path.dirname(self.db_master_file_path):
self.output_file = self.get_db_path()
self.db_scan_action(KEYDB_REVERT)
return
# Default to list datastore versions
self.get_list()
def get_db_path(self, volume_id = None):
#Note: metadata must be established before this method is called.
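# e.g. master file "cpn60", volume 2, '.gz' compression -> "cpn60_2.gz"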
if volume_id is None: volume_id = self.volume_id
return self.db_master_file_path + '_' + str(volume_id) + self.metadata['compression']
def get_temp_output_file(self, action = None, path=None):
# Returns write handle (+name) of temp file. Returns gzip interface if compression is on.
if path == None:
path = self.output_file
temp = tempfile.NamedTemporaryFile(mode='w+t',delete=False, dir=os.path.dirname(path) )
# If compression is called for, then we have to switch to gzip handler on the temp name:
if action in [KEYDB_REVERT, KEYDB_IMPORT] and self.metadata['compression'] == '.gz':
temp.close()
temp = myGzipFile(temp.name, 'wb')
return temp
def get_list(self):
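# Prints a human-readable version listing, e.g. (illustrative):
#   Volume 1, Versions 1-2
#   1: 2014-01-15T10:30:00_v1
#   2: 2014-02-20T09:00:00_v2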
volumes = self.metadata['volumes']
for ptr in range(0, len(volumes)):
volume = volumes[ptr]
if ptr < len(volumes)-1:
ceiling = str(volumes[ptr+1]['floor_id'] - 1)
else:
ceiling = ''
print "Volume " + str(ptr+1) + ", Versions " + str(volume['floor_id']) + "-" + ceiling
for version in volume['versions']:
print str(version['id']) + ": " + self.dateISOFormat(float(version['created'])) + '_v' + version['name']
def set_metadata(self, type='text', compression=''):
"""
Request to initialize the metadata file.
Output metadata to stdout or to the -o output file by way of a temp file.
If -o is not included, output goes to stdout;
if only -o is included, output overwrites the existing .md file;
if -o [filename] is included, output overwrites [filename].
The algorithm processes each line in the order it appears in the database file, so the
version ids encountered are not necessarily in order; earlier items in the list can in
fact be later versions of the db. The versions are therefore re-sorted and their ids re-assigned at the end.
@param type string text or fasta etc.
"""
if os.path.isfile(self.metadata_file_path):
stop_err('Error: Metadata file "%s" exists. You must remove it before generating a new one.' % (self.metadata_file_path) )
self.metadata_create(type, compression)
volumes = glob.glob(self.db_master_file_path + '_[0-9]*')
volumes.sort(key=lambda x: natural_sort_key(x))
for volume in volumes:
# Note: scanned volumes must be consecutive from 1. No error detection yet.
self.metadata_create_volume(False)
versions = self.metadata['volumes'][-1]['versions']
import_modified = os.path.getmtime(volume)
dbReader = bigFileReader(volume)
version_ids = []
db_key_value = dbReader.read()
old_key = ''
while db_key_value:
(created_vid, deleted_vid, db_key, restofline) = db_key_value.split(self.delim, 3)
version = versions[self.version_dict_lookup(version_ids, long(created_vid), import_modified)]
version['rows'] +=1
if old_key != db_key:
version['keys'] +=1
old_key = db_key
version['inserts'] += 1
if deleted_vid:
version = versions[self.version_dict_lookup(version_ids, long(deleted_vid), import_modified)]
version['deletes'] += 1
db_key_value = dbReader.read()
# Reorder, and reassign numeric version ids:
versions.sort(key=lambda x: x['id'])
for ptr, version in enumerate(versions):
version['id'] = ptr+1
# If first master db volume doesn't exist, then this is an initialization situation
if len(volumes) == 0:
self.metadata_create_volume()
self.create_volume_file()
with open(self.metadata_file_path,'w') as metadata_handle:
metadata_handle.write(json.dumps(self.metadata, sort_keys=True, indent=4, separators=(',', ': ')))
return True
def get_metadata(self, options):
"""
Read in json metadata from file, and set file processor [fasta|text] engine accordingly.
"""
if not os.path.isfile(self.metadata_file_path):
#stop_err('Error: Metadata file "%s" does not exist. You must regenerate it with the -m option before performing other actions.' % (self.metadata_file_path) )
stop_err('Error: Unable to locate the "%s" metadata file. It should accompany the "%s" file. Use the -M parameter to initialize or regenerate the basic file.' % (self.metadata_file_path, self.db_master_file_name) )
with open(self.metadata_file_path,'r') as metadata_handle:
self.metadata = json.load(metadata_handle)
# ******************* Select Kipper Pre/Post Processor **********************
# FUTURE: More processor options here - including custom ones referenced in metadata
if self.metadata['type'] == 'fasta':
self.processor = VDBFastaProcessor() # for fasta sequence databases
else:
self.processor = VDBProcessor() # default text
# Handle any JSON metadata defaults here for items that aren't present in previous databases.
if not 'compression' in self.metadata:
self.metadata['compression'] = ''
def write_metadata(self, content):
"""
Called when data store changes occur (revert and import).
If they are going to stdout then don't stream metadata there too.
"""
if self.output_file: self.write_metadata2(content)
def write_metadata2(self,content):
with (open(self.metadata_file_path,'w') if self.output_file else sys.stdout) as metadata_handle:
metadata_handle.write(json.dumps(content, sort_keys=True, indent=4, separators=(',', ': ')))
def metadata_create(self, type, compression, floor_id=1):
"""
Initial metadata structure
"""
file_name = self.db_master_file_name.rsplit('.',1)
self.metadata = {
'version': CODE_VERSION,
'name': self.db_master_file_name,
'db_file_name': self.db_master_file_name,
# A guess about what best base file name would be to write versions out as
'file_name': file_name[0] + '.' + type,
'type': type,
'description': '',
'processor': '', # Processing that overrides type-matched processor.
'compression': self.compression,
'volumes': []
}
def metadata_create_volume(self, file_create = True):
# Only add a volume if previous volume has at least 1 version in it.
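# floor_id records the first version id that will live in this volume; get_list() and
# the revert logic use it to map version ids onto volume files.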
if len(self.metadata['volumes']) == 0 or len(self.metadata['volumes'][-1]['versions']) > 0:
id = len(self.metadata['volumes']) + 1
volume = {
'floor_id': self.get_last_version()+1,
'id': id,
'versions': []
}
self.metadata['volumes'].append(volume)
self.volume_id = id
if file_create:
self.create_volume_file()
return id
else:
stop_err("Error: Didn't create a new volume because last one is empty already.")
def create_volume_file(self):
if self.metadata['compression'] == '.gz':
gzip.open(self.get_db_path(), 'wb').close()
else:
open(self.get_db_path(),'w').close()
def metadata_create_version(self, mydate, file_name = '', file_size = 0, version_name = None):
id = self.get_last_version()+1
if version_name == None:
version_name = str(id)
version = {
'id': id,
'created': mydate,
'name': version_name,
'file_name': file_name,
'file_size': file_size,
'inserts': 0,
'deletes': 0,
'rows': 0,
'keys': 0
}
self.metadata['volumes'][-1]['versions'].append(version)
return version
def get_version(self, version_id = None):
if version_id is None:
version_id = self.version_id
for volume in self.metadata['volumes']:
for version in volume['versions']:
if version_id == version['id']:
return (volume, version)
return False
def version_lookup(self, version_name):
for volume in self.metadata['volumes']:
for version in volume['versions']:
if version_name == version['name']:
return (volume, version)
return False
def version_dict_lookup(self, version_ids, id, timestamp = None):
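# Maps a version id found while scanning a volume file to its index in the rebuilt
# versions list, creating a placeholder version entry the first time each id is seen.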
if id not in version_ids:
version_ids.append(id)
version = self.metadata_create_version(timestamp)
return version_ids.index(id)
#****************** Methods Involving Scan of Master Kipper file **********************
def db_scan_action (self, action):
"""
Scan the current volume's Kipper file line by line and apply the given action
(KEYDB_EXTRACT or KEYDB_REVERT), writing the results to self.output_file via a
temp file, or to stdout if no output file was given.
Python 2.6 note: sys.stdout needs reopening if it was previously closed:
#sys.stdout = open("/dev/stdout", "w")
"""
dbReader = bigFileReader(self.get_db_path())
# Setup temp file:
if self.output_file:
temp_file = self.get_temp_output_file(action=action)
# Use temporary file so that db_output_file_path switches to new content only when complete
with (temp_file if self.output_file else sys.stdout) as output:
db_key_value = dbReader.read()
while db_key_value:
if action == KEYDB_EXTRACT:
okLines = self.version_extract(db_key_value)
elif action == KEYDB_REVERT:
okLines = self.version_revert(db_key_value)
if okLines:
output.writelines(okLines)
db_key_value = dbReader.read()
# Issue: metadata lock while quick update with output_file???
if self.output_file:
if action == KEYDB_EXTRACT:
self.processor.postprocess_file(temp_file.name)
# Is there a case where we fail to get to this point?
os.rename(temp_file.name, self.output_file)
if action == KEYDB_REVERT:
# When reverting, clear all volumes having versions > self.version_id
# Takes out volume structure too.
volumes = self.metadata['volumes']
for volptr in range(len(volumes)-1, -1, -1):
volume = volumes[volptr]
if volume['floor_id'] > self.version_id: #TO REVERT IS TO KILL ALL LATER VOLUMES.
os.remove(self.get_db_path(volume['id']))
versions = volume['versions']
for verptr in range(len(versions)-1, -1, -1):
if versions[verptr]['id'] > self.version_id:
popped = versions.pop(verptr)
if len(versions) == 0 and volptr > 0:
volumes.pop(volptr)
self.write_metadata(self.metadata)
def db_scan_line(self, db_key_value):
"""
FUTURE: transact_code will signal how key/value should be interpreted, to
allow for differential change storage from previous entries.
"""
# (created_vid, deleted_vid, transact_code, restofline) = db_key_value.split(self.delim,3)
(created_vid, deleted_vid, restofline) = db_key_value.split(self.delim,2)
if deleted_vid: deleted_vid = long(deleted_vid)
return (long(created_vid), deleted_vid, restofline)
def version_extract(self, db_key_value):
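# Emit the line only if it existed in the requested version: created at or before it,
# and not yet deleted (or deleted only in a later version).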
(created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)
if created_vid <= self.version_id and (not deleted_vid or deleted_vid > self.version_id):
return self.processor.postprocess_line(restofline)
return False
def version_revert(self, db_key_value):
"""
Rebuild a database line as it should read after reverting to self.version_id:
lines created after that version are dropped, and any deletion mark applied
after that version is cleared so the key becomes live again.
"""
(created_vid, deleted_vid, restofline) = self.db_scan_line(db_key_value)
if created_vid <= self.version_id:
if (not deleted_vid) or deleted_vid <= self.version_id:
return [str(created_vid) + self.delim + str(deleted_vid) + self.delim + restofline]
else:
return [str(created_vid) + self.delim + self.delim + restofline]
return False
def check_date_input(self, options):
"""
"""
if options.unixTime != None:
try:
_userTime = float(options.unixTime)
# if it is not a float, triggers exception
except ValueError:
stop_err("Given Unix time could not be parsed [" + options.unixTime + "]. Format should be [integer]")
elif options.dateTime != None:
try:
_userTime = parse_date(options.dateTime)
except ValueError:
stop_err("Given date could not be parsed [" + options.dateTime + "]. Format should include at least the year, and any of the other more granular parts, in order: YYYY/MM/DD [H:M:S AM/PM]")
else:
return False
_dtobject = datetime.datetime.fromtimestamp(float(_userTime)) #
self.dateTime = long(_dtobject.strftime("%s"))
# Now see if we can set version_id by it. We look for version_id that has created <= self.dateTime
for volume in self.metadata['volumes']:
for version in volume['versions']:
if version['created'] <= self.dateTime:
self.version_id = version['id']
self.volume_id = volume['id']
else:
break
return True
def check_folder(self, file_path, message = "Output directory for "):
"""
Ensures file folder path for output file exists.
We don't want to create output in a mistaken location.
"""
if file_path != None:
path = os.path.normpath(file_path)
if not os.path.isdir(os.path.dirname(path)):
# Not an absolute path, so try default folder where script launched from:
path = os.path.normpath(os.path.join(os.getcwd(), path) )
if not os.path.isdir(os.path.dirname(path)):
stop_err(message + "[" + path + "] does not exist!")
return path
return None
def check_file_path(self, file, message = "File "):
path = os.path.normpath(file)
# make sure any relative paths are converted to absolute ones
if not os.path.isdir(os.path.dirname(path)) or not os.path.isfile(path):
# Not an absolute path, so try default folder where script was called:
path = os.path.normpath(os.path.join(os.getcwd(),path) )
if not os.path.isfile(path):
stop_err(message + "[" + path + "] doesn't exist!")
return path
def try_import_file(self, options):
"""
Create new version from comparison of import data file against Kipper
Note "-o ." parameter enables writing back to master database.
"""
self.db_import_file_path = self.check_file_path(options.db_import_file_path, "Import data file ")
check_file = self.processor.preprocess_validate_file(self.db_import_file_path)
if not check_file:
stop_err("Import data file isn't sorted or composed correctly!")
# SET version date to creation date of import file.
import_modified = os.path.getmtime(self.db_import_file_path)
original_name = os.path.basename(self.db_import_file_path)
# creates a temporary file, which has conversion into 1 line key-value records
temp = self.processor.preprocess_file(self.db_import_file_path)
if (temp):
self.db_import_file_path = temp.name
self.import_file(original_name, import_modified, options.version_index)
os.remove(temp.name)
def import_file(self, file_name, import_modified, version_index = None):
"""
Imports from an import file (or temp file if transformation done above) to
temp Kipper version which is copied over to main database on completion.
Import algorithm only works if the import file is already sorted in the same way as the Kipper database file
@uses self.db_import_file_path string A file full of one line key[tab]value records.
@uses self.output_file string A file to save results in. If empty, then stdout.
@uses dateTime string Date time to mark created/deleted records by.
@uses delim char Separator between key/value pairs.
@param file_name name of file being imported. This is stored in version record so that output file will be the same.
"""
delim = self.delim
file_size = os.path.getsize(self.db_import_file_path)
if version_index == None:
version_index = str(self.get_last_version()+1)
self.volume_id = self.metadata['volumes'][-1]['id'] #For get_db_path() call below.
if self.output_file:
temp_file = self.get_temp_output_file(action=KEYDB_IMPORT, path=self.get_db_path())
# We want to update database here when output file is db itself.
if os.path.isdir(self.output_file):
self.output_file = self.get_db_path()
version = self.metadata_create_version(import_modified, file_name, file_size, version_index)
version_id = str(version['id'])
with (temp_file if self.output_file else sys.stdout) as outputFile :
dbReader = bigFileReader(self.get_db_path())
importReader = bigFileReader(self.db_import_file_path)
old_import_key=''
while True:
db_key_value = dbReader.turn()
#if import_key_value
import_key_value = importReader.turn()
# Skip empty or whitespace lines:
if import_key_value and len(import_key_value.lstrip()) == 0:
import_key_value = importReader.read()
continue
if not db_key_value: # eof
while import_key_value: # Insert remaining import lines:
(import_key, import_value) = self.get_key_value(import_key_value)
outputFile.write(version_id + delim + delim + import_key + delim + import_value)
import_key_value = importReader.read()
version['inserts'] += 1
version['rows'] += 1
if import_key != old_import_key:
version['keys'] += 1
old_import_key = import_key
break # Both inputs are eof, so exit
elif not import_key_value: # db has key that import file no longer has, so mark each subsequent db line as a delete of the key (if it isn't already)
while db_key_value:
(created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim,3)
version['rows'] += 1
if deleted_vid:
outputFile.write(db_key_value)
else:
outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
version['deletes'] += 1
db_key_value = dbReader.read()
break
else:
(import_key, import_value) = self.get_key_value(import_key_value)
(created_vid, deleted_vid, dbKey, dbValue) = db_key_value.split(delim,3)
if import_key != old_import_key:
version['keys'] += 1
old_import_key = import_key
# All cases below lead to writing a row ...
version['rows'] += 1
if import_key == dbKey:
# When the keys match, we have enough information to act on the current db_key_value content;
# therefore ensure on next pass that we read it.
dbReader.step()
if import_value == dbValue:
outputFile.write(db_key_value)
# All past items marked with insert will also have a delete. Step until we find one
# not marked as a delete... or a new key.
if deleted_vid: # Good to go in processing next lines in both files.
pass
else:
importReader.step()
else: # Case where value changed - so process all db_key_values until key no longer matches.
# Some future pass will cause import line to be written to db
# (when the key mismatch occurs) as long as we don't advance it prematurely.
if deleted_vid:
#preserve deletion record.
outputFile.write(db_key_value)
else:
# Mark record deletion
outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
version['deletes'] += 1
# Then advance since new key/value means new create
else:
# Natural sort compares numeric parts as numbers rather than text, and ignores capitalization.
dbKeySort = natural_sort_key(dbKey)
import_keySort = natural_sort_key(import_key)
# cmp() returns -1 if dbKey sorts first, meaning this db key no longer exists in the import file.
if cmp(dbKeySort, import_keySort) == -1:
if deleted_vid: #Already marked as a delete
outputFile.write(db_key_value)
else: # Write dbKey as a new delete
outputFile.write(created_vid + delim + version_id + delim + dbKey + delim + dbValue)
version['deletes'] += 1
# Advance ... there could be another db_key_value for deletion too.
dbReader.step()
else: #DB key is greater, so insert import_key,import_value in db.
# Write a create record
outputFile.write(version_id + delim + delim + import_key + delim + import_value)
version['inserts'] += 1
importReader.step() # Now compare next two candidates.
if self.output_file:
# Kipper won't write an empty version - since this is usually a mistake.
# If user has just added new volume though, then slew of inserts will occur
# even if version is identical to tail end of previous volume version.
if version['inserts'] > 0 or version['deletes'] > 0:
#print "Temp file:" + temp_file.name
os.rename(temp_file.name, self.output_file)
self.write_metadata(self.metadata)
else:
os.remove(temp_file.name)
def get_last_version(self):
"""
Returns the id of the most recent version across all volumes (0 if none exist).
Volumes are searched from newest to oldest because the newest volume may still be empty.
"""
for ptr in range(len(self.metadata['volumes'])-1, -1, -1):
versions = self.metadata['volumes'][ptr]['versions']
if len(versions) > 0:
return versions[-1]['id']
return 0
# May want to move this into the individual data store processors, since key/value splitting can be sensitive to the kind of whitespace each format uses.
def get_key_value(self, key_value):
# ACCEPTS SPLIT AT ANY WHITESPACE PAST KEY BY DEFAULT
kvparse = key_value.split(None,1)
#return (key_value[0:kvptr], key_value[kvptr:].lstrip())
return (kvparse[0], kvparse[1] if len(kvparse) >1 else '')
def dateISOFormat(self, atimestamp):
return datetime.datetime.isoformat(datetime.datetime.fromtimestamp(atimestamp))
def get_command_line(self):
"""
*************************** Parse Command Line *****************************
"""
parser = MyParser(
description = 'Maintains versions of a file-based database with comparison to full-copy import file updates.',
usage = 'kipper.py [kipper database file] [options]*',
epilog="""
All outputs go to stdout and make no change to the Kipper database unless the '-o' parameter is supplied. (The one exception to this is the -M regenerate-metadata command, as described below.) Thus by default one sees what would happen if an action were taken, but must take an additional step to actually change the data.
'-o .' is a special request that leads to:
* an update of the Kipper database for --import or --revert actions
* an update of the .md file for -M --rebuild action
As well, when the -o parameter is a path rather than a specific filename, kipper.py looks up the appropriate output file name from the metadata file.
USAGE
Initialize metadata file and Kipper file.
kipper.py [database file] -M --rebuild [type of database:text|fasta]
View metadata (json) file.
kipper.py [database file] -m --metadata
Import key/value inserts/deletes based on import file. (Current date used).
kipper.py [database file] -i --import [import file]
e.g.
kipper.py cpn60 -i sequences.fasta # outputs new master database to stdout; doesn't rewrite it.
kipper.py cpn60 -i sequences.fasta -o . # rewrites cpn60 with new version added.
Extract a version of the file based on given date/time
kipper.py [database file] -e --extract -d datetime -o [output file]
Extract a version of the file based on given version Id
kipper.py [database file] -e --extract -n [version id] -o [output file]
List versions of dbFile key/value pairs (by date/time)
kipper.py [database file]
kipper.py [database file] -l --list
Have database revert to previous version. Drops future records, unmarks corresponding deletes.
kipper.py [database file] -r --revert -d datetime -o [output file]
Return version of this code:
kipper.py -v --version
""")
# Data/Metadata changing actions
parser.add_option('-M', '--rebuild', type='choice', dest='initialize', choices=['text','fasta'],
help='(Re)generate the metadata file [name of db].md. Provide the type of db [text|fasta etc.].')
parser.add_option('-i', '--import', type='string', dest='db_import_file_path',
help='Import key/value inserts/deletes based on delta comparison with import file')
parser.add_option('-e', '--extract', dest='extract', default=False, action='store_true',
help='Extract a version of the file based on given date/time')
parser.add_option('-r', '--revert', dest='revert', default=False, action='store_true',
help='Have database revert to previous version (-d date/time required). Drops future records, unmarks corresponding deletes.')
parser.add_option('-V', '--volume', dest='volume', default=False, action='store_true',
help='Add a new volume to the metadata. New imports will be added here.')
# Passive actions
parser.add_option('-m', '--metadata', dest='metadata', default=False, action='store_true',
help='View metadata file [name of db].md')
parser.add_option('-l', '--list', dest='list', default=False, action='store_true',
help='List versions of dbFile key/value pairs (by date/time)')
parser.add_option('-c', '--compression', dest='compression', type='choice', choices=['.gz'],
help='Enable compression of database. options:[.gz]')
# Used "v" for standard code version identifier.
parser.add_option('-v', '--version', dest='code_version', default=False, action='store_true',
help='Return version of kipper.py code.')
parser.add_option('-o', '--output', type='string', dest='db_output_file_path',
help='Output to this file. Default is stdout.')
parser.add_option('-I', '--index', type='string', dest='version_index',
help='Provide title (index) e.g. "1.4" of version being imported/extracted.')
parser.add_option('-d', '--date', type='string', dest='dateTime',
help='Provide date/time for sync, extract or revert operations. Defaults to now.')
parser.add_option('-u', '--unixTime', type='int', dest='unixTime',
help='Provide Unix time (integer) for sync, extract or revert operations.')
parser.add_option('-n', '--number', type='int', dest='version_id',
help='Provide a version id to extract or revert to.')
return parser.parse_args()
class VDBProcessor(object):
delim = '\t'
nl = '\n'
#def preprocess_line(self, line):
# return [line]
def preprocess_file(self, file_path):
temp = tempfile.NamedTemporaryFile(mode='w+t',delete=False, dir=os.path.dirname(file_path) )
copy (file_path, temp.name)
temp.close()
sort_a = subprocess.call(['sort','-sfV','-t\t','-k1,1', '-o',temp.name, temp.name])
return temp #Enables temp file name to be used by caller.
def preprocess_validate_file(self, file_path):
# Do import file preprocessing:
# 1) Mechanism to verify if downloaded file is complete - check md5 hash?
# 4) Could test file.newlines(): returns \r, \n, \r\n if started to read file (1st line).
# 5) Could auto-uncompress .tar.gz, bz2 etc.
# Ensures "[key] [value]" entries are sorted
# "sort --check ..." returns nothing if sorted, or e.g "sort: sequences_A.fastx.sorted:12: disorder: >114 AJ009959.1 … "
# if not subprocess.call(['sort','--check','-V',db_import_file_path]): #very fast check
# subprocess.call(['sort','-V',db_import_file_path]):
return True
def postprocess_line(self, line):
#Lines are placed in array so that one can map to many in output file
return [line]
def postprocess_file(self, file_path):
return False
def sort(self, a, b):
pass
class VDBFastaProcessor(VDBProcessor):
def preprocess_file(self, file_path):
"""
Converts input fasta data into one line tab-delimited record format, then sorts.
"""
temp = tempfile.NamedTemporaryFile(mode='w+t',delete=False, dir=os.path.dirname(file_path) )
fileReader = bigFileReader(file_path)
line = fileReader.read()
old_line = ''
while line:
line = line.strip()
if len(line) > 0:
if line[0] == '>':
if len(old_line):
temp.write(old_line + self.nl)
lineparse = line.split(None,1)
key = lineparse[0].strip()
if len(lineparse) > 1:
description = lineparse[1].strip().replace(self.delim, ' ')
else:
description = ''
old_line = key[1:] + self.delim + description + self.delim
else:
old_line = old_line + line
line = fileReader.read()
if len(old_line)>0:
temp.write(old_line+self.nl)
temp.close()
import stat
# TEST sort failure: set it so temp file is read only.
#mode = os.stat(temp.name)[stat.ST_MODE]
#os.chmod(temp.name,mode & ~0222)
# Is this a consideration for natural sort in Python vs bash sort?:
# *** WARNING *** The locale specified by the environment affects sort order.
# Set LC_ALL=C to get the traditional sort order that uses native byte values.
#-s stable; -f ignore case; V natural sort (versioning) ; -k column, -t tab delimiter
try:
subprocess.check_call(['sort', '-sfV', '-t\t', '-k1,1', '-o',temp.name, temp.name])
except subprocess.CalledProcessError as e:
stop_err("Error: Sort of import file could not be completed, sort() returned error code " + str(e.returncode) )
except OSError as e:
stop_err("Error: Sort of import file could not be completed, sort command or %s file not found?" % temp.name)
return temp #Enables temp file name to be used by caller.
def postprocess_line(self, line):
"""
Transform a Kipper fasta one-line key/value record back into multi-line fasta output.
@param line string containing [accession id][TAB][description][TAB][fasta sequence]
@return string containing lines each ending with a newline, except the last.
"""
line_data = line.split('\t',2)
# Set up ">[accession id] [description]\n" :
fasta_header = '>' + ' '.join(line_data[0:2]) + '\n'
# Put fasta sequences back into multi-line; note trailing item has newline.
sequences= self.split_len(line_data[2],80)
if len(sequences) and sequences[-1].strip() == '':
sequences[-1] = ''
return fasta_header + '\n'.join(sequences)
def split_len(self, seq, length):
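# Break seq into fixed-length chunks; the final chunk may be shorter.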
return [seq[i:i+length] for i in range(0, len(seq), length)]
class bigFileReader(object):
"""
This provides some advantage over reading line by line, and as well has a system
for skipping/not advancing reads - it has a memory via "take_step" about whether
it should advance or not - this is used when the master database and the import
database are feeding lines into a new database.
Interestingly, using readlines() with byte hint parameter less
than file size improves performance by at least 30% over readline().
FUTURE: Adjust buffer lines dynamically based on file size/lines ratio?
"""
def __init__(self, filename):
self.lines = []
# This simply allows any .gz repository to be opened
# It isn't connected to the Kipper metadata['compression'] feature.
if filename[-3:] == '.gz':
self.file = gzip.open(filename,'rb')
else: