From 16f1bde27b42b519d96c53f2e25ed82ec79331e0 Mon Sep 17 00:00:00 2001 From: Vighneswar Rao Bojja Date: Wed, 25 Sep 2019 08:30:52 +0530 Subject: [PATCH] Add support queue in upstreams for plus --- docs/virtualserver-and-virtualserverroute.md | 20 ++++ internal/configs/version2/config.go | 7 ++ .../version2/nginx-plus.virtualserver.tmpl | 4 + internal/configs/version2/templates_test.go | 1 + internal/configs/virtualserver.go | 12 +++ internal/configs/virtualserver_test.go | 96 ++++++++++++++++++- pkg/apis/configuration/v1alpha1/types.go | 7 ++ .../v1alpha1/zz_generated.deepcopy.go | 21 ++++ .../configuration/validation/validation.go | 21 ++++ .../validation/validation_test.go | 70 ++++++++++++++ 10 files changed, 256 insertions(+), 3 deletions(-) diff --git a/docs/virtualserver-and-virtualserverroute.md b/docs/virtualserver-and-virtualserverroute.md index 32010d3bf1..77ee5ee859 100644 --- a/docs/virtualserver-and-virtualserverroute.md +++ b/docs/virtualserver-and-virtualserverroute.md @@ -20,6 +20,7 @@ This document is the reference documentation for the resources. To see additiona - [Upstream](#upstream) - [Upstream.Buffers](#upstreambuffers) - [Upstream.TLS](#upstreamtls) + - [Upstream.Queue](#upstreamqueue) - [Upstream.Healthcheck](#upstreamhealthcheck) - [Header](#header) - [Split](#split) @@ -219,6 +220,7 @@ tls: | `tls` | The TLS configuration for the Upstream. | [`tls`](#UpstreamTLS) | No | | `healthCheck` | The health check configuration for the Upstream. See the [health_check](http://nginx.org/en/docs/http/ngx_http_upstream_hc_module.html#health_check) directive. Note: this feature is supported only in NGINX Plus. | [`healthcheck`](#UpstreamHealthcheck) | No | | `slow-start` | The slow start allows an upstream server to gradually recover its weight from 0 to its nominal value after it has been recovered or became available or when the server becomes available after a period of time it was considered unavailable. By default, the slow start is disabled. See the [slow_start](https://nginx.org/en/docs/http/ngx_http_upstream_module.html#slow_start) parameter of the server directive. Note: The parameter cannot be used along with the `random`, `hash` or `ip_hash` load balancing methods and will be ignored. | `string` | No | +| `queue` | Configures a queue for an upstream. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request. By default, no queue is configured. Note: this feature is supported only in NGINX Plus.| [`queue`](#upstreamQueue) | No | | `buffering` | Enables buffering of responses from the upstream server. See the [proxy_buffering](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering) directive. The default is set in the `proxy-buffering` ConfigMap key. | `boolean` | No | | `buffers` | Configures the buffers used for reading a response from the upstream server for a single connection. | [`buffers`](#UpstreamBuffers) | No | | `buffer-size` | Sets the size of the buffer used for reading the first part of a response received from the upstream server. See the [proxy_buffer_size](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffer_size) directive. The default is set in the `proxy-buffer-size` ConfigMap key. | `string` | No | @@ -243,6 +245,24 @@ See the [proxy_buffers](https://nginx.org/en/docs/http/ngx_http_proxy_module.htm | ----- | ----------- | ---- | -------- | | `enable` | Enables HTTPS for requests to upstream servers. The default is `False`, meaning that HTTP will be used. | `boolean` | No | +### Upstream.Queue + +The queue field configures a queue. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request: + +```yaml +size: 10 +timeout: 60s +``` + +See [`queue`](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#queue) directive for additional information. + +Note: This feature is supported only in NGINX Plus. + +| Field | Description | Type | Required | +| ----- | ----------- | ---- | -------- | +| `size` | The size of the queue. | `int` | Yes | +| `timeout` | The timeout of the queue. A request cannot be queued for a period longer than the timeout. The default is `60s`. | `string` | No | + ### Upstream.Healthcheck The Healthcheck defines an [active health check](https://docs.nginx.com/nginx/admin-guide/load-balancer/http-health-check/). In the example below we enable a health check for an upstream and configure all the available parameters: diff --git a/internal/configs/version2/config.go b/internal/configs/version2/config.go index f54612f363..81c9816dad 100644 --- a/internal/configs/version2/config.go +++ b/internal/configs/version2/config.go @@ -21,6 +21,7 @@ type Upstream struct { SlowStart string FailTimeout string UpstreamZoneSize string + Queue *Queue } // UpstreamServer defines an upstream server. @@ -127,3 +128,9 @@ type StatusMatch struct { Name string Code string } + +// Queue defines a queue in upstream. +type Queue struct { + Size int + Timeout string +} diff --git a/internal/configs/version2/nginx-plus.virtualserver.tmpl b/internal/configs/version2/nginx-plus.virtualserver.tmpl index 906ff97fb0..8df70103e8 100644 --- a/internal/configs/version2/nginx-plus.virtualserver.tmpl +++ b/internal/configs/version2/nginx-plus.virtualserver.tmpl @@ -11,6 +11,10 @@ upstream {{ $u.Name }} { {{ if $u.Keepalive }} keepalive {{ $u.Keepalive }}; {{ end }} + + {{ if $u.Queue }} + queue {{ $u.Queue.Size }} timeout={{ $u.Queue.Timeout }}; + {{ end }} } {{ end }} diff --git a/internal/configs/version2/templates_test.go b/internal/configs/version2/templates_test.go index 9f45d98ed6..93e32dcef2 100644 --- a/internal/configs/version2/templates_test.go +++ b/internal/configs/version2/templates_test.go @@ -21,6 +21,7 @@ var virtualServerCfg = VirtualServerConfig{ MaxConns: 31, SlowStart: "10s", UpstreamZoneSize: "256k", + Queue: &Queue{Size: 10, Timeout: "60s"}, }, { Name: "coffee-v1", diff --git a/internal/configs/virtualserver.go b/internal/configs/virtualserver.go index f79083066c..896c66b217 100644 --- a/internal/configs/virtualserver.go +++ b/internal/configs/virtualserver.go @@ -369,6 +369,7 @@ func generateUpstream(upstreamName string, upstream conf_v1alpha1.Upstream, isEx if isPlus { ups.SlowStart = generateSlowStartForPlus(upstream, lbMethod) + ups.Queue = generateQueueForPlus(upstream.Queue, "60s") } return ups @@ -763,3 +764,14 @@ func createUpstreamServersConfigForPlus(upstream version2.Upstream) nginx.Server SlowStart: upstream.SlowStart, } } + +func generateQueueForPlus(upstreamQueue *conf_v1alpha1.UpstreamQueue, defaultTimeout string) *version2.Queue { + if upstreamQueue == nil { + return nil + } + + return &version2.Queue{ + Size: upstreamQueue.Size, + Timeout: generateString(upstreamQueue.Timeout, defaultTimeout), + } +} diff --git a/internal/configs/virtualserver_test.go b/internal/configs/virtualserver_test.go index dae261d5d7..75c58650d3 100644 --- a/internal/configs/virtualserver_test.go +++ b/internal/configs/virtualserver_test.go @@ -770,7 +770,7 @@ func TestGenerateVirtualServerConfigForVirtualServerWithRules(t *testing.T) { func TestGenerateUpstream(t *testing.T) { name := "test-upstream" - upstream := conf_v1alpha1.Upstream{Service: name, Port: 80, SlowStart: "10s"} + upstream := conf_v1alpha1.Upstream{Service: name, Port: 80} endpoints := []string{ "192.168.10.10:8080", } @@ -796,10 +796,9 @@ func TestGenerateUpstream(t *testing.T) { LBMethod: "random", Keepalive: 21, UpstreamZoneSize: "256k", - SlowStart: "", } - result := generateUpstream(name, upstream, false, endpoints, &cfgParams, true) + result := generateUpstream(name, upstream, false, endpoints, &cfgParams, false) if !reflect.DeepEqual(result, expected) { t.Errorf("generateUpstream() returned %v but expected %v", result, expected) } @@ -2095,3 +2094,94 @@ func TestCreateEndpointsFromUpstream(t *testing.T) { t.Errorf("createEndpointsFromUpstream returned %v, but expected %v", endpoints, expected) } } + +func TestGenerateUpstreamWithQueue(t *testing.T) { + serviceName := "test-queue" + + tests := []struct { + name string + upstream conf_v1alpha1.Upstream + isPlus bool + expected version2.Upstream + msg string + }{ + { + name: "test-upstream-queue", + upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{ + Size: 10, + Timeout: "10s", + }}, + isPlus: true, + expected: version2.Upstream{ + Name: "test-upstream-queue", + Queue: &version2.Queue{ + Size: 10, + Timeout: "10s", + }, + }, + msg: "upstream queue with size and timeout", + }, + { + name: "test-upstream-queue-with-default-timeout", + upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: ""}}, + isPlus: true, + expected: version2.Upstream{ + Name: "test-upstream-queue-with-default-timeout", + Queue: &version2.Queue{ + Size: 10, + Timeout: "60s", + }, + }, + msg: "upstream queue with only size", + }, + { + name: "test-upstream-queue-nil", + upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: nil}, + isPlus: false, + expected: version2.Upstream{ + Name: "test-upstream-queue-nil", + }, + msg: "upstream queue with nil for OSS", + }, + } + + for _, test := range tests { + result := generateUpstream(test.name, test.upstream, false, []string{}, &ConfigParams{}, test.isPlus) + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("generateUpstream() returned %v but expected %v for the case of %v", result, test.expected, test.msg) + } + } + +} + +func TestGenerateQueueForPlus(t *testing.T) { + tests := []struct { + upstreamQueue *conf_v1alpha1.UpstreamQueue + expected *version2.Queue + msg string + }{ + { + upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"}, + expected: &version2.Queue{Size: 10, Timeout: "10s"}, + msg: "upstream queue with size and timeout", + }, + { + upstreamQueue: nil, + expected: nil, + msg: "upstream queue with nil", + }, + { + upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10}, + expected: &version2.Queue{Size: 10, Timeout: "60s"}, + msg: "upstream queue with only size", + }, + } + + for _, test := range tests { + result := generateQueueForPlus(test.upstreamQueue, "60s") + if !reflect.DeepEqual(result, test.expected) { + t.Errorf("generateQueueForPlus() returned %v but expected %v for the case of %v", result, test.expected, test.msg) + } + } + +} diff --git a/pkg/apis/configuration/v1alpha1/types.go b/pkg/apis/configuration/v1alpha1/types.go index a0b16b84a0..de9cefcb75 100644 --- a/pkg/apis/configuration/v1alpha1/types.go +++ b/pkg/apis/configuration/v1alpha1/types.go @@ -46,6 +46,7 @@ type Upstream struct { TLS UpstreamTLS `json:"tls"` HealthCheck *HealthCheck `json:"healthCheck"` SlowStart string `json:"slow-start"` + Queue *UpstreamQueue `json:"queue"` } // UpstreamBuffers defines Buffer Configuration for an Upstream @@ -157,3 +158,9 @@ type VirtualServerRouteList struct { Items []VirtualServerRoute `json:"items"` } + +// UpstreamQueue defines Queue Configuration for an Upstream +type UpstreamQueue struct { + Size int `json:"size"` + Timeout string `json:"timeout"` +} diff --git a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go index 65909e0ae8..4b4be791ca 100644 --- a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go @@ -207,6 +207,11 @@ func (in *Upstream) DeepCopyInto(out *Upstream) { *out = new(HealthCheck) (*in).DeepCopyInto(*out) } + if in.Queue != nil { + in, out := &in.Queue, &out.Queue + *out = new(UpstreamQueue) + **out = **in + } return } @@ -236,6 +241,22 @@ func (in *UpstreamBuffers) DeepCopy() *UpstreamBuffers { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *UpstreamQueue) DeepCopyInto(out *UpstreamQueue) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new UpstreamQueue. +func (in *UpstreamQueue) DeepCopy() *UpstreamQueue { + if in == nil { + return nil + } + out := new(UpstreamQueue) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *UpstreamTLS) DeepCopyInto(out *UpstreamTLS) { *out = *in diff --git a/pkg/apis/configuration/validation/validation.go b/pkg/apis/configuration/validation/validation.go index 1e50eafbf3..807a6e6b4b 100644 --- a/pkg/apis/configuration/validation/validation.go +++ b/pkg/apis/configuration/validation/validation.go @@ -374,6 +374,7 @@ func validateUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isP allErrs = append(allErrs, validateBuffer(u.ProxyBuffers, idxPath.Child("buffers"))...) allErrs = append(allErrs, validateSize(u.ProxyBufferSize, idxPath.Child("buffer-size"))...) allErrs = append(allErrs, rejectPlusResourcesInOSS(u, idxPath, isPlus)...) + allErrs = append(allErrs, validateQueue(u.Queue, idxPath.Child("queue"), isPlus)...) for _, msg := range validation.IsValidPortNum(int(u.Port)) { allErrs = append(allErrs, field.Invalid(idxPath.Child("port"), u.Port, msg)) @@ -815,3 +816,23 @@ func rejectPlusResourcesInOSS(upstream v1alpha1.Upstream, idxPath *field.Path, i return allErrs } + +func validateQueue(queue *v1alpha1.UpstreamQueue, fieldPath *field.Path, isPlus bool) field.ErrorList { + allErrs := field.ErrorList{} + + if queue == nil { + return allErrs + } + + if !isPlus { + allErrs = append(allErrs, field.Forbidden(fieldPath, "queue is only supported in NGINX Plus")) + return allErrs + } + + allErrs = append(allErrs, validateTime(queue.Timeout, fieldPath.Child("timeout"))...) + if queue.Size <= 0 { + allErrs = append(allErrs, field.Required(fieldPath.Child("size"), "must be positive")) + } + + return allErrs +} diff --git a/pkg/apis/configuration/validation/validation_test.go b/pkg/apis/configuration/validation/validation_test.go index d291068446..bbefc13fa2 100644 --- a/pkg/apis/configuration/validation/validation_test.go +++ b/pkg/apis/configuration/validation/validation_test.go @@ -2011,3 +2011,73 @@ func TestRejectPlusResourcesInOSS(t *testing.T) { } } + +func TestValidateQueue(t *testing.T) { + tests := []struct { + upstreamQueue *v1alpha1.UpstreamQueue + fieldPath *field.Path + isPlus bool + msg string + }{ + { + upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"}, + fieldPath: field.NewPath("queue"), + isPlus: true, + msg: "valid upstream queue with size and timeout", + }, + { + upstreamQueue: nil, + fieldPath: field.NewPath("queue"), + isPlus: true, + msg: "upstream queue nil", + }, + { + upstreamQueue: nil, + fieldPath: field.NewPath("queue"), + isPlus: false, + msg: "upstream queue nil in OSS", + }, + } + + for _, test := range tests { + allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus) + if len(allErrs) != 0 { + t.Errorf("validateQueue() returned errors %v for valid input for the case of %s", allErrs, test.msg) + } + } +} + +func TestValidateQueueFails(t *testing.T) { + tests := []struct { + upstreamQueue *v1alpha1.UpstreamQueue + fieldPath *field.Path + isPlus bool + msg string + }{ + { + upstreamQueue: &v1alpha1.UpstreamQueue{Size: -1, Timeout: "10s"}, + fieldPath: field.NewPath("queue"), + isPlus: true, + msg: "upstream queue with invalid size", + }, + { + upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "-10"}, + fieldPath: field.NewPath("queue"), + isPlus: true, + msg: "upstream queue with invalid timeout", + }, + { + upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"}, + fieldPath: field.NewPath("queue"), + isPlus: false, + msg: "upstream queue with valid size and timeout in OSS", + }, + } + + for _, test := range tests { + allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus) + if len(allErrs) == 0 { + t.Errorf("validateQueue() returned no errors for invalid input for the case of %s", test.msg) + } + } +}