diff --git a/x-pack/filebeat/input/azureeventhub/README.md b/x-pack/filebeat/input/azureeventhub/README.md index 8f6bf5272d59..146f0cbf50ae 100644 --- a/x-pack/filebeat/input/azureeventhub/README.md +++ b/x-pack/filebeat/input/azureeventhub/README.md @@ -1,5 +1,170 @@ # azure-eventhub input plugin for Filebeat +## Invalid JSON sanitization + +A few Azure services produce invalid JSON logs. As of now, the following Azure services are known to produce +invalid logs: + +- Azure App Services +- Azure Functions +- PostgreSQL Flexible Servers + +To deal with these logs, over time, we have implemented a sanitization mechanism that tries to fix the invalid JSON or +remove the invalid JSON from the log. + +Users can use the `sanitizers` configuration option to set up the input sanitization mechanism. If enabled, when the +input detects invalid JSON, the input will try to fix or remove it from the log. + +Existing sanitizers: + +- `new_lines`: removes new lines inside JSON strings. +- `single_quotes`: replaces single quotes with double quotes in JSON strings. +- `replace_all`: replaces all occurrences of a substring matching a regex pattern with a fixed literal string. + +### Sanitizers + +#### new_lines + +Here is an example of the `sanitizers` configuration option with the `new_lines` sanitizer: + +```yaml +- type: azure-eventhub + eventhub: "my-event-hub" + consumer_group: "$Default" + connection_string: "" + storage_account: "my-storage-account" + storage_account_container: "my-container" + storage_account_key: "" + + sanitizers: + - type: new_lines +``` + +With the previous configuration, the `new_lines` sanitizer removes all the new lines inside JSON strings. 
+ +For example, if the diagnostic settings send the following message: + +```json +{ + "test":"this is 'some' message + ", + "time":"2019-12-17T13:43:44.4946995Z" +} +``` + +With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following +version: + +```json +{ + "test":"this is 'some' message", + "time":"2019-12-17T13:43:44.4946995Z" +} +``` + +The `new_lines` sanitizer aims to restore JSON syntax validity. + +The new lines inside JSON strings were first spotted in the Azure App Services logs. To learn more, see the following +GitHub issue: + +- https://github.com/elastic/beats/issues/34092 + +#### single_quotes + +Here is an example of the `sanitizers` configuration option with the `single_quotes` sanitizer: + +```yaml +- type: azure-eventhub + eventhub: "my-event-hub" + consumer_group: "$Default" + connection_string: "" + storage_account: "my-storage-account" + storage_account_container: "my-container" + storage_account_key: "" + + sanitizers: + - type: single_quotes +``` + +With the previous configuration, the `single_quotes` sanitizer replaces all the single quotes used to delimit JSON +strings with double quotes. + +For example, if the diagnostic settings send the following message: + +```json +{ + "test":'this is a message', + "time":"2019-12-17T13:43:44.4946995Z" +} +``` + +With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following +version: + +```json +{ + "test":"this is a message", + "time":"2019-12-17T13:43:44.4946995Z" +} +``` + +The `single_quotes` sanitizer aims to restore JSON syntax validity. + +Users first reported logs using single quotes to delimit JSON strings in the Azure Functions logs. 
To learn more, see +the following GitHub issue: + +- https://github.com/elastic/azuremarketplacedev/issues/190 + +#### replace_all + +Here is an example of the `sanitizers` configuration option with the `replace_all` sanitizer: + +```yaml +- type: azure-eventhub + eventhub: "my-event-hub" + consumer_group: "$Default" + connection_string: "" + storage_account: "my-storage-account" + storage_account_container: "my-container" + storage_account_key: "" + + sanitizers: + - type: replace_all + spec: + pattern: '\[\s*([^\[\]{},\s]+(?:\s+[^\[\]{},\s]+)*)\s*\]' + replacement: "{}" +``` + + +With the previous configuration, the `replace_all` sanitizer replaces all the occurrences of substring matching the +regex expression `pattern` with a fixed literal string `replacement`. + +For example, if the diagnostic settings send the following message: + +```json +{ + "AppImage": "orcas/postgres_standalone_16_u18:38.1.240825", + "AppType": "PostgreSQL", + "properties": [ + 218 B blob data + ] +} +``` + +With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following +version: + +```json +{ + "AppImage": "orcas/postgres_standalone_16_u18:38.1.240825", + "AppType": "PostgreSQL", + "properties": {} +} +``` + +The `replace_all` sanitizer aims to restore JSON syntax validity by replacing invalid, unfixable JSON with literal +values. + ## Test Scenarios Test event: @@ -224,7 +389,7 @@ I see we have the following folder: filebeat-activitylogs-zmoog-0005 / mbranca-general.servicebus.windows.net / eventhubsdkupgrade / $Default / checkpoint ``` -The folder containts four blobs `0`, `1`, `2`, and `3` +The folder containts four blobs `0`, `1`, `2`, and `3` The metadata of blobs `0`, `2`, and `3`: @@ -306,7 +471,7 @@ sending batch of 20 events batch sent successfully ``` -#### Check that the 100 events are processed +#### Check that the 100 events are processed I see the `filebeat-8.15.0` contains 100 events. 
@@ -322,7 +487,7 @@ Here are the current sequence numbers: | 2 | 19 | 66880 | | 3 | 0 | -1 | -Of the 100 events published, +Of the 100 events published, - 40 landed on partition 0 (0 > 39) - 40 landed on partition 1 (9 > 49) @@ -404,7 +569,7 @@ Using the following configuration for all inputs: ``` - Started input 1 -- Input 1 is running and processing events +- Input 1 is running and processing events ```shell $ pbpaste | grep '^{' | jq -r 'select(."log.logger" == "input.azure-eventhub") | [."@timestamp",."log.level",."log.logger",.message,.partition,.count//0,.acked//0,.error.message//"na",.error] | @tsv' | sort @@ -475,7 +640,7 @@ batch sent successfully ``` -#### Check that the 100 events are processed +#### Check that the 100 events are processed I see the `filebeat-8.15.0` contains 100 events. @@ -502,10 +667,10 @@ After | 2 | 59 | 66880 | | 3 | 39 | 137280 | -Of the 100 events published, +Of the 100 events published, - 20 landed on partition 0 (39 > 59) -- 0 landed on partition 1 +- 0 landed on partition 1 - 40 landed on partition 2 (19 > 59) - 40 landed on partition 3 (0 > 39) @@ -622,7 +787,7 @@ Using the following configuration: start_position: "earliest" ``` -Important: set the `cloud.id` with a deleted deployment, or set `cloud.auth` with invalid credentials. +Important: set the `cloud.id` with a deleted deployment, or set `cloud.auth` with invalid credentials. ```shell ./filebeat -e -v -d * \ @@ -715,7 +880,7 @@ I see the `.monitoring.metrics.filebeat.events.active` and `.monitoring.metrics. 
#### Check that after fixing the problem the input successfully processed the 10 events -- Update `cloud.auth` with valid credentials +- Update `cloud.auth` with valid credentials - restart the input After restarting the input, here are the input metrics: @@ -755,7 +920,7 @@ Current checkpoint info are: | 3 | 39 | 137280 | -Of the 10 events published, +Of the 10 events published, - 0 landed on partition 0 - 10 landed on partition 1 (49 > 59) diff --git a/x-pack/filebeat/input/azureeventhub/config.go b/x-pack/filebeat/input/azureeventhub/config.go index 4aca4f247aa5..752141ce3e16 100644 --- a/x-pack/filebeat/input/azureeventhub/config.go +++ b/x-pack/filebeat/input/azureeventhub/config.go @@ -36,8 +36,36 @@ type azureInputConfig struct { SAContainer string `config:"storage_account_container"` // by default the azure public environment is used, to override, users can provide a specific resource manager endpoint OverrideEnvironment string `config:"resource_manager_endpoint"` - // cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES - SanitizeOptions []string `config:"sanitize_options"` + // LegacySanitizeOptions is a list of sanitization options to apply to messages. + // + // The supported options are: + // + // * NEW_LINES: replaces new lines with spaces + // * SINGLE_QUOTES: replaces single quotes with double quotes + // + // IMPORTANT: Users should use the `sanitizers` configuration option + // instead. + // + // Instead of using the `sanitize_options` configuration option: + // + // sanitize_options: + // - NEW_LINES + // - SINGLE_QUOTES + // + // use the `sanitizers` configuration option: + // + // sanitizers: + // - type: new_lines + // - type: single_quotes + // + // The `sanitize_options` option is deprecated and will be + // removed in 9.0 release. + // + // Default is an empty list (no sanitization). 
+ LegacySanitizeOptions []string `config:"sanitize_options"` + // Sanitizers is a list of sanitizers to apply to messages that + // contain invalid JSON. + Sanitizers []SanitizerSpec `config:"sanitizers"` // MigrateCheckpoint controls if the input should perform the checkpoint information // migration from v1 to v2 (processor v2 only). Default is false. MigrateCheckpoint bool `config:"migrate_checkpoint"` @@ -82,7 +110,7 @@ func defaultConfig() azureInputConfig { PartitionReceiveTimeout: 5 * time.Second, PartitionReceiveCount: 100, // Default - SanitizeOptions: []string{}, + LegacySanitizeOptions: []string{}, } } @@ -121,7 +149,8 @@ func (conf *azureInputConfig) Validate() error { } // log a warning for each sanitization option not supported - for _, opt := range conf.SanitizeOptions { + for _, opt := range conf.LegacySanitizeOptions { + logger.Warnw("legacy sanitization `sanitize_options` options are deprecated and will be removed in the 9.0 release; use the `sanitizers` option instead", "option", opt) err := sanitizeOptionsValidate(opt) if err != nil { logger.Warnf("%s: %v", opt, err) diff --git a/x-pack/filebeat/input/azureeventhub/decoder.go b/x-pack/filebeat/input/azureeventhub/decoder.go index 49f6ca3f6480..55c9f3a88607 100644 --- a/x-pack/filebeat/input/azureeventhub/decoder.go +++ b/x-pack/filebeat/input/azureeventhub/decoder.go @@ -14,9 +14,10 @@ import ( ) type messageDecoder struct { - config azureInputConfig - log *logp.Logger - metrics *inputMetrics + config azureInputConfig + log *logp.Logger + metrics *inputMetrics + sanitizers []Sanitizer } // Decode splits the message into multiple ones based on @@ -61,9 +62,17 @@ func (u *messageDecoder) Decode(bMessage []byte) []string { // Sanitization occurs if options are available and the message contains an invalid JSON. 
// // [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps - if len(u.config.SanitizeOptions) != 0 && !json.Valid(bMessage) { - bMessage = sanitize(bMessage, u.config.SanitizeOptions...) - u.metrics.sanitizedMessages.Inc() + if !json.Valid(bMessage) { + // Count messages containing invalid JSON + u.metrics.invalidJSONMessages.Inc() + + if len(u.sanitizers) > 0 { + for _, sanitizer := range u.sanitizers { + bMessage = sanitizer.Sanitize(bMessage) + } + + u.metrics.sanitizedMessages.Inc() + } } // check if the message is a "records" object containing a list of events diff --git a/x-pack/filebeat/input/azureeventhub/decoder_test.go b/x-pack/filebeat/input/azureeventhub/decoder_test.go index 1142fe2fbef8..af170828b69a 100644 --- a/x-pack/filebeat/input/azureeventhub/decoder_test.go +++ b/x-pack/filebeat/input/azureeventhub/decoder_test.go @@ -11,6 +11,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -90,16 +91,20 @@ func TestDecodeRecords(t *testing.T) { func TestDecodeRecordsWithSanitization(t *testing.T) { config := defaultConfig() - config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"} + config.LegacySanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"} log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName)) reg := monitoring.NewRegistry() metrics := newInputMetrics("test", reg) defer metrics.Close() + sanitizers, err := newSanitizers(config.Sanitizers, config.LegacySanitizeOptions) + require.NoError(t, err) + decoder := messageDecoder{ - config: config, - log: log, - metrics: metrics, + config: config, + log: log, + metrics: metrics, + sanitizers: sanitizers, } msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," + diff --git a/x-pack/filebeat/input/azureeventhub/metrics.go 
b/x-pack/filebeat/input/azureeventhub/metrics.go index e0a8eeb08efb..8671771fcdda 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics.go +++ b/x-pack/filebeat/input/azureeventhub/metrics.go @@ -21,10 +21,11 @@ func newInputMetrics(id string, parentRegistry *monitoring.Registry) *inputMetri unregister: unregister, // Messages - receivedMessages: monitoring.NewUint(reg, "received_messages_total"), - receivedBytes: monitoring.NewUint(reg, "received_bytes_total"), - sanitizedMessages: monitoring.NewUint(reg, "sanitized_messages_total"), - processedMessages: monitoring.NewUint(reg, "processed_messages_total"), + receivedMessages: monitoring.NewUint(reg, "received_messages_total"), + receivedBytes: monitoring.NewUint(reg, "received_bytes_total"), + invalidJSONMessages: monitoring.NewUint(reg, "invalid_json_messages_total"), + sanitizedMessages: monitoring.NewUint(reg, "sanitized_messages_total"), + processedMessages: monitoring.NewUint(reg, "processed_messages_total"), // Events receivedEvents: monitoring.NewUint(reg, "received_events_total"), @@ -73,10 +74,11 @@ type inputMetrics struct { unregister func() // Messages - receivedMessages *monitoring.Uint // receivedMessages tracks the number of messages received from eventhub. - receivedBytes *monitoring.Uint // receivedBytes tracks the number of bytes received from eventhub. - sanitizedMessages *monitoring.Uint // sanitizedMessages tracks the number of messages that were sanitized successfully. - processedMessages *monitoring.Uint // processedMessages tracks the number of messages that were processed successfully. + receivedMessages *monitoring.Uint // receivedMessages tracks the number of messages received from eventhub. + receivedBytes *monitoring.Uint // receivedBytes tracks the number of bytes received from eventhub. + invalidJSONMessages *monitoring.Uint // invalidJSONMessages tracks the number of messages containing invalid JSON. 
+ sanitizedMessages *monitoring.Uint // sanitizedMessages tracks the number of messages containing invalid JSON that were sanitized. + processedMessages *monitoring.Uint // processedMessages tracks the number of messages that were processed successfully. // Events receivedEvents *monitoring.Uint // receivedEvents tracks the number of events received decoding messages. diff --git a/x-pack/filebeat/input/azureeventhub/metrics_test.go b/x-pack/filebeat/input/azureeventhub/metrics_test.go index fbab4e1e1229..e9071b73b354 100644 --- a/x-pack/filebeat/input/azureeventhub/metrics_test.go +++ b/x-pack/filebeat/input/azureeventhub/metrics_test.go @@ -14,6 +14,7 @@ import ( eventhub "github.com/Azure/azure-event-hubs-go/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/monitoring" @@ -44,25 +45,27 @@ func TestInputMetricsEventsReceived(t *testing.T) { expectedRecords []string sanitizationOption []string // Expected results - receivedMessages uint64 - sanitizedMessages uint64 - processedMessages uint64 - receivedEvents uint64 - sentEvents uint64 - processingTime uint64 - decodeErrors uint64 - processorRestarts uint64 + receivedMessages uint64 + invalidJSONMessages uint64 + sanitizedMessages uint64 + processedMessages uint64 + receivedEvents uint64 + sentEvents uint64 + processingTime uint64 + decodeErrors uint64 + processorRestarts uint64 }{ { - event: []byte("{\"records\": [{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"), - expectedRecords: []string{"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"}, - receivedMessages: 1, - sanitizedMessages: 0, - processedMessages: 1, - receivedEvents: 1, - sentEvents: 1, - decodeErrors: 0, - processorRestarts: 0, + event: []byte("{\"records\": [{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"), + expectedRecords: 
[]string{"{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"}, + receivedMessages: 1, + invalidJSONMessages: 0, + sanitizedMessages: 0, + processedMessages: 1, + receivedEvents: 1, + sentEvents: 1, + decodeErrors: 0, + processorRestarts: 0, }, { event: []byte("{\"records\": [{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}, {\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}]}"), @@ -70,53 +73,56 @@ func TestInputMetricsEventsReceived(t *testing.T) { "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", }, - receivedMessages: 1, - sanitizedMessages: 0, - processedMessages: 1, - receivedEvents: 2, - sentEvents: 2, - decodeErrors: 0, - processorRestarts: 0, + receivedMessages: 1, + invalidJSONMessages: 0, + sanitizedMessages: 0, + processedMessages: 1, + receivedEvents: 2, + sentEvents: 2, + decodeErrors: 0, + processorRestarts: 0, }, { event: []byte("{\"records\": [{'test':'this is some message','time':'2019-12-17T13:43:44.4946995Z'}]}"), // Thank you, Azure Functions logs. 
expectedRecords: []string{ "{\"test\":\"this is some message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}", }, - sanitizationOption: []string{"SINGLE_QUOTES"}, - receivedMessages: 1, - sanitizedMessages: 1, - processedMessages: 1, - receivedEvents: 1, - sentEvents: 1, - decodeErrors: 0, - processorRestarts: 0, + sanitizationOption: []string{"SINGLE_QUOTES"}, + receivedMessages: 1, + invalidJSONMessages: 1, + sanitizedMessages: 1, + processedMessages: 1, + receivedEvents: 1, + sentEvents: 1, + decodeErrors: 0, + processorRestarts: 0, }, { event: []byte("{\"records\": [{'test':'this is some message','time':'2019-12-17T13:43:44.4946995Z'}]}"), expectedRecords: []string{ "{\"records\": [{'test':'this is some message','time':'2019-12-17T13:43:44.4946995Z'}]}", }, - sanitizationOption: []string{}, // no sanitization options - receivedMessages: 1, - sanitizedMessages: 0, // Since we have no sanitization options, we don't try to sanitize. - processedMessages: 1, - decodeErrors: 1, - receivedEvents: 0, // If we can't decode the message, we can't count the events in it. - sentEvents: 1, // The input sends the unmodified message as a string to the outlet. - processorRestarts: 0, + sanitizationOption: []string{}, // no sanitization options + receivedMessages: 1, + invalidJSONMessages: 1, + sanitizedMessages: 0, // Since we have no sanitization options, we don't try to sanitize. + processedMessages: 1, + decodeErrors: 1, + receivedEvents: 0, // If we can't decode the message, we can't count the events in it. + sentEvents: 1, // The input sends the unmodified message as a string to the outlet. 
+ processorRestarts: 0, }, } for _, tc := range cases { inputConfig := azureInputConfig{ - SAKey: "", - SAName: "", - SAContainer: ephContainerName, - ConnectionString: "", - ConsumerGroup: "", - SanitizeOptions: tc.sanitizationOption, + SAKey: "", + SAName: "", + SAContainer: ephContainerName, + ConnectionString: "", + ConsumerGroup: "", + LegacySanitizeOptions: tc.sanitizationOption, } reg := monitoring.NewRegistry() @@ -124,15 +130,19 @@ func TestInputMetricsEventsReceived(t *testing.T) { fakeClient := fakeClient{} + sanitizers, err := newSanitizers(inputConfig.Sanitizers, inputConfig.LegacySanitizeOptions) + require.NoError(t, err) + input := eventHubInputV1{ config: inputConfig, metrics: metrics, pipelineClient: &fakeClient, log: log, messageDecoder: messageDecoder{ - config: inputConfig, - metrics: metrics, - log: log, + config: inputConfig, + metrics: metrics, + log: log, + sanitizers: sanitizers, }, } @@ -158,6 +168,7 @@ func TestInputMetricsEventsReceived(t *testing.T) { // Messages assert.Equal(t, tc.receivedMessages, metrics.receivedMessages.Get()) assert.Equal(t, uint64(len(tc.event)), metrics.receivedBytes.Get()) + assert.Equal(t, tc.invalidJSONMessages, metrics.invalidJSONMessages.Get()) assert.Equal(t, tc.sanitizedMessages, metrics.sanitizedMessages.Get()) assert.Equal(t, tc.processedMessages, metrics.processedMessages.Get()) diff --git a/x-pack/filebeat/input/azureeventhub/sanitization.go b/x-pack/filebeat/input/azureeventhub/sanitization.go deleted file mode 100644 index 537d29951c9f..000000000000 --- a/x-pack/filebeat/input/azureeventhub/sanitization.go +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build !aix -// +build !aix - -package azureeventhub - -import ( - "bytes" - "errors" -) - -type sanitizationOption string - -const ( - newLines sanitizationOption = "NEW_LINES" - singleQuotes sanitizationOption = "SINGLE_QUOTES" -) - -// sanitizeOptionsValidate validates for supported sanitization options -func sanitizeOptionsValidate(s string) error { - switch s { - case "NEW_LINES": - return nil - case "SINGLE_QUOTES": - return nil - default: - return errors.New("invalid sanitization option") - } -} - -// sanitize applies the sanitization options specified in the config -// if no sanitization options are provided, the message remains unchanged -func sanitize(jsonByte []byte, opts ...string) []byte { - res := jsonByte - - for _, opt := range opts { - switch sanitizationOption(opt) { - case newLines: - res = sanitizeNewLines(res) - case singleQuotes: - res = sanitizeSingleQuotes(res) - } - } - - return res -} - -// sanitizeNewLines removes newlines found in the message -func sanitizeNewLines(jsonByte []byte) []byte { - return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{}) -} - -// sanitizeSingleQuotes replaces single quotes with double quotes in the message -// single quotes that are in between double quotes remain unchanged -func sanitizeSingleQuotes(jsonByte []byte) []byte { - var result bytes.Buffer - var prevChar byte - - inDoubleQuotes := false - - for _, r := range jsonByte { - if r == '"' && prevChar != '\\' { - inDoubleQuotes = !inDoubleQuotes - } - - if r == '\'' && !inDoubleQuotes { - result.WriteRune('"') - } else { - result.WriteByte(r) - } - prevChar = r - } - - return result.Bytes() -} diff --git a/x-pack/filebeat/input/azureeventhub/sanitization_test.go b/x-pack/filebeat/input/azureeventhub/sanitization_test.go deleted file mode 100644 index 3ad8928cdc3c..000000000000 --- a/x-pack/filebeat/input/azureeventhub/sanitization_test.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. 
under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build !aix - -package azureeventhub - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestSanitize(t *testing.T) { - jsonByte := []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}") - - testCases := []struct { - name string - opts []string - expected []byte - }{ - { - name: "no options", - opts: []string{}, - expected: jsonByte, - }, - { - name: "NEW_LINES option", - opts: []string{"NEW_LINES"}, - expected: []byte("{'test':\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), - }, - { - name: "SINGLE_QUOTES option", - opts: []string{"SINGLE_QUOTES"}, - expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), - }, - { - name: "both options", - opts: []string{"NEW_LINES", "SINGLE_QUOTES"}, - expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), - }, - } - - // Run test cases - for _, tc := range testCases { - tc := tc - t.Run(tc.name, func(t *testing.T) { - res := sanitize(jsonByte, tc.opts...) - assert.Equal(t, tc.expected, res) - }) - } -} diff --git a/x-pack/filebeat/input/azureeventhub/sanitizers.go b/x-pack/filebeat/input/azureeventhub/sanitizers.go new file mode 100644 index 000000000000..24f7e79df5ce --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitizers.go @@ -0,0 +1,216 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +//go:build !aix + +package azureeventhub + +import ( + "bytes" + "errors" + "fmt" + "regexp" +) + +const ( + SanitizerNewLines = "new_lines" + SanitizerSingleQuotes = "single_quotes" + SanitizerReplaceAll = "replace_all" +) + +// ---------------------------------------------------------------------------- +// Sanitizer API +// ---------------------------------------------------------------------------- + +// SanitizerSpec defines a sanitizer configuration. +// +// Sanitizers can use the spec field to provide additional +// configuration. +type SanitizerSpec struct { + Type string `config:"type"` + Spec map[string]interface{} `config:"spec"` +} + +// Sanitizer defines the interface for sanitizing JSON data. +// +// Implementing `Init` is optional. If implemented, it should +// be used to initialize the sanitizer with the provided spec. +type Sanitizer interface { + Sanitize(jsonByte []byte) []byte + Init() error +} + +// ---------------------------------------------------------------------------- +// Convenience builder functions +// ---------------------------------------------------------------------------- + +// newSanitizer creates a new sanitizer based on the provided spec. +func newSanitizer(spec SanitizerSpec) (Sanitizer, error) { + var s Sanitizer + + switch spec.Type { + case SanitizerNewLines: + s = &newLinesSanitizer{} + case SanitizerSingleQuotes: + s = &singleQuotesSanitizer{} + case SanitizerReplaceAll: + s = &replaceAllSanitizer{spec: spec.Spec} + default: + return nil, fmt.Errorf("unknown sanitizer type: %s", spec.Type) + } + + // Initialize the sanitizer with the provided spec. + err := s.Init() + if err != nil { + return nil, fmt.Errorf("failed to initialize sanitizer '%s': %w", spec.Type, err) + } + + return s, nil +} + +// newSanitizers creates a list of sanitizers based on the provided specs. +// +// The legacySanitizerOptions are used to add legacy sanitizers to the list. 
+// `legacySanitizerOptions` should be a list of strings representing the +// legacy sanitization options (it only support the two original legacy +// options: "NEW_LINES", "SINGLE_QUOTES"). +func newSanitizers(specs []SanitizerSpec, legacySanitizerOptions []string) ([]Sanitizer, error) { + var sanitizers []Sanitizer + + // Add new sanitizers + for _, spec := range specs { + sanitizer, err := newSanitizer(spec) + if err != nil { + return nil, fmt.Errorf("failed to build sanitizer: %w", err) + } + + sanitizers = append(sanitizers, sanitizer) + } + + // Add legacy sanitizers + for _, opt := range legacySanitizerOptions { + // legacy sanitizer don't need to be initialized + switch sanitizationOption(opt) { + case newLines: + sanitizers = append(sanitizers, &newLinesSanitizer{}) + case singleQuotes: + sanitizers = append(sanitizers, &singleQuotesSanitizer{}) + } + } + + return sanitizers, nil +} + +// ---------------------------------------------------------------------------- +// New line sanitizer +// ---------------------------------------------------------------------------- + +// newLinesSanitizer replaces new lines with spaces. +// +// This sanitizer is used to remove new lines inside JSON strings. +type newLinesSanitizer struct{} + +func (s *newLinesSanitizer) Sanitize(jsonByte []byte) []byte { + return bytes.ReplaceAll(jsonByte, []byte("\n"), []byte{}) +} + +// Init is a no-op for the newLinesSanitizer. +func (s *newLinesSanitizer) Init() error { + return nil +} + +// ---------------------------------------------------------------------------- +// Single quote sanitizer +// ---------------------------------------------------------------------------- + +// singleQuotesSanitizer replaces single quotes with double quotes. 
+type singleQuotesSanitizer struct{} + +func (s *singleQuotesSanitizer) Sanitize(jsonByte []byte) []byte { + var result bytes.Buffer + var prevChar byte + + inDoubleQuotes := false + + for _, r := range jsonByte { + if r == '"' && prevChar != '\\' { + inDoubleQuotes = !inDoubleQuotes + } + + if r == '\'' && !inDoubleQuotes { + result.WriteRune('"') + } else { + result.WriteByte(r) + } + prevChar = r + } + + return result.Bytes() +} + +// Init is a no-op for the singleQuotesSanitizer. +func (s *singleQuotesSanitizer) Init() error { + return nil +} + +// ---------------------------------------------------------------------------- +// Replace all sanitizer +// ---------------------------------------------------------------------------- + +// replaceAllSanitizer replaces all occurrences of a regex pattern with a replacement string. +type replaceAllSanitizer struct { + re *regexp.Regexp + replacement string + spec map[string]interface{} +} + +func (s *replaceAllSanitizer) Sanitize(jsonByte []byte) []byte { + if s.re == nil { + return jsonByte + } + + return s.re.ReplaceAll(jsonByte, []byte(s.replacement)) +} + +func (s *replaceAllSanitizer) Init() error { + if s.spec == nil { + return errors.New("missing required sanitizer spec") + } + + pattern, err := getStringFromSpec(s.spec, "pattern") + if err != nil { + return fmt.Errorf("invalid pattern: %w", err) + } + + re, err := regexp.Compile(pattern) + if err != nil { + return fmt.Errorf("can't compile regex pattern: %w", err) + } + s.re = re + + replacement, err := getStringFromSpec(s.spec, "replacement") + if err != nil { + return fmt.Errorf("invalid replacement: %w", err) + } + s.replacement = replacement + + return nil +} + +// getStringFromSpec returns a string from the spec map. +// +// It returns an error if the spec entry key is missing or the value is not a string. 
+func getStringFromSpec(spec map[string]interface{}, entryKey string) (string, error) { + value, ok := spec[entryKey] + if !ok { + return "", fmt.Errorf("missing sanitizer spec entry: %s", entryKey) + } + + strValue, ok := value.(string) + if !ok { + return "", fmt.Errorf("sanitizer spec entry %s must be a string", entryKey) + } + + return strValue, nil +} diff --git a/x-pack/filebeat/input/azureeventhub/sanitizers_legacy.go b/x-pack/filebeat/input/azureeventhub/sanitizers_legacy.go new file mode 100644 index 000000000000..822da64931b2 --- /dev/null +++ b/x-pack/filebeat/input/azureeventhub/sanitizers_legacy.go @@ -0,0 +1,43 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build !aix + +package azureeventhub + +import "errors" + +// This file supports the legacy sanitization options for the Azure Event Hub input. +// +// The legacy offered two sanitization options using the `sanitize_options` +// configuration option: +// +// - NEW_LINES: replaces new lines with spaces +// - SINGLE_QUOTES: replaces single quotes with double quotes +// +// The legacy `sanitize_options` is deprecated and will be removed in the +// 9.0 release. +// Users should use the `sanitizers` configuration option instead. +// +// However, the current sanitization implementation honors the legacy sanitization +// options and applies them to the sanitizers configuration. 
// sanitizationOption is the name of a legacy sanitization option, as
// accepted by the deprecated `sanitize_options` setting.
type sanitizationOption string

