Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gossip

import (
"context"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -153,6 +154,9 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee
}
func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
// GetHTTPRequestConnection satisfies the gossip network interface; the whitehole test
// network keeps no per-request underlying connections, so the result is always nil.
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
	return
}

func (w *whiteholeNetwork) Start() {
w.quit = make(chan struct{})
Expand Down
6 changes: 6 additions & 0 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mocks

import (
"context"
"net"
"net/http"

"github.com/algorand/go-algorand/network"
Expand Down Expand Up @@ -99,3 +100,8 @@ func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handle

// OnNetworkAdvance - empty implementation
func (network *MockNetwork) OnNetworkAdvance() {}

// GetHTTPRequestConnection - empty implementation; the mock tracks no underlying
// connections, so the named result keeps its zero value (nil).
func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
	return
}
117 changes: 106 additions & 11 deletions network/requestTracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ type TrackerRequest struct {
otherTelemetryGUID string
otherInstanceName string
otherPublicAddr string
connection net.Conn
noPrune bool
}

// makeTrackerRequest creates a new TrackerRequest.
func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime time.Time) *TrackerRequest {
func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime time.Time, conn net.Conn) *TrackerRequest {
if remoteHost == "" {
remoteHost, remotePort, _ = net.SplitHostPort(remoteAddr)
}
Expand All @@ -59,13 +61,15 @@ func makeTrackerRequest(remoteAddr, remoteHost, remotePort string, createTime ti
remoteAddr: remoteAddr,
remoteHost: remoteHost,
remotePort: remotePort,
connection: conn,
}
}

// hostIncomingRequests holds all the requests that are originating from a single host.
type hostIncomingRequests struct {
remoteHost string
requests []*TrackerRequest // this is an ordered list, according to the requestsHistory.created
remoteHost string
requests []*TrackerRequest // this is an ordered list, according to the requestsHistory.created
additionalHostRequests map[*TrackerRequest]struct{} // additional requests that aren't included in the "requests", and always assumed to be "alive".
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name prefix "additional" is not informative. Can you please add more information and preferably rename it.
Also, what will the map point to? What will struct{} be?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of the map is to track the count of entries for that tracker, while providing easy ( and efficient ) way to delete them.

}

// findTimestampIndex finds the first an index (i) in the sorted requests array, where requests[i].created is greater than t.
Expand All @@ -80,6 +84,45 @@ func (ard *hostIncomingRequests) findTimestampIndex(t time.Time) int {
return i
}

// convertToAdditionalRequest converts the given trackerRequest into an "additional request".
// Unlike regular tracker requests, additional requests do not get pruned by the time-based
// pruning; they are removed explicitly via removeTrackedConnection instead.
// If the request is already tracked as additional, or cannot be located in the sorted
// requests slice, the call is a no-op.
func (ard *hostIncomingRequests) convertToAdditionalRequest(trackerRequest *TrackerRequest) {
	// already converted; nothing to do.
	if _, has := ard.additionalHostRequests[trackerRequest]; has {
		return
	}

	// binary-search for the first entry created strictly after trackerRequest.created,
	// then step back one slot so i points at the last entry with created <= trackerRequest.created.
	i := sort.Search(len(ard.requests), func(i int) bool {
		return ard.requests[i].created.After(trackerRequest.created)
	})
	i--
	if i < 0 {
		return
	}
	// we could have several entries with the same timestamp, so we need to consider all of them.
	for ; i >= 0; i-- {
		if ard.requests[i] == trackerRequest {
			break
		}
		if ard.requests[i].created != trackerRequest.created {
			// we can't find the item in the list.
			return
		}
	}
	if i < 0 {
		return
	}
	// ok, item was found at index i.
	// remove it from the sorted requests slice (shift left, clear the trailing slot so the
	// GC can reclaim the pointer, then shrink) ...
	copy(ard.requests[i:], ard.requests[i+1:])
	ard.requests[len(ard.requests)-1] = nil
	ard.requests = ard.requests[:len(ard.requests)-1]
	// ... and record it in the never-pruned additional requests set.
	ard.additionalHostRequests[trackerRequest] = struct{}{}
}

// removeTrackedConnection removes a trackerRequest from the additional requests map.
// Additional requests are never pruned by time, so this is the only way such an entry
// is released (it is invoked when the tracked connection gets closed).
func (ard *hostIncomingRequests) removeTrackedConnection(trackerRequest *TrackerRequest) {
	delete(ard.additionalHostRequests, trackerRequest)
}

