Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions apis/fluentd/v1alpha1/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,48 @@ func (r *CfgResources) filterForOutputs(
return nil
}

// IdentifyCopyAndPatchOutput patches up the controller with the Manager
func (pgr *PluginResources) IdentifyCopyAndPatchOutput(cfgResources *CfgResources) error {
// patched structure for OutputPlugins
patchedOutputPlugins := []params.PluginStore{}
// copyOutputs stores the id if the output is a `copy`
copyOutputs := map[string]int{}
// outputs stores the id if the output is not a `copy`
outputs := map[string][]int{}

// Iterate over cfgResources.OutputPlugins to identify Copy output
for id, ps := range cfgResources.OutputPlugins {
if ps.Store["@type"] == string(params.CopyOutputType) {
// We store last output when 2 output with the same tag
copyOutputs[ps.Store["tag"]] = id
} else {
outputs[ps.Store["tag"]] = append(outputs[ps.Store["tag"]], id)
}
}

// Patch the outputs
for k, output := range outputs {
// Does it exist a copy output for this tag ?
if c, ok := copyOutputs[k]; ok {
// Yes, so we patch
for _, id := range output {
o := cfgResources.OutputPlugins[id]
o.Name = "store"
cfgResources.OutputPlugins[c].InsertChilds(&o)
}
patchedOutputPlugins = append(patchedOutputPlugins, cfgResources.OutputPlugins[c])
} else {
// No, we don't patch
for _, id := range output {
o := cfgResources.OutputPlugins[id]
patchedOutputPlugins = append(patchedOutputPlugins, o)
}
}
}
cfgResources.OutputPlugins = patchedOutputPlugins
return nil
}

// convert the cfg plugins to a label plugin, appends to the global label plugins
func (pgr *PluginResources) WithCfgResources(cfgRouteLabel string, r *CfgResources) error {
if len(r.InputPlugins) == 0 && len(r.FilterPlugins) == 0 && len(r.OutputPlugins) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/copy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package output

// Copy defines the parameters for out_Copy plugin
type Copy struct {
// CopyMode defines how to pass the events to <store> plugins.
// +kubebuilder:validation:Enum:=no_copy;shallow;deep;marshal
CopyMode *string `json:"copyMode"`
}
15 changes: 15 additions & 0 deletions apis/fluentd/v1alpha1/plugins/output/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ type Output struct {
CloudWatch *CloudWatch `json:"cloudWatch,omitempty"`
// datadog plugin
Datadog *Datadog `json:"datadog,omitempty"`
// copy plugin
Copy *Copy `json:"copy,omitempty"`
}

// DeepCopyInto implements the DeepCopyInto interface.
Expand Down Expand Up @@ -163,6 +165,12 @@ func (o *Output) Params(loader plugins.SecretLoader) (*params.PluginStore, error
ps.InsertType(string(params.DatadogOutputType))
return o.datadogPlugin(ps, loader), nil
}

if o.Copy != nil {
ps.InsertType(string(params.CopyOutputType))
return o.copyPlugin(ps, loader), nil
}

return o.customOutput(ps, loader), nil

}
Expand Down Expand Up @@ -907,4 +915,11 @@ func (o *Output) datadogPlugin(parent *params.PluginStore, sl plugins.SecretLoad
return parent
}

func (o *Output) copyPlugin(parent *params.PluginStore, sl plugins.SecretLoader) *params.PluginStore {
if o.Copy.CopyMode != nil {
parent.InsertPairs("copy_mode", fmt.Sprint(*o.Copy.CopyMode))
}
return parent
}

var _ plugins.Plugin = &Output{}
1 change: 1 addition & 0 deletions apis/fluentd/v1alpha1/plugins/params/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const (
LokiOutputType OutputType = "loki"
CloudWatchOutputType OutputType = "cloudwatch_logs"
DatadogOutputType OutputType = "datadog"
CopyOutputType OutputType = "copy"
)

var (
Expand Down
5 changes: 4 additions & 1 deletion apis/fluentd/v1alpha1/plugins/params/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,12 @@ func (ps *PluginStore) InsertChilds(childs ...*PluginStore) {
// The total hash string for this plugin store
func (ps *PluginStore) Hash() string {
c := NewPluginStore(ps.Name)
isNotCopyOutput := ps.Store["@type"] != "copy"

// We must consider the tag when the output is a Copy one
// as copy is a "flag" output: it can exist identical outputs with different tag
for k, v := range ps.Store {
if k == "@id" || k == "tag" {
if k == "@id" || (k == "tag" && isNotCopyOutput) {
continue
}
c.Store[k] = v
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @a2170d34e9940ec56d328100e375c43e
<match>
namespaces default,kube-system
</match>
</route>
</match>
<label @a2170d34e9940ec56d328100e375c43e>
<filter **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusterfilter::fluentd-filter-0
@type record_transformer
enable_ruby true
<record>
kubernetes_ns ${record["kubernetes"]["namespace_name"]}
</record>
</filter>
<match **>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-0
@type copy
copy_mode no_copy
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-1
@type stdout
</store>
<store>
@id ClusterFluentdConfig-cluster-fluentd-config::cluster::clusteroutput::fluentd-output-copy-stdout-and-loki-2
@type loki
drop_single_key true
extra_labels {"key11":"value11","key12":"value12"}
extract_kubernetes_labels true
include_thread_label true
insecure_tls true
remove_keys key31,key32
url http://loki-logging-data.kubesphere-logging-system.svc:3100
<label>
key21 key21
key22 key22
</label>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match **>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-0
@type copy
copy_mode no_copy
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-1
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-1-2
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-2
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::cluster::clusteroutput::fluentd-output-loki-0
@type loki
drop_single_key true
extra_labels {"key11":"value11","key12":"value12"}
extract_kubernetes_labels true
include_thread_label true
insecure_tls true
remove_keys key31,key32
url http://loki-logging-data.kubesphere-logging-system.svc:3100
<label>
key21 key21
key22 key22
</label>
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match mixed2>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-0
@type copy
copy_mode no_copy
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-3
port 9243
scheme https
</store>
<store>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-2-2
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-4
port 9243
scheme https
</store>
</match>
</label>
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<source>
@type forward
bind 0.0.0.0
port 24224
</source>
<match **>
@id main
@type label_router
<route>
@label @2d9e59757d3bfc66d93c3bc44b408922
<match>
namespaces fluent
</match>
</route>
</match>
<label @2d9e59757d3bfc66d93c3bc44b408922>
<match mixed3>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-3-0
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-5
port 9243
scheme https
</match>
<match mixed3>
@id FluentdConfig-fluent-fluentd-config::fluent::output::fluentd-mixed-copy-es-3-1
@type elasticsearch
host elasticsearch-logging-data.kubesphere-logging-system.svc
index_name fluentd-mixed-copy-es-6
port 9243
scheme https
</match>
</label>
Loading