// Supported legacy sanitization options.
const (
	newLines     sanitizationOption = "NEW_LINES"
	singleQuotes sanitizationOption = "SINGLE_QUOTES"
)

// sanitizeOptionsValidate checks that s names a supported legacy
// sanitization option.
//
// It returns nil for the supported options and an error for any other
// value; the comparison is exact (case-sensitive, no trimming).
func sanitizeOptionsValidate(s string) error {
	switch sanitizationOption(s) {
	case newLines, singleQuotes:
		return nil
	}

	return errors.New("invalid sanitization option")
}
message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "SINGLE_QUOTES option", + sanitizers: []Sanitizer{sqSanitizer}, + original: []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + expected: []byte("{\"test\":\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "NEW_LINES + SINGLE_QUOTES option", + sanitizers: []Sanitizer{nlSanitizer, sqSanitizer}, + original: []byte("{'test':\"this is 'some' message\n\",\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + expected: []byte("{\"test\":\"this is 'some' message\",\"time\":\"2019-12-17T13:43:44.4946995Z\"}"), + }, + { + name: "Replace all option", + sanitizers: []Sanitizer{raSanitizer}, + original: []byte(` + { + "AppImage": "orcas/postgres_standalone_16_u18:38.1.240825", + "AppType": "PostgreSQL", + "AppVersion": "breadthpg16_2024-08-06-07-22-43", + "Region": "westeurope", + "category": "PostgreSQLLogs", + "location": "westeurope", + "operationName": "LogEvent", + "properties": [ + 218 B blob data + ], + "resourceId": "/SUBSCRIPTIONS/88D1708E-CBC3-4799-9C16-C5BB5F57A0C3/RESOURCEGROUPS/CKAUF-AZURE-OBS/PROVIDERS/MICROSOFT.DBFORPOSTGRESQL/FLEXIBLESERVERS/CHRIS-PG-TEST", + "time": "2024-08-27T14:26:08.629Z", + "ServerType": "PostgreSQL", + "LogicalServerName": "chris-pg-test", + "ServerVersion": "breadthpg16_2024-08-06-07-22-43", + "ServerLocation": "prod:westeurope", + "ReplicaRole": "Primary", + "OriginalPrimaryServerName": "chris-pg-test" + }`), + expected: []byte(` + { + "AppImage": "orcas/postgres_standalone_16_u18:38.1.240825", + "AppType": "PostgreSQL", + "AppVersion": "breadthpg16_2024-08-06-07-22-43", + "Region": "westeurope", + "category": "PostgreSQLLogs", + "location": "westeurope", + "operationName": "LogEvent", + "properties": {}, + "resourceId": "/SUBSCRIPTIONS/88D1708E-CBC3-4799-9C16-C5BB5F57A0C3/RESOURCEGROUPS/CKAUF-AZURE-OBS/PROVIDERS/MICROSOFT.DBFORPOSTGRESQL/FLEXIBLESERVERS/CHRIS-PG-TEST", + "time": 
"2024-08-27T14:26:08.629Z", + "ServerType": "PostgreSQL", + "LogicalServerName": "chris-pg-test", + "ServerVersion": "breadthpg16_2024-08-06-07-22-43", + "ServerLocation": "prod:westeurope", + "ReplicaRole": "Primary", + "OriginalPrimaryServerName": "chris-pg-test" + }`), + }, + } + + // Run test cases + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + res := tc.original + + for _, sanitizer := range tc.sanitizers { + res = sanitizer.Sanitize(res) + } + + // using string(..) for readability + assert.Equal(t, string(tc.expected), string(res)) + }) + } +} + +func TestSanitizersInit(t *testing.T) { + + t.Run("Support legacy sanitizers", func(t *testing.T) { + legacySanitizers := []string{"NEW_LINES", "SINGLE_QUOTES"} + + sanitizers, err := newSanitizers([]SanitizerSpec{}, legacySanitizers) + require.NoError(t, err) + + require.Len(t, sanitizers, 2) + + // Check the struct types + assert.IsType(t, &newLinesSanitizer{}, sanitizers[0]) + assert.IsType(t, &singleQuotesSanitizer{}, sanitizers[1]) + }) + + t.Run("Support new sanitizers", func(t *testing.T) { + raSanitizerSpec := SanitizerSpec{ + Type: SanitizerReplaceAll, + Spec: map[string]any{ + "pattern": `\[\s*([^\[\]{},\s]+(?:\s+[^\[\]{},\s]+)*)\s*\]`, + "replacement": "{}", + }, + } + + sanitizers, err := newSanitizers([]SanitizerSpec{raSanitizerSpec}, nil) + require.NoError(t, err) + + require.Len(t, sanitizers, 1) + + // Check the struct types + assert.IsType(t, &replaceAllSanitizer{}, sanitizers[0]) + }) + + t.Run("Support legacy and new sanitizer together", func(t *testing.T) { + raSanitizerSpec := SanitizerSpec{ + Type: SanitizerReplaceAll, + Spec: map[string]any{ + "pattern": `\[\s*([^\[\]{},\s]+(?:\s+[^\[\]{},\s]+)*)\s*\]`, + "replacement": "{}", + }, + } + + legacySanitizers := []string{"NEW_LINES", "SINGLE_QUOTES"} + + sanitizers, err := newSanitizers([]SanitizerSpec{raSanitizerSpec}, legacySanitizers) + require.NoError(t, err) + + require.Len(t, sanitizers, 3) + + // Check the struct 
types + assert.IsType(t, &replaceAllSanitizer{}, sanitizers[0]) + assert.IsType(t, &newLinesSanitizer{}, sanitizers[1]) + assert.IsType(t, &singleQuotesSanitizer{}, sanitizers[2]) + }) +} diff --git a/x-pack/filebeat/input/azureeventhub/v1_input.go b/x-pack/filebeat/input/azureeventhub/v1_input.go index 3a6a21189c5e..4736bc3f15a3 100644 --- a/x-pack/filebeat/input/azureeventhub/v1_input.go +++ b/x-pack/filebeat/input/azureeventhub/v1_input.go @@ -79,10 +79,17 @@ func (in *eventHubInputV1) Run( in.metrics = newInputMetrics(inputContext.ID, nil) defer in.metrics.Close() + // Set up new and legacy sanitizers, if any. + sanitizers, err := newSanitizers(in.config.Sanitizers, in.config.LegacySanitizeOptions) + if err != nil { + return fmt.Errorf("failed to create sanitizers: %w", err) + } + in.messageDecoder = messageDecoder{ - config: in.config, - log: in.log, - metrics: in.metrics, + config: in.config, + log: in.log, + metrics: in.metrics, + sanitizers: sanitizers, } ctx := v2.GoContextFromCanceler(inputContext.Cancelation) diff --git a/x-pack/filebeat/input/azureeventhub/v2_input.go b/x-pack/filebeat/input/azureeventhub/v2_input.go index 4f3645f513f7..84755ff38843 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input.go @@ -108,13 +108,18 @@ func (in *eventHubInputV2) Run( // setup initializes the components needed to process events. func (in *eventHubInputV2) setup(ctx context.Context) error { + sanitizers, err := newSanitizers(in.config.Sanitizers, in.config.LegacySanitizeOptions) + if err != nil { + return fmt.Errorf("failed to create sanitizers: %w", err) + } // Decode the messages from event hub into // a `[]string`. in.messageDecoder = messageDecoder{ - config: in.config, - log: in.log, - metrics: in.metrics, + config: in.config, + log: in.log, + metrics: in.metrics, + sanitizers: sanitizers, } containerClient, err := container.NewClientFromConnectionString(