This repository has been archived by the owner on Jun 27, 2023. It is now read-only.

Remove timeout and refactor sizeBelowThreshold (#102)
* fix(dir): remove timeout and evaluate switch *before* removing entry

* BLOCKING: temporary fix

* remove Shard.cid: not used and not maintained

* update blocking readme

* resolve FIXME

* fix: add back concurrent option in dag.walk

* decouple switch from directory downgrade
schomatis authored Aug 26, 2021
1 parent 39d8e43 commit 6d48cca
Showing 2 changed files with 66 additions and 91 deletions.
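Before the per-file diffs, the caller-visible effect in brief: the old code raced shard fetching against the hard-coded `EvaluateHAMTTransitionTimeout` and returned a separate `timeoutExceeded` flag, while the new code threads the caller's context through `needsToSwitchToBasicDir` and `sizeBelowThreshold` and reports failures as ordinary errors. A minimal usage sketch, assuming the `go-unixfs/io` API at this commit (the 256 KiB threshold and the `removeEntry` wrapper are illustrative, not part of the change):

```go
package example

import (
	"context"
	"fmt"

	uio "github.com/ipfs/go-unixfs/io"
)

// removeEntry removes a directory entry, letting the library decide whether
// the HAMT should transparently downgrade to a basic directory. Cancellation
// now arrives through ctx instead of an internal one-second timeout.
func removeEntry(ctx context.Context, dir uio.Directory, name string) error {
	uio.HAMTShardingSize = 256 << 10 // hypothetical threshold; 0 disables the switch
	if err := dir.RemoveChild(ctx, name); err != nil {
		return fmt.Errorf("removing %q: %w", name, err)
	}
	return nil
}
```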
10 changes: 6 additions & 4 deletions hamt/hamt.go
```diff
@@ -43,8 +43,6 @@ func (ds *Shard) isValueNode() bool {
 
 // A Shard represents the HAMT. It should be initialized with NewShard().
 type Shard struct {
-	cid cid.Cid
-
 	childer *childer
 
 	tableSize int
@@ -123,7 +121,6 @@ func NewHamtFromDag(dserv ipld.DAGService, nd ipld.Node) (*Shard, error) {
 
 	ds.childer.makeChilder(fsn.Data(), pbnd.Links())
 
-	ds.cid = pbnd.Cid()
 	ds.hashFunc = fsn.HashType()
 	ds.builder = pbnd.CidBuilder()
 
@@ -355,7 +352,12 @@ func (ds *Shard) EnumLinksAsync(ctx context.Context) <-chan format.LinkResult {
 	defer cancel()
 	getLinks := makeAsyncTrieGetLinks(ds.dserv, linkResults)
 	cset := cid.NewSet()
-	err := dag.Walk(ctx, getLinks, ds.cid, cset.Visit, dag.Concurrent())
+	rootNode, err := ds.Node()
+	if err != nil {
+		emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
+		return
+	}
+	err = dag.Walk(ctx, getLinks, rootNode.Cid(), cset.Visit, dag.Concurrent())
 	if err != nil {
 		emitResult(ctx, linkResults, format.LinkResult{Link: nil, Err: err})
 	}
```
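With the cached `cid` field gone from `Shard`, the root CID is derived on demand from the serialized node and can no longer go stale after mutations. A small sketch of that pattern (`rootCid` is a hypothetical helper; `Shard.Node()` is the accessor the new code uses above):

```go
package example

import (
	cid "github.com/ipfs/go-cid"
	"github.com/ipfs/go-unixfs/hamt"
)

// rootCid recomputes the HAMT root CID from the shard's current state rather
// than trusting a cached value that in-place edits could invalidate.
func rootCid(ds *hamt.Shard) (cid.Cid, error) {
	nd, err := ds.Node() // serializes the shard as it stands now
	if err != nil {
		return cid.Undef, err
	}
	return nd.Cid(), nil
}
```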
147 changes: 60 additions & 87 deletions io/directory.go
```diff
@@ -3,13 +3,10 @@ package io
 import (
 	"context"
 	"fmt"
-	"os"
-	"time"
-
 	mdag "github.com/ipfs/go-merkledag"
-
 	format "github.com/ipfs/go-unixfs"
 	"github.com/ipfs/go-unixfs/hamt"
+	"os"
 
 	"github.com/ipfs/go-cid"
 	ipld "github.com/ipfs/go-ipld-format"
```
```diff
@@ -26,11 +23,6 @@ var log = logging.Logger("unixfs")
 // ProtoNode doesn't use the Data field so this estimate is pretty accurate).
 var HAMTShardingSize = 0
 
-// Time in seconds allowed to fetch the shards to compute the size before
-// returning an error.
-// FIXME: Adjust to sane value.
-var EvaluateHAMTTransitionTimeout = time.Duration(1)
-
 // DefaultShardWidth is the default value used for hamt sharding width.
 var DefaultShardWidth = 256
 
```
```diff
@@ -430,15 +422,42 @@ func (d *HAMTDirectory) removeFromSizeChange(name string, linkCid cid.Cid) {
 	d.sizeChange -= estimatedLinkSize(name, linkCid)
 }
 
 // Evaluate directory size and check if it's below HAMTShardingSize threshold
 // (to trigger a transition to a BasicDirectory). It returns two `bool`s:
 // * whether it's below (true) or equal/above (false)
 // * whether the passed timeout to compute the size has been exceeded
 // FIXME: Will be extended later to the `AddEntry` case.
+func (d *HAMTDirectory) needsToSwitchToBasicDir(ctx context.Context, nameToRemove string) (switchToBasic bool, err error) {
+	if HAMTShardingSize == 0 { // Option disabled.
+		return false, nil
+	}
+
+	entryToRemove, err := d.shard.Find(ctx, nameToRemove)
+	if err == os.ErrNotExist {
+		// Nothing to remove, no point in evaluating a switch.
+		return false, nil
+	} else if err != nil {
+		return false, err
+	}
+	sizeToRemove := estimatedLinkSize(nameToRemove, entryToRemove.Cid)
+
+	if d.sizeChange-sizeToRemove >= 0 {
+		// We won't have reduced the HAMT net size.
+		return false, nil
+	}
+
+	// We have reduced the directory size, check if it went below the
+	// HAMTShardingSize threshold to trigger a switch.
+	belowThreshold, err := d.sizeBelowThreshold(ctx, -sizeToRemove)
+	if err != nil {
+		return false, err
+	}
+	return belowThreshold, nil
+}
+
+// Evaluate directory size and a future sizeChange and check if it will be below
+// HAMTShardingSize threshold (to trigger a transition to a BasicDirectory).
 // Instead of enumerating the entire tree we eagerly call EnumLinksAsync
-// until we either reach a value above the threshold (in that case no need)
-// to keep counting or the timeout runs out in which case the `below` return
-// value is not to be trusted as we didn't have time to count enough shards.
-func (d *HAMTDirectory) sizeBelowThreshold(timeout time.Duration) (below bool, timeoutExceeded bool) {
+// until we either reach a value above the threshold (in that case no need
+// to keep counting) or an error occurs (like the context being canceled
+// if we take too much time fetching the necessary shards).
+func (d *HAMTDirectory) sizeBelowThreshold(ctx context.Context, sizeChange int) (below bool, err error) {
 	if HAMTShardingSize == 0 {
 		panic("asked to compute HAMT size with HAMTShardingSize option off (0)")
 	}
```
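Both the `sizeChange` accounting and the enumeration below lean on `estimatedLinkSize`, which this diff does not show. A sketch under the assumption that it mirrors the unexported helper defined earlier in `io/directory.go` (entry name length plus CID byte length):

```go
package example

import cid "github.com/ipfs/go-cid"

// estimatedLinkSize approximates the serialized size of one directory entry
// as name length plus CID length. Assumption: this matches the unexported
// helper in io/directory.go at the time of this commit.
func estimatedLinkSize(linkName string, linkCid cid.Cid) int {
	return len(linkName) + linkCid.ByteLen()
}
```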
```diff
@@ -447,57 +466,25 @@ func (d *HAMTDirectory) sizeBelowThreshold(timeout time.Duration) (below bool, timeoutExceeded bool) {
 	// end early if we already know we're above the threshold or run out of time.
 	partialSize := 0
 
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*timeout)
+	// We stop the enumeration once we have enough information and exit this function.
+	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
 	for linkResult := range d.EnumLinksAsync(ctx) {
 		if linkResult.Err != nil {
-			continue
-			// The timeout exceeded errors will be coming through here but I'm
-			// not sure if we can just compare against a generic DeadlineExceeded
-			// error here to return early and avoid iterating the entire loop.
-			// (We might confuse a specific DeadlineExceeded of an internal function
-			// with our context here.)
-			// Since *our* DeadlineExceeded will quickly propagate to any other
-			// pending fetches it seems that iterating the entire loop won't add
-			// much more cost anyway.
-			// FIXME: Check the above reasoning.
+			return false, linkResult.Err
 		}
 		if linkResult.Link == nil {
 			panic("empty link result (both values nil)")
 			// FIXME: Is this *ever* possible?
 		}
-		partialSize += estimatedLinkSize(linkResult.Link.Name, linkResult.Link.Cid)
-
-		if partialSize >= HAMTShardingSize {
+		partialSize += estimatedLinkSize(linkResult.Link.Name, linkResult.Link.Cid)
+		if partialSize+sizeChange >= HAMTShardingSize {
 			// We have already fetched enough shards to assert we are
 			// above the threshold, so no need to keep fetching.
-			return false, false
+			return false, nil
 		}
 	}
-	// At this point either we enumerated all shards or run out of time.
-	// Figure out which.
 
-	if ctx.Err() == context.Canceled {
-		panic("the context was canceled but we're still evaluating a possible switch")
-	}
-	if partialSize >= HAMTShardingSize {
-		panic("we reach the threshold but we're still evaluating a possible switch")
-	}
-
-	if ctx.Err() == context.DeadlineExceeded {
-		return false, true
-	}
-
-	// If we reach this then:
-	// * We are below the threshold (we didn't return inside the EnumLinksAsync
-	//   loop).
-	// * The context wasn't cancelled so we iterated *all* shards
-	//   and are sure that we have the full size.
-	// FIXME: Can we actually verify the last claim here to be sure?
-	// (Iterating all the shards in the HAMT as a plumbing function maybe.
-	// If they're in memory it shouldn't be that expensive, we won't be
-	// switching that often, probably.)
-	return true, false
+	// We enumerated *all* links in all shards and didn't reach the threshold.
+	return true, nil
 }
 
 // UpgradeableDirectory wraps a Directory interface and provides extra logic
```
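The new `sizeBelowThreshold` is an instance of a general early-exit pattern: consume an asynchronous stream and cancel the producer the moment the answer is decided. A self-contained sketch, where `belowThreshold`, `linkResult` and `enumerate` are illustrative stand-ins for the real method, `format.LinkResult` and `d.EnumLinksAsync`:

```go
package example

import (
	"context"

	cid "github.com/ipfs/go-cid"
)

// linkResult mirrors the shape of the values streamed by EnumLinksAsync:
// a named link on success, or an error.
type linkResult struct {
	Name string
	Cid  cid.Cid
	Err  error
}

// belowThreshold accumulates estimated entry sizes from an asynchronous
// enumeration and returns as soon as the outcome is known; cancelling ctx on
// return tears down whatever fetches the producer still has in flight.
func belowThreshold(
	ctx context.Context,
	enumerate func(context.Context) <-chan linkResult,
	threshold, sizeChange int,
) (bool, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel() // stop the producer once we return

	partial := 0
	for r := range enumerate(ctx) {
		if r.Err != nil {
			return false, r.Err // context cancellation also surfaces here
		}
		partial += len(r.Name) + r.Cid.ByteLen() // cf. estimatedLinkSize
		if partial+sizeChange >= threshold {
			return false, nil // enough evidence: not below the threshold
		}
	}
	return true, nil // enumerated everything and stayed below
}
```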
```diff
@@ -565,42 +552,28 @@ func (d *UpgradeableDirectory) getDagService() ipld.DAGService {
 // sure we make good on the value). Finding the right margin can be tricky
 // and very dependent on the use case so it might not be worth it.
 func (d *UpgradeableDirectory) RemoveChild(ctx context.Context, name string) error {
-	if err := d.Directory.RemoveChild(ctx, name); err != nil {
-		return err
-	}
-
 	hamtDir, ok := d.Directory.(*HAMTDirectory)
-	if !ok { // BasicDirectory
-		return nil
+	if !ok {
+		return d.Directory.RemoveChild(ctx, name)
 	}
 
-	if HAMTShardingSize == 0 || // Option disabled.
-		hamtDir.sizeChange >= 0 { // We haven't reduced the HAMT net size.
-		return nil
+	switchToBasic, err := hamtDir.needsToSwitchToBasicDir(ctx, name)
+	if err != nil {
+		return err
 	}
 
-	// We have reduced the directory size, check if it didn't go under
-	// the HAMTShardingSize threshold.
-	belowThreshold, timeoutExceeded := hamtDir.sizeBelowThreshold(EvaluateHAMTTransitionTimeout)
-
-	if timeoutExceeded {
-		// We run out of time before confirming if we're indeed below the
-		// threshold. When in doubt error to not return inconsistent structures.
-		// FIXME: We could allow this to return without error and enforce this
-		// timeout on a GetNode() call when we need to actually commit to a
-		// structure/CID. (The downside is that GetNode() doesn't have a
-		// context argument and we would have to break the API.)
-		return fmt.Errorf("not enought time to fetch shards")
-		// FIXME: Abstract in new error for testing.
+	if !switchToBasic {
+		return hamtDir.RemoveChild(ctx, name)
 	}
 
-	if belowThreshold { // Switch.
-		basicDir, err := hamtDir.switchToBasic(ctx)
-		if err != nil {
-			return err
-		}
-		d.Directory = basicDir
+	basicDir, err := hamtDir.switchToBasic(ctx)
+	if err != nil {
+		return err
 	}
 
+	err = basicDir.RemoveChild(ctx, name)
+	if err != nil {
+		return err
+	}
+	d.Directory = basicDir
 	return nil
 }
```
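To make the two-step decision concrete, a hypothetical walk-through with invented numbers (`exampleSwitchDecision` is not part of the commit):

```go
package example

// exampleSwitchDecision condenses RemoveChild's decision path into plain
// arithmetic; every number here is invented for illustration.
func exampleSwitchDecision() bool {
	const hamtShardingSize = 1000 // hypothetical threshold
	const currentSize = 990       // enumerated sum of estimatedLinkSize
	const sizeToRemove = 40       // estimatedLinkSize of the entry to delete
	sizeChange := 0               // net estimated change since the HAMT was loaded

	// Step 1 (needsToSwitchToBasicDir): only measure when the removal
	// would shrink the directory relative to its loaded state.
	if sizeChange-sizeToRemove >= 0 {
		return false // no net reduction: keep the HAMT
	}

	// Step 2 (sizeBelowThreshold with sizeChange = -sizeToRemove):
	// 990 - 40 = 950 < 1000, so we switch back to a BasicDirectory.
	return currentSize-sizeToRemove < hamtShardingSize
}
```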
