Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
*/
package org.elasticsearch.datastreams.mapper;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.datastreams.DataStreamsPlugin;
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.mapper.DataStreamTimestampFieldMapper;
import org.elasticsearch.index.mapper.DateFieldMapper;
Expand Down Expand Up @@ -115,16 +119,6 @@ public void testValidateInvalidFieldType() {
);
}

public void testValidateNotIndexed() {
Exception e = expectThrows(IllegalArgumentException.class, () -> createMapperService(timestampMapping(true, b -> {
b.startObject("@timestamp");
b.field("type", "date");
b.field("index", false);
b.endObject();
})));
assertThat(e.getMessage(), equalTo("data stream timestamp field [@timestamp] is not indexed"));
}

public void testValidateNotDocValues() {
Exception e = expectThrows(IllegalArgumentException.class, () -> createMapperService(timestampMapping(true, b -> {
b.startObject("@timestamp");
Expand Down Expand Up @@ -231,4 +225,196 @@ public void testValidateDefaultIgnoreMalformed() throws Exception {
assertThat(summaryTimestamp.ignoreMalformed(), is(false));
}
}

public void testFieldTypeWithSkipDocValues_LogsDBMode() throws IOException {
final MapperService mapperService = createMapperService(
Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.endObject();
})
);

final DateFieldMapper timestampMapper = (DateFieldMapper) mapperService.documentMapper()
.mappers()
.getMapper(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertTrue(timestampMapper.fieldType().hasDocValues());
assertFalse(timestampMapper.fieldType().isIndexed());
assertTrue(timestampMapper.hasDocValuesSparseIndex());
}

public void testFieldTypeWithSkipDocValues_LogsDBModeWithoutDefaultMapping() throws IOException {
final Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build();
final MapperService mapperService = withMapping(
new TestMapperServiceBuilder().settings(settings).applyDefaultMapping(false).build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.endObject();
})
);

final DateFieldMapper timestampMapper = (DateFieldMapper) mapperService.documentMapper()
.mappers()
.getMapper(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertTrue(timestampMapper.fieldType().hasDocValues());
assertFalse(timestampMapper.fieldType().isIndexed());
assertTrue(timestampMapper.hasDocValuesSparseIndex());
}

public void testFieldTypeWithSkipDocValues_ExplicitIndexAndDocValues() throws IOException {
final MapperService mapperService = createMapperService(
Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.field("index", true);
b.field("doc_values", true);
b.endObject();
})
);

final DateFieldMapper timestampMapper = (DateFieldMapper) mapperService.documentMapper()
.mappers()
.getMapper(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertTrue(timestampMapper.fieldType().hasDocValues());
assertTrue(timestampMapper.fieldType().isIndexed());
assertFalse(timestampMapper.hasDocValuesSparseIndex());
}

public void testFieldTypeWithSkipDocValues_IndexFalseWithoutDefaultMapping() throws IOException {
final Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build();
final IllegalArgumentException ex = assertThrows(
IllegalArgumentException.class,
() -> withMapping(
new TestMapperServiceBuilder().settings(settings).applyDefaultMapping(false).build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.field("index", false);
b.endObject();
})
)
);

assertEquals("data stream timestamp field [@timestamp] has disallowed attributes: [index]", ex.getMessage());
}

public void testFieldTypeWithSkipDocValues_IndexFalseWithDefaultMapping() throws IOException {
final Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build();
final IllegalArgumentException ex = assertThrows(
IllegalArgumentException.class,
() -> createMapperService(settings, timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.field("index", false);
b.endObject();
}))
);

// A data stream always has a mapped @timestamp field with `index: true`
assertTrue(ex.getMessage().contains("Cannot update parameter [index] from [true] to [false]"));
}

