Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Benedikt Bongartz <[email protected]>
  • Loading branch information
frzifus committed Oct 4, 2024
1 parent 91759b0 commit 02cb882
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 23 deletions.
2 changes: 1 addition & 1 deletion apis/v1beta1/collector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (c CollectorWebhook) Default(_ context.Context, obj runtime.Object) error {
if len(otelcol.Spec.ManagementState) == 0 {
otelcol.Spec.ManagementState = ManagementStateManaged
}
return nil
return otelcol.Spec.Config.ApplyDefaults(c.logger)
}

func (c CollectorWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
Expand Down
38 changes: 38 additions & 0 deletions apis/v1beta1/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,40 @@ func (c *Config) getPortsForComponentKinds(logger logr.Logger, componentKinds ..
return ports, nil
}

// getPortsForComponentKinds gets the ports for the given ComponentKind(s).
func (c *Config) applyDefaultForComponentKinds(logger logr.Logger, componentKinds ...ComponentKind) error {
enabledComponents := c.GetEnabledComponents()
for _, componentKind := range componentKinds {
var retriever components.ParserRetriever
var cfg AnyConfig
switch componentKind {
case KindReceiver:
retriever = receivers.ReceiverFor
cfg = c.Receivers
case KindExporter:
continue
case KindProcessor:
continue
case KindExtension:
continue
}
for componentName := range enabledComponents[componentKind] {
parser := retriever(componentName)
if newCfg, err := parser.GetDefaultConfig(logger, cfg.Object[componentName]); err != nil {
return err
} else {
cc, ok := newCfg.(map[string]interface{})
if !ok {
return fmt.Errorf("could not apply defaults to receiver: %s", componentName)
}
cfg.Object = cc
}
}
}

return nil
}

func (c *Config) GetReceiverPorts(logger logr.Logger) ([]corev1.ServicePort, error) {
return c.getPortsForComponentKinds(logger, KindReceiver)
}
Expand All @@ -241,6 +275,10 @@ func (c *Config) GetAllRbacRules(logger logr.Logger) ([]rbacv1.PolicyRule, error
return c.getRbacRulesForComponentKinds(logger, KindReceiver, KindExporter, KindProcessor)
}

func (c *Config) ApplyDefaults(logger logr.Logger) error {
return c.applyDefaultForComponentKinds(logger, KindReceiver)
}

// GetLivenessProbe gets the first enabled liveness probe. There should only ever be one extension enabled
// that provides the hinting for the liveness probe.
func (c *Config) GetLivenessProbe(logger logr.Logger) (*corev1.Probe, error) {
Expand Down
46 changes: 30 additions & 16 deletions internal/components/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,18 @@ import (
type ParserOption[ComponentConfigType any] func(*Settings[ComponentConfigType])

type Settings[ComponentConfigType any] struct {
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
protocol corev1.Protocol
appProtocol *string
targetPort intstr.IntOrString
nodePort int32
name string
port int32
defaultRecAddr string
portParser PortParser[ComponentConfigType]
rbacGen RBACRuleGenerator[ComponentConfigType]
livenessGen ProbeGenerator[ComponentConfigType]
readinessGen ProbeGenerator[ComponentConfigType]
defaultsApplier Defaulter[ComponentConfigType]
}

func NewEmptySettings[ComponentConfigType any]() *Settings[ComponentConfigType] {
Expand Down Expand Up @@ -75,6 +77,11 @@ func (b Builder[ComponentConfigType]) WithAppProtocol(appProtocol *string) Build
o.appProtocol = appProtocol
})
}
func (b Builder[ComponentConfigType]) WithDefaultRecAddress(defaultRecAddr string) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultRecAddr = defaultRecAddr
})
}
func (b Builder[ComponentConfigType]) WithTargetPort(targetPort int32) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.targetPort = intstr.FromInt32(targetPort)
Expand Down Expand Up @@ -118,19 +125,26 @@ func (b Builder[ComponentConfigType]) WithReadinessGen(readinessGen ProbeGenerat
})
}

func (b Builder[ComponentConfigType]) WithDefaultsApplier(defaultsApplier Defaulter[ComponentConfigType]) Builder[ComponentConfigType] {
return append(b, func(o *Settings[ComponentConfigType]) {
o.defaultsApplier = defaultsApplier
})
}

