Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 103 additions & 3 deletions network/p2p/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
package p2p

import (
"bufio"
"context"
"fmt"
"io"
"net/http"
"sync"
"time"
Expand All @@ -26,7 +30,10 @@ import (
"github.com/gorilla/mux"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
bhost "github.com/libp2p/go-libp2p/p2p/host/basic"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -117,20 +124,113 @@ func makeHTTPClient(addrInfo *peer.AddrInfo, opts ...httpClientOption) (*http.Cl
logging.Base().Debugf("MakeHTTPClient made a new P2P host %s for %s", clientStreamHost.ID(), addrInfo.String())
}

client := libp2phttp.Host{StreamHost: clientStreamHost}
host := &libp2phttp.Host{StreamHost: clientStreamHost}

// Do not use client.NamespacedClient to prevent it making connection to a well-known handler
// Do not use libp2phttp.Host.NamespacedClient to prevent it making connection to a well-known handler
// to make a NamespaceRoundTripper that limits to specific URL paths.
// First, we do not want make requests when listing peers (the main MakeHTTPClient invoker).
// Secondly, this makes unit testing easier - no need to register fake handlers.
rt, err := client.NewConstrainedRoundTripper(*addrInfo)
Comment thread
algorandskiy marked this conversation as resolved.
// Do not use libp2phttp.Host.NewConstrainedRoundTripper because it uses http.Request's context
// when establishing new streams so might not be able to bail out in reasonable time when used for catchpoints.
// Instead, we use a custom RoundTripper that uses its own context with a timeout when establishing a new stream.
rt, err := newP2pHTTPRoundTripper(host, addrInfo)
if err != nil {
return nil, err
}

return &http.Client{Transport: rt}, nil
}

// newP2pHTTPRoundTripper is a ligtweight version of libp2phttp.Host.NewConstrainedRoundTripper
// that returns an own p2pHttpRoundTripper instead of streamRoundTripper.
func newP2pHTTPRoundTripper(h *libp2phttp.Host, server *peer.AddrInfo) (http.RoundTripper, error) {
// Do we have an existing connection to this peer?
existingStreamConn := false
if server.ID != "" && h.StreamHost != nil {
existingStreamConn = len(h.StreamHost.Network().ConnsToPeer(server.ID)) > 0
}

// Otherwise use a stream based transport
if h.StreamHost == nil {
return nil, fmt.Errorf("can not use the HTTP transport (either no address or PeerID auth is required), and no stream host provided")
}
if !existingStreamConn {
if server.ID == "" {
return nil, fmt.Errorf("can not use the HTTP transport, and no server peer ID provided")
}
}

return &p2pHTTPRoundTripper{h: h.StreamHost, server: server.ID, serverAddrs: server.Addrs, httpHost: h}, nil
}

// p2pHTTPRoundTripper is a custom http.RoundTripper that uses libp2p transport.
// It is a lightweight version of libp2phttp.streamRoundTripper with its own RoundTrip implementation.
// The main difference and the reason for this custom implementation is that it does not use http.Request's context
// but uses its own context with a timeout when establishing a new stream.
type p2pHTTPRoundTripper struct {
h host.Host
server peer.ID
serverAddrs []multiaddr.Multiaddr
httpHost *libp2phttp.Host
addrsAdded sync.Once
}

// streamReadCloser is a copy of libp2phttp.streamReadCloser
type streamReadCloser struct {
io.ReadCloser
s network.Stream
}

func (s *streamReadCloser) Close() error {
s.s.Close()
return s.ReadCloser.Close()
}

func (rt *p2pHTTPRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
rt.addrsAdded.Do(func() {
if len(rt.serverAddrs) > 0 {
rt.h.Peerstore().AddAddrs(rt.server, rt.serverAddrs, peerstore.TempAddrTTL)
}
rt.serverAddrs = nil // may as well cleanup
})

ctx, cancel := context.WithTimeout(context.Background(), bhost.DefaultNegotiationTimeout)
defer cancel()
s, err := rt.h.NewStream(ctx, rt.server, libp2phttp.ProtocolIDForMultistreamSelect)
if err != nil {
return nil, err
}

// Write connection: close header to ensure the stream is closed after the response
r.Header.Add("connection", "close")

go func() {
defer func() {
_ = s.CloseWrite()
}()
_ = r.Write(s)
if r.Body != nil {
r.Body.Close()
}
}()

if deadline, ok := r.Context().Deadline(); ok {
err = s.SetReadDeadline(deadline)
if err != nil {
s.Close()
return nil, err
}
}

resp, err := http.ReadResponse(bufio.NewReader(s), r)
if err != nil {
s.Close()
return nil, err
}
resp.Body = &streamReadCloser{resp.Body, s}
return resp, nil
}

// makeHTTPClientWithRateLimit creates a http.Client that uses libp2p transport for a given protocol and peer address.
func makeHTTPClientWithRateLimit(addrInfo *peer.AddrInfo, p2pService *serviceImpl, connTimeStore limitcaller.ConnectionTimeStore, queueingTimeout time.Duration) (*http.Client, error) {
cl, err := makeHTTPClient(addrInfo, WithHost(p2pService.host))
Expand Down