Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 175 additions & 10 deletions x-pack/filebeat/input/azureeventhub/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,170 @@
# azure-eventhub input plugin for Filebeat

## Invalid JSON sanitization

A few Azure services produce invalid JSON logs. As of now, the following Azure services are known to produce
invalid logs:

- Azure App Services
- Azure Functions
- PostreSQL Flexible Servers

To deal with these logs, over time, we have implemented a sanitization mechanism that tries to fix the invalid JSON or
remove the invalid JSON from the log.

Users can use the `sanitizers` configuration option to set up the input sanitization mechanism. If enabled, when the
input detects invalid JSON, the input will try to fix or remove it from the log.

Existing sanitizers:

- `new_lines`: removes new lines inside JSON strings.
- `single_quotes`: replaces single quotes with double quotes in JSON strings.
- `replace_all`: replaces all occurrences of a substring matching a regex pattern with a fixed literal string.

### Sanitizers

#### new_lines

Here is an example of the `sanitizers` configuration option with the `new_lines` sanitizer:

```yaml
- type: azure-eventhub
eventhub: "my-event-hub"
consumer_group: "$Default"
connection_string: "<redacted>"
storage_account: "my-storage-account"
storage_account_container: "my-container"
storage_account_key: "<redacted>"

sanitizers:
- type: new_lines
```

With the previous configuration, the `new_lines` sanitizer removes all the new lines inside JSON strings.

For example, if the diagnostic settings send the following message:

```json
{
"test":"this is 'some' message
",
"time":"2019-12-17T13:43:44.4946995Z"
}
```

With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following
version:

```json
{
"test":"this is 'some' message",
"time":"2019-12-17T13:43:44.4946995Z"
}
```

The `new_lines` sanitizer aims to restore JSON syntax validity.

The new lines inside JSON strings were first spotted in the Azure App Services logs. To learn more, see the following
GitHub issue:

- https://github.com/elastic/beats/issues/34092

#### double_quotes

Here is an example of the `sanitizers` configuration option with the `double_quotes` sanitizer:

```yaml
- type: azure-eventhub
eventhub: "my-event-hub"
consumer_group: "$Default"
connection_string: "<redacted>"
storage_account: "my-storage-account"
storage_account_container: "my-container"
storage_account_key: "<redacted>"

sanitizers:
- type: double_quotes
```

With the previous configuration, the `double_quotes` sanitizer replaces all the single quotes used to delimit JSON
strings with double quotes.

For example, if the diagnostic settings send the following message:

```json
{
"test":'this is a message',
"time":"2019-12-17T13:43:44.4946995Z"
}
```

With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following
version:

```json
{
"test":"this is a message",
"time":"2019-12-17T13:43:44.4946995Z"
}
```

The `double_quotes` sanitizer aims to restore JSON syntax validity.

Users first reported logs using single quotes to delimit JSON strings in the Azure Functions logs. To learn more, see
the following GitHub issue:

- https://github.com/elastic/azuremarketplacedev/issues/190

#### replace_all

Here is an example of the `sanitizers` configuration option with the `replace_all` sanitizer:

```yaml
- type: azure-eventhub
eventhub: "my-event-hub"
consumer_group: "$Default"
connection_string: "<redacted>"
storage_account: "my-storage-account"
storage_account_container: "my-container"
storage_account_key: "<redacted>"

sanitizers:
- type: replace_all
spec:
pattern: '\[\s*([^\[\]{},\s]+(?:\s+[^\[\]{},\s]+)*)\s*\]'
replacement: "{}"
```


With the previous configuration, the `replace_all` sanitizer replaces all the occurrences of substring matching the
regex expression `pattern` with a fixed literal string `replacement`.

For example, if the diagnostic settings send the following message:

```json
{
"AppImage": "orcas/postgres_standalone_16_u18:38.1.240825",
"AppType": "PostgreSQL",
"properties": [
218 B blob data
]
}
```

With the previous sample configuration, the input will replace the invalid JSON, updating the message to the following
version:

