Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 21 additions & 23 deletions modules/caddyhttp/reverseproxy/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,33 +102,31 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
})
return true
})
// Iterate over the inflight hosts
inflightHosts.Range(func(key, val any) bool {
address, ok := key.(string)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream address"),
}
return false
}

upstream, ok := val.(*Host)
if !ok {
rangeErr = caddy.APIError{
HTTPStatus: http.StatusInternalServerError,
Err: fmt.Errorf("could not type assert upstream struct"),
// Iterate over our new in-flight tracker map
currentInFlight := getInFlightRequests()
for address, count := range currentInFlight {
// We only add entries that are actively in-flight but not present
// in the static hosts pool (e.g. dynamic upstreams during cleanup)

// Check if this address is already in the results list (from the static hosts pool)
alreadyInResults := false
for _, res := range results {
if res.Address == address {
alreadyInResults = true
break
}
return false
}

results = append(results, upstreamStatus{
Address: address,
NumRequests: upstream.NumRequests(),
Fails: upstream.Fails(),
})
return true
})
// If it's not in the static pool, we append it to expose it in the API
if !alreadyInResults {
results = append(results, upstreamStatus{
Address: address,
NumRequests: int(count), // Cast uint from our map to int for the struct
Fails: 0, // Ephemeral in-flight tracking doesn't track historic fails
})
}
}

// If an error happened during the range, return it
if rangeErr != nil {
Expand Down
14 changes: 0 additions & 14 deletions modules/caddyhttp/reverseproxy/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,16 +132,6 @@ func (u *Upstream) fillHost() {
u.Host = host
}

func (u *Upstream) fillInfilghtHost(numRemaiRequests int) {
host := new(Host)
existingHost, loaded := inflightHosts.LoadOrStore(u.String(), host)
if loaded {
host = existingHost.(*Host)
}
_ = host.countRequest(numRemaiRequests)
u.Host = host
}

// Host is the basic, in-memory representation of the state of a remote host.
// Its fields are accessed atomically and Host values must not be copied.
type Host struct {
Expand Down Expand Up @@ -278,10 +268,6 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) {
// through config reloads.
var hosts = caddy.NewUsagePool()

// inflightHosts is the global repository for hosts that are
// currently in use by inflight upstream request.
var inflightHosts = caddy.NewUsagePool()

// dialInfoVarKey is the key used for the variable that holds
// the dial info for the upstream connection.
const dialInfoVarKey = "reverse_proxy.dial_info"
Expand Down
54 changes: 37 additions & 17 deletions modules/caddyhttp/reverseproxy/reverseproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,40 @@ import (
"github.com/caddyserver/caddy/v2/modules/caddyhttp/rewrite"
)

var (
inFlightRequests = make(map[string]uint)
inFlightRequestsMu sync.RWMutex
)

func incInFlightRequest(address string) {
inFlightRequestsMu.Lock()
inFlightRequests[address]++
inFlightRequestsMu.Unlock()
}

func decInFlightRequest(address string) {
inFlightRequestsMu.Lock()
defer inFlightRequestsMu.Unlock()

if inFlightRequests[address] > 0 {
inFlightRequests[address]--
}
if inFlightRequests[address] == 0 {
delete(inFlightRequests, address)
}
}

func getInFlightRequests() map[string]uint {
inFlightRequestsMu.RLock()
defer inFlightRequestsMu.RUnlock()

copyMap := make(map[string]uint, len(inFlightRequests))
for k, v := range inFlightRequests {
copyMap[k] = v
}
return copyMap
}

func init() {
caddy.RegisterModule(Handler{})
}
Expand Down Expand Up @@ -394,9 +428,6 @@ func (h *Handler) Cleanup() error {

// remove hosts from our config from the pool
for _, upstream := range h.Upstreams {
if upstream.NumRequests() > 0 {
upstream.fillInfilghtHost(upstream.NumRequests())
}
_, _ = hosts.Delete(upstream.String())
}

Expand Down Expand Up @@ -461,16 +492,8 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
}

var done bool
done, dialInfo, proxyErr := h.proxyLoopIteration(clonedReq, r, w, proxyErr, start, retries, repl, reqHeader, reqHost, next)
done, _, proxyErr = h.proxyLoopIteration(clonedReq, r, w, proxyErr, start, retries, repl, reqHeader, reqHost, next)
if done {
key := dialInfo.Address
val := inflightHosts.Load(key)
if val != nil {
host, _ := val.(*Host)
if host.NumRequests() <= 0 {
_, _ = inflightHosts.Delete(key)
}
}
break
}
if h.VerboseLogs {
Expand Down Expand Up @@ -839,14 +862,11 @@ func (h Handler) addForwardedHeaders(req *http.Request) error {
// Go standard library which was used as the foundation.)
func (h *Handler) reverseProxy(rw http.ResponseWriter, req *http.Request, origReq *http.Request, repl *caddy.Replacer, di DialInfo, next caddyhttp.Handler) error {
_ = di.Upstream.Host.countRequest(1)
incInFlightRequest(di.Address)
//nolint:errcheck
defer func() {
di.Upstream.Host.countRequest(-1)
inflightHost := inflightHosts.Load(di.Address)
if inflightHost != nil {
host, _ := inflightHost.(*Host)
host.countRequest(-1)
}
decInFlightRequest(di.Address)
}()
// point the request to this upstream
h.directRequest(req, di)
Expand Down
10 changes: 0 additions & 10 deletions usagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,6 @@ func (up *UsagePool) Delete(key any) (deleted bool, err error) {
return deleted, err
}

func (up *UsagePool) Load(key any) any {
up.RLock()
defer up.RUnlock()
upv, loaded := up.pool[key]
if loaded {
return upv.value
}
return nil
}

// References returns the number of references (count of usages) to a
// key in the pool, and true if the key exists, or false otherwise.
func (up *UsagePool) References(key any) (int, bool) {
Expand Down