Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/adapter/mtping/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
return &adapter.EnvConfig{}
}

func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
func NewAdapter(ctx context.Context, env adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {
logger := logging.FromContext(ctx)
runner := NewCronJobsRunner(ceClient, kubeclient.Get(ctx), logging.FromContext(ctx))
runner := NewCronJobsRunner(adapter.GetClientConfig(ctx), kubeclient.Get(ctx), logging.FromContext(ctx))

return &mtpingAdapter{
logger: logger,
Expand Down
60 changes: 49 additions & 11 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

"knative.dev/eventing/pkg/adapter/v2"
kncloudevents "knative.dev/eventing/pkg/adapter/v2"
"knative.dev/eventing/pkg/adapter/v2/util/crstatusevent"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
Expand All @@ -50,26 +51,25 @@ type cronJobsRunner struct {
// The cron job runner
cron cron.Cron

// client sends cloudevents.
Client cloudevents.Client

// Where to send logs
Logger *zap.SugaredLogger

// kubeClient for sending k8s events
kubeClient kubernetes.Interface

clientConfig kncloudevents.ClientConfig
}

const (
resourceGroup = "pingsources.sources.knative.dev"
)

func NewCronJobsRunner(ceClient cloudevents.Client, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
func NewCronJobsRunner(cfg adapter.ClientConfig, kubeClient kubernetes.Interface, logger *zap.SugaredLogger, opts ...cron.Option) *cronJobsRunner {
return &cronJobsRunner{
cron: *cron.New(opts...),
Client: ceClient,
Logger: logger,
kubeClient: kubeClient,
cron: *cron.New(opts...),
Logger: logger,
kubeClient: kubeClient,
clientConfig: cfg,
}
}

Expand Down Expand Up @@ -107,7 +107,18 @@ func (a *cronJobsRunner) AddSchedule(source *sourcesv1.PingSource) cron.EntryID
}

ctx = kncloudevents.ContextWithMetricTag(ctx, metricTag)
id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, event))

client, err := a.newPingSourceClient(source)
if err != nil {
a.Logger.Desugar().Error("Failed to create client",
zap.String("name", source.GetName()),
zap.String("namespace", source.GetNamespace()),
zap.Error(err),
)
return -1
}

id, _ := a.cron.AddFunc(schedule, a.cronTick(ctx, client, event))
return id
}

Expand All @@ -128,7 +139,7 @@ func (a *cronJobsRunner) Stop() {
}
}

func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event) func() {
func (a *cronJobsRunner) cronTick(ctx context.Context, client adapter.Client, event cloudevents.Event) func() {
return func() {
event := event.Clone()
event.SetID(uuid.New().String()) // provide an ID here so we can track it with logging
Expand All @@ -141,11 +152,13 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, event cloudevents.Event)

a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target)

if result := a.Client.Send(ctx, event); !cloudevents.IsACK(result) {
if result := client.Send(ctx, event); !cloudevents.IsACK(result) {
// Exhausted number of retries. Event is lost.
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),
zap.String("source", source), zap.String("target", target), zap.String("id", event.ID()))
}

client.CloseIdleConnections()
}
}

Expand Down Expand Up @@ -174,3 +187,28 @@ func makeEvent(source *sourcesv1.PingSource) (cloudevents.Event, error) {

return event, nil
}

func (a *cronJobsRunner) newPingSourceClient(source *sourcesv1.PingSource) (adapter.Client, error) {
var env adapter.EnvConfig
if a.clientConfig.Env != nil {
env = adapter.EnvConfig{
Namespace: a.clientConfig.Env.GetNamespace(),
Name: a.clientConfig.Env.GetName(),
EnvSinkTimeout: fmt.Sprintf("%d", a.clientConfig.Env.GetSinktimeout()),
}
}

env.Sink = source.Status.SinkURI.String()
env.CACerts = nil // TODO CA Certs from source status

cfg := adapter.ClientConfig{
Env: &env,
CeOverrides: source.Spec.CloudEventOverrides,
Reporter: a.clientConfig.Reporter,
CrStatusEventClient: a.clientConfig.CrStatusEventClient,
Options: a.clientConfig.Options,
Client: a.clientConfig.Client,
}

return adapter.NewClient(cfg)
}
7 changes: 4 additions & 3 deletions pkg/adapter/mtping/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"knative.dev/pkg/logging"
rectesting "knative.dev/pkg/reconciler/testing"

