Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package crdt

import (
"context"
"errors"
"fmt"
"io"
"math/rand"
Expand All @@ -36,7 +37,6 @@ import (
query "github.com/ipfs/go-datastore/query"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/pkg/errors"
)

var _ ds.Datastore = (*Datastore)(nil)
Expand Down Expand Up @@ -262,12 +262,12 @@ func New(
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error setting up crdt set")
return nil, fmt.Errorf("error setting up crdt set: %w", err)
}
heads, err := newHeads(ctx, store, fullHeadsNs, opts.Logger)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error building heads")
return nil, fmt.Errorf("error building heads: %w", err)
}

dstore := &Datastore{
Expand Down Expand Up @@ -587,7 +587,7 @@ func (store *Datastore) handleBlock(ctx context.Context, c cid.Cid) error {
// head.
isProcessed, err := store.isProcessed(ctx, c)
if err != nil {
return errors.Wrapf(err, "error checking for known block %s", c)
return fmt.Errorf("error checking for known block %s: %w", c, err)
}
if isProcessed {
store.logger.Debugf("%s is known. Skip walking tree", c)
Expand Down Expand Up @@ -669,7 +669,7 @@ func (store *Datastore) sendNewJobs(ctx context.Context, session *sync.WaitGroup
if rootPrio == 0 {
prio, err := ng.GetPriority(cctx, children[0])
if err != nil {
return errors.Wrapf(err, "error getting root delta priority")
return fmt.Errorf("error getting root delta priority: %w", err)
}
rootPrio = prio
}
Expand All @@ -681,7 +681,7 @@ loop:
for deltaOpt := range ng.GetDeltas(cctx, children) {
// we abort whenever we a delta comes back in error.
if deltaOpt.err != nil {
err = errors.Wrapf(deltaOpt.err, "error getting delta")
err = fmt.Errorf("error getting delta: %w", deltaOpt.err)
break
}
goodDeltas[deltaOpt.node.Cid()] = struct{}{}
Expand Down Expand Up @@ -792,14 +792,14 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
blockKey := dshelp.MultihashToDsKey(current.Hash()).String()
err := store.set.Merge(ctx, delta, blockKey)
if err != nil {
return nil, errors.Wrapf(err, "error merging delta from %s", current)
return nil, fmt.Errorf("error merging delta from %s: %w", current, err)
}

// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(ctx, current)
if err != nil {
return nil, errors.Wrapf(err, "error recording %s as processed", current)
return nil, fmt.Errorf("error recording %s as processed: %w", current, err)
}

// Remove from the set that has the children which are queued for
Expand All @@ -820,7 +820,7 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
if len(links) == 0 {
err := store.heads.Add(ctx, root, rootPrio)
if err != nil {
return nil, errors.Wrapf(err, "error adding head %s", root)
return nil, fmt.Errorf("error adding head %s: %w", root, err)
}
}

Expand All @@ -836,20 +836,20 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo

isHead, _, err := store.heads.IsHead(ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking if %s is head", child)
return nil, fmt.Errorf("error checking if %s is head: %w", child, err)
}

isProcessed, err := store.isProcessed(ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking for known block %s", child)
return nil, fmt.Errorf("error checking for known block %s: %w", child, err)
}

if isHead {
// reached one of the current heads. Replace it with
// the tip of this branch
err := store.heads.Replace(ctx, child, root, rootPrio)
if err != nil {
return nil, errors.Wrapf(err, "error replacing head: %s->%s", child, root)
return nil, fmt.Errorf("error replacing head: %s->%s: %w", child, root, err)
}
addedAsHead = true

Expand All @@ -876,7 +876,7 @@ func (store *Datastore) processNode(ctx context.Context, ng *crdtNodeGetter, roo
if err != nil {
// Don't let this failure prevent us
// from processing the other links.
store.logger.Error(errors.Wrapf(err, "error adding head %s", root))
store.logger.Error(fmt.Errorf("error adding head %s: %w", root, err))
}
}
addedAsHead = true
Expand Down Expand Up @@ -904,7 +904,7 @@ func (store *Datastore) repairDAG(ctx context.Context) error {

heads, _, err := store.heads.List(ctx)
if err != nil {
return errors.Wrapf(err, "error listing heads")
return fmt.Errorf("error listing heads: %w", err)
}

type nodeHead struct {
Expand Down Expand Up @@ -967,21 +967,21 @@ func (store *Datastore) repairDAG(ctx context.Context) error {
n, delta, err := getter.GetDelta(cctx, cur)
if err != nil {
cancel()
return errors.Wrapf(err, "error getting node for reprocessing %s", cur)
return fmt.Errorf("error getting node for reprocessing %s: %w", cur, err)
}
cancel()

isProcessed, err := store.isProcessed(ctx, cur)
if err != nil {
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
return fmt.Errorf("error checking for reprocessed block %s: %w", cur, err)
}
if !isProcessed {
store.logger.Debugf("reprocessing %s / %d", cur, delta.Priority)
// start syncing from here.
// do not add children to our queue.
err = store.handleBranch(ctx, head, cur)
if err != nil {
return errors.Wrapf(err, "error reprocessing block %s", cur)
return fmt.Errorf("error reprocessing block %s: %w", cur, err)
}
}
links := n.Links()
Expand Down Expand Up @@ -1212,14 +1212,14 @@ func (store *Datastore) putBlock(ctx context.Context, heads []cid.Cid, height ui
}
node, err := makeNode(delta, heads)
if err != nil {
return nil, errors.Wrap(err, "error creating new block")
return nil, fmt.Errorf("error creating new block: %w", err)
}

cctx, cancel := context.WithTimeout(ctx, store.opts.DAGSyncerTimeout)
defer cancel()
err = store.dagService.Add(cctx, node)
if err != nil {
return nil, errors.Wrapf(err, "error writing new block %s", node.Cid())
return nil, fmt.Errorf("error writing new block %s: %w", node.Cid(), err)
}

return node, nil
Expand All @@ -1240,7 +1240,7 @@ func (store *Datastore) publish(ctx context.Context, delta *pb.Delta) error {
func (store *Datastore) addDAGNode(ctx context.Context, delta *pb.Delta) (cid.Cid, error) {
heads, height, err := store.heads.List(ctx)
if err != nil {
return cid.Undef, errors.Wrap(err, "error listing heads")
return cid.Undef, fmt.Errorf("error listing heads: %w", err)
}
height = height + 1 // This implies our minimum height is 1

Expand Down Expand Up @@ -1270,7 +1270,7 @@ func (store *Datastore) addDAGNode(ctx context.Context, delta *pb.Delta) (cid.Ci
)
if err != nil {
store.MarkDirty(ctx) // not sure if this will fix much if this happens.
return cid.Undef, errors.Wrap(err, "error processing new block")
return cid.Undef, fmt.Errorf("error processing new block: %w", err)
}
if len(children) != 0 {
store.logger.Warnf("bug: created a block to unknown children")
Expand Down Expand Up @@ -1303,7 +1303,7 @@ func (store *Datastore) broadcast(ctx context.Context, cids []cid.Cid) error {

err = store.broadcaster.Broadcast(ctx, bcastBytes)
if err != nil {
return errors.Wrapf(err, "error broadcasting %s", cids)
return fmt.Errorf("error broadcasting %s: %w", cids, err)
}
return nil
}
Expand Down
46 changes: 22 additions & 24 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
module github.com/ipfs/go-ds-crdt

go 1.23

toolchain go1.23.2
go 1.23.0

require (
github.com/dgraph-io/badger v1.6.2
github.com/ipfs/bbloom v0.0.4
github.com/ipfs/boxo v0.24.3
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
github.com/ipfs/boxo v0.28.0
github.com/ipfs/go-cid v0.5.0
github.com/ipfs/go-datastore v0.8.0
github.com/ipfs/go-ds-badger v0.3.2
github.com/ipfs/go-ipld-format v0.6.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-libp2p-pubsub v0.12.0
github.com/libp2p/go-libp2p-pubsub v0.13.0
github.com/multiformats/go-multihash v0.2.3
github.com/pkg/errors v0.9.1
go.uber.org/multierr v1.11.0
google.golang.org/protobuf v1.35.2
google.golang.org/protobuf v1.36.5
)

require (
Expand All @@ -27,43 +22,46 @@ require (
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 // indirect
github.com/dgraph-io/ristretto v0.0.2 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gammazero/deque v1.0.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-detect-race v0.0.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.3 // indirect
github.com/ipfs/go-ipld-legacy v0.2.1 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/ipld/go-ipld-prime v0.21.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.37.0 // indirect
github.com/libp2p/go-libp2p v0.40.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/minio/sha256-simd v1.0.1 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.1.0 // indirect
github.com/multiformats/go-base36 v0.2.0 // indirect
github.com/multiformats/go-multiaddr v0.13.0 // indirect
github.com/multiformats/go-multiaddr v0.14.0 // indirect
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
github.com/multiformats/go-multibase v0.2.0 // indirect
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-multistream v0.6.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.opentelemetry.io/otel v1.31.0 // indirect
go.opentelemetry.io/otel/metric v1.31.0 // indirect
go.opentelemetry.io/otel/trace v1.31.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel v1.34.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
go.opentelemetry.io/otel/trace v1.34.0 // indirect
go.uber.org/zap v1.27.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/crypto v0.33.0 // indirect
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
golang.org/x/net v0.35.0 // indirect
golang.org/x/sys v0.30.0 // indirect
lukechampine.com/blake3 v1.3.0 // indirect
)
Loading
Loading