diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 3180559e9..ffc3364c4 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -24,9 +24,9 @@ import org.opensearch.common.settings.Setting import org.opensearch.common.settings.Settings import org.opensearch.common.settings.SettingsFilter import org.opensearch.common.util.concurrent.ThreadContext +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser.Token -import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.indexmanagement.controlcenter.notification.ControlCenterIndices @@ -123,6 +123,7 @@ import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.rollup.util.QueryShardContextFactory import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.settings.IndexManagementSettings +import org.opensearch.indexmanagement.snapshotmanagement.SMRunner import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestExplainSMPolicyHandler @@ -138,7 +139,6 @@ import org.opensearch.indexmanagement.snapshotmanagement.api.transport.get.Trans import org.opensearch.indexmanagement.snapshotmanagement.api.transport.index.TransportIndexSMPolicyAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.start.TransportStartSMAction import org.opensearch.indexmanagement.snapshotmanagement.api.transport.stop.TransportStopSMAction -import org.opensearch.indexmanagement.snapshotmanagement.SMRunner import org.opensearch.indexmanagement.snapshotmanagement.model.SMMetadata import org.opensearch.indexmanagement.snapshotmanagement.model.SMPolicy import org.opensearch.indexmanagement.snapshotmanagement.settings.SnapshotManagementSettings @@ -382,7 +382,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin namedWriteableRegistry, environment ) - rollupInterceptor = RollupInterceptor(clusterService, settings, indexNameExpressionResolver) + rollupInterceptor = RollupInterceptor(clusterService, client, settings, indexNameExpressionResolver) val jvmService = JvmService(environment.settings()) val transformRunner = TransformRunner.initialize( client, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt index c1f6070d4..90a31973e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt @@ -5,6 +5,11 @@ package org.opensearch.indexmanagement.rollup.action.index +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener @@ -15,26 +20,27 @@ import org.opensearch.action.index.IndexRequest import org.opensearch.action.index.IndexResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction -import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.client.Client +import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings -import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.core.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.commons.ConfigConstants import org.opensearch.commons.authuser.User +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.rollup.util.parseRollup +import org.opensearch.indexmanagement.rollup.util.populateSourceIndexFieldMappings import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.indexmanagement.util.SecurityUtils import org.opensearch.indexmanagement.util.SecurityUtils.Companion.buildUser -import org.opensearch.indexmanagement.util.SecurityUtils.Companion.validateUserConfiguration import org.opensearch.rest.RestStatus import org.opensearch.tasks.Task import org.opensearch.transport.TransportService @@ -46,12 +52,14 @@ class TransportIndexRollupAction @Inject constructor( val client: Client, actionFilters: ActionFilters, val indexManagementIndices: IndexManagementIndices, + val indexNameExpressionResolver: IndexNameExpressionResolver, val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( IndexRollupAction.NAME, transportService, actionFilters, ::IndexRollupRequest -) { +), + CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("TransportIndexRollupAction")) { @Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings) @@ -80,72 +88,78 @@ class TransportIndexRollupAction @Inject constructor( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT )}" ) - client.threadPool().threadContext.stashContext().use { - if (!validateUserConfiguration(user, filterByEnabled, actionListener)) { - return - } - indexManagementIndices.checkAndUpdateIMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, actionListener::onFailure)) - } - } - - private fun onCreateMappingsResponse(response: AcknowledgedResponse) { - if (response.isAcknowledged) { - log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.") - if (request.opType() == DocWriteRequest.OpType.CREATE) { - if (!validateTargetIndexName()) { - return actionListener.onFailure( - OpenSearchStatusException( - "target_index value is invalid: ${request.rollup.targetIndex}", - RestStatus.BAD_REQUEST + launch { + try { + indexManagementIndices.checkAndUpdateIMConfigIndex(log) + log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.") + if (request.opType() == DocWriteRequest.OpType.CREATE) { + if (!validateTargetIndexName()) { + actionListener.onFailure( + OpenSearchStatusException( + "target_index value is invalid: ${request.rollup.targetIndex}", + RestStatus.BAD_REQUEST + ) ) + } + request.rollup = populateSourceIndexFieldMappings( + request.rollup, + indexNameExpressionResolver, + clusterService.state(), + client, + log ) + indexRollupDoc() + } else { + updateRollup() } - putRollup() - } else { - getRollup() + } catch (e: Exception) { + actionListener.onFailure(e) } - } else { - val message = "Unable to create or update $INDEX_MANAGEMENT_INDEX with newest mapping." - log.error(message) - actionListener.onFailure(OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)) } } - private fun getRollup() { - val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.rollup.id) - client.get(getRequest, ActionListener.wrap(::onGetRollup, actionListener::onFailure)) - } - @Suppress("ReturnCount") - private fun onGetRollup(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - - val rollup: Rollup? + private suspend fun updateRollup() { try { - rollup = parseRollup(response, xContentRegistry) - } catch (e: IllegalArgumentException) { - actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) - return - } - if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { - return - } - val modified = modifiedImmutableProperties(rollup, request.rollup) - if (modified.isNotEmpty()) { - return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST)) - } - if (!validateTargetIndexName()) { - return actionListener.onFailure( - OpenSearchStatusException( - "target_index value is invalid: ${request.rollup.targetIndex}", - RestStatus.BAD_REQUEST + val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, request.rollup.id) + val getResponse: GetResponse = client.suspendUntil { + get(getRequest, it) + } + + if (!getResponse.isExists) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return + } + + val rollup: Rollup? + try { + rollup = parseRollup(getResponse, xContentRegistry) + } catch (e: IllegalArgumentException) { + actionListener.onFailure(OpenSearchStatusException("Rollup not found", RestStatus.NOT_FOUND)) + return + } + if (!SecurityUtils.userHasPermissionForResource(user, rollup.user, filterByEnabled, "rollup", rollup.id, actionListener)) { + return + } + val modified = modifiedImmutableProperties(rollup, request.rollup) + if (modified.isNotEmpty()) { + return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST)) + } + if (!validateTargetIndexName()) { + return actionListener.onFailure( + OpenSearchStatusException( + "target_index value is invalid: ${request.rollup.targetIndex}", + RestStatus.BAD_REQUEST + ) ) - ) + } + + request.rollup = populateSourceIndexFieldMappings(request.rollup, indexNameExpressionResolver, clusterService.state(), client, log) + + indexRollupDoc() + } catch (e: Exception) { + actionListener.onFailure(e) } - putRollup() } private fun modifiedImmutableProperties(rollup: Rollup, newRollup: Rollup): List { @@ -158,35 +172,30 @@ class TransportIndexRollupAction @Inject constructor( return modified.toList() } - private fun putRollup() { - val rollup = request.rollup.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user) - request.index(INDEX_MANAGEMENT_INDEX) - .id(request.rollup.id) - .source(rollup.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .timeout(IndexRequest.DEFAULT_TIMEOUT) - client.index( - request, - object : ActionListener { - override fun onResponse(response: IndexResponse) { - if (response.shardInfo.failed > 0) { - val failureReasons = response.shardInfo.failures.joinToString(", ") { it.reason() } - actionListener.onFailure(OpenSearchStatusException(failureReasons, response.status())) - } else { - val status = if (request.opType() == DocWriteRequest.OpType.CREATE) RestStatus.CREATED else RestStatus.OK - actionListener.onResponse( - IndexRollupResponse( - response.id, response.version, response.seqNo, response.primaryTerm, status, - rollup.copy(seqNo = response.seqNo, primaryTerm = response.primaryTerm) - ) - ) - } - } - - override fun onFailure(e: Exception) { - actionListener.onFailure(e) - } + private suspend fun indexRollupDoc() { + try { + val rollup = request.rollup.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion, user = this.user) + request.index(INDEX_MANAGEMENT_INDEX) + .id(request.rollup.id) + .source(rollup.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .timeout(IndexRequest.DEFAULT_TIMEOUT) + + val response: IndexResponse = client.suspendUntil { index(request, it) } + if (response.shardInfo.failed > 0) { + val failureReasons = response.shardInfo.failures.joinToString(", ") { it.reason() } + actionListener.onFailure(OpenSearchStatusException(failureReasons, response.status())) + } else { + val status = if (request.opType() == DocWriteRequest.OpType.CREATE) RestStatus.CREATED else RestStatus.OK + actionListener.onResponse( + IndexRollupResponse( + response.id, response.version, response.seqNo, response.primaryTerm, status, + rollup.copy(seqNo = response.seqNo, primaryTerm = response.primaryTerm) + ) + ) } - ) + } catch (e: Exception) { + actionListener.onFailure(e) + } } private fun validateTargetIndexName(): Boolean { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt index 2f0caf351..6eff5fcc0 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt @@ -21,16 +21,15 @@ import org.opensearch.common.bytes.BytesReference import org.opensearch.common.inject.Inject import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.Writeable -import org.opensearch.core.xcontent.MediaType import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType +import org.opensearch.core.xcontent.MediaType import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.util.IndexUtils.Companion._META import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService -import java.lang.Exception class TransportUpdateRollupMappingAction @Inject constructor( threadPool: ThreadPool, @@ -101,15 +100,8 @@ class TransportUpdateRollupMappingAction @Inject constructor( val updatedRollups = mapOf("rollups" to rollupJobEntries) metaMappings[_META] = updatedRollups } else { - if ((rollups as Map<*, *>).containsKey(request.rollup.id)) { - log.debug("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]") - return listener.onFailure( - IllegalStateException("Meta rollup mappings already contain rollup ${request.rollup.id} for index [$index]") - ) - } - // In this case rollup mappings exists and there is no entry for request.rollup.id - val rollupJobEntries = rollups.toMutableMap() + val rollupJobEntries = (rollups as Map<*, *>).toMutableMap() rollupJobEntries[request.rollup.id] = rollup val updatedRollups = mapOf("rollups" to rollupJobEntries) metaMappings[_META] = updatedRollups diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index ffd1e4bd7..1e1ed9426 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -5,8 +5,15 @@ package org.opensearch.indexmanagement.rollup.interceptor +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.support.IndicesOptions +import org.opensearch.action.support.master.AcknowledgedResponse +import org.opensearch.client.Client import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService @@ -24,6 +31,9 @@ import org.opensearch.index.query.TermQueryBuilder import org.opensearch.index.query.TermsQueryBuilder import org.opensearch.index.search.MatchQuery import org.opensearch.indexmanagement.common.model.dimension.Dimension +import org.opensearch.indexmanagement.opensearchapi.suspendUntil +import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingAction +import org.opensearch.indexmanagement.rollup.action.mapping.UpdateRollupMappingRequest import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping.Companion.UNKNOWN_MAPPING @@ -33,6 +43,7 @@ import org.opensearch.indexmanagement.rollup.util.getDateHistogram import org.opensearch.indexmanagement.rollup.util.getRollupJobs import org.opensearch.indexmanagement.rollup.util.isRollupIndex import org.opensearch.indexmanagement.rollup.util.populateFieldMappings +import org.opensearch.indexmanagement.rollup.util.populateSourceIndexFieldMappings import org.opensearch.indexmanagement.rollup.util.rewriteSearchSourceBuilder import org.opensearch.indexmanagement.util.IndexUtils import org.opensearch.search.aggregations.AggregationBuilder @@ -52,11 +63,13 @@ import org.opensearch.transport.TransportInterceptor import org.opensearch.transport.TransportRequest import org.opensearch.transport.TransportRequestHandler +@Suppress("TooManyFunctions") class RollupInterceptor( val clusterService: ClusterService, + val client: Client, val settings: Settings, val indexNameExpressionResolver: IndexNameExpressionResolver -) : TransportInterceptor { +) : TransportInterceptor, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("RollupInterceptor")) { private val logger = LogManager.getLogger(javaClass) @@ -81,40 +94,116 @@ class RollupInterceptor( ): TransportRequestHandler { return object : TransportRequestHandler { override fun messageReceived(request: T, channel: TransportChannel, task: Task) { - if (searchEnabled && request is ShardSearchRequest) { - val index = request.shardId().indexName - val isRollupIndex = isRollupIndex(index, clusterService.state()) - if (isRollupIndex) { - if (request.source().size() != 0) { - throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") - } + if (!searchEnabled || request !is ShardSearchRequest) { + actualHandler.messageReceived(request, channel, task) + return + } + val index = request.shardId().indexName + val isRollupIndex = isRollupIndex(index, clusterService.state()) + if (!isRollupIndex) { + actualHandler.messageReceived(request, channel, task) + return + } + if (request.source().size() != 0) { + throw IllegalArgumentException("Rollup search must have size explicitly set to 0, but found ${request.source().size()}") + } + var rollupJobs = clusterService.state().metadata.index(index).getRollupJobs() + if (rollupJobs.isNullOrEmpty()) { + throw IllegalArgumentException("No rollup job associated with target_index") + } + + if (verifySourceIndexMappingsExists(rollupJobs)) { + try { + rewriteRollupSearchRequest(request, rollupJobs) + } catch (e: Exception) { + channel.sendResponse(e) + } + actualHandler.messageReceived(request, channel, task) + } else { + launch { + + val updatedJobs = updateSourceIndexFieldMappings(rollupJobs) + updateRollupIndexMetaField(updatedJobs) + // Create new list of jobs with merged updated jobs + val allRollupJobs = + if (updatedJobs.isNullOrEmpty()) rollupJobs + else rollupJobs.filter { + job -> + updatedJobs.find { newJob -> newJob.id == job.id } == null + }.plus(updatedJobs) - val indices = request.indices().map { it.toString() }.toTypedArray() - val concreteIndices = indexNameExpressionResolver - .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) - // To extract fields from QueryStringQueryBuilder we need concrete source index name. - val rollupJob = clusterService.state().metadata.index(index).getRollupJobs()?.get(0) - ?: throw IllegalArgumentException("No rollup job associated with target_index") - val queryFieldMappings = getQueryMetadata( - request.source().query(), - getConcreteSourceIndex(rollupJob.sourceIndex, indexNameExpressionResolver, clusterService.state()) - ) - val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) - val fieldMappings = queryFieldMappings + aggregationFieldMappings - - val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings) - - // only rebuild if there is necessity to rebuild - if (fieldMappings.isNotEmpty()) { - rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs) + try { + rewriteRollupSearchRequest(request, allRollupJobs) + } catch (e: Exception) { + channel.sendResponse(e) } + actualHandler.messageReceived(request, channel, task) } } - actualHandler.messageReceived(request, channel, task) } } } + private suspend fun updateRollupIndexMetaField(updatedJobs: List?) { + updatedJobs?.forEach { rollup -> + client.suspendUntil { + execute(UpdateRollupMappingAction.INSTANCE, UpdateRollupMappingRequest(rollup), it) + } + } + } + + @Suppress("SpreadOperator") + private fun rewriteRollupSearchRequest(request: ShardSearchRequest, rollupJobs: List) { + + val rollupJob = rollupJobs.get(0) + + val indices = request.indices().map { it.toString() }.toTypedArray() + val concreteIndices = indexNameExpressionResolver + .concreteIndexNames(clusterService.state(), request.indicesOptions(), *indices) + + val queryFieldMappings = getQueryMetadata( + request.source().query(), + rollupJob + ) + val aggregationFieldMappings = getAggregationMetadata(request.source().aggregations()?.aggregatorFactories) + val fieldMappings = queryFieldMappings + aggregationFieldMappings + + val allMatchingRollupJobs = validateIndicies(concreteIndices, fieldMappings) + + // only rebuild if there is necessity to rebuild + if (fieldMappings.isNotEmpty()) { + rewriteShardSearchForRollupJobs(request, allMatchingRollupJobs) + } + } + + private fun verifySourceIndexMappingsExists(rollupJobs: List): Boolean { + return rollupJobs.find { it.sourceIndexFieldMappings.isNullOrEmpty() } == null + } + + private suspend fun updateSourceIndexFieldMappings(rollupJobs: List): List? { + + val updatedJobs = mutableListOf() + rollupJobs.forEach { rollup -> + if (!rollup.sourceIndexFieldMappings.isNullOrEmpty()) { + return@forEach + } + + try { + val updatedRollup = populateSourceIndexFieldMappings( + rollup, + indexNameExpressionResolver, + clusterService.state(), + client, + logger + ) + updatedJobs += updatedRollup + } catch (e: Exception) { + logger.debug("Failed to update sourceIndex mappings for sourceIndex [${rollup.sourceIndex}]", e) + } + } + return updatedJobs + } + fun getConcreteSourceIndex(sourceIndex: String, resolver: IndexNameExpressionResolver, clusterState: ClusterState): String { val concreteIndexNames = resolver.concreteIndexNames(clusterState, IndicesOptions.LENIENT_EXPAND_OPEN, sourceIndex) if (concreteIndexNames.isEmpty()) { @@ -196,7 +285,7 @@ class RollupInterceptor( @Suppress("ComplexMethod", "ThrowsCount", "LongMethod") private fun getQueryMetadata( query: QueryBuilder?, - concreteSourceIndexName: String?, + rollup: Rollup, fieldMappings: MutableSet = mutableSetOf() ): Set { if (query == null) { @@ -216,20 +305,20 @@ class RollupInterceptor( // do nothing } is BoolQueryBuilder -> { - query.must()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } - query.mustNot()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } - query.should()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } - query.filter()?.forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } + query.must()?.forEach { this.getQueryMetadata(it, rollup, fieldMappings) } + query.mustNot()?.forEach { this.getQueryMetadata(it, rollup, fieldMappings) } + query.should()?.forEach { this.getQueryMetadata(it, rollup, fieldMappings) } + query.filter()?.forEach { this.getQueryMetadata(it, rollup, fieldMappings) } } is BoostingQueryBuilder -> { - this.getQueryMetadata(query.positiveQuery(), concreteSourceIndexName, fieldMappings) - this.getQueryMetadata(query.negativeQuery(), concreteSourceIndexName, fieldMappings) + this.getQueryMetadata(query.positiveQuery(), rollup, fieldMappings) + this.getQueryMetadata(query.negativeQuery(), rollup, fieldMappings) } is ConstantScoreQueryBuilder -> { - this.getQueryMetadata(query.innerQuery(), concreteSourceIndexName, fieldMappings) + this.getQueryMetadata(query.innerQuery(), rollup, fieldMappings) } is DisMaxQueryBuilder -> { - query.innerQueries().forEach { this.getQueryMetadata(it, concreteSourceIndexName, fieldMappings) } + query.innerQueries().forEach { this.getQueryMetadata(it, rollup, fieldMappings) } } is MatchPhraseQueryBuilder -> { if (!query.analyzer().isNullOrEmpty() || query.slop() != MatchQuery.DEFAULT_PHRASE_SLOP || @@ -242,11 +331,11 @@ class RollupInterceptor( fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, query.fieldName(), Dimension.Type.TERMS.type)) } is QueryStringQueryBuilder -> { - if (concreteSourceIndexName.isNullOrEmpty()) { + if (rollup.sourceIndexFieldMappings.isNullOrEmpty()) { throw IllegalArgumentException("Can't parse query_string query without sourceIndex mappings!") } // Throws IllegalArgumentException if unable to parse query - val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, concreteSourceIndexName) + val (queryFields, otherFields) = QueryStringQueryUtil.extractFieldsFromQueryString(query, rollup.sourceIndexFieldMappings) for (field in queryFields) { fieldMappings.add(RollupFieldMapping(RollupFieldMapping.Companion.FieldType.DIMENSION, field, Dimension.Type.TERMS.type)) } @@ -328,11 +417,10 @@ class RollupInterceptor( private fun rewriteShardSearchForRollupJobs(request: ShardSearchRequest, matchingRollupJobs: Map>) { val matchedRollup = pickRollupJob(matchingRollupJobs.keys) val fieldNameMappingTypeMap = matchingRollupJobs.getValue(matchedRollup).associateBy({ it.fieldName }, { it.mappingType }) - val concreteSourceIndex = getConcreteSourceIndex(matchedRollup.sourceIndex, indexNameExpressionResolver, clusterService.state()) if (searchAllJobs) { - request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) + request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap)) } else { - request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) + request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap)) } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index 6806a887b..87f2b615c 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -8,12 +8,12 @@ package org.opensearch.indexmanagement.rollup.model import org.opensearch.common.io.stream.StreamInput import org.opensearch.common.io.stream.StreamOutput import org.opensearch.common.io.stream.Writeable +import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import org.opensearch.commons.authuser.User import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParser.Token -import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken -import org.opensearch.commons.authuser.User import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Dimension @@ -54,7 +54,8 @@ data class Rollup( val continuous: Boolean, val dimensions: List, val metrics: List, - val user: User? = null + val user: User? = null, + val sourceIndexFieldMappings: Map? = mapOf() ) : ScheduledJobParameter, Writeable { init { @@ -150,7 +151,8 @@ data class Rollup( metrics = sin.readList(::RollupMetrics), user = if (sin.readBoolean()) { User(sin) - } else null + } else null, + sourceIndexFieldMappings = sin.readMap() ) override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -171,6 +173,7 @@ data class Rollup( .field(CONTINUOUS_FIELD, continuous) .field(DIMENSIONS_FIELD, dimensions.toTypedArray()) .field(RollupMetrics.METRICS_FIELD, metrics.toTypedArray()) + .field(SOURCE_INDEX_FIELD_MAPPINGS_FIELD, sourceIndexFieldMappings) if (params.paramAsBoolean(WITH_USER, true)) builder.optionalUserField(USER_FIELD, user) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() builder.endObject() @@ -211,6 +214,7 @@ data class Rollup( out.writeCollection(metrics) out.writeBoolean(user != null) user?.writeTo(out) + out.writeMap(sourceIndexFieldMappings) } companion object { @@ -236,6 +240,7 @@ data class Rollup( const val CONTINUOUS_FIELD = "continuous" const val DIMENSIONS_FIELD = "dimensions" const val METRICS_FIELD = "metrics" + const val SOURCE_INDEX_FIELD_MAPPINGS_FIELD = "source_index_field_mappings" const val MINIMUM_JOB_INTERVAL = 1 const val MINIMUM_DELAY = 0 const val MINIMUM_PAGE_SIZE = 1 @@ -273,6 +278,7 @@ data class Rollup( val dimensions = mutableListOf() val metrics = mutableListOf() var user: User? = null + var sourceIndexFieldMappings = mutableMapOf() ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) @@ -316,6 +322,9 @@ data class Rollup( USER_FIELD -> { user = if (xcp.currentToken() == Token.VALUE_NULL) null else User.parse(xcp) } + SOURCE_INDEX_FIELD_MAPPINGS_FIELD -> { + sourceIndexFieldMappings = xcp.map() + } else -> throw IllegalArgumentException("Invalid field [$fieldName] found in Rollup.") } } @@ -351,7 +360,8 @@ data class Rollup( continuous = continuous, dimensions = dimensions, metrics = metrics, - user = user + user = user, + sourceIndexFieldMappings = sourceIndexFieldMappings ) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt index a82197d5a..fbb5cf480 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/query/QueryStringQueryUtil.kt @@ -24,11 +24,11 @@ object QueryStringQueryUtil { fun rewriteQueryStringQuery( queryBuilder: QueryBuilder, - concreteIndexName: String + sourceIndexMappings: Map ): QueryStringQueryBuilder { val qsqBuilder = queryBuilder as QueryStringQueryBuilder // Parse query_string query and extract all discovered fields - val (fieldsFromQueryString, otherFields) = extractFieldsFromQueryString(queryBuilder, concreteIndexName) + val (fieldsFromQueryString, otherFields) = extractFieldsFromQueryString(queryBuilder, sourceIndexMappings) // Rewrite query_string var newQueryString = qsqBuilder.queryString() fieldsFromQueryString.forEach { field -> @@ -93,8 +93,8 @@ object QueryStringQueryUtil { } @Suppress("ComplexMethod", "LongMethod", "ThrowsCount", "EmptyCatchBlock") - fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, concreteIndexName: String): Pair, Map> { - val context = QueryShardContextFactory.createShardContext(concreteIndexName) + fun extractFieldsFromQueryString(queryBuilder: QueryBuilder, sourceIndexMappings: Map): Pair, Map> { + val context = QueryShardContextFactory.createShardContext(sourceIndexMappings) val qsqBuilder = queryBuilder as QueryStringQueryBuilder val rewrittenQueryString = if (qsqBuilder.escape()) QueryParser.escape(qsqBuilder.queryString()) else qsqBuilder.queryString() val queryParser: QueryStringQueryParserExt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt index c2f28113f..55c571c41 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/QueryShardContextFactory.kt @@ -58,18 +58,8 @@ object QueryShardContextFactory { this.environment = environment } - private fun getIndexSettingsAndMetadata(indexName: String?): Triple { - val index: Index? - val indexSettings: Settings? - val indexMetadata = clusterService.state().metadata.index(indexName) - ?: throw IllegalArgumentException("Can't find IndexMetadata for index: [$indexName]") - index = indexMetadata.index - indexSettings = indexMetadata.settings - return Triple(index, indexSettings, indexMetadata) - } - - fun createShardContext(indexName: String?): QueryShardContext { - val (index, indexSettings, indexMetadata) = getIndexSettingsAndMetadata(indexName) + fun createShardContext(sourceIndexMappings: Map): QueryShardContext { + val index = Index("dummyIndex", "dummyUUID") val nodeSettings = Settings.builder() .put("node.name", "dummyNodeName") .put(Environment.PATH_HOME_SETTING.key, environment.tmpDir()) @@ -83,7 +73,7 @@ object QueryShardContextFactory { pluginsService.pluginSettingsFilter, emptySet() ) val indexScopedSettings: IndexScopedSettings = settingsModule.indexScopedSettings - val idxSettings = newIndexSettings(index, indexSettings, indexScopedSettings) + val idxSettings = newIndexSettings(index, Settings.EMPTY, indexScopedSettings) val indicesModule = IndicesModule(pluginsService.filterPlugins(MapperPlugin::class.java)) val mapperRegistry = indicesModule.mapperRegistry val analysisModule = AnalysisModule(environment, emptyList()) @@ -95,12 +85,12 @@ object QueryShardContextFactory { xContentRegistry, similarityService, mapperRegistry, - { createShardContext(null) }, + { createShardContext(mapOf()) }, { false }, scriptService ) // In order to be able to call toQuery method on QueryBuilder, we need to setup mappings in MapperService - mapperService.merge("_doc", indexMetadata?.mapping()?.source(), MapperService.MergeReason.MAPPING_UPDATE) + mapperService.merge("_doc", sourceIndexMappings, MapperService.MergeReason.MAPPING_UPDATE) return QueryShardContext( 0, @@ -117,7 +107,7 @@ object QueryShardContextFactory { null, { Instant.now().toEpochMilli() }, null, - { pattern -> Regex.simpleMatch(pattern, index?.name) }, + { pattern -> Regex.simpleMatch(pattern, index.name) }, { true }, null ) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt index f6d749fbb..5eb868afe 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupUtils.kt @@ -7,10 +7,18 @@ package org.opensearch.indexmanagement.rollup.util +import org.apache.logging.log4j.Logger +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest +import org.opensearch.action.support.IndicesOptions +import org.opensearch.client.Client import org.opensearch.cluster.ClusterState import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.metadata.IndexNameExpressionResolver +import org.opensearch.cluster.metadata.MappingMetadata import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken @@ -33,6 +41,7 @@ import org.opensearch.indexmanagement.common.model.dimension.Dimension import org.opensearch.indexmanagement.common.model.dimension.Histogram import org.opensearch.indexmanagement.common.model.dimension.Terms import org.opensearch.indexmanagement.opensearchapi.parseWithType +import org.opensearch.indexmanagement.opensearchapi.suspendUntil import org.opensearch.indexmanagement.rollup.RollupMapperService import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupFieldMapping @@ -46,6 +55,7 @@ import org.opensearch.indexmanagement.rollup.query.QueryStringQueryUtil import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings import org.opensearch.indexmanagement.util.IndexUtils +import org.opensearch.rest.RestStatus import org.opensearch.script.Script import org.opensearch.script.ScriptType import org.opensearch.search.aggregations.AggregationBuilder @@ -290,8 +300,7 @@ fun Rollup.rewriteAggregationBuilder(aggregationBuilder: AggregationBuilder): Ag @Suppress("ComplexMethod", "LongMethod") fun Rollup.rewriteQueryBuilder( queryBuilder: QueryBuilder, - fieldNameMappingTypeMap: Map, - concreteIndexName: String = "" + fieldNameMappingTypeMap: Map ): QueryBuilder { return when (queryBuilder) { is TermQueryBuilder -> { @@ -322,19 +331,19 @@ fun Rollup.rewriteQueryBuilder( is BoolQueryBuilder -> { val newBoolQueryBuilder = BoolQueryBuilder() queryBuilder.must()?.forEach { - val newMustQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) + val newMustQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) newBoolQueryBuilder.must(newMustQueryBuilder) } queryBuilder.mustNot()?.forEach { - val newMustNotQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) + val newMustNotQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) newBoolQueryBuilder.mustNot(newMustNotQueryBuilder) } queryBuilder.should()?.forEach { - val newShouldQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) + val newShouldQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) newBoolQueryBuilder.should(newShouldQueryBuilder) } queryBuilder.filter()?.forEach { - val newFilterQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName) + val newFilterQueryBuilder = this.rewriteQueryBuilder(it, fieldNameMappingTypeMap) newBoolQueryBuilder.filter(newFilterQueryBuilder) } newBoolQueryBuilder.minimumShouldMatch(queryBuilder.minimumShouldMatch()) @@ -343,15 +352,15 @@ fun Rollup.rewriteQueryBuilder( newBoolQueryBuilder.boost(queryBuilder.boost()) } is BoostingQueryBuilder -> { - val newPositiveQueryBuilder = this.rewriteQueryBuilder(queryBuilder.positiveQuery(), fieldNameMappingTypeMap, concreteIndexName) - val newNegativeQueryBuilder = this.rewriteQueryBuilder(queryBuilder.negativeQuery(), fieldNameMappingTypeMap, concreteIndexName) + val newPositiveQueryBuilder = this.rewriteQueryBuilder(queryBuilder.positiveQuery(), fieldNameMappingTypeMap) + val newNegativeQueryBuilder = this.rewriteQueryBuilder(queryBuilder.negativeQuery(), fieldNameMappingTypeMap) val newBoostingQueryBuilder = BoostingQueryBuilder(newPositiveQueryBuilder, newNegativeQueryBuilder) if (queryBuilder.negativeBoost() >= 0) newBoostingQueryBuilder.negativeBoost(queryBuilder.negativeBoost()) newBoostingQueryBuilder.queryName(queryBuilder.queryName()) newBoostingQueryBuilder.boost(queryBuilder.boost()) } is ConstantScoreQueryBuilder -> { - val newInnerQueryBuilder = this.rewriteQueryBuilder(queryBuilder.innerQuery(), fieldNameMappingTypeMap, concreteIndexName) + val newInnerQueryBuilder = this.rewriteQueryBuilder(queryBuilder.innerQuery(), fieldNameMappingTypeMap) val newConstantScoreQueryBuilder = ConstantScoreQueryBuilder(newInnerQueryBuilder) newConstantScoreQueryBuilder.boost(queryBuilder.boost()) newConstantScoreQueryBuilder.queryName(queryBuilder.queryName()) @@ -359,7 +368,7 @@ fun Rollup.rewriteQueryBuilder( is DisMaxQueryBuilder -> { val newDisMaxQueryBuilder = DisMaxQueryBuilder() queryBuilder.innerQueries().forEach { - newDisMaxQueryBuilder.add(this.rewriteQueryBuilder(it, fieldNameMappingTypeMap, concreteIndexName)) + newDisMaxQueryBuilder.add(this.rewriteQueryBuilder(it, fieldNameMappingTypeMap)) } newDisMaxQueryBuilder.tieBreaker(queryBuilder.tieBreaker()) newDisMaxQueryBuilder.queryName(queryBuilder.queryName()) @@ -372,16 +381,16 @@ fun Rollup.rewriteQueryBuilder( newMatchPhraseQueryBuilder.boost(queryBuilder.boost()) } is QueryStringQueryBuilder -> { - QueryStringQueryUtil.rewriteQueryStringQuery(queryBuilder, concreteIndexName) + QueryStringQueryUtil.rewriteQueryStringQuery(queryBuilder, this.sourceIndexFieldMappings!!) } // We do nothing otherwise, the validation logic should have already verified so not throwing an exception else -> queryBuilder } } -fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder, targetIndexName: String = ""): QueryBuilder { +fun Set.buildRollupQuery(fieldNameMappingTypeMap: Map, oldQuery: QueryBuilder): QueryBuilder { val wrappedQueryBuilder = BoolQueryBuilder() - wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap, targetIndexName)) + wrappedQueryBuilder.must(this.first().rewriteQueryBuilder(oldQuery, fieldNameMappingTypeMap)) wrappedQueryBuilder.should(TermsQueryBuilder("rollup._id", this.map { it.id })) wrappedQueryBuilder.minimumShouldMatch(1) return wrappedQueryBuilder @@ -406,7 +415,6 @@ fun Rollup.populateFieldMappings(): Set { fun SearchSourceBuilder.rewriteSearchSourceBuilder( jobs: Set, fieldNameMappingTypeMap: Map, - concreteIndexName: String ): SearchSourceBuilder { val ssb = SearchSourceBuilder() // can use first() here as all jobs in the set will have a superset of the query's terms @@ -422,7 +430,7 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder( if (this.minScore() != null) ssb.minScore(this.minScore()) if (this.postFilter() != null) ssb.postFilter(this.postFilter()) ssb.profile(this.profile()) - if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query(), concreteIndexName)) + if (this.query() != null) ssb.query(jobs.buildRollupQuery(fieldNameMappingTypeMap, this.query())) this.rescores()?.forEach { ssb.addRescorer(it) } this.scriptFields()?.forEach { ssb.scriptField(it.fieldName(), it.script(), it.ignoreFailure()) } if (this.searchAfter() != null) ssb.searchAfter(this.searchAfter()) @@ -444,9 +452,8 @@ fun SearchSourceBuilder.rewriteSearchSourceBuilder( fun SearchSourceBuilder.rewriteSearchSourceBuilder( job: Rollup, fieldNameMappingTypeMap: Map, - concreteIndexName: String ): SearchSourceBuilder { - return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap, concreteIndexName) + return this.rewriteSearchSourceBuilder(setOf(job), fieldNameMappingTypeMap) } fun Rollup.getInitialDocValues(docCount: Long): MutableMap = @@ -464,3 +471,56 @@ fun parseRollup(response: GetResponse, xContentRegistry: NamedXContentRegistry = return xcp.parseWithType(response.id, response.seqNo, response.primaryTerm, Rollup.Companion::parse) } + +/** + * QueryStringQueryParser (core's class) requires proper field mappings to parse query_string queries + * We will store only mappings for dimensions as they are only allowed to be used in rollup queries + * */ +@Suppress("ReturnCount") +suspend fun populateSourceIndexFieldMappings( + rollup: Rollup, + resolver: IndexNameExpressionResolver, + state: ClusterState, + client: Client, + log: Logger +): Rollup { + + if (!rollup.sourceIndexFieldMappings.isNullOrEmpty()) { + return rollup + } + + val concreteIndices = resolver.concreteIndexNames( + state, + IndicesOptions.strictExpandOpen(), + true, + rollup.sourceIndex + ) + if (concreteIndices.isEmpty()) { + throw OpenSearchStatusException("Unable to resolve sourceIndex: [${rollup.sourceIndex}]", RestStatus.BAD_REQUEST) + } + // Taking newest index by creation date is correct approach here, since datastreams or "datastream-like constructs" can "change" mappings + // of field over time.(for example: update mappings in index template then rollover) + val concreteIndex = IndexUtils.getNewestIndexByCreationDate(concreteIndices, state) + val getMappingsResponse: GetMappingsResponse = client.admin().indices().suspendUntil { + getMappings(GetMappingsRequest().indices(concreteIndex), it) + } + if (getMappingsResponse.mappings().isEmpty() || !getMappingsResponse.mappings().containsKey(concreteIndex)) { + throw OpenSearchStatusException("Unable to get mappings for concrete index: [$concreteIndex]", RestStatus.INTERNAL_SERVER_ERROR) + } + val rootProperties = mutableMapOf() + val mappingMetadata: MappingMetadata = getMappingsResponse.mappings()[concreteIndex]!! + // Since DATE_HISTOGRAM field is not allowed in queries, we can skip it. + rollup.dimensions.filter { it.type != Dimension.Type.DATE_HISTOGRAM }.forEach { dimension -> + val fieldProps = IndexUtils.getFieldFromMappings(dimension.sourceField, mappingMetadata.sourceAsMap()) + if (fieldProps == null) { + log.error("failed to fetch field properties for sourceIndex field [${dimension.sourceField}]") + return@forEach + } + // Flattened fields (for example: ip.addr.v4) are fine here, since MapperService can handle them. + rootProperties[dimension.sourceField] = fieldProps + } + if (rootProperties.isEmpty()) { + return rollup + } + return rollup.copy(sourceIndexFieldMappings = mapOf("_doc" to mapOf("properties" to rootProperties))) +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index 92084c3ad..ffec973f9 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1038,6 +1038,10 @@ } } } + }, + "source_index_field_mappings": { + "type": "object", + "enabled": false } } }, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt index 56a0cc0b8..6e12b4663 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementIndicesIT.kt @@ -8,8 +8,8 @@ package org.opensearch.indexmanagement import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.common.settings.Settings -import org.opensearch.core.xcontent.ToXContent import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.core.xcontent.ToXContent import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_INDEX_BASE import org.opensearch.indexmanagement.IndexManagementIndices.Companion.HISTORY_WRITE_INDEX_ALIAS import org.opensearch.indexmanagement.IndexManagementIndices.Companion.indexManagementMappings @@ -213,7 +213,9 @@ class IndexManagementIndicesIT : IndexStateManagementRestTestCase() { } fun `test rollup backward compatibility with opendistro`() { - val rollup = randomRollup() + var rollup = randomRollup() + rollup = rollup.copy(sourceIndex = "bwc_index_1") + createIndex("bwc_index_1", Settings.builder().build()) val rollupJsonString = rollup.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS).string() val createRollupResponse = client().makeRequest( "PUT", "${IndexManagementPlugin.LEGACY_ROLLUP_JOBS_BASE_URI}/${rollup.id}", emptyMap(), diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index b98fbb125..7d2c3e656 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -12,17 +12,17 @@ import org.junit.Before import org.junit.rules.DisableOnDebug import org.opensearch.action.admin.cluster.node.tasks.list.ListTasksAction import org.opensearch.client.Request +import org.opensearch.client.RequestOptions import org.opensearch.client.Response +import org.opensearch.client.ResponseException import org.opensearch.client.RestClient -import org.opensearch.client.RequestOptions import org.opensearch.client.WarningsHandler -import org.opensearch.client.ResponseException import org.opensearch.common.Strings import org.opensearch.common.io.PathUtils import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.XContentType import org.opensearch.core.xcontent.DeprecationHandler import org.opensearch.core.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.rest.RestStatus import java.io.IOException @@ -32,12 +32,10 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL -import kotlin.collections.ArrayList -import kotlin.collections.HashSet abstract class IndexManagementRestTestCase : ODFERestTestCase() { - val configSchemaVersion = 17 + val configSchemaVersion = 18 val historySchemaVersion = 5 // Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 8af7a035b..192a6094a 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -7,10 +7,10 @@ package org.opensearch.indexmanagement.rollup.interceptor import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity -import org.junit.Assert import org.opensearch.client.ResponseException import org.opensearch.indexmanagement.common.model.dimension.DateHistogram import org.opensearch.indexmanagement.common.model.dimension.Terms +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.rollup.RollupRestTestCase import org.opensearch.indexmanagement.rollup.model.Rollup @@ -21,6 +21,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Max import org.opensearch.indexmanagement.rollup.model.metric.Min import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount +import org.opensearch.indexmanagement.rollup.toJsonString import org.opensearch.indexmanagement.waitFor import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.rest.RestStatus @@ -1334,7 +1335,7 @@ class RollupInterceptorIT : RollupRestTestCase() { } catch (e: ResponseException) { assertTrue( e.message?.contains( - "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + "missing terms grouping on event_ts" ) ?: false ) } @@ -1364,7 +1365,7 @@ class RollupInterceptorIT : RollupRestTestCase() { } catch (e: ResponseException) { assertTrue( e.message?.contains( - "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + "missing terms grouping on event_ts" ) ?: false ) } @@ -1419,58 +1420,11 @@ class RollupInterceptorIT : RollupRestTestCase() { } catch (e: ResponseException) { assertTrue( e.message?.contains( - "[missing terms grouping on earnings, missing terms grouping on event_ts, missing field test.vvv, missing field test.fff]" + "missing terms grouping on event_ts" ) ?: false ) } - // fallback on index settings index.query.default_field:state_ordinal - client().makeRequest( - "PUT", "$sourceIndex/_settings", - StringEntity( - """ - { - "index": { - "query": { - "default_field":"state_ordinal" - } - } - } - """.trimIndent(), - ContentType.APPLICATION_JSON - ) - ) - // - req = """ - { - "size": 0, - "query": { - "query_string": { - "query": "state:TX AND state_ext:CA AND 7" - } - - }, - "aggs": { - "earnings_total": { - "sum": { - "field": "earnings" - } - } - } - } - """.trimIndent() - rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) - assertTrue(rawRes.restStatus() == RestStatus.OK) - rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) - assertTrue(rollupRes.restStatus() == RestStatus.OK) - rawAggRes = rawRes.asMap()["aggregations"] as Map> - rollupAggRes = rollupRes.asMap()["aggregations"] as Map> - assertEquals( - "Source and rollup index did not return same min results", - rawAggRes.getValue("earnings_total")["value"], - rollupAggRes.getValue("earnings_total")["value"] - ) - // prefix pattern in "default_field" field req = """ { @@ -1847,8 +1801,224 @@ class RollupInterceptorIT : RollupRestTestCase() { refreshAllIndices() + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 0", + "default_field": "state_ordinal" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + + var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + deleteIndex(sourceIndex) + var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + } + + /** + * Rollup job exists without sourceIndexFieldMappings field and sourceIndex exists - we are able populate sourceIndexFieldMappings + * */ + fun `test roll up search with rollup job without sourceIndexFieldMappings populated success`() { + val sourceIndex = "source_999_rollup_search_qsq_982439" + val targetIndex = "target_rollup_qsq_search_982439331" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "rollup_without_sourceindexfieldmapping_set", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = "source_999*", + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), + Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + val rollupString = rollup.toJsonString(params = XCONTENT_WITHOUT_TYPE) + val mappingString = """{ + "_meta": { + "rollups": { + "${rollup.id}": $rollupString + } + } + + } + """.trimIndent() + val response = client() + .makeRequest( + "PUT", + "$targetIndex/_mapping", + emptyMap(), + StringEntity(mappingString, ContentType.APPLICATION_JSON) + ) + assertEquals("Unable to create a new rollup", RestStatus.OK, response.restStatus()) + + // Term query + var req = """ + { + "size": 0, + "query": { + "query_string": { + "query": "state:TX AND state_ext:CA AND 0", + "default_field": "state_ordinal" + } + + }, + "aggs": { + "earnings_total": { + "sum": { + "field": "earnings" + } + } + } + } + """.trimIndent() + + var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + var rollupRes = client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rollupRes.restStatus() == RestStatus.OK) + var rawAggRes = rawRes.asMap()["aggregations"] as Map> + var rollupAggRes = rollupRes.asMap()["aggregations"] as Map> + assertEquals( + "Source and rollup index did not return same min results", + rawAggRes.getValue("earnings_total")["value"], + rollupAggRes.getValue("earnings_total")["value"] + ) + } + + /** + * Rollup job exists without sourceIndexFieldMappings field and sourceIndex is already deleted - expect failure + * */ + fun `test roll up search with rollup job without sourceIndexFieldMappings populated failure`() { + val sourceIndex = "hhh_source_999_rollup_search_qsq_9824392" + val targetIndex = "target_rollup_qsq_search_9824393313" + + createSampleIndexForQSQTest(sourceIndex) + + val rollup = Rollup( + id = "rollup_without_sourceindexfieldmapping_set_f", + enabled = true, + schemaVersion = 1L, + jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES), + jobLastUpdatedTime = Instant.now(), + jobEnabledTime = Instant.now(), + description = "basic search test", + sourceIndex = sourceIndex, + targetIndex = targetIndex, + metadataID = null, + roles = emptyList(), + pageSize = 10, + delay = 0, + continuous = false, + dimensions = listOf( + DateHistogram(sourceField = "event_ts", fixedInterval = "1h"), + Terms("state", "state"), + Terms("state_ext", "state_ext"), + Terms("state_ext2", "state_ext2"), + Terms("state_ordinal", "state_ordinal"), + Terms("abc test", "abc test"), + ), + metrics = listOf( + RollupMetrics( + sourceField = "earnings", targetField = "earnings", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ).let { createRollup(it, it.id) } + + updateRollupStartTime(rollup) + + waitFor { + val rollupJob = getRollup(rollupId = rollup.id) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } + + refreshAllIndices() + + val rollupString = rollup.toJsonString(params = XCONTENT_WITHOUT_TYPE) + val mappingString = """{ + "_meta": { + "rollups": { + "${rollup.id}": $rollupString + } + } + + } + """.trimIndent() + val response = client() + .makeRequest( + "PUT", + "$targetIndex/_mapping", + emptyMap(), + StringEntity(mappingString, ContentType.APPLICATION_JSON) + ) + assertEquals("Unable to create a new rollup", RestStatus.OK, response.restStatus()) + // Term query var req = """ { @@ -1869,11 +2039,15 @@ class RollupInterceptorIT : RollupRestTestCase() { } } """.trimIndent() + + var rawRes = client().makeRequest("POST", "/$sourceIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes.restStatus() == RestStatus.OK) + + deleteIndex(sourceIndex) try { client().makeRequest("POST", "/$targetIndex/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) - fail("Failure was expected when searching rollup index using qsq query when sourceIndex does not exist!") } catch (e: ResponseException) { - Assert.assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!")) + assertTrue(e.message!!.contains("Can't parse query_string query without sourceIndex mappings!")) } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 168ea3a64..8d1ab7199 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.rollup.runner import org.apache.hc.core5.http.ContentType import org.apache.hc.core5.http.io.entity.StringEntity +import org.junit.Assert import org.opensearch.common.settings.Settings import org.opensearch.indexmanagement.IndexManagementPlugin import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI @@ -337,27 +338,12 @@ class RollupRunnerIT : RollupRestTestCase() { ) // Create rollup job - rollup = createRollup(rollup = rollup, rollupId = rollup.id) - assertEquals(indexName, rollup.sourceIndex) - assertEquals(null, rollup.metadataID) - - // Update rollup start time to run first execution - updateRollupStartTime(rollup) - - var rollupMetadata: RollupMetadata? - // Assert on first execution - waitFor { - val rollupJob = getRollup(rollupId = rollup.id) - assertNotNull("Rollup job not found", rollupJob) - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - - rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertNotNull("Rollup metadata not found", rollupMetadata) - assertEquals("Unexpected metadata status", RollupMetadata.Status.FAILED, rollupMetadata!!.status) - assertEquals("Unexpected failure reason", "No indices found for [${rollup.sourceIndex}]", rollupMetadata!!.failureReason) + try { + createRollup(rollup = rollup, rollupId = rollup.id) + Assert.fail("Rollup creation shouldn've failed because sourceIndex does not exists") + } catch (e: Exception) { + assertTrue(e.message!!.contains("no such index [test_index_runner_fourth]")) } - - // TODO: Call _start to retry and test recovery behavior? } fun `test metadata stats contains correct info`() { diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index 92084c3ad..ffec973f9 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 17 + "schema_version": 18 }, "dynamic": "strict", "properties": { @@ -1038,6 +1038,10 @@ } } } + }, + "source_index_field_mappings": { + "type": "object", + "enabled": false } } },