-
Notifications
You must be signed in to change notification settings - Fork 0
/
ranker.go
185 lines (172 loc) · 4.59 KB
/
ranker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package ranker
import (
"bufio"
"errors"
"fmt"
"log"
"os"
"sync"
"github.com/gasparian/clickhouse-test-file-reader/internal/io"
"github.com/gasparian/clickhouse-test-file-reader/internal/record"
"github.com/gasparian/clickhouse-test-file-reader/pkg/heap"
)
// comparator is the "less" function handed to the bounded heaps: a record
// sorts before another when its Value is smaller.
//
// The ordering is ascending (min) rather than descending (max) so that a heap
// of constant size can be maintained. As a consequence, records come out of
// the heap smallest-first and callers must reverse that order to obtain the
// top-K values from the highest down.
func comparator(a, b record.Record) bool {
	return b.Value > a.Value
}
// rankerConfig stores the settings a Ranker was created with. The embedded
// RWMutex is taken for reading by getTopK.
type rankerConfig struct {
	sync.RWMutex
	// topK is how many highest-valued records are kept.
	topK int
	// nWorkers is the worker count the ranker was configured with.
	nWorkers int
}

// getTopK returns the configured topK, read under the read lock.
func (rc *rankerConfig) getTopK() int {
	rc.RLock()
	k := rc.topK
	rc.RUnlock()
	return k
}
// Ranker holds channels for communicating between processing stages
// and methods for parsing and ranking input text data.
//
// File segments are sent on inputChan; each worker turns a segment into a
// bounded heap and sends it on heapsChan; GetRankedList merges those heaps
// into the final ranked list.
type Ranker struct {
	// inputChan carries file segments to the workers; EmitFileSegments
	// closes it after the last segment has been sent.
	inputChan chan io.FileSegmentPointer
	// heapsChan carries one per-segment heap from the workers; it is closed
	// once every worker has returned.
	heapsChan chan *heap.InvertedBoundedHeap[record.Record]
	// config holds the topK / nWorkers settings.
	config rankerConfig
}
// processSegment scans the lines of a single file segment and keeps the
// topK records with the highest values in a bounded heap.
//
// The file at fileSegment.Fpath is opened and positioned at
// fileSegment.Start. It is then read line by line with a scanner whose
// maximum token size is fileSegment.BufSize. Scanning stops once at least
// fileSegment.Len bytes have been consumed (line terminators included), or
// at end of file. Empty lines are ignored. Lines that fail to parse are
// logged and skipped, but their bytes still count towards the segment
// length.
//
// It returns an error if the file cannot be opened or positioned, or if the
// scanner fails (for example bufio.ErrTooLong when a line exceeds BufSize).
func (r *Ranker) processSegment(fileSegment io.FileSegmentPointer) (*heap.InvertedBoundedHeap[record.Record], error) {
	f, err := os.Open(fileSegment.Fpath)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	// whence 0 == io.SeekStart; the stdlib io package is shadowed by the
	// project's internal/io import in this file.
	if _, err := f.Seek(fileSegment.Start, 0); err != nil {
		return nil, err
	}

	s := bufio.NewScanner(f)
	s.Buffer(make([]byte, 0), fileSegment.BufSize)

	// Count the bytes the scanner actually advances over, including the
	// '\n' (and any '\r') that ScanLines strips from the token. Counting
	// only len(token) undercounts by the terminator on every line. That
	// makes the loop read past the end of the segment into the next one and
	// record the same lines twice.
	var nBytesRead int64
	s.Split(func(data []byte, atEOF bool) (int, []byte, error) {
		advance, token, err := bufio.ScanLines(data, atEOF)
		nBytesRead += int64(advance)
		return advance, token, err
	})

	h := heap.NewHeap(comparator, r.config.getTopK(), nil)
	for s.Scan() {
		if text := s.Text(); len(text) > 0 {
			// No `continue` on failure: the segment-length check below must
			// run for every line, parsed or not.
			if rec, err := record.ParseRecord(text); err != nil {
				log.Println("Warning: line parsing failed with error: ", err)
			} else {
				h.Push(rec)
			}
		}
		if nBytesRead >= fileSegment.Len {
			break
		}
	}
	// Scan reports failures such as bufio.ErrTooLong by returning false, so
	// the error is only observable after the loop.
	if err := s.Err(); err != nil {
		return nil, err
	}
	return h, nil
}
// worker consumes file segments from inputChan until that channel is closed.
// It ranks each segment with processSegment and sends the resulting heap to
// heapsChan. Segments that fail to process are logged and skipped.
// wg.Done is called exactly once, when the worker returns.
func (r *Ranker) worker(wg *sync.WaitGroup) {
	// Deferred so the WaitGroup is released on every exit path, including a
	// panic unwinding through this goroutine.
	defer wg.Done()
	for fileSegmentPointer := range r.inputChan {
		h, err := r.processSegment(fileSegmentPointer)
		if err != nil {
			log.Println("Error: cannot process file segment: ", err)
			continue
		}
		r.heapsChan <- h
	}
}
// validateRankerParams checks the parameters used to construct a Ranker.
//
// It returns an error when topK < 1 or nWorkers <= 0. A worker count above
// 1023 is not an error. It is only reported in the log, because this
// function receives nWorkers by value and cannot change the caller's copy.
// Enforcing the cap is up to the caller.
func validateRankerParams(nWorkers, topK int) error {
	if topK < 1 {
		return fmt.Errorf("error: `topK` should be >= 1")
	}
	if nWorkers <= 0 {
		return fmt.Errorf("error: `nWorkers` should be a non-zero positive number")
	}
	if nWorkers > 1023 {
		// Log the requested count as-is; overwriting nWorkers first would
		// only change the local copy and make the message read
		// "from 1023 to 1023".
		log.Printf("info: number of workers decreased from %v to 1023, since 1024 is a soft limit (for Linux)\n", nWorkers)
	}
	return nil
}
// NewRanker creates new instance of the ranker
func NewRanker(nWorkers, topK int) (*Ranker, error) {
err := validateRankerParams(nWorkers, topK)
if err != nil {
return nil, err
}
r := &Ranker{
inputChan: make(chan io.FileSegmentPointer),
heapsChan: make(chan *heap.InvertedBoundedHeap[record.Record]),
config: rankerConfig{
topK: topK,
nWorkers: nWorkers,
},
}
go func() {
wg := &sync.WaitGroup{}
for i := 0; i < nWorkers; i++ {
wg.Add(1)
go r.worker(wg)
}
wg.Wait()
close(r.heapsChan)
}()
return r, nil
}
// GetRankedList drains heapsChan, merging every heap produced by the workers
// into a single heap bounded to topK. It returns the Url of each kept record,
// highest value first.
//
// It blocks until heapsChan is closed. When no records were collected, it
// returns an empty, non-nil slice.
func (r *Ranker) GetRankedList() []string {
	limit := r.config.getTopK()
	merged := heap.NewHeap(comparator, limit, nil)
	for partial := range r.heapsChan {
		merged.Merge(partial)
	}

	size := merged.Len()
	if size == 0 {
		return []string{}
	}
	if size > limit {
		size = limit
	}

	urls := make([]string, size)
	// The heap is ordered by comparator (ascending), so popped records come
	// out smallest-first; fill the slice from the back to put the highest
	// values at the front.
	for i := range urls {
		urls[size-1-i] = merged.Pop().Url
	}
	return urls
}
// EmitFileSegments asks io.GetFileSegments to split the file at fpath on '\n'.
// It then forwards every segment it yields to the workers' input channel
// from a background goroutine. Once the segments run out, that goroutine
// closes the input channel so the workers stop.
//
// If GetFileSegments fails, its error is returned unchanged, no goroutine is
// started, and the input channel is left open.
func (r *Ranker) EmitFileSegments(fpath string, bufSize int, segmentSize int64) error {
	segments, err := io.GetFileSegments(fpath, bufSize, segmentSize, '\n')
	if err != nil {
		return err
	}
	go func() {
		defer close(r.inputChan)
		for seg := range segments {
			r.inputChan <- seg
		}
	}()
	return nil
}
// ProcessFile reads the file at fpath, splits it into segments and sends
// them to ranker workers. It then waits for the merged result and returns
// the ranked list produced by GetRankedList.
//
// bufSize and segmentSize are forwarded to io.GetFileSegments. If
// segmentSize is zero, the file is not split into chunks. A non-zero
// segmentSize smaller than bufSize is rejected with an error. Errors from
// NewRanker and EmitFileSegments are returned unchanged.
func ProcessFile(fpath string, bufSize, nWorkers, topK int, segmentSize int64) ([]string, error) {
	if segmentSize != 0 && int64(bufSize) > segmentSize {
		return nil, errors.New("error: segment size should be larger than buffer size")
	}
	r, err := NewRanker(nWorkers, topK)
	if err != nil {
		return nil, err
	}
	if err := r.EmitFileSegments(fpath, bufSize, segmentSize); err != nil {
		// NewRanker has already started the workers, and they block on
		// inputChan until it is closed. EmitFileSegments only closes it on
		// success, so close it here. That lets the workers and the
		// heapsChan-closing goroutine exit instead of leaking.
		close(r.inputChan)
		return nil, err
	}
	return r.GetRankedList(), nil
}