Skip to content

Commit

Permalink
Added delivery options to broker create (#1670)
Browse files Browse the repository at this point in the history
* Added delivery flags

* Added test cases

* Added client tests

* Added unit tests and updated flag usage

* Added broker update command

* Added unit tests for update command

* Added mock and update retry tests
  • Loading branch information
vyasgun authored May 26, 2022
1 parent 4df6010 commit 2a56f07
Show file tree
Hide file tree
Showing 14 changed files with 908 additions and 10 deletions.
1 change: 1 addition & 0 deletions docs/cmd/kn_broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ kn broker
* [kn broker delete](kn_broker_delete.md) - Delete a broker
* [kn broker describe](kn_broker_describe.md) - Describe broker
* [kn broker list](kn_broker_list.md) - List brokers
* [kn broker update](kn_broker_update.md) - Update a broker

12 changes: 9 additions & 3 deletions docs/cmd/kn_broker_create.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ kn broker create NAME
### Options

```
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--backoff-delay string The delay before retrying.
--backoff-policy string The retry backoff policy (linear, exponential).
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available).
--dl-sink string The sink receiving event that could not be sent to a destination.
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
--retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability.
--timeout string The timeout of each single request. The value must be greater than 0.
```

### Options inherited from parent commands
Expand Down
47 changes: 47 additions & 0 deletions docs/cmd/kn_broker_update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
## kn broker update

Update a broker

```
kn broker update NAME
```

### Examples

```
# Update a broker 'mybroker' in the current namespace with delivery sink svc1
kn broker update mybroker --dl-sink svc1
# Update a broker 'mybroker' in the 'myproject' namespace and with retry 2 seconds
kn broker update mybroker --namespace myproject --retry 2
```

### Options

```
--backoff-delay string The delay before retrying.
--backoff-policy string The retry backoff policy (linear, exponential).
--dl-sink string The sink receiving event that could not be sent to a destination.
-h, --help help for update
-n, --namespace string Specify the namespace to operate in.
--retry int32 The minimum number of retries the sender should attempt when sending an event before moving it to the dead letter sink.
--retry-after-max string An optional upper bound on the duration specified in a "Retry-After" header when calculating backoff times for retrying 429 and 503 response codes. Setting the value to zero ("PT0S") can be used to opt-out of respecting "Retry-After" header values altogether. This value only takes effect if "Retry" is configured, and also depends on specific implementations (Channels, Sources, etc.) choosing to provide this capability.
--timeout string The timeout of each single request. The value must be greater than 0.
```

### Options inherited from parent commands

```
--cluster string name of the kubeconfig cluster to use
--config string kn configuration file (default: ~/.config/kn/config.yaml)
--context string name of the kubeconfig context to use
--kubeconfig string kubectl configuration file (default: ~/.kube/config)
--log-http log http traffic
```

### SEE ALSO

* [kn broker](kn_broker.md) - Manage message brokers

132 changes: 128 additions & 4 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"fmt"
"time"

"knative.dev/client/pkg/config"

"k8s.io/client-go/util/retry"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"knative.dev/client/pkg/config"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand All @@ -38,6 +37,7 @@ import (
)

type TriggerUpdateFunc func(origTrigger *eventingv1.Trigger) (*eventingv1.Trigger, error)
type BrokerUpdateFunc func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error)

// KnEventingClient to Eventing Sources. All methods are relative to the
// namespace specified during construction
Expand All @@ -64,6 +64,10 @@ type KnEventingClient interface {
DeleteBroker(ctx context.Context, name string, timeout time.Duration) error
// ListBrokers returns list of broker CRDs
ListBrokers(ctx context.Context) (*eventingv1.BrokerList, error)
// UpdateBroker is used to update an instance of broker
UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error
// UpdateBrokerWithRetry is used to update an instance of broker
UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error
}

// KnEventingClient is a combination of Sources client interface and namespace
Expand Down Expand Up @@ -349,6 +353,45 @@ func (c *knEventingClient) ListBrokers(ctx context.Context) (*eventingv1.BrokerL
return brokerListNew, nil
}

// UpdateBroker is used to update an instance of broker
func (c *knEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error {
_, err := c.client.Brokers(c.namespace).Update(ctx, broker, meta_v1.UpdateOptions{})
if err != nil {
return kn_errors.GetError(err)
}
return nil
}

func (c *knEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

func updateBrokerWithRetry(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
b := config.DefaultRetry
b.Steps = nrRetries
updateBrokerFunc := func() error {
return updateBroker(ctx, c, name, updateFunc)
}
err := retry.RetryOnConflict(b, updateBrokerFunc)
return err
}

func updateBroker(ctx context.Context, c KnEventingClient, name string, updateFunc BrokerUpdateFunc) error {
broker, err := c.GetBroker(ctx, name)
if err != nil {
return err
}
if broker.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update broker %s because it has been marked for deletion", name)
}
updatedBroker, err := updateFunc(broker.DeepCopy())
if err != nil {
return err
}

return c.UpdateBroker(ctx, updatedBroker)
}

// BrokerBuilder is for building the broker
type BrokerBuilder struct {
broker *eventingv1.Broker
Expand All @@ -363,6 +406,13 @@ func NewBrokerBuilder(name string) *BrokerBuilder {
}}
}