"knative.dev/eventing/pkg/adapter/v2"
adaptertesting "knative.dev/eventing/pkg/adapter/v2/test"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestAddRunRemoveSchedules(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)
entryId := runner.AddSchedule(tc.src)

entry := runner.cron.Entry(entryId)
Expand All @@ -228,7 +229,7 @@ func TestStartStopCron(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClient()

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
wctx, wcancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -257,7 +258,7 @@ func TestStartStopCronDelayWait(t *testing.T) {
logger := logging.FromContext(ctx)
ce := adaptertesting.NewTestClientWithDelay(time.Second * 5)

runner := NewCronJobsRunner(ce, kubeclient.Get(ctx), logger)
runner := NewCronJobsRunner(adapter.ClientConfig{Client: ce}, kubeclient.Get(ctx), logger)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
122 changes: 94 additions & 28 deletions pkg/adapter/v2/cloudevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,18 @@ import (
obsclient "knative.dev/eventing/pkg/observability/client"
)

type closeIdler interface {
CloseIdleConnections()
}

type Client interface {
cloudevents.Client
closeIdler
}

var newClientHTTPObserved = NewClientHTTPObserved

func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient.Client, error) {
func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (Client, error) {
t, err := obshttp.NewObservedHTTP(topt...)
if err != nil {
return nil, err
Expand All @@ -55,85 +64,134 @@ func NewClientHTTPObserved(topt []http.Option, copt []ceclient.Option) (ceclient
return nil, err
}

return c, nil
return &client{
ceClient: c,
}, nil
}

// NewCloudEventsClient returns a client that will apply the ceOverrides to
// outbound events and report outbound event counts.
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (cloudevents.Client, error) {
func NewCloudEventsClient(target string, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter) (Client, error) {
opts := make([]http.Option, 0)
if len(target) > 0 {
opts = append(opts, cloudevents.WithTarget(target))
}
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientWithOptions returns a client created with provided options
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(nil, ceOverrides, reporter, nil, opts...)
func NewCloudEventsClientWithOptions(ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter, opts ...http.Option) (Client, error) {
return NewClient(ClientConfig{
CeOverrides: ceOverrides,
Reporter: reporter,
Options: opts,
})
}

// NewCloudEventsClientCRStatus returns a client CR status
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (cloudevents.Client, error) {
return newCloudEventsClientCRStatus(env, nil, reporter, crStatusEventClient)
func NewCloudEventsClientCRStatus(env EnvConfigAccessor, reporter source.StatsReporter, crStatusEventClient *crstatusevent.CRStatusEventClient) (Client, error) {
return NewClient(ClientConfig{
Env: env,
Reporter: reporter,
CrStatusEventClient: crStatusEventClient,
})
}
func newCloudEventsClientCRStatus(env EnvConfigAccessor, ceOverrides *duckv1.CloudEventOverrides, reporter source.StatsReporter,
crStatusEventClient *crstatusevent.CRStatusEventClient, opts ...http.Option) (cloudevents.Client, error) {

pOpts := make([]http.Option, 0)
pOpts = append(pOpts, cloudevents.WithRoundTripper(&ochttp.Transport{
type ClientConfig struct {
Env EnvConfigAccessor
CeOverrides *duckv1.CloudEventOverrides
Reporter source.StatsReporter
CrStatusEventClient *crstatusevent.CRStatusEventClient
Options []http.Option

Client Client
}

type clientConfigKey struct{}

func withClientConfig(ctx context.Context, r ClientConfig) context.Context {
return context.WithValue(ctx, clientConfigKey{}, r)
}

func GetClientConfig(ctx context.Context) ClientConfig {
val := ctx.Value(clientConfigKey{})
if val == nil {
return ClientConfig{}
}
return val.(ClientConfig)
}

func NewClient(cfg ClientConfig) (Client, error) {
if cfg.Client != nil {
return cfg.Client, nil
}

transport := &ochttp.Transport{
Propagation: tracecontextb3.TraceContextEgress,
}))
}

pOpts := make([]http.Option, 0)
var closeIdler closeIdler = nethttp.DefaultTransport.(*nethttp.Transport)

if env != nil {
if target := env.GetSink(); len(target) > 0 {
ceOverrides := cfg.CeOverrides
if cfg.Env != nil {
if target := cfg.Env.GetSink(); len(target) > 0 {
pOpts = append(pOpts, cloudevents.WithTarget(target))
}
if sinkWait := env.GetSinktimeout(); sinkWait > 0 {
if sinkWait := cfg.Env.GetSinktimeout(); sinkWait > 0 {
pOpts = append(pOpts, setTimeOut(time.Duration(sinkWait)*time.Second))
}
if caCerts := env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(env.GetSink()) {
if caCerts := cfg.Env.GetCACerts(); (caCerts != nil && *caCerts != "") && eventingtls.IsHttpsSink(cfg.Env.GetSink()) {
var err error

clientConfig := eventingtls.NewDefaultClientConfig()
clientConfig.CACerts = caCerts

transport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
transport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
httpTransport := nethttp.DefaultTransport.(*nethttp.Transport).Clone()
httpTransport.TLSClientConfig, err = eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, err
}

pOpts = append(pOpts, http.WithRoundTripper(&ochttp.Transport{
Base: transport,
closeIdler = httpTransport

transport = &ochttp.Transport{
Base: httpTransport,
Propagation: tracecontextb3.TraceContextEgress,
}))
}
}
if ceOverrides == nil {
var err error
ceOverrides, err = env.GetCloudEventOverrides()
ceOverrides, err = cfg.Env.GetCloudEventOverrides()
if err != nil {
return nil, err
}
}
}

pOpts = append(pOpts, http.WithRoundTripper(transport))

// Make sure that explicitly set options have priority
opts = append(pOpts, opts...)
opts := append(pOpts, cfg.Options...)

ceClient, err := newClientHTTPObserved(opts, nil)

if crStatusEventClient == nil {
crStatusEventClient = crstatusevent.GetDefaultClient()
if cfg.CrStatusEventClient == nil {
cfg.CrStatusEventClient = crstatusevent.GetDefaultClient()
}
if err != nil {
return nil, err
}
return &client{
ceClient: ceClient,
closeIdler: closeIdler,
ceOverrides: ceOverrides,
reporter: reporter,
crStatusEventClient: crStatusEventClient,
reporter: cfg.Reporter,
crStatusEventClient: cfg.CrStatusEventClient,
}, nil
}

Expand All @@ -155,6 +213,11 @@ type client struct {
ceOverrides *duckv1.CloudEventOverrides
reporter source.StatsReporter
crStatusEventClient *crstatusevent.CRStatusEventClient
closeIdler closeIdler
}

func (c *client) CloseIdleConnections() {
c.closeIdler.CloseIdleConnections()
}

var _ cloudevents.Client = (*client)(nil)
Expand Down Expand Up @@ -189,6 +252,9 @@ func (c *client) applyOverrides(event *cloudevents.Event) {
}

func (c *client) reportMetrics(ctx context.Context, event cloudevents.Event, result protocol.Result) {
if c.reporter == nil {
return
}
tags := MetricTagFromContext(ctx)
reportArgs := &source.ReportArgs{
Namespace: tags.Namespace,
Expand Down
6 changes: 4 additions & 2 deletions pkg/adapter/v2/cloudevents_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}
}

defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (v2client.Client, error), restoreEnv string, setEnv bool) {
defer func(restoreHTTP func(topt []http.Option, copt []v2client.Option) (Client, error), restoreEnv string, setEnv bool) {
newClientHTTPObserved = restoreHTTP
if setEnv {
if err := os.Setenv("K_SINK_TIMEOUT", restoreEnv); err != nil {
Expand All @@ -157,7 +157,7 @@ func TestNewCloudEventsClient_send(t *testing.T) {
}(restoreHTTP, restoreEnv, setEnv)

sendOptions := []http.Option{}
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (v2client.Client, error) {
newClientHTTPObserved = func(topt []http.Option, copt []v2client.Option) (Client, error) {
sendOptions = append(sendOptions, topt...)
return nil, nil
}
Expand Down Expand Up @@ -358,6 +358,8 @@ func TestTLS(t *testing.T) {
} else if cloudevents.IsNACK(result) || cloudevents.IsUndelivered(result) {
t.Fatalf("wantErr %v, got %v IsACK %v", tc.wantErr, result, cloudevents.IsACK(result))
}

c.CloseIdleConnections()
})
}
}
Expand Down
Loading