func (b Builder[ComponentConfigType]) Build() (*GenericParser[ComponentConfigType], error) {
o := NewEmptySettings[ComponentConfigType]()
o.Apply(b...)
if len(o.name) == 0 {
return nil, fmt.Errorf("invalid settings struct, no name specified")
}
return &GenericParser[ComponentConfigType]{
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
settings: o,
name: o.name,
portParser: o.portParser,
rbacGen: o.rbacGen,
livenessGen: o.livenessGen,
readinessGen: o.readinessGen,
defaultsApplier: o.defaultsApplier,
settings: o,
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions internal/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type RBACRuleGenerator[ComponentConfigType any] func(logger logr.Logger, config
// It's expected that type Config is the configuration used by a parser.
type ProbeGenerator[ComponentConfigType any] func(logger logr.Logger, config ComponentConfigType) (*corev1.Probe, error)

// Defaulter is a function that applies given defaults to the passed Config.
// It's expected that type Config is the configuration used by a parser.
type Defaulter[ComponentConfigType any] func(logger logr.Logger, defaultRecAddr string, config ComponentConfigType) (ComponentConfigType, error)

// ComponentType returns the type for a given component name.
// components have a name like:
// - mycomponent/custom
Expand Down Expand Up @@ -87,6 +91,9 @@ func PortFromEndpoint(endpoint string) (int32, error) {
type ParserRetriever func(string) Parser

type Parser interface {
// GetDefaultConfig .. TODO
GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error)

// Ports returns the service ports parsed based on the component's configuration where name is the component's name
// of the form "name" or "type/name"
Ports(logger logr.Logger, name string, config interface{}) ([]corev1.ServicePort, error)
Expand Down
46 changes: 40 additions & 6 deletions internal/components/generic_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package components

import (
"fmt"
"strings"

"github.com/go-logr/logr"
"github.com/mitchellh/mapstructure"
Expand All @@ -27,15 +28,48 @@ var (
_ Parser = &GenericParser[SingleEndpointConfig]{}
)

// AddressDefaulter ...
func AddressDefaulter(logger logr.Logger, defaultRecAddr string, config *SingleEndpointConfig) (*SingleEndpointConfig, error) {
if config.Endpoint == "" {
return config, nil
}

v := strings.Split(config.Endpoint, ":")
if len(v) < 1 {
return config, nil
}
if v[0] == "" {
config.Endpoint = fmt.Sprintf("%s:%s", defaultRecAddr, v[1])
}
return config, nil
}

// GenericParser serves as scaffolding for custom parsing logic by isolating
// functionality to idempotent functions.
type GenericParser[T any] struct {
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
name string
settings *Settings[T]
portParser PortParser[T]
rbacGen RBACRuleGenerator[T]
livenessGen ProbeGenerator[T]
readinessGen ProbeGenerator[T]
defaultsApplier Defaulter[T]
}

func (g *GenericParser[T]) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
if g.settings == nil {
return config, nil
}

if g.settings.defaultRecAddr == "" {
return config, nil
}

var parsed T
if err := mapstructure.Decode(config, &parsed); err != nil {
return nil, err
}
return g.defaultsApplier(logger, g.settings.defaultRecAddr, parsed)
}

func (g *GenericParser[T]) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
Expand Down
17 changes: 17 additions & 0 deletions internal/components/multi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,23 @@ func (m *MultiPortReceiver) ParserName() string {
return fmt.Sprintf("__%s", m.name)
}

func (m *MultiPortReceiver) GetDefaultConfig(logger logr.Logger, config interface{}) (interface{}, error) {
multiProtoEndpointCfg := &MultiProtocolEndpointConfig{}
if err := mapstructure.Decode(config, multiProtoEndpointCfg); err != nil {
return nil, err
}
for protocol, ec := range multiProtoEndpointCfg.Protocols {
// TODO: Should we take default receiver address from settngs?
res, err := AddressDefaulter(logger, "0.0.0.0", ec)
if err != nil {
return nil, err
}
multiProtoEndpointCfg.Protocols[protocol].Endpoint = res.Endpoint
}
// Encode and return.
return config, mapstructure.Decode(multiProtoEndpointCfg, config)

}
func (m *MultiPortReceiver) GetLivenessProbe(logger logr.Logger, config interface{}) (*corev1.Probe, error) {
return nil, nil
}
Expand Down
2 changes: 2 additions & 0 deletions internal/components/receivers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ var (
components.NewMultiPortReceiverBuilder("otlp").
AddPortMapping(components.NewProtocolBuilder("grpc", 4317).
WithAppProtocol(&components.GrpcProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4317)).
AddPortMapping(components.NewProtocolBuilder("http", 4318).
WithAppProtocol(&components.HttpProtocol).
WithDefaultRecAddress("0.0.0.0").
WithTargetPort(4318)).
MustBuild(),
components.NewMultiPortReceiverBuilder("skywalking").
Expand Down

0 comments on commit 02cb882

Please sign in to comment.