From b7876cc9c14a8bbf4975bc160e719cefa3038e0a Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Thu, 6 Mar 2025 18:55:05 -0500 Subject: [PATCH] p2p: fix http RoundTripper --- network/p2p/http.go | 106 ++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/network/p2p/http.go b/network/p2p/http.go index e82ac482f4..bb053cdd92 100644 --- a/network/p2p/http.go +++ b/network/p2p/http.go @@ -17,6 +17,10 @@ package p2p import ( + "bufio" + "context" + "fmt" + "io" "net/http" "sync" "time" @@ -26,7 +30,10 @@ import ( "github.com/gorilla/mux" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" + bhost "github.com/libp2p/go-libp2p/p2p/host/basic" libp2phttp "github.com/libp2p/go-libp2p/p2p/http" "github.com/multiformats/go-multiaddr" ) @@ -117,13 +124,16 @@ func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Cl logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String()) } - client := libp2phttp.Host{StreamHost: clientStreamHost} + host := &libp2phttp.Host{StreamHost: clientStreamHost} - // Do not use client.NamespacedClient to prevent it making connection to a well-known handler + // Do not use libp2phttp.Host.NamespacedClient to prevent it making connection to a well-known handler // to make a NamespaceRoundTripper that limits to specific URL paths. // First, we do not want make requests when listing peers (the main MakeHTTPClient invoker). // Secondly, this makes unit testing easier - no need to register fake handlers. - rt, err := client.NewConstrainedRoundTripper(*addrInfo) + // Do not use libp2phttp.Host.NewConstrainedRoundTripper because it uses http.Request's context + // when establishing new streams so might not be able to bail out in reasonable time when used for catchpoints. + // Instead, we use a custom RoundTripper that uses its own context with a timeout when establishing a new stream. + rt, err := newP2pHTTPRoundTripper(host, addrInfo) if err != nil { return nil, err } @@ -131,6 +141,96 @@ func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Cl return &http.Client{Transport: rt}, nil } +// newP2pHTTPRoundTripper is a ligtweight version of libp2phttp.Host.NewConstrainedRoundTripper +// that returns an own p2pHttpRoundTripper instead of streamRoundTripper. +func newP2pHTTPRoundTripper(h *libp2phttp.Host, server *peer.AddrInfo) (http.RoundTripper, error) { + // Do we have an existing connection to this peer? + existingStreamConn := false + if server.ID != "" && h.StreamHost != nil { + existingStreamConn = len(h.StreamHost.Network().ConnsToPeer(server.ID)) > 0 + } + + // Otherwise use a stream based transport + if h.StreamHost == nil { + return nil, fmt.Errorf("can not use the HTTP transport (either no address or PeerID auth is required), and no stream host provided") + } + if !existingStreamConn { + if server.ID == "" { + return nil, fmt.Errorf("can not use the HTTP transport, and no server peer ID provided") + } + } + + return &p2pHTTPRoundTripper{h: h.StreamHost, server: server.ID, serverAddrs: server.Addrs, httpHost: h}, nil +} + +// p2pHTTPRoundTripper is a custom http.RoundTripper that uses libp2p transport. +// It is a lightweight version of libp2phttp.streamRoundTripper with its own RoundTrip implementation. +// The main difference and the reason for this custom implementation is that it does not use http.Request's context +// but uses its own context with a timeout when establishing a new stream. +type p2pHTTPRoundTripper struct { + h host.Host + server peer.ID + serverAddrs []multiaddr.Multiaddr + httpHost *libp2phttp.Host + addrsAdded sync.Once +} + +// streamReadCloser is a copy of libp2phttp.streamReadCloser +type streamReadCloser struct { + io.ReadCloser + s network.Stream +} + +func (s *streamReadCloser) Close() error { + s.s.Close() + return s.ReadCloser.Close() +} + +func (rt *p2pHTTPRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + rt.addrsAdded.Do(func() { + if len(rt.serverAddrs) > 0 { + rt.h.Peerstore().AddAddrs(rt.server, rt.serverAddrs, peerstore.TempAddrTTL) + } + rt.serverAddrs = nil // may as well cleanup + }) + + ctx, cancel := context.WithTimeout(context.Background(), bhost.DefaultNegotiationTimeout) + defer cancel() + s, err := rt.h.NewStream(ctx, rt.server, libp2phttp.ProtocolIDForMultistreamSelect) + if err != nil { + return nil, err + } + + // Write connection: close header to ensure the stream is closed after the response + r.Header.Add("connection", "close") + + go func() { + defer func() { + _ = s.CloseWrite() + }() + _ = r.Write(s) + if r.Body != nil { + r.Body.Close() + } + }() + + if deadline, ok := r.Context().Deadline(); ok { + err = s.SetReadDeadline(deadline) + if err != nil { + s.Close() + return nil, err + } + } + + resp, err := http.ReadResponse(bufio.NewReader(s), r) + if err != nil { + s.Close() + return nil, err + } + resp.Body = &streamReadCloser{resp.Body, s} + return resp, nil +} + // makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address. func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) { cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))