Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support queue in upstreams for plus in VS/VSR #701

Merged
merged 1 commit into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions docs/virtualserver-and-virtualserverroute.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This document is the reference documentation for the resources. To see additiona
- [Upstream](#upstream)
- [Upstream.Buffers](#upstreambuffers)
- [Upstream.TLS](#upstreamtls)
- [Upstream.Queue](#upstreamqueue)
- [Upstream.Healthcheck](#upstreamhealthcheck)
- [Header](#header)
- [Split](#split)
Expand Down Expand Up @@ -219,6 +220,7 @@ tls:
| `tls` | The TLS configuration for the Upstream. | [`tls`](#UpstreamTLS) | No |
| `healthCheck` | The health check configuration for the Upstream. See the [health_check](http://nginx.org/en/docs/http/ngx_http_upstream_hc_module.html#health_check) directive. Note: this feature is supported only in NGINX Plus. | [`healthcheck`](#UpstreamHealthcheck) | No |
| `slow-start` | The slow start allows an upstream server to gradually recover its weight from 0 to its nominal value after it has been recovered or became available or when the server becomes available after a period of time it was considered unavailable. By default, the slow start is disabled. See the [slow_start](https://nginx.org/en/docs/http/ngx_http_upstream_module.html#slow_start) parameter of the server directive. Note: The parameter cannot be used along with the `random`, `hash` or `ip_hash` load balancing methods and will be ignored. | `string` | No |
| `queue` | Configures a queue for an upstream. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request. By default, no queue is configured. Note: this feature is supported only in NGINX Plus.| [`queue`](#upstreamQueue) | No |
| `buffering` | Enables buffering of responses from the upstream server. See the [proxy_buffering](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering) directive. The default is set in the `proxy-buffering` ConfigMap key. | `boolean` | No |
| `buffers` | Configures the buffers used for reading a response from the upstream server for a single connection. | [`buffers`](#UpstreamBuffers) | No |
| `buffer-size` | Sets the size of the buffer used for reading the first part of a response received from the upstream server. See the [proxy_buffer_size](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffer_size) directive. The default is set in the `proxy-buffer-size` ConfigMap key. | `string` | No |
Expand All @@ -243,6 +245,24 @@ See the [proxy_buffers](https://nginx.org/en/docs/http/ngx_http_proxy_module.htm
| ----- | ----------- | ---- | -------- |
| `enable` | Enables HTTPS for requests to upstream servers. The default is `False`, meaning that HTTP will be used. | `boolean` | No |

### Upstream.Queue

The queue field configures a queue. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request:

```yaml
size: 10
timeout: 60s
```

See [`queue`](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#queue) directive for additional information.

Note: This feature is supported only in NGINX Plus.

| Field | Description | Type | Required |
| ----- | ----------- | ---- | -------- |
| `size` | The size of the queue. | `int` | Yes |
| `timeout` | The timeout of the queue. A request cannot be queued for a period longer than the timeout. The default is `60s`. | `string` | No |

### Upstream.Healthcheck

The Healthcheck defines an [active health check](https://docs.nginx.com/nginx/admin-guide/load-balancer/http-health-check/). In the example below we enable a health check for an upstream and configure all the available parameters:
Expand Down
7 changes: 7 additions & 0 deletions internal/configs/version2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Upstream struct {
SlowStart string
FailTimeout string
UpstreamZoneSize string
Queue *Queue
}

// UpstreamServer defines an upstream server.
Expand Down Expand Up @@ -127,3 +128,9 @@ type StatusMatch struct {
Name string
Code string
}

// Queue defines a queue in upstream.
type Queue struct {
Size int
Timeout string
}
4 changes: 4 additions & 0 deletions internal/configs/version2/nginx-plus.virtualserver.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ upstream {{ $u.Name }} {
{{ if $u.Keepalive }}
keepalive {{ $u.Keepalive }};
{{ end }}

{{ if $u.Queue }}
queue {{ $u.Queue.Size }} timeout={{ $u.Queue.Timeout }};
{{ end }}
}
{{ end }}

Expand Down
1 change: 1 addition & 0 deletions internal/configs/version2/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var virtualServerCfg = VirtualServerConfig{
MaxConns: 31,
SlowStart: "10s",
UpstreamZoneSize: "256k",
Queue: &Queue{Size: 10, Timeout: "60s"},
},
{
Name: "coffee-v1",
Expand Down
12 changes: 12 additions & 0 deletions internal/configs/virtualserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func generateUpstream(upstreamName string, upstream conf_v1alpha1.Upstream, isEx

if isPlus {
ups.SlowStart = generateSlowStartForPlus(upstream, lbMethod)
ups.Queue = generateQueueForPlus(upstream.Queue, "60s")
}

return ups
Expand Down Expand Up @@ -763,3 +764,14 @@ func createUpstreamServersConfigForPlus(upstream version2.Upstream) nginx.Server
SlowStart: upstream.SlowStart,
}
}

func generateQueueForPlus(upstreamQueue *conf_v1alpha1.UpstreamQueue, defaultTimeout string) *version2.Queue {
if upstreamQueue == nil {
return nil
}

return &version2.Queue{
Size: upstreamQueue.Size,
Timeout: generateString(upstreamQueue.Timeout, defaultTimeout),
}
}
96 changes: 93 additions & 3 deletions internal/configs/virtualserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func TestGenerateVirtualServerConfigForVirtualServerWithRules(t *testing.T) {

func TestGenerateUpstream(t *testing.T) {
name := "test-upstream"
upstream := conf_v1alpha1.Upstream{Service: name, Port: 80, SlowStart: "10s"}
upstream := conf_v1alpha1.Upstream{Service: name, Port: 80}
endpoints := []string{
"192.168.10.10:8080",
}
Expand All @@ -796,10 +796,9 @@ func TestGenerateUpstream(t *testing.T) {
LBMethod: "random",
Keepalive: 21,
UpstreamZoneSize: "256k",
SlowStart: "",
}

result := generateUpstream(name, upstream, false, endpoints, &cfgParams, true)
result := generateUpstream(name, upstream, false, endpoints, &cfgParams, false)
if !reflect.DeepEqual(result, expected) {
t.Errorf("generateUpstream() returned %v but expected %v", result, expected)
}
Expand Down Expand Up @@ -2095,3 +2094,94 @@ func TestCreateEndpointsFromUpstream(t *testing.T) {
t.Errorf("createEndpointsFromUpstream returned %v, but expected %v", endpoints, expected)
}
}

func TestGenerateUpstreamWithQueue(t *testing.T) {
serviceName := "test-queue"

tests := []struct {
name string
upstream conf_v1alpha1.Upstream
isPlus bool
expected version2.Upstream
msg string
}{
{
name: "test-upstream-queue",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{
Size: 10,
Timeout: "10s",
}},
isPlus: true,
expected: version2.Upstream{
Name: "test-upstream-queue",
Queue: &version2.Queue{
Size: 10,
Timeout: "10s",
},
},
msg: "upstream queue with size and timeout",
},
{
name: "test-upstream-queue-with-default-timeout",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: ""}},
isPlus: true,
expected: version2.Upstream{
Name: "test-upstream-queue-with-default-timeout",
Queue: &version2.Queue{
Size: 10,
Timeout: "60s",
},
},
msg: "upstream queue with only size",
},
{
name: "test-upstream-queue-nil",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: nil},
isPlus: false,
expected: version2.Upstream{
Name: "test-upstream-queue-nil",
},
msg: "upstream queue with nil for OSS",
},
}

for _, test := range tests {
result := generateUpstream(test.name, test.upstream, false, []string{}, &ConfigParams{}, test.isPlus)
if !reflect.DeepEqual(result, test.expected) {
t.Errorf("generateUpstream() returned %v but expected %v for the case of %v", result, test.expected, test.msg)
}
}

}

func TestGenerateQueueForPlus(t *testing.T) {
tests := []struct {
upstreamQueue *conf_v1alpha1.UpstreamQueue
expected *version2.Queue
msg string
}{
{
upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
expected: &version2.Queue{Size: 10, Timeout: "10s"},
msg: "upstream queue with size and timeout",
},
{
upstreamQueue: nil,
expected: nil,
msg: "upstream queue with nil",
},
{
upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10},
expected: &version2.Queue{Size: 10, Timeout: "60s"},
msg: "upstream queue with only size",
},
}

for _, test := range tests {
result := generateQueueForPlus(test.upstreamQueue, "60s")
if !reflect.DeepEqual(result, test.expected) {
t.Errorf("generateQueueForPlus() returned %v but expected %v for the case of %v", result, test.expected, test.msg)
}
}

}
7 changes: 7 additions & 0 deletions pkg/apis/configuration/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Upstream struct {
TLS UpstreamTLS `json:"tls"`
HealthCheck *HealthCheck `json:"healthCheck"`
SlowStart string `json:"slow-start"`
Queue *UpstreamQueue `json:"queue"`
}

// UpstreamBuffers defines Buffer Configuration for an Upstream
Expand Down Expand Up @@ -157,3 +158,9 @@ type VirtualServerRouteList struct {

Items []VirtualServerRoute `json:"items"`
}

// UpstreamQueue defines Queue Configuration for an Upstream
type UpstreamQueue struct {
Size int `json:"size"`
Timeout string `json:"timeout"`
}
21 changes: 21 additions & 0 deletions pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkg/apis/configuration/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func validateUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isP
allErrs = append(allErrs, validateBuffer(u.ProxyBuffers, idxPath.Child("buffers"))...)
allErrs = append(allErrs, validateSize(u.ProxyBufferSize, idxPath.Child("buffer-size"))...)
allErrs = append(allErrs, rejectPlusResourcesInOSS(u, idxPath, isPlus)...)
allErrs = append(allErrs, validateQueue(u.Queue, idxPath.Child("queue"), isPlus)...)

for _, msg := range validation.IsValidPortNum(int(u.Port)) {
allErrs = append(allErrs, field.Invalid(idxPath.Child("port"), u.Port, msg))
Expand Down Expand Up @@ -815,3 +816,23 @@ func rejectPlusResourcesInOSS(upstream v1alpha1.Upstream, idxPath *field.Path, i

return allErrs
}

func validateQueue(queue *v1alpha1.UpstreamQueue, fieldPath *field.Path, isPlus bool) field.ErrorList {
allErrs := field.ErrorList{}

if queue == nil {
return allErrs
}

if !isPlus {
allErrs = append(allErrs, field.Forbidden(fieldPath, "queue is only supported in NGINX Plus"))
return allErrs
}

allErrs = append(allErrs, validateTime(queue.Timeout, fieldPath.Child("timeout"))...)
if queue.Size <= 0 {
allErrs = append(allErrs, field.Required(fieldPath.Child("size"), "must be positive"))
}

return allErrs
}
70 changes: 70 additions & 0 deletions pkg/apis/configuration/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,3 +2011,73 @@ func TestRejectPlusResourcesInOSS(t *testing.T) {
}

}

func TestValidateQueue(t *testing.T) {
tests := []struct {
upstreamQueue *v1alpha1.UpstreamQueue
fieldPath *field.Path
isPlus bool
msg string
}{
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "valid upstream queue with size and timeout",
},
{
upstreamQueue: nil,
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue nil",
},
{
upstreamQueue: nil,
fieldPath: field.NewPath("queue"),
isPlus: false,
msg: "upstream queue nil in OSS",
},
}

for _, test := range tests {
allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus)
if len(allErrs) != 0 {
t.Errorf("validateQueue() returned errors %v for valid input for the case of %s", allErrs, test.msg)
}
}
}

func TestValidateQueueFails(t *testing.T) {
tests := []struct {
upstreamQueue *v1alpha1.UpstreamQueue
fieldPath *field.Path
isPlus bool
msg string
}{
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: -1, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue with invalid size",
},
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "-10"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue with invalid timeout",
},
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: false,
msg: "upstream queue with valid size and timeout in OSS",
},
}

for _, test := range tests {
allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus)
if len(allErrs) == 0 {
t.Errorf("validateQueue() returned no errors for invalid input for the case of %s", test.msg)
}
}
}