```json
{
"AppImage": "orcas/postgres_standalone_16_u18:38.1.240825",
"AppType": "PostgreSQL",
"properties": {}
}
```

The `replace_all` sanitizer aims to restore JSON syntax validity by replacing invalid, unfixable JSON with literal
values.

## Test Scenarios

Test event:
Expand Down Expand Up @@ -224,7 +389,7 @@ I see we have the following folder:
filebeat-activitylogs-zmoog-0005 / mbranca-general.servicebus.windows.net / eventhubsdkupgrade / $Default / checkpoint
```

The folder containts four blobs `0`, `1`, `2`, and `3`
The folder containts four blobs `0`, `1`, `2`, and `3`

The metadata of blobs `0`, `2`, and `3`:

Expand Down Expand Up @@ -306,7 +471,7 @@ sending batch of 20 events
batch sent successfully
```

#### Check that the 100 events are processed
#### Check that the 100 events are processed

I see the `filebeat-8.15.0` contains 100 events.

Expand All @@ -322,7 +487,7 @@ Here are the current sequence numbers:
| 2 | 19 | 66880 |
| 3 | 0 | -1 |

Of the 100 events published,
Of the 100 events published,

- 40 landed on partition 0 (0 > 39)
- 40 landed on partition 1 (9 > 49)
Expand Down Expand Up @@ -404,7 +569,7 @@ Using the following configuration for all inputs:
```

- Started input 1
- Input 1 is running and processing events
- Input 1 is running and processing events

```shell
$ pbpaste | grep '^{' | jq -r 'select(."log.logger" == "input.azure-eventhub") | [."@timestamp",."log.level",."log.logger",.message,.partition,.count//0,.acked//0,.error.message//"na",.error] | @tsv' | sort
Expand Down Expand Up @@ -475,7 +640,7 @@ batch sent successfully
```


#### Check that the 100 events are processed
#### Check that the 100 events are processed

I see the `filebeat-8.15.0` contains 100 events.

Expand All @@ -502,10 +667,10 @@ After
| 2 | 59 | 66880 |
| 3 | 39 | 137280 |

Of the 100 events published,
Of the 100 events published,

- 20 landed on partition 0 (39 > 59)
- 0 landed on partition 1
- 0 landed on partition 1
- 40 landed on partition 2 (19 > 59)
- 40 landed on partition 3 (0 > 39)

Expand Down Expand Up @@ -622,7 +787,7 @@ Using the following configuration:
start_position: "earliest"
```

Important: set the `cloud.id` with a deleted deployment, or set `cloud.auth` with invalid credentials.
Important: set the `cloud.id` with a deleted deployment, or set `cloud.auth` with invalid credentials.

```shell
./filebeat -e -v -d * \
Expand Down Expand Up @@ -715,7 +880,7 @@ I see the `.monitoring.metrics.filebeat.events.active` and `.monitoring.metrics.

#### Check that after fixing the problem the input successfully processed the 10 events

- Update `cloud.auth` with valid credentials
- Update `cloud.auth` with valid credentials
- restart the input

After restarting the input, here are the input metrics:
Expand Down Expand Up @@ -755,7 +920,7 @@ Current checkpoint info are:
| 3 | 39 | 137280 |


Of the 10 events published,
Of the 10 events published,

