Skip to content

Commit

Permalink
Add support queue in upstreams for plus
Browse files Browse the repository at this point in the history
  • Loading branch information
Vighneswar Rao Bojja committed Sep 25, 2019
1 parent 0bd357b commit 16f1bde
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 3 deletions.
20 changes: 20 additions & 0 deletions docs/virtualserver-and-virtualserverroute.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ This document is the reference documentation for the resources. To see additiona
- [Upstream](#upstream)
- [Upstream.Buffers](#upstreambuffers)
- [Upstream.TLS](#upstreamtls)
- [Upstream.Queue](#upstreamqueue)
- [Upstream.Healthcheck](#upstreamhealthcheck)
- [Header](#header)
- [Split](#split)
Expand Down Expand Up @@ -219,6 +220,7 @@ tls:
| `tls` | The TLS configuration for the Upstream. | [`tls`](#UpstreamTLS) | No |
| `healthCheck` | The health check configuration for the Upstream. See the [health_check](http://nginx.org/en/docs/http/ngx_http_upstream_hc_module.html#health_check) directive. Note: this feature is supported only in NGINX Plus. | [`healthcheck`](#UpstreamHealthcheck) | No |
| `slow-start` | The slow start allows an upstream server to gradually recover its weight from 0 to its nominal value after it has been recovered or became available or when the server becomes available after a period of time it was considered unavailable. By default, the slow start is disabled. See the [slow_start](https://nginx.org/en/docs/http/ngx_http_upstream_module.html#slow_start) parameter of the server directive. Note: The parameter cannot be used along with the `random`, `hash` or `ip_hash` load balancing methods and will be ignored. | `string` | No |
| `queue` | Configures a queue for an upstream. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request. By default, no queue is configured. Note: this feature is supported only in NGINX Plus.| [`queue`](#upstreamQueue) | No |
| `buffering` | Enables buffering of responses from the upstream server. See the [proxy_buffering](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffering) directive. The default is set in the `proxy-buffering` ConfigMap key. | `boolean` | No |
| `buffers` | Configures the buffers used for reading a response from the upstream server for a single connection. | [`buffers`](#UpstreamBuffers) | No |
| `buffer-size` | Sets the size of the buffer used for reading the first part of a response received from the upstream server. See the [proxy_buffer_size](https://nginx.org/en/docs/http/ngx_http_proxy_module.html#proxy_buffer_size) directive. The default is set in the `proxy-buffer-size` ConfigMap key. | `string` | No |
Expand All @@ -243,6 +245,24 @@ See the [proxy_buffers](https://nginx.org/en/docs/http/ngx_http_proxy_module.htm
| ----- | ----------- | ---- | -------- |
| `enable` | Enables HTTPS for requests to upstream servers. The default is `False`, meaning that HTTP will be used. | `boolean` | No |

### Upstream.Queue

The queue field configures a queue. A client request will be placed into the queue if an upstream server cannot be selected immediately while processing the request:

```yaml
size: 10
timeout: 60s
```

See [`queue`](http://nginx.org/en/docs/http/ngx_http_upstream_module.html#queue) directive for additional information.

Note: This feature is supported only in NGINX Plus.

| Field | Description | Type | Required |
| ----- | ----------- | ---- | -------- |
| `size` | The size of the queue. | `int` | Yes |
| `timeout` | The timeout of the queue. A request cannot be queued for a period longer than the timeout. The default is `60s`. | `string` | No |

### Upstream.Healthcheck

The Healthcheck defines an [active health check](https://docs.nginx.com/nginx/admin-guide/load-balancer/http-health-check/). In the example below we enable a health check for an upstream and configure all the available parameters:
Expand Down
7 changes: 7 additions & 0 deletions internal/configs/version2/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Upstream struct {
SlowStart string
FailTimeout string
UpstreamZoneSize string
Queue *Queue
}

// UpstreamServer defines an upstream server.
Expand Down Expand Up @@ -127,3 +128,9 @@ type StatusMatch struct {
Name string
Code string
}

// Queue defines a queue in upstream.
type Queue struct {
Size int
Timeout string
}
4 changes: 4 additions & 0 deletions internal/configs/version2/nginx-plus.virtualserver.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ upstream {{ $u.Name }} {
{{ if $u.Keepalive }}
keepalive {{ $u.Keepalive }};
{{ end }}

{{ if $u.Queue }}
queue {{ $u.Queue.Size }} timeout={{ $u.Queue.Timeout }};
{{ end }}
}
{{ end }}

Expand Down
1 change: 1 addition & 0 deletions internal/configs/version2/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var virtualServerCfg = VirtualServerConfig{
MaxConns: 31,
SlowStart: "10s",
UpstreamZoneSize: "256k",
Queue: &Queue{Size: 10, Timeout: "60s"},
},
{
Name: "coffee-v1",
Expand Down
12 changes: 12 additions & 0 deletions internal/configs/virtualserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,7 @@ func generateUpstream(upstreamName string, upstream conf_v1alpha1.Upstream, isEx

if isPlus {
ups.SlowStart = generateSlowStartForPlus(upstream, lbMethod)
ups.Queue = generateQueueForPlus(upstream.Queue, "60s")
}

return ups
Expand Down Expand Up @@ -763,3 +764,14 @@ func createUpstreamServersConfigForPlus(upstream version2.Upstream) nginx.Server
SlowStart: upstream.SlowStart,
}
}

func generateQueueForPlus(upstreamQueue *conf_v1alpha1.UpstreamQueue, defaultTimeout string) *version2.Queue {
if upstreamQueue == nil {
return nil
}

return &version2.Queue{
Size: upstreamQueue.Size,
Timeout: generateString(upstreamQueue.Timeout, defaultTimeout),
}
}
96 changes: 93 additions & 3 deletions internal/configs/virtualserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ func TestGenerateVirtualServerConfigForVirtualServerWithRules(t *testing.T) {

func TestGenerateUpstream(t *testing.T) {
name := "test-upstream"
upstream := conf_v1alpha1.Upstream{Service: name, Port: 80, SlowStart: "10s"}
upstream := conf_v1alpha1.Upstream{Service: name, Port: 80}
endpoints := []string{
"192.168.10.10:8080",
}
Expand All @@ -796,10 +796,9 @@ func TestGenerateUpstream(t *testing.T) {
LBMethod: "random",
Keepalive: 21,
UpstreamZoneSize: "256k",
SlowStart: "",
}

result := generateUpstream(name, upstream, false, endpoints, &cfgParams, true)
result := generateUpstream(name, upstream, false, endpoints, &cfgParams, false)
if !reflect.DeepEqual(result, expected) {
t.Errorf("generateUpstream() returned %v but expected %v", result, expected)
}
Expand Down Expand Up @@ -2095,3 +2094,94 @@ func TestCreateEndpointsFromUpstream(t *testing.T) {
t.Errorf("createEndpointsFromUpstream returned %v, but expected %v", endpoints, expected)
}
}

func TestGenerateUpstreamWithQueue(t *testing.T) {
serviceName := "test-queue"

tests := []struct {
name string
upstream conf_v1alpha1.Upstream
isPlus bool
expected version2.Upstream
msg string
}{
{
name: "test-upstream-queue",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{
Size: 10,
Timeout: "10s",
}},
isPlus: true,
expected: version2.Upstream{
Name: "test-upstream-queue",
Queue: &version2.Queue{
Size: 10,
Timeout: "10s",
},
},
msg: "upstream queue with size and timeout",
},
{
name: "test-upstream-queue-with-default-timeout",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: ""}},
isPlus: true,
expected: version2.Upstream{
Name: "test-upstream-queue-with-default-timeout",
Queue: &version2.Queue{
Size: 10,
Timeout: "60s",
},
},
msg: "upstream queue with only size",
},
{
name: "test-upstream-queue-nil",
upstream: conf_v1alpha1.Upstream{Service: serviceName, Port: 80, Queue: nil},
isPlus: false,
expected: version2.Upstream{
Name: "test-upstream-queue-nil",
},
msg: "upstream queue with nil for OSS",
},
}

for _, test := range tests {
result := generateUpstream(test.name, test.upstream, false, []string{}, &ConfigParams{}, test.isPlus)
if !reflect.DeepEqual(result, test.expected) {
t.Errorf("generateUpstream() returned %v but expected %v for the case of %v", result, test.expected, test.msg)
}
}

}

func TestGenerateQueueForPlus(t *testing.T) {
tests := []struct {
upstreamQueue *conf_v1alpha1.UpstreamQueue
expected *version2.Queue
msg string
}{
{
upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
expected: &version2.Queue{Size: 10, Timeout: "10s"},
msg: "upstream queue with size and timeout",
},
{
upstreamQueue: nil,
expected: nil,
msg: "upstream queue with nil",
},
{
upstreamQueue: &conf_v1alpha1.UpstreamQueue{Size: 10},
expected: &version2.Queue{Size: 10, Timeout: "60s"},
msg: "upstream queue with only size",
},
}

for _, test := range tests {
result := generateQueueForPlus(test.upstreamQueue, "60s")
if !reflect.DeepEqual(result, test.expected) {
t.Errorf("generateQueueForPlus() returned %v but expected %v for the case of %v", result, test.expected, test.msg)
}
}

}
7 changes: 7 additions & 0 deletions pkg/apis/configuration/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type Upstream struct {
TLS UpstreamTLS `json:"tls"`
HealthCheck *HealthCheck `json:"healthCheck"`
SlowStart string `json:"slow-start"`
Queue *UpstreamQueue `json:"queue"`
}

// UpstreamBuffers defines Buffer Configuration for an Upstream
Expand Down Expand Up @@ -157,3 +158,9 @@ type VirtualServerRouteList struct {

Items []VirtualServerRoute `json:"items"`
}

// UpstreamQueue defines Queue Configuration for an Upstream
type UpstreamQueue struct {
Size int `json:"size"`
Timeout string `json:"timeout"`
}
21 changes: 21 additions & 0 deletions pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions pkg/apis/configuration/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ func validateUpstreams(upstreams []v1alpha1.Upstream, fieldPath *field.Path, isP
allErrs = append(allErrs, validateBuffer(u.ProxyBuffers, idxPath.Child("buffers"))...)
allErrs = append(allErrs, validateSize(u.ProxyBufferSize, idxPath.Child("buffer-size"))...)
allErrs = append(allErrs, rejectPlusResourcesInOSS(u, idxPath, isPlus)...)
allErrs = append(allErrs, validateQueue(u.Queue, idxPath.Child("queue"), isPlus)...)

for _, msg := range validation.IsValidPortNum(int(u.Port)) {
allErrs = append(allErrs, field.Invalid(idxPath.Child("port"), u.Port, msg))
Expand Down Expand Up @@ -815,3 +816,23 @@ func rejectPlusResourcesInOSS(upstream v1alpha1.Upstream, idxPath *field.Path, i

return allErrs
}

func validateQueue(queue *v1alpha1.UpstreamQueue, fieldPath *field.Path, isPlus bool) field.ErrorList {
allErrs := field.ErrorList{}

if queue == nil {
return allErrs
}

if !isPlus {
allErrs = append(allErrs, field.Forbidden(fieldPath, "queue is only supported in NGINX Plus"))
return allErrs
}

allErrs = append(allErrs, validateTime(queue.Timeout, fieldPath.Child("timeout"))...)
if queue.Size <= 0 {
allErrs = append(allErrs, field.Required(fieldPath.Child("size"), "must be positive"))
}

return allErrs
}
70 changes: 70 additions & 0 deletions pkg/apis/configuration/validation/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2011,3 +2011,73 @@ func TestRejectPlusResourcesInOSS(t *testing.T) {
}

}

func TestValidateQueue(t *testing.T) {
tests := []struct {
upstreamQueue *v1alpha1.UpstreamQueue
fieldPath *field.Path
isPlus bool
msg string
}{
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "valid upstream queue with size and timeout",
},
{
upstreamQueue: nil,
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue nil",
},
{
upstreamQueue: nil,
fieldPath: field.NewPath("queue"),
isPlus: false,
msg: "upstream queue nil in OSS",
},
}

for _, test := range tests {
allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus)
if len(allErrs) != 0 {
t.Errorf("validateQueue() returned errors %v for valid input for the case of %s", allErrs, test.msg)
}
}
}

func TestValidateQueueFails(t *testing.T) {
tests := []struct {
upstreamQueue *v1alpha1.UpstreamQueue
fieldPath *field.Path
isPlus bool
msg string
}{
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: -1, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue with invalid size",
},
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "-10"},
fieldPath: field.NewPath("queue"),
isPlus: true,
msg: "upstream queue with invalid timeout",
},
{
upstreamQueue: &v1alpha1.UpstreamQueue{Size: 10, Timeout: "10s"},
fieldPath: field.NewPath("queue"),
isPlus: false,
msg: "upstream queue with valid size and timeout in OSS",
},
}

for _, test := range tests {
allErrs := validateQueue(test.upstreamQueue, test.fieldPath, test.isPlus)
if len(allErrs) == 0 {
t.Errorf("validateQueue() returned no errors for invalid input for the case of %s", test.msg)
}
}
}

0 comments on commit 16f1bde

Please sign in to comment.