Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added delivery options to broker create #1670

Merged
merged 7 commits into from
May 26, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions docs/cmd/kn_broker_create.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ kn broker create NAME
### Options

```
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--backoff-delay string The delay before retrying.
--backoff-policy string The retry backoff policy (linear, exponential).
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).
--dl-sink string The sink receiving event that could not be sent to a destination.
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
--retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability.
--timeout string The timeout of each single request. The value must be greater than 0.
```

### Options inherited from parent commands
Expand Down
81 changes: 77 additions & 4 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"fmt"
"time"

"knative.dev/client/pkg/config"

"k8s.io/client-go/util/retry"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"knative.dev/client/pkg/config"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand Down Expand Up @@ -387,6 +386,80 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder {
return b
}

// DlSink for the broker builder
func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
empty := duckv1.Destination{}
if dlSink == nil || *dlSink == empty {
return b
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.DeadLetterSink = dlSink
return b
}

// Retry for the broker builder
func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if retry == nil || *retry == 0 {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Retry = retry
return b
}

// Timeout for the broker builder
func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if timeout == nil || *timeout == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Timeout = timeout
return b
}

// BackoffPolicy for the broker builder
func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if policyType == nil || *policyType == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffPolicy = policyType
return b
}

// BackoffDelay for the broker builder
func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if backoffDelay == nil || *backoffDelay == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffDelay = backoffDelay
return b
}

// RetryAfterMax for the broker builder
func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if max == nil || *max == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.RetryAfterMax = max
return b

}