// NewBrokerBuilderFromExisting returns broker builder from original broker
func NewBrokerBuilderFromExisting(broker *eventingv1.Broker) *BrokerBuilder {
return &BrokerBuilder{
broker: broker,
}
}

// WithGvk add the GVK coordinates for read tests
func (b *BrokerBuilder) WithGvk() *BrokerBuilder {
_ = updateEventingGVK(b.broker)
Expand All @@ -387,6 +437,80 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder {
return b
}

// DlSink for the broker builder
func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder {
empty := duckv1.Destination{}
if dlSink == nil || *dlSink == empty {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.DeadLetterSink = dlSink
return b
}

// Retry for the broker builder
func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder {
if retry == nil || *retry == 0 {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Retry = retry
return b
}

// Timeout for the broker builder
func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder {
if timeout == nil || *timeout == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Timeout = timeout
return b
}

// BackoffPolicy for the broker builder
func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder {
if policyType == nil || *policyType == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffPolicy = policyType
return b
}

// BackoffDelay for the broker builder
func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder {
if backoffDelay == nil || *backoffDelay == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffDelay = backoffDelay
return b
}

// RetryAfterMax for the broker builder
func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder {
if max == nil || *max == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.RetryAfterMax = max
return b

}

// Build to return an instance of broker object
func (b *BrokerBuilder) Build() *eventingv1.Broker {
return b.broker
Expand Down
16 changes: 15 additions & 1 deletion pkg/eventing/v1/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (c *MockKnEventingClient) ListTriggers(context.Context) (*eventingv1.Trigge
return call.Result[0].(*eventingv1.TriggerList), mock.ErrorOrNil(call.Result[1])
}

// UpdateTrigger records a call for ListTriggers with the expected result and error (nil if none)
// UpdateTrigger records a call for UpdateTrigger with the expected result and error (nil if none)
func (sr *EventingRecorder) UpdateTrigger(trigger interface{}, err error) {
sr.r.Add("UpdateTrigger", []interface{}{trigger}, []interface{}{err})
}
Expand Down Expand Up @@ -163,6 +163,20 @@ func (c *MockKnEventingClient) ListBrokers(context.Context) (*eventingv1.BrokerL
return call.Result[0].(*eventingv1.BrokerList), mock.ErrorOrNil(call.Result[1])
}

// UpdateBroker records a call for UpdateBroker with the expected result and error (nil if none)
func (sr *EventingRecorder) UpdateBroker(broker *eventingv1.Broker, err error) {
sr.r.Add("UpdateBroker", []interface{}{broker}, []interface{}{err})
}

func (c *MockKnEventingClient) UpdateBroker(ctx context.Context, broker *eventingv1.Broker) error {
call := c.recorder.r.VerifyCall("UpdateBroker")
return mock.ErrorOrNil(call.Result[0])
}

func (c *MockKnEventingClient) UpdateBrokerWithRetry(ctx context.Context, name string, updateFunc BrokerUpdateFunc, nrRetries int) error {
return updateBrokerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

// Validate validates whether every recorded action has been called
func (sr *EventingRecorder) Validate() {
sr.r.CheckThatAllRecordedMethodsHaveBeenCalled()
Expand Down
7 changes: 7 additions & 0 deletions pkg/eventing/v1/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ func TestMockKnClient(t *testing.T) {
recorder.GetBroker("foo", nil, nil)
recorder.DeleteBroker("foo", time.Duration(10)*time.Second, nil)
recorder.ListBrokers(nil, nil)
recorder.GetBroker("foo", &eventingv1.Broker{}, nil)
recorder.UpdateBroker(&eventingv1.Broker{}, nil)
recorder.UpdateBroker(&eventingv1.Broker{}, nil)

// Call all service
ctx := context.Background()
Expand All @@ -57,6 +60,10 @@ func TestMockKnClient(t *testing.T) {
client.GetBroker(ctx, "foo")
client.DeleteBroker(ctx, "foo", time.Duration(10)*time.Second)
client.ListBrokers(ctx)
client.UpdateBroker(ctx, &eventingv1.Broker{})
client.UpdateBrokerWithRetry(ctx, "foo", func(origBroker *eventingv1.Broker) (*eventingv1.Broker, error) {
return origBroker, nil
}, 10)

// Validate
recorder.Validate()
Expand Down
Loading

0 comments on commit 2a56f07

Please sign in to comment.