diff --git a/api/api.go b/api/api.go index 56e71b603..e75054a08 100644 --- a/api/api.go +++ b/api/api.go @@ -35,6 +35,8 @@ type Boost interface { // MethodGroup: Boost BoostIndexerAnnounceAllDeals(ctx context.Context) error //perm:admin BoostIndexerListMultihashes(ctx context.Context, proposalCid cid.Cid) ([]multihash.Multihash, error) //perm:admin + BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) //perm:admin + BoostIndexerAnnounceLatestHttp(ctx context.Context, urls []string) (cid.Cid, error) //perm:admin BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*ProviderDealRejectionInfo, error) //perm:admin BoostDeal(ctx context.Context, dealUuid uuid.UUID) (*smtypes.ProviderDealState, error) //perm:admin BoostDealBySignedProposalCid(ctx context.Context, proposalCid cid.Cid) (*smtypes.ProviderDealState, error) //perm:admin diff --git a/api/proxy_gen.go b/api/proxy_gen.go index c50f44d42..bbdefb2ae 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -68,6 +68,10 @@ type BoostStruct struct { BoostIndexerAnnounceAllDeals func(p0 context.Context) error `perm:"admin"` + BoostIndexerAnnounceLatest func(p0 context.Context) (cid.Cid, error) `perm:"admin"` + + BoostIndexerAnnounceLatestHttp func(p0 context.Context, p1 []string) (cid.Cid, error) `perm:"admin"` + BoostIndexerListMultihashes func(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) `perm:"admin"` BoostMakeDeal func(p0 context.Context, p1 smtypes.DealParams) (*ProviderDealRejectionInfo, error) `perm:"write"` @@ -437,6 +441,28 @@ func (s *BoostStub) BoostIndexerAnnounceAllDeals(p0 context.Context) error { return ErrNotSupported } +func (s *BoostStruct) BoostIndexerAnnounceLatest(p0 context.Context) (cid.Cid, error) { + if s.Internal.BoostIndexerAnnounceLatest == nil { + return *new(cid.Cid), ErrNotSupported + } + return s.Internal.BoostIndexerAnnounceLatest(p0) +} + +func (s *BoostStub) BoostIndexerAnnounceLatest(p0 context.Context) (cid.Cid, error) { + return *new(cid.Cid), ErrNotSupported +} + +func (s *BoostStruct) BoostIndexerAnnounceLatestHttp(p0 context.Context, p1 []string) (cid.Cid, error) { + if s.Internal.BoostIndexerAnnounceLatestHttp == nil { + return *new(cid.Cid), ErrNotSupported + } + return s.Internal.BoostIndexerAnnounceLatestHttp(p0, p1) +} + +func (s *BoostStub) BoostIndexerAnnounceLatestHttp(p0 context.Context, p1 []string) (cid.Cid, error) { + return *new(cid.Cid), ErrNotSupported +} + func (s *BoostStruct) BoostIndexerListMultihashes(p0 context.Context, p1 cid.Cid) ([]multihash.Multihash, error) { if s.Internal.BoostIndexerListMultihashes == nil { return *new([]multihash.Multihash), ErrNotSupported diff --git a/build/openrpc/boost.json.gz b/build/openrpc/boost.json.gz index 966bf3abb..bf0cce0a7 100644 Binary files a/build/openrpc/boost.json.gz and b/build/openrpc/boost.json.gz differ diff --git a/build/version.go b/build/version.go index f4436455f..dbc1ffc08 100644 --- a/build/version.go +++ b/build/version.go @@ -2,7 +2,7 @@ package build var CurrentCommit string -const BuildVersion = "1.7.0-rc1" +const BuildVersion = "1.7.3-rc3" func UserVersion() string { return BuildVersion + CurrentCommit diff --git a/cmd/boostd/import_data.go b/cmd/boostd/import_data.go index 66a7d8c89..bed8bcb33 100644 --- a/cmd/boostd/import_data.go +++ b/cmd/boostd/import_data.go @@ -67,9 +67,6 @@ var importDataCmd = &cli.Command{ // If the user has supplied a signed proposal cid deleteAfterImport := cctx.Bool("delete-after-import") if proposalCid != nil { - if deleteAfterImport { - return fmt.Errorf("legacy deal data cannot be automatically deleted after import (only new deals)") - } // Look up the deal in the boost database deal, err := napi.BoostDealBySignedProposalCid(cctx.Context, *proposalCid) diff --git a/cmd/boostd/index.go b/cmd/boostd/index.go index 3008ea184..903f3e0b8 100644 --- a/cmd/boostd/index.go +++ b/cmd/boostd/index.go @@ -14,6 +14,8 @@ var indexProvCmd = &cli.Command{ Subcommands: []*cli.Command{ indexProvAnnounceAllCmd, indexProvListMultihashesCmd, + indexProvAnnounceLatest, + indexProvAnnounceLatestHttp, }, } @@ -64,11 +66,63 @@ var indexProvListMultihashesCmd = &cli.Command{ if err != nil { return err } - + fmt.Printf("Found %d multihashes for deal with proposal cid %s:\n", len(mhs), propCid) for _, mh := range mhs { fmt.Println(" " + mh.String()) } + + return nil + }, +} + +var indexProvAnnounceLatest = &cli.Command{ + Name: "announce-latest", + Usage: "Re-publish the latest existing advertisement to pubsub", + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + c, err := napi.BoostIndexerAnnounceLatest(ctx) + if err != nil { + return err + } + + fmt.Printf("Announced advertisement with cid %s\n", c) + return nil + }, +} + +var indexProvAnnounceLatestHttp = &cli.Command{ + Name: "announce-latest-http", + Usage: "Re-publish the latest existing advertisement to specific indexers over http", + Flags: []cli.Flag{ + &cli.StringSliceFlag{ + Name: "announce-url", + Usage: "The url(s) to announce to. If not specified, announces to the http urls in config", + Required: false, + }, + }, + Action: func(cctx *cli.Context) error { + ctx := lcli.ReqContext(cctx) + + napi, closer, err := bcli.GetBoostAPI(cctx) + if err != nil { + return err + } + defer closer() + + c, err := napi.BoostIndexerAnnounceLatestHttp(ctx, cctx.StringSlice("announce-url")) + if err != nil { + return err + } + + fmt.Printf("Announced advertisement to indexers over http with cid %s\n", c) return nil }, } diff --git a/cmd/booster-http/http_test.go b/cmd/booster-http/http_test.go index 20bf4ecda..3fe058ef9 100644 --- a/cmd/booster-http/http_test.go +++ b/cmd/booster-http/http_test.go @@ -22,7 +22,7 @@ const testFile = "test/test_file" func TestNewHttpServer(t *testing.T) { // Create a new mock Http server ctrl := gomock.NewController(t) - httpServer := NewHttpServer("", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) @@ -41,7 +41,7 @@ func TestHttpGzipResponse(t *testing.T) { // Create a new mock Http server with custom functions ctrl := gomock.NewController(t) mockHttpServer := mocks_booster_http.NewMockHttpServerApi(ctrl) - httpServer := NewHttpServer("", 7777, mockHttpServer, nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mockHttpServer, nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) @@ -99,7 +99,7 @@ func TestHttpInfo(t *testing.T) { // Create a new mock Http server ctrl := gomock.NewController(t) - httpServer := NewHttpServer("", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) + httpServer := NewHttpServer("", "0.0.0.0", 7777, mocks_booster_http.NewMockHttpServerApi(ctrl), nil) err := httpServer.Start(context.Background()) require.NoError(t, err) waitServerUp(t, 7777) diff --git a/cmd/booster-http/mocks/mock_booster_http.go b/cmd/booster-http/mocks/mock_booster_http.go index 77cb30a93..2392f3ce9 100644 --- a/cmd/booster-http/mocks/mock_booster_http.go +++ b/cmd/booster-http/mocks/mock_booster_http.go @@ -1,8 +1,8 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: cmd/booster-http/server.go +// Source: server.go -// Package mock_main is a generated GoMock package. -package mock_main +// Package mocks_booster_http is a generated GoMock package. +package mocks_booster_http import ( context "context" diff --git a/cmd/booster-http/run.go b/cmd/booster-http/run.go index ce92ac81a..5eed671d4 100644 --- a/cmd/booster-http/run.go +++ b/cmd/booster-http/run.go @@ -223,8 +223,8 @@ var runCmd = &cli.Command{ pd.Start(ctx) // Start the server - log.Infof("Starting booster-http node on port %d with base path '%s'", - cctx.Int("port"), cctx.String("base-path")) + log.Infof("Starting booster-http node on listen address %s and port %d with base path '%s'", + cctx.String("address"), cctx.Int("port"), cctx.String("base-path")) err = server.Start(ctx) if err != nil { return fmt.Errorf("starting http server: %w", err) diff --git a/cmd/booster-http/server.go b/cmd/booster-http/server.go index 03c38f6b7..94d33c04c 100644 --- a/cmd/booster-http/server.go +++ b/cmd/booster-http/server.go @@ -42,11 +42,12 @@ type apiVersion struct { } type HttpServer struct { - path string - port int - api HttpServerApi - opts HttpServerOptions - idxPage string + path string + listenAddr string + port int + api HttpServerApi + opts HttpServerOptions + idxPage string ctx context.Context cancel context.CancelFunc @@ -65,11 +66,11 @@ type HttpServerOptions struct { SupportedResponseFormats []string } -func NewHttpServer(path string, port int, api HttpServerApi, opts *HttpServerOptions) *HttpServer { +func NewHttpServer(path string, listenAddr string, port int, api HttpServerApi, opts *HttpServerOptions) *HttpServer { if opts == nil { opts = &HttpServerOptions{ServePieces: true} } - return &HttpServer{path: path, port: port, api: api, opts: *opts, idxPage: parseTemplate(*opts)} + return &HttpServer{path: path, listenAddr: listenAddr, port: port, api: api, opts: *opts, idxPage: parseTemplate(*opts)} } func (s *HttpServer) pieceBasePath() string { @@ -102,7 +103,7 @@ func (s *HttpServer) Start(ctx context.Context) error { handler.HandleFunc("/info", s.handleInfo) handler.Handle("/metrics", metrics.Exporter("booster_http")) // metrics s.server = &http.Server{ - Addr: fmt.Sprintf(":%d", s.port), + Addr: fmt.Sprintf("%s:%d", s.listenAddr, s.port), Handler: handler, // This context will be the parent of the context associated with all // incoming requests diff --git a/cmd/boostx/stats_cmd.go b/cmd/boostx/stats_cmd.go index 42bcc1a96..1b8775955 100644 --- a/cmd/boostx/stats_cmd.go +++ b/cmd/boostx/stats_cmd.go @@ -2,8 +2,6 @@ package main import ( "fmt" - "github.com/filecoin-project/boost/retrievalmarket/lp2pimpl" - transports_types "github.com/filecoin-project/boost/retrievalmarket/types" "regexp" "sort" "strings" diff --git a/documentation/en/api-v1-methods.md b/documentation/en/api-v1-methods.md index 9bb6d0910..89136ca40 100644 --- a/documentation/en/api-v1-methods.md +++ b/documentation/en/api-v1-methods.md @@ -23,6 +23,8 @@ * [BoostDealBySignedProposalCid](#boostdealbysignedproposalcid) * [BoostDummyDeal](#boostdummydeal) * [BoostIndexerAnnounceAllDeals](#boostindexerannouncealldeals) + * [BoostIndexerAnnounceLatest](#boostindexerannouncelatest) + * [BoostIndexerAnnounceLatestHttp](#boostindexerannouncelatesthttp) * [BoostIndexerListMultihashes](#boostindexerlistmultihashes) * [BoostMakeDeal](#boostmakedeal) * [BoostOfflineDealWithData](#boostofflinedealwithdata) @@ -560,6 +562,41 @@ Inputs: `null` Response: `{}` +### BoostIndexerAnnounceLatest + + +Perms: admin + +Inputs: `null` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + +### BoostIndexerAnnounceLatestHttp + + +Perms: admin + +Inputs: +```json +[ + [ + "string value" + ] +] +``` + +Response: +```json +{ + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" +} +``` + ### BoostIndexerListMultihashes diff --git a/go.mod b/go.mod index 913eb2d1a..e805864c0 100644 --- a/go.mod +++ b/go.mod @@ -80,7 +80,7 @@ require ( github.com/ipld/go-car/v2 v2.7.0 github.com/ipld/go-ipld-prime v0.20.0 github.com/ipld/go-ipld-selector-text-lite v0.0.1 - github.com/ipni/index-provider v0.11.1 + github.com/ipni/index-provider v0.11.2 github.com/ipni/storetheindex v0.5.10 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/jpillora/backoff v1.0.0 diff --git a/go.sum b/go.sum index 4ea8826c0..e8d604ddd 100644 --- a/go.sum +++ b/go.sum @@ -989,8 +989,8 @@ github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= github.com/ipld/go-ipld-selector-text-lite v0.0.1 h1:lNqFsQpBHc3p5xHob2KvEg/iM5dIFn6iw4L/Hh+kS1Y= github.com/ipld/go-ipld-selector-text-lite v0.0.1/go.mod h1:U2CQmFb+uWzfIEF3I1arrDa5rwtj00PrpiwwCO+k1RM= -github.com/ipni/index-provider v0.11.1 h1:viNfSBvZA9G+Qe6/FGqfZtavnu4tTSfGUoWEECavqoI= -github.com/ipni/index-provider v0.11.1/go.mod h1:gB/wN4Mdz4MzikQubjyRRV97iS5BkD4FKB0U/bF/dY4= +github.com/ipni/index-provider v0.11.2 h1:nvykWK+/ncPTqHiuiJdXp/O0UF0V7iWesjHGKX//NYc= +github.com/ipni/index-provider v0.11.2/go.mod h1:gB/wN4Mdz4MzikQubjyRRV97iS5BkD4FKB0U/bF/dY4= github.com/ipni/storetheindex v0.5.10 h1:r97jIZsXPuwQvePJQuStu2a/kn+Zn8X4MAdA0rU2Pu4= github.com/ipni/storetheindex v0.5.10/go.mod h1:SJKFCnSx4X/4ekQuZvq8pVU/7tmxkEv632Qmgu3m2bQ= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= diff --git a/gql/resolver.go b/gql/resolver.go index e06cb5122..7f5c9cdee 100644 --- a/gql/resolver.go +++ b/gql/resolver.go @@ -31,6 +31,7 @@ import ( "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" "github.com/filecoin-project/boost/transport" "github.com/filecoin-project/lotus/api/v1api" + "github.com/filecoin-project/lotus/build" lotus_repo "github.com/filecoin-project/lotus/node/repo" "github.com/google/uuid" "github.com/graph-gophers/graphql-go" diff --git a/gql/resolver_mpool.go b/gql/resolver_mpool.go index 9dcc6a789..acf980b78 100644 --- a/gql/resolver_mpool.go +++ b/gql/resolver_mpool.go @@ -5,15 +5,16 @@ import ( "context" "encoding/json" "fmt" + "reflect" + "github.com/filecoin-project/lotus/chain/consensus" + cbg "github.com/whyrusleeping/cbor-gen" gqltypes "github.com/filecoin-project/boost/gql/types" "github.com/filecoin-project/boost/lib/mpoolmonitor" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" - stbig "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/lotus/chain/types" - cbg "github.com/whyrusleeping/cbor-gen" ) type msg struct { @@ -56,38 +57,18 @@ func (r *resolver) Mpool(ctx context.Context, args struct{ Alerts bool }) (mpool } } - method := m.Message.Method.String() - toact, err := r.fullNode.StateGetActor(ctx, m.Message.To, types.EmptyTSK) - if err == nil { - consensus.NewActorRegistry().Methods[toact.Code][m.Message.Method].Params.Name() - } - var params string - paramsMsg, err := messageFromBytes(m.Message.Params) - if err != nil { - return mpoolmsg{}, err - } - } else { - msgs, err = r.mpool.Alerts(ctx) - if err != nil { - return mpoolmsg{}, err - } - } - - // Convert params to human-readable and get method name - for _, m := range msgs { - var params string - methodName := m.SignedMessage.Message.Method.String() - toact, err := r.fullNode.StateGetActor(ctx, m.SignedMessage.Message.To, types.EmptyTSK) + methodName := m.Message.Method.String() + toact, err := r.fullNode.StateGetActor(ctx, m.Message.To, types.EmptyTSK) if err == nil { - method, ok := consensus.NewActorRegistry().Methods[toact.Code][m.SignedMessage.Message.Method] + method, ok := consensus.NewActorRegistry().Methods[toact.Code][m.Message.Method] if ok { methodName = method.Name - params = string(m.SignedMessage.Message.Params) + params = string(m.Message.Params) p, ok := reflect.New(method.Params.Elem()).Interface().(cbg.CBORUnmarshaler) if ok { - if err := p.UnmarshalCBOR(bytes.NewReader(m.SignedMessage.Message.Params)); err == nil { + if err := p.UnmarshalCBOR(bytes.NewReader(m.Message.Params)); err == nil { b, err := json.MarshalIndent(p, "", " ") if err == nil { params = string(b) @@ -97,22 +78,21 @@ func (r *resolver) Mpool(ctx context.Context, args struct{ Alerts bool }) (mpool } } - ret = append(ret, &msg{ - SentEpoch: gqltypes.Uint64(m.Added), - To: m.SignedMessage.Message.To.String(), - From: m.SignedMessage.Message.From.String(), - Nonce: gqltypes.Uint64(m.SignedMessage.Message.Nonce), - Value: gqltypes.BigInt{Int: m.SignedMessage.Message.Value}, - GasFeeCap: gqltypes.BigInt{Int: m.SignedMessage.Message.GasFeeCap}, - GasLimit: gqltypes.Uint64(uint64(m.SignedMessage.Message.GasLimit)), - GasPremium: gqltypes.BigInt{Int: m.SignedMessage.Message.GasPremium}, + gqlmsgs = append(gqlmsgs, &msg{ + To: m.Message.To.String(), + From: m.Message.From.String(), + Nonce: gqltypes.Uint64(m.Message.Nonce), + Value: gqltypes.BigInt{Int: m.Message.Value}, + GasFeeCap: gqltypes.BigInt{Int: m.Message.GasFeeCap}, + GasLimit: gqltypes.Uint64(uint64(m.Message.GasLimit)), + GasPremium: gqltypes.BigInt{Int: m.Message.GasPremium}, Method: methodName, Params: params, BaseFee: gqltypes.BigInt{Int: baseFee}, }) } - return mpoolmsg{Count: int32(len(msgs)), Messages: ret}, nil + return gqlmsgs, nil } func mockMessages() []*types.SignedMessage { diff --git a/indexprovider/wrapper.go b/indexprovider/wrapper.go index ddca71d9a..636f867fa 100644 --- a/indexprovider/wrapper.go +++ b/indexprovider/wrapper.go @@ -5,9 +5,6 @@ import ( "database/sql" "errors" "fmt" - "os" - "path/filepath" - "github.com/filecoin-project/boost-gfm/storagemarket" gfm_storagemarket "github.com/filecoin-project/boost-gfm/storagemarket" "github.com/filecoin-project/boost/db" @@ -17,6 +14,7 @@ import ( "github.com/filecoin-project/boost/sectorstatemgr" "github.com/filecoin-project/boost/storagemarket/types" "github.com/filecoin-project/boost/storagemarket/types/dealcheckpoints" + bdtypes "github.com/filecoin-project/boostd-data/svc/types" "github.com/filecoin-project/go-address" cborutil "github.com/filecoin-project/go-cbor-util" "github.com/filecoin-project/go-state-types/abi" @@ -24,15 +22,21 @@ import ( "github.com/filecoin-project/lotus/node/repo" "github.com/hashicorp/go-multierror" "github.com/ipfs/go-cid" + "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log/v2" + "github.com/ipld/go-ipld-prime" provider "github.com/ipni/index-provider" + "github.com/ipni/index-provider/engine" "github.com/ipni/index-provider/engine/xproviders" "github.com/ipni/index-provider/metadata" "github.com/libp2p/go-libp2p/core/crypto" - host "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multihash" "go.uber.org/fx" + "net/url" + "os" + "path/filepath" ) var log = logging.Logger("index-provider-wrapper") @@ -480,50 +484,133 @@ func (w *Wrapper) IndexerAnnounceAllDeals(ctx context.Context) error { return merr } +// While ingesting cids for each piece, if there is an error the indexer +// checks if the error contains the string "content not found": +// - if so, the indexer skips the piece and continues ingestion +// - if not, the indexer pauses ingestion +var ErrStringSkipAdIngest = "content not found" + +func skipError(err error) error { + return fmt.Errorf("%s: %s: %w", ErrStringSkipAdIngest, err.Error(), ipld.ErrNotExists{}) +} + +func (w *Wrapper) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { + e, ok := w.prov.(*engine.Engine) + if !ok { + return cid.Undef, fmt.Errorf("index provider is disabled") + } + return e.PublishLatest(ctx) +} + +func (w *Wrapper) IndexerAnnounceLatestHttp(ctx context.Context, announceUrls []string) (cid.Cid, error) { + e, ok := w.prov.(*engine.Engine) + if !ok { + return cid.Undef, fmt.Errorf("index provider is disabled") + } + + if len(announceUrls) == 0 { + announceUrls = w.cfg.IndexProvider.Announce.DirectAnnounceURLs + } + + urls := make([]*url.URL, 0, len(announceUrls)) + for _, us := range announceUrls { + u, err := url.Parse(us) + if err != nil { + return cid.Undef, fmt.Errorf("parsing url %s: %w", us, err) + } + urls = append(urls, u) + } + return e.PublishLatestHTTP(ctx, urls...) +} + func (w *Wrapper) MultihashLister(ctx context.Context, prov peer.ID, contextID []byte) (provider.MultihashIterator, error) { - provideF := func(pieceCid cid.Cid) (provider.MultihashIterator, error) { + provideF := func(proposalCid cid.Cid, pieceCid cid.Cid) (provider.MultihashIterator, error) { ii, err := w.piecedirectory.GetIterableIndex(ctx, pieceCid) if err != nil { - return nil, fmt.Errorf("failed to get iterable index: %w", err) + e := fmt.Errorf("failed to get iterable index: %w", err) + if bdtypes.IsNotFound(err) { + // If it's a not found error, skip over this piece and continue ingesting + log.Infow("skipping ingestion: piece not found", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, skipError(e) + } + + // Some other error, pause ingestion + log.Infow("pausing ingestion: error getting piece", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, e } - // Check if there are any records in the iterator. If there are no - // records, the multihash lister expects us to return an error. + // Check if there are any records in the iterator. hasRecords := ii.ForEach(func(_ multihash.Multihash, _ uint64) error { return fmt.Errorf("has at least one record") }) if hasRecords == nil { - return nil, fmt.Errorf("no records found for piece %s", pieceCid) + // If there are no records, it's effectively the same as a not + // found error. Skip over this piece and continue ingesting. + e := fmt.Errorf("no records found for piece %s", pieceCid) + log.Infow("skipping ingestion: piece has no records", "piece", pieceCid, "propCid", proposalCid, "err", e) + return nil, skipError(e) } mhi, err := provider.CarMultihashIterator(ii) if err != nil { - return nil, fmt.Errorf("failed to get mhiterator: %w", err) + // Bad index, skip over this piece and continue ingesting + err = fmt.Errorf("failed to get mhiterator: %w", err) + log.Infow("skipping ingestion", "piece", pieceCid, "propCid", proposalCid, "err", err) + return nil, skipError(err) } + + log.Debugw("returning piece iterator", "piece", pieceCid, "propCid", proposalCid, "err", err) return mhi, nil } // convert context ID to proposal Cid proposalCid, err := cid.Cast(contextID) if err != nil { - return nil, fmt.Errorf("failed to cast context ID to a cid") + // Bad contextID, skip over this piece and continue ingesting + err = fmt.Errorf("failed to cast context ID to a cid") + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) } - // go from proposal cid -> piece cid by looking up deal in boost and if we can't find it there -> then markets - // check Boost deals DB + // Look up deal by proposal cid in the boost database. + // If we can't find it there check legacy markets DB. pds, boostErr := w.dealsDB.BySignedProposalCID(ctx, proposalCid) if boostErr == nil { + // Found the deal, get an iterator over the piece pieceCid := pds.ClientDealProposal.Proposal.PieceCID - return provideF(pieceCid) + return provideF(proposalCid, pieceCid) } - // check in legacy markets + // Check if it's a "not found" error + if !errors.Is(boostErr, sql.ErrNoRows) { + // It's not a "not found" error: there was a problem accessing the + // database. Pause ingestion until the user can fix the DB. + e := fmt.Errorf("getting deal with proposal cid %s from boost database: %w", proposalCid, boostErr) + log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e) + return nil, e + } + + // Deal was not found in boost DB - check in legacy markets md, legacyErr := w.legacyProv.GetLocalDeal(proposalCid) if legacyErr == nil { - return provideF(md.Proposal.PieceCID) + // Found the deal, get an interator over the piece + return provideF(proposalCid, md.Proposal.PieceCID) } - return nil, fmt.Errorf("failed to look up deal in Boost, err=%s and Legacy Markets, err=%s", boostErr, legacyErr) + // Check if it's a "not found" error + if !errors.Is(legacyErr, datastore.ErrNotFound) { + // It's not a "not found" error: there was a problem accessing the + // legacy database. Pause ingestion until the user can fix the legacy DB. + e := fmt.Errorf("getting deal with proposal cid %s from Legacy Markets: %w", proposalCid, legacyErr) + log.Infow("pausing ingestion", "proposalCid", proposalCid, "err", e) + return nil, e + } + + // The deal was not found in the boost or legacy database. + // Skip this deal and continue ingestion. + err = fmt.Errorf("deal with proposal cid %s not found", proposalCid) + log.Infow("skipping ingestion", "proposalCid", proposalCid, "err", err) + return nil, skipError(err) } func (w *Wrapper) IndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { diff --git a/node/config/def.go b/node/config/def.go index 99cc02602..c9b530d5e 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -203,8 +203,6 @@ func DefaultBoost() *Boost { TopicName: "", PurgeCacheOnStart: false, - WebHost: "cid.contact", - Announce: IndexProviderAnnounceConfig{ AnnounceOverHttp: false, DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"}, diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index cff07ffef..adc75e965 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -455,6 +455,102 @@ them disabled. These will be completely deprecated soon.`, Comment: `The port that the graphql server listens on`, }, }, + "IndexProviderAnnounceConfig": []DocField{ + { + Name: "AnnounceOverHttp", + Type: "bool", + + Comment: `Make a direct announcement to a list of indexing nodes over http. +Note that announcements are already made over pubsub regardless +of this setting.`, + }, + { + Name: "DirectAnnounceURLs", + Type: "[]string", + + Comment: `The list of URLs of indexing nodes to announce to.`, + }, + }, + "IndexProviderConfig": []DocField{ + { + Name: "Enable", + Type: "bool", + + Comment: `Enable set whether to enable indexing announcement to the network and expose endpoints that +allow indexer nodes to process announcements. Enabled by default.`, + }, + { + Name: "EntriesCacheCapacity", + Type: "int", + + Comment: `EntriesCacheCapacity sets the maximum capacity to use for caching the indexing advertisement +entries. Defaults to 1024 if not specified. The cache is evicted using LRU policy. The +maximum storage used by the cache is a factor of EntriesCacheCapacity, EntriesChunkSize and +the length of multihashes being advertised. For example, advertising 128-bit long multihashes +with the default EntriesCacheCapacity, and EntriesChunkSize means the cache size can grow to +256MiB when full.`, + }, + { + Name: "EntriesChunkSize", + Type: "int", + + Comment: `EntriesChunkSize sets the maximum number of multihashes to include in a single entries chunk. +Defaults to 16384 if not specified. Note that chunks are chained together for indexing +advertisements that include more multihashes than the configured EntriesChunkSize.`, + }, + { + Name: "TopicName", + Type: "string", + + Comment: `TopicName sets the topic name on which the changes to the advertised content are announced. +If not explicitly specified, the topic name is automatically inferred from the network name +in following format: '/indexer/ingest/' +Defaults to empty, which implies the topic name is inferred from network name.`, + }, + { + Name: "PurgeCacheOnStart", + Type: "bool", + + Comment: `PurgeCacheOnStart sets whether to clear any cached entries chunks when the provider engine +starts. By default, the cache is rehydrated from previously cached entries stored in +datastore if any is present.`, + }, + { + Name: "Announce", + Type: "IndexProviderAnnounceConfig", + + Comment: ``, + }, + { + Name: "HttpPublisher", + Type: "IndexProviderHttpPublisherConfig", + + Comment: ``, + }, + }, + "IndexProviderHttpPublisherConfig": []DocField{ + { + Name: "Enabled", + Type: "bool", + + Comment: `If not enabled, requests are served over graphsync instead.`, + }, + { + Name: "PublicHostname", + Type: "string", + + Comment: `Set the public hostname / IP for the index provider listener. +eg "82.129.73.111" +This is usually the same as the for the boost node.`, + }, + { + Name: "Port", + Type: "int", + + Comment: `Set the port on which to listen for index provider requests over HTTP. +Note that this port must be open on the firewall.`, + }, + }, "LocalIndexDirectoryConfig": []DocField{ { Name: "Yugabyte", diff --git a/node/config/types.go b/node/config/types.go index d9d382995..ef2fdd329 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -313,9 +313,6 @@ type IndexProviderConfig struct { // datastore if any is present. PurgeCacheOnStart bool - // The network indexer host that the web UI should link to for published announcements - WebHost string - Announce IndexProviderAnnounceConfig HttpPublisher IndexProviderHttpPublisherConfig diff --git a/node/impl/boost.go b/node/impl/boost.go index fb2d24d57..ca85cbae3 100644 --- a/node/impl/boost.go +++ b/node/impl/boost.go @@ -172,6 +172,14 @@ func (sm *BoostAPI) BoostIndexerListMultihashes(ctx context.Context, proposalCid } } +func (sm *BoostAPI) BoostIndexerAnnounceLatest(ctx context.Context) (cid.Cid, error) { + return sm.IndexProvider.IndexerAnnounceLatest(ctx) +} + +func (sm *BoostAPI) BoostIndexerAnnounceLatestHttp(ctx context.Context, announceUrls []string) (cid.Cid, error) { + return sm.IndexProvider.IndexerAnnounceLatestHttp(ctx, announceUrls) +} + func (sm *BoostAPI) BoostOfflineDealWithData(ctx context.Context, dealUuid uuid.UUID, filePath string, delAfterImport bool) (*api.ProviderDealRejectionInfo, error) { res, err := sm.StorageProvider.ImportOfflineDealData(ctx, dealUuid, filePath, delAfterImport) return res, err diff --git a/node/modules/storageminer_idxprov.go b/node/modules/storageminer_idxprov.go index 3af5eeea7..c2c2d78aa 100644 --- a/node/modules/storageminer_idxprov.go +++ b/node/modules/storageminer_idxprov.go @@ -13,7 +13,6 @@ import ( datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/filecoin-project/go-data-transfer/transport/graphsync" datatransferv2 "github.com/filecoin-project/go-data-transfer/v2" - "github.com/filecoin-project/lotus/node/config" lotus_dtypes "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" @@ -79,7 +78,8 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo "pid", marketHost.ID(), "topic", topicName, "retAddrs", marketHost.Addrs()) - // If announcements to the network are enabled, then set options for datatransfer publisher. + + // If announcements to the network are enabled, then set options for the publisher. var e *engine.Engine if cfg.Enable { // Join the indexer topic using the market's pubsub instance. Otherwise, the provider @@ -95,11 +95,6 @@ func IndexProvider(cfg config.IndexProviderConfig) func(params IdxProv, marketHo // The extra data is required by the lotus-specific index-provider gossip message validators. ma := address.Address(maddr) opts = append(opts, - engine.WithPublisherKind(engine.DataTransferPublisher), - engine.WithDataTransfer(dtV1ToIndexerDT(dt, func() ipld.LinkSystem { - return *e.LinkSystem() - })), - engine.WithExtraGossipData(ma.Bytes()), engine.WithTopic(t), engine.WithExtraGossipData(ma.Bytes()), ) diff --git a/react/src/Mpool.css b/react/src/Mpool.css index 45ca741b3..2a30e787e 100644 --- a/react/src/Mpool.css +++ b/react/src/Mpool.css @@ -119,7 +119,7 @@ height: 0.5em; } -.mpool .params { +.mpool .params{ width: 1080px; text-overflow: ellipsis; cursor: pointer; @@ -128,8 +128,8 @@ white-space: nowrap; } -.mpool .params.expanded { +.mpool .params:hover{ overflow: visible; + white-space: normal; width: auto; - white-space: pre } \ No newline at end of file diff --git a/react/src/Mpool.js b/react/src/Mpool.js index 5cd05be7f..b23b4802d 100644 --- a/react/src/Mpool.js +++ b/react/src/Mpool.js @@ -83,7 +83,7 @@ function MpoolMessage(props) { Params -
{msg.Params}
+
{msg.Params}
diff --git a/retrievalmarket/server/channelstate.go b/retrievalmarket/server/channelstate.go index f2a46c433..1f1b48ed8 100644 --- a/retrievalmarket/server/channelstate.go +++ b/retrievalmarket/server/channelstate.go @@ -4,6 +4,7 @@ import ( "bytes" "github.com/filecoin-project/boost-gfm/retrievalmarket" + graphsync "github.com/filecoin-project/boost-graphsync" datatransfer "github.com/filecoin-project/go-data-transfer" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime" diff --git a/storagemarket/lp2pimpl/net.go b/storagemarket/lp2pimpl/net.go index d4dec3744..9eababb07 100644 --- a/storagemarket/lp2pimpl/net.go +++ b/storagemarket/lp2pimpl/net.go @@ -23,7 +23,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - typegen "github.com/whyrusleeping/cbor-gen" "go.uber.org/zap" )