-
-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mega update #4610
mega update #4610
Changes from 21 commits
fe8846f
d82b527
8899e98
281d5ea
b90d7bd
f6d507b
fdb0046
49569a6
730754d
d154b4a
0d12a97
699b2c4
1159abd
97cb6a0
d73bf86
e049228
118ecb2
d0998a9
5acbecc
faae63f
1054826
f1aba97
f9d935b
87c6914
2379aa0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,45 +7,59 @@ import ( | |
"context" | ||
"errors" | ||
"fmt" | ||
"io" | ||
|
||
"github.com/ipfs/go-ipfs/blocks/blockstore" | ||
exchange "github.com/ipfs/go-ipfs/exchange" | ||
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" | ||
|
||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" | ||
blocks "gx/ipfs/QmYsEQydGrsxNZfAiskvQ76N2xE9hDQtSAkRSynwMiUK3c/go-block-format" | ||
cid "gx/ipfs/QmeSrf6pzut73u6zLQkRFQ3ygt3k6XFT2kjdYP8Tnkwwyg/go-cid" | ||
cid "gx/ipfs/QmcZfnkapfECQGcLZaf9B79NRg7cRa9EnZh4LSbkCzwNvY/go-cid" | ||
blocks "gx/ipfs/Qmej7nf81hi2x2tvjRBF3mcp74sQyuDH4VMYDGd1YtXjb2/go-block-format" | ||
) | ||
|
||
var log = logging.Logger("blockservice") | ||
|
||
var ErrNotFound = errors.New("blockservice: key not found") | ||
|
||
// BlockGetter is the common interface shared between blockservice sessions and | ||
// the blockservice. | ||
type BlockGetter interface { | ||
// GetBlock gets the requested block. | ||
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) | ||
|
||
// GetBlocks does a batch request for the given cids, returning blocks as | ||
// they are found, in no particular order. | ||
// | ||
// It may not be able to find all requested blocks (or the context may | ||
// be canceled). In that case, it will close the channel early. It is up | ||
// to the consumer to detect this situation and keep track which blocks | ||
// it has received and which it hasn't. | ||
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block | ||
} | ||
|
||
// BlockService is a hybrid block datastore. It stores data in a local | ||
// datastore and may retrieve data from a remote Exchange. | ||
// It uses an internal `datastore.Datastore` instance to store values. | ||
type BlockService interface { | ||
io.Closer | ||
BlockGetter | ||
|
||
// Blockstore returns a reference to the underlying blockstore | ||
Blockstore() blockstore.Blockstore | ||
|
||
// Exchange returns a reference to the underlying exchange (usually bitswap) | ||
Exchange() exchange.Interface | ||
|
||
// AddBlock puts a given block to the underlying datastore | ||
AddBlock(o blocks.Block) (*cid.Cid, error) | ||
AddBlock(o blocks.Block) error | ||
|
||
// AddBlocks adds a slice of blocks at the same time using batching | ||
// capabilities of the underlying datastore whenever possible. | ||
AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) | ||
AddBlocks(bs []blocks.Block) error | ||
|
||
GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) | ||
DeleteBlock(o blocks.Block) error | ||
|
||
// GetBlocks does a batch request for the given cids, returning blocks as | ||
// they are found, in no particular order. | ||
GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block | ||
|
||
Close() error | ||
// DeleteBlock deletes the given block from the blockservice. | ||
DeleteBlock(o *cid.Cid) error | ||
} | ||
|
||
type blockService struct { | ||
|
@@ -83,12 +97,14 @@ func NewWriteThrough(bs blockstore.Blockstore, rem exchange.Interface) BlockServ | |
} | ||
} | ||
|
||
func (bs *blockService) Blockstore() blockstore.Blockstore { | ||
return bs.blockstore | ||
// Blockstore returns the blockstore behind this blockservice. | ||
func (s *blockService) Blockstore() blockstore.Blockstore { | ||
return s.blockstore | ||
} | ||
|
||
func (bs *blockService) Exchange() exchange.Interface { | ||
return bs.exchange | ||
// Exchange returns the exchange behind this blockservice. | ||
func (s *blockService) Exchange() exchange.Interface { | ||
return s.exchange | ||
} | ||
|
||
// NewSession creates a bitswap session that allows for controlled exchange of | ||
|
@@ -110,38 +126,34 @@ func NewSession(ctx context.Context, bs BlockService) *Session { | |
|
||
// AddBlock adds a particular block to the service, Putting it into the datastore. | ||
// TODO pass a context into this if the remote.HasBlock is going to remain here. | ||
func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { | ||
func (s *blockService) AddBlock(o blocks.Block) error { | ||
c := o.Cid() | ||
if s.checkFirst { | ||
has, err := s.blockstore.Has(c) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
if has { | ||
return c, nil | ||
if has, err := s.blockstore.Has(c); has || err != nil { | ||
return err | ||
} | ||
} | ||
|
||
err := s.blockstore.Put(o) | ||
if err != nil { | ||
return nil, err | ||
if err := s.blockstore.Put(o); err != nil { | ||
return err | ||
} | ||
|
||
if err := s.exchange.HasBlock(o); err != nil { | ||
return nil, errors.New("blockservice is closed") | ||
// TODO(stebalien): really an error? | ||
return errors.New("blockservice is closed") | ||
} | ||
|
||
return c, nil | ||
return nil | ||
} | ||
|
||
func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { | ||
func (s *blockService) AddBlocks(bs []blocks.Block) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are we losing any functionality by dropping the cids from the return type? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. We can get the CIDs from the blocks so there's no need to return them here. It just complicates the blockservice (and dagservice which is why I removed this). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Although based on the current implementation we are not losing any functionally here. If There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wouldn't call that an enhancement There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough. But it is still possible for something to go wrong while adding the blocks, such as running out of space. However, since the underlying BlockStore does not indicate which blocks where added there is no way to get this information. I will make a separate issue for this so we can figure out a proper solution. |
||
var toput []blocks.Block | ||
if s.checkFirst { | ||
toput = make([]blocks.Block, 0, len(bs)) | ||
for _, b := range bs { | ||
has, err := s.blockstore.Has(b.Cid()) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
if !has { | ||
toput = append(toput, b) | ||
|
@@ -153,18 +165,16 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { | |
|
||
err := s.blockstore.PutMany(toput) | ||
if err != nil { | ||
return nil, err | ||
return err | ||
} | ||
|
||
var ks []*cid.Cid | ||
for _, o := range toput { | ||
if err := s.exchange.HasBlock(o); err != nil { | ||
return nil, fmt.Errorf("blockservice is closed (%s)", err) | ||
// TODO(stebalien): Should this really *return*? | ||
return fmt.Errorf("blockservice is closed (%s)", err) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same? |
||
} | ||
|
||
ks = append(ks, o.Cid()) | ||
} | ||
return ks, nil | ||
return nil | ||
} | ||
|
||
// GetBlock retrieves a particular block from the service, | ||
|
@@ -256,8 +266,8 @@ func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f e | |
} | ||
|
||
// DeleteBlock deletes a block in the blockservice from the datastore | ||
func (s *blockService) DeleteBlock(o blocks.Block) error { | ||
return s.blockstore.DeleteBlock(o.Cid()) | ||
func (s *blockService) DeleteBlock(c *cid.Cid) error { | ||
return s.blockstore.DeleteBlock(c) | ||
} | ||
|
||
func (s *blockService) Close() error { | ||
|
@@ -280,3 +290,5 @@ func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error | |
func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { | ||
return getBlocks(ctx, ks, s.bs, s.ses) | ||
} | ||
|
||
var _ BlockGetter = (*Session)(nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@whyrusleeping Should we not just log and continue here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unclear. For now lets leave it as is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair. Reported at #4623 (and the comment has been updated to point to the issue).