diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 745777a060a..589ee80501c 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -56,9 +56,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor { return &adapter.EnvConfig{} } -func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { +func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter { logger := logging.FromContext(ctx) - runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx)) + runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx)) return &mtpingAdapter{ logger: logger, diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 135861ca687..19f8e36763d 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -33,6 +33,7 @@ import ( typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" + "knative.dev/eventing/pkg/adapter/v2" kncloudevents "knative.dev/eventing/pkg/adapter/v2" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" @@ -50,26 +51,25 @@ type cronJobsRunner struct { // The cron job runner cron cron.Cron - // client sends cloudevents. - Client cloudevents.Client - // Where to send logs Logger *zap.SugaredLogger // kubeClient for sending k8s events kubeClient kubernetes.Interface + + clientConfig kncloudevents.ClientConfig } const ( resourceGroup = "pingsources.sources.knative.dev" ) -func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner { +func NewCronJobsRunner(cfg adapter.ClientConfig, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner { return &cronJobsRunner{ - cron: *cron.New(opts...), - Client: ceClient, - Logger: logger, - kubeClient: kubeClient, + cron: *cron.New(opts...), + Logger: logger, + kubeClient: kubeClient, + clientConfig: cfg, } } @@ -107,7 +107,18 @@ func (a *cronJobsRunner) AddSchedule(source *sourcesv1.PingSource) cron.EntryID } ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag) - id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, event)) + + client, err := a.newPingSourceClient(source) + if err != nil { + a.Logger.Desugar().Error("Failed to create client", + zap.String("name", source.GetName()), + zap.String("namespace", source.GetNamespace()), + zap.Error(err), + ) + return -1 + } + + id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, client, event)) return id } @@ -128,7 +139,7 @@ func (a *cronJobsRunner) Stop() { } } -func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) func() { +func (a *cronJobsRunner) cronTick(ctx context.Context, client adapter.Client, event cloudevents.Event) func() { return func() { event := event.Clone() event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging @@ -141,11 +152,13 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target) - if result := a.Client.Send(ctx, event); !cloudevents.IsACK(result) { + if result := client.Send(ctx, event); !cloudevents.IsACK(result) { // Exhausted number of retries. Event is lost. a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result), zap.String("source", source), zap.String("target", target), zap.String("id", event.ID())) } + + client.CloseIdleConnections() } } @@ -174,3 +187,28 @@ func makeEvent(source *sourcesv1.PingSource) (cloudevents.Event, error) { return event, nil } + +func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adapter.Client, error) { + var env adapter.EnvConfig + if a.clientConfig.Env != nil { + env = adapter.EnvConfig{ + Namespace: a.clientConfig.Env.GetNamespace(), + Name: a.clientConfig.Env.GetName(), + EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()), + } + } + + env.Sink = source.Status.SinkURI.String() + env.CACerts = nil // TODO CA Certs from source status + + cfg := adapter.ClientConfig{ + Env: &env, + CeOverrides: source.Spec.CloudEventOverrides, + Reporter: a.clientConfig.Reporter, + CrStatusEventClient: a.clientConfig.CrStatusEventClient, + Options: a.clientConfig.Options, + Client: a.clientConfig.Client, + } + + return adapter.NewClient(cfg) +} diff --git a/pkg/adapter/mtping/runner_test.go b/pkg/adapter/mtping/runner_test.go index 541a3f473c0..2705069ac08 100644 --- a/pkg/adapter/mtping/runner_test.go +++ b/pkg/adapter/mtping/runner_test.go @@ -35,6 +35,7 @@ import ( "knative.dev/pkg/logging" rectesting "knative.dev/pkg/reconciler/testing" + "knative.dev/eventing/pkg/adapter/v2" adaptertesting "knative.dev/eventing/pkg/adapter/v2/test" sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1" ) @@ -201,7 +202,7 @@ func TestAddRunRemoveSchedules(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClient() - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) entryId := runner.AddSchedule(tc.src) entry := runner.cron.Entry(entryId) @@ -228,7 +229,7 @@ func TestStartStopCron(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClient() - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) ctx, cancel := context.WithCancel(context.Background()) wctx, wcancel := context.WithCancel(context.Background()) @@ -257,7 +258,7 @@ func TestStartStopCronDelayWait(t *testing.T) { logger := logging.FromContext(ctx) ce := adaptertesting.NewTestClientWithDelay(time.Second * 5) - runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger) + runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/pkg/adapter/v2/cloudevents.go b/pkg/adapter/v2/cloudevents.go index dae960b5e4b..980689b25a4 100644 --- a/pkg/adapter/v2/cloudevents.go +++ b/pkg/adapter/v2/cloudevents.go @@ -40,9 +40,18 @@ import ( obsclient "knative.dev/eventing/pkg/observability/client" ) +type closeIdler interface { + CloseIdleConnections() +} + +type Client interface { + cloudevents.Client + closeIdler +} + var newClientHTTPObserved = NewClientHTTPObserved -func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient.Client, error) { +func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) { t, err := obshttp.NewObservedHTTP(topt...) if err != nil { return nil, err @@ -55,85 +64,134 @@ func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient return nil, err } - return c, nil + return &client{ + ceClient: c, + }, nil } // NewCloudEventsClient returns a client that will apply the ceOverrides to // outbound events and report outbound event counts. -func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) { +func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (Client, error) { opts := make([]http.Option, 0) if len(target) > 0 { opts = append(opts, cloudevents.WithTarget(target)) } - return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) + return NewClient(ClientConfig{ + CeOverrides: ceOverrides, + Reporter: reporter, + Options: opts, + }) } // NewCloudEventsClientWithOptions returns a client created with provided options -func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...) +func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (Client, error) { + return NewClient(ClientConfig{ + CeOverrides: ceOverrides, + Reporter: reporter, + Options: opts, + }) } // NewCloudEventsClientCRStatus returns a client CR status -func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) { - return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient) +func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (Client, error) { + return NewClient(ClientConfig{ + Env: env, + Reporter: reporter, + CrStatusEventClient: crStatusEventClient, + }) } -func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, - crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) { - pOpts := make([]http.Option, 0) - pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{ +type ClientConfig struct { + Env EnvConfigAccessor + CeOverrides *duckv1.CloudEventOverrides + Reporter source.StatsReporter + CrStatusEventClient *crstatusevent.CRStatusEventClient + Options []http.Option + + Client Client +} + +type clientConfigKey struct{} + +func withClientConfig(ctx context.Context, r ClientConfig) context.Context { + return context.WithValue(ctx, clientConfigKey{}, r) +} + +func GetClientConfig(ctx context.Context) ClientConfig { + val := ctx.Value(clientConfigKey{}) + if val == nil { + return ClientConfig{} + } + return val.(ClientConfig) +} + +func NewClient(cfg ClientConfig) (Client, error) { + if cfg.Client != nil { + return cfg.Client, nil + } + + transport := &ochttp.Transport{ Propagation: tracecontextb3.TraceContextEgress, - })) + } + + pOpts := make([]http.Option, 0) + var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport) - if env != nil { - if target := env.GetSink(); len(target) > 0 { + ceOverrides := cfg.CeOverrides + if cfg.Env != nil { + if target := cfg.Env.GetSink(); len(target) > 0 { pOpts = append(pOpts, cloudevents.WithTarget(target)) } - if sinkWait := env.GetSinktimeout(); sinkWait > 0 { + if sinkWait := cfg.Env.GetSinktimeout(); sinkWait > 0 { pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second)) } - if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) { + if caCerts := cfg.Env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(cfg.Env.GetSink()) { var err error clientConfig := eventingtls.NewDefaultClientConfig() clientConfig.CACerts = caCerts - transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone() - transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig) + httpTransport := nethttp.DefaultTransport.(*nethttp.Transport).Clone() + httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig) if err != nil { return nil, err } - pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{ - Base: transport, + closeIdler = httpTransport + + transport = &ochttp.Transport{ + Base: httpTransport, Propagation: tracecontextb3.TraceContextEgress, - })) + } } if ceOverrides == nil { var err error - ceOverrides, err = env.GetCloudEventOverrides() + ceOverrides, err = cfg.Env.GetCloudEventOverrides() if err != nil { return nil, err } } } + pOpts = append(pOpts, http.WithRoundTripper(transport)) + // Make sure that explicitly set options have priority - opts = append(pOpts, opts...) + opts := append(pOpts, cfg.Options...) ceClient, err := newClientHTTPObserved(opts, nil) - if crStatusEventClient == nil { - crStatusEventClient = crstatusevent.GetDefaultClient() + if cfg.CrStatusEventClient == nil { + cfg.CrStatusEventClient = crstatusevent.GetDefaultClient() } if err != nil { return nil, err } return &client{ ceClient: ceClient, + closeIdler: closeIdler, ceOverrides: ceOverrides, - reporter: reporter, - crStatusEventClient: crStatusEventClient, + reporter: cfg.Reporter, + crStatusEventClient: cfg.CrStatusEventClient, }, nil } @@ -155,6 +213,11 @@ type client struct { ceOverrides *duckv1.CloudEventOverrides reporter source.StatsReporter crStatusEventClient *crstatusevent.CRStatusEventClient + closeIdler closeIdler +} + +func (c *client) CloseIdleConnections() { + c.closeIdler.CloseIdleConnections() } var _ cloudevents.Client = (*client)(nil) @@ -189,6 +252,9 @@ func (c *client) applyOverrides(event *cloudevents.Event) { } func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, result protocol.Result) { + if c.reporter == nil { + return + } tags := MetricTagFromContext(ctx) reportArgs := &source.ReportArgs{ Namespace: tags.Namespace, diff --git a/pkg/adapter/v2/cloudevents_test.go b/pkg/adapter/v2/cloudevents_test.go index 2b9fd500ce4..ab025a82a73 100644 --- a/pkg/adapter/v2/cloudevents_test.go +++ b/pkg/adapter/v2/cloudevents_test.go @@ -142,7 +142,7 @@ func TestNewCloudEventsClient_send(t *testing.T) { } } - defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (v2client.Client, error), restoreEnv string, setEnv bool) { + defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (Client, error), restoreEnv string, setEnv bool) { newClientHTTPObserved = restoreHTTP if setEnv { if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil { @@ -157,7 +157,7 @@ func TestNewCloudEventsClient_send(t *testing.T) { }(restoreHTTP, restoreEnv, setEnv) sendOptions := []http.Option{} - newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (v2client.Client, error) { + newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (Client, error) { sendOptions = append(sendOptions, topt...) return nil, nil } @@ -358,6 +358,8 @@ func TestTLS(t *testing.T) { } else if cloudevents.IsNACK(result) || cloudevents.IsUndelivered(result) { t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result)) } + + c.CloseIdleConnections() }) } } diff --git a/pkg/adapter/v2/main.go b/pkg/adapter/v2/main.go index 582e468c7a8..a839cd66896 100644 --- a/pkg/adapter/v2/main.go +++ b/pkg/adapter/v2/main.go @@ -214,7 +214,14 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces logger.Errorw("Error building statsreporter", zap.Error(err)) } - eventsClient, err := NewCloudEventsClientCRStatus(env, reporter, crStatusEventClient) + clientConfig := ClientConfig{ + Env: env, + Reporter: reporter, + CrStatusEventClient: crStatusEventClient, + } + ctx = withClientConfig(ctx, clientConfig) + + eventsClient, err := NewClient(clientConfig) if err != nil { logger.Fatalw("Error building cloud event client", zap.Error(err)) } diff --git a/pkg/adapter/v2/test/test_client.go b/pkg/adapter/v2/test/test_client.go index 32ececf6c4f..602f7b9b57b 100644 --- a/pkg/adapter/v2/test/test_client.go +++ b/pkg/adapter/v2/test/test_client.go @@ -41,12 +41,13 @@ type TestCloudEventsClient struct { } } +func (c *TestCloudEventsClient) CloseIdleConnections() { +} + type EventData struct { ID string `json:"id"` Type string `json:"type"` } - -var _ cloudevents.Client = (*TestCloudEventsClient)(nil) var eventData EventData // Send_AppendResult will enqueue a response for the following Send call. diff --git a/pkg/metrics/source/stats_reporter.go b/pkg/metrics/source/stats_reporter.go index ae45b968fa2..9fcce5ca338 100644 --- a/pkg/metrics/source/stats_reporter.go +++ b/pkg/metrics/source/stats_reporter.go @@ -20,9 +20,10 @@ import ( "context" "go.opencensus.io/stats/view" - eventingmetrics "knative.dev/eventing/pkg/metrics" "knative.dev/pkg/metrics" + eventingmetrics "knative.dev/eventing/pkg/metrics" + "go.opencensus.io/stats" "go.opencensus.io/tag" )