Skip to content

Commit

Permalink
Merge pull request #338 from ipfs-force-community/feat/http-retrieve
Browse files Browse the repository at this point in the history
feat: support http retrieval
  • Loading branch information
simlecode authored Jun 15, 2023
2 parents 86a318f + 5d09be0 commit 34a7277
Show file tree
Hide file tree
Showing 15 changed files with 554 additions and 6 deletions.
74 changes: 74 additions & 0 deletions cli/retrieval-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@ import (
"text/tabwriter"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-fil-markets/retrievalmarket"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/filecoin-project/venus/venus-shared/types/market"
mtypes "github.com/ipfs-force-community/droplet/v2/types"
"github.com/ipfs-force-community/droplet/v2/utils"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
)
Expand All @@ -20,6 +26,7 @@ var RetrievalCmds = &cli.Command{
retrievalDealsCmds,
retirevalAsksCmds,
retrievalDealSelectionCmds,
queryProtocols,
},
}

Expand Down Expand Up @@ -192,3 +199,70 @@ func outputRetrievalDeal(deal *market.ProviderDealState) error {

return nil
}

var queryProtocols = &cli.Command{
Name: "protocols",
Usage: "query retrieval support protocols",
ArgsUsage: "<miner>",
Action: func(cctx *cli.Context) error {
if cctx.Args().Len() == 0 {
return fmt.Errorf("must pass miner")
}

api, closer, err := NewFullNode(cctx)
if err != nil {
return err
}
defer closer()

ctx := ReqContext(cctx)

miner, err := address.NewFromString(cctx.Args().First())
if err != nil {
return err
}
minerInfo, err := api.StateMinerInfo(ctx, miner, types.EmptyTSK)
if err != nil {
return err
}
if minerInfo.PeerId == nil {
return fmt.Errorf("peer id is nil")
}

h, err := libp2p.New(
libp2p.Identity(nil),
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
return err
}

addrs, err := utils.ConvertMultiaddr(minerInfo.Multiaddrs)
if err != nil {
return err
}
if err := h.Connect(ctx, peer.AddrInfo{ID: *minerInfo.PeerId, Addrs: addrs}); err != nil {
return err
}
stream, err := h.NewStream(ctx, *minerInfo.PeerId, mtypes.TransportsProtocolID)
if err != nil {
return fmt.Errorf("failed to open stream to peer: %w", err)
}
_ = stream.SetReadDeadline(time.Now().Add(time.Minute))
//nolint: errcheck
defer stream.SetReadDeadline(time.Time{})

// Read the response from the stream
queryResponsei, err := mtypes.BindnodeRegistry.TypeFromReader(stream, (*mtypes.QueryResponse)(nil), dagcbor.Decode)
if err != nil {
return fmt.Errorf("reading query response: %w", err)
}
queryResponse := queryResponsei.(*mtypes.QueryResponse)

for _, p := range queryResponse.Protocols {
fmt.Println(p)
}

return nil
},
}
2 changes: 1 addition & 1 deletion cmd/droplet-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,5 +322,5 @@ func marketClient(cctx *cli.Context) error {
apiHandles := []rpc.APIHandle{
{Path: "/rpc/v0", API: &marketCli},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh)
return rpc.ServeRPC(ctx, cfg, &cfg.API, mux.NewRouter(), 1000, cli2.API_NAMESPACE_MARKET_CLIENT, nil, apiHandles, finishCh, nil)
}
8 changes: 7 additions & 1 deletion cmd/droplet/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ipfs-force-community/droplet/v2/paychmgr"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs-force-community/droplet/v2/retrievalprovider"
"github.com/ipfs-force-community/droplet/v2/retrievalprovider/httpretrieval"
"github.com/ipfs-force-community/droplet/v2/rpc"
"github.com/ipfs-force-community/droplet/v2/storageprovider"
types2 "github.com/ipfs-force-community/droplet/v2/types"
Expand Down Expand Up @@ -243,6 +244,10 @@ func runDaemon(cctx *cli.Context) error {
if err = router.Handle("/resource", rpc.NewPieceStorageServer(resAPI.PieceStorageMgr)).GetError(); err != nil {
return fmt.Errorf("handle 'resource' failed: %w", err)
}
httpRetrievalServer, err := httpretrieval.NewServer(&cfg.PieceStorage)
if err != nil {
return err
}

var iMarket marketapiV1.IMarketStruct
permission.PermissionProxy(marketapiV1.IMarket(resAPI), &iMarket)
Expand All @@ -252,5 +257,6 @@ func runDaemon(cctx *cli.Context) error {
{Path: "/rpc/v1", API: api},
{Path: "/rpc/v0", API: v0api.WrapperV1IMarket{IMarket: api}},
}
return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh)

