Skip to content

Commit

Permalink
Added delivery flags
Browse files Browse the repository at this point in the history
  • Loading branch information
vyasgun committed May 2, 2022
1 parent 736c7c2 commit 2e86c38
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 8 deletions.
12 changes: 9 additions & 3 deletions docs/cmd/kn_broker_create.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ kn broker create NAME
### Options

```
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--backoff-delay string Based delay between retries
--backoff-policy string Backoff policy for retries, either "linear" or "exponential"
--class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)
--dl-sink string Reference to a sink for delivering events that can not be sent
-h, --help help for create
-n, --namespace string Specify the namespace to operate in.
--retry int32 Number of retries before sending the event to a dead-letter sink
--retry-after-max string Upper bound for a duration specified in an "Retry-After" header (experimental)
--timeout string Timeout for a single request
```

### Options inherited from parent commands
Expand Down
74 changes: 70 additions & 4 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ import (
"fmt"
"time"

"knative.dev/client/pkg/config"

"k8s.io/client-go/util/retry"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/util/retry"
"knative.dev/client/pkg/config"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/client/clientset/versioned/scheme"
clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1"
Expand Down Expand Up @@ -387,6 +386,73 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder {
return b
}

func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder {
empty := duckv1.Destination{}
if dlSink == nil || *dlSink == empty {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.DeadLetterSink = dlSink
return b
}

func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder {
if retry == nil || *retry == 0 {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Retry = retry
return b
}

func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder {
if timeout == nil || *timeout == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.Timeout = timeout
return b
}
func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder {
if policyType == nil || *policyType == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffPolicy = policyType
return b
}

func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder {
if backoffDelay == nil || *backoffDelay == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.BackoffDelay = backoffDelay
return b
}

func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder {
if max == nil || *max == "" {
return b
}
if b.broker.Spec.Delivery == nil {
b.broker.Spec.Delivery = &v1.DeliverySpec{}
}
b.broker.Spec.Delivery.RetryAfterMax = max
return b

}

// Build to return an instance of broker object
func (b *BrokerBuilder) Build() *eventingv1.Broker {
return b.broker
Expand Down
29 changes: 28 additions & 1 deletion pkg/kn/commands/broker/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"fmt"

"github.com/spf13/cobra"
"knative.dev/client/pkg/kn/commands/flags"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

clientv1beta1 "knative.dev/client/pkg/eventing/v1"
"knative.dev/client/pkg/kn/commands"
Expand All @@ -39,6 +42,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {

var className string

var deliveryFlags DeliveryOptionFlags
cmd := &cobra.Command{
Use: "create NAME",
Short: "Create a broker",
Expand All @@ -59,10 +63,32 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
return err
}

dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return err
}

var empty = flags.SinkFlags{}
var destination *duckv1.Destination
if deliveryFlags.SinkFlags != empty {
destination, err = deliveryFlags.SinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return err
}
}

backoffPolicy := v1.BackoffPolicyType(deliveryFlags.BackoffPolicy)

brokerBuilder := clientv1beta1.
NewBrokerBuilder(name).
Namespace(namespace).
Class(className)
Class(className).
DlSink(destination).
Retry(&deliveryFlags.RetryCount).
Timeout(&deliveryFlags.Timeout).
BackoffPolicy(&backoffPolicy).
BackoffDelay(&deliveryFlags.BackoffDelay).
RetryAfterMax(&deliveryFlags.RetryAfterMax)

err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build())
if err != nil {
Expand All @@ -76,5 +102,6 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command {
}
commands.AddNamespaceFlags(cmd.Flags(), false)
cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)")
deliveryFlags.Add(cmd)
return cmd
}
40 changes: 40 additions & 0 deletions pkg/kn/commands/broker/delivery_option_flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright © 2022 The Knative Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package broker

import (
"github.com/spf13/cobra"
"knative.dev/client/pkg/kn/commands/flags"
)

type DeliveryOptionFlags struct {
SinkFlags flags.SinkFlags
RetryCount int32
Timeout string
BackoffPolicy string
BackoffDelay string
RetryAfterMax string
}

func (d *DeliveryOptionFlags) Add(cmd *cobra.Command) {
d.SinkFlags.AddWithFlagName(cmd, "dl-sink", "")
cmd.Flag("dl-sink").Usage = "Reference to a sink for delivering events that can not be sent"

cmd.Flags().Int32Var(&d.RetryCount, "retry", 0, "Number of retries before sending the event to a dead-letter sink")
cmd.Flags().StringVar(&d.Timeout, "timeout", "", "Timeout for a single request")
cmd.Flags().StringVar(&d.BackoffPolicy, "backoff-policy", "", "Backoff policy for retries, either \"linear\" or \"exponential\"")
cmd.Flags().StringVar(&d.BackoffDelay, "backoff-delay", "", "Based delay between retries")
cmd.Flags().StringVar(&d.RetryAfterMax, "retry-after-max", "", "Upper bound for a duration specified in an \"Retry-After\" header (experimental)")
}

0 comments on commit 2e86c38

Please sign in to comment.