Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 181 additions & 0 deletions pkg/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,3 +534,184 @@ func TestSetTestHookCollectGarbage(t *testing.T) {
t.Errorf("got hook value %v, want %v", got, original)
}
}

func TestPinAfterMultiGC(t *testing.T) {
db := newTestDB(t, &Options{
Capacity: 10,
})

pinnedChunks := make([]swarm.Address, 0)

// upload random chunks above db capacity to see if chunks are still pinned
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}

if len(pinnedChunks) < 10 {
Comment thread
acud marked this conversation as resolved.
rch := generateAndPinAChunk(t, db)
pinnedChunks = append(pinnedChunks, rch.Address())
}
}
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
}
for i := 0; i < 20; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
}

t.Run("pin Index count", newItemsCountTest(db.pinIndex, len(pinnedChunks)))

// Check if all the pinned chunks are present in the data DB
for _, addr := range pinnedChunks {
outItem := shed.Item{
Address: addr.Bytes(),
}
gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, swarm.NewAddress(outItem.Address))
if err != nil {
t.Fatal(err)
}
if !gotChunk.Address().Equal(swarm.NewAddress(addr.Bytes())) {
t.Fatal("Pinned chunk is not equal to got chunk")
}
}

}

func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk {
// Create a chunk and pin it
pinnedChunk := generateTestRandomChunk()

_, err := db.Put(context.Background(), storage.ModePutUpload, pinnedChunk)
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetPin, pinnedChunk.Address())
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, pinnedChunk.Address())
if err != nil {
t.Fatal(err)
}
return pinnedChunk
}

func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) {
var closed chan struct{}
testHookCollectGarbageChan := make(chan uint64)
t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) {
select {
case testHookCollectGarbageChan <- collectedCount:
case <-closed:
}
}))
db := newTestDB(t, &Options{
Capacity: 10,
})
closed = db.close

pinnedChunks := addRandomChunks(t, 5, db, true)
rand1Chunks := addRandomChunks(t, 15, db, false)
for _, ch := range pinnedChunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
}
for _, ch := range rand1Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}

rand2Chunks := addRandomChunks(t, 20, db, false)
for _, ch := range rand2Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}

rand3Chunks := addRandomChunks(t, 20, db, false)

for _, ch := range rand3Chunks {
_, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
// ignore the chunks that are GCd
continue
}
}

// check if the pinned chunk is present after GC
for _, ch := range pinnedChunks {
gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal("Pinned chunk missing ", err)
}
if !gotChunk.Address().Equal(ch.Address()) {
t.Fatal("Pinned chunk address is not equal to got chunk")
}

if !bytes.Equal(gotChunk.Data(), ch.Data()) {
t.Fatal("Pinned chunk data is not equal to got chunk")
}
}

}

func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk {
var chunks []swarm.Chunk
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()
_, err := db.Put(context.Background(), storage.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
if pin {
err = db.Set(context.Background(), storage.ModeSetPin, ch.Address())
if err != nil {
t.Fatal(err)
}
}
err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address())
if err != nil {
t.Fatal(err)
}
err = db.Set(context.Background(), storage.ModeSetAccess, ch.Address())
if err != nil {
t.Fatal(err)
}
_, err = db.Get(context.Background(), storage.ModeGetRequest, ch.Address())
if err != nil {
t.Fatal(err)
}
chunks = append(chunks, ch)
}
return chunks
}
2 changes: 1 addition & 1 deletion pkg/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,6 @@ func TestDBDebugIndexes(t *testing.T) {
}

// assert that there's a pin and gc exclude entry now
testIndexCounts(t, 1, 1, 1, 1, 1, 1, 1, indexCounts)
testIndexCounts(t, 1, 1, 0, 1, 1, 1, 1, indexCounts)
Comment thread
acud marked this conversation as resolved.

}
15 changes: 12 additions & 3 deletions pkg/localstore/mode_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"errors"
"time"

"github.com/syndtr/goleveldb/leveldb"

"github.com/ethersphere/bee/pkg/shed"
"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/syndtr/goleveldb/leveldb"
)

// Get returns a chunk from the database. If the chunk is
Expand Down Expand Up @@ -155,11 +156,19 @@ func (db *DB) updateGC(item shed.Item) (err error) {
if err != nil {
return err
}
// add new entry to gc index
err = db.gcIndex.PutInBatch(batch, item)

// add new entry to gc index ONLY if it is not present in pinIndex
ok, err := db.pinIndex.Has(item)
if err != nil {
return err
}
if !ok {
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return err
}
}

return db.shed.WriteBatch(batch)
}

Expand Down
12 changes: 9 additions & 3 deletions pkg/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,18 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e
return 0, err
}

err = db.gcIndex.PutInBatch(batch, item)
// add new entry to gc index ONLY if it is not present in pinIndex
ok, err := db.pinIndex.Has(item)
if err != nil {
return 0, err
}

gcSizeChange++
if !ok {
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return 0, err
}
gcSizeChange++
}

return gcSizeChange, nil
}
Expand Down
14 changes: 11 additions & 3 deletions pkg/localstore/mode_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ import (
"errors"
"time"

"github.com/syndtr/goleveldb/leveldb"

"github.com/ethersphere/bee/pkg/storage"
"github.com/ethersphere/bee/pkg/swarm"
"github.com/ethersphere/bee/pkg/tags"
"github.com/syndtr/goleveldb/leveldb"
)

// Set updates database indexes for
Expand Down Expand Up @@ -180,11 +181,18 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar
if err != nil {
return 0, err
}
err = db.gcIndex.PutInBatch(batch, item)

ok, err := db.pinIndex.Has(item)
if err != nil {
return 0, err
}
gcSizeChange++
if !ok {
err = db.gcIndex.PutInBatch(batch, item)
if err != nil {
return 0, err
}
gcSizeChange++
}

return gcSizeChange, nil
}
Expand Down