diff --git a/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt index 45e487acf..683781fe3 100644 --- a/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt +++ b/opensearch-observability/src/main/kotlin/org/opensearch/observability/index/ObservabilityIndex.kt @@ -31,7 +31,6 @@ import org.opensearch.ResourceAlreadyExistsException import org.opensearch.ResourceNotFoundException import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.create.CreateIndexRequest -import org.opensearch.action.admin.indices.delete.DeleteIndexRequest import org.opensearch.action.bulk.BulkRequest import org.opensearch.action.delete.DeleteRequest import org.opensearch.action.get.GetRequest @@ -104,7 +103,6 @@ internal object ObservabilityIndex { /** * Create index using the mapping and settings defined in resource - * If .opensearch-notebooks index exists, reindex it to .opensearch-observability index and remove it */ @Suppress("TooGenericExceptionCaught") private fun createIndex() { @@ -120,6 +118,7 @@ internal object ObservabilityIndex { val response = actionFuture.actionGet(PluginSettings.operationTimeoutMs) if (response.isAcknowledged) { log.info("$LOG_PREFIX:Index $INDEX_NAME creation Acknowledged") + reindexNotebooks() } else { throw IllegalStateException("$LOG_PREFIX:Index $INDEX_NAME creation not Acknowledged") } @@ -129,6 +128,13 @@ internal object ObservabilityIndex { } } } + } + + /** + * Reindex .opensearch-notebooks to .opensearch-observability index + */ + @Suppress("TooGenericExceptionCaught") + private fun reindexNotebooks() { if (isIndexExists(NOTEBOOKS_INDEX_NAME)) { try { log.info("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME to $INDEX_NAME") @@ -144,17 +150,16 @@ internal object ObservabilityIndex { throw IllegalStateException("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME failed with searchFailures") } else if (reindexResponse.bulkFailures.isNotEmpty()) { throw IllegalStateException("$LOG_PREFIX:Index - reindex $NOTEBOOKS_INDEX_NAME failed with bulkFailures") + } else if (reindexResponse.total != reindexResponse.created + reindexResponse.updated) { + throw IllegalStateException( + "$LOG_PREFIX:Index - reindex number of docs created:${reindexResponse.created} + " + + "updated:${reindexResponse.updated} does not equal requested:${reindexResponse.total}" + ) } - - log.info("$LOG_PREFIX:Index - ${reindexResponse.total} docs reindexed to $INDEX_NAME") - val deleteIndexRequest = DeleteIndexRequest(NOTEBOOKS_INDEX_NAME) - val actionFuture = client.admin().indices().delete(deleteIndexRequest) - val deleteIndexResponse = actionFuture.actionGet(PluginSettings.operationTimeoutMs) - if (deleteIndexResponse.isAcknowledged) { - log.info("$LOG_PREFIX:Index $INDEX_NAME deletion Acknowledged") - } else { - throw IllegalStateException("$LOG_PREFIX:Index $NOTEBOOKS_INDEX_NAME deletion not Acknowledged") - } + log.info( + "$LOG_PREFIX:Index - reindex ${reindexResponse.created} docs created " + + "and ${reindexResponse.updated} docs updated in $INDEX_NAME" + ) } catch (exception: Exception) { if (exception !is ResourceNotFoundException && exception.cause !is ResourceNotFoundException) { throw exception