diff --git a/swarm/chunk/chunk.go b/swarm/chunk/chunk.go index c57d55ecc4..c8551814c2 100644 --- a/swarm/chunk/chunk.go +++ b/swarm/chunk/chunk.go @@ -164,7 +164,7 @@ func (d *Descriptor) String() string { type Store interface { Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error) - Put(ctx context.Context, mode ModePut, ch Chunk) (err error) + Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) Has(ctx context.Context, addr Address) (yes bool, err error) Set(ctx context.Context, mode ModeSet, addr Address) (err error) LastPullSubscriptionBinID(bin uint8) (id uint64, err error) @@ -202,11 +202,11 @@ func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore) // Put overrides Store put method with validators check. If one of the validators // return true, the chunk is considered valid and Store Put method is called. // If all validators return false, ErrChunkInvalid is returned. -func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (err error) { +func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) { for _, v := range s.validators { if v.Validate(ch) { return s.Store.Put(ctx, mode, ch) } } - return ErrChunkInvalid + return false, ErrChunkInvalid } diff --git a/swarm/network/stream/common_test.go b/swarm/network/stream/common_test.go index 9c589c0378..1b2812f4ff 100644 --- a/swarm/network/stream/common_test.go +++ b/swarm/network/stream/common_test.go @@ -236,7 +236,7 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad return nil, errors.New("roundRobinStore doesn't support Get") } -func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) error { +func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) { i := atomic.AddUint32(&rrs.index, 1) idx := int(i) % len(rrs.stores) return rrs.stores[idx].Put(ctx, mode, ch) diff --git a/swarm/network/stream/delivery.go b/swarm/network/stream/delivery.go index 6fa9f3ad19..0596667239 100644 --- a/swarm/network/stream/delivery.go +++ b/swarm/network/stream/delivery.go @@ -262,7 +262,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int msg.peer = sp log.Trace("handle.chunk.delivery", "put", msg.Addr) - err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) + _, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData)) if err != nil { if err == storage.ErrChunkInvalid { // we removed this log because it spams the logs diff --git a/swarm/network/stream/delivery_test.go b/swarm/network/stream/delivery_test.go index e7f1e01c6b..801e6d98ac 100644 --- a/swarm/network/stream/delivery_test.go +++ b/swarm/network/stream/delivery_test.go @@ -191,7 +191,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash := storage.Address(hash0[:]) ch := storage.NewChunk(hash, hash) - err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) + _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } @@ -243,7 +243,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) { hash = storage.Address(hash1[:]) ch = storage.NewChunk(hash, hash1[:]) - err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) + _, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index a4e7120dfa..100e778a39 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -70,8 +70,9 @@ func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error for i := int64(0); i < int64(n); i++ { ch := f(chunk.DefaultSize) go func() { + _, err := store.Put(ctx, chunk.ModePutUpload, ch) select { - case errc <- store.Put(ctx, chunk.ModePutUpload, ch): + case errc <- err: case <-ctx.Done(): } }() @@ -224,11 +225,12 @@ func NewMapChunkStore() *MapChunkStore { } } -func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) error { +func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) { m.mu.Lock() defer m.mu.Unlock() + _, exists := m.chunks[ch.Address().Hex()] m.chunks[ch.Address().Hex()] = ch - return nil + return exists, nil } func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) { diff --git a/swarm/storage/feed/handler_test.go b/swarm/storage/feed/handler_test.go index e8b6c9f916..c4f6fe689e 100644 --- a/swarm/storage/feed/handler_test.go +++ b/swarm/storage/feed/handler_test.go @@ -446,15 +446,15 @@ func TestValidatorInStore(t *testing.T) { } // put the chunks in the store and check their error status - err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk) + _, err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk) if err == nil { t.Fatal("expected error on good content address chunk with feed update validator only, but got nil") } - err = store.Put(context.Background(), chunk.ModePutUpload, badChunk) + _, err = store.Put(context.Background(), chunk.ModePutUpload, badChunk) if err == nil { t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil") } - err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk) + _, err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk) if err != nil { t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err) } diff --git a/swarm/storage/hasherstore.go b/swarm/storage/hasherstore.go index b7880da2fc..2e4a1c11b0 100644 --- a/swarm/storage/hasherstore.go +++ b/swarm/storage/hasherstore.go @@ -242,8 +242,9 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) { atomic.AddUint64(&h.nrChunks, 1) go func() { + _, err := h.store.Put(ctx, chunk.ModePutUpload, ch) select { - case h.errC <- h.store.Put(ctx, chunk.ModePutUpload, ch): + case h.errC <- err: case <-h.quitC: } }() diff --git a/swarm/storage/localstore/export.go b/swarm/storage/localstore/export.go index 42585d59df..e7d191f4da 100644 --- a/swarm/storage/localstore/export.go +++ b/swarm/storage/localstore/export.go @@ -157,8 +157,9 @@ func (db *DB) Import(r io.Reader) (count int64, err error) { } go func() { + _, err := db.Put(ctx, chunk.ModePutUpload, ch) select { - case errC <- db.Put(ctx, chunk.ModePutUpload, ch): + case errC <- err: case <-ctx.Done(): } }() diff --git a/swarm/storage/localstore/export_test.go b/swarm/storage/localstore/export_test.go index e1644421bd..6aa24b0b7c 100644 --- a/swarm/storage/localstore/export_test.go +++ b/swarm/storage/localstore/export_test.go @@ -37,7 +37,7 @@ func TestExportImport(t *testing.T) { for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() - err := db1.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db1.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/gc_test.go b/swarm/storage/localstore/gc_test.go index 53f4561f59..4a6e0a5f41 100644 --- a/swarm/storage/localstore/gc_test.go +++ b/swarm/storage/localstore/gc_test.go @@ -70,7 +70,7 @@ func testDB_collectGarbageWorker(t *testing.T) { for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -143,7 +143,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { for i := 0; i < int(db.capacity)-1; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -186,7 +186,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) { // upload and sync another chunk to trigger // garbage collection ch := generateTestRandomChunk() - err = db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err = db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -275,7 +275,7 @@ func TestDB_gcSize(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/index_test.go b/swarm/storage/localstore/index_test.go index d5e94096ff..0f23aa10a2 100644 --- a/swarm/storage/localstore/index_test.go +++ b/swarm/storage/localstore/index_test.go @@ -44,7 +44,7 @@ func TestDB_pullIndex(t *testing.T) { for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -89,7 +89,7 @@ func TestDB_gcIndex(t *testing.T) { for i := 0; i < chunkCount; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/localstore_test.go b/swarm/storage/localstore/localstore_test.go index 2dc5e0d462..6dbc4b7ad0 100644 --- a/swarm/storage/localstore/localstore_test.go +++ b/swarm/storage/localstore/localstore_test.go @@ -62,7 +62,7 @@ func TestDB(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -116,7 +116,7 @@ func TestDB_updateGCSem(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/mode_get_test.go b/swarm/storage/localstore/mode_get_test.go index 06f8e4b69e..217fa5d2d2 100644 --- a/swarm/storage/localstore/mode_get_test.go +++ b/swarm/storage/localstore/mode_get_test.go @@ -37,7 +37,7 @@ func TestModeGetRequest(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -149,7 +149,7 @@ func TestModeGetSync(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/mode_has_test.go b/swarm/storage/localstore/mode_has_test.go index db2f6bfbcd..043b21a2b1 100644 --- a/swarm/storage/localstore/mode_has_test.go +++ b/swarm/storage/localstore/mode_has_test.go @@ -31,7 +31,7 @@ func TestHas(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/mode_put.go b/swarm/storage/localstore/mode_put.go index 5691f92b94..488e4d8e1a 100644 --- a/swarm/storage/localstore/mode_put.go +++ b/swarm/storage/localstore/mode_put.go @@ -28,7 +28,7 @@ import ( // on the Putter mode, it updates required indexes. // Put is required to implement chunk.Store // interface. -func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (err error) { +func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) { return db.put(mode, chunkToItem(ch)) } @@ -37,7 +37,7 @@ func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (err er // of this function for the same address in parallel. // Item fields Address and Data must not be // with their nil values. -func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { +func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) { // protect parallel updates db.batchMu.Lock() defer db.batchMu.Unlock() @@ -59,21 +59,25 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { i, err := db.retrievalAccessIndex.Get(item) switch err { case nil: + exists = true item.AccessTimestamp = i.AccessTimestamp case leveldb.ErrNotFound: + exists = false // no chunk accesses default: - return err + return false, err } i, err = db.retrievalDataIndex.Get(item) switch err { case nil: + exists = true item.StoreTimestamp = i.StoreTimestamp item.BinID = i.BinID case leveldb.ErrNotFound: // no chunk accesses + exists = false default: - return err + return false, err } if item.AccessTimestamp != 0 { // delete current entry from the gc index @@ -86,7 +90,7 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { if item.BinID == 0 { item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) if err != nil { - return err + return false, err } } // update access timestamp @@ -102,15 +106,15 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { case chunk.ModePutUpload: // put to indexes: retrieve, push, pull - has, err := db.retrievalDataIndex.Has(item) + exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return err + return false, err } - if !has { + if !exists { item.StoreTimestamp = now() item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) if err != nil { - return err + return false, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) @@ -122,15 +126,15 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { case chunk.ModePutSync: // put to indexes: retrieve, pull - has, err := db.retrievalDataIndex.Has(item) + exists, err = db.retrievalDataIndex.Has(item) if err != nil { - return err + return exists, err } - if !has { + if !exists { item.StoreTimestamp = now() item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address))) if err != nil { - return err + return false, err } db.retrievalDataIndex.PutInBatch(batch, item) db.pullIndex.PutInBatch(batch, item) @@ -138,17 +142,17 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { } default: - return ErrInvalidMode + return false, ErrInvalidMode } err = db.incGCSizeInBatch(batch, gcSizeChange) if err != nil { - return err + return false, err } err = db.shed.WriteBatch(batch) if err != nil { - return err + return false, err } if triggerPullFeed { db.triggerPullSubscriptions(db.po(item.Address)) @@ -156,5 +160,5 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) { if triggerPushFeed { db.triggerPushSubscriptions() } - return nil + return exists, nil } diff --git a/swarm/storage/localstore/mode_put_test.go b/swarm/storage/localstore/mode_put_test.go index 225ba8280f..5376aa8b38 100644 --- a/swarm/storage/localstore/mode_put_test.go +++ b/swarm/storage/localstore/mode_put_test.go @@ -45,7 +45,7 @@ func TestModePutRequest(t *testing.T) { storeTimestamp = wantTimestamp - err := db.Put(context.Background(), chunk.ModePutRequest, ch) + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) if err != nil { t.Fatal(err) } @@ -63,7 +63,7 @@ func TestModePutRequest(t *testing.T) { return wantTimestamp })() - err := db.Put(context.Background(), chunk.ModePutRequest, ch) + _, err := db.Put(context.Background(), chunk.ModePutRequest, ch) if err != nil { t.Fatal(err) } @@ -88,7 +88,7 @@ func TestModePutSync(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutSync, ch) + _, err := db.Put(context.Background(), chunk.ModePutSync, ch) if err != nil { t.Fatal(err) } @@ -110,7 +110,7 @@ func TestModePutUpload(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -145,7 +145,7 @@ func TestModePutUpload_parallel(t *testing.T) { if !ok { return } - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) select { case errChan <- err: case <-doneChan: @@ -235,10 +235,20 @@ func TestModePut_sameChunk(t *testing.T) { defer cleanupFunc() for i := 0; i < 10; i++ { - err := db.Put(context.Background(), tc.mode, ch) + exists, err := db.Put(context.Background(), tc.mode, ch) if err != nil { t.Fatal(err) } + switch exists { + case false: + if i != 0 { + t.Fatal("should not exist only on first Put") + } + case true: + if i == 0 { + t.Fatal("should exist on all cases other than the first one") + } + } count := func(b bool) (c int) { if b { @@ -336,7 +346,9 @@ func benchmarkPutUpload(b *testing.B, o *Options, count, maxParallelUploads int) go func(i int) { defer func() { <-sem }() - errs <- db.Put(context.Background(), chunk.ModePutUpload, chunks[i]) + + _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i]) + errs <- err }(i) } }() diff --git a/swarm/storage/localstore/mode_set_test.go b/swarm/storage/localstore/mode_set_test.go index d9182a1580..9ba62cd206 100644 --- a/swarm/storage/localstore/mode_set_test.go +++ b/swarm/storage/localstore/mode_set_test.go @@ -65,7 +65,7 @@ func TestModeSetSync(t *testing.T) { return wantTimestamp })() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func TestModeSetRemove(t *testing.T) { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/retrieval_index_test.go b/swarm/storage/localstore/retrieval_index_test.go index 43fa77786a..4ca2e32e61 100644 --- a/swarm/storage/localstore/retrieval_index_test.go +++ b/swarm/storage/localstore/retrieval_index_test.go @@ -65,7 +65,7 @@ func benchmarkRetrievalIndexes(b *testing.B, o *Options, count int) { addrs := make([]chunk.Address, count) for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { b.Fatal(err) } @@ -139,7 +139,7 @@ func benchmarkUpload(b *testing.B, o *Options, count int) { b.StartTimer() for i := 0; i < count; i++ { - err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i]) + _, err := db.Put(context.Background(), chunk.ModePutUpload, chunks[i]) if err != nil { b.Fatal(err) } diff --git a/swarm/storage/localstore/subscription_pull_test.go b/swarm/storage/localstore/subscription_pull_test.go index 0b38e098fd..bf364ed444 100644 --- a/swarm/storage/localstore/subscription_pull_test.go +++ b/swarm/storage/localstore/subscription_pull_test.go @@ -147,7 +147,7 @@ func TestDB_SubscribePull_since(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -226,7 +226,7 @@ func TestDB_SubscribePull_until(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -305,7 +305,7 @@ func TestDB_SubscribePull_sinceAndUntil(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -383,7 +383,7 @@ func uploadRandomChunksBin(t *testing.T, db *DB, addrs map[uint8][]chunk.Address for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -483,7 +483,7 @@ func TestDB_LastPullSubscriptionBinID(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/localstore/subscription_push_test.go b/swarm/storage/localstore/subscription_push_test.go index e9f629e385..6124a534bd 100644 --- a/swarm/storage/localstore/subscription_push_test.go +++ b/swarm/storage/localstore/subscription_push_test.go @@ -44,7 +44,7 @@ func TestDB_SubscribePush(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } @@ -130,7 +130,7 @@ func TestDB_SubscribePush_multiple(t *testing.T) { for i := 0; i < count; i++ { ch := generateTestRandomChunk() - err := db.Put(context.Background(), chunk.ModePutUpload, ch) + _, err := db.Put(context.Background(), chunk.ModePutUpload, ch) if err != nil { t.Fatal(err) } diff --git a/swarm/storage/netstore.go b/swarm/storage/netstore.go index 2d8aee2210..b675384ce7 100644 --- a/swarm/storage/netstore.go +++ b/swarm/storage/netstore.go @@ -76,14 +76,14 @@ func NewNetStore(store chunk.Store, nnf NewNetFetcherFunc) (*NetStore, error) { // Put stores a chunk in localstore, and delivers to all requestor peers using the fetcher stored in // the fetchers cache -func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) error { +func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) (bool, error) { n.mu.Lock() defer n.mu.Unlock() // put to the chunk to the store, there should be no error - err := n.Store.Put(ctx, mode, ch) + exists, err := n.Store.Put(ctx, mode, ch) if err != nil { - return err + return exists, err } // if chunk is now put in the store, check if there was an active fetcher and call deliver on it @@ -93,7 +93,7 @@ func (n *NetStore) Put(ctx context.Context, mode chunk.ModePut, ch Chunk) error log.Trace("n.getFetcher deliver", "ref", ch.Address()) f.deliver(ctx, ch) } - return nil + return exists, nil } // Get retrieves the chunk from the NetStore DPA synchronously. diff --git a/swarm/storage/netstore_test.go b/swarm/storage/netstore_test.go index 621fbe75a2..dc0727987f 100644 --- a/swarm/storage/netstore_test.go +++ b/swarm/storage/netstore_test.go @@ -131,7 +131,7 @@ func TestNetStoreGetAndPut(t *testing.T) { return } - err := netStore.Put(ctx, chunk.ModePutRequest, ch) + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { putErrC <- fmt.Errorf("Expected no err got %v", err) return @@ -181,7 +181,7 @@ func TestNetStoreGetAfterPut(t *testing.T) { defer cancel() // First we Put the chunk, so the chunk will be available locally - err := netStore.Put(ctx, chunk.ModePutRequest, ch) + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } @@ -331,7 +331,7 @@ func TestNetStoreMultipleGetAndPut(t *testing.T) { putErrC <- errors.New("Expected netStore to use one fetcher for all Get calls") return } - err := netStore.Put(ctx, chunk.ModePutRequest, ch) + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { putErrC <- fmt.Errorf("Expected no err got %v", err) return @@ -438,7 +438,7 @@ func TestNetStoreFetchFuncAfterPut(t *testing.T) { defer cancel() // We deliver the created the chunk with a Put - err := netStore.Put(ctx, chunk.ModePutRequest, ch) + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } @@ -610,7 +610,7 @@ func TestNetStoreFetchFuncCalledMultipleTimes(t *testing.T) { } // Deliver the chunk with a Put - err := netStore.Put(ctx, chunk.ModePutRequest, ch) + _, err := netStore.Put(ctx, chunk.ModePutRequest, ch) if err != nil { t.Fatalf("Expected no err got %v", err) } diff --git a/swarm/storage/types.go b/swarm/storage/types.go index 0c7b16a671..d1d47dbe85 100644 --- a/swarm/storage/types.go +++ b/swarm/storage/types.go @@ -218,8 +218,8 @@ type FakeChunkStore struct { } // Put doesn't store anything it is just here to implement ChunkStore -func (f *FakeChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) error { - return nil +func (f *FakeChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) { + return false, nil } // Has doesn't do anything it is just here to implement ChunkStore