diff --git a/pkg/localstore/gc_test.go b/pkg/localstore/gc_test.go index 8d866fc8aac..ef33ed65013 100644 --- a/pkg/localstore/gc_test.go +++ b/pkg/localstore/gc_test.go @@ -534,3 +534,184 @@ func TestSetTestHookCollectGarbage(t *testing.T) { t.Errorf("got hook value %v, want %v", got, original) } } + +func TestPinAfterMultiGC(t *testing.T) { + db := newTestDB(t, &Options{ + Capacity: 10, + }) + + pinnedChunks := make([]swarm.Address, 0) + + // upload random chunks above db capacity to see if chunks are still pinned + for i := 0; i < 20; i++ { + ch := generateTestRandomChunk() + _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) + if err != nil { + t.Fatal(err) + } + + if len(pinnedChunks) < 10 { + rch := generateAndPinAChunk(t, db) + pinnedChunks = append(pinnedChunks, rch.Address()) + } + } + for i := 0; i < 20; i++ { + ch := generateTestRandomChunk() + _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + for i := 0; i < 20; i++ { + ch := generateTestRandomChunk() + _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + + t.Run("pin Index count", newItemsCountTest(db.pinIndex, len(pinnedChunks))) + + // Check if all the pinned chunks are present in the data DB + for _, addr := range pinnedChunks { + outItem := shed.Item{ + Address: addr.Bytes(), + } + gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, swarm.NewAddress(outItem.Address)) + if err != nil { + t.Fatal(err) + } + if !gotChunk.Address().Equal(swarm.NewAddress(addr.Bytes())) { + t.Fatal("Pinned chunk is not equal to got chunk") + } + } + +} + +func generateAndPinAChunk(t *testing.T, db *DB) swarm.Chunk { + // Create a chunk and pin it + pinnedChunk := generateTestRandomChunk() + + _, err := db.Put(context.Background(), storage.ModePutUpload, pinnedChunk) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetPin, pinnedChunk.Address()) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetSyncPull, pinnedChunk.Address()) + if err != nil { + t.Fatal(err) + } + return pinnedChunk +} + +func TestPinSyncAndAccessPutSetChunkMultipleTimes(t *testing.T) { + var closed chan struct{} + testHookCollectGarbageChan := make(chan uint64) + t.Cleanup(setTestHookCollectGarbage(func(collectedCount uint64) { + select { + case testHookCollectGarbageChan <- collectedCount: + case <-closed: + } + })) + db := newTestDB(t, &Options{ + Capacity: 10, + }) + closed = db.close + + pinnedChunks := addRandomChunks(t, 5, db, true) + rand1Chunks := addRandomChunks(t, 15, db, false) + for _, ch := range pinnedChunks { + _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + for _, ch := range rand1Chunks { + _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + // ignore the chunks that are GCd + continue + } + } + + rand2Chunks := addRandomChunks(t, 20, db, false) + for _, ch := range rand2Chunks { + _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + // ignore the chunks that are GCd + continue + } + } + + rand3Chunks := addRandomChunks(t, 20, db, false) + + for _, ch := range rand3Chunks { + _, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + // ignore the chunks that are GCd + continue + } + } + + // check if the pinned chunk is present after GC + for _, ch := range pinnedChunks { + gotChunk, err := db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + t.Fatal("Pinned chunk missing ", err) + } + if !gotChunk.Address().Equal(ch.Address()) { + t.Fatal("Pinned chunk address is not equal to got chunk") + } + + if !bytes.Equal(gotChunk.Data(), ch.Data()) { + t.Fatal("Pinned chunk data is not equal to got chunk") + } + } + +} + +func addRandomChunks(t *testing.T, count int, db *DB, pin bool) []swarm.Chunk { + var chunks []swarm.Chunk + for i := 0; i < count; i++ { + ch := generateTestRandomChunk() + _, err := db.Put(context.Background(), storage.ModePutUpload, ch) + if err != nil { + t.Fatal(err) + } + if pin { + err = db.Set(context.Background(), storage.ModeSetPin, ch.Address()) + if err != nil { + t.Fatal(err) + } + } + err = db.Set(context.Background(), storage.ModeSetSyncPull, ch.Address()) + if err != nil { + t.Fatal(err) + } + err = db.Set(context.Background(), storage.ModeSetAccess, ch.Address()) + if err != nil { + t.Fatal(err) + } + _, err = db.Get(context.Background(), storage.ModeGetRequest, ch.Address()) + if err != nil { + t.Fatal(err) + } + chunks = append(chunks, ch) + } + return chunks +} diff --git a/pkg/localstore/localstore_test.go b/pkg/localstore/localstore_test.go index d5cf1e949b3..4bc5117ab03 100644 --- a/pkg/localstore/localstore_test.go +++ b/pkg/localstore/localstore_test.go @@ -567,6 +567,6 @@ func TestDBDebugIndexes(t *testing.T) { } // assert that there's a pin and gc exclude entry now - testIndexCounts(t, 1, 1, 1, 1, 1, 1, 1, indexCounts) + testIndexCounts(t, 1, 1, 0, 1, 1, 1, 1, indexCounts) } diff --git a/pkg/localstore/mode_get.go b/pkg/localstore/mode_get.go index 1567e9b26b9..71d15f6df92 100644 --- a/pkg/localstore/mode_get.go +++ b/pkg/localstore/mode_get.go @@ -21,10 +21,11 @@ import ( "errors" "time" + "github.com/syndtr/goleveldb/leveldb" + "github.com/ethersphere/bee/pkg/shed" "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" - "github.com/syndtr/goleveldb/leveldb" ) // Get returns a chunk from the database. If the chunk is @@ -155,11 +156,19 @@ func (db *DB) updateGC(item shed.Item) (err error) { if err != nil { return err } - // add new entry to gc index - err = db.gcIndex.PutInBatch(batch, item) + + // add new entry to gc index ONLY if it is not present in pinIndex + ok, err := db.pinIndex.Has(item) if err != nil { return err } + if !ok { + err = db.gcIndex.PutInBatch(batch, item) + if err != nil { + return err + } + } + return db.shed.WriteBatch(batch) } diff --git a/pkg/localstore/mode_put.go b/pkg/localstore/mode_put.go index f41daed414e..6123c448973 100644 --- a/pkg/localstore/mode_put.go +++ b/pkg/localstore/mode_put.go @@ -338,12 +338,18 @@ func (db *DB) setGC(batch *leveldb.Batch, item shed.Item) (gcSizeChange int64, e return 0, err } - err = db.gcIndex.PutInBatch(batch, item) + // add new entry to gc index ONLY if it is not present in pinIndex + ok, err := db.pinIndex.Has(item) if err != nil { return 0, err } - - gcSizeChange++ + if !ok { + err = db.gcIndex.PutInBatch(batch, item) + if err != nil { + return 0, err + } + gcSizeChange++ + } return gcSizeChange, nil } diff --git a/pkg/localstore/mode_set.go b/pkg/localstore/mode_set.go index d41316874bd..c00e94a7e70 100644 --- a/pkg/localstore/mode_set.go +++ b/pkg/localstore/mode_set.go @@ -21,10 +21,11 @@ import ( "errors" "time" + "github.com/syndtr/goleveldb/leveldb" + "github.com/ethersphere/bee/pkg/storage" "github.com/ethersphere/bee/pkg/swarm" "github.com/ethersphere/bee/pkg/tags" - "github.com/syndtr/goleveldb/leveldb" ) // Set updates database indexes for @@ -180,11 +181,18 @@ func (db *DB) setAccess(batch *leveldb.Batch, binIDs map[uint8]uint64, addr swar if err != nil { return 0, err } - err = db.gcIndex.PutInBatch(batch, item) + + ok, err := db.pinIndex.Has(item) if err != nil { return 0, err } - gcSizeChange++ + if !ok { + err = db.gcIndex.PutInBatch(batch, item) + if err != nil { + return 0, err + } + gcSizeChange++ + } return gcSizeChange, nil }