diff --git a/cli/retrieval-deals.go b/cli/retrieval-deals.go index 4364a26f..8dcbe2ff 100644 --- a/cli/retrieval-deals.go +++ b/cli/retrieval-deals.go @@ -7,8 +7,14 @@ import ( "text/tabwriter" "time" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-fil-markets/retrievalmarket" + "github.com/filecoin-project/venus/venus-shared/types" "github.com/filecoin-project/venus/venus-shared/types/market" + mtypes "github.com/ipfs-force-community/droplet/v2/types" + "github.com/ipfs-force-community/droplet/v2/utils" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/peer" "github.com/urfave/cli/v2" ) @@ -20,6 +26,7 @@ var RetrievalCmds = &cli.Command{ retrievalDealsCmds, retirevalAsksCmds, retrievalDealSelectionCmds, + queryProtocols, }, } @@ -192,3 +199,70 @@ func outputRetrievalDeal(deal *market.ProviderDealState) error { return nil } + +var queryProtocols = &cli.Command{ + Name: "protocols", + Usage: "query retrieval support protocols", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() == 0 { + return fmt.Errorf("must pass miner") + } + + api, closer, err := NewFullNode(cctx) + if err != nil { + return err + } + defer closer() + + ctx := ReqContext(cctx) + + miner, err := address.NewFromString(cctx.Args().First()) + if err != nil { + return err + } + minerInfo, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK) + if err != nil { + return err + } + if minerInfo.PeerId == nil { + return fmt.Errorf("peer id is nil") + } + + h, err := libp2p.New( + libp2p.Identity(nil), + libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"), + ) + if err != nil { + return err + } + + addrs, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs) + if err != nil { + return err + } + if err := h.Connect(ctx, peer.AddrInfo{ID: *minerInfo.PeerId, Addrs: addrs}); err != nil { + return err + } + stream, err := h.NewStream(ctx, *minerInfo.PeerId, mtypes.TransportsProtocolID) + if err != nil { + return fmt.Errorf("failed to open stream to peer: %w", err) + } + _ = stream.SetReadDeadline(time.Now().Add(time.Minute)) + //nolint: errcheck + defer stream.SetReadDeadline(time.Time{}) + + // Read the response from the stream + queryResponsei, err := mtypes.BindnodeRegistry.TypeFromReader(stream, (*mtypes.QueryResponse)(nil), dagcbor.Decode) + if err != nil { + return fmt.Errorf("reading query response: %w", err) + } + queryResponse := queryResponsei.(*mtypes.QueryResponse) + + for _, p := range queryResponse.Protocols { + fmt.Println(p) + } + + return nil + }, +} diff --git a/cmd/droplet-client/main.go b/cmd/droplet-client/main.go index 872086fd..3df42b9e 100644 --- a/cmd/droplet-client/main.go +++ b/cmd/droplet-client/main.go @@ -322,5 +322,5 @@ func marketClient(cctx *cli.Context) error { apiHandles := []rpc.APIHandle{ {Path: "/rpc/v0", API: &marketCli}, } - return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh) + return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh, nil) } diff --git a/cmd/droplet/run.go b/cmd/droplet/run.go index bd05d68b..2ab41c3a 100644 --- a/cmd/droplet/run.go +++ b/cmd/droplet/run.go @@ -28,6 +28,7 @@ import ( "github.com/ipfs-force-community/droplet/v2/paychmgr" "github.com/ipfs-force-community/droplet/v2/piecestorage" "github.com/ipfs-force-community/droplet/v2/retrievalprovider" + "github.com/ipfs-force-community/droplet/v2/retrievalprovider/httpretrieval" "github.com/ipfs-force-community/droplet/v2/rpc" "github.com/ipfs-force-community/droplet/v2/storageprovider" types2 "github.com/ipfs-force-community/droplet/v2/types" @@ -243,6 +244,10 @@ func runDaemon(cctx *cli.Context) error { if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil { return fmt.Errorf("handle 'resource' failed: %w", err) } + httpRetrievalServer, err := httpretrieval.NewServer(&cfg.PieceStorage) + if err != nil { + return err + } var iMarket marketapiV1.IMarketStruct permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket) @@ -252,5 +257,6 @@ func runDaemon(cctx *cli.Context) error { {Path: "/rpc/v1", API: api}, {Path: "/rpc/v0", API: v0api.WrapperV1IMarket{IMarket: api}}, } - return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh) + + return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh, httpRetrievalServer) } diff --git a/config/common.go b/config/common.go index d7866dee..595710be 100644 --- a/config/common.go +++ b/config/common.go @@ -118,6 +118,10 @@ type ProviderConfig struct { RetrievalPaymentAddress Address DealPublishAddress []Address + + // The public multi-address for retrieving deals with droplet. + // Note: Must be in multiaddr format, eg /ip4/127.0.0.1/tcp/41235/http + HTTPRetrievalMultiaddr string } func defaultProviderConfig() *ProviderConfig { @@ -153,5 +157,6 @@ func defaultProviderConfig() *ProviderConfig { MaxPublishDealsFee: types.FIL(types.NewInt(0)), MaxMarketBalanceAddFee: types.FIL(types.NewInt(0)), + HTTPRetrievalMultiaddr: "", } } diff --git a/go.mod b/go.mod index e43501ae..1da1a5ad 100644 --- a/go.mod +++ b/go.mod @@ -38,7 +38,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea - github.com/ipfs-force-community/sophon-auth v1.12.0-rc1 + github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685 github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1 github.com/ipfs-force-community/sophon-messager v1.12.0-rc1 github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7 @@ -96,6 +96,7 @@ require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect github.com/DataDog/zstd v1.4.5 // indirect + github.com/NYTimes/gziphandler v1.1.1 github.com/Stebalien/go-bitfield v0.0.1 // indirect github.com/acobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 // indirect github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect @@ -291,7 +292,7 @@ require ( go.opentelemetry.io/otel/trace v1.10.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.14.1 // indirect - go.uber.org/zap v1.23.0 // indirect + go.uber.org/zap v1.23.0 go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/crypto v0.9.0 // indirect diff --git a/go.sum b/go.sum index c5dac9b6..645e4019 100644 --- a/go.sum +++ b/go.sum @@ -88,6 +88,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y= +github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I= +github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c= github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= @@ -830,8 +832,8 @@ github.com/ipfs-force-community/metrics v1.0.0/go.mod h1:mn40SioMuKtjmRumHFy/fJ2 github.com/ipfs-force-community/metrics v1.0.1-0.20211022060227-11142a08b729/go.mod h1:mn40SioMuKtjmRumHFy/fJ26Pn028XuDjUJE9dorjyw= github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea h1:mtR+uqI444dF1j3IcpDq3MCFBR5B/OQnHFulTuM7a6o= github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.mod h1:5rXHGpIN2wCkdgDnucJ1rEeKhLzbAur5IO1aWAIwogY= -github.com/ipfs-force-community/sophon-auth v1.12.0-rc1 h1:Rm91rxnwbx6ejEIPh7Bmf/+cWaNezhSj/zwmIkG9MG4= -github.com/ipfs-force-community/sophon-auth v1.12.0-rc1/go.mod h1:cGYCfjA/BDB/Km9P7Za6t1hmwpFlPKf8ho/CHmLG81Y= +github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685 h1:ZoPQPSb0UNcSCBDoBCwVomxBbBXrksMGHaQISBhNQSY= +github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685/go.mod h1:cGYCfjA/BDB/Km9P7Za6t1hmwpFlPKf8ho/CHmLG81Y= github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1 h1:c5BdvrB4g+Se/Uyotef6TR2H9pkm4ExtawR0zPx+GDs= github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1/go.mod h1:F+f4qzx+uVMPftk/sv2JnNOFgxpl83l5zrb9UFtzcCM= github.com/ipfs-force-community/sophon-messager v1.12.0-rc1 h1:278xZjJt7yE18VOgPp/Nv9Scib37CkN2ZsBRrPSN1Qg= diff --git a/retrievalprovider/httpretrieval/READMD.md b/retrievalprovider/httpretrieval/READMD.md new file mode 100644 index 00000000..e8b798c2 --- /dev/null +++ b/retrievalprovider/httpretrieval/READMD.md @@ -0,0 +1,18 @@ +## http 检索 + +支持通过 piece cid 检索,通过直接去 piecestore 查找和读取 piece,然后返回结果。 + +### 配置 + +需要调整 `droplet` 配置文件 `config.toml` 中 `HTTPRetrievalMultiaddr` 字段的值,参考下面示例: + +```toml +[CommonProvider] + HTTPRetrievalMultiaddr = "/ip4//tcp/41235/http" +``` + +> 上面配置中的 `ip` 是你本机的 IP 地址,`41235` 要确保和 `droplet` 使用的端口一致。 + +### TODO + +[filplus 提出的 HTTP V2 检索要求](https://github.com/data-preservation-programs/RetrievalBot/blob/main/filplus.md#http-v2) diff --git a/retrievalprovider/httpretrieval/server.go b/retrievalprovider/httpretrieval/server.go new file mode 100644 index 00000000..eaae579c --- /dev/null +++ b/retrievalprovider/httpretrieval/server.go @@ -0,0 +1,158 @@ +package httpretrieval + +import ( + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/NYTimes/gziphandler" + "github.com/filecoin-project/venus/venus-shared/types" + "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/piecestorage" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "go.uber.org/zap" +) + +var log = logging.Logger("httpserver") + +type Server struct { + // path string + pieceMgr *piecestorage.PieceStorageManager +} + +func NewServer(cfg *config.PieceStorage) (*Server, error) { + pieceMgr, err := piecestorage.NewPieceStorageManager(cfg) + if err != nil { + return nil, err + } + + return &Server{pieceMgr: pieceMgr}, nil +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.RetrievalByPieceCID(w, r) +} + +func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) { + pieceCID, err := convertPieceCID(r.URL.Path) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusBadRequest, err) + return + } + + ctx := r.Context() + pieceCIDStr := pieceCID.String() + log := log.With("piece cid", pieceCIDStr) + log.Info("start retrieval deal") + store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + mountReader, err := store.GetMountReader(ctx, pieceCIDStr) + if err != nil { + log.Warn(err) + badResponse(w, http.StatusNotFound, err) + return + } + + serveContent(w, r, mountReader, log) + log.Info("end retrieval deal") +} + +func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) { + // Set the Content-Type header explicitly so that http.ServeContent doesn't + // try to do it implicitly + w.Header().Set("Content-Type", "application/piece") + + var writer http.ResponseWriter + + // http.ServeContent ignores errors when writing to the stream, so we + // replace the writer with a class that watches for errors + var err error + writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) { + err = e + }} + + writer = writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas() + + // Note that the last modified time is a constant value because the data + // in a piece identified by a cid will never change. + start := time.Now() + log.Infof("start %s\t %d\tGET %s", start, http.StatusOK, r.URL) + isGzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") + if isGzipped { + // If Accept-Encoding header contains gzip then send a gzipped response + gzwriter := gziphandler.GzipResponseWriter{ + ResponseWriter: writeErrWatcher, + } + // Close the writer to flush buffer + defer gzwriter.Close() // nolint + writer = &gzwriter + } + + if r.Method == "HEAD" { + // For an HTTP HEAD request ServeContent doesn't send any data (just headers) + http.ServeContent(writer, r, "", time.Time{}, content) + log.Infof("%d\tHEAD %s", http.StatusOK, r.URL) + return + } + + // Send the content + http.ServeContent(writer, r, "", time.Unix(1, 0), content) + + // Write a line to the log + end := time.Now() + completeMsg := fmt.Sprintf("GET %s\t%s - %s: %s / %s transferred", + r.URL, end.Format(time.RFC3339), start.Format(time.RFC3339), time.Since(start), + fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(writeErrWatcher.count)), writeErrWatcher.count)) + if isGzipped { + completeMsg += " (gzipped)" + } + if err == nil { + log.Infof("%s %s", completeMsg, "Done") + } else { + log.Warnf("%s %s\n%s", completeMsg, "FAIL", err) + } +} + +func convertPieceCID(path string) (cid.Cid, error) { + l := len("/piece/") + if len(path) <= l { + return cid.Undef, fmt.Errorf("path %s too short", path) + } + + cidStr := path[l:] + c, err := cid.Parse(cidStr) + if err != nil { + return cid.Undef, fmt.Errorf("parse piece cid failed: %s, %v", cidStr, err) + } + + return c, nil +} + +func badResponse(w http.ResponseWriter, code int, err error) { + w.WriteHeader(code) + w.Write([]byte("Error: " + err.Error())) // nolint +} + +// writeErrorWatcher calls onError if there is an error writing to the writer +type writeErrorWatcher struct { + http.ResponseWriter + count uint64 + onError func(err error) +} + +func (w *writeErrorWatcher) Write(bz []byte) (int, error) { + count, err := w.ResponseWriter.Write(bz) + if err != nil { + w.onError(err) + } + w.count += uint64(count) + return count, err +} diff --git a/retrievalprovider/httpretrieval/server_test.go b/retrievalprovider/httpretrieval/server_test.go new file mode 100644 index 00000000..0956e010 --- /dev/null +++ b/retrievalprovider/httpretrieval/server_test.go @@ -0,0 +1,119 @@ +package httpretrieval + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "regexp" + "testing" + "time" + + "github.com/gorilla/mux" + "github.com/ipfs-force-community/droplet/v2/config" + "github.com/stretchr/testify/assert" +) + +func TestPathRegexp(t *testing.T) { + reg, err := regexp.Compile(`/piece/[a-z0-9]+`) + assert.NoError(t, err) + + cases := []struct { + str string + expect bool + }{ + { + str: "xxx", + expect: false, + }, + { + str: "/piece/", + expect: false, + }, + { + str: "/piece/ssss", + expect: true, + }, + { + str: "/piece/ss1ss1", + expect: true, + }, + } + + for _, c := range cases { + assert.Equal(t, c.expect, reg.MatchString(c.str)) + } +} + +func TestRetrievalByPiece(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + tmpDri := t.TempDir() + cfg := config.DefaultMarketConfig + cfg.Home.HomeDir = tmpDri + cfg.PieceStorage.Fs = []*config.FsPieceStorage{ + { + Name: "test", + ReadOnly: false, + Path: tmpDri, + }, + } + assert.NoError(t, config.SaveConfig(cfg)) + + pieceStr := "baga6ea4seaqpzcr744w2rvqhkedfqbuqrbo7xtkde2ol6e26khu3wni64nbpaeq" + buf := &bytes.Buffer{} + f, err := os.Create(filepath.Join(tmpDri, pieceStr)) + assert.NoError(t, err) + for i := 0; i < 100; i++ { + buf.WriteString("TEST TEST\n") + } + _, err = f.Write(buf.Bytes()) + assert.NoError(t, err) + assert.NoError(t, f.Close()) + + s, err := NewServer(&cfg.PieceStorage) + assert.NoError(t, err) + port := "34897" + startHTTPServer(ctx, t, port, s) + + url := fmt.Sprintf("http://127.0.0.1:%s/piece/%s", port, pieceStr) + req, err := http.NewRequest(http.MethodGet, url, nil) + assert.NoError(t, err) + resp, err := http.DefaultClient.Do(req) + assert.NoError(t, err) + defer resp.Body.Close() // nolint + + data, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + assert.Equal(t, buf.Bytes(), data) +} + +func startHTTPServer(ctx context.Context, t *testing.T, port string, s *Server) { + mux := mux.NewRouter() + err := mux.HandleFunc("/piece/{cid}", s.RetrievalByPieceCID).GetError() + assert.NoError(t, err) + + ser := &http.Server{ + Addr: "127.0.0.1:" + port, + Handler: mux, + } + + go func() { + if err := ser.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + assert.NoError(t, err) + } + }() + + go func() { + // wait server exit + <-ctx.Done() + assert.NoError(t, ser.Shutdown(context.TODO())) + }() + // wait serve up + time.Sleep(time.Second * 2) +} diff --git a/retrievalprovider/modules.go b/retrievalprovider/modules.go index b2ed6514..77c186ae 100644 --- a/retrievalprovider/modules.go +++ b/retrievalprovider/modules.go @@ -82,5 +82,6 @@ var RetrievalProviderOpts = func(cfg *config.MarketConfig) builder.Option { builder.Override(new(gatewayAPIV2.IMarketEvent), NewMarketEventStream), builder.Override(new(gatewayAPIV2.IMarketClient), builder.From(new(gatewayAPIV2.IMarketEvent))), builder.Override(new(gatewayAPIV2.IMarketServiceProvider), builder.From(new(gatewayAPIV2.IMarketEvent))), + builder.Override(new(*TransportsListener), NewTransportsListener), ) } diff --git a/retrievalprovider/provider.go b/retrievalprovider/provider.go index ba683d0a..c9807fc4 100644 --- a/retrievalprovider/provider.go +++ b/retrievalprovider/provider.go @@ -49,6 +49,8 @@ type RetrievalProvider struct { storageDealRepo repo.StorageDealRepo retrievalStreamHandler *RetrievalStreamHandler + + transportListener *TransportsListener } // NewProvider returns a new retrieval Provider @@ -63,6 +65,7 @@ func NewProvider( rdf config.RetrievalDealFilter, pieceStorageMgr *piecestorage.PieceStorageManager, gatewayMarketClient gateway.IMarketClient, + transportLister *TransportsListener, ) (*RetrievalProvider, error) { storageDealsRepo := repo.StorageDealRepo() retrievalDealRepo := repo.RetrievalDealRepo() @@ -77,6 +80,7 @@ func NewProvider( storageDealRepo: storageDealsRepo, stores: stores.NewReadOnlyBlockstores(), retrievalStreamHandler: NewRetrievalStreamHandler(cfg, retrievalAskRepo, retrievalDealRepo, storageDealsRepo, pieceInfo), + transportListener: transportLister, } retrievalHandler := NewRetrievalDealHandler(&providerDealEnvironment{p}, retrievalDealRepo, storageDealsRepo, gatewayMarketClient, pieceStorageMgr) @@ -138,12 +142,14 @@ func NewProvider( // Stop stops handling incoming requests. func (p *RetrievalProvider) Stop() error { + p.transportListener.Stop() return p.network.StopHandlingRequests() } // Start begins listening for deals on the given host. // Start must be called in order to accept incoming deals. func (p *RetrievalProvider) Start(ctx context.Context) error { + p.transportListener.Start() return p.network.SetDelegate(p.retrievalStreamHandler) } diff --git a/retrievalprovider/transports.go b/retrievalprovider/transports.go new file mode 100644 index 00000000..33500e9d --- /dev/null +++ b/retrievalprovider/transports.go @@ -0,0 +1,79 @@ +package retrievalprovider + +import ( + "fmt" + "time" + + "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/types" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/multiformats/go-multiaddr" +) + +// TransportsListener listens for incoming queries over libp2p +type TransportsListener struct { + host host.Host + protocols []types.Protocol +} + +func NewTransportsListener(h host.Host, cfg *config.MarketConfig) (*TransportsListener, error) { + var protos []types.Protocol + + // Get the libp2p addresses from the Host + if len(h.Addrs()) > 0 { + protos = append(protos, types.Protocol{ + Name: "libp2p", + Addresses: h.Addrs(), + }) + } + + // If there's an http retrieval address specified, add HTTP to the list + // of supported protocols + // todo: handle cfg.Miners[].HTTPRetrievalMultiaddr? + if len(cfg.CommonProvider.HTTPRetrievalMultiaddr) != 0 { + maddr, err := multiaddr.NewMultiaddr(cfg.CommonProvider.HTTPRetrievalMultiaddr) + if err != nil { + return nil, fmt.Errorf("could not parse '%s' as multiaddr: %w", cfg.CommonProvider.HTTPRetrievalMultiaddr, err) + } + + protos = append(protos, types.Protocol{ + Name: "http", + Addresses: []multiaddr.Multiaddr{maddr}, + }) + } + + return &TransportsListener{ + host: h, + protocols: protos, + }, nil +} + +func (l *TransportsListener) Start() { + l.host.SetStreamHandler(types.TransportsProtocolID, l.handleNewQueryStream) +} + +func (l *TransportsListener) Stop() { + l.host.RemoveStreamHandler(types.TransportsProtocolID) +} + +// Called when the client opens a libp2p stream +func (l *TransportsListener) handleNewQueryStream(s network.Stream) { + defer s.Close() // nolint + + log.Debugw("query", "peer", s.Conn().RemotePeer()) + + response := types.QueryResponse{Protocols: l.protocols} + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(time.Second * 30)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + // Write the response to the client + err := types.BindnodeRegistry.TypeToWriter(&response, s, dagcbor.Encode) + if err != nil { + log.Infow("error writing query response", "peer", s.Conn().RemotePeer(), "err", err) + return + } +} diff --git a/rpc/rpc.go b/rpc/rpc.go index d113d622..f4d0b61f 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "path" + "regexp" "github.com/etherlabsio/healthcheck/v2" "github.com/filecoin-project/go-jsonrpc" @@ -19,6 +20,7 @@ import ( "github.com/ipfs-force-community/sophon-auth/jwtclient" "github.com/ipfs-force-community/droplet/v2/config" + "github.com/ipfs-force-community/droplet/v2/retrievalprovider/httpretrieval" ) var log = logging.Logger("modules") @@ -38,6 +40,7 @@ func ServeRPC( authClient *jwtclient.AuthClient, apiHandles []APIHandle, shutdownCh <-chan struct{}, + httpRetrievalServer *httpretrieval.Server, ) error { serverOptions := make([]jsonrpc.ServerOption, 0) if maxRequestSize != 0 { // config set @@ -67,6 +70,10 @@ func ServeRPC( authMux = jwtclient.NewAuthMux(localJwtClient, nil, mux) } authMux.TrustHandle("/healthcheck", healthcheck.Handler()) + if httpRetrievalServer != nil { + authMux.TrustHandle("/piece/", httpRetrievalServer, jwtclient.RegexpOption(regexp.MustCompile(`/piece/[a-z0-9]+`))) + } + srv := &http.Server{Handler: authMux} go func() { diff --git a/types/transports.go b/types/transports.go new file mode 100644 index 00000000..38f656ea --- /dev/null +++ b/types/transports.go @@ -0,0 +1,57 @@ +package types + +import ( + _ "embed" + "fmt" + + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/node/bindnode/registry" + "github.com/multiformats/go-multiaddr" +) + +// boost retrieval protocol +const TransportsProtocolID = "/fil/retrieval/transports/1.0.0" + +// copy from https://github.com/filecoin-project/boost/blob/main/retrievalmarket/types/transports.go#L12 +type Protocol struct { + // The name of the transport protocol eg "libp2p" or "http" + Name string + // The address of the endpoint in multiaddr format + Addresses []multiaddr.Multiaddr +} + +type QueryResponse struct { + Protocols []Protocol +} + +//go:embed transports.ipldsch +var embedSchema []byte + +func multiAddrFromBytes(b []byte) (interface{}, error) { + ma, err := multiaddr.NewMultiaddrBytes(b) + if err != nil { + return nil, err + } + return &ma, err +} + +func multiAddrToBytes(iface interface{}) ([]byte, error) { + ma, ok := iface.(*multiaddr.Multiaddr) + if !ok { + return nil, fmt.Errorf("expected *Multiaddr value") + } + + return (*ma).Bytes(), nil +} + +var BindnodeRegistry = registry.NewRegistry() + +func init() { + var dummyMa multiaddr.Multiaddr + var bindnodeOptions = []bindnode.Option{ + bindnode.TypedBytesConverter(&dummyMa, multiAddrFromBytes, multiAddrToBytes), + } + if err := BindnodeRegistry.RegisterType((*QueryResponse)(nil), string(embedSchema), "QueryResponse", bindnodeOptions...); err != nil { + panic(err.Error()) + } +} diff --git a/types/transports.ipldsch b/types/transports.ipldsch new file mode 100644 index 00000000..f6ab6d79 --- /dev/null +++ b/types/transports.ipldsch @@ -0,0 +1,15 @@ +# Defines the response to a query asking which transport protocols a +# Storage Provider supports +type Multiaddr bytes + +type Protocol struct { + # The name of the transport protocol + # Known protocols: "libp2p", "http", "https" + Name String + # The addresses of the endpoint in multiaddr format + Addresses [Multiaddr] +} + +type QueryResponse struct { + Protocols [Protocol] +}