-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathread.go
150 lines (123 loc) · 3.48 KB
/
read.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package bigqueue
import (
"errors"
)
var (
// ErrEmptyQueue is returned when dequeue is performed on an empty queue.
ErrEmptyQueue = errors.New("queue is empty")
)
// IsEmpty returns true when queue is empty for the default consumer.
func (q *MmapQueue) IsEmpty() bool {
return q.isEmpty(q.dc)
}
func (q *MmapQueue) isEmpty(base int64) bool {
q.lock.Lock()
defer q.lock.Unlock()
return q.isEmptyNoLock(base)
}
func (q *MmapQueue) isEmptyNoLock(base int64) bool {
headAid, headOffset := q.md.getConsumerHead(base)
tailAid, tailOffset := q.md.getTail()
return headAid == tailAid && headOffset == tailOffset
}
// Dequeue removes an element from the queue and returns it.
// This function uses the default consumer to consume from the queue.
func (q *MmapQueue) Dequeue() ([]byte, error) {
return q.dequeue(q.dc)
}
func (q *MmapQueue) dequeue(base int64) ([]byte, error) {
q.lock.Lock()
defer q.lock.Unlock()
if err := q.dequeueReader(&q.br, base); err != nil {
q.br.b = nil
return nil, err
}
r := q.br.b
q.br.b = nil
return r, nil
}
// DequeueString removes a string element from the queue and returns it.
// This function uses the default consumer to consume from the queue.
func (q *MmapQueue) DequeueString() (string, error) {
return q.dequeueString(q.dc)
}
func (q *MmapQueue) dequeueString(base int64) (string, error) {
q.lock.Lock()
defer q.lock.Unlock()
if err := q.dequeueReader(&q.sr, base); err != nil {
q.sr.sb.Reset()
return "", err
}
r := q.sr.sb.String()
q.sr.sb.Reset()
return r, nil
}
// dequeue reads one element of the queue into given reader.
// It takes care of reading the element that is spread across multiple arenas.
func (q *MmapQueue) dequeueReader(r reader, base int64) error {
if q.isEmptyNoLock(base) {
return ErrEmptyQueue
}
// read head
aid, offset := q.md.getConsumerHead(base)
// read length
newAid, newOffset, length, err := q.readLength(aid, offset)
if err != nil {
return err
}
aid, offset = newAid, newOffset
// read message
r.grow(length)
aid, offset, err = q.readBytes(r, aid, offset, length)
if err != nil {
return err
}
// update head
q.md.putConsumerHead(base, aid, offset)
q.incrMutOps()
return nil
}
// readLength reads length of the message.
// length is always written in 1 arena, it is never broken across arenas.
func (q *MmapQueue) readLength(aid, offset int) (int, int, int, error) {
// check if length is present in same arena, if not get next arena.
// If length is stored in next arena, get next aid with 0 offset value.
if offset+cInt64Size > q.conf.arenaSize {
aid, offset = aid+1, 0
}
// read length
aa, err := q.am.getArena(aid)
if err != nil {
return 0, 0, 0, err
}
length := int(aa.ReadUint64At(int64(offset)))
// update offset, if offset is equal to arena size,
// reset arena to next aid and offset to 0
offset += cInt64Size
if offset == q.conf.arenaSize {
aid, offset = aid+1, 0
}
return aid, offset, length, nil
}
// readBytes reads length bytes from arena aid starting at offset.
func (q *MmapQueue) readBytes(r reader, aid, offset, length int) (int, int, error) {
counter := 0
for {
aa, err := q.am.getArena(aid)
if err != nil {
return 0, 0, err
}
bytesRead := r.readFrom(aa, offset, counter)
counter += bytesRead
offset += bytesRead
// if offset is equal to arena size, reset arena to next aid and offset to 0.
if offset == q.conf.arenaSize {
aid, offset = aid+1, 0
}
// check if all bytes are read
if counter == length {
break
}
}
return aid, offset, nil
}