public void testFieldTypeWithSkipDocValues_DocValuesFalse() {
final Settings settings = Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build();
final IllegalArgumentException ex = expectThrows(
IllegalArgumentException.class,
() -> withMapping(
new TestMapperServiceBuilder().settings(settings).applyDefaultMapping(false).build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.field("doc_values", false);
b.endObject();
})
)
);
assertEquals(
ex.getMessage(),
"data stream timestamp field [" + DataStreamTimestampFieldMapper.DEFAULT_PATH + "] doesn't have doc values"
);
}

public void testFieldTypeWithSkipDocValues_WithoutTimestampSorting() throws IOException {
final MapperService mapperService = createMapperService(
Settings.builder().put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name()).build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.endObject();
})
);

final DateFieldMapper timestampMapper = (DateFieldMapper) mapperService.documentMapper()
.mappers()
.getMapper(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertTrue(timestampMapper.fieldType().hasDocValues());
assertTrue(timestampMapper.fieldType().isIndexed());
assertFalse(timestampMapper.hasDocValuesSparseIndex());
}

public void testFieldTypeWithSkipDocValues_StandardMode() throws IOException {
final MapperService mapperService = createMapperService(
Settings.builder().put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH).build(),
timestampMapping(true, b -> {
b.startObject(DataStreamTimestampFieldMapper.DEFAULT_PATH);
b.field("type", "date");
b.endObject();
})
);

final DateFieldMapper timestampMapper = (DateFieldMapper) mapperService.documentMapper()
.mappers()
.getMapper(DataStreamTimestampFieldMapper.DEFAULT_PATH);
assertTrue(timestampMapper.fieldType().hasDocValues());
assertTrue(timestampMapper.fieldType().isIndexed());
assertFalse(timestampMapper.hasDocValuesSparseIndex());
}