return rpc.ServeRPC(ctx, cfg, &cfg.API, router, 1000, cli2.API_NAMESPACE_VENUS_MARKET, authClient, apiHandles, finishCh, httpRetrievalServer)
}
5 changes: 5 additions & 0 deletions config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ type ProviderConfig struct {
RetrievalPaymentAddress Address

DealPublishAddress []Address

// The public multi-address for retrieving deals with droplet.
// Note: Must be in multiaddr format, eg /ip4/127.0.0.1/tcp/41235/http
HTTPRetrievalMultiaddr string
}

func defaultProviderConfig() *ProviderConfig {
Expand Down Expand Up @@ -153,5 +157,6 @@ func defaultProviderConfig() *ProviderConfig {

MaxPublishDealsFee: types.FIL(types.NewInt(0)),
MaxMarketBalanceAddFee: types.FIL(types.NewInt(0)),
HTTPRetrievalMultiaddr: "",
}
}
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1
github.com/howeyc/gopass v0.0.0-20210920133722-c8aef6fb66ef
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685
github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1
github.com/ipfs-force-community/sophon-messager v1.12.0-rc1
github.com/ipfs-force-community/venus-common-utils v0.0.0-20220217030526-e5e4c6bc14f7
Expand Down Expand Up @@ -96,6 +96,7 @@ require (
contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect
contrib.go.opencensus.io/exporter/prometheus v0.4.0 // indirect
github.com/DataDog/zstd v1.4.5 // indirect
github.com/NYTimes/gziphandler v1.1.1
github.com/Stebalien/go-bitfield v0.0.1 // indirect
github.com/acobaugh/osrelease v0.0.0-20181218015638-a93a0a55a249 // indirect
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
Expand Down Expand Up @@ -291,7 +292,7 @@ require (
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.14.1 // indirect
go.uber.org/zap v1.23.0 // indirect
go.uber.org/zap v1.23.0
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXY
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/NYTimes/gziphandler v1.1.1 h1:ZUDjpQae29j0ryrS0u/B8HZfJBtBQHjqw2rQ2cqUQ3I=
github.com/NYTimes/gziphandler v1.1.1/go.mod h1:n/CVRwUEOgIxrgPvAQhUUr9oeUtvrhMomdKFjzJNB0c=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
Expand Down Expand Up @@ -830,8 +832,8 @@ github.com/ipfs-force-community/metrics v1.0.0/go.mod h1:mn40SioMuKtjmRumHFy/fJ2
github.com/ipfs-force-community/metrics v1.0.1-0.20211022060227-11142a08b729/go.mod h1:mn40SioMuKtjmRumHFy/fJ26Pn028XuDjUJE9dorjyw=
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea h1:mtR+uqI444dF1j3IcpDq3MCFBR5B/OQnHFulTuM7a6o=
github.com/ipfs-force-community/metrics v1.0.1-0.20220824061112-ac916bacf2ea/go.mod h1:5rXHGpIN2wCkdgDnucJ1rEeKhLzbAur5IO1aWAIwogY=
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1 h1:Rm91rxnwbx6ejEIPh7Bmf/+cWaNezhSj/zwmIkG9MG4=
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1/go.mod h1:cGYCfjA/BDB/Km9P7Za6t1hmwpFlPKf8ho/CHmLG81Y=
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685 h1:ZoPQPSb0UNcSCBDoBCwVomxBbBXrksMGHaQISBhNQSY=
github.com/ipfs-force-community/sophon-auth v1.12.0-rc1.0.20230614091443-9d27e566c685/go.mod h1:cGYCfjA/BDB/Km9P7Za6t1hmwpFlPKf8ho/CHmLG81Y=
github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1 h1:c5BdvrB4g+Se/Uyotef6TR2H9pkm4ExtawR0zPx+GDs=
github.com/ipfs-force-community/sophon-gateway v1.12.0-rc1/go.mod h1:F+f4qzx+uVMPftk/sv2JnNOFgxpl83l5zrb9UFtzcCM=
github.com/ipfs-force-community/sophon-messager v1.12.0-rc1 h1:278xZjJt7yE18VOgPp/Nv9Scib37CkN2ZsBRrPSN1Qg=
Expand Down
18 changes: 18 additions & 0 deletions retrievalprovider/httpretrieval/READMD.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
## http 检索

支持通过 piece cid 检索,通过直接去 piecestore 查找和读取 piece,然后返回结果。

### 配置

需要调整 `droplet` 配置文件 `config.toml``HTTPRetrievalMultiaddr` 字段的值,参考下面示例:

```toml
[CommonProvider]
HTTPRetrievalMultiaddr = "/ip4/<ip>/tcp/41235/http"
```

> 上面配置中的 `ip` 是你本机的 IP 地址,`41235` 要确保和 `droplet` 使用的端口一致。
### TODO

[filplus 提出的 HTTP V2 检索要求](https://github.com/data-preservation-programs/RetrievalBot/blob/main/filplus.md#http-v2)
158 changes: 158 additions & 0 deletions retrievalprovider/httpretrieval/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package httpretrieval

import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/NYTimes/gziphandler"
"github.com/filecoin-project/venus/venus-shared/types"
"github.com/ipfs-force-community/droplet/v2/config"
"github.com/ipfs-force-community/droplet/v2/piecestorage"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap"
)

var log = logging.Logger("httpserver")

type Server struct {
// path string
pieceMgr *piecestorage.PieceStorageManager
}

func NewServer(cfg *config.PieceStorage) (*Server, error) {
pieceMgr, err := piecestorage.NewPieceStorageManager(cfg)
if err != nil {
return nil, err
}

return &Server{pieceMgr: pieceMgr}, nil
}

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.RetrievalByPieceCID(w, r)
}

func (s *Server) RetrievalByPieceCID(w http.ResponseWriter, r *http.Request) {
pieceCID, err := convertPieceCID(r.URL.Path)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusBadRequest, err)
return
}

