Skip to content
This repository was archived by the owner on Aug 2, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions swarm/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (d *Descriptor) String() string {

type Store interface {
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
Put(ctx context.Context, mode ModePut, ch Chunk) (err error)
Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
Has(ctx context.Context, addr Address) (yes bool, err error)
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
Expand Down Expand Up @@ -202,11 +202,11 @@ func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore)
// Put overrides Store put method with validators check. If one of the validators
// returns true, the chunk is considered valid and the Store Put method is called.
// If all validators return false, ErrChunkInvalid is returned.
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (err error) {
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
for _, v := range s.validators {
if v.Validate(ch) {
return s.Store.Put(ctx, mode, ch)
}
}
return ErrChunkInvalid
return false, ErrChunkInvalid
}
2 changes: 1 addition & 1 deletion swarm/network/stream/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
return nil, errors.New("roundRobinStore doesn't support Get")
}

func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) error {
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
i := atomic.AddUint32(&rrs.index, 1)
idx := int(i) % len(rrs.stores)
return rrs.stores[idx].Put(ctx, mode, ch)
Expand Down
2 changes: 1 addition & 1 deletion swarm/network/stream/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func (d *Delivery) handleChunkDeliveryMsg(ctx context.Context, sp *Peer, req int

msg.peer = sp
log.Trace("handle.chunk.delivery", "put", msg.Addr)
err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
_, err := d.chunkStore.Put(ctx, mode, storage.NewChunk(msg.Addr, msg.SData))
if err != nil {
if err == storage.ErrChunkInvalid {
// we removed this log because it spams the logs
Expand Down
4 changes: 2 additions & 2 deletions swarm/network/stream/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {

hash := storage.Address(hash0[:])
ch := storage.NewChunk(hash, hash)
err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
_, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {

hash = storage.Address(hash1[:])
ch = storage.NewChunk(hash, hash1[:])
err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
_, err = localStore.Put(context.TODO(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatalf("Expected no err got %v", err)
}
Expand Down
8 changes: 5 additions & 3 deletions swarm/storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func mput(store ChunkStore, n int, f func(i int64) Chunk) (hs []Chunk, err error
for i := int64(0); i < int64(n); i++ {
ch := f(chunk.DefaultSize)
go func() {
_, err := store.Put(ctx, chunk.ModePutUpload, ch)
select {
case errc <- store.Put(ctx, chunk.ModePutUpload, ch):
case errc <- err:
case <-ctx.Done():
}
}()
Expand Down Expand Up @@ -224,11 +225,12 @@ func NewMapChunkStore() *MapChunkStore {
}
}

func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) error {
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
m.mu.Lock()
defer m.mu.Unlock()
_, exists := m.chunks[ch.Address().Hex()]
m.chunks[ch.Address().Hex()] = ch
return nil
return exists, nil
}

func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {
Expand Down
6 changes: 3 additions & 3 deletions swarm/storage/feed/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,15 +446,15 @@ func TestValidatorInStore(t *testing.T) {
}

// put the chunks in the store and check their error status
err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, goodChunk)
if err == nil {
t.Fatal("expected error on good content address chunk with feed update validator only, but got nil")
}
err = store.Put(context.Background(), chunk.ModePutUpload, badChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, badChunk)
if err == nil {
t.Fatal("expected error on bad content address chunk with feed update validator only, but got nil")
}
err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk)
_, err = store.Put(context.Background(), chunk.ModePutUpload, uglyChunk)
if err != nil {
t.Fatalf("expected no error on feed update chunk with feed update validator only, but got: %s", err)
}
Expand Down
3 changes: 2 additions & 1 deletion swarm/storage/hasherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ func (h *hasherStore) newDataEncryption(key encryption.Key) encryption.Encryptio
func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
atomic.AddUint64(&h.nrChunks, 1)
go func() {
_, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
select {
case h.errC <- h.store.Put(ctx, chunk.ModePutUpload, ch):
case h.errC <- err:
case <-h.quitC:
}
}()
Expand Down
3 changes: 2 additions & 1 deletion swarm/storage/localstore/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ func (db *DB) Import(r io.Reader) (count int64, err error) {
}

go func() {
_, err := db.Put(ctx, chunk.ModePutUpload, ch)
select {
case errC <- db.Put(ctx, chunk.ModePutUpload, ch):
case errC <- err:
case <-ctx.Done():
}
}()
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/localstore/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestExportImport(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db1.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions swarm/storage/localstore/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func testDB_collectGarbageWorker(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
for i := 0; i < int(db.capacity)-1; i++ {
ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestDB_collectGarbageWorker_withRequests(t *testing.T) {
// upload and sync another chunk to trigger
// garbage collection
ch := generateTestRandomChunk()
err = db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err = db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -275,7 +275,7 @@ func TestDB_gcSize(t *testing.T) {
for i := 0; i < count; i++ {
ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions swarm/storage/localstore/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestDB_pullIndex(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestDB_gcIndex(t *testing.T) {
for i := 0; i < chunkCount; i++ {
ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions swarm/storage/localstore/localstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestDB(t *testing.T) {

ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestDB_updateGCSem(t *testing.T) {

ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions swarm/storage/localstore/mode_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestModeGetRequest(t *testing.T) {

ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestModeGetSync(t *testing.T) {

ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
2 changes: 1 addition & 1 deletion swarm/storage/localstore/mode_has_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestHas(t *testing.T) {

ch := generateTestRandomChunk()

err := db.Put(context.Background(), chunk.ModePutUpload, ch)
_, err := db.Put(context.Background(), chunk.ModePutUpload, ch)
if err != nil {
t.Fatal(err)
}
Expand Down
38 changes: 21 additions & 17 deletions swarm/storage/localstore/mode_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// on the Putter mode, it updates required indexes.
// Put is required to implement chunk.Store
// interface.
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (err error) {
func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (exists bool, err error) {
return db.put(mode, chunkToItem(ch))
}

Expand All @@ -37,7 +37,7 @@ func (db *DB) Put(_ context.Context, mode chunk.ModePut, ch chunk.Chunk) (err er
// of this function for the same address in parallel.
// Item fields Address and Data must not be
// with their nil values.
func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) {
func (db *DB) put(mode chunk.ModePut, item shed.Item) (exists bool, err error) {
// protect parallel updates
db.batchMu.Lock()
defer db.batchMu.Unlock()
Expand All @@ -59,21 +59,25 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) {
i, err := db.retrievalAccessIndex.Get(item)
switch err {
case nil:
exists = true
item.AccessTimestamp = i.AccessTimestamp
case leveldb.ErrNotFound:
exists = false
// no chunk accesses
default:
return err
return false, err
}
i, err = db.retrievalDataIndex.Get(item)
switch err {
case nil:
exists = true
item.StoreTimestamp = i.StoreTimestamp
item.BinID = i.BinID
case leveldb.ErrNotFound:
// no chunk accesses
exists = false
default:
return err
return false, err
}
if item.AccessTimestamp != 0 {
// delete current entry from the gc index
Expand All @@ -86,7 +90,7 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) {
if item.BinID == 0 {
item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
if err != nil {
return err
return false, err
}
}
// update access timestamp
Expand All @@ -102,15 +106,15 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) {
case chunk.ModePutUpload:
// put to indexes: retrieve, push, pull

has, err := db.retrievalDataIndex.Has(item)
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return err
return false, err
}
if !has {
if !exists {
item.StoreTimestamp = now()
item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
if err != nil {
return err
return false, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
Expand All @@ -122,39 +126,39 @@ func (db *DB) put(mode chunk.ModePut, item shed.Item) (err error) {
case chunk.ModePutSync:
// put to indexes: retrieve, pull

has, err := db.retrievalDataIndex.Has(item)
exists, err = db.retrievalDataIndex.Has(item)
if err != nil {
return err
return exists, err
}
if !has {
if !exists {
item.StoreTimestamp = now()
item.BinID, err = db.binIDs.IncInBatch(batch, uint64(db.po(item.Address)))
if err != nil {
return err
return false, err
}
db.retrievalDataIndex.PutInBatch(batch, item)
db.pullIndex.PutInBatch(batch, item)
triggerPullFeed = true
}

default:
return ErrInvalidMode
return false, ErrInvalidMode
}

err = db.incGCSizeInBatch(batch, gcSizeChange)
if err != nil {
return err
return false, err
}

err = db.shed.WriteBatch(batch)
if err != nil {
return err
return false, err
}
if triggerPullFeed {
db.triggerPullSubscriptions(db.po(item.Address))
}
if triggerPushFeed {
db.triggerPushSubscriptions()
}
return nil
return exists, nil
}
Loading