diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 745777a060a..589ee80501c 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -56,9 +56,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor { return &adapter.EnvConfig{} } -func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { +func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { logger := logging.FromContext(ctx) - runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx)) + runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx)) return &mtpingAdapter{ logger: logger, diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 135861ca687..d66ad11ecae 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -33,6 +33,7 @@ import ( typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" + "knative.dev/eventing/pkg/adapter/v2" kncloudevents "knative.dev/eventing/pkg/adapter/v2" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" @@ -50,26 +51,25 @@ type cronJobsRunner struct { // The cron job runner cron cron.Cron - // client sends cloudevents. - Client cloudevents.Client - // Where to send logs Logger *zap.SugaredLogger // kubeClient for sending k8s events kubeClient kubernetes.Interface + + clientConfig kncloudevents.ClientConfig } const ( resourceGroup = "pingsources.sources.knative.dev" ) -func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner { +func NewCronJobsRunner(cfg adapter.ClientConfig, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner { return &cronJobsRunner{ - cron: *cron.New(opts...), - Client: ceClient, - Logger: logger, - kubeClient: kubeClient, + cron: *cron.New(opts...), + Logger: logger, + kubeClient: kubeClient, + clientConfig: cfg, } } @@ -107,7 +107,18 @@ func (a *cronJobsRunner) AddSchedule(source *sourcesv1.PingSource) cron.EntryID } ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag) - id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, event)) + + client, err := a.newPingSourceClient(source) + if err != nil { + a.Logger.Desugar().Error("Failed to create client", + zap.String("name", source.GetName()), + zap.String("namespace", source.GetNamespace()), + zap.Error(err), + ) + return -1 + } + + id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, client, event)) return id } @@ -128,7 +139,7 @@ func (a *cronJobsRunner) Stop() { } } -func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) func() { +func (a *cronJobsRunner) cronTick(ctx context.Context, client adapter.Client, event cloudevents.Event) func() { return func() { event := event.Clone() event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging @@ -141,11 +152,13 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target) - if result := a.Client.Send(ctx, event); !cloudevents.IsACK(result) { + if result := client.Send(ctx, event); !cloudevents.IsACK(result) { // Exhausted number of retries. Event is lost. a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result), zap.String("source", source), zap.String("target", target), zap.String("id", event.ID())) } + + client.CloseIdleConnections() } } @@ -174,3 +187,27 @@ func makeEvent(source *sourcesv1.PingSource) (cloudevents.Event, error) { return event, nil } + +func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adapter.Client, error) { + var env adapter.EnvConfig + if a.clientConfig.Env != nil { + env = adapter.EnvConfig{ + Namespace: source.GetNamespace(), + Name: a.clientConfig.Env.GetName(), + EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()), + } + } + + env.Sink = source.Status.SinkURI.String() + + cfg := adapter.ClientConfig{ + Env: &env, + CeOverrides: source.Spec.CloudEventOverrides, + Reporter: a.clientConfig.Reporter, + CrStatusEventClient: a.clientConfig.CrStatusEventClient, + Options: a.clientConfig.Options, + Client: a.clientConfig.Client, + } + + return adapter.NewClient(cfg) +} diff --git a/pkg/adapter/mtping/runner_test.go b/pkg/adapter/mtping/runner_test.go index 541a3f473c0..2705069ac08 100644 --- a/pkg/adapter/mtping/runner_test.go +++ b/pkg/adapter/mtping/runner_test.go @@ -35,6 +35,7 @@ import ( "knative.dev/pkg/logging" rectesting "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/adapter/v2" adaptertesting "knative.dev/eventing/pkg/adapter/v2/test" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" ) @@ -201,7 +202,7 @@ func TestAddRunRemoveSchedules(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClient() - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) entryId := runner.AddSchedule(tc.src) entry := runner.cron.Entry(entryId) @@ -228,7 +229,7 @@ func TestStartStopCron(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClient() - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) ctx, cancel := context.WithCancel(context.Background()) wctx, wcancel := context.WithCancel(context.Background()) @@ -257,7 +258,7 @@ func TestStartStopCronDelayWait(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClientWithDelay(time.Second * 5) - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index dae960b5e4b..25bb76264b4 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -35,14 +35,23 @@ import ( "knative.dev/pkg/tracing/propagation/tracecontextb3" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" - "knative.dev/eventing/pkg/eventingtls" + "knative.dev/eventing/pkg/apis" "knative.dev/eventing/pkg/metrics/source" obsclient "knative.dev/eventing/pkg/observability/client" ) +type closeIdler interface { + CloseIdleConnections() +} + +type Client interface { + cloudevents.Client + closeIdler +} + var newClientHTTPObserved = NewClientHTTPObserved -func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient.Client, error) { +func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) { t, err := obshttp.NewObservedHTTP(topt...) if err != nil { return nil, err @@ -55,85 +64,117 @@ func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient return nil, err } - return c, nil + return &client{ + ceClient: c, + }, nil } // NewCloudEventsClient returns a client that will apply the ceOverrides to // outbound events and report outbound event counts. -func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) { +func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (Client, error) { opts := make([]http.Option, 0) if len(target) > 0 { opts = append(opts, cloudevents.WithTarget(target)) } - return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) + return NewClient(ClientConfig{ + CeOverrides: ceOverrides, + Reporter: reporter, + Options: opts, + }) } // NewCloudEventsClientWithOptions returns a client created with provided options -func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) +func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (Client, error) { + return NewClient(ClientConfig{ + CeOverrides: ceOverrides, + Reporter: reporter, + Options: opts, + }) } // NewCloudEventsClientCRStatus returns a client CR status -func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient) +func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (Client, error) { + return NewClient(ClientConfig{ + Env: env, + Reporter: reporter, + CrStatusEventClient: crStatusEventClient, + }) } -func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, - crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) { - pOpts := make([]http.Option, 0) - pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{ +type ClientConfig struct { + Env EnvConfigAccessor + CeOverrides *duckv1.CloudEventOverrides + Reporter source.StatsReporter + CrStatusEventClient *crstatusevent.CRStatusEventClient + Options []http.Option + + Client Client +} + +type clientConfigKey struct{} + +func withClientConfig(ctx context.Context, r ClientConfig) context.Context { + return context.WithValue(ctx, clientConfigKey{}, r) +} + +func GetClientConfig(ctx context.Context) ClientConfig { + val := ctx.Value(clientConfigKey{}) + if val == nil { + return ClientConfig{} + } + return val.(ClientConfig) +} + +func NewClient(cfg ClientConfig) (Client, error) { + if cfg.Client != nil { + return cfg.Client, nil + } + + transport := &ochttp.Transport{ Propagation: tracecontextb3.TraceContextEgress, - })) + } - if env != nil { - if target := env.GetSink(); len(target) > 0 { + pOpts := make([]http.Option, 0) + var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport) + + ceOverrides := cfg.CeOverrides + if cfg.Env != nil { + if target := cfg.Env.GetSink(); len(target) > 0 { pOpts = append(pOpts, cloudevents.WithTarget(target)) } - if sinkWait := env.GetSinktimeout(); sinkWait > 0 { + if sinkWait := cfg.Env.GetSinktimeout(); sinkWait > 0 { pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second)) } - if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) { - var err error - - clientConfig := eventingtls.NewDefaultClientConfig() - clientConfig.CACerts = caCerts - - transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone() - transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig) - if err != nil { - return nil, err - } - - pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{ - Base: transport, - Propagation: tracecontextb3.TraceContextEgress, - })) - } if ceOverrides == nil { var err error - ceOverrides, err = env.GetCloudEventOverrides() + ceOverrides, err = cfg.Env.GetCloudEventOverrides() if err != nil { return nil, err } } + + pOpts = append(pOpts, http.WithHeader(apis.KnNamespaceHeader, cfg.Env.GetNamespace())) } + pOpts = append(pOpts, http.WithRoundTripper(transport)) + // Make sure that explicitly set options have priority - opts = append(pOpts, opts...) + opts := append(pOpts, cfg.Options...) ceClient, err := newClientHTTPObserved(opts, nil) - if crStatusEventClient == nil { - crStatusEventClient = crstatusevent.GetDefaultClient() + if cfg.CrStatusEventClient == nil { + cfg.CrStatusEventClient = crstatusevent.GetDefaultClient() } if err != nil { return nil, err } return &client{ ceClient: ceClient, + closeIdler: closeIdler, ceOverrides: ceOverrides, - reporter: reporter, - crStatusEventClient: crStatusEventClient, + reporter: cfg.Reporter, + crStatusEventClient: cfg.CrStatusEventClient, }, nil } @@ -155,6 +196,11 @@ type client struct { ceOverrides *duckv1.CloudEventOverrides reporter source.StatsReporter crStatusEventClient *crstatusevent.CRStatusEventClient + closeIdler closeIdler +} + +func (c *client) CloseIdleConnections() { + c.closeIdler.CloseIdleConnections() } var _ cloudevents.Client = (*client)(nil) @@ -189,6 +235,9 @@ func (c *client) applyOverrides(event *cloudevents.Event) { } func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, result protocol.Result) { + if c.reporter == nil { + return + } tags := MetricTagFromContext(ctx) reportArgs := &source.ReportArgs{ Namespace: tags.Namespace, diff --git a/pkg/adapter/v2/cloudevents_test.go b/pkg/adapter/v2/cloudevents_test.go index 2b9fd500ce4..ec776dab2e3 100644 --- a/pkg/adapter/v2/cloudevents_test.go +++ b/pkg/adapter/v2/cloudevents_test.go @@ -142,7 +142,7 @@ func TestNewCloudEventsClient_send(t *testing.T) { } } - defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (v2client.Client, error), restoreEnv string, setEnv bool) { + defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (Client, error), restoreEnv string, setEnv bool) { newClientHTTPObserved = restoreHTTP if setEnv { if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil { @@ -157,7 +157,7 @@ func TestNewCloudEventsClient_send(t *testing.T) { }(restoreHTTP, restoreEnv, setEnv) sendOptions := []http.Option{} - newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (v2client.Client, error) { + newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (Client, error) { sendOptions = append(sendOptions, topt...) return nil, nil } diff --git a/pkg/adapter/v2/main.go b/pkg/adapter/v2/main.go index 582e468c7a8..a839cd66896 100644 --- a/pkg/adapter/v2/main.go +++ b/pkg/adapter/v2/main.go @@ -214,7 +214,14 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces logger.Errorw("Error building statsreporter", zap.Error(err)) } - eventsClient, err := NewCloudEventsClientCRStatus(env, reporter, crStatusEventClient) + clientConfig := ClientConfig{ + Env: env, + Reporter: reporter, + CrStatusEventClient: crStatusEventClient, + } + ctx = withClientConfig(ctx, clientConfig) + + eventsClient, err := NewClient(clientConfig) if err != nil { logger.Fatalw("Error building cloud event client", zap.Error(err)) } diff --git a/pkg/adapter/v2/test/test_client.go b/pkg/adapter/v2/test/test_client.go index 32ececf6c4f..602f7b9b57b 100644 --- a/pkg/adapter/v2/test/test_client.go +++ b/pkg/adapter/v2/test/test_client.go @@ -41,12 +41,13 @@ type TestCloudEventsClient struct { } } +func (c *TestCloudEventsClient) CloseIdleConnections() { +} + type EventData struct { ID string `json:"id"` Type string `json:"type"` } - -var _ cloudevents.Client = (*TestCloudEventsClient)(nil) var eventData EventData // Send_AppendResult will enqueue a response for the following Send call. diff --git a/pkg/apis/http.go b/pkg/apis/http.go new file mode 100644 index 00000000000..30f8f4d8c06 --- /dev/null +++ b/pkg/apis/http.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +const ( + KnNamespaceHeader = "Kn-Namespace" +) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index e8be6f9095e..d04b1e92159 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -34,12 +34,14 @@ import ( cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "go.opencensus.io/trace" "go.uber.org/zap" - channelAttributes "knative.dev/eventing/pkg/channel/attributes" "knative.dev/pkg/logging" + channelAttributes "knative.dev/eventing/pkg/channel/attributes" + + "knative.dev/eventing/pkg/apis" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" - broker "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/broker" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1" "knative.dev/eventing/pkg/eventfilter" "knative.dev/eventing/pkg/eventfilter/attributes" @@ -222,12 +224,12 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { h.reportArrivalTime(event, reportArgs) - h.send(ctx, writer, request.Header, subscriberURI.URL(), reportArgs, event, ttl) + h.send(ctx, writer, request.Header, subscriberURI.URL(), reportArgs, event, t, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target *url.URL, reportArgs *ReportArgs, event *cloudevents.Event, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target *url.URL, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { // send the event to trigger's subscriber - response, responseErr := h.sendEvent(ctx, headers, target, event, reportArgs) + response, responseErr := h.sendEvent(ctx, headers, target, event, t, reportArgs) if responseErr.err != nil { h.logger.Error("failed to send event", zap.Error(responseErr.err)) @@ -270,7 +272,7 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers _ = h.reporter.ReportEventCount(reportArgs, statusCode) } -func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *url.URL, event *cloudevents.Event, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { +func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *url.URL, event *cloudevents.Event, t *eventingv1.Trigger, reporterArgs *ReportArgs) (*http.Response, ErrHandler) { responseErr := ErrHandler{ ResponseCode: NoResponse, } @@ -286,6 +288,7 @@ func (h *Handler) sendEvent(ctx context.Context, headers http.Header, target *ur defer message.Finish(nil) additionalHeaders := utils.PassThroughHeaders(headers) + additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) // Following the spec https://github.com/knative/specs/blob/main/specs/eventing/data-plane.md#derived-reply-events additionalHeaders.Set("prefer", "reply") diff --git a/pkg/channel/fanout/fanout_message_handler.go b/pkg/channel/fanout/fanout_message_handler.go index 77be714bdeb..952daf7cada 100644 --- a/pkg/channel/fanout/fanout_message_handler.go +++ b/pkg/channel/fanout/fanout_message_handler.go @@ -33,6 +33,8 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/buffering" "go.opencensus.io/trace" "go.uber.org/zap" + + "knative.dev/eventing/pkg/apis" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" @@ -185,6 +187,7 @@ func createMessageReceiverFunction(f *FanoutMessageHandler) func(context.Context go func(m binding.Message, h nethttp.Header, s *trace.Span, r *channel.StatsReporter, args *channel.ReportArgs) { // Run async dispatch with background context. ctx = trace.NewContext(context.Background(), s) + h.Set(apis.KnNamespaceHeader, ref.Namespace) // Any returned error is already logged in f.dispatch(). dispatchResultForFanout := f.dispatch(ctx, subs, m, h) _ = ParseDispatchResultAndReportMetrics(dispatchResultForFanout, *r, *args) diff --git a/pkg/channel/message_dispatcher.go b/pkg/channel/message_dispatcher.go index d6a611a20b8..a0d628711ac 100644 --- a/pkg/channel/message_dispatcher.go +++ b/pkg/channel/message_dispatcher.go @@ -37,6 +37,8 @@ import ( "knative.dev/pkg/network" "knative.dev/pkg/system" + eventingapis "knative.dev/eventing/pkg/apis" + "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/channel/attributes" "knative.dev/eventing/pkg/kncloudevents" @@ -158,6 +160,13 @@ func (d *MessageDispatcherImpl) DispatchMessageWithRetries(ctx context.Context, responseAdditionalHeaders = additionalHeaders } + if additionalHeaders.Get(eventingapis.KnNamespaceHeader) != "" { + if responseAdditionalHeaders == nil { + responseAdditionalHeaders = make(nethttp.Header) + } + responseAdditionalHeaders.Set(eventingapis.KnNamespaceHeader, additionalHeaders.Get(eventingapis.KnNamespaceHeader)) + } + // No response, dispatch completed if responseMessage == nil { return dispatchExecutionInfo, nil diff --git a/pkg/metrics/source/stats_reporter.go b/pkg/metrics/source/stats_reporter.go index ae45b968fa2..9fcce5ca338 100644 --- a/pkg/metrics/source/stats_reporter.go +++ b/pkg/metrics/source/stats_reporter.go @@ -20,9 +20,10 @@ import ( "context" "go.opencensus.io/stats/view" - eventingmetrics "knative.dev/eventing/pkg/metrics" "knative.dev/pkg/metrics" + eventingmetrics "knative.dev/eventing/pkg/metrics" + "go.opencensus.io/stats" "go.opencensus.io/tag" ) diff --git a/test/rekt/features/broker/feature.go b/test/rekt/features/broker/feature.go index 58ff8bc2b3b..0b0a3138222 100644 --- a/test/rekt/features/broker/feature.go +++ b/test/rekt/features/broker/feature.go @@ -26,17 +26,19 @@ import ( "github.com/cloudevents/sdk-go/v2/binding/spec" "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" + "knative.dev/reconciler-test/pkg/environment" + + "knative.dev/pkg/ptr" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/subscription" "knative.dev/eventing/test/rekt/resources/trigger" v1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/ptr" - "knative.dev/reconciler-test/pkg/eventshub" eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" @@ -343,18 +345,30 @@ func brokerChannelFlowWithTransformation(createSubscriberFn func(ref *v1.KRefere ) f.Stable("(Trigger1 point to) sink1 has all the events"). - Must("delivers original events", - eventasssert.OnStore(sink1).Match(eventMatcher).AtLeast(1)) + Must("delivers original events", func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink1). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match(eventMatcher). + AtLeast(1)(ctx, t) + }) f.Stable("(Trigger2 point to) sink2 has all the events"). Must("delivers original events", eventasssert.OnStore(sink2).Match(eventMatcher).AtLeast(1)). - Must("delivers transformation events", - eventasssert.OnStore(sink2).Match(transformEventMatcher).AtLeast(1)) + Must("delivers transformation events", func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink2). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match(transformEventMatcher). + AtLeast(1)(ctx, t) + }) f.Stable("(Trigger3 point to) Channel's subscriber just has events after transformation"). - Must("delivers transformation events", - eventasssert.OnStore(sink3).Match(transformEventMatcher).AtLeast(1)). + Must("delivers transformation events", func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink3). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match(transformEventMatcher). + AtLeast(1)(ctx, t) + }). Must("delivers original events", eventasssert.OnStore(sink3).Match(eventMatcher).Not()) @@ -628,14 +642,19 @@ func brokerRedeliveryDropN(retryNum int32, dropNum uint) *feature.Feature { f.Stable("Broker Redelivery failed the first n events"). Must("delivers events", - eventasssert.OnStore(sink).Match( - eventasssert.MatchKind(eventasssert.EventReceived), - eventasssert.MatchEvent( - test.HasSource(eventSource), - test.HasType(eventType), - test.HasData([]byte(eventBody)), - ), - ).AtLeast(1)) + func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + Match( + eventasssert.MatchKind(eventasssert.EventReceived), + eventasssert.MatchEvent( + test.HasSource(eventSource), + test.HasType(eventType), + test.HasData([]byte(eventBody)), + ), + ). + AtLeast(1)(ctx, t) + }) return f } @@ -693,11 +712,14 @@ func brokerSubscriberUnreachable() *feature.Feature { )) f.Assert("Receives dls extensions when subscriber is unreachable", - eventasssert.OnStore(sink). - MatchEvent( - test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local"), - ). - AtLeast(1), + func(ctx context.Context, t feature.T) { + eventasssert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent( + test.HasExtension("knativeerrordest", "http://fake.svc.cluster.local"), + ). + AtLeast(1)(ctx, t) + }, ) return f } diff --git a/test/rekt/features/channel/features.go b/test/rekt/features/channel/features.go index 9cee8ffb74e..4e514695160 100644 --- a/test/rekt/features/channel/features.go +++ b/test/rekt/features/channel/features.go @@ -26,6 +26,7 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "github.com/google/uuid" duckv1 "knative.dev/pkg/apis/duck/v1" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" @@ -35,6 +36,7 @@ import ( eventasssert "knative.dev/reconciler-test/pkg/eventshub/assert" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/containersource" @@ -108,10 +110,12 @@ func DeadLetterSink(createSubscriberFn func(ref *duckv1.KReference, uri string) f.Requirement("containersource is ready", containersource.IsReady(cs)) f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel_impl.GVR())) - f.Assert("dls receives events", assert.OnStore(sink). - MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). - AtLeast(1), - ) + f.Assert("dls receives events", func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1)(ctx, t) + }) return f } @@ -141,10 +145,12 @@ func DeadLetterSinkGenericChannel(createSubscriberFn func(ref *duckv1.KReference f.Requirement("containersource is ready", containersource.IsReady(cs)) f.Requirement("Channel has dead letter sink uri", channel_impl.HasDeadLetterSinkURI(name, channel.GVR())) - f.Assert("dls receives events", assert.OnStore(sink). - MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). - AtLeast(1), - ) + f.Assert("dls receives events", func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.eventing.samples.heartbeat")). + AtLeast(1)(ctx, t) + }) return f } @@ -307,10 +313,12 @@ func ChannelPreferHeaderCheck(createSubscriberFn func(ref *duckv1.KReference, ur )) f.Stable("test message without explicit prefer header should have the header"). - Must("delivers events", + Must("delivers events", func(ctx context.Context, t feature.T) { eventasssert.OnStore(sink).Match( + features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace()), eventasssert.HasAdditionalHeader("Prefer", "reply"), - ).AtLeast(1)) + ).AtLeast(1)(ctx, t) + }) return f } diff --git a/test/rekt/features/matchers.go b/test/rekt/features/matchers.go new file mode 100644 index 00000000000..292b84f4e0f --- /dev/null +++ b/test/rekt/features/matchers.go @@ -0,0 +1,40 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package features + +import ( + "fmt" + + "knative.dev/reconciler-test/pkg/eventshub" + + "knative.dev/eventing/pkg/apis" +) + +func HasKnNamespaceHeader(ns string) eventshub.EventInfoMatcher { + return func(info eventshub.EventInfo) error { + values, ok := info.HTTPHeaders[apis.KnNamespaceHeader] + if !ok { + return fmt.Errorf("%s header not found", apis.KnNamespaceHeader) + } + for _, v := range values { + if v == ns { + return nil + } + } + return fmt.Errorf("wanted %s header to have value %s, got %+v", apis.KnNamespaceHeader, ns, values) + } +} diff --git a/test/rekt/features/pingsource/features.go b/test/rekt/features/pingsource/features.go index 4eb76a0befe..26b51fce108 100644 --- a/test/rekt/features/pingsource/features.go +++ b/test/rekt/features/pingsource/features.go @@ -21,6 +21,7 @@ import ( "github.com/cloudevents/sdk-go/v2/test" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/eventshub" "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" @@ -28,6 +29,7 @@ import ( "knative.dev/reconciler-test/pkg/resources/service" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" + "knative.dev/eventing/test/rekt/features" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/eventtype" "knative.dev/eventing/test/rekt/resources/pingsource" @@ -46,7 +48,13 @@ func SendsEventsWithSinkRef() *feature.Feature { f.Stable("pingsource as event source"). Must("delivers events", - assert.OnStore(sink).MatchEvent(test.HasType("dev.knative.sources.ping")).AtLeast(1)) + func(ctx context.Context, t feature.T) { + assert.OnStore(sink). + Match(features.HasKnNamespaceHeader(environment.FromContext(ctx).Namespace())). + MatchEvent(test.HasType("dev.knative.sources.ping")). + AtLeast(1)(ctx, t) + }, + ) return f }