Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Publish transport-grpc-spi exposing QueryBuilderProtoConverter and QueryBuilderProtoConverterRegistry ([#18949](https://github.com/opensearch-project/OpenSearch/pull/18949))
- Support system generated search pipeline. ([#19128](https://github.com/opensearch-project/OpenSearch/pull/19128))
- Add `epoch_micros` date format ([#14669](https://github.com/opensearch-project/OpenSearch/issues/14669))
- Grok processor supports capturing multiple values for same field name ([#18799](https://github.com/opensearch-project/OpenSearch/pull/18799))

### Changed
- Add CompletionStage variants to methods in the Client Interface and default to ActionListener impl ([#18998](https://github.com/opensearch-project/OpenSearch/pull/18998))
Expand Down
23 changes: 18 additions & 5 deletions libs/grok/src/main/java/org/opensearch/grok/Grok.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,29 +99,42 @@ public final class Grok {
private final Regex compiledExpression;
private final MatcherWatchdog matcherWatchdog;
private final List<GrokCaptureConfig> captureConfig;
private final boolean captureAllMatches;

public Grok(Map<String, String> patternBank, String grokPattern, Consumer<String> logCallBack) {
this(patternBank, grokPattern, true, MatcherWatchdog.noop(), logCallBack);
this(patternBank, grokPattern, true, MatcherWatchdog.noop(), logCallBack, false);
}

public Grok(Map<String, String> patternBank, String grokPattern, MatcherWatchdog matcherWatchdog, Consumer<String> logCallBack) {
this(patternBank, grokPattern, true, matcherWatchdog, logCallBack);
this(patternBank, grokPattern, true, matcherWatchdog, logCallBack, false);
}

/**
 * Creates a Grok instance with named captures enabled and an explicit choice of capture mode.
 *
 * @param patternBank       the pattern definitions available to {@code grokPattern}
 * @param grokPattern       the grok expression to compile
 * @param matcherWatchdog   the watchdog passed through to the delegating constructor
 * @param logCallBack       the callback passed through to the delegating constructor
 * @param captureAllMatches forwarded to the capture extracters on every match; when {@code false} an extracter
 *                          emits only the first matching back reference, when {@code true} it keeps emitting
 *                          for every matching back reference
 */
public Grok(
Map<String, String> patternBank,
String grokPattern,
MatcherWatchdog matcherWatchdog,
Consumer<String> logCallBack,
boolean captureAllMatches
) {
// namedCaptures is fixed to true here, exactly as in the other public constructors.
this(patternBank, grokPattern, true, matcherWatchdog, logCallBack, captureAllMatches);
}

Grok(Map<String, String> patternBank, String grokPattern, boolean namedCaptures, Consumer<String> logCallBack) {
this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop(), logCallBack);
this(patternBank, grokPattern, namedCaptures, MatcherWatchdog.noop(), logCallBack, false);
}

private Grok(
Map<String, String> patternBank,
String grokPattern,
boolean namedCaptures,
MatcherWatchdog matcherWatchdog,
Consumer<String> logCallBack
Consumer<String> logCallBack,
boolean captureAllMatches
) {
this.patternBank = patternBank;
this.namedCaptures = namedCaptures;
this.matcherWatchdog = matcherWatchdog;
this.captureAllMatches = captureAllMatches;

validatePatternBank();

Expand Down Expand Up @@ -395,7 +408,7 @@ public boolean match(byte[] utf8Bytes, int offset, int length, GrokCaptureExtrac
if (result == Matcher.FAILED) {
return false;
}
extracter.extract(utf8Bytes, offset, matcher.getEagerRegion());
extracter.extract(utf8Bytes, offset, matcher.getEagerRegion(), captureAllMatches);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,29 @@ static class MapExtracter extends GrokCaptureExtracter {
result = captureConfig.isEmpty() ? emptyMap() : new HashMap<>();
fieldExtracters = new ArrayList<>(captureConfig.size());
for (GrokCaptureConfig config : captureConfig) {
fieldExtracters.add(config.objectExtracter(v -> result.put(config.name(), v)));
fieldExtracters.add(config.objectExtracter(v -> {
String fieldName = config.name();
Object existing = result.get(fieldName);
if (existing == null) {
result.put(fieldName, v);
} else if (existing instanceof List) {
@SuppressWarnings("unchecked")
List<Object> list = (List<Object>) existing;
list.add(v);
} else {
List<Object> list = new ArrayList<>();
list.add(existing);
list.add(v);
result.put(fieldName, list);
}
}));
}
}

@Override
void extract(byte[] utf8Bytes, int offset, Region region) {
void extract(byte[] utf8Bytes, int offset, Region region, boolean captureAllMatches) {
for (GrokCaptureExtracter extracter : fieldExtracters) {
extracter.extract(utf8Bytes, offset, region);
extracter.extract(utf8Bytes, offset, region, captureAllMatches);
}
}

Expand All @@ -73,5 +88,5 @@ Map<String, Object> result() {
}
}

abstract void extract(byte[] utf8Bytes, int offset, Region region);
abstract void extract(byte[] utf8Bytes, int offset, Region region, boolean captureAllMatches);
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,16 @@ static GrokCaptureType fromString(String str) {
protected final GrokCaptureExtracter rawExtracter(int[] backRefs, Consumer<? super String> emit) {
return new GrokCaptureExtracter() {
@Override
void extract(byte[] utf8Bytes, int offset, Region region) {
void extract(byte[] utf8Bytes, int offset, Region region, boolean captureAllMatches) {
for (int number : backRefs) {
if (region.getBeg(number) >= 0) {
int matchOffset = offset + region.getBeg(number);
int matchLength = region.getEnd(number) - region.getBeg(number);
emit.accept(new String(utf8Bytes, matchOffset, matchLength, StandardCharsets.UTF_8));
return; // Capture only the first value.
// return the first match value if captureAllMatches is false, else continue to capture all values
if (!captureAllMatches) {
return;
}
}
}
}
Expand Down
98 changes: 98 additions & 0 deletions libs/grok/src/test/java/org/opensearch/grok/GrokTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,87 @@ public void testJavaFilePatternWithSpaces() {
assertThat(grok.match("Test Class.java"), is(true));
}

public void testMultipleCapturesWithSameFieldName() {
    // With captureAllMatches enabled, a field name that occurs twice in the pattern yields a list holding both captures.
    MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, getLongRunnableBiConsumer());
    Grok grok = new Grok(Grok.BUILTIN_PATTERNS, "%{IP:ipaddress} %{IP:ipaddress}", watchdog, logger::warn, true);

    // Two distinct addresses, one per occurrence of the repeated field.
    Map<String, Object> captured = grok.captures("192.168.1.1 192.168.1.2");
    assertNotNull("Should have matches", captured);

    Object rawValue = captured.get("ipaddress");
    assertTrue("Should be a List", rawValue instanceof List);

    @SuppressWarnings("unchecked")
    List<String> addresses = (List<String>) rawValue;
    assertEquals("Should have 2 elements", 2, addresses.size());
    assertEquals("First IP should match", "192.168.1.1", addresses.get(0));
    assertEquals("Second IP should match", "192.168.1.2", addresses.get(1));
}

public void testMultipleCapturesWithSameFieldNameDifferentTypes() {
    // The same field name is captured twice, once typed as int and once as long; both converted values must be kept.
    MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, getLongRunnableBiConsumer());
    Grok grok = new Grok(Grok.BUILTIN_PATTERNS, "%{NUMBER:value:int} %{NUMBER:value:long}", watchdog, logger::warn, true);

    Map<String, Object> captured = grok.captures("123 456");
    assertNotNull("Should have matches", captured);

    Object rawValue = captured.get("value");
    assertTrue("Should be a List", rawValue instanceof List);

    @SuppressWarnings("unchecked")
    List<Object> numbers = (List<Object>) rawValue;
    assertEquals("Should have 2 elements", 2, numbers.size());
    // Each element keeps the type requested by its own capture.
    assertEquals("First value should be an Integer", Integer.valueOf(123), numbers.get(0));
    assertEquals("Second value should be a Long", Long.valueOf(456), numbers.get(1));
}

public void testMultipleCapturesWithSameFieldNameInComplexPattern() {
    // A pattern mixing a single-valued field with a field that is repeated three times.
    MatcherWatchdog watchdog = MatcherWatchdog.newInstance(10, 200, System::currentTimeMillis, getLongRunnableBiConsumer());
    Grok grok = new Grok(
        Grok.BUILTIN_PATTERNS,
        "%{WORD:name} has IPs: %{IP:ip}, %{IP:ip} and %{IP:ip}",
        watchdog,
        logger::warn,
        true
    );

    Map<String, Object> captured = grok.captures("Server has IPs: 192.168.1.1, 192.168.1.2 and 192.168.1.3");
    assertNotNull("Should have matches", captured);

    // The field that occurs once stays a plain scalar value.
    assertEquals("Name should match", "Server", captured.get("name"));

    // The repeated field is collected into a list, in pattern order.
    Object rawIps = captured.get("ip");
    assertTrue("IP should be a List", rawIps instanceof List);

    @SuppressWarnings("unchecked")
    List<String> addresses = (List<String>) rawIps;
    assertEquals("Should have 3 IPs", 3, addresses.size());
    assertEquals("First IP should match", "192.168.1.1", addresses.get(0));
    assertEquals("Second IP should match", "192.168.1.2", addresses.get(1));
    assertEquals("Third IP should match", "192.168.1.3", addresses.get(2));
}

public void testLogCallBack() {
AtomicReference<String> message = new AtomicReference<>();
Grok grok = new Grok(Grok.BUILTIN_PATTERNS, ".*\\[.*%{SPACE}*\\].*", message::set);
Expand All @@ -747,6 +828,23 @@ public void testLogCallBack() {
assertThat(message.get(), containsString("regular expression has redundant nested repeat operator"));
}

/**
 * Builds the scheduler callback that the multi-capture tests hand to {@code MatcherWatchdog.newInstance}.
 * <p>
 * The returned consumer blocks the calling thread for {@code delay} milliseconds and then runs
 * {@code command} on a freshly started thread.
 *
 * @return a scheduler that sleeps for the requested delay and then runs the command on a new thread
 */
private static BiConsumer<Long, Runnable> getLongRunnableBiConsumer() {
    return (delay, command) -> {
        try {
            Thread.sleep(delay);
        } catch (InterruptedException e) {
            // Restore the interrupt status before failing so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
        // The previous AtomicBoolean guard was never cleared, so the command always ran; start it directly.
        Thread worker = new Thread(command);
        worker.start();
    };
}

private void assertGrokedField(String fieldName) {
String line = "foo";
Grok grok = new Grok(Grok.BUILTIN_PATTERNS, "%{WORD:" + fieldName + "}", logger::warn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public final class GrokProcessor extends AbstractProcessor {
private final Grok grok;
private final boolean traceMatch;
private final boolean ignoreMissing;
private final boolean captureAllMatches;

GrokProcessor(
String tag,
Expand All @@ -67,14 +68,16 @@ public final class GrokProcessor extends AbstractProcessor {
String matchField,
boolean traceMatch,
boolean ignoreMissing,
boolean captureAllMatches,
MatcherWatchdog matcherWatchdog
) {
super(tag, description);
this.matchField = matchField;
this.matchPatterns = matchPatterns;
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), matcherWatchdog, logger::debug);
this.grok = new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), matcherWatchdog, logger::debug, captureAllMatches);
this.traceMatch = traceMatch;
this.ignoreMissing = ignoreMissing;
this.captureAllMatches = captureAllMatches;
// Joni warnings are only emitted on an attempt to match, and the warning emitted for every call to match which is too verbose
// so here we emit a warning (if there is one) to the logfile at warn level on construction / processor creation.
new Grok(patternBank, combinePatterns(matchPatterns, traceMatch), matcherWatchdog, logger::warn).match("___nomatch___");
Expand Down Expand Up @@ -130,6 +133,10 @@ List<String> getMatchPatterns() {
return matchPatterns;
}

/**
 * Returns the {@code captureAllMatches} flag this processor was constructed with.
 *
 * @return {@code true} if the processor was created with capture-all-matches enabled
 */
boolean isCaptureAllMatches() {
return captureAllMatches;
}

static String combinePatterns(List<String> patterns, boolean traceMatch) {
String combinedPattern;
if (patterns.size() > 1) {
Expand Down Expand Up @@ -176,6 +183,7 @@ public GrokProcessor create(
List<String> matchPatterns = ConfigurationUtils.readList(TYPE, processorTag, config, "patterns");
boolean traceMatch = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "trace_match", false);
boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "ignore_missing", false);
boolean captureAllMatches = ConfigurationUtils.readBooleanProperty(TYPE, processorTag, config, "capture_all_matches", false);

if (matchPatterns.isEmpty()) {
throw newConfigurationException(TYPE, processorTag, "patterns", "List of patterns must not be empty");
Expand All @@ -195,6 +203,7 @@ public GrokProcessor create(
matchField,
traceMatch,
ignoreMissing,
captureAllMatches,
matcherWatchdog
);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,19 @@ public void testCreateWithInvalidPatternDefinition() throws Exception {
equalTo("[patterns] Invalid regex pattern found in: [%{MY_PATTERN:name}!]. premature end of char-class")
);
}

public void testBuildWithCaptureAllMatches() throws Exception {
    // A factory configuration carrying capture_all_matches=true must produce a processor with the flag switched on.
    Map<String, Object> settings = new HashMap<>();
    settings.put("field", "_field");
    settings.put("patterns", Collections.singletonList("(?<foo>\\w+)"));
    settings.put("capture_all_matches", true);

    GrokProcessor.Factory grokFactory = new GrokProcessor.Factory(Collections.emptyMap(), MatcherWatchdog.noop());
    String tag = randomAlphaOfLength(10);
    GrokProcessor built = grokFactory.create(null, tag, null, settings);

    assertThat(built.getTag(), equalTo(tag));
    assertThat(built.getMatchField(), equalTo("_field"));
    assertThat(built.getGrok(), notNullValue());
    assertThat(built.isCaptureAllMatches(), is(true));
}
}
Loading
Loading