public void testFieldTypeWithSkipDocValues_InvalidFieldName() throws IOException {
final MapperService mapperService = createMapperService(
Settings.builder()
.put(IndexSettings.MODE.getKey(), IndexMode.LOGSDB.name())
.put(IndexSortConfig.INDEX_SORT_FIELD_SETTING.getKey(), DataStreamTimestampFieldMapper.DEFAULT_PATH)
.build(),
timestampMapping(true, b -> {
b.startObject("timestamp");
b.field("type", "date");
b.endObject();
})
);

final DateFieldMapper customTimestamp = (DateFieldMapper) mapperService.documentMapper().mappers().getMapper("timestamp");
assertTrue(customTimestamp.fieldType().hasDocValues());
assertTrue(customTimestamp.fieldType().isIndexed());
assertFalse(customTimestamp.hasDocValuesSparseIndex());

// Default LogsDB mapping including @timestamp field is used
final DateFieldMapper defaultTimestamp = (DateFieldMapper) mapperService.documentMapper().mappers().getMapper("@timestamp");
assertTrue(defaultTimestamp.fieldType().hasDocValues());
assertFalse(defaultTimestamp.fieldType().isIndexed());
assertTrue(defaultTimestamp.hasDocValuesSparseIndex());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ private static Version parseUnchecked(String version) {
public static final IndexVersion SOURCE_MAPPER_MODE_ATTRIBUTE_NOOP = def(9_007_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion HOSTNAME_DOC_VALUES_SPARSE_INDEX = def(9_008_00_0, Version.LUCENE_10_0_0);
public static final IndexVersion UPGRADE_TO_LUCENE_10_1_0 = def(9_009_00_0, Version.LUCENE_10_1_0);
public static final IndexVersion TIMESTAMP_DOC_VALUES_SPARSE_INDEX = def(9_010_00_0, Version.LUCENE_10_1_0);
/*
* STOP! READ THIS FIRST! No, really,
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.apache.lucene.search.Query;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.TimestampBounds;
import org.elasticsearch.index.mapper.DateFieldMapper.Resolution;
import org.elasticsearch.index.query.SearchExecutionContext;
Expand All @@ -25,6 +27,7 @@
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Map;
import java.util.function.Function;

import static org.elasticsearch.core.TimeValue.NSEC_PER_MSEC;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
Expand All @@ -39,8 +42,10 @@ public class DataStreamTimestampFieldMapper extends MetadataFieldMapper {
public static final String DEFAULT_PATH = "@timestamp";
public static final String TIMESTAMP_VALUE_KEY = "@timestamp._value";

public static final DataStreamTimestampFieldMapper ENABLED_INSTANCE = new DataStreamTimestampFieldMapper(true);
private static final DataStreamTimestampFieldMapper DISABLED_INSTANCE = new DataStreamTimestampFieldMapper(false);
public static final Function<IndexVersion, DataStreamTimestampFieldMapper> ENABLED_INSTANCE =
indexCreatedVersion -> new DataStreamTimestampFieldMapper(true, indexCreatedVersion);
private static final Function<IndexVersion, DataStreamTimestampFieldMapper> DISABLED_INSTANCE =
indexCreatedVersion -> new DataStreamTimestampFieldMapper(false, indexCreatedVersion);

// For now the field shouldn't be useable in searches.
// In the future it should act as an alias to the actual data stream timestamp field.
Expand Down Expand Up @@ -83,8 +88,11 @@ public static class Builder extends MetadataFieldMapper.Builder {
// this field mapper may be enabled but once enabled, may not be disabled
.setMergeValidator((previous, current, conflicts) -> (previous == current) || (previous == false && current));

public Builder() {
private final IndexVersion indexCreatedVersion;

public Builder(final IndexVersion indexCreatedVersion) {
super(NAME);
this.indexCreatedVersion = indexCreatedVersion;
}

@Override
Expand All @@ -94,22 +102,27 @@ protected Parameter<?>[] getParameters() {

@Override
public MetadataFieldMapper build() {
return enabled.getValue() ? ENABLED_INSTANCE : DISABLED_INSTANCE;
return enabled.getValue() ? ENABLED_INSTANCE.apply(indexCreatedVersion) : DISABLED_INSTANCE.apply(indexCreatedVersion);
}
}

public static final TypeParser PARSER = new ConfigurableTypeParser(c -> DISABLED_INSTANCE, c -> new Builder());
public static final TypeParser PARSER = new ConfigurableTypeParser(
c -> DISABLED_INSTANCE.apply(c.getIndexSettings().getIndexVersionCreated()),
c -> new Builder(c.getIndexSettings().getIndexVersionCreated())
);

private final boolean enabled;
private final IndexVersion indexCreatedVersion;

private DataStreamTimestampFieldMapper(boolean enabled) {
private DataStreamTimestampFieldMapper(boolean enabled, final IndexVersion indexCreatedVersion) {
super(TimestampFieldType.INSTANCE);
this.enabled = enabled;
this.indexCreatedVersion = indexCreatedVersion;
}

@Override
public FieldMapper.Builder getMergeBuilder() {
return new Builder().init(this);
return new Builder(indexCreatedVersion).init(this);
}

public void doValidate(MappingLookup lookup) {
Expand Down Expand Up @@ -139,9 +152,12 @@ public void doValidate(MappingLookup lookup) {
}

DateFieldMapper dateFieldMapper = (DateFieldMapper) mapper;
if (dateFieldMapper.fieldType().isIndexed() == false) {
if (dateFieldMapper.fieldType().isIndexed() == false
&& (indexCreatedVersion.onOrAfter(IndexVersions.TIMESTAMP_DOC_VALUES_SPARSE_INDEX) == false
|| dateFieldMapper.hasDocValuesSparseIndex() == false)) {
throw new IllegalArgumentException("data stream timestamp field [" + DEFAULT_PATH + "] is not indexed");
}

if (dateFieldMapper.fieldType().hasDocValues() == false) {
throw new IllegalArgumentException("data stream timestamp field [" + DEFAULT_PATH + "] doesn't have doc values");
}
Expand Down
Loading