Skip to content

Commit

Permalink
feat(deps): update go-legs
Browse files Browse the repository at this point in the history
Update go-legs to new multisubscriber model
  • Loading branch information
hannahhoward authored and gammazero committed Oct 13, 2021
1 parent de7f4ba commit 64bef9f
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 25 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ require (
github.com/filecoin-project/go-address v0.0.5
github.com/filecoin-project/go-dagaggregator-unixfs v0.2.0
github.com/filecoin-project/go-indexer-core v0.2.2
github.com/filecoin-project/go-legs v0.0.0-20210922204025-c6f68b62ab16
github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb
github.com/gammazero/keymutex v0.0.2
github.com/gogo/protobuf v1.3.2
github.com/gorilla/mux v1.7.4
github.com/ipfs/go-cid v0.1.0
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-graphsync v0.8.1-rc1
github.com/ipfs/go-graphsync v0.9.3
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipld/go-ipld-prime v0.12.0
github.com/libp2p/go-libp2p v0.15.0
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,14 @@ github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03 h1:2pMX
github.com/filecoin-project/go-crypto v0.0.0-20191218222705-effae4ea9f03/go.mod h1:+viYnvGtUTgJRdy6oaeF4MTFKAfatX071MPDPBL11EQ=
github.com/filecoin-project/go-dagaggregator-unixfs v0.2.0 h1:h6nObyd44KVZNCUdfhPATclKw5Ci3QkOTTpTBCGTfXA=
github.com/filecoin-project/go-dagaggregator-unixfs v0.2.0/go.mod h1:WTuJWgBQY0omnQqa8kRPT9O0Uj5wQOgslVMUuTeHdJ8=
github.com/filecoin-project/go-data-transfer v1.6.1-0.20210608092034-e4f40bc3a685 h1:fdgmMKVVhATBVsIDVG/Y3Vg6nDe3Zvtwtu9bOgBkplE=
github.com/filecoin-project/go-data-transfer v1.6.1-0.20210608092034-e4f40bc3a685/go.mod h1:We59IRN/nAgBSzgm3enBz4UeWNlsdqvTaJdvSbuOfL8=
github.com/filecoin-project/go-data-transfer v1.10.1 h1:YQNLwhizxkdfFxegAyrnn3l7WjgMjqDlqFzr18iWiYI=
github.com/filecoin-project/go-data-transfer v1.10.1/go.mod h1:CSDMCrPK2lVGodNB1wPEogjFvM9nVGyiL1GNbBRTSdw=
github.com/filecoin-project/go-ds-versioning v0.1.0 h1:y/X6UksYTsK8TLCI7rttCKEvl8btmWxyFMEeeWGUxIQ=
github.com/filecoin-project/go-ds-versioning v0.1.0/go.mod h1:mp16rb4i2QPmxBnmanUx8i/XANp+PFCCJWiAb+VW4/s=
github.com/filecoin-project/go-indexer-core v0.2.2 h1:E2MvJFeH93gH2lcp6LrRoSy763lVW4FM8yAuohdvhfs=
github.com/filecoin-project/go-indexer-core v0.2.2/go.mod h1:wV+NmrF8fHG6Xii3ecoZf2JW3laGTe5xtsWz609jo+Y=
github.com/filecoin-project/go-legs v0.0.0-20210922204025-c6f68b62ab16 h1:xFiCa75M7kjr9/AfaVvFNjJKsMgtIP25g+DXKx4Ub7A=
github.com/filecoin-project/go-legs v0.0.0-20210922204025-c6f68b62ab16/go.mod h1:qLH/nW+bQoOmnLG4eGU36vubv0MmAezQ2Y2FQBa1hM8=
github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb h1:MtGtPWYwdCtfRDJnsnaZmI2guxZ5Tw6c/Dkq8j5kqaI=
github.com/filecoin-project/go-legs v0.0.0-20211013165050-9ab325b6d2eb/go.mod h1:lKwBnslfNGG7JnsP9uQZl3yK7f74fit1MyHcwuuOP3k=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe h1:dF8u+LEWeIcTcfUcCf3WFVlc81Fr2JKg8zPzIbBDKDw=
github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig=
github.com/filecoin-project/go-statestore v0.1.0 h1:t56reH59843TwXHkMcwyuayStBIiWBRilQjQ+5IiwdQ=
Expand Down Expand Up @@ -421,8 +421,9 @@ github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIyk
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-ds-leveldb v0.4.2 h1:QmQoAJ9WkPMUfBLnu1sBVy0xWWlJPg0m4kRAiJL9iaw=
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
github.com/ipfs/go-graphsync v0.8.1-rc1 h1:dmLAym2INCMm86HnDH8LXaWRpSw8Ltq9px1AzOPbGsE=
github.com/ipfs/go-graphsync v0.8.1-rc1/go.mod h1:/8Plco5WTSAydRzNrMTgBMgWY/NFC5x9kBXPLFU5d3M=
github.com/ipfs/go-graphsync v0.9.1/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-graphsync v0.9.3 h1:oWqUuN3OYqLwu669fxYbgymBrIodB0fD7vFZfF//X7Y=
github.com/ipfs/go-graphsync v0.9.3/go.mod h1:J62ahWT9JbPsFL2UWsUM5rOu0lZJ0LOIH1chHdxGGcw=
github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08=
github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw=
github.com/ipfs/go-ipfs-blockstore v0.1.4/go.mod h1:Jxm3XMVjh6R17WvxFEiyKBLUGr86HgIYJW/D/MwqeYQ=
Expand Down Expand Up @@ -503,11 +504,10 @@ github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
github.com/ipld/go-car v0.1.0 h1:AaIEA5ITRnFA68uMyuIPYGM2XXllxsu8sNjFJP797us=
github.com/ipld/go-car v0.1.0/go.mod h1:RCWzaUh2i4mOEkB3W45Vc+9jnS/M6Qay5ooytiBHl3g=
github.com/ipld/go-codec-dagpb v1.2.0 h1:2umV7ud8HBMkRuJgd8gXw95cLhwmcYrihS3cQEy9zpI=
github.com/ipld/go-codec-dagpb v1.2.0/go.mod h1:6nBN7X7h8EOsEejZGqC7tej5drsdBAXbMHyBT+Fne5s=
github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSatMGe8=
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-ipld-prime v0.0.2-0.20191108012745-28a82f04c785/go.mod h1:bDDSvVz7vaK12FNvMeRYnpRFkSUPNQOiCYQezMD/P3w=
github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.12.0 h1:JapyKWTsJgmhrPI7hfx4V798c/RClr85sXfBZnH1VIw=
github.com/ipld/go-ipld-prime v0.12.0/go.mod h1:hy8b93WleDMRKumOJnTIrr0MbbFbx9GD6Kzxa53Xppc=
github.com/ipld/go-ipld-prime-proto v0.0.0-20191113031812-e32bd156a1e5/go.mod h1:gcvzoEDBjwycpXt3LBE061wT9f46szXGHAmj9uoP6fU=
Expand Down
14 changes: 7 additions & 7 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type LegIngester interface {
type legIngester struct {
host host.Host
ds datastore.Batching
lt *legs.LegTransport
lms legs.LegMultiSubscriber
indexer *indexer.Engine

newClient func(context.Context, host.Host, peer.ID) (pclient.Provider, error)
Expand All @@ -68,7 +68,7 @@ func NewLegIngester(ctx context.Context, cfg config.Ingest, h host.Host,
idxr *indexer.Engine, reg *registry.Registry, ds datastore.Batching) (LegIngester, error) {

lsys := mkLinkSystem(ds, reg)
lt, err := legs.MakeLegTransport(context.Background(), h, ds, lsys, cfg.PubSubTopic)
lms, err := legs.NewMultiSubscriber(ctx, h, ds, lsys, cfg.PubSubTopic)
if err != nil {
log.Errorf("Failed to state LegTransport in ingester: %s", err)
return nil, err
Expand All @@ -85,14 +85,14 @@ func NewLegIngester(ctx context.Context, cfg config.Ingest, h host.Host,
ds: ds,
indexer: idxr,
newClient: newClient,
lt: lt,
lms: lms,
subs: make(map[peer.ID]*subscriber),
sublk: keymutex.New(0),
batchSize: cfg.StoreBatchSize,
}

// Register storage hook to index data as we receive it.
lt.Gs.RegisterIncomingBlockHook(li.storageHook())
lms.GraphSync().RegisterIncomingBlockHook(li.storageHook())
log.Debugf("LegIngester started and all hooks and linksystem registered")
return li, nil
}
Expand Down Expand Up @@ -271,10 +271,10 @@ func (i *legIngester) newPeerSubscriber(ctx context.Context, peerID peer.ID) (*s
// If not synced start a brand new subscriber
var ls legs.LegSubscriber
if c == cid.Undef {
ls, err = legs.NewSubscriber(ctx, i.lt, legs.FilterPeerPolicy(peerID))
ls, err = i.lms.NewSubscriber(legs.FilterPeerPolicy(peerID))
} else {
// If yes, start a partially synced subscriber.
ls, err = legs.NewSubscriberPartiallySynced(ctx, i.lt, legs.FilterPeerPolicy(peerID), c)
ls, err = i.lms.NewSubscriberPartiallySynced(legs.FilterPeerPolicy(peerID), c)
}
if err != nil {
return nil, err
Expand All @@ -296,7 +296,7 @@ func (i *legIngester) Close(ctx context.Context) error {
}
}
// Close leg transport.
return i.lt.Close(ctx)
return i.lms.Close(ctx)
}

// Get the latest cid synced for the peer.
Expand Down
8 changes: 2 additions & 6 deletions internal/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/filecoin-project/go-indexer-core/engine"
"github.com/filecoin-project/go-indexer-core/store/storethehash"
"github.com/filecoin-project/go-legs"
"github.com/filecoin-project/storetheindex/api/v0"
v0 "github.com/filecoin-project/storetheindex/api/v0"
schema "github.com/filecoin-project/storetheindex/api/v0/ingest/schema"
"github.com/filecoin-project/storetheindex/config"
"github.com/filecoin-project/storetheindex/internal/registry"
Expand Down Expand Up @@ -247,11 +247,7 @@ func mkProvLinkSystem(ds datastore.Batching) ipld.LinkSystem {
func mkMockPublisher(t *testing.T, h host.Host, store datastore.Batching) (legs.LegPublisher, ipld.LinkSystem) {
ctx := context.Background()
lsys := mkProvLinkSystem(store)
lt, err := legs.MakeLegTransport(context.Background(), h, store, lsys, ingestCfg.PubSubTopic)
if err != nil {
t.Fatal(err)
}
ls, err := legs.NewPublisher(ctx, lt)
ls, err := legs.NewPublisher(ctx, h, store, lsys, ingestCfg.PubSubTopic)
require.NoError(t, err)
return ls, lsys
}
Expand Down

0 comments on commit 64bef9f

Please sign in to comment.