// Build to return an instance of broker object
func (b *BrokerBuilder) Build() *eventingv1.Broker {
return b.broker
Expand Down
83 changes: 83 additions & 0 deletions pkg/eventing/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
v1 "knative.dev/eventing/pkg/apis/duck/v1"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -334,6 +335,8 @@ func TestBrokerCreate(t *testing.T) {

objNew := newBroker(name)
brokerObjWithClass := newBrokerWithClass(name)
brokerObjWithDeliveryOptions := newBrokerWithDeliveryOptions(name)
brokerObjWithNilDeliveryOptions := newBrokerWithNilDeliveryOptions(name)

server.AddReactor("create", "brokers",
func(a client_testing.Action) (bool, runtime.Object, error) {
Expand All @@ -360,6 +363,56 @@ func TestBrokerCreate(t *testing.T) {
err := client.CreateBroker(context.Background(), newBroker("unknown"))
assert.ErrorContains(t, err, "unknown")
})

t.Run("create broker with delivery options", func(t *testing.T) {
err := client.CreateBroker(context.Background(), brokerObjWithDeliveryOptions)
assert.NilError(t, err)
})

t.Run("create broker with nil delivery options", func(t *testing.T) {
err := client.CreateBroker(context.Background(), brokerObjWithNilDeliveryOptions)
assert.NilError(t, err)
})

t.Run("create broker with nil delivery spec", func(t *testing.T) {
builderFuncs := []func(builder *BrokerBuilder) *BrokerBuilder{
func(builder *BrokerBuilder) *BrokerBuilder {
var sink = &duckv1.Destination{
Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
return builder.DlSink(sink)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var retry int32 = 5
return builder.Retry(&retry)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var timeout = "PT5S"
return builder.Timeout(&timeout)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var policy = v1.BackoffPolicyType("linear")
return builder.BackoffPolicy(&policy)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var delay = "PT5S"
return builder.BackoffDelay(&delay)
},
func(builder *BrokerBuilder) *BrokerBuilder {
var max = "PT5S"
return builder.RetryAfterMax(&max)
},
}
for _, bf := range builderFuncs {
brokerBuilder := NewBrokerBuilder(name)
brokerBuilder.broker.Spec.Delivery = nil
updatedBuilder := bf(brokerBuilder)

broker := updatedBuilder.Build()
err := client.CreateBroker(context.Background(), broker)
assert.NilError(t, err)
}
})
}

func TestBrokerGet(t *testing.T) {
Expand Down Expand Up @@ -518,6 +571,36 @@ func newBrokerWithClass(name string) *eventingv1.Broker {
Build()
}

func newBrokerWithDeliveryOptions(name string) *eventingv1.Broker {
sink := &duckv1.Destination{
Ref: &duckv1.KReference{Name: "test-svc", Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
testTimeout := "PT10S"
retry := int32(2)
policy := v1.BackoffPolicyType("linear")
return NewBrokerBuilder(name).
Namespace(testNamespace).
DlSink(sink).
Timeout(&testTimeout).
Retry(&retry).
BackoffDelay(&testTimeout).
BackoffPolicy(&policy).
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
RetryAfterMax(&testTimeout).
Build()
}

func newBrokerWithNilDeliveryOptions(name string) *eventingv1.Broker {
return NewBrokerBuilder(name).
Namespace(testNamespace).
DlSink(nil).
Timeout(nil).
Retry(nil).
BackoffDelay(nil).
BackoffPolicy(nil).
RetryAfterMax(nil).
Build()
}

func getBrokerDeleteEvents(name string) []watch.Event {
return []watch.Event{
{Type: watch.Added, Object: createBrokerWithConditions(name, corev1.ConditionUnknown, corev1.ConditionUnknown, "", "msg1")},
Expand Down
48 changes: 48 additions & 0 deletions pkg/kn/commands/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@ package broker
import (
"bytes"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
"knative.dev/client/pkg/dynamic"
dynamicfake "knative.dev/client/pkg/dynamic/fake"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
servingv1 "knative.dev/serving/pkg/apis/serving/v1"

clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands"
Expand All @@ -28,6 +34,12 @@ import (
// Helper methods
var blankConfig clientcmd.ClientConfig

const (
testSvc = "test-svc"
testTimeout = "PT10S"
testRetry = int32(5)
)

func init() {
var err error
blankConfig, err = clientcmd.NewClientConfigFromBytes([]byte(`kind: Config
Expand Down Expand Up @@ -61,6 +73,14 @@ func executeBrokerCommand(brokerClient clientv1beta1.KnEventingClient, args ...s
return brokerClient, nil
}

mysvc := &servingv1.Service{
TypeMeta: metav1.TypeMeta{Kind: "Service", APIVersion: "serving.knative.dev/v1"},
ObjectMeta: metav1.ObjectMeta{Name: testSvc, Namespace: "default"},
}
knParams.NewDynamicClient = func(namespace string) (dynamic.KnDynamicClient, error) {
return dynamicfake.CreateFakeKnDynamicClient("default", mysvc), nil
}

cmd := NewBrokerCommand(knParams)
cmd.SetArgs(args)
cmd.SetOutput(output)
Expand All @@ -85,3 +105,31 @@ func createBrokerWithNamespace(brokerName, namespace string) *v1beta1.Broker {
func createBrokerWithClass(brokerName, class string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Class(class).Build()
}

func createBrokerWithDlSink(brokerName, service string) *v1beta1.Broker {
sink := &duckv1.Destination{
Ref: &duckv1.KReference{Name: service, Kind: "Service", APIVersion: "serving.knative.dev/v1", Namespace: "default"},
}
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").DlSink(sink).Build()
}

func createBrokerWithTimeout(brokerName, timeout string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Timeout(&timeout).Build()
}

func createBrokerWithRetry(brokerName string, retry int32) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").Retry(&retry).Build()
}

func createBrokerWithBackoffPolicy(brokerName, policy string) *v1beta1.Broker {
boPolicy := v1.BackoffPolicyType(policy)
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffPolicy(&boPolicy).Build()
}

func createBrokerWithBackoffDelay(brokerName, delay string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").BackoffDelay(&delay).Build()
}

func createBrokerWithRetryAfterMax(brokerName, timeout string) *v1beta1.Broker {
return clientv1beta1.NewBrokerBuilder(brokerName).Namespace("default").RetryAfterMax(&timeout).Build()
}
31 changes: 29 additions & 2 deletions pkg/kn/commands/broker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"

"github.com/spf13/cobra"
"knative.dev/client/pkg/kn/commands/flags"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands"
Expand All @@ -39,6 +42,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {

var className string

var deliveryFlags DeliveryOptionFlags
cmd := &cobra.Command{
Use: "create NAME",
Short: "Create a broker",
Expand All @@ -59,10 +63,32 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
return err
}

dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return err
}

var empty = flags.SinkFlags{}
var destination *duckv1.Destination
if deliveryFlags.SinkFlags != empty {
destination, err = deliveryFlags.SinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return err
}
}

backoffPolicy := v1.BackoffPolicyType(deliveryFlags.BackoffPolicy)

brokerBuilder := clientv1beta1.
NewBrokerBuilder(name).
Namespace(namespace).
Class(className)
Class(className).
DlSink(destination).
Retry(&deliveryFlags.RetryCount).
Timeout(&deliveryFlags.Timeout).
BackoffPolicy(&backoffPolicy).
BackoffDelay(&deliveryFlags.BackoffDelay).
RetryAfterMax(&deliveryFlags.RetryAfterMax)

err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build())
if err != nil {
Expand All @@ -75,6 +101,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
},
}
commands.AddNamespaceFlags(cmd.Flags(), false)
cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)")
cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).")
deliveryFlags.Add(cmd)
return cmd
}
Loading