// add adds the trackerRequest at the correct index within the sorted array.
func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) {
// find the new item index.
Expand All @@ -102,7 +145,7 @@ func (ard *hostIncomingRequests) add(trackerRequest *TrackerRequest) {
// countConnections counts the number of connections that occurred after the specified time
func (ard *hostIncomingRequests) countConnections(rateLimitingWindowStartTime time.Time) (count uint) {
i := ard.findTimestampIndex(rateLimitingWindowStartTime)
return uint(len(ard.requests) - i)
return uint(len(ard.requests) - i + len(ard.additionalHostRequests))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not checking if the connections in additionalHostRequests are after rateLimitingWindowStartTime.
What is the catch? Maybe the comment should change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the connections in ard.additionalHostRequests are such that have already reached the http handler.
As such, these will get deleted only when the connection.Close() is called.
The tracking of the last-30-seconds window doesn't apply for these since these might be a long-living connections.

The purpose of the 30-seconds window was to track incoming connections that never made it to the http handler. Once they did, we don't really care about their "window".

}

type hostsIncomingMap map[string]*hostIncomingRequests
Expand Down Expand Up @@ -136,8 +179,9 @@ func (him *hostsIncomingMap) addRequest(trackerRequest *TrackerRequest) {
requestData, has := (*him)[trackerRequest.remoteHost]
if !has {
requestData = &hostIncomingRequests{
remoteHost: trackerRequest.remoteHost,
requests: make([]*TrackerRequest, 0, 1),
remoteHost: trackerRequest.remoteHost,
requests: make([]*TrackerRequest, 0, 1),
additionalHostRequests: make(map[*TrackerRequest]struct{}),
}
(*him)[trackerRequest.remoteHost] = requestData
}
Expand All @@ -153,6 +197,24 @@ func (him *hostsIncomingMap) countOriginConnections(remoteHost string, rateLimit
return 0
}

// convertToAdditionalRequest converts the given trackerRequest into an "additional request",
// delegating to the per-host entry when one exists; unknown hosts are ignored.
func (him *hostsIncomingMap) convertToAdditionalRequest(trackerRequest *TrackerRequest) {
	if requestData, has := (*him)[trackerRequest.remoteHost]; has {
		requestData.convertToAdditionalRequest(trackerRequest)
	}
}

// removeTrackedConnection removes a trackerRequest from the additional requests map
// of the host entry that owns it; unknown hosts are ignored.
func (him *hostsIncomingMap) removeTrackedConnection(trackerRequest *TrackerRequest) {
	if requestData, has := (*him)[trackerRequest.remoteHost]; has {
		requestData.removeTrackedConnection(trackerRequest)
	}
}

// RequestTracker tracks the incoming request connections
type RequestTracker struct {
downstreamHandler http.Handler
Expand Down Expand Up @@ -185,6 +247,25 @@ func makeRequestsTracker(downstreamHandler http.Handler, log logging.Logger, con
}
}

// requestTrackedConnection wraps an accepted net.Conn so that the RequestTracker can
// observe the connection's Close call and remove the corresponding tracking entry from
// the RequestTracker at that point.
type requestTrackedConnection struct {
	net.Conn
	// tracker is the RequestTracker whose maps hold this connection's entry.
	tracker *RequestTracker
}

// Close removes the connection from the tracker's connections map and then calls the
// underlying connection's Close function.
func (c *requestTrackedConnection) Close() error {
	localAddr := c.Conn.LocalAddr()
	c.tracker.hostRequestsMu.Lock()
	trackerRequest, has := c.tracker.acceptedConnections[localAddr]
	delete(c.tracker.acceptedConnections, localAddr)
	if has && trackerRequest != nil {
		c.tracker.hostRequests.removeTrackedConnection(trackerRequest)
	}
	c.tracker.hostRequestsMu.Unlock()
	return c.Conn.Close()
}

// Accept waits for and returns the next connection to the listener.
func (rt *RequestTracker) Accept() (conn net.Conn, err error) {
// the following for loop is a bit tricky :
Expand All @@ -196,7 +277,7 @@ func (rt *RequestTracker) Accept() (conn net.Conn, err error) {
return
}

trackerRequest := makeTrackerRequest(conn.RemoteAddr().String(), "", "", time.Now())
trackerRequest := makeTrackerRequest(conn.RemoteAddr().String(), "", "", time.Now(), conn)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn is being passed here, but isn't this nil at this point?
It gets initialized further down:

  conn = &requestTrackedConnection{Conn: conn, tracker: rt}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind. I missed the initialization above.

rateLimitingWindowStartTime := trackerRequest.created.Add(-time.Duration(rt.config.ConnectionsRateLimitingWindowSeconds) * time.Second)

rt.hostRequestsMu.Lock()
Expand Down Expand Up @@ -232,6 +313,7 @@ func (rt *RequestTracker) Accept() (conn net.Conn, err error) {
// add an entry to the acceptedConnections so that the ServeHTTP could find the connection quickly.
rt.acceptedConnections[conn.LocalAddr()] = trackerRequest
rt.hostRequestsMu.Unlock()
conn = &requestTrackedConnection{Conn: conn, tracker: rt}
return
}
}
Expand All @@ -255,10 +337,11 @@ func (rt *RequestTracker) sendBlockedConnectionResponse(conn net.Conn, requestTi
}

// pruneAcceptedConnections cleans stale items from the acceptedConnections map; it's synchronized via the acceptedConnectionsMu mutex which is expected to be taken by the caller.
// in case the created is 0, the pruning is disabled for this connection. The HTTP handlers would call Close to have this entry cleared out.
func (rt *RequestTracker) pruneAcceptedConnections(pruneStartDate time.Time) {
localAddrToRemove := []net.Addr{}
for localAddr, request := range rt.acceptedConnections {
if request.created.Before(pruneStartDate) {
if request.noPrune == false && request.created.Before(pruneStartDate) {
localAddrToRemove = append(localAddrToRemove, localAddr)
}
}
Expand Down Expand Up @@ -292,6 +375,14 @@ func (rt *RequestTracker) GetTrackedRequest(request *http.Request) (trackedReque
return rt.httpConnections[localAddr]
}

// GetRequestConnection returns the underlying connection for the given request.
// The request must be the same request that was provided to the http handler.
// If the request carries no local-address context value, or no tracked request exists
// for that address, nil is returned instead of panicking on the missing entry.
func (rt *RequestTracker) GetRequestConnection(request *http.Request) net.Conn {
	rt.httpConnectionsMu.Lock()
	defer rt.httpConnectionsMu.Unlock()
	// the http server stores the listener's local address on the request context;
	// use the comma-ok assertion so a request that didn't come through our listener
	// doesn't panic here.
	localAddr, ok := request.Context().Value(http.LocalAddrContextKey).(net.Addr)
	if !ok {
		return nil
	}
	// guard the map lookup: indexing a missing entry yields a nil *TrackerRequest,
	// and dereferencing .connection on it would crash.
	if trackedRequest := rt.httpConnections[localAddr]; trackedRequest != nil {
		return trackedRequest.connection
	}
	return nil
}

func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.Request) {
// this function is called only after we've fetched all the headers. on some malicious clients, this could get delayed, so we can't rely on the
// tcp-connection established time to align with current time.
Expand All @@ -302,16 +393,20 @@ func (rt *RequestTracker) ServeHTTP(response http.ResponseWriter, request *http.

rt.hostRequestsMu.Lock()
trackedRequest := rt.acceptedConnections[localAddr]
delete(rt.acceptedConnections, localAddr)
if trackedRequest != nil {
// update the original tracker request so that it won't get pruned.
if trackedRequest.noPrune == false {
trackedRequest.noPrune = true
rt.hostRequests.convertToAdditionalRequest(trackedRequest)
}
// create a copy, so we can unlock
trackedRequest = makeTrackerRequest(trackedRequest.remoteAddr, trackedRequest.remoteHost, trackedRequest.remotePort, trackedRequest.created)
trackedRequest = makeTrackerRequest(trackedRequest.remoteAddr, trackedRequest.remoteHost, trackedRequest.remotePort, trackedRequest.created, trackedRequest.connection)
}
rt.hostRequestsMu.Unlock()

// we have no request tracker ? no problem; create one on the fly.
if trackedRequest == nil {
trackedRequest = makeTrackerRequest(request.RemoteAddr, "", "", time.Now())
trackedRequest = makeTrackerRequest(request.RemoteAddr, "", "", time.Now(), nil)
}

// update the origin address.
Expand Down
2 changes: 1 addition & 1 deletion network/requestTracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestHostIncomingRequestsOrdering(t *testing.T) {
now := time.Now()
perm := rand.Perm(100)
for i := 0; i < 100; i++ {
trackedRequest := makeTrackerRequest("remoteaddr", "host", "port", now.Add(time.Duration(perm[i])*time.Minute))
trackedRequest := makeTrackerRequest("remoteaddr", "host", "port", now.Add(time.Duration(perm[i])*time.Minute), nil)
hir.add(trackedRequest)
}
require.Equal(t, 100, len(hir.requests))
Expand Down
15 changes: 15 additions & 0 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ type GossipNode interface {
// arrive very quickly, but might be missing some votes. The usage of this call is expected to have similar
// characteristics as with a watchdog timer.
OnNetworkAdvance()

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
GetHTTPRequestConnection(request *http.Request) (conn net.Conn)
}

// IncomingMessage represents a message arriving from some peer in our p2p network
Expand Down Expand Up @@ -947,6 +951,17 @@ func (wn *WebsocketNetwork) checkIncomingConnectionVariables(response http.Respo
return http.StatusOK
}

// GetHTTPRequestConnection returns the underlying connection for the given request.
// The request must be the same request that was provided to the http handler
// ( or provide a fallback Context() to that ). When the network has no request
// tracker, or the tracker has no connection associated with the request, nil is
// returned; this should not happen for any http request registered by WebsocketNetwork.
func (wn *WebsocketNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
	if wn.requestsTracker == nil {
		return nil
	}
	return wn.requestsTracker.GetRequestConnection(request)
}

// ServeHTTP handles the gossip network functions over websockets
func (wn *WebsocketNetwork) ServeHTTP(response http.ResponseWriter, request *http.Request) {
trackedRequest := wn.requestsTracker.GetTrackedRequest(request)
Expand Down
41 changes: 32 additions & 9 deletions rpcs/ledgerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/mux"

Expand All @@ -36,14 +37,25 @@ import (
"github.com/algorand/go-algorand/network"
)

// LedgerResponseContentType is the HTTP Content-Type header for a raw ledger block
const LedgerResponseContentType = "application/x-algorand-ledger-v2.1"
const (
// LedgerResponseContentType is the HTTP Content-Type header for a raw ledger block
LedgerResponseContentType = "application/x-algorand-ledger-v2.1"

const ledgerServerMaxBodyLength = 512 // we don't really pass meaningful content here, so 512 bytes should be a safe limit
ledgerServerMaxBodyLength = 512 // we don't really pass meaningful content here, so 512 bytes should be a safe limit

// LedgerServiceLedgerPath is the path to register LedgerService as a handler for when using gorilla/mux
// e.g. .Handle(LedgerServiceLedgerPath, &ls)
const LedgerServiceLedgerPath = "/v{version:[0-9.]+}/{genesisID}/ledger/{round:[0-9a-z]+}"
// LedgerServiceLedgerPath is the path to register LedgerService as a handler for when using gorilla/mux
// e.g. .Handle(LedgerServiceLedgerPath, &ls)
LedgerServiceLedgerPath = "/v{version:[0-9.]+}/{genesisID}/ledger/{round:[0-9a-z]+}"

// maxCatchpointFileSize is a rough estimate for the worst-case scenario we're going to have of all the accounts data per a single catchpoint file chunk.
maxCatchpointFileSize = 512 * 1024 * 1024 // 512MB

// expectedWorstDownloadSpeedBytesPerSecond defines the worst-case download speed we expect a client to sustain while fetching a catchpoint file
expectedWorstDownloadSpeedBytesPerSecond = 200 * 1024

// maxCatchpointFileWritingDuration is the maximum amount of time we would allow for writing a catchpoint file to the response
maxCatchpointFileWritingDuration = 2*time.Minute + maxCatchpointFileSize*time.Second/expectedWorstDownloadSpeedBytesPerSecond
)

// LedgerService represents the Ledger RPC API
type LedgerService struct {
Expand Down Expand Up @@ -165,6 +177,11 @@ func (ls *LedgerService) ServeHTTP(response http.ResponseWriter, request *http.R
response.Write([]byte(fmt.Sprintf("specified round number could not be parsed using base 36 : %v", err)))
return
}
if conn := ls.net.GetHTTPRequestConnection(request); conn != nil {
conn.SetWriteDeadline(time.Now().Add(maxCatchpointFileWritingDuration))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, we would figure out what the file size is, and have a custom timeout per file. For now, I'm leaving it with a higher max value as I don't want to go into the rabbit hole of retrieving the file size.

} else {
logging.Base().Warnf("LedgerService.ServeHTTP unable to set connection timeout")
}
cs, err := ls.ledger.GetCatchpointStream(basics.Round(round))
if err != nil {
switch err.(type) {
Expand All @@ -186,16 +203,22 @@ func (ls *LedgerService) ServeHTTP(response http.ResponseWriter, request *http.R
requestedCompressedResponse := strings.Contains(request.Header.Get("Accept-Encoding"), "gzip")
if requestedCompressedResponse {
response.Header().Set("Content-Encoding", "gzip")
io.Copy(response, cs)
written, err := io.Copy(response, cs)
if err != nil {
logging.Base().Infof("LedgerService.ServeHTTP : unable to write compressed catchpoint file for round %d, written bytes %d : %v", round, written, err)
}
return
}
decompressedGzip, err := gzip.NewReader(cs)
if err != nil {
logging.Base().Warnf("ServeHTTP : failed to decompress catchpoint %d %v", round, err)
logging.Base().Warnf("LedgerService.ServeHTTP : failed to decompress catchpoint %d %v", round, err)
response.WriteHeader(http.StatusInternalServerError)
response.Write([]byte(fmt.Sprintf("catchpoint file for round %d could not be decompressed due to internal error : %v", round, err)))
return
}
defer decompressedGzip.Close()
io.Copy(response, decompressedGzip)
written, err := io.Copy(response, decompressedGzip)
if err != nil {
logging.Base().Infof("LedgerService.ServeHTTP : unable to write decompressed catchpoint file for round %d, written bytes %d : %v", round, written, err)
}
}