diff --git a/agreement/gossip/network_test.go b/agreement/gossip/network_test.go index a3c5328716..2867734ff6 100644 --- a/agreement/gossip/network_test.go +++ b/agreement/gossip/network_test.go @@ -155,9 +155,6 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee } func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) { } -func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) { - return nil -} func (w *whiteholeNetwork) Start() error { w.quit = make(chan struct{}) diff --git a/components/mocks/mockNetwork.go b/components/mocks/mockNetwork.go index f933a553a9..47b1a5b5e4 100644 --- a/components/mocks/mockNetwork.go +++ b/components/mocks/mockNetwork.go @@ -106,11 +106,6 @@ func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handle // OnNetworkAdvance - empty implementation func (network *MockNetwork) OnNetworkAdvance() {} -// GetHTTPRequestConnection - empty implementation -func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) { - return nil -} - // GetGenesisID - empty implementation func (network *MockNetwork) GetGenesisID() string { if network.GenesisID == "" { diff --git a/network/gossipNode.go b/network/gossipNode.go index 6a028ff193..1592641f70 100644 --- a/network/gossipNode.go +++ b/network/gossipNode.go @@ -20,7 +20,6 @@ import ( "context" "net/http" "strings" - "time" "github.com/algorand/go-algorand/config" "github.com/algorand/go-algorand/protocol" @@ -50,13 +49,6 @@ const ( PeersPhonebookArchivalNodes PeerOption = iota ) -// DeadlineSettableConn abstracts net.Conn and related types as deadline-settable -type DeadlineSettableConn interface { - SetDeadline(time.Time) error - SetReadDeadline(time.Time) error - SetWriteDeadline(time.Time) error -} - // GossipNode represents a node in the gossip network type GossipNode interface { Address() (string, bool) @@ -104,10 +96,6 @@ type GossipNode interface { // characteristics as with a watchdog timer. OnNetworkAdvance() - // GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same - // request that was provided to the http handler ( or provide a fallback Context() to that ) - GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) - // GetGenesisID returns the network-specific genesisID. GetGenesisID() string diff --git a/network/hybridNetwork.go b/network/hybridNetwork.go index f324deb73f..27fc6edbb0 100644 --- a/network/hybridNetwork.go +++ b/network/hybridNetwork.go @@ -212,16 +212,6 @@ func (n *HybridP2PNetwork) OnNetworkAdvance() { }) } -// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same -// request that was provided to the http handler ( or provide a fallback Context() to that ) -func (n *HybridP2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) { - conn = n.wsNetwork.GetHTTPRequestConnection(request) - if conn != nil { - return conn - } - return n.p2pNetwork.GetHTTPRequestConnection(request) -} - // GetGenesisID returns the network-specific genesisID. func (n *HybridP2PNetwork) GetGenesisID() string { return n.genesisID diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index f4ed670f3e..ac0489d5e1 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -38,7 +38,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - libp2phttp "github.com/libp2p/go-libp2p/p2p/http" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" "github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/transport/tcp" @@ -67,8 +66,6 @@ type Service interface { ListPeersForTopic(topic string) []peer.ID Subscribe(topic string, val pubsub.ValidatorEx) (SubNextCancellable, error) Publish(ctx context.Context, topic string, data []byte) error - - GetStream(peer.ID) (network.Stream, bool) } // serviceImpl manages integration with libp2p and implements the Service interface @@ -137,47 +134,7 @@ func MakeHost(cfg config.Local, datadir string, pstore *pstore.PeerStore) (host. libp2p.Security(noise.ID, noise.New), disableMetrics, ) - return &StreamChainingHost{ - Host: host, - handlers: map[protocol.ID][]network.StreamHandler{}, - }, listenAddr, err -} - -// StreamChainingHost is a wrapper around host.Host that overrides SetStreamHandler -// to allow chaining multiple handlers for the same protocol. -// Note, there should be probably only single handler that writes/reads streams. -type StreamChainingHost struct { - host.Host - handlers map[protocol.ID][]network.StreamHandler - mutex deadlock.Mutex -} - -// SetStreamHandler overrides the host.Host.SetStreamHandler method for chaining multiple handlers. -// Function objects are not comparable so theoretically it could have duplicates. -// The main use case is to track HTTP streams for ProtocolIDForMultistreamSelect = "/http/1.1" -// so it could just filter for such protocol if there any issues with other protocols like kad or mesh. -func (h *StreamChainingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) { - h.mutex.Lock() - defer h.mutex.Unlock() - - handlers := h.handlers[pid] - if len(handlers) == 0 { - // no other handlers, do not set a proxy handler - h.Host.SetStreamHandler(pid, handler) - h.handlers[pid] = append(handlers, handler) - return - } - // otherwise chain the handlers with a copy of the existing handlers - handlers = append(handlers, handler) - // copy to save it in the closure and call lock free - currentHandlers := make([]network.StreamHandler, len(handlers)) - copy(currentHandlers, handlers) - h.Host.SetStreamHandler(pid, func(s network.Stream) { - for _, h := range currentHandlers { - h(s) - } - }) - h.handlers[pid] = handlers + return host, listenAddr, err } // MakeService creates a P2P service instance @@ -186,7 +143,6 @@ func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h ho sm := makeStreamManager(ctx, log, h, wsStreamHandler) h.Network().Notify(sm) h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler) - h.SetStreamHandler(libp2phttp.ProtocolIDForMultistreamSelect, sm.streamHandlerHTTP) // set an empty handler for telemetryID/telemetryInstance protocol in order to allow other peers to know our telemetryID telemetryID := log.GetTelemetryGUID() @@ -294,10 +250,6 @@ func (s *serviceImpl) ClosePeer(peer peer.ID) error { return s.host.Network().ClosePeer(peer) } -func (s *serviceImpl) GetStream(peerID peer.ID) (network.Stream, bool) { - return s.streams.getStream(peerID) -} - // netAddressToListenAddress converts a netAddress in "ip:port" format to a listen address // that can be passed in to libp2p.ListenAddrStrings func netAddressToListenAddress(netAddress string) (string, error) { diff --git a/network/p2p/p2p_test.go b/network/p2p/p2p_test.go index fb14193a55..dab6aa5456 100644 --- a/network/p2p/p2p_test.go +++ b/network/p2p/p2p_test.go @@ -19,19 +19,14 @@ package p2p import ( "context" "fmt" - "sync/atomic" "testing" - "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" - "github.com/algorand/go-algorand/config" - "github.com/algorand/go-algorand/network/p2p/peerstore" "github.com/algorand/go-algorand/test/partitiontest" ) @@ -86,67 +81,6 @@ func TestNetAddressToListenAddress(t *testing.T) { } } -func TestP2PStreamingHost(t *testing.T) { - partitiontest.PartitionTest(t) - - cfg := config.GetDefaultLocal() - dir := t.TempDir() - pstore, err := peerstore.NewPeerStore(nil, "") - require.NoError(t, err) - h, la, err := MakeHost(cfg, dir, pstore) - require.NoError(t, err) - - var h1calls atomic.Int64 - h1 := func(network.Stream) { - h1calls.Add(1) - } - var h2calls atomic.Int64 - h2 := func(network.Stream) { - h2calls.Add(1) - } - - ma, err := multiaddr.NewMultiaddr(la) - require.NoError(t, err) - h.Network().Listen(ma) - defer h.Close() - - h.SetStreamHandler(AlgorandWsProtocol, h1) - h.SetStreamHandler(AlgorandWsProtocol, h2) - - addrInfo := peer.AddrInfo{ - ID: h.ID(), - Addrs: h.Addrs(), - } - cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo}, "") - require.NoError(t, err) - c, _, err := MakeHost(cfg, dir, cpstore) - require.NoError(t, err) - defer c.Close() - - s1, err := c.NewStream(context.Background(), h.ID(), AlgorandWsProtocol) - require.NoError(t, err) - s1.Write([]byte("hello")) - defer s1.Close() - - require.Eventually(t, func() bool { - return h1calls.Load() == 1 && h2calls.Load() == 1 - }, 5*time.Second, 100*time.Millisecond) - - // ensure a single handler also works as expected - h1calls.Store(0) - h.SetStreamHandler(algorandP2pHTTPProtocol, h1) - - s2, err := c.NewStream(context.Background(), h.ID(), algorandP2pHTTPProtocol) - require.NoError(t, err) - s2.Write([]byte("hello")) - defer s2.Close() - - require.Eventually(t, func() bool { - return h1calls.Load() == 1 - }, 5*time.Second, 100*time.Millisecond) - -} - // TestP2PGetPeerTelemetryInfo tests the GetPeerTelemetryInfo function func TestP2PGetPeerTelemetryInfo(t *testing.T) { partitiontest.PartitionTest(t) diff --git a/network/p2p/streams.go b/network/p2p/streams.go index d16633adfd..e7277f4871 100644 --- a/network/p2p/streams.go +++ b/network/p2p/streams.go @@ -104,20 +104,6 @@ func (n *streamManager) streamHandler(stream network.Stream) { n.handler(n.ctx, remotePeer, stream, incoming) } -// streamHandlerHTTP tracks the ProtocolIDForMultistreamSelect = "/http/1.1" streams -func (n *streamManager) streamHandlerHTTP(stream network.Stream) { - n.streamsLock.Lock() - defer n.streamsLock.Unlock() - n.streams[stream.Conn().LocalPeer()] = stream -} - -func (n *streamManager) getStream(peerID peer.ID) (network.Stream, bool) { - n.streamsLock.Lock() - defer n.streamsLock.Unlock() - stream, ok := n.streams[peerID] - return stream, ok -} - // Connected is called when a connection is opened func (n *streamManager) Connected(net network.Network, conn network.Conn) { remotePeer := conn.RemotePeer() @@ -174,12 +160,6 @@ func (n *streamManager) Disconnected(net network.Network, conn network.Conn) { stream.Close() delete(n.streams, conn.RemotePeer()) } - - stream, ok = n.streams[conn.LocalPeer()] - if ok { - stream.Close() - delete(n.streams, conn.LocalPeer()) - } } // Listen is called when network starts listening on an addr diff --git a/network/p2pNetwork.go b/network/p2pNetwork.go index 7ebbb5a665..1ad49bd045 100644 --- a/network/p2pNetwork.go +++ b/network/p2pNetwork.go @@ -19,7 +19,6 @@ package network import ( "context" "math/rand" - "net" "net/http" "strings" "sync" @@ -725,23 +724,6 @@ func (n *P2PNetwork) OnNetworkAdvance() { } } -// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same -// request that was provided to the http handler ( or provide a fallback Context() to that ) -func (n *P2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) { - addr := request.Context().Value(http.LocalAddrContextKey).(net.Addr) - peerID, err := peer.Decode(addr.String()) - if err != nil { - n.log.Infof("GetHTTPRequestConnection failed to decode %s", addr.String()) - return nil - } - conn, ok := n.service.GetStream(peerID) - if !ok { - n.log.Warnf("GetHTTPRequestConnection no such stream for peer %s", peerID.String()) - return nil - } - return conn -} - // wsStreamHandler is a callback that the p2p package calls when a new peer connects and establishes a // stream for the websocket protocol. func (n *P2PNetwork) wsStreamHandler(ctx context.Context, p2pPeer peer.ID, stream network.Stream, incoming bool) { diff --git a/network/p2pNetwork_test.go b/network/p2pNetwork_test.go index 5bd582ead0..3548dbd1cb 100644 --- a/network/p2pNetwork_test.go +++ b/network/p2pNetwork_test.go @@ -366,10 +366,6 @@ func (s *mockService) Publish(ctx context.Context, topic string, data []byte) er return nil } -func (s *mockService) GetStream(peer.ID) (network.Stream, bool) { - return nil, false -} - func makeMockService(id peer.ID, addrs []ma.Multiaddr) *mockService { return &mockService{ id: id, @@ -725,8 +721,9 @@ type p2phttpHandler struct { func (h *p2phttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Write([]byte(h.retData)) if r.URL.Path == "/check-conn" { - c := h.net.GetHTTPRequestConnection(r) - require.NotNil(h.tb, c) + rc := http.NewResponseController(w) + err := rc.SetWriteDeadline(time.Now().Add(10 * time.Second)) + require.NoError(h.tb, err) } } diff --git a/network/requestTracker.go b/network/requestTracker.go index 47eba90c7e..8c603e2a32 100644 --- a/network/requestTracker.go +++ b/network/requestTracker.go @@ -62,12 +62,10 @@ type TrackerRequest struct { otherTelemetryGUID string otherInstanceName string - connection net.Conn - noPrune bool } // makeTrackerRequest creates a new TrackerRequest. -func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime time.Time, conn net.Conn) *TrackerRequest { +func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime time.Time) *TrackerRequest { if remoteHost == "" { remoteHost, remotePort, _ = net.SplitHostPort(remoteAddr) } @@ -77,7 +75,6 @@ func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime ti remoteAddr: remoteAddr, remoteHost: remoteHost, remotePort: remotePort, - connection: conn, } } @@ -120,9 +117,8 @@ func (tr *TrackerRequest) remoteAddress() string { // hostIncomingRequests holds all the requests that are originating from a single host. type hostIncomingRequests struct { - remoteHost string - requests []*TrackerRequest // this is an ordered list, according to the requestsHistory.created - additionalHostRequests map[*TrackerRequest]struct{} // additional requests that aren't included in the "requests", and always assumed to be "alive". + remoteHost string + requests []*TrackerRequest // this is an ordered list, according to the requestsHistory.created } // findTimestampIndex finds the first an index (i) in the sorted requests array, where requests[i].created is greater than t. @@ -137,45 +133,6 @@ func (ard *hostIncomingRequests) findTimestampIndex(t time.Time) int { return i } -// convertToAdditionalRequest converts the given trackerRequest into a "additional request". -// unlike regular tracker requests, additional requests does not get pruned. -func (ard *hostIncomingRequests) convertToAdditionalRequest(trackerRequest *TrackerRequest) { - if _, has := ard.additionalHostRequests[trackerRequest]; has { - return - } - - i := sort.Search(len(ard.requests), func(i int) bool { - return ard.requests[i].created.After(trackerRequest.created) - }) - i-- - if i < 0 { - return - } - // we could have several entries with the same timestamp, so we need to consider all of them. - for ; i >= 0; i-- { - if ard.requests[i] == trackerRequest { - break - } - if ard.requests[i].created != trackerRequest.created { - // we can't find the item in the list. - return - } - } - if i < 0 { - return - } - // ok, item was found at index i. - copy(ard.requests[i:], ard.requests[i+1:]) - ard.requests[len(ard.requests)-1] = nil - ard.requests = ard.requests[:len(ard.requests)-1] - ard.additionalHostRequests[trackerRequest] = struct{}{} -} - -// removeTrackedConnection removes a trackerRequest from the additional requests map -func (ard *hostIncomingRequests) removeTrackedConnection(trackerRequest *TrackerRequest) { - delete(ard.additionalHostRequests, trackerRequest) -} - // add adds the trackerRequest at the correct index within the sorted array. func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) { // find the new item index. @@ -197,7 +154,7 @@ func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) { // countConnections counts the number of connection that we have that occurred after the provided specified time func (ard *hostIncomingRequests) countConnections(rateLimitingWindowStartTime time.Time) (count uint) { i := ard.findTimestampIndex(rateLimitingWindowStartTime) - return uint(len(ard.requests) - i + len(ard.additionalHostRequests)) + return uint(len(ard.requests) - i) } //msgp:ignore hostsIncomingMap @@ -232,9 +189,8 @@ func (him *hostsIncomingMap) addRequest(trackerRequest *TrackerRequest) { requestData, has := (*him)[trackerRequest.remoteHost] if !has { requestData = &hostIncomingRequests{ - remoteHost: trackerRequest.remoteHost, - requests: make([]*TrackerRequest, 0, 1), - additionalHostRequests: make(map[*TrackerRequest]struct{}), + remoteHost: trackerRequest.remoteHost, + requests: make([]*TrackerRequest, 0, 1), } (*him)[trackerRequest.remoteHost] = requestData } @@ -250,24 +206,6 @@ func (him *hostsIncomingMap) countOriginConnections(remoteHost string, rateLimit return 0 } -// convertToAdditionalRequest converts the given trackerRequest into a "additional request". -func (him *hostsIncomingMap) convertToAdditionalRequest(trackerRequest *TrackerRequest) { - requestData, has := (*him)[trackerRequest.remoteHost] - if !has { - return - } - requestData.convertToAdditionalRequest(trackerRequest) -} - -// removeTrackedConnection removes a trackerRequest from the additional requests map -func (him *hostsIncomingMap) removeTrackedConnection(trackerRequest *TrackerRequest) { - requestData, has := (*him)[trackerRequest.remoteHost] - if !has { - return - } - requestData.removeTrackedConnection(trackerRequest) -} - // RequestTracker tracks the incoming request connections type RequestTracker struct { downstreamHandler http.Handler @@ -300,29 +238,6 @@ func makeRequestsTracker(downstreamHandler http.Handler, log logging.Logger, con } } -// requestTrackedConnection used to track the active connections. In particular, it used to remove the -// tracked connection entry from the RequestTracker once a connection is closed. -type requestTrackedConnection struct { - net.Conn - tracker *RequestTracker -} - -func (c *requestTrackedConnection) UnderlyingConn() net.Conn { - return c.Conn -} - -// Close removes the connection from the tracker's connections map and call the underlaying Close function. -func (c *requestTrackedConnection) Close() error { - c.tracker.hostRequestsMu.Lock() - trackerRequest := c.tracker.acceptedConnections[c.Conn.LocalAddr()] - delete(c.tracker.acceptedConnections, c.Conn.LocalAddr()) - if trackerRequest != nil { - c.tracker.hostRequests.removeTrackedConnection(trackerRequest) - } - c.tracker.hostRequestsMu.Unlock() - return c.Conn.Close() -} - // Accept waits for and returns the next connection to the listener. func (rt *RequestTracker) Accept() (conn net.Conn, err error) { // the following for loop is a bit tricky : @@ -334,7 +249,7 @@ func (rt *RequestTracker) Accept() (conn net.Conn, err error) { return } - trackerRequest := makeTrackerRequest(conn.RemoteAddr().String(), "", "", time.Now(), conn) + trackerRequest := makeTrackerRequest(conn.RemoteAddr().String(), "", "", time.Now()) rateLimitingWindowStartTime := trackerRequest.created.Add(-time.Duration(rt.config.ConnectionsRateLimitingWindowSeconds) * time.Second) rt.hostRequestsMu.Lock() @@ -376,7 +291,6 @@ func (rt *RequestTracker) Accept() (conn net.Conn, err error) { // add an entry to the acceptedConnections so that the ServeHTTP could find the connection quickly. rt.acceptedConnections[conn.LocalAddr()] = trackerRequest rt.hostRequestsMu.Unlock() - conn = &requestTrackedConnection{Conn: conn, tracker: rt} return } } @@ -421,7 +335,7 @@ func (rt *RequestTracker) sendBlockedConnectionResponse(conn net.Conn, requestTi func (rt *RequestTracker) pruneAcceptedConnections(pruneStartDate time.Time) { localAddrToRemove := []net.Addr{} for localAddr, request := range rt.acceptedConnections { - if !request.noPrune && request.created.Before(pruneStartDate) { + if !request.created.Before(pruneStartDate) { localAddrToRemove = append(localAddrToRemove, localAddr) } } @@ -478,14 +392,6 @@ func (rt *RequestTracker) GetTrackedRequest(request *http.Request) (trackedReque return rt.httpConnections[localAddr] } -// GetRequestConnection return the underlying connection for the given request -func (rt *RequestTracker) GetRequestConnection(request *http.Request) net.Conn { - rt.httpConnectionsMu.Lock() - defer rt.httpConnectionsMu.Unlock() - localAddr := request.Context().Value(http.LocalAddrContextKey).(net.Addr) - return rt.httpConnections[localAddr].connection -} - func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.Request) { // this function is called only after we've fetched all the headers. on some malicious clients, this could get delayed, so we can't rely on the // tcp-connection established time to align with current time. @@ -510,20 +416,16 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http. } trackedRequest := rt.acceptedConnections[localAddr] + delete(rt.acceptedConnections, localAddr) if trackedRequest != nil { - // update the original tracker request so that it won't get pruned. - if !trackedRequest.noPrune { - trackedRequest.noPrune = true - rt.hostRequests.convertToAdditionalRequest(trackedRequest) - } // create a copy, so we can unlock - trackedRequest = makeTrackerRequest(trackedRequest.remoteAddr, trackedRequest.remoteHost, trackedRequest.remotePort, trackedRequest.created, trackedRequest.connection) + trackedRequest = makeTrackerRequest(trackedRequest.remoteAddr, trackedRequest.remoteHost, trackedRequest.remotePort, trackedRequest.created) } rt.hostRequestsMu.Unlock() // we have no request tracker ? no problem; create one on the fly. if trackedRequest == nil { - trackedRequest = makeTrackerRequest(request.RemoteAddr, "", "", time.Now(), nil) + trackedRequest = makeTrackerRequest(request.RemoteAddr, "", "", time.Now()) } // update the origin address. diff --git a/network/requestTracker_test.go b/network/requestTracker_test.go index 158cf45336..d814507c78 100644 --- a/network/requestTracker_test.go +++ b/network/requestTracker_test.go @@ -51,7 +51,7 @@ func TestHostIncomingRequestsOrdering(t *testing.T) { now := time.Now() perm := rand.Perm(100) for i := 0; i < 100; i++ { - trackedRequest := makeTrackerRequest("remoteaddr", "host", "port", now.Add(time.Duration(perm[i])*time.Minute), nil) + trackedRequest := makeTrackerRequest("remoteaddr", "host", "port", now.Add(time.Duration(perm[i])*time.Minute)) hir.add(trackedRequest) } require.Equal(t, 100, len(hir.requests)) @@ -178,7 +178,7 @@ func TestRemoteAddress(t *testing.T) { partitiontest.PartitionTest(t) t.Parallel() - tr := makeTrackerRequest("127.0.0.1:444", "", "", time.Now(), nil) + tr := makeTrackerRequest("127.0.0.1:444", "", "", time.Now()) require.Equal(t, "127.0.0.1:444", tr.remoteAddr) require.Equal(t, "127.0.0.1", tr.remoteHost) require.Equal(t, "444", tr.remotePort) diff --git a/network/wsNetwork.go b/network/wsNetwork.go index 8a43ad5234..f222d2ff27 100644 --- a/network/wsNetwork.go +++ b/network/wsNetwork.go @@ -1031,17 +1031,6 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo return http.StatusOK } -// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same -// request that was provided to the http handler ( or provide a fallback Context() to that ) -// if the provided request has no associated connection, it returns nil. ( this should not happen for any http request that was registered -// by WebsocketNetwork ) -func (wn *WebsocketNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettableConn) { - if wn.requestsTracker != nil { - conn = wn.requestsTracker.GetRequestConnection(request) - } - return -} - // ServerHTTP handles the gossip network functions over websockets func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *http.Request) { if !wn.config.EnableGossipService { diff --git a/rpcs/ledgerService.go b/rpcs/ledgerService.go index 823895a417..d76273de62 100644 --- a/rpcs/ledgerService.go +++ b/rpcs/ledgerService.go @@ -34,7 +34,6 @@ import ( "github.com/algorand/go-algorand/ledger" "github.com/algorand/go-algorand/ledger/ledgercore" "github.com/algorand/go-algorand/logging" - "github.com/algorand/go-algorand/network" ) const ( @@ -63,7 +62,6 @@ type LedgerForService interface { // httpGossipNode is a reduced interface for the gossipNode that only includes the methods needed by the LedgerService type httpGossipNode interface { RegisterHTTPHandler(path string, handler http.Handler) - GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettableConn) } // LedgerService represents the Ledger RPC API @@ -211,17 +209,16 @@ func (ls *LedgerService) ServeHTTP(response http.ResponseWriter, request *http.R response.WriteHeader(http.StatusOK) return } - if conn := ls.net.GetHTTPRequestConnection(request); conn != nil { - maxCatchpointFileWritingDuration := 2 * time.Minute + rc := http.NewResponseController(response) + maxCatchpointFileWritingDuration := 2 * time.Minute - catchpointFileSize, err := cs.Size() - if err != nil || catchpointFileSize <= 0 { - maxCatchpointFileWritingDuration += maxCatchpointFileSize * time.Second / expectedWorstUploadSpeedBytesPerSecond - } else { - maxCatchpointFileWritingDuration += time.Duration(catchpointFileSize) * time.Second / expectedWorstUploadSpeedBytesPerSecond - } - conn.SetWriteDeadline(time.Now().Add(maxCatchpointFileWritingDuration)) + catchpointFileSize, err := cs.Size() + if err != nil || catchpointFileSize <= 0 { + maxCatchpointFileWritingDuration += maxCatchpointFileSize * time.Second / expectedWorstUploadSpeedBytesPerSecond } else { + maxCatchpointFileWritingDuration += time.Duration(catchpointFileSize) * time.Second / expectedWorstUploadSpeedBytesPerSecond + } + if wdErr := rc.SetWriteDeadline(time.Now().Add(maxCatchpointFileWritingDuration)); wdErr != nil { logging.Base().Warnf("LedgerService.ServeHTTP unable to set connection timeout") }