-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathk4.go
2023 lines (1680 loc) · 54.1 KB
/
k4.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package k4
// BSD 3-Clause License
// @depreciated please refer to github.com/guycipher/k4/v2 for the latest version
// Copyright (c) 2024, Alex Gaetano Padula
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
// FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
// DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
// OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
package k4
import (
"bytes"
"encoding/gob"
"fmt"
"github.com/guycipher/k4/compressor"
"github.com/guycipher/k4/hashset"
"github.com/guycipher/k4/pager"
"github.com/guycipher/k4/skiplist"
"log"
"os"
"sort"
"sync"
"time"
)
const SSTABLE_EXTENSION = ".sst" // The SSTable file extension
const LOG_EXTENSION = ".log" // The log file extension
const WAL_EXTENSION = ".wal" // The write ahead log file extension
const TOMBSTONE_VALUE = "$tombstone" // The tombstone value
const COMPRESSION_WINDOW_SIZE = 1024 * 32 // The compression window size
const BACKGROUND_OP_SLEEP = 5 * time.Microsecond // The background sleep time for the background operations
// K4 is the main structure for the k4 database
type K4 struct {
sstables []*SSTable // in memory sstables. We just keep the opened file descriptors
sstablesLock *sync.RWMutex // read write lock for sstables
memtable *skiplist.SkipList // in memory memtable (skip list)
memtableLock *sync.RWMutex // read write lock for memtable
memtableFlushThreshold int // in bytes
memtableMaxLevel int // the maximum level of the memtable (default 12)
memtableP float64 // the probability of the memtable (default 0.25)
compactionInterval int // in seconds, pairs up sstables and merges them
directory string // the directory where the database files are stored
lastCompaction time.Time // the last time a compaction was run
transactions []*Transaction // in memory transactions
transactionsLock *sync.RWMutex // read write lock for transactions
logging bool // whether or not to log to the log file
logFile *os.File // the log file
wal *pager.Pager // the write ahead log
wg *sync.WaitGroup // wait group for the wal
walQueue []*Operation // the write ahead log queue
walQueueLock *sync.Mutex // mutex for the wal queue
exit chan struct{} // channel to signal the background operations to exit
compress bool // whether to compress the keys and their values
flushQueue []*skiplist.SkipList // queue for flushing memtables to disk
flushQueueLock *sync.Mutex // mutex for the flush queue
}
// SSTable is the structure for the SSTable files
type SSTable struct {
pager *pager.Pager // the pager for the sstable file
lock *sync.RWMutex // read write lock for the sstable
compressed bool // whether the sstable is compressed; this gets set when the sstable is created, the configuration is passed from K4
}
// Transaction is the structure for the transactions
type Transaction struct {
id int64 // Unique identifier for the transaction
ops []*Operation // List of operations in the transaction
lock *sync.RWMutex // The lock for the transaction
}
// Operation Used for transaction operations and WAL
type Operation struct {
Op OPR_CODE // Operation code
Key []byte // Key for the operation
Value []byte // Value for the operation
Rollback *Operation // Pointer to the operation that will undo this operation
} // fields must be exported for gob
type OPR_CODE int // Operation code
const (
PUT OPR_CODE = iota
DELETE
GET
)
// SSTableIterator is the structure for the SSTable iterator
type SSTableIterator struct {
pager *pager.Pager // the pager for the sstable file
currentPage int // the current page
lastPage int // the last page in the sstable
compressed bool // whether the sstable is compressed
}
// WALIterator is the structure for the WAL iterator
type WALIterator struct {
pager *pager.Pager // the pager for the wal file
currentPage int // the current page
lastPage int // the last page in the wal
compressed bool // whether the wal is compressed; this gets set when the sstable is created, the configuration is passed from K4
}
// KV mainly used for serialization
type KV struct {
Key []byte // Binary array of key
Value []byte // Binary array of keys value
TTL *time.Time // Time to live
}
// KeyValueArray type to hold a slice of KeyValue's
type KeyValueArray []*KV
// Iterator is a structure for an iterator which goes through
// memtable and sstables. First it goes through the memtable, then once exhausted goes through the sstables
type Iterator struct {
instance *K4 // the instance of K4
memtableIter *skiplist.SkipListIterator // memtable iterator
sstablesIter []*SSTableIterator // an iterator for each sstable
currentKey []byte // the current key
currentValue []byte // the current value
sstIterIndex int // the current sstable iterator index
prevStarted bool // whether the previous function was called
}
// Open opens a new K4 instance at the specified directory.
// will reopen the database if it already exists
// directory - the directory where the database files are stored
// memtableFlushThreshold - the threshold in bytes for flushing the memtable to disk
// compactionInterval - the interval in seconds for running compactions
// logging - whether or not to log to the log file
func Open(directory string, memtableFlushThreshold int, compactionInterval int, logging, compress bool, args ...interface{}) (*K4, error) {
// Create directory if it doesn't exist
err := os.MkdirAll(directory, 0755) // MkdirAll does nothing if directory exists..
if err != nil {
return nil, err
}
// Register *time.Time with gob
gob.Register(&time.Time{})
// Initialize K4
k4 := &K4{
memtableLock: &sync.RWMutex{},
directory: directory,
memtableFlushThreshold: memtableFlushThreshold,
compactionInterval: compactionInterval,
sstables: make([]*SSTable, 0),
sstablesLock: &sync.RWMutex{},
lastCompaction: time.Now(),
transactions: make([]*Transaction, 0),
transactionsLock: &sync.RWMutex{},
logging: logging,
wg: &sync.WaitGroup{},
walQueue: make([]*Operation, 0),
walQueueLock: &sync.Mutex{},
exit: make(chan struct{}),
compress: compress,
flushQueue: make([]*skiplist.SkipList, 0),
flushQueueLock: &sync.Mutex{},
}
// Check for max level and probability for memtable (skiplist)
// this is optional
if len(args) > 0 { // if there are arguments
// First argument should be max level
if maxLevel, ok := args[0].(int); ok {
k4.memtableMaxLevel = maxLevel
} else { // if not provided, default to 12
k4.memtableMaxLevel = 12
}
// Check for p
if len(args) > 1 { // if there are more arguments
// the argument after max level should be a probability
if p, ok := args[1].(float64); ok {
k4.memtableP = p
} else { // if not provided, default to 0.25
k4.memtableP = 0.25
}
}
} else { // If no optional memtable arguments, set defaults
k4.memtableMaxLevel = 12
k4.memtableP = 0.25
}
k4.memtable = skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP) // Set the memtable
// Load SSTables
// We open sstable files in the configured directory
k4.loadSSTables()
// If logging is set we will open a logging file, so we can write to it
if logging {
// Create log file
logFile, err := os.OpenFile(directory+string(os.PathSeparator)+LOG_EXTENSION, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return nil, err
}
// Set log output to the log file
log.SetOutput(logFile)
// Set log file in K4
k4.logFile = logFile
}
// open the write ahead log
wal, err := pager.OpenPager(directory+string(os.PathSeparator)+WAL_EXTENSION, os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
// Set wal in K4
k4.wal = wal
// Start the background wal writer
k4.wg.Add(1)
go k4.backgroundWalWriter() // start the background wal writer
k4.printLog("Background WAL writer started")
// Start the background flusher
k4.wg.Add(1)
go k4.backgroundFlusher() // start the background flusher
k4.printLog("Background flusher started")
// Start the background compactor
k4.wg.Add(1)
go k4.backgroundCompactor() // start the background compactor
k4.printLog("Background compactor started")
k4.printLog("K4 opened successfully")
return k4, nil
}
// Close closes the K4
func (k4 *K4) Close() error {
k4.printLog("Closing up")
if k4.memtable.Size() > 0 {
k4.printLog(fmt.Sprintf("Memtable is of size %d bytes and must be flushed to disk", k4.memtable.Size()))
k4.appendMemtableToFlushQueue()
}
close(k4.exit)
k4.printLog("Waiting for background operations to finish")
// wait for the background operations to finish
k4.wg.Wait()
k4.printLog("Background operations finished")
k4.printLog("Closing SSTables")
// Close SSTables
for _, sstable := range k4.sstables {
err := sstable.pager.Close()
if err != nil {
return err
}
}
k4.printLog("SSTables closed")
// Close WAL
if k4.wal != nil {
k4.printLog("Closing WAL")
err := k4.wal.Close()
if err != nil {
return err
}
k4.printLog("WAL closed")
}
if k4.logging {
// Close log file
err := k4.logFile.Close()
if err != nil {
return err
}
}
return nil
}
// printLog prints a log message to the log file or stdout
// takes a string message
func (k4 *K4) printLog(msg string) {
if k4.logging {
log.Println(msg) // will log to the log file
}
}
// backgroundWalWriter writes operations to the write ahead log
// This function runs in the background and pops operations from the wal queue and writes
// to write ahead log. The reason we do this is to optimize write speed
func (k4 *K4) backgroundWalWriter() {
defer k4.wg.Done() // Defer completion of routine
for {
select {
case <-k4.exit:
// Escalate what hasn't been written to the wal
k4.walQueueLock.Lock() // lock the wal queue
for _, op := range k4.walQueue {
data := serializeOp(op.Op, op.Key, op.Value)
_, err := k4.wal.Write(data)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err))
}
}
k4.walQueueLock.Unlock() // unlock the wal queue
// break out
return
default:
k4.walQueueLock.Lock() // lock up the wal queue
if len(k4.walQueue) > 0 { // Check if there are operations in the wal queue
op := k4.walQueue[0] // Get the first operation
k4.walQueue = k4.walQueue[1:] // Remove the first operation
k4.walQueueLock.Unlock() // Unlock the wal queue
// Serialize operation
data := serializeOp(op.Op, op.Key, op.Value)
// Write to WAL
_, err := k4.wal.Write(data)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write to WAL: %v", err)) // Log error
}
} else {
k4.walQueueLock.Unlock() // Unlock the wal queue
time.Sleep(BACKGROUND_OP_SLEEP) // If you have a speedy loop your cpu will be cycled greatly
// What we do here is sleep for a tiny bit of time each iteration if no work is to be done
}
}
}
}
// serializeOp serializes an operation
func serializeOp(op OPR_CODE, key, value []byte) []byte {
var buf bytes.Buffer // create a buffer
enc := gob.NewEncoder(&buf) // create a new encoder with the buffer
// create an operation struct and initialize it
operation := Operation{
Op: op,
Key: key,
Value: value,
}
// encode the operation
err := enc.Encode(&operation)
if err != nil {
return nil
}
return buf.Bytes() // return the encoded bytes
}
// deserializeOp deserializes an operation
func deserializeOp(data []byte) (OPR_CODE, []byte, []byte, error) {
operation := Operation{} // The operation to be deserialized
dec := gob.NewDecoder(bytes.NewReader(data)) // Create a new decoder
err := dec.Decode(&operation) // Decode the operation
if err != nil {
return 0, nil, nil, err
}
return operation.Op, operation.Key, operation.Value, nil
}
// serializeKv serializes a key-value pair
func serializeKv(key, value []byte, ttl *time.Time) []byte {
var buf bytes.Buffer // create a buffer
enc := gob.NewEncoder(&buf) // create a new encoder with the buffer
// create a key value pair struct
kv := KV{
Key: key,
Value: value,
TTL: ttl,
}
// encode the key value pair
err := enc.Encode(kv)
if err != nil {
return nil
}
return buf.Bytes() // return the bytes
}
// deserializeKv deserializes a key-value pair
func deserializeKv(data []byte) (key, value []byte, ttl *time.Time, err error) {
kv := KV{} // The key value pair to be deserialized
dec := gob.NewDecoder(bytes.NewReader(data)) // Create a new decoder
err = dec.Decode(&kv) // Decode the key value pair
if err != nil {
return nil, nil, kv.TTL, err
}
return kv.Key, kv.Value, kv.TTL, nil
}
// loadSSTables loads SSTables from the directory
func (k4 *K4) loadSSTables() {
// Open configured K4 directory
dir, err := os.Open(k4.directory)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to open directory: %v", err))
return
}
defer dir.Close() // defer closing the directory
// Read directory
files, err := dir.Readdir(-1)
if err != nil {
return
}
// Filter and sort files by modification time
var sstableFiles []os.FileInfo
for _, file := range files {
if !file.IsDir() && len(file.Name()) > len(SSTABLE_EXTENSION) && file.Name()[len(file.Name())-len(SSTABLE_EXTENSION):] == SSTABLE_EXTENSION {
sstableFiles = append(sstableFiles, file)
}
}
sort.Slice(sstableFiles, func(i, j int) bool {
return sstableFiles[i].ModTime().Before(sstableFiles[j].ModTime())
}) // sort the sstable files by modification time
// Open and append SSTables
for _, file := range sstableFiles {
sstablePager, err := pager.OpenPager(k4.directory+string(os.PathSeparator)+file.Name(), os.O_RDWR, 0644)
if err != nil {
// could possibly handle this better
k4.printLog(fmt.Sprintf("Failed to open sstable: %v", err))
continue
}
k4.sstables = append(k4.sstables, &SSTable{
pager: sstablePager,
lock: &sync.RWMutex{},
compressed: k4.compress,
}) // append the sstable to the list of sstables
}
}
// appendMemtableToFlushQueue appends the memtable to the flush queue clearing the memtable
// This opens up the memtable for new writes
func (k4 *K4) appendMemtableToFlushQueue() {
k4.flushQueueLock.Lock() // lock the flush queue
defer k4.flushQueueLock.Unlock() // unlock flush queue on defer
copyOfMemtable := k4.memtable.Copy() // copy the memtable
k4.flushQueue = append(k4.flushQueue, copyOfMemtable) // append the copy of the memtable to the flush queue
k4.memtable = skiplist.NewSkipList(k4.memtableMaxLevel, k4.memtableP) // clear the instance memtable to welcome to new writes
}
// flushMemtable flushes the memtable into an SSTable
func (k4 *K4) flushMemtable(memtable *skiplist.SkipList) error {
k4.printLog("Flushing memtable off flush queue")
// Create SSTable
sstable, err := k4.createSSTable()
if err != nil {
return err
}
// Create a new skiplist iterator
it := skiplist.NewIterator(memtable)
// first we will create a hashset which will be on initial pages of sstable
// we will add all the keys to the hashset
// then we will add the key value pairs to the sstable
// create a hashset
hs := hashset.NewHashSet()
// add all the keys to the hashset
for it.Next() {
// get the current key and value
key, val, _ := it.Current()
if key == nil {
continue
}
// Check if tombstone
if bytes.Equal(val, []byte(TOMBSTONE_VALUE)) {
continue // skip tombstones
}
hs.Add(key) // add key to hash set
}
// serialize the hashset
hsData, err := hs.Serialize()
if err != nil {
return err
}
// Write the hashset to the intitial pages of SSTable
_, err = sstable.pager.Write(hsData)
if err != nil {
return err
}
// We create another iterator to write the key value pairs to the sstable
it = skiplist.NewIterator(memtable)
for it.Next() {
key, value, ttl := it.Current()
if bytes.Equal(value, []byte(TOMBSTONE_VALUE)) {
continue // skip tombstones
}
// Check for compression
if k4.compress {
key, value, err = compressKeyValue(key, value) // compress key and value
if err != nil {
return err
}
}
// Serialize key-value pair
var data []byte
if ttl != nil {
expiry := time.Now().Add(*ttl)
data = serializeKv(key, value, &expiry)
} else {
data = serializeKv(key, value, nil)
}
// Write to SSTable
_, err := sstable.pager.Write(data)
if err != nil {
return err
}
}
// We only lock sstables array when we are appending a new sstable
// this is because we don't want to block reads while we are flushing the memtable only when we are appending a new sstable
k4.sstablesLock.Lock() // lock the sstables
// Append SSTable to list of SSTables
k4.sstables = append(k4.sstables, sstable)
k4.sstablesLock.Unlock() // unlock the sstables
k4.printLog("Flushed memtable")
return nil
}
// createSSTable creates an SSTable
// creates an sstable in directory, opens file and returns the sstable
func (k4 *K4) createSSTable() (*SSTable, error) {
k4.sstablesLock.RLock() // read lock
defer k4.sstablesLock.RUnlock() // unlock on defer
// Create SSTable file
sstablePager, err := pager.OpenPager(k4.directory+string(os.PathSeparator)+sstableFilename(len(k4.sstables)), os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
// Create SSTable
return &SSTable{
pager: sstablePager,
lock: &sync.RWMutex{},
compressed: k4.compress,
}, nil
}
// createSSTableNoLock creates an SSTable without locking ssTables slice
// (used mainly for functions that lock the ssTables slice prior to calling this function)
func (k4 *K4) createSSTableNoLock() (*SSTable, error) {
// Create SSTable file
sstablePager, err := pager.OpenPager(k4.directory+string(os.PathSeparator)+sstableFilename(len(k4.sstables)), os.O_RDWR|os.O_CREATE, 0644)
if err != nil {
return nil, err
}
// Create SSTable
return &SSTable{
pager: sstablePager,
lock: &sync.RWMutex{},
compressed: k4.compress,
}, nil
}
// sstableFilename returns the filename for an SSTable
func sstableFilename(index int) string {
return "sstable_" + fmt.Sprintf("%d", index) + SSTABLE_EXTENSION
}
// newSSTableIterator creates a new SSTable iterator
func newSSTableIterator(pager *pager.Pager, compressed bool) *SSTableIterator {
return &SSTableIterator{
pager: pager, // the pager for the sstable file
currentPage: 0, // skip the first page which is the hashset
lastPage: int(pager.Count() - 1), // the last page in the sstable
compressed: compressed, // whether the sstable is compressed
}
}
// next returns true if there is another key-value pair in the SSTable
func (it *SSTableIterator) next() bool {
// We check if the current page is greater than the last page
// if so we return false
if it.currentPage > it.lastPage {
return false
}
it.currentPage++ // increment the current page
return true
}
// current returns the current key-value pair in the SSTable
func (it *SSTableIterator) current() ([]byte, []byte, *time.Time) {
// Get the current page
data, err := it.pager.GetPage(int64(it.currentPage))
if err != nil {
return nil, nil, nil
}
// Deserialize key-value pair
key, value, ttl, err := deserializeKv(data)
if err != nil {
return nil, nil, ttl
}
// Check if key value has TTL set, if so we check if it has expired
if ttl != nil {
if time.Now().After(*ttl) {
// skip and go to next
if it.next() {
return it.current()
} else {
return nil, nil, nil
}
}
}
// Check for compression
if it.compressed {
// If so we decompress the key and value
key, value, err = decompressKeyValue(key, value)
if err != nil {
return nil, nil, nil
}
}
return key, value, ttl
}
// currentKey returns the current key in the SSTable
func (it *SSTableIterator) currentKey() []byte {
// Get the current page
data, err := it.pager.GetPage(int64(it.currentPage))
if err != nil {
return nil
}
// Deserialize key-value pair
key, value, _, err := deserializeKv(data)
if err != nil {
return nil
}
// Check for compression
if it.compressed {
// If so we decompress the key
key, _, err = decompressKeyValue(key, value)
if err != nil {
return nil
}
}
return key
}
// prev returns true if there is a previous key-value pair in the SSTable
func (it *SSTableIterator) prev() bool {
// We check if the current page is less than 0
// if so we return false
if it.currentPage < 0 {
return false
}
it.currentPage-- // decrement the current page
return true
}
// newWALIterator creates a new WAL iterator
func newWALIterator(pager *pager.Pager, compressed bool) *WALIterator {
return &WALIterator{
pager: pager,
currentPage: -1,
lastPage: int(pager.Count() - 1),
compressed: compressed,
}
}
// next returns true if there is another operation in the WAL
func (it *WALIterator) next() bool {
it.currentPage++
return it.currentPage <= it.lastPage
}
// current returns the current operation in the WAL
func (it *WALIterator) current() (OPR_CODE, []byte, []byte) {
data, err := it.pager.GetPage(int64(it.currentPage))
if err != nil {
return -1, nil, nil
}
// Deserialize operation
op, key, value, err := deserializeOp(data)
if err != nil {
return -1, nil, nil
}
if it.compressed {
key, value, err = decompressKeyValue(key, value)
if err != nil {
return -1, nil, nil
}
}
return op, key, value
}
// compact compacts K4's sstables by pairing and merging them in parallel
func (k4 *K4) compact() error {
k4.sstablesLock.Lock() // lock up the sstables to prevent reads while we are compacting
defer k4.sstablesLock.Unlock() // defer unlocking the sstables
k4.printLog("Starting compaction")
pairs := len(k4.sstables) / 2 // determine the number of pairs
var wg sync.WaitGroup // create a wait group
newSStables := make([]*SSTable, 0) // create a new slice of sstables
sstablesToRemove := make([]int, 0) // create a new slice of sstables to remove
routinesLock := &sync.Mutex{} // create a new mutex for the routines
// iterate over the pairs of sstables
for i := 0; i < pairs*2; i += 2 {
// if we are at the end of the sstables we break
if i+1 >= len(k4.sstables) {
break
}
// increment the wait group
wg.Add(1)
// start a goroutine to compact the pair of sstables into a new sstable
go func(i int, sstablesToRemove *[]int, newSStables *[]*SSTable, routinesLock *sync.Mutex) {
defer wg.Done() // defer completion of goroutine
hs := hashset.NewHashSet() // create a new hashset
// create a new sstable
newSstable, err := k4.createSSTableNoLock()
if err != nil {
k4.printLog(fmt.Sprintf("Failed to create SSTable: %v", err))
return
}
// set sst1 and sst2 to the sstables at index i and i+1
sstable1 := k4.sstables[i]
sstable2 := k4.sstables[i+1]
// create a new iterator for sstable1
it := newSSTableIterator(sstable1.pager, k4.compress)
for it.next() {
// iterate over the keys in sstable1 and add them to the hashset
key := it.currentKey()
hs.Add(key)
}
// create a new iterator for sstable2
it = newSSTableIterator(sstable2.pager, k4.compress)
for it.next() {
// iterate over the keys in sstable2 and add them to the hashset
key := it.currentKey()
hs.Add(key)
}
// serialize the hashset
hsData, err := hs.Serialize()
if err != nil {
k4.printLog(fmt.Sprintf("Failed to serialize hashset: %v", err))
return
}
// wrote hashset to initial sstable pages
_, err = newSstable.pager.Write(hsData)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write hashset to SSTable: %v", err))
return
}
// create a new iterator for sstable1 to add entries to the new sstable
it = newSSTableIterator(sstable1.pager, k4.compress)
for it.next() {
key, value, ttl := it.current()
if ttl != nil && time.Now().After(*ttl) { // if the key has expired we skip it
continue
}
// Check for compression
if k4.compress {
key, value, err = compressKeyValue(key, value) // compress key and value
if err != nil {
k4.printLog(fmt.Sprintf("Failed to compress key-value: %v", err))
return
}
}
data := serializeKv(key, value, ttl)
_, err := newSstable.pager.Write(data)
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write key-value to SSTable: %v", err))
return
}
}
// create a new iterator for sstable2 to add entries to the new sstable
it = newSSTableIterator(sstable2.pager, k4.compress)
for it.next() {
key, value, ttl := it.current()
if ttl != nil && time.Now().After(*ttl) { // if the key has expired we skip it
continue
}
// Check for compression
if k4.compress {
key, value, err = compressKeyValue(key, value) // compress key and value
if err != nil {
k4.printLog(fmt.Sprintf("Failed to compress key-value: %v", err))
return
}
}
data := serializeKv(key, value, ttl) // serialize key-value pair
_, err := newSstable.pager.Write(data) // write to new sstable
if err != nil {
k4.printLog(fmt.Sprintf("Failed to write key-value to SSTable: %v", err))
return
}
}
// close sstable1 and sstable2
err = sstable1.pager.Close()
if err != nil {
k4.printLog(fmt.Sprintf("Failed to close SSTable1: %v", err))
return
}
err = sstable2.pager.Close()
if err != nil {
k4.printLog(fmt.Sprintf("Failed to close SSTable2: %v", err))
return
}
routinesLock.Lock()
*sstablesToRemove = append(*sstablesToRemove, i)
*newSStables = append(*newSStables, newSstable)
routinesLock.Unlock()
err = os.Remove(k4.directory + string(os.PathSeparator) + sstableFilename(i))
if err != nil {
k4.printLog(fmt.Sprintf("Failed to remove SSTable1 file: %v", err))
return
}
err = os.Remove(k4.directory + string(os.PathSeparator) + sstableFilename(i+1))
if err != nil {
k4.printLog(fmt.Sprintf("Failed to remove SSTable2 file: %v", err))
return
}
}(i, &sstablesToRemove, &newSStables, routinesLock)
}
wg.Wait()
// remove paired sstables
for _, index := range sstablesToRemove {
if index >= len(k4.sstables) {
continue
}
k4.sstables = append(k4.sstables[:index], k4.sstables[index+2:]...)
}
// append new sstables
k4.sstables = append(k4.sstables, newSStables...)
k4.printLog("Compaction completed")
return nil
}
// RecoverFromWAL recovers K4 from a write ahead log
func (k4 *K4) RecoverFromWAL() error {
k4.printLog("Starting to recover from write ahead log")
// Iterate over the write ahead log
it := newWALIterator(k4.wal, k4.compress)
for it.next() {
op, key, value := it.current()
switch op {
case PUT:
err := k4.Put(key, value, nil)
if err != nil {
return err
}
case DELETE:
err := k4.Delete(key)
if err != nil {
return err
}
default:
return fmt.Errorf("invalid operation")
}
}
k4.printLog("Recovery from write ahead log completed")
return nil
}
// compressKeyValue compresses a key and value
func compressKeyValue(key, value []byte) ([]byte, []byte, error) {
// compress the key and value
// create new compressor for key
keyCompressor, err := compressor.NewCompressor(COMPRESSION_WINDOW_SIZE)
if err != nil {
return nil, nil, err
}