Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Variable substitution from environment variables is not longer supported. {pull}15937{15937}
- Change aws_elb autodiscover provider field name from elb_listener.* to aws.elb.*. {issue}16219[16219] {pull}16402{16402}
- Remove `AddDockerMetadata` and `AddKubernetesMetadata` processors from the `script` processor. They can still be used as normal processors in the configuration. {issue}16349[16349] {pull}16514[16514]
- Allow the use of add_docker_metadata and add_kubernetes_metadata processors in global configuration only. {issue}16349[16349] {pull}16653[16653]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
var processCgroupPaths = cgroup.ProcessCgroupPaths

func init() {
processors.RegisterPlugin(processorName, New)
processors.RegisterStatefulPlugin(processorName, New)
}

type addDockerMetadata struct {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/processors/add_kubernetes_metadata/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type kubernetesAnnotator struct {
}

func init() {
processors.RegisterPlugin("add_kubernetes_metadata", New)
processors.RegisterStatefulPlugin("add_kubernetes_metadata", New)

// Register default indexers
Indexing.AddIndexer(PodNameIndexerName, NewPodNameIndexer)
Expand Down
12 changes: 11 additions & 1 deletion libbeat/processors/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ func NewList(log *logp.Logger) *Processors {

// New creates a list of processors from a list of free user configurations.
func New(config PluginConfig) (*Processors, error) {
return newFromRegistry(registry, config)
}

// NewStateful creates a list of processors from a list of free user configurations.
// These processors may need to be closed once they are not used anymore.
func NewStateful(config PluginConfig) (*Processors, error) {
return newFromRegistry(statefulRegistry, config)
}

func newFromRegistry(registry *Namespace, config PluginConfig) (*Processors, error) {
procs := NewList(nil)

for _, procConfig := range config {
Expand Down Expand Up @@ -84,7 +94,7 @@ func New(config PluginConfig) (*Processors, error) {
validActions = append(validActions, k)

}
return nil, errors.Errorf("the processor action %s does not exist. Valid actions: %v", actionName, strings.Join(validActions, ", "))
return nil, errors.Errorf("the processor action %s is not available in this context. Valid actions: %v", actionName, strings.Join(validActions, ", "))
}

actionCfg.PrintDebugf("Configure processor action '%v' with:", actionName)
Expand Down
17 changes: 17 additions & 0 deletions libbeat/processors/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,29 @@ func init() {
type Constructor func(config *common.Config) (Processor, error)

var registry = NewNamespace()
var statefulRegistry = NewNamespace()

// RegisterPlugin register a stateless processor
func RegisterPlugin(name string, constructor Constructor) {
logp.L().Named(logName).Debugf("Register plugin %s", name)

err := registry.Register(name, constructor)
if err != nil {
panic(err)
}

err = statefulRegistry.Register(name, constructor)
if err != nil {
panic(err)
}
}

// RegisterPlugin register processor that needs to be closed
func RegisterStatefulPlugin(name string, constructor Constructor) {
logp.L().Named(logName).Debugf("Register plugin %s", name)

err := statefulRegistry.Register(name, constructor)
if err != nil {
panic(err)
}
}
2 changes: 1 addition & 1 deletion libbeat/publisher/processing/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func MakeDefaultSupport(
return nil, err
}

processors, err := processors.New(cfg.Processors)
processors, err := processors.NewStateful(cfg.Processors)
if err != nil {
return nil, fmt.Errorf("error initializing processors: %v", err)
}
Expand Down