-
Notifications
You must be signed in to change notification settings - Fork 13
/
lsmt.go
137 lines (127 loc) · 3.02 KB
/
lsmt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package lsmt
import (
"fmt"
"log"
"sync"
"time"
)
// Element is a single key/value pair stored in the LSM tree.
type Element struct {
	// Key identifies the entry; compact orders and de-duplicates elements by it.
	Key string
	// Value is the payload stored under Key.
	Value string
}
// LSMTree is a key/value store that keeps recent writes in an in-memory tree
// and moves them into disk files once the tree reaches flushThreshold.
type LSMTree struct {
	// rwm is the read/write lock guarding tree and treeInFlush.
	rwm sync.RWMutex
	// tree is the in-memory tree that Put writes into.
	tree *TreeNode
	// treeInFlush is the tree handed over to flush. It is non-nil only while a
	// flush is in progress, and Get still consults it during that time.
	treeInFlush *TreeNode
	// flushThreshold is the tree size at or above which Put triggers a flush.
	flushThreshold int
	// drwm is the read/write lock guarding diskFiles.
	drwm sync.RWMutex
	// diskFiles holds the flushed files. flush prepends new files, so the
	// slice is ordered newest first; Get searches it front to back.
	diskFiles []DiskFile
}
// NewLSMTree creates an empty LSMTree whose in-memory tree is flushed to a
// disk file once its size reaches flushThreshold.
//
// It also starts the background compaction goroutine (compactService), which
// loops forever; there is currently no way to stop it.
func NewLSMTree(flushThreshold int) *LSMTree {
	lsm := new(LSMTree)
	lsm.flushThreshold = flushThreshold
	go lsm.compactService()
	return lsm
}
// Put inserts or updates key with value in the in-memory tree.
//
// When the tree has reached flushThreshold and no flush is already running,
// the tree is handed over to a background flush and a fresh (nil) tree takes
// its place for subsequent writes.
func (t *LSMTree) Put(key, value string) {
	t.rwm.Lock()
	defer t.rwm.Unlock()

	Upsert(&t.tree, Element{Key: key, Value: value})

	// Nothing more to do while the tree is still small or a previous flush
	// has not finished yet.
	if t.tree.Size < t.flushThreshold || t.treeInFlush != nil {
		return
	}

	log.Printf("triggering flush %v", Traverse(t.tree))
	// Park the full tree where Get can still see it, then start over empty.
	t.treeInFlush, t.tree = t.tree, nil
	go t.flush()
}
// Get returns the value stored under key.
//
// Sources are consulted in this order: the active in-memory tree, the tree
// currently being flushed, then every disk file in slice order. The first hit
// wins. An error is returned when no source contains the key.
func (t *LSMTree) Get(key string) (string, error) {
	// Both in-memory trees are checked under one read lock, which is released
	// before any disk file is touched.
	value, inMemory := func() (string, bool) {
		t.rwm.RLock()
		defer t.rwm.RUnlock()

		if hit, err := Find(t.tree, key); err == nil {
			return hit.Value, true
		}
		if hit, err := Find(t.treeInFlush, key); err == nil {
			return hit.Value, true
		}
		return "", false
	}()
	if inMemory {
		return value, nil
	}

	// Not in memory: fall back to the disk files.
	t.drwm.RLock()
	defer t.drwm.RUnlock()

	for _, file := range t.diskFiles {
		hit, err := file.Search(key)
		if err != nil {
			// Any search error is treated as "not in this file".
			continue
		}
		return hit.Value, nil
	}
	return "", fmt.Errorf("key %s not found", key)
}
// flush writes the tree parked in treeInFlush to a new disk file and then
// clears treeInFlush. Put starts it in its own goroutine.
//
// The new file is published before treeInFlush is cleared, so a concurrent
// Get can always find the data in at least one of the two places.
func (t *LSMTree) flush() {
	newest := NewDiskFile(Traverse(t.treeInFlush))

	// Prepend, so that the newest file stays at the front of the list.
	t.drwm.Lock()
	t.diskFiles = append([]DiskFile{newest}, t.diskFiles...)
	t.drwm.Unlock()

	// The data is on disk now; drop the in-memory copy.
	t.rwm.Lock()
	t.treeInFlush = nil
	t.rwm.Unlock()
}
// compactService runs forever. Once per second it takes the last two entries
// of diskFiles, merges them with compact and replaces both with the merged
// file. Because flush prepends new files, the last two entries are the oldest.
func (t *LSMTree) compactService() {
	for {
		time.Sleep(time.Second)

		// Snapshot the two candidates under the read lock only, so the merge
		// itself does not block readers.
		var oldest, nextOldest DiskFile
		t.drwm.RLock()
		if n := len(t.diskFiles); n >= 2 {
			oldest, nextOldest = t.diskFiles[n-1], t.diskFiles[n-2]
		}
		t.drwm.RUnlock()

		// Zero-value (or otherwise empty) files mean there is nothing to merge.
		if oldest.Empty() || nextOldest.Empty() {
			continue
		}

		merged := compact(oldest, nextOldest)

		// Swap the two old files at the tail for the merged one.
		t.drwm.Lock()
		keep := len(t.diskFiles) - 2
		t.diskFiles = append(t.diskFiles[:keep], merged)
		t.drwm.Unlock()
	}
}
// compact merges the elements of two disk files into a single new disk file.
//
// d1 is assumed to be older than d2: when both files contain the same key,
// only the element from d2 is kept. The merge walks both element lists in
// step, so it relies on each list already being sorted by ascending key; the
// result is then sorted as well and contains each key at most once.
func compact(d1, d2 DiskFile) DiskFile {
	elems1 := d1.AllElements()
	elems2 := d2.AllElements()
	log.Printf("compacting d1: %v; d2: %v", elems1, elems2)

	var newElems []Element
	var i1, i2 int
	// Each index is bounded by the length of its own list. Bounding both by
	// the shorter length would end the merge early and leave the output
	// unsorted, with stale duplicates, whenever the lists differ in length.
	for i1 < len(elems1) && i2 < len(elems2) {
		e1, e2 := elems1[i1], elems2[i2]
		switch {
		case e1.Key < e2.Key:
			newElems = append(newElems, e1)
			i1++
		case e1.Key > e2.Key:
			newElems = append(newElems, e2)
			i2++
		default:
			// Same key in both files: the newer file (d2) wins.
			newElems = append(newElems, e2)
			i1++
			i2++
		}
	}
	// At most one of the two lists still has elements left; copy that tail.
	newElems = append(newElems, elems1[i1:]...)
	newElems = append(newElems, elems2[i2:]...)
	return NewDiskFile(newElems)
}
// min returns the smaller of i and j.
func min(i, j int) int {
	smaller := j
	if i < j {
		smaller = i
	}
	return smaller
}