From 2e86c387306ecc0f8692aadb7298787456c8d8f8 Mon Sep 17 00:00:00 2001 From: Gunjan Vyas Date: Mon, 2 May 2022 17:08:38 +0530 Subject: [PATCH] Added delivery flags --- docs/cmd/kn_broker_create.md | 12 ++- pkg/eventing/v1/client.go | 74 ++++++++++++++++++- pkg/kn/commands/broker/create.go | 29 +++++++- .../commands/broker/delivery_option_flags.go | 40 ++++++++++ 4 files changed, 147 insertions(+), 8 deletions(-) create mode 100644 pkg/kn/commands/broker/delivery_option_flags.go diff --git a/docs/cmd/kn_broker_create.md b/docs/cmd/kn_broker_create.md index 20b997a469..909cc686dd 100644 --- a/docs/cmd/kn_broker_create.md +++ b/docs/cmd/kn_broker_create.md @@ -21,9 +21,15 @@ kn broker create NAME ### Options ``` - --class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available) - -h, --help help for create - -n, --namespace string Specify the namespace to operate in. + --backoff-delay string Based delay between retries + --backoff-policy string Backoff policy for retries, either "linear" or "exponential" + --class string Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available) + --dl-sink string Reference to a sink for delivering events that can not be sent + -h, --help help for create + -n, --namespace string Specify the namespace to operate in. + --retry int32 Number of retries before sending the event to a dead-letter sink + --retry-after-max string Upper bound for a duration specified in an "Retry-After" header (experimental) + --timeout string Timeout for a single request ``` ### Options inherited from parent commands diff --git a/pkg/eventing/v1/client.go b/pkg/eventing/v1/client.go index 0d6effb636..e37a89dd07 100644 --- a/pkg/eventing/v1/client.go +++ b/pkg/eventing/v1/client.go @@ -19,14 +19,13 @@ import ( "fmt" "time" - "knative.dev/client/pkg/config" - - "k8s.io/client-go/util/retry" - apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/util/retry" + "knative.dev/client/pkg/config" + v1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/client/clientset/versioned/scheme" clientv1 "knative.dev/eventing/pkg/client/clientset/versioned/typed/eventing/v1" @@ -387,6 +386,73 @@ func (b *BrokerBuilder) Class(class string) *BrokerBuilder { return b } +func (b *BrokerBuilder) DlSink(dlSink *duckv1.Destination) *BrokerBuilder { + empty := duckv1.Destination{} + if dlSink == nil || *dlSink == empty { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.DeadLetterSink = dlSink + return b +} + +func (b *BrokerBuilder) Retry(retry *int32) *BrokerBuilder { + if retry == nil || *retry == 0 { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.Retry = retry + return b +} + +func (b *BrokerBuilder) Timeout(timeout *string) *BrokerBuilder { + if timeout == nil || *timeout == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.Timeout = timeout + return b +} +func (b *BrokerBuilder) BackoffPolicy(policyType *v1.BackoffPolicyType) *BrokerBuilder { + if policyType == nil || *policyType == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.BackoffPolicy = policyType + return b +} + +func (b *BrokerBuilder) BackoffDelay(backoffDelay *string) *BrokerBuilder { + if backoffDelay == nil || *backoffDelay == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.BackoffDelay = backoffDelay + return b +} + +func (b *BrokerBuilder) RetryAfterMax(max *string) *BrokerBuilder { + if max == nil || *max == "" { + return b + } + if b.broker.Spec.Delivery == nil { + b.broker.Spec.Delivery = &v1.DeliverySpec{} + } + b.broker.Spec.Delivery.RetryAfterMax = max + return b + +} + // Build to return an instance of broker object func (b *BrokerBuilder) Build() *eventingv1.Broker { return b.broker diff --git a/pkg/kn/commands/broker/create.go b/pkg/kn/commands/broker/create.go index bb60c2e6cd..853e1b9e05 100644 --- a/pkg/kn/commands/broker/create.go +++ b/pkg/kn/commands/broker/create.go @@ -21,6 +21,9 @@ import ( "fmt" "github.com/spf13/cobra" + "knative.dev/client/pkg/kn/commands/flags" + v1 "knative.dev/eventing/pkg/apis/duck/v1" + duckv1 "knative.dev/pkg/apis/duck/v1" clientv1beta1 "knative.dev/client/pkg/eventing/v1" "knative.dev/client/pkg/kn/commands" @@ -39,6 +42,7 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { var className string + var deliveryFlags DeliveryOptionFlags cmd := &cobra.Command{ Use: "create NAME", Short: "Create a broker", @@ -59,10 +63,32 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { return err } + dynamicClient, err := p.NewDynamicClient(namespace) + if err != nil { + return err + } + + var empty = flags.SinkFlags{} + var destination *duckv1.Destination + if deliveryFlags.SinkFlags != empty { + destination, err = deliveryFlags.SinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace) + if err != nil { + return err + } + } + + backoffPolicy := v1.BackoffPolicyType(deliveryFlags.BackoffPolicy) + brokerBuilder := clientv1beta1. NewBrokerBuilder(name). Namespace(namespace). - Class(className) + Class(className). + DlSink(destination). + Retry(&deliveryFlags.RetryCount). + Timeout(&deliveryFlags.Timeout). + BackoffPolicy(&backoffPolicy). + BackoffDelay(&deliveryFlags.BackoffDelay). + RetryAfterMax(&deliveryFlags.RetryAfterMax) err = eventingClient.CreateBroker(cmd.Context(), brokerBuilder.Build()) if err != nil { @@ -76,5 +102,6 @@ func NewBrokerCreateCommand(p *commands.KnParams) *cobra.Command { } commands.AddNamespaceFlags(cmd.Flags(), false) cmd.Flags().StringVar(&className, "class", "", "Broker class like 'MTChannelBasedBroker' or 'Kafka' (if available)") + deliveryFlags.Add(cmd) return cmd } diff --git a/pkg/kn/commands/broker/delivery_option_flags.go b/pkg/kn/commands/broker/delivery_option_flags.go new file mode 100644 index 0000000000..218c69db8b --- /dev/null +++ b/pkg/kn/commands/broker/delivery_option_flags.go @@ -0,0 +1,40 @@ +// Copyright © 2022 The Knative Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package broker + +import ( + "github.com/spf13/cobra" + "knative.dev/client/pkg/kn/commands/flags" +) + +type DeliveryOptionFlags struct { + SinkFlags flags.SinkFlags + RetryCount int32 + Timeout string + BackoffPolicy string + BackoffDelay string + RetryAfterMax string +} + +func (d *DeliveryOptionFlags) Add(cmd *cobra.Command) { + d.SinkFlags.AddWithFlagName(cmd, "dl-sink", "") + cmd.Flag("dl-sink").Usage = "Reference to a sink for delivering events that can not be sent" + + cmd.Flags().Int32Var(&d.RetryCount, "retry", 0, "Number of retries before sending the event to a dead-letter sink") + cmd.Flags().StringVar(&d.Timeout, "timeout", "", "Timeout for a single request") + cmd.Flags().StringVar(&d.BackoffPolicy, "backoff-policy", "", "Backoff policy for retries, either \"linear\" or \"exponential\"") + cmd.Flags().StringVar(&d.BackoffDelay, "backoff-delay", "", "Based delay between retries") + cmd.Flags().StringVar(&d.RetryAfterMax, "retry-after-max", "", "Upper bound for a duration specified in an \"Retry-After\" header (experimental)") +}