- 0 landed on partition 0
- 10 landed on partition 1 (49 > 59)
Expand Down
37 changes: 33 additions & 4 deletions x-pack/filebeat/input/azureeventhub/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,36 @@ type azureInputConfig struct {
SAContainer string `config:"storage_account_container"`
// by default the azure public environment is used, to override, users can provide a specific resource manager endpoint
OverrideEnvironment string `config:"resource_manager_endpoint"`
// cleanup the log JSON input for known issues, options: SINGLE_QUOTES, NEW_LINES
SanitizeOptions []string `config:"sanitize_options"`
// LegacySanitizeOptions is a list of sanitization options to apply to messages.
//
// The supported options are:
//
// * NEW_LINES: replaces new lines with spaces
// * SINGLE_QUOTES: replaces single quotes with double quotes
//
// IMPORTANT: Users should use the `sanitizers` configuration option
// instead.
//
// Instead of using the `sanitize_options` configuration option:
//
// sanitize_options:
// - NEW_LINES
// - SINGLE_QUOTES
//
// use the `sanitizers` configuration option:
//
// sanitizers:
// - type: new_lines
// - type: single_quotes
//
// The `sanitize_options` option is deprecated and will be
// removed in 9.0 release.
//
// Default is an empty list (no sanitization).
LegacySanitizeOptions []string `config:"sanitize_options"`
// Sanitizers is a list of sanitizers to apply to messages that
// contain invalid JSON.
Sanitizers []SanitizerSpec `config:"sanitizers"`
// MigrateCheckpoint controls if the input should perform the checkpoint information
// migration from v1 to v2 (processor v2 only). Default is false.
MigrateCheckpoint bool `config:"migrate_checkpoint"`
Expand Down Expand Up @@ -82,7 +110,7 @@ func defaultConfig() azureInputConfig {
PartitionReceiveTimeout: 5 * time.Second,
PartitionReceiveCount: 100,
// Default
SanitizeOptions: []string{},
LegacySanitizeOptions: []string{},
}
}

Expand Down Expand Up @@ -121,7 +149,8 @@ func (conf *azureInputConfig) Validate() error {
}

// log a warning for each sanitization option not supported
for _, opt := range conf.SanitizeOptions {
for _, opt := range conf.LegacySanitizeOptions {
logger.Warnw("legacy sanitization `sanitize_options` options are deprecated and will be removed in the 9.0 release; use the `sanitizers` option instead", "option", opt)
err := sanitizeOptionsValidate(opt)
if err != nil {
logger.Warnf("%s: %v", opt, err)
Expand Down
21 changes: 15 additions & 6 deletions x-pack/filebeat/input/azureeventhub/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

type messageDecoder struct {
config azureInputConfig
log *logp.Logger
metrics *inputMetrics
config azureInputConfig
log *logp.Logger
metrics *inputMetrics
sanitizers []Sanitizer
}

// Decode splits the message into multiple ones based on
Expand Down Expand Up @@ -61,9 +62,17 @@ func (u *messageDecoder) Decode(bMessage []byte) []string {
// Sanitization occurs if options are available and the message contains an invalid JSON.
//
// [1]: https://learn.microsoft.com/en-us/answers/questions/1001797/invalid-json-logs-produced-for-function-apps
if len(u.config.SanitizeOptions) != 0 && !json.Valid(bMessage) {
bMessage = sanitize(bMessage, u.config.SanitizeOptions...)
u.metrics.sanitizedMessages.Inc()
if !json.Valid(bMessage) {
// Count messages containing invalid JSON
u.metrics.invalidJSONMessages.Inc()

if len(u.sanitizers) > 0 {
for _, sanitizer := range u.sanitizers {
bMessage = sanitizer.Sanitize(bMessage)
}

u.metrics.sanitizedMessages.Inc()
}
}

// check if the message is a "records" object containing a list of events
Expand Down
13 changes: 9 additions & 4 deletions x-pack/filebeat/input/azureeventhub/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
Expand Down Expand Up @@ -90,16 +91,20 @@ func TestDecodeRecords(t *testing.T) {

func TestDecodeRecordsWithSanitization(t *testing.T) {
config := defaultConfig()
config.SanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"}
config.LegacySanitizeOptions = []string{"SINGLE_QUOTES", "NEW_LINES"}
log := logp.NewLogger(fmt.Sprintf("%s test for input", inputName))
reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg)
defer metrics.Close()

sanitizers, err := newSanitizers(config.Sanitizers, config.LegacySanitizeOptions)
require.NoError(t, err)

decoder := messageDecoder{
config: config,
log: log,
metrics: metrics,
config: config,
log: log,
metrics: metrics,
sanitizers: sanitizers,
}

msg := "{\"records\":[{'test':\"this is some message\",\n\n\"time\":\"2019-12-17T13:43:44.4946995Z\"}," +
Expand Down
Loading