
Commit 421b438

AntonEliatra and kolchfa-aws authored and committed

adding examples to key value processor data prepper (opensearch-project#11232)

* adding examples to key value processor data prepper
  Signed-off-by: Anton Rubin <[email protected]>
* Update key-value.md
  Signed-off-by: AntonEliatra <[email protected]>
* Update key-value.md
  Signed-off-by: AntonEliatra <[email protected]>
* Apply suggestions from code review
  Signed-off-by: kolchfa-aws <[email protected]>

Signed-off-by: Anton Rubin <[email protected]>
Signed-off-by: AntonEliatra <[email protected]>
Signed-off-by: kolchfa-aws <[email protected]>
Co-authored-by: kolchfa-aws <[email protected]>
Signed-off-by: Arya Soni <[email protected]>

1 parent e6920e5 commit 421b438

1 file changed (+340, -0 lines)

_data-prepper/pipelines/configuration/processors/key-value.md

@@ -11,6 +11,346 @@ nav_order: 170

You can use the `key_value` processor to parse the specified field into key-value pairs. You can customize how the `key_value` processor parses field information by using the following options. Not every option takes a string value: the examples also use Boolean options (such as `recursive`), lists (`include_keys`), and maps (`default_values`).

## Examples

The following examples demonstrate several configurations you can use with this processor.

These examples disable SSL on the `http` source and skip certificate verification in the `opensearch` sink (`insecure: true`), so they are for demonstration purposes only. We strongly recommend configuring SSL before using these examples in production.
{: .warning}

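### Key-value parsing, normalization, and deduplication

The following example parses the `message` field into `key=value` pairs and writes the results to `parsed_kv`. It lowercases keys, removes whitespace from keys, trims whitespace from values, and prefixes keys with `meta_`. It also keeps only one copy of duplicate key-value pairs and drops keys that have no value, such as a bare `novalue` token. A key with an empty value, such as `empty=`, is kept with an empty string:
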
```yaml
kv-basic-pipeline:
  source:
    http:
      path: /logs
      ssl: false

  processor:
    - key_value:
        # Read key=value pairs from the "message" field (this is also the default)
        source: message
        # Write parsed pairs into a nested object named "parsed_kv"
        destination: parsed_kv

        # Split pairs on '&' and split each key from its value on '='
        field_split_characters: "&"
        value_split_characters: "="

        # Normalize keys and trim stray whitespace around keys and values
        transform_key: lowercase
        delete_key_regex: "\\s+"           # remove all whitespace from keys
        delete_value_regex: "^\\s+|\\s+$"  # trim leading/trailing whitespace from values

        # Add a prefix to every key (after transform_key and delete_key_regex)
        prefix: meta_

        # Keep only one copy of duplicate key-value pairs
        skip_duplicate_values: true

        # Drop keys that have no value (e.g., `novalue`); `empty=` is kept as ""
        drop_keys_with_no_value: true

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: admin_password
        index_type: custom
        index: kv-basic-%{yyyy.MM.dd}
```
{% include copy.html %}

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2021/logs" \
  -H "Content-Type: application/json" \
  -d '[
    {"message":"key1=value1&key1=value1&Key Two = value two & empty=&novalue"},
    {"message":"ENV = prod & TEAM = core & owner = alice "}
  ]'
```
{% include copy.html %}

The documents stored in OpenSearch contain the following information:

```json
{
  ...
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "kv-basic-2025.10.14",
        "_id": "M6d84pkB3P3jd6EROH_f",
        "_score": 1,
        "_source": {
          "message": "key1=value1&key1=value1&Key Two = value two & empty=&novalue",
          "parsed_kv": {
            "meta_key1": "value1",
            "meta_empty": "",
            "meta_keytwo": "value two"
          }
        }
      },
      {
        "_index": "kv-basic-2025.10.14",
        "_id": "NKd84pkB3P3jd6EROH_f",
        "_score": 1,
        "_source": {
          "message": "ENV = prod & TEAM = core & owner = alice ",
          "parsed_kv": {
            "meta_owner": "alice",
            "meta_team": "core",
            "meta_env": "prod"
          }
        }
      }
    ]
  }
}
```

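The following Python sketch approximates what the `key_value` options in this example do to a single `message` value, which makes the order of the settings concrete. It is not Data Prepper's implementation. The function name `parse_kv_basic` is hypothetical, and two details are assumptions: that keys are split from values at the first `=`, and the order in which the key transforms run. The sketch does reproduce the `parsed_kv` objects shown above for both test documents.

```python
import re


def parse_kv_basic(message: str) -> dict:
    """Hypothetical approximation of the first example's key_value settings."""
    result = {}
    for pair in message.split("&"):  # field_split_characters: "&"
        # value_split_characters: "=" (assumed: split at the first "=" only)
        key, sep, value = pair.partition("=")
        # A token with no "=" gets non_match_value, which defaults to null (None)
        parsed = value if sep else None

        key = key.lower()                 # transform_key: lowercase
        key = re.sub(r"\s+", "", key)     # delete_key_regex: remove whitespace
        key = "meta_" + key               # prefix: meta_

        if parsed is None:                # drop_keys_with_no_value: true
            continue
        parsed = re.sub(r"^\s+|\s+$", "", parsed)  # delete_value_regex: trim

        if key not in result:
            result[key] = parsed
            continue
        # skip_duplicate_values: true keeps one copy of an identical pair;
        # differing values for the same key are collected into a list (assumed)
        values = result[key] if isinstance(result[key], list) else [result[key]]
        if parsed not in values:
            values.append(parsed)
        result[key] = values[0] if len(values) == 1 else values
    return result


print(parse_kv_basic("key1=value1&key1=value1&Key Two = value two & empty=&novalue"))
# {'meta_key1': 'value1', 'meta_keytwo': 'value two', 'meta_empty': ''}
```

`empty=` survives as an empty string because it contains a value separator. `novalue` has no separator, so its value is null and the key is dropped, which matches the indexed document.
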
### Grouped values to root

The following example parses the `payload` field by using `&&` to separate pairs and `==` to separate keys from values. It keeps bracketed groups as single values and writes the parsed results to the root of the event without overwriting existing fields. Any token that doesn't match the key-value pattern is stored with a `null` value:

```yaml
kv-grouping-pipeline:
  source:
    http:
      path: /logs
      ssl: false

  processor:
    - key_value:
        source: "payload"
        destination: null                 # write parsed keys to the event root

        field_split_characters: "&&"      # pair delimiter (OK with grouping)
        value_split_characters: null      # disable the default "="
        key_value_delimiter_regex: "=="   # exact '==' for key/value

        value_grouping: true              # keep bracketed groups as single values
        remove_brackets: false            # keep the brackets in grouped values
        overwrite_if_destination_exists: false  # existing fields, such as "a", are kept
        non_match_value: null             # tokens without "==" get a null value

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: "admin_pass"
        index_type: custom
        index: "kv-regex-%{yyyy.MM.dd}"
```
{% include copy.html %}

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2021/logs" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "payload":"a==1&&b==[x=y,z=w]&&c==(inner=thing)&&http==http://example.com path",
      "a":"keep-me"
    },
    {
      "payload":"good==yes&&broken-token&&url==https://opensearch.org home",
      "note":"second doc"
    }
  ]'
```
{% include copy.html %}

The documents stored in OpenSearch contain the following information:

```json
{
  ...
  "hits": {
    "total": {
      "value": 2,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "kv-regex-2025.10.14",
        "_id": "FuCX4pkB344hN2Iu62oT",
        "_score": 1,
        "_source": {
          "payload": "a==1&&b==[x=y,z=w]&&c==(inner=thing)&&http==http://example.com path",
          "a": "keep-me",
          "b": "[x=y,z=w]",
          "c": "(inner=thing)",
          "http": "http://example.com path"
        }
      },
      {
        "_index": "kv-regex-2025.10.14",
        "_id": "F-CX4pkB344hN2Iu62oT",
        "_score": 1,
        "_source": {
          "payload": "good==yes&&broken-token&&url==https://opensearch.org home",
          "note": "second doc",
          "broken-token": null,
          "good": "yes",
          "url": "https://opensearch.org home"
        }
      }
    ]
  }
}
```

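With `value_grouping`, delimiters inside a bracketed group don't split the pair. The following Python sketch illustrates that idea together with this example's non-overwriting write to the event root. It is a simplified, hypothetical model, not the processor's code, and the function names are invented. It only tracks `[]`, `()`, and `<>` (not quotation marks), treats `&&` as a literal two-character delimiter, and splits each pair at the first `==`.

```python
from __future__ import annotations

OPEN_TO_CLOSE = {"[": "]", "(": ")", "<": ">"}


def split_outside_groups(text: str, delimiter: str) -> list[str]:
    """Split text on delimiter, ignoring delimiters inside bracketed groups."""
    tokens: list[str] = []
    current: list[str] = []
    stack: list[str] = []  # closing brackets still expected, innermost last
    i = 0
    while i < len(text):
        ch = text[i]
        if ch in OPEN_TO_CLOSE:
            stack.append(OPEN_TO_CLOSE[ch])
        elif stack and ch == stack[-1]:
            stack.pop()
        elif not stack and text.startswith(delimiter, i):
            tokens.append("".join(current))
            current = []
            i += len(delimiter)
            continue
        current.append(ch)
        i += 1
    tokens.append("".join(current))
    return tokens


def parse_grouped_to_root(event: dict, source: str = "payload") -> dict:
    """Hypothetical model of the grouping example: destination null, no overwrite."""
    out = dict(event)
    for token in split_outside_groups(event[source], "&&"):
        key, sep, value = token.partition("==")
        if key in out:                       # overwrite_if_destination_exists: false
            continue
        out[key] = value if sep else None    # non_match_value: null
    return out


doc = {"payload": "good==yes&&broken-token&&url==https://opensearch.org home",
       "note": "second doc"}
print(parse_grouped_to_root(doc))
```

Because `a` already exists in the first test document, the sketch skips the parsed `a==1` pair. This is consistent with the indexed document keeping `"a": "keep-me"`.
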
### Conditional recursive key-value parsing

The following example parses nested, bracketed `key=value` structures from `body` into `parsed`, but only when `/type == "nested"`. It preserves the group hierarchy and enforces strict grouping. It also keeps only the `item1`, `item2`, and `owner` keys, adds default values for missing keys, and leaves events of other types unchanged:

```yaml
kv-conditional-recursive-pipeline:
  source:
    http:
      path: /logs
      ssl: false

  processor:
    - key_value:
        source: "body"
        destination: "parsed"

        # Run only on events whose "type" field equals "nested"
        key_value_when: '/type == "nested"'
        # Parse nested groups: [] for the first level, () for the second, <> for the third
        recursive: true

        # Split rules (character sets, not regular expressions)
        field_split_characters: "&"
        value_split_characters: "="

        # Treat bracketed groups and double-quoted strings as single values
        value_grouping: true
        string_literal_character: "\""
        remove_brackets: false

        # Keep only some top-level keys, then add defaults for missing keys
        include_keys: ["item1","item2","owner"]
        default_values:
          owner: "unknown"
          region: "eu-west-1"

        strict_grouping: true
        tags_on_failure: ["keyvalueprocessor_failure"]

  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        insecure: true
        username: admin
        password: "admin_pass"
        index_type: custom
        index: "kv-recursive-%{yyyy.MM.dd}"
```
{% include copy.html %}

You can test this pipeline using the following command:

```bash
curl -sS -X POST "http://localhost:2021/logs" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "type":"nested","body":"item1=[a=1&b=(c=3&d=<e=5>)]&item2=2&owner=alice"
    },
    {
      "type":"flat","body":"item1=[should=not&be=parsed]&item2=42"
    },
    {
      "type":"nested","body":"item1=[desc=\"a=b + c=d\"&x=1]&item2=2"
    }
  ]'
```
{% include copy.html %}

The documents stored in OpenSearch contain the following information:

```json
{
  ...
  "hits": {
    "total": {
      "value": 3,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "kv-recursive-2025.10.14",
        "_id": "Q7fC4pkBc0UY8I7pZ6vZ",
        "_score": 1,
        "_source": {
          "type": "nested",
          "body": "item1=[a=1&b=(c=3&d=<e=5>)]&item2=2&owner=alice",
          "parsed": {
            "owner": "alice",
            "item2": "2",
            "item1": {
              "a": "1",
              "b": {
                "c": "3",
                "d": {
                  "e": "5"
                }
              }
            },
            "region": "eu-west-1"
          }
        }
      },
      {
        "_index": "kv-recursive-2025.10.14",
        "_id": "RLfC4pkBc0UY8I7pZ6vZ",
        "_score": 1,
        "_source": {
          "type": "flat",
          "body": "item1=[should=not&be=parsed]&item2=42"
        }
      },
      {
        "_index": "kv-recursive-2025.10.14",
        "_id": "RbfC4pkBc0UY8I7pZ6vZ",
        "_score": 1,
        "_source": {
          "type": "nested",
          "body": "item1=[desc=\"a=b + c=d\"&x=1]&item2=2",
          "parsed": {
            "owner": "unknown",
            "item2": "2",
            "item1": {
              "desc": "\"a=b + c=d\"",
              "x": "1"
            },
            "region": "eu-west-1"
          }
        }
      }
    ]
  }
}
```

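The following Python sketch models the three behaviors that shape this output:

* the `key_value_when` condition;
* recursive parsing, where each nesting level uses the next bracket pair (`[]`, then `()`, then `<>`);
* `include_keys`, followed by `default_values`.

It is a hypothetical approximation for illustration only. The function names are invented, the condition is hard-coded rather than evaluated as a Data Prepper expression, and `strict_grouping` error handling is not modeled. Applying `include_keys` before `default_values` is inferred from the output above, where `region` appears even though it isn't in `include_keys`.

```python
from __future__ import annotations

LEVELS = [("[", "]"), ("(", ")"), ("<", ">")]  # bracket pair for each nesting level


def split_top_level(text: str, delimiter: str = "&", quote: str = '"') -> list[str]:
    """Split on a one-character delimiter outside brackets and quoted strings."""
    tokens, current, depth, in_quote = [], [], 0, False
    for ch in text:
        if ch == quote:                      # string_literal_character: "\""
            in_quote = not in_quote
        elif not in_quote and ch in "[(<":
            depth += 1
        elif not in_quote and ch in "])>":
            depth -= 1
        elif not in_quote and depth == 0 and ch == delimiter:
            tokens.append("".join(current))
            current = []
            continue
        current.append(ch)
    tokens.append("".join(current))
    return tokens


def parse_recursive(text: str, level: int = 0) -> dict:
    """Parse key=value pairs, recursing into the bracket pair for this level."""
    result = {}
    for token in split_top_level(text):
        key, _, value = token.partition("=")
        if level < len(LEVELS):
            open_ch, close_ch = LEVELS[level]
            if value.startswith(open_ch) and value.endswith(close_ch):
                result[key] = parse_recursive(value[1:-1], level + 1)
                continue
        result[key] = value
    return result


def key_value_processor(event: dict,
                        include_keys=("item1", "item2", "owner"),
                        defaults: dict | None = None) -> dict:
    """Hypothetical model of the conditional recursive pipeline."""
    if defaults is None:
        defaults = {"owner": "unknown", "region": "eu-west-1"}
    if event.get("type") != "nested":     # key_value_when: '/type == "nested"'
        return event                       # other events pass through unchanged
    parsed = {k: v for k, v in parse_recursive(event["body"]).items()
              if k in include_keys}        # include_keys is applied first
    for key, value in defaults.items():    # then default_values fills in gaps
        parsed.setdefault(key, value)
    return {**event, "parsed": parsed}


event = {"type": "nested", "body": "item1=[a=1&b=(c=3&d=<e=5>)]&item2=2&owner=alice"}
print(key_value_processor(event)["parsed"])
# {'item1': {'a': '1', 'b': {'c': '3', 'd': {'e': '5'}}}, 'item2': '2', 'owner': 'alice', 'region': 'eu-west-1'}
```

In the third test document, the quoted `desc` value stays a single string with its quotation marks included. This matches the indexed `"\"a=b + c=d\""` value. The `flat` document is returned untouched, just as it appears in the index without a `parsed` field.
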
## Configuration

Option | Description | Example
:--- | :--- | :---
`source` | The message field to be parsed. Optional. Default value is `message`. | If `source` is `"message1"`, `{"message1": "key1=value1", "message2": "key2=value2"}` parses into `{"message1": "key1=value1", "message2": "key2=value2", "parsed_message": {"key1": "value1"}}`.
