Skip to content

Commit bbfecd8

Browse files
authored
no cache on iterator, remove unused snapshot (#389)
1 parent b02505d commit bbfecd8

File tree

3 files changed

+7
-27
lines changed

3 files changed

+7
-27
lines changed

partition_table.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ func newPartitionTable(topic string,
7979
builder storage.Builder,
8080
log logger,
8181
backoff Backoff,
82-
backoffResetTimeout time.Duration) *PartitionTable {
83-
82+
backoffResetTimeout time.Duration,
83+
) *PartitionTable {
8484
pt := &PartitionTable{
8585
partition: partition,
8686
state: newPartitionTableState(),
@@ -107,7 +107,6 @@ func newPartitionTable(topic string,
107107

108108
// SetupAndRecover sets up the partition storage and recovers to HWM
109109
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {
110-
111110
err := p.setup(ctx)
112111
if err != nil {
113112
return err
@@ -405,7 +404,6 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
405404
}
406405

407406
func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
408-
409407
timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
410408
defer cancel()
411409

@@ -448,7 +446,6 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
448446
}
449447

450448
func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {
451-
452449
stallTicker := time.NewTicker(p.stallPeriod)
453450
defer stallTicker.Stop()
454451

@@ -527,7 +524,6 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
527524
// recover/catchup mechanism so clients can always request stats even if the partition table is not
528525
// running (like a processor table after it's recovered).
529526
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {
530-
531527
updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
532528
defer updateHwmStatsTicker.Stop()
533529
for {

storage/iterator.go

-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
package storage
22

33
import (
4-
"github.com/syndtr/goleveldb/leveldb"
54
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
65
)
76

87
// iterator wraps an Iterator implementation and handles the value decoding and
98
// offset key skipping.
109
type iterator struct {
1110
iter ldbiter.Iterator
12-
snap *leveldb.Snapshot
1311
}
1412

1513
func (i *iterator) Next() bool {
@@ -40,7 +38,6 @@ func (i *iterator) Value() ([]byte, error) {
4038

4139
func (i *iterator) Release() {
4240
i.iter.Release()
43-
i.snap.Release()
4441
}
4542

4643
func (i *iterator) Seek(key []byte) bool {

storage/storage.go

+5-18
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/syndtr/goleveldb/leveldb"
11+
"github.com/syndtr/goleveldb/leveldb/opt"
1112
"github.com/syndtr/goleveldb/leveldb/util"
1213
)
1314

@@ -44,7 +45,6 @@ type Iterator interface {
4445
// Implementations of this interface must be safe for any number of concurrent
4546
// readers with one writer.
4647
type Storage interface {
47-
4848
// Opens/Initialize the storage
4949
Open() error
5050

@@ -95,7 +95,6 @@ type storage struct {
9595

9696
// New creates a new Storage backed by LevelDB.
9797
func New(db *leveldb.DB) (Storage, error) {
98-
9998
return &storage{
10099
db: db,
101100
recovered: make(chan struct{}),
@@ -106,35 +105,23 @@ func New(db *leveldb.DB) (Storage, error) {
106105

107106
// Iterator returns an iterator that traverses over a snapshot of the storage.
108107
func (s *storage) Iterator() (Iterator, error) {
109-
snap, err := s.db.GetSnapshot()
110-
if err != nil {
111-
return nil, err
112-
}
113-
114108
return &iterator{
115-
iter: s.db.NewIterator(nil, nil),
116-
snap: snap,
109+
iter: s.db.NewIterator(nil, &opt.ReadOptions{
110+
DontFillCache: true,
111+
}),
117112
}, nil
118113
}
119114

120115
// Iterator returns an iterator that traverses over a snapshot of the storage.
121116
func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) {
122-
snap, err := s.db.GetSnapshot()
123-
if err != nil {
124-
return nil, err
125-
}
126-
127-
if limit != nil && len(limit) > 0 {
117+
if len(limit) > 0 {
128118
return &iterator{
129119
iter: s.db.NewIterator(&util.Range{Start: start, Limit: limit}, nil),
130-
snap: snap,
131120
}, nil
132121
}
133122
return &iterator{
134123
iter: s.db.NewIterator(util.BytesPrefix(start), nil),
135-
snap: snap,
136124
}, nil
137-
138125
}
139126

140127
func (s *storage) Has(key string) (bool, error) {

0 commit comments

Comments
 (0)