-
Notifications
You must be signed in to change notification settings - Fork 3
/
batch_iterator.go
156 lines (139 loc) · 3.63 KB
/
batch_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package asc
import (
"fmt"
"github.com/aerospike/aerospike-client-go"
"github.com/viant/toolbox"
"os"
"sync"
"sync/atomic"
)
// BatchIterator is a helper iterator for a full scan. The keys are scanned first from each
// node into separate key files (fileNames); this iterator then reads those keys batchSize at a
// time, fetches the matching records with a batch get, and removes each key file once it has
// been fully processed.
type BatchIterator struct {
	// mutex is held by HasNext while it advances the iterator.
	mutex *sync.RWMutex
	// fileNames lists the key files to consume, in order.
	fileNames []string
	// fileIndex is the index in fileNames of the key file currently being read.
	fileIndex int32
	// namespace and table are passed to ReadKey when decoding keys from a key file.
	namespace string
	table     string
	// binNames are the bins requested from BatchGet.
	binNames []string
	// client and batchPolicy are used to issue the BatchGet calls.
	client      *aerospike.Client
	batchPolicy *aerospike.BatchPolicy
	// batch holds the most recently fetched keys and records; Next hands it out.
	batch *Batch
	// fileInfo is the Stat result of the open key file; its Size bounds filePosition.
	fileInfo os.FileInfo
	// recordCount accumulates the records fetched from the current key file; reset per file.
	recordCount int32
	// filePosition is the byte offset consumed so far in the current key file.
	filePosition int32
	// err is the error recorded while advancing; Next returns it.
	err error
	// batchSize is the maximum number of keys read per batch.
	batchSize int
	// file is the currently open key file, nil when none is open.
	file *os.File
}
// initialiseScannerIfNeeded makes sure a key file is open for reading. If i.file is already
// set it does nothing. Otherwise it opens filename and stores its os.FileInfo in i.fileInfo
// (a successful open also resets i.err to nil).
// It returns true when i.file and i.fileInfo are ready to use. On failure it records the
// error in i.err, leaves i.file nil (no handle is left open) and returns false.
func (i *BatchIterator) initialiseScannerIfNeeded(filename string) bool {
	if i.file != nil {
		return true
	}
	i.file, i.err = os.Open(filename)
	if i.err != nil {
		return false
	}
	if i.fileInfo, i.err = i.file.Stat(); i.err != nil {
		// Release the handle and reset i.file. Otherwise it leaks, and the next call would
		// report the file as ready while i.fileInfo is nil, so scanKeys would dereference
		// a nil FileInfo.
		_ = i.file.Close() // best effort: the Stat error is the one worth reporting
		i.file = nil
		return false
	}
	return true
}
// scanKeys reads up to i.batchSize keys from the open key file. It advances i.filePosition
// by the byte count each ReadKey call reports.
// When the position reaches the file size, it closes and deletes the file, moves to the next
// file name, resets the per-file counters, and returns the keys gathered so far. If a key
// cannot be read it records the error in i.err and returns (nil, false).
func (i *BatchIterator) scanKeys() ([]*aerospike.Key, bool) {
	keys := make([]*aerospike.Key, 0)
	for remaining := i.batchSize; remaining > 0; remaining-- {
		if offset := int(atomic.LoadInt32(&i.filePosition)); offset >= int(i.fileInfo.Size()) {
			// The current key file is exhausted: close it before deleting it, then advance.
			i.file.Close()
			toolbox.RemoveFileIfExist(i.fileNames[i.fileIndex])
			atomic.AddInt32(&i.fileIndex, 1)
			i.file, i.fileInfo = nil, nil
			atomic.StoreInt32(&i.filePosition, 0)
			atomic.StoreInt32(&i.recordCount, 0)
			break
		}
		key, consumed, readErr := ReadKey(i.file, i.namespace, i.table)
		atomic.AddInt32(&i.filePosition, int32(consumed))
		if readErr != nil {
			i.err = readErr
			return nil, false
		}
		keys = append(keys, key)
	}
	return keys, true
}
// readInBatch fetches the records for keys with a single BatchGet call and stores the keys
// and the returned records in i.batch.
// On success it adds the number of returned records to i.recordCount. It returns true only
// when no error is recorded in i.err. A BatchGet failure is stored in i.err, and an earlier,
// still-unreported error also makes it return false.
func (i *BatchIterator) readInBatch(keys []*aerospike.Key) bool {
	i.batch.Keys = keys
	records, err := i.client.BatchGet(i.batchPolicy, keys, i.binNames...)
	i.batch.Records = records
	if err != nil {
		// Record the failure so HasNext/Next surface it. It used to be reset to nil, which
		// silently turned a failed batch into an empty one and dropped its records.
		i.err = err
		return false
	}
	atomic.AddInt32(&i.recordCount, int32(len(records)))
	return i.err == nil
}
// HasNext reports whether another batch of records is available. When needed it opens the
// current key file, reads up to batchSize keys from it and fetches the matching records into
// the iterator's batch.
// It also returns true when an error has been recorded, so that the following Next call can
// report it. It returns false once every key file has been consumed, or after maxAttempts
// rounds without records.
func (i *BatchIterator) HasNext() bool {
	i.mutex.Lock()
	defer i.mutex.Unlock()
	// Upper bound on rounds that produce no records (e.g. stepping past an exhausted file);
	// presumably a guard against looping forever.
	const maxAttempts = 10000
	for attempt := 0; attempt < maxAttempts; attempt++ {
		current := int(atomic.LoadInt32(&i.fileIndex))
		if current >= len(i.fileNames) {
			return false
		}
		if ready := i.initialiseScannerIfNeeded(i.fileNames[current]); !ready {
			return true // let Next report i.err
		}
		keys, scanned := i.scanKeys()
		if !scanned {
			return true // let Next report i.err
		}
		if len(keys) > 0 {
			if fetched := i.readInBatch(keys); !fetched {
				return true // let Next report i.err
			}
			if len(i.batch.Records) > 0 {
				return true
			}
		}
	}
	return false
}
// Next hands out the batch prepared by the last successful HasNext call.
// target must be either:
//   - a *Batch, which receives a copy of the batch value, or
//   - a **Batch, which receives the iterator's own batch pointer (its fields are replaced by
//     the next HasNext call that fetches records).
// If an error was recorded while advancing the iterator, that error is returned instead.
// Any other target type yields an error.
func (i *BatchIterator) Next(target interface{}) error {
	// HasNext writes i.batch and i.err while holding i.mutex; read them under the same lock.
	i.mutex.RLock()
	defer i.mutex.RUnlock()
	if i.err != nil {
		return i.err
	}
	if targetPointer, ok := target.(*Batch); ok {
		*targetPointer = *i.batch
		return nil
	}
	if targetPointer, ok := target.(**Batch); ok {
		*targetPointer = i.batch
		return nil
	}
	return fmt.Errorf("unsupported target type %T, expected %T or %T", target, i.batch, &i.batch)
}
// NewBatchIterator creates a BatchIterator over the keys stored in fileNames, consumed in
// order and batchSize keys at a time. namespace and table are used when decoding keys, and
// client with batchPolicy fetches the binNames bins of the matching records. The iterator
// starts with an empty batch and no open key file.
func NewBatchIterator(client *aerospike.Client, batchPolicy *aerospike.BatchPolicy, batchSize int, namespace, table string, fileNames []string, binNames ...string) *BatchIterator {
	iterator := new(BatchIterator)
	iterator.mutex = new(sync.RWMutex)
	iterator.batch = new(Batch)
	iterator.client = client
	iterator.batchPolicy = batchPolicy
	iterator.batchSize = batchSize
	iterator.namespace = namespace
	iterator.table = table
	iterator.fileNames = fileNames
	iterator.binNames = binNames
	return iterator
}
// Batch is one unit of work produced by BatchIterator. It holds the keys read from a key file
// and the records BatchGet returned for those keys. Records are presumably index-aligned with
// Keys; confirm against the aerospike client BatchGet documentation.
type Batch struct {
	Keys    []*aerospike.Key
	Records []*aerospike.Record
}