diff --git a/.changeset/olive-mirrors-kick.md b/.changeset/olive-mirrors-kick.md new file mode 100644 index 0000000000000..af72327866597 --- /dev/null +++ b/.changeset/olive-mirrors-kick.md @@ -0,0 +1,5 @@ +--- +'@eth-optimism/proxyd': major +--- + +Update metrics, support WS diff --git a/go/proxyd/backend.go b/go/proxyd/backend.go index ffd668c080f52..3201d8bd0c3df 100644 --- a/go/proxyd/backend.go +++ b/go/proxyd/backend.go @@ -6,74 +6,72 @@ import ( "errors" "fmt" "github.com/ethereum/go-ethereum/log" + "github.com/gorilla/websocket" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "io" "io/ioutil" "math" "math/rand" "net/http" - "sync/atomic" "time" ) const ( - JSONRPCVersion = "2.0" + JSONRPCVersion = "2.0" + JSONRPCErrorInternal = -32000 ) var ( - ErrNoBackend = errors.New("no backend available for method") - ErrBackendsInconsistent = errors.New("backends inconsistent, try again") - ErrBackendOffline = errors.New("backend offline") - - backendRequestsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "backend_requests_total", - Help: "Count of backend requests.", - }, []string{ - "backend_name", - "method_name", - }) - - backendErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "backend_errors_total", - Help: "Count of backend errors.", - }, []string{ - "backend_name", - "method_name", - }) - - backendPermanentErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "backend_permanent_errors_total", - Help: "Count of backend errors that mark a backend as offline.", - }, []string{ - "backend_name", - "method_name", - }) - - backendResponseTimeSummary = promauto.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "proxyd", - Name: "backend_response_time_seconds", - Help: "Summary of backend response times broken down by backend and method name.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }, []string{ - "backend_name", - "method_name", - }) + ErrInvalidRequest = &RPCErr{ + Code: -32601, + Message: "invalid request", + } + ErrParseErr = &RPCErr{ + Code: -32700, + Message: "parse error", + } + ErrInternal = &RPCErr{ + Code: JSONRPCErrorInternal, + Message: "internal error", + } + ErrMethodNotWhitelisted = &RPCErr{ + Code: JSONRPCErrorInternal - 1, + Message: "rpc method is not whitelisted", + } + ErrBackendOffline = &RPCErr{ + Code: JSONRPCErrorInternal - 10, + Message: "backend offline", + } + ErrNoBackends = &RPCErr{ + Code: JSONRPCErrorInternal - 11, + Message: "no backends available for method", + } + ErrBackendOverCapacity = &RPCErr{ + Code: JSONRPCErrorInternal - 12, + Message: "backend is over capacity", + } + ErrBackendBadResponse = &RPCErr{ + Code: JSONRPCErrorInternal - 13, + Message: "backend returned an invalid response", + } ) type Backend struct { - Name string - authUsername string - authPassword string - baseURL string - client *http.Client - maxRetries int - maxResponseSize int64 - lastPermError int64 - unhealthyRetryInterval int64 + Name string + rpcURL string + wsURL string + authUsername string + authPassword string + allowedRPCMethods *StringSet + allowedWSMethods *StringSet + redis Redis + client *http.Client + dialer *websocket.Dialer + maxRetries int + maxResponseSize int64 + maxRPS int + maxWSConns int + outOfServiceInterval time.Duration } type BackendOpt func(b *Backend) @@ -103,20 +101,45 @@ func WithMaxResponseSize(size int64) BackendOpt { } } -func WithUnhealthyRetryInterval(interval int64) BackendOpt { +func WithOutOfServiceDuration(interval time.Duration) BackendOpt { return func(b *Backend) { - b.unhealthyRetryInterval = interval + b.outOfServiceInterval = interval } } -func NewBackend(name, baseURL string, opts ...BackendOpt) *Backend { +func WithMaxRPS(maxRPS int) BackendOpt { + return func(b *Backend) { + b.maxRPS = maxRPS + } +} + +func WithMaxWSConns(maxConns int) BackendOpt { + return func(b *Backend) { + b.maxWSConns = maxConns + } +} + +func NewBackend( + name string, + rpcURL string, + wsURL string, + allowedRPCMethods *StringSet, + allowedWSMethods *StringSet, + redis Redis, + opts ...BackendOpt, +) *Backend { backend := &Backend{ - Name: name, - baseURL: baseURL, - maxResponseSize: math.MaxInt64, + Name: name, + rpcURL: rpcURL, + wsURL: wsURL, + allowedRPCMethods: allowedRPCMethods, + allowedWSMethods: allowedWSMethods, + redis: redis, + maxResponseSize: math.MaxInt64, client: &http.Client{ Timeout: 5 * time.Second, }, + dialer: &websocket.Dialer{}, } for _, opt := range opts { @@ -127,45 +150,131 @@ func NewBackend(name, baseURL string, opts ...BackendOpt) *Backend { } func (b *Backend) Forward(req *RPCReq) (*RPCRes, error) { - if time.Now().Unix()-atomic.LoadInt64(&b.lastPermError) < b.unhealthyRetryInterval { + if !b.allowedRPCMethods.Has(req.Method) { + return nil, ErrMethodNotWhitelisted + } + if !b.Online() { return nil, ErrBackendOffline } + if b.IsRateLimited() { + return nil, ErrBackendOverCapacity + } var lastError error // <= to account for the first attempt not technically being // a retry for i := 0; i <= b.maxRetries; i++ { + rpcForwardsTotal.WithLabelValues(b.Name, req.Method, RPCRequestSourceHTTP).Inc() + respTimer := prometheus.NewTimer(rpcBackendRequestDurationSumm.WithLabelValues(b.Name, req.Method)) resB, err := b.doForward(req) if err != nil { lastError = err log.Warn("backend request failed, trying again", "err", err, "name", b.Name) + respTimer.ObserveDuration() time.Sleep(calcBackoff(i)) continue } + respTimer.ObserveDuration() res := new(RPCRes) // don't mark the backend down if they give us a bad response body if err := json.Unmarshal(resB, res); err != nil { - return nil, wrapErr(err, "error unmarshaling JSON") + return nil, ErrBackendBadResponse } return res, nil } - atomic.StoreInt64(&b.lastPermError, time.Now().Unix()) - backendPermanentErrorsCtr.WithLabelValues(b.Name, req.Method).Inc() + b.setOffline() return nil, wrapErr(lastError, "permanent error forwarding request") } -func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { - body, err := json.Marshal(rpcReq) +func (b *Backend) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) { + if !b.Online() { + return nil, ErrBackendOffline + } + if b.IsWSSaturated() { + return nil, ErrBackendOverCapacity + } + + backendConn, _, err := b.dialer.Dial(b.wsURL, nil) + if err != nil { + b.setOffline() + if err := b.redis.DecBackendWSConns(b.Name); err != nil { + log.Error("error decrementing backend ws conns", "name", b.Name, "err", err) + } + return nil, wrapErr(err, "error dialing backend") + } + + activeBackendWsConnsGauge.WithLabelValues(b.Name).Inc() + return NewWSProxier(b, clientConn, backendConn), nil +} + +func (b *Backend) Online() bool { + online, err := b.redis.IsBackendOnline(b.Name) if err != nil { - return nil, wrapErr(err, "error marshaling request in forward") + log.Warn( + "error getting backend availability, assuming it is offline", + "name", b.Name, + "err", err, + ) + return false + } + return online +} + +func (b *Backend) IsRateLimited() bool { + if b.maxRPS == 0 { + return false } - httpReq, err := http.NewRequest("POST", b.baseURL, bytes.NewReader(body)) + usedLimit, err := b.redis.IncBackendRPS(b.Name) + if err != nil { + log.Error( + "error getting backend used rate limit, assuming limit is exhausted", + "name", b.Name, + "err", err, + ) + return true + } + + return b.maxRPS < usedLimit +} + +func (b *Backend) IsWSSaturated() bool { + if b.maxWSConns == 0 { + return false + } + + incremented, err := b.redis.IncBackendWSConns(b.Name, b.maxWSConns) + if err != nil { + log.Error( + "error getting backend used ws conns, assuming limit is exhausted", + "name", b.Name, + "err", err, + ) + return true + } + + return !incremented +} + +func (b *Backend) setOffline() { + err := b.redis.SetBackendOffline(b.Name, b.outOfServiceInterval) + if err != nil { + log.Warn( + "error setting backend offline", + "name", b.Name, + "err", err, + ) + } +} + +func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { + body := mustMarshalJSON(rpcReq) + + httpReq, err := http.NewRequest("POST", b.rpcURL, bytes.NewReader(body)) if err != nil { - backendErrorsCtr.WithLabelValues(b.Name, rpcReq.Method).Inc() return nil, wrapErr(err, "error creating backend request") } @@ -173,24 +282,18 @@ func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { httpReq.SetBasicAuth(b.authUsername, b.authPassword) } - timer := prometheus.NewTimer(backendResponseTimeSummary.WithLabelValues(b.Name, rpcReq.Method)) - defer timer.ObserveDuration() - defer backendRequestsCtr.WithLabelValues(b.Name, rpcReq.Method).Inc() res, err := b.client.Do(httpReq) if err != nil { - backendErrorsCtr.WithLabelValues(b.Name, rpcReq.Method).Inc() return nil, wrapErr(err, "error in backend request") } if res.StatusCode != 200 { - backendErrorsCtr.WithLabelValues(b.Name, rpcReq.Method).Inc() return nil, fmt.Errorf("response code %d", res.StatusCode) } defer res.Body.Close() resB, err := ioutil.ReadAll(io.LimitReader(res.Body, b.maxResponseSize)) if err != nil { - backendErrorsCtr.WithLabelValues(b.Name, rpcReq.Method).Inc() return nil, wrapErr(err, "error reading response body") } @@ -199,51 +302,227 @@ func (b *Backend) doForward(rpcReq *RPCReq) ([]byte, error) { type BackendGroup struct { Name string - backends []*Backend - i int64 + Backends []*Backend } func (b *BackendGroup) Forward(rpcReq *RPCReq) (*RPCRes, error) { - var outRes *RPCRes - for _, back := range b.backends { + rpcRequestsTotal.Inc() + + for _, back := range b.Backends { res, err := back.Forward(rpcReq) - if err == ErrBackendOffline { + if errors.Is(err, ErrMethodNotWhitelisted) { + return nil, err + } + if errors.Is(err, ErrBackendOffline) { log.Debug("skipping offline backend", "name", back.Name) continue } + if errors.Is(err, ErrBackendOverCapacity) { + log.Debug("skipping over-capacity backend", "name", back.Name) + continue + } if err != nil { log.Error("error forwarding request to backend", "err", err, "name", b.Name) continue } - outRes = res - break + return res, nil } - if outRes == nil { - return nil, errors.New("no backends available") + return nil, ErrNoBackends +} + +func (b *BackendGroup) ProxyWS(clientConn *websocket.Conn) (*WSProxier, error) { + for _, back := range b.Backends { + proxier, err := back.ProxyWS(clientConn) + if errors.Is(err, ErrBackendOffline) { + log.Debug("skipping offline backend", "name", back.Name) + continue + } + if errors.Is(err, ErrBackendOverCapacity) { + log.Debug("skipping over-capacity backend", "name", back.Name) + continue + } + if err != nil { + log.Warn("error dialing ws backend", "name", back.Name, "err", err) + continue + } + return proxier, nil } - return outRes, nil + return nil, ErrNoBackends } -type MethodMapping struct { - methods map[string]*BackendGroup +func calcBackoff(i int) time.Duration { + jitter := float64(rand.Int63n(250)) + ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 10000) + return time.Duration(ms) * time.Millisecond } -func NewMethodMapping(methods map[string]*BackendGroup) *MethodMapping { - return &MethodMapping{methods: methods} +type WSProxier struct { + backend *Backend + clientConn *websocket.Conn + backendConn *websocket.Conn } -func (m *MethodMapping) BackendGroupFor(method string) (*BackendGroup, error) { - group := m.methods[method] - if group == nil { - return nil, ErrNoBackend +func NewWSProxier(backend *Backend, clientConn, backendConn *websocket.Conn, ) *WSProxier { + return &WSProxier{ + backend: backend, + clientConn: clientConn, + backendConn: backendConn, } - return group, nil } -func calcBackoff(i int) time.Duration { - jitter := float64(rand.Int63n(250)) - ms := math.Min(math.Pow(2, float64(i))*1000+jitter, 10000) - return time.Duration(ms) * time.Millisecond +func (w *WSProxier) Proxy() error { + errC := make(chan error, 2) + go w.clientPump(errC) + go w.backendPump(errC) + err := <-errC + w.close() + return err +} + +func (w *WSProxier) clientPump(errC chan error) { + for { + outConn := w.backendConn + // Block until we get a message. + msgType, msg, err := w.clientConn.ReadMessage() + if err != nil { + errC <- err + outConn.WriteMessage(websocket.CloseMessage, formatWSError(err)) + return + } + + RecordWSMessage(w.backend.Name, SourceClient) + + // Route control messages to the backend. These don't + // count towards the total RPC requests count. + if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage { + err := outConn.WriteMessage(msgType, msg) + if err != nil { + errC <- err + return + } + continue + } + + rpcRequestsTotal.Inc() + + // Don't bother sending invalid requests to the backend, + // just handle them here. + req, err := w.parseClientMsg(msg) + if err != nil { + var id *int + if req != nil { + id = req.ID + } + outConn = w.clientConn + msg = mustMarshalJSON(NewRPCErrorRes(id, err)) + RecordRPCError(SourceClient, err) + } else { + rpcForwardsTotal.WithLabelValues(w.backend.Name, req.Method, RPCRequestSourceWS).Inc() + } + + err = outConn.WriteMessage(msgType, msg) + if err != nil { + errC <- err + return + } + } +} + +func (w *WSProxier) backendPump(errC chan error) { + for { + // Block until we get a message. + msgType, msg, err := w.backendConn.ReadMessage() + if err != nil { + errC <- err + w.clientConn.WriteMessage(websocket.CloseMessage, formatWSError(err)) + return + } + + RecordWSMessage(w.backend.Name, SourceBackend) + + // Route control messages directly to the client. + if msgType != websocket.TextMessage && msgType != websocket.BinaryMessage { + err := w.clientConn.WriteMessage(msgType, msg) + if err != nil { + errC <- err + return + } + continue + } + + res, err := w.parseBackendMsg(msg) + if err != nil { + var id *int + if res != nil { + id = res.ID + } + msg = mustMarshalJSON(NewRPCErrorRes(id, err)) + } + if res.IsError() { + RecordRPCError(SourceBackend, res.Error) + } + + err = w.clientConn.WriteMessage(msgType, msg) + if err != nil { + errC <- err + return + } + } +} + +func (w *WSProxier) close() { + w.clientConn.Close() + w.backendConn.Close() + if err := w.backend.redis.DecBackendWSConns(w.backend.Name); err != nil { + log.Error("error decrementing backend ws conns", "name", w.backend.Name, "err", err) + } + activeBackendWsConnsGauge.WithLabelValues(w.backend.Name).Dec() +} + +func (w *WSProxier) parseClientMsg(msg []byte) (*RPCReq, error) { + req, err := ParseRPCReq(bytes.NewReader(msg)) + if err != nil { + log.Warn("error parsing RPC request", "source", "ws", "err", err) + return nil, err + } + + if !w.backend.allowedWSMethods.Has(req.Method) { + log.Info("blocked request for non-whitelisted method", "source", "ws", "method", req.Method) + return req, ErrMethodNotWhitelisted + } + + if w.backend.IsRateLimited() { + return req, ErrBackendOverCapacity + } + + return req, nil +} + +func (w *WSProxier) parseBackendMsg(msg []byte) (*RPCRes, error) { + res, err := ParseRPCRes(bytes.NewReader(msg)) + if err != nil { + log.Warn("error parsing RPC response", "source", "ws", "err", err) + return res, ErrBackendBadResponse + } + return res, nil +} + +func mustMarshalJSON(in interface{}) []byte { + out, err := json.Marshal(in) + if err != nil { + panic(err) + } + return out +} + +func formatWSError(err error) []byte { + m := websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("%v", err)) + if e, ok := err.(*websocket.CloseError); ok { + if e.Code != websocket.CloseNoStatusReceived { + m = websocket.FormatCloseMessage(e.Code, e.Text) + } + } + return m } diff --git a/go/proxyd/config.go b/go/proxyd/config.go index f767ba3e650d9..98978774c05c3 100644 --- a/go/proxyd/config.go +++ b/go/proxyd/config.go @@ -6,6 +6,10 @@ type ServerConfig struct { MaxBodySizeBytes int64 `toml:"max_body_size_bytes"` } +type RedisConfig struct { + URL string `toml:"url"` +} + type MetricsConfig struct { Enabled bool `toml:"enabled"` Host string `toml:"host"` @@ -13,16 +17,19 @@ type MetricsConfig struct { } type BackendOptions struct { - ResponseTimeoutSeconds int `toml:"response_timeout_seconds"` - MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"` - MaxRetries int `toml:"backend_retries"` - UnhealthyBackendRetryIntervalSeconds int64 `toml:"unhealthy_backend_retry_interval_seconds"` + ResponseTimeoutSeconds int `toml:"response_timeout_seconds"` + MaxResponseSizeBytes int64 `toml:"max_response_size_bytes"` + MaxRetries int `toml:"backend_retries"` + OutOfServiceSeconds int `toml:"out_of_service_seconds"` } type BackendConfig struct { - Username string `toml:"username"` - Password string `toml:"password"` - BaseURL string `toml:"base_url"` + Username string `toml:"username"` + Password string `toml:"password"` + RPCURL string `toml:"rpc_url"` + WSURL string `toml:"ws_url"` + MaxRPS int `toml:"max_rps"` + MaxWSConns int `toml:"max_ws_conns"` } type BackendsConfig map[string]*BackendConfig @@ -36,10 +43,11 @@ type BackendGroupsConfig map[string]*BackendGroupConfig type MethodMappingsConfig map[string]string type Config struct { - Server *ServerConfig `toml:"server"` - Metrics *MetricsConfig `toml:"metrics"` - BackendOptions *BackendOptions `toml:"backend"` - Backends BackendsConfig `toml:"backends"` - BackendGroups BackendGroupsConfig `toml:"backend_groups"` - MethodMappings MethodMappingsConfig `toml:"method_mappings"` + AllowedRPCMethods []string `toml:"allowed_rpc_methods"` + AllowedWSMethods []string `toml:"allowed_ws_methods"` + Server *ServerConfig `toml:"server"` + Redis *RedisConfig `toml:"redis"` + Metrics *MetricsConfig `toml:"metrics"` + BackendOptions *BackendOptions `toml:"backend_options"` + Backends BackendsConfig `toml:"backends"` } diff --git a/go/proxyd/go.mod b/go/proxyd/go.mod index f2a1a91219796..5c8d067923c88 100644 --- a/go/proxyd/go.mod +++ b/go/proxyd/go.mod @@ -5,7 +5,9 @@ go 1.16 require ( github.com/BurntSushi/toml v0.4.1 github.com/ethereum/go-ethereum v1.10.11 + github.com/go-redis/redis/v8 v8.11.4 github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 github.com/prometheus/client_golang v1.11.0 github.com/rs/cors v1.8.0 ) diff --git a/go/proxyd/go.sum b/go/proxyd/go.sum index bc2d0e7041ca9..a62a130b253f3 100644 --- a/go/proxyd/go.sum +++ b/go/proxyd/go.sum @@ -77,8 +77,9 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= -github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -98,6 +99,8 @@ github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dlclark/regexp2 v1.4.1-0.20201116162257-a2a8dda75c91/go.mod h1:2pZnwuY/m+8K6iRw6wQdMtk+rH5tNGR1i55kozfMjCc= github.com/docker/docker v1.4.2-0.20180625184442-8e610b2b55bf/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= @@ -113,6 +116,7 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fjl/memsize v0.0.0-20190710130421-bcb5799ab5e5/go.mod h1:VvhXpOYNQvB+uIk2RvXzuaQtkQJzzIx6lSBe1xv7hi0= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/gballet/go-libpcsclite v0.0.0-20190607065134-2772fd86a8ff/go.mod h1:x7DCsMOv1taUwEWCzT4cmDeAkigA5/QCwUodaVOe8Ww= github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= @@ -136,10 +140,13 @@ github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34 github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= +github.com/go-redis/redis/v8 v8.11.4 h1:kHoYkfZP6+pe04aFTnhDH6GDROa5yJdHJVNxV3F46Tg= +github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Pxt6RJr792+w= github.com/go-sourcemap/sourcemap v2.1.3+incompatible/go.mod h1:F8jJfvm2KbVjc5NqelyYJmf/v5J0dwNLS2mL4sNA1Jg= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= @@ -160,8 +167,10 @@ github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrU github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= -github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -175,8 +184,9 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= @@ -190,6 +200,7 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v0.0.0-20201113091052-beb923fada29/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0= @@ -286,15 +297,21 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRW github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.1.2-0.20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= +github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/onsi/gomega v1.16.0 h1:6gjqkI8iiRHMvdccRJM8rVKjCWk6ZIm6FTm3ddIe4/c= +github.com/onsi/gomega v1.16.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -442,6 +459,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d h1:20cMwl2fHAzkJMEA+8J4JgqBQcQGzbisXo31MIeenXI= golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -492,6 +511,7 @@ golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -509,6 +529,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -540,6 +561,7 @@ golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -589,8 +611,9 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= -google.golang.org/protobuf v1.26.0-rc.1 h1:7QnIQpGRHE5RnLKnESfDoxm2dTapTZua5a0kS0A+VXQ= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -602,6 +625,7 @@ gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8 gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce/go.mod h1:5AcXVHNjg+BDxry382+8OKon8SEWiKktQR07RKPsv1c= gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6/go.mod h1:uAJfkITjFhyEEuUfm7bsmCZRbW5WRq8s9EY8HZ6hCns= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -611,6 +635,7 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= diff --git a/go/proxyd/metrics.go b/go/proxyd/metrics.go new file mode 100644 index 0000000000000..80b3346dcbc47 --- /dev/null +++ b/go/proxyd/metrics.go @@ -0,0 +1,131 @@ +package proxyd + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "strconv" +) + +const ( + MetricsNamespace = "proxyd" + + RPCRequestSourceHTTP = "http" + RPCRequestSourceWS = "ws" + + SourceClient = "client" + SourceBackend = "backend" + SourceProxyd = "proxyd" +) + +var ( + rpcRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "rpc_requests_total", + Help: "Count of total client RPC requests.", + }) + + rpcForwardsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "rpc_backend_requests_total", + Help: "Count of total RPC requests forwarded to each backend.", + }, []string{ + "backend_name", + "method_name", + "source", + }) + + rpcErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "rpc_errors_total", + Help: "Count of total RPC errors.", + }, []string{ + "source", + "error_code", + }) + + rpcBackendRequestDurationSumm = promauto.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: MetricsNamespace, + Name: "rpc_backend_request_duration_seconds", + Help: "Summary of backend response times broken down by backend and method name.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }, []string{ + "backend_name", + "method_name", + }) + + activeClientWsConnsGauge = promauto.NewGauge(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "active_client_ws_conns", + Help: "Gauge of active client WS connections.", + }) + + activeBackendWsConnsGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: MetricsNamespace, + Name: "active_backend_ws_conns", + Help: "Gauge of active backend WS connections.", + }, []string{ + "backend_name", + }) + + unserviceableRequestsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "unserviceable_requests_total", + Help: "Count of total requests that were rejected due to no backends being available.", + }, []string{ + "source", + }) + + httpRequestsTotal = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "http_requests_total", + Help: "Count of total HTTP requests.", + }) + + httpRequestDurationSumm = promauto.NewSummary(prometheus.SummaryOpts{ + Namespace: MetricsNamespace, + Name: "http_request_duration_seconds", + Help: "Summary of HTTP request durations, in seconds.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, + }) + + wsMessagesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "ws_messages_total", + Help: "Count of total websocket messages including protocol control.", + }, []string{ + "backend_name", + "source", + }) + + redisErrorsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: MetricsNamespace, + Name: "redis_errors_total", + Help: "Count of total Redis errors.", + }, []string{ + "source", + }) +) + +func RecordRPCError(source string, err error) { + rpcErr, ok := err.(*RPCErr) + var code int + if ok { + code = rpcErr.Code + } else { + code = -1 + } + + rpcErrorsTotal.WithLabelValues(source, strconv.Itoa(code)).Inc() +} + +func RecordRedisError(source string) { + redisErrorsTotal.WithLabelValues(source).Inc() +} + +func RecordWSMessage(backendName, source string) { + wsMessagesTotal.WithLabelValues(backendName, source).Inc() +} + +func RecordUnserviceableRequest(source string) { + unserviceableRequestsTotal.WithLabelValues(source).Inc() +} diff --git a/go/proxyd/proxyd.go b/go/proxyd/proxyd.go index 963dc14317808..d198b6f7e14a2 100644 --- a/go/proxyd/proxyd.go +++ b/go/proxyd/proxyd.go @@ -13,28 +13,35 @@ import ( ) func Start(config *Config) error { - backendsByName := make(map[string]*Backend) - groupsByName := make(map[string]*BackendGroup) - if len(config.Backends) == 0 { return errors.New("must define at least one backend") } - if len(config.BackendGroups) == 0 { - return errors.New("must define at least one backend group") + if len(config.AllowedRPCMethods) == 0 { + return errors.New("must define at least one allowed RPC method") } - if len(config.MethodMappings) == 0 { - return errors.New("must define at least one method mapping") + + allowedRPCs := NewStringSetFromStrings(config.AllowedRPCMethods) + allowedWSRPCs := allowedRPCs.Extend(config.AllowedWSMethods) + + redis, err := NewRedis(config.Redis.URL) + if err != nil { + return err } + backends := make([]*Backend, 0) + backendNames := make([]string, 0) for name, cfg := range config.Backends { opts := make([]BackendOpt, 0) - if cfg.BaseURL == "" { - return fmt.Errorf("must define a base URL for backend %s", name) + if cfg.RPCURL == "" { + return fmt.Errorf("must define an RPC URL for backend %s", name) + } + if cfg.WSURL == "" { + return fmt.Errorf("must define a WS URL for backend %s", name) } if config.BackendOptions.ResponseTimeoutSeconds != 0 { - timeout := time.Duration(config.BackendOptions.ResponseTimeoutSeconds) * time.Second + timeout := secondsToDuration(config.BackendOptions.ResponseTimeoutSeconds) opts = append(opts, WithTimeout(timeout)) } if config.BackendOptions.MaxRetries != 0 { @@ -43,42 +50,29 @@ func Start(config *Config) error { if config.BackendOptions.MaxResponseSizeBytes != 0 { opts = append(opts, WithMaxResponseSize(config.BackendOptions.MaxResponseSizeBytes)) } - if config.BackendOptions.UnhealthyBackendRetryIntervalSeconds != 0 { - opts = append(opts, WithUnhealthyRetryInterval(config.BackendOptions.UnhealthyBackendRetryIntervalSeconds)) + if config.BackendOptions.OutOfServiceSeconds != 0 { + opts = append(opts, WithOutOfServiceDuration(secondsToDuration(config.BackendOptions.OutOfServiceSeconds))) } - if cfg.Password != "" { - opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password)) + if cfg.MaxRPS != 0 { + opts = append(opts, WithMaxRPS(cfg.MaxRPS)) } - backendsByName[name] = NewBackend(name, cfg.BaseURL, opts...) - log.Info("configured backend", "name", name, "base_url", cfg.BaseURL) - } - - for groupName, cfg := range config.BackendGroups { - backs := make([]*Backend, 0) - for _, backName := range cfg.Backends { - if backendsByName[backName] == nil { - return fmt.Errorf("undefined backend %s", backName) - } - backs = append(backs, backendsByName[backName]) - log.Info("configured backend group", "name", groupName) + if cfg.MaxWSConns != 0 { + opts = append(opts, WithMaxWSConns(cfg.MaxWSConns)) } - - groupsByName[groupName] = &BackendGroup{ - Name: groupName, - backends: backs, + if cfg.Password != "" { + opts = append(opts, WithBasicAuth(cfg.Username, cfg.Password)) } + back := NewBackend(name, cfg.RPCURL, cfg.WSURL, allowedRPCs, allowedWSRPCs, redis, opts...) + backends = append(backends, back) + backendNames = append(backendNames, name) + log.Info("configured backend", "name", name, "rpc_url", cfg.RPCURL, "ws_url", cfg.WSURL) } - mappings := make(map[string]*BackendGroup) - for method, groupName := range config.MethodMappings { - if groupsByName[groupName] == nil { - return fmt.Errorf("undefined backend group %s", groupName) - } - mappings[method] = groupsByName[groupName] + backendGroup := &BackendGroup{ + Name: "main", + Backends: backends, } - methodMappings := NewMethodMapping(mappings) - - srv := NewServer(methodMappings, config.Server.MaxBodySizeBytes) + srv := NewServer(backendGroup, config.Server.MaxBodySizeBytes) if config.Metrics.Enabled { addr := fmt.Sprintf("%s:%d", config.Metrics.Host, config.Metrics.Port) @@ -88,6 +82,10 @@ func Start(config *Config) error { go func() { if err := srv.ListenAndServe(config.Server.Host, config.Server.Port); err != nil { + if errors.Is(err, http.ErrServerClosed) { + log.Info("server shut down") + return + } log.Crit("error starting server", "err", err) } }() @@ -96,5 +94,13 @@ func Start(config *Config) error { signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) recvSig := <-sig log.Info("caught signal, shutting down", "signal", recvSig) + srv.Shutdown() + if err := redis.FlushBackendWSConns(backendNames); err != nil { + log.Error("error flushing backend ws conns", "err", err) + } return nil } + +func secondsToDuration(seconds int) time.Duration { + return time.Duration(seconds) * time.Second +} diff --git a/go/proxyd/redis.go b/go/proxyd/redis.go new file mode 100644 index 0000000000000..b88514d1a5a05 --- /dev/null +++ b/go/proxyd/redis.go @@ -0,0 +1,191 @@ +package proxyd + +import ( + "context" + "crypto/rand" + "encoding/hex" + "fmt" + "github.com/ethereum/go-ethereum/log" + "github.com/go-redis/redis/v8" + "sync" + "time" +) + +const MaxRPSScript = ` +local current +current = redis.call("incr", KEYS[1]) +if current == 1 then + redis.call("expire", KEYS[1], 1) +end +return current +` + +const MaxConcurrentWSConnsScript = ` +redis.call("sadd", KEYS[1], KEYS[2]) +local total = 0 +local scanres = redis.call("sscan", KEYS[1], 0) +for _, k in ipairs(scanres[2]) do + local value = redis.call("get", k) + if value then + total = total + value + end +end + +if total < tonumber(ARGV[1]) then + redis.call("incr", KEYS[2]) + redis.call("expire", KEYS[2], 300) + return true +end + +return false +` + +type Redis interface { + IsBackendOnline(name string) (bool, error) + SetBackendOffline(name string, duration time.Duration) error + IncBackendRPS(name string) (int, error) + IncBackendWSConns(name string, max int) (bool, error) + DecBackendWSConns(name string) error + FlushBackendWSConns(names []string) error +} + +type RedisImpl struct { + rdb *redis.Client + randID string + touchKeys map[string]time.Duration + tkMtx sync.Mutex +} + +func NewRedis(url string) (Redis, error) { + opts, err := redis.ParseURL(url) + if err != nil { + return nil, err + } + rdb := redis.NewClient(opts) + if err := rdb.Ping(context.Background()).Err(); err != nil { + return nil, wrapErr(err, "error connecting to redis") + } + out := &RedisImpl{ + rdb: rdb, + randID: randStr(20), + touchKeys: make(map[string]time.Duration), + } + go out.touch() + return out, nil +} + +func (r *RedisImpl) IsBackendOnline(name string) (bool, error) { + exists, err := r.rdb.Exists(context.Background(), fmt.Sprintf("backend:%s:offline", name)).Result() + if err != nil { + RecordRedisError("IsBackendOnline") + return false, wrapErr(err, "error getting backend availability") + } + + return exists == 0, nil +} + +func (r *RedisImpl) SetBackendOffline(name string, duration time.Duration) error { + err := r.rdb.SetEX( + context.Background(), + fmt.Sprintf("backend:%s:offline", name), + 1, + duration, + ).Err() + if err != nil { + RecordRedisError("SetBackendOffline") + return wrapErr(err, "error setting backend unavailable") + } + return nil +} + +func (r *RedisImpl) IncBackendRPS(name string) (int, error) { + cmd := r.rdb.Eval( + context.Background(), + MaxRPSScript, + []string{fmt.Sprintf("backend:%s:ratelimit", name)}, + ) + rps, err := cmd.Int() + if err != nil { + RecordRedisError("IncBackendRPS") + return -1, wrapErr(err, "error upserting backend rate limit") + } + return rps, nil +} + +func (r *RedisImpl) IncBackendWSConns(name string, max int) (bool, error) { + connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) + r.tkMtx.Lock() + r.touchKeys[connsKey] = 5 * time.Minute + r.tkMtx.Unlock() + cmd := r.rdb.Eval( + context.Background(), + MaxConcurrentWSConnsScript, + []string{ + fmt.Sprintf("backend:%s:proxies", name), + connsKey, + }, + max, + ) + incremented, err := cmd.Bool() + // false gets coerced to redis.nil, see https://redis.io/commands/eval#conversion-between-lua-and-redis-data-types + if err == redis.Nil { + return false, nil + } + if err != nil { + RecordRedisError("IncBackendWSConns") + return false, wrapErr(err, "error incrementing backend ws conns") + } + return incremented, nil +} + +func (r *RedisImpl) DecBackendWSConns(name string) error { + connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) + err := r.rdb.Decr(context.Background(), connsKey).Err() + if err != nil { + RecordRedisError("DecBackendWSConns") + return wrapErr(err, "error decrementing backend ws conns") + } + return nil +} + +func (r *RedisImpl) FlushBackendWSConns(names []string) error { + ctx := context.Background() + for _, name := range names { + connsKey := fmt.Sprintf("proxy:%s:wsconns:%s", r.randID, name) + err := r.rdb.SRem( + ctx, + fmt.Sprintf("backend:%s:proxies", name), + connsKey, + ).Err() + if err != nil { + return wrapErr(err, "error flushing backend ws conns") + } + err = r.rdb.Del(ctx, connsKey).Err() + if err != nil { + return wrapErr(err, "error flushing backend ws conns") + } + } + return nil +} + +func (r *RedisImpl) touch() { + for { + r.tkMtx.Lock() + for key, dur := range r.touchKeys { + if err := r.rdb.Expire(context.Background(), key, dur).Err(); err != nil { + RecordRedisError("touch") + log.Error("error touching redis key", "key", key, "err", err) + } + } + r.tkMtx.Unlock() + time.Sleep(5 * time.Second) + } +} + +func randStr(l int) string { + b := make([]byte, l) + if _, err := rand.Read(b); err != nil { + panic(err) + } + return hex.EncodeToString(b) +} diff --git a/go/proxyd/rpc.go b/go/proxyd/rpc.go new file mode 100644 index 0000000000000..a70b525d6abc8 --- /dev/null +++ b/go/proxyd/rpc.go @@ -0,0 +1,88 @@ +package proxyd + +import ( + "encoding/json" + "io" + "io/ioutil" +) + +type RPCReq struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID *int `json:"id"` +} + +type RPCRes struct { + JSONRPC string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error *RPCErr `json:"error,omitempty"` + ID *int `json:"id"` +} + +func (r *RPCRes) IsError() bool { + return r.Error != nil +} + +type RPCErr struct { + Code int `json:"code"` + Message string `json:"message"` +} + +func (r *RPCErr) Error() string { + return r.Message +} + +func ParseRPCReq(r io.Reader) (*RPCReq, error) { + body, err := ioutil.ReadAll(r) + if err != nil { + return nil, wrapErr(err, "error reading request body") + } + + req := new(RPCReq) + if err := json.Unmarshal(body, req); err != nil { + return nil, ErrParseErr + } + + if req.JSONRPC != JSONRPCVersion { + return nil, ErrInvalidRequest + } + + if req.Method == "" { + return nil, ErrInvalidRequest + } + + return req, nil +} + +func ParseRPCRes(r io.Reader) (*RPCRes, error) { + body, err := ioutil.ReadAll(r) + if err != nil { + return nil, wrapErr(err, "error reading RPC response") + } + + res := new(RPCRes) + if err := json.Unmarshal(body, res); err != nil { + return nil, wrapErr(err, "error unmarshaling RPC response") + } + + return res, nil +} + +func NewRPCErrorRes(id *int, err error) *RPCRes { + var rpcErr *RPCErr + if rr, ok := err.(*RPCErr); ok { + rpcErr = rr + } else { + rpcErr = &RPCErr{ + Code: JSONRPCErrorInternal, + Message: err.Error(), + } + } + + return &RPCRes{ + JSONRPC: JSONRPCVersion, + Error: rpcErr, + ID: id, + } +} diff --git a/go/proxyd/server.go b/go/proxyd/server.go index 51c4ce3573207..3050dec28c509 100644 --- a/go/proxyd/server.go +++ b/go/proxyd/server.go @@ -1,86 +1,37 @@ package proxyd import ( - "encoding/json" - "errors" - "fmt" - "github.com/ethereum/go-ethereum/log" - "github.com/gorilla/mux" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/rs/cors" - "io" - "io/ioutil" - "net/http" + "context" + "encoding/json" + "errors" + "fmt" + "github.com/ethereum/go-ethereum/log" + "github.com/gorilla/mux" + "github.com/gorilla/websocket" + "github.com/prometheus/client_golang/prometheus" + "github.com/rs/cors" + "io" + "net/http" + "time" ) -var ( - httpRequestsCtr = promauto.NewCounter(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "http_requests_total", - Help: "Count of total HTTP requests.", - }) - - httpRequestDurationSummary = promauto.NewSummary(prometheus.SummaryOpts{ - Namespace: "proxyd", - Name: "http_request_duration_seconds", - Help: "Summary of HTTP request durations, in seconds.", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.005, 0.99: 0.001}, - }) - - rpcRequestsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "rpc_requests_total", - Help: "Count of RPC requests.", - }, []string{ - "method_name", - }) - - blockedRPCsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "blocked_rpc_requests_total", - Help: "Count of blocked RPC requests.", - }, []string{ - "method_name", - }) - - rpcErrorsCtr = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: "proxyd", - Name: "rpc_errors_total", - Help: "Count of RPC errors.", - }, []string{ - "error_code", - }) -) - -type RPCReq struct { - JSONRPC string `json:"jsonrpc"` - Method string `json:"method"` - Params json.RawMessage `json:"params"` - ID *int `json:"id"` -} - -type RPCRes struct { - JSONRPC string `json:"jsonrpc"` - Result interface{} `json:"result,omitempty"` - Error *RPCErr `json:"error,omitempty"` - ID *int `json:"id"` -} - -type RPCErr struct { - Code int `json:"code"` - Message string `json:"message"` -} - type Server struct { - mappings *MethodMapping + backends *BackendGroup maxBodySize int64 + upgrader *websocket.Upgrader + server *http.Server } -func NewServer(mappings *MethodMapping, maxBodySize int64) *Server { +func NewServer( + backends *BackendGroup, + maxBodySize int64, +) *Server { return &Server{ - mappings: mappings, + backends: backends, maxBodySize: maxBodySize, + upgrader: &websocket.Upgrader{ + HandshakeTimeout: 5 * time.Second, + }, } } @@ -88,16 +39,21 @@ func (s *Server) ListenAndServe(host string, port int) error { hdlr := mux.NewRouter() hdlr.HandleFunc("/healthz", s.HandleHealthz).Methods("GET") hdlr.HandleFunc("/", s.HandleRPC).Methods("POST") + hdlr.HandleFunc("/ws", s.HandleWS) c := cors.New(cors.Options{ AllowedOrigins: []string{"*"}, }) addr := fmt.Sprintf("%s:%d", host, port) - server := &http.Server{ + s.server = &http.Server{ Handler: instrumentedHdlr(c.Handler(hdlr)), Addr: addr, } log.Info("starting HTTP server", "addr", addr) - return server.ListenAndServe() + return s.server.ListenAndServe() +} + +func (s *Server) Shutdown() { + s.server.Shutdown(context.Background()) } func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { @@ -105,77 +61,93 @@ func (s *Server) HandleHealthz(w http.ResponseWriter, r *http.Request) { } func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(io.LimitReader(r.Body, s.maxBodySize)) + req, err := ParseRPCReq(io.LimitReader(r.Body, s.maxBodySize)) if err != nil { - log.Error("error reading request body", "err", err) - rpcErrorsCtr.WithLabelValues("-32700").Inc() - writeRPCError(w, nil, -32700, "could not read request body") + log.Info("rejected request with bad rpc request", "source", "rpc", "err", err) + RecordRPCError(SourceClient, err) + writeRPCError(w, nil, err) return } - req := new(RPCReq) - if err := json.Unmarshal(body, req); err != nil { - rpcErrorsCtr.WithLabelValues("-32700").Inc() - writeRPCError(w, nil, -32700, "invalid JSON") + backendRes, err := s.backends.Forward(req) + if err != nil { + if errors.Is(err, ErrNoBackends) { + RecordUnserviceableRequest(RPCRequestSourceHTTP) + RecordRPCError(SourceProxyd, err) + } else if errors.Is(err, ErrMethodNotWhitelisted) { + RecordRPCError(SourceClient, err) + } else { + RecordRPCError(SourceBackend, err) + } + log.Error("error forwarding RPC request", "method", req.Method, "err", err) + writeRPCError(w, req.ID, err) return } + if backendRes.IsError() { + RecordRPCError(SourceBackend, backendRes.Error) + } - if req.JSONRPC != JSONRPCVersion { - rpcErrorsCtr.WithLabelValues("-32600").Inc() - writeRPCError(w, nil, -32600, "invalid json-rpc version") + enc := json.NewEncoder(w) + if err := enc.Encode(backendRes); err != nil { + log.Error("error encoding response", "err", err) + RecordRPCError(SourceProxyd, err) + writeRPCError(w, req.ID, err) return } - group, err := s.mappings.BackendGroupFor(req.Method) + log.Debug("forwarded RPC method", "method", req.Method) +} + +func (s *Server) HandleWS(w http.ResponseWriter, r *http.Request) { + clientConn, err := s.upgrader.Upgrade(w, r, nil) if err != nil { - rpcErrorsCtr.WithLabelValues("-32601").Inc() - blockedRPCsCtr.WithLabelValues(req.Method).Inc() - log.Info("blocked request for non-whitelisted method", "method", req.Method) - writeRPCError(w, req.ID, -32601, "method not found") + log.Error("error upgrading client conn", "err", err) return } - backendRes, err := group.Forward(req) + proxier, err := s.backends.ProxyWS(clientConn) if err != nil { - log.Error("error forwarding RPC request", "group", group.Name, "method", req.Method, "err", err) - rpcErrorsCtr.WithLabelValues("-32603").Inc() - msg := "error fetching data from upstream" - if errors.Is(err, ErrBackendsInconsistent) { - msg = ErrBackendsInconsistent.Error() + if errors.Is(err, ErrNoBackends) { + RecordUnserviceableRequest(RPCRequestSourceWS) } - writeRPCError(w, req.ID, -32603, msg) + log.Error("error dialing ws backend", "err", err) + clientConn.Close() return } - enc := json.NewEncoder(w) - if err := enc.Encode(backendRes); err != nil { - log.Error("error encoding response", "err", err) - return - } - rpcRequestsCtr.WithLabelValues(req.Method).Inc() - log.Debug("forwarded RPC method", "method", req.Method, "group", group.Name) + activeClientWsConnsGauge.Inc() + go func() { + // Below call blocks so run it in a goroutine. + if err := proxier.Proxy(); err != nil { + log.Error("error proxying websocket", "err", err) + } + activeClientWsConnsGauge.Dec() + }() } -func writeRPCError(w http.ResponseWriter, id *int, code int, msg string) { +func writeRPCError(w http.ResponseWriter, id *int, err error) { enc := json.NewEncoder(w) w.WriteHeader(200) - body := &RPCRes{ - ID: id, - Error: &RPCErr{ - Code: code, - Message: msg, - }, + + var body *RPCRes + if r, ok := err.(*RPCErr); ok { + body = NewRPCErrorRes(id, r) + } else { + body = NewRPCErrorRes(id, &RPCErr{ + Code: JSONRPCErrorInternal, + Message: "internal error", + }) } if err := enc.Encode(body); err != nil { - log.Error("error writing RPC error", "err", err) + log.Error("error writing rpc error", "err", err) } } func instrumentedHdlr(h http.Handler) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - httpRequestsCtr.Inc() - timer := prometheus.NewTimer(httpRequestDurationSummary) - defer timer.ObserveDuration() + httpRequestsTotal.Inc() + respTimer := prometheus.NewTimer(httpRequestDurationSumm) h.ServeHTTP(w, r) + respTimer.ObserveDuration() } } diff --git a/go/proxyd/string_set.go b/go/proxyd/string_set.go new file mode 100644 index 0000000000000..4582349196116 --- /dev/null +++ b/go/proxyd/string_set.go @@ -0,0 +1,56 @@ +package proxyd + +import "sync" + +type StringSet struct { + underlying map[string]bool + mtx sync.RWMutex +} + +func NewStringSet() *StringSet { + return &StringSet{ + underlying: make(map[string]bool), + } +} + +func NewStringSetFromStrings(in []string) *StringSet { + underlying := make(map[string]bool) + for _, str := range in { + underlying[str] = true + } + return &StringSet{ + underlying: underlying, + } +} + +func (s *StringSet) Has(test string) bool { + s.mtx.RLock() + defer s.mtx.RUnlock() + return s.underlying[test] +} + +func (s *StringSet) Add(str string) { + s.mtx.Lock() + defer s.mtx.Unlock() + s.underlying[str] = true +} + +func (s *StringSet) Entries() []string { + s.mtx.RLock() + defer s.mtx.RUnlock() + out := make([]string, len(s.underlying)) + var i int + for entry := range s.underlying { + out[i] = entry + i++ + } + return out +} + +func (s *StringSet) Extend(in []string) *StringSet { + out := NewStringSetFromStrings(in) + for k := range s.underlying { + out.Add(k) + } + return out +}