ctx := r.Context()
pieceCIDStr := pieceCID.String()
log := log.With("piece cid", pieceCIDStr)
log.Info("start retrieval deal")
store, err := s.pieceMgr.FindStorageForRead(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}
mountReader, err := store.GetMountReader(ctx, pieceCIDStr)
if err != nil {
log.Warn(err)
badResponse(w, http.StatusNotFound, err)
return
}

serveContent(w, r, mountReader, log)
log.Info("end retrieval deal")
}

func serveContent(w http.ResponseWriter, r *http.Request, content io.ReadSeeker, log *zap.SugaredLogger) {
// Set the Content-Type header explicitly so that http.ServeContent doesn't
// try to do it implicitly
w.Header().Set("Content-Type", "application/piece")

var writer http.ResponseWriter

// http.ServeContent ignores errors when writing to the stream, so we
// replace the writer with a class that watches for errors
var err error
writeErrWatcher := &writeErrorWatcher{ResponseWriter: w, onError: func(e error) {
err = e
}}

writer = writeErrWatcher //Need writeErrWatcher to be of type writeErrorWatcher for addCommas()

// Note that the last modified time is a constant value because the data
// in a piece identified by a cid will never change.
start := time.Now()
log.Infof("start %s\t %d\tGET %s", start, http.StatusOK, r.URL)
isGzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
if isGzipped {
// If Accept-Encoding header contains gzip then send a gzipped response
gzwriter := gziphandler.GzipResponseWriter{
ResponseWriter: writeErrWatcher,
}
// Close the writer to flush buffer
defer gzwriter.Close() // nolint
writer = &gzwriter
}

if r.Method == "HEAD" {
// For an HTTP HEAD request ServeContent doesn't send any data (just headers)
http.ServeContent(writer, r, "", time.Time{}, content)
log.Infof("%d\tHEAD %s", http.StatusOK, r.URL)
return
}

// Send the content
http.ServeContent(writer, r, "", time.Unix(1, 0), content)

// Write a line to the log
end := time.Now()
completeMsg := fmt.Sprintf("GET %s\t%s - %s: %s / %s transferred",
r.URL, end.Format(time.RFC3339), start.Format(time.RFC3339), time.Since(start),
fmt.Sprintf("%s (%d B)", types.SizeStr(types.NewInt(writeErrWatcher.count)), writeErrWatcher.count))
if isGzipped {
completeMsg += " (gzipped)"
}
if err == nil {
log.Infof("%s %s", completeMsg, "Done")
} else {
log.Warnf("%s %s\n%s", completeMsg, "FAIL", err)
}
}

func convertPieceCID(path string) (cid.Cid, error) {
l := len("/piece/")
if len(path) <= l {
return cid.Undef, fmt.Errorf("path %s too short", path)
}

cidStr := path[l:]
c, err := cid.Parse(cidStr)
if err != nil {
return cid.Undef, fmt.Errorf("parse piece cid failed: %s, %v", cidStr, err)
}

return c, nil
}

func badResponse(w http.ResponseWriter, code int, err error) {
w.WriteHeader(code)
w.Write([]byte("Error: " + err.Error())) // nolint
}

// writeErrorWatcher calls onError if there is an error writing to the writer
type writeErrorWatcher struct {
http.ResponseWriter
count uint64
onError func(err error)
}

func (w *writeErrorWatcher) Write(bz []byte) (int, error) {
count, err := w.ResponseWriter.Write(bz)
if err != nil {
w.onError(err)
}
w.count += uint64(count)
return count, err
}
Loading

0 comments on commit 34a7277

